Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions pulsaradmin/pkg/admin/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ package admin

import (
"context"
"io"
"net/http"
"net/url"
"strconv"
"strings"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
)

Expand Down Expand Up @@ -747,17 +750,35 @@ type Namespaces interface {
// UpdatePropertiesWithContext updates the properties of a namespace
UpdatePropertiesWithContext(ctx context.Context, namespace utils.NameSpaceName, properties map[string]string) error

// SetProperty sets a namespace property for the given key
SetProperty(namespace utils.NameSpaceName, key, value string) error

// SetPropertyWithContext sets a namespace property for the given key
SetPropertyWithContext(ctx context.Context, namespace utils.NameSpaceName, key, value string) error

Comment thread
freeznet marked this conversation as resolved.
// GetProperties returns the properties of a namespace
GetProperties(namespace utils.NameSpaceName) (map[string]string, error)

// GetPropertiesWithContext returns the properties of a namespace
GetPropertiesWithContext(ctx context.Context, namespace utils.NameSpaceName) (map[string]string, error)

// GetProperty returns the namespace property value for the given key, or nil if the key does not exist
GetProperty(namespace utils.NameSpaceName, key string) (*string, error)

// GetPropertyWithContext returns the namespace property value for the given key, or nil if the key does not exist
GetPropertyWithContext(ctx context.Context, namespace utils.NameSpaceName, key string) (*string, error)

// RemoveProperties clears the properties of a namespace
RemoveProperties(namespace utils.NameSpaceName) error

// RemovePropertiesWithContext clears the properties of a namespace
RemovePropertiesWithContext(ctx context.Context, namespace utils.NameSpaceName) error

// RemoveProperty removes the namespace property for the given key and returns the removed value, if any
RemoveProperty(namespace utils.NameSpaceName, key string) (*string, error)

// RemovePropertyWithContext removes the namespace property for the given key and returns the removed value, if any
RemovePropertyWithContext(ctx context.Context, namespace utils.NameSpaceName, key string) (*string, error)
}

type namespaces struct {
Expand Down Expand Up @@ -2107,6 +2128,26 @@ func (n *namespaces) UpdatePropertiesWithContext(
return n.pulsar.Client.PutWithContext(ctx, endpoint, properties)
}

func (n *namespaces) SetProperty(namespace utils.NameSpaceName, key, value string) error {
return n.SetPropertyWithContext(context.Background(), namespace, key, value)
}

func (n *namespaces) SetPropertyWithContext(
ctx context.Context,
namespace utils.NameSpaceName,
key, value string,
) error {
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "property", key, value)
return n.pulsar.Client.PutWithCustomMediaTypeWithContext(
ctx,
endpoint,
nil,
nil,
nil,
rest.ApplicationJSON,
)
}

func (n *namespaces) GetProperties(namespace utils.NameSpaceName) (map[string]string, error) {
return n.GetPropertiesWithContext(context.Background(), namespace)
}
Expand All @@ -2121,6 +2162,18 @@ func (n *namespaces) GetPropertiesWithContext(
return properties, err
}

func (n *namespaces) GetProperty(namespace utils.NameSpaceName, key string) (*string, error) {
return n.GetPropertyWithContext(context.Background(), namespace, key)
}

func (n *namespaces) GetPropertyWithContext(
ctx context.Context,
namespace utils.NameSpaceName,
key string,
) (*string, error) {
return n.requestPropertyValueWithContext(ctx, http.MethodGet, namespace, key)
}

func (n *namespaces) RemoveProperties(namespace utils.NameSpaceName) error {
return n.RemovePropertiesWithContext(context.Background(), namespace)
}
Expand All @@ -2130,6 +2183,51 @@ func (n *namespaces) RemovePropertiesWithContext(ctx context.Context, namespace
return n.pulsar.Client.DeleteWithContext(ctx, endpoint)
}

func (n *namespaces) RemoveProperty(namespace utils.NameSpaceName, key string) (*string, error) {
return n.RemovePropertyWithContext(context.Background(), namespace, key)
}

func (n *namespaces) RemovePropertyWithContext(
ctx context.Context,
namespace utils.NameSpaceName,
key string,
) (*string, error) {
return n.requestPropertyValueWithContext(ctx, http.MethodDelete, namespace, key)
}

func (n *namespaces) requestPropertyValueWithContext(
ctx context.Context,
method string,
namespace utils.NameSpaceName,
key string,
) (*string, error) {
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "property", key)
resp, err := n.pulsar.Client.MakeRequestWithContext(ctx, method, endpoint)
if err != nil {
if adminErr, ok := err.(rest.Error); ok && adminErr.Code == http.StatusNotFound {
return nil, nil
}
return nil, err
}
defer safeRespClose(resp)

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return decodeNamespacePropertyValue(body), nil
}

func decodeNamespacePropertyValue(body []byte) *string {
value, err := decodeOptionalJSON[string](body)
if err == nil {
return value
}
Comment thread
freeznet marked this conversation as resolved.

raw := strings.TrimSpace(string(body))
return &raw
}

// nolint: revive // It's ok here to use a built-in function name (max)
func (n *namespaces) SetMaxTopicsPerNamespace(namespace utils.NameSpaceName, max int) error {
return n.SetMaxTopicsPerNamespaceWithContext(context.Background(), namespace, max)
Expand Down
Loading
Loading