diff --git a/pulsaradmin/pkg/admin/namespace.go b/pulsaradmin/pkg/admin/namespace.go index 05afe7191..2b067e2a4 100644 --- a/pulsaradmin/pkg/admin/namespace.go +++ b/pulsaradmin/pkg/admin/namespace.go @@ -19,10 +19,13 @@ package admin import ( "context" + "io" + "net/http" "net/url" "strconv" "strings" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" ) @@ -747,17 +750,35 @@ type Namespaces interface { // UpdatePropertiesWithContext updates the properties of a namespace UpdatePropertiesWithContext(ctx context.Context, namespace utils.NameSpaceName, properties map[string]string) error + // SetProperty sets a namespace property for the given key + SetProperty(namespace utils.NameSpaceName, key, value string) error + + // SetPropertyWithContext sets a namespace property for the given key + SetPropertyWithContext(ctx context.Context, namespace utils.NameSpaceName, key, value string) error + // GetProperties returns the properties of a namespace GetProperties(namespace utils.NameSpaceName) (map[string]string, error) // GetPropertiesWithContext returns the properties of a namespace GetPropertiesWithContext(ctx context.Context, namespace utils.NameSpaceName) (map[string]string, error) + // GetProperty returns the namespace property value for the given key, or nil if the key does not exist + GetProperty(namespace utils.NameSpaceName, key string) (*string, error) + + // GetPropertyWithContext returns the namespace property value for the given key, or nil if the key does not exist + GetPropertyWithContext(ctx context.Context, namespace utils.NameSpaceName, key string) (*string, error) + // RemoveProperties clears the properties of a namespace RemoveProperties(namespace utils.NameSpaceName) error // RemovePropertiesWithContext clears the properties of a namespace RemovePropertiesWithContext(ctx context.Context, namespace utils.NameSpaceName) error + + // RemoveProperty removes the namespace property for the given key and returns the removed value, if any + RemoveProperty(namespace utils.NameSpaceName, key string) (*string, error) + + // RemovePropertyWithContext removes the namespace property for the given key and returns the removed value, if any + RemovePropertyWithContext(ctx context.Context, namespace utils.NameSpaceName, key string) (*string, error) } type namespaces struct { @@ -2107,6 +2128,26 @@ func (n *namespaces) UpdatePropertiesWithContext( return n.pulsar.Client.PutWithContext(ctx, endpoint, properties) } +func (n *namespaces) SetProperty(namespace utils.NameSpaceName, key, value string) error { + return n.SetPropertyWithContext(context.Background(), namespace, key, value) +} + +func (n *namespaces) SetPropertyWithContext( + ctx context.Context, + namespace utils.NameSpaceName, + key, value string, +) error { + endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "property", key, value) + return n.pulsar.Client.PutWithCustomMediaTypeWithContext( + ctx, + endpoint, + nil, + nil, + nil, + rest.ApplicationJSON, + ) +} + func (n *namespaces) GetProperties(namespace utils.NameSpaceName) (map[string]string, error) { return n.GetPropertiesWithContext(context.Background(), namespace) } @@ -2121,6 +2162,18 @@ func (n *namespaces) GetPropertiesWithContext( return properties, err } +func (n *namespaces) GetProperty(namespace utils.NameSpaceName, key string) (*string, error) { + return n.GetPropertyWithContext(context.Background(), namespace, key) +} + +func (n *namespaces) GetPropertyWithContext( + ctx context.Context, + namespace utils.NameSpaceName, + key string, +) (*string, error) { + return n.requestPropertyValueWithContext(ctx, http.MethodGet, namespace, key) +} + func (n *namespaces) RemoveProperties(namespace utils.NameSpaceName) error { return n.RemovePropertiesWithContext(context.Background(), namespace) } @@ -2130,6 +2183,51 @@ func (n *namespaces) RemovePropertiesWithContext(ctx context.Context, namespace return n.pulsar.Client.DeleteWithContext(ctx, endpoint) } +func (n *namespaces) RemoveProperty(namespace utils.NameSpaceName, key string) (*string, error) { + return n.RemovePropertyWithContext(context.Background(), namespace, key) +} + +func (n *namespaces) RemovePropertyWithContext( + ctx context.Context, + namespace utils.NameSpaceName, + key string, +) (*string, error) { + return n.requestPropertyValueWithContext(ctx, http.MethodDelete, namespace, key) +} + +func (n *namespaces) requestPropertyValueWithContext( + ctx context.Context, + method string, + namespace utils.NameSpaceName, + key string, +) (*string, error) { + endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "property", key) + resp, err := n.pulsar.Client.MakeRequestWithContext(ctx, method, endpoint) + if err != nil { + if adminErr, ok := err.(rest.Error); ok && adminErr.Code == http.StatusNotFound { + return nil, nil + } + return nil, err + } + defer safeRespClose(resp) + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + return decodeNamespacePropertyValue(body), nil +} + +func decodeNamespacePropertyValue(body []byte) *string { + value, err := decodeOptionalJSON[string](body) + if err == nil { + return value + } + + raw := strings.TrimSpace(string(body)) + return &raw +} + // nolint: revive // It's ok here to use a built-in function name (max) func (n *namespaces) SetMaxTopicsPerNamespace(namespace utils.NameSpaceName, max int) error { return n.SetMaxTopicsPerNamespaceWithContext(context.Background(), namespace, max) diff --git a/pulsaradmin/pkg/admin/namespace_test.go b/pulsaradmin/pkg/admin/namespace_test.go index f14027dd4..a44316b75 100644 --- a/pulsaradmin/pkg/admin/namespace_test.go +++ b/pulsaradmin/pkg/admin/namespace_test.go @@ -18,6 +18,9 @@ package admin import ( + "io" + "net/http" + "net/url" "os" "testing" "time" @@ -33,6 +36,24 @@ func ptr(n int) *int { return &n } +func strPtr(s string) *string { + return &s +} + +func mustNamespaceName(t *testing.T, namespace string) *utils.NameSpaceName { + t.Helper() + + ns, err := utils.GetNamespaceName(namespace) + require.NoError(t, err) + return ns +} + +type namespacePropertyRequest struct { + method string + path string + body string +} + func TestSetTopicAutoCreation(t *testing.T) { config := &config.Config{} admin, err := New(config) @@ -504,8 +525,7 @@ func TestNamespaces_Properties(t *testing.T) { require.NoError(t, err) require.NotNil(t, admin) - namespace, err := utils.GetNamespaceName("public/default") - assert.Equal(t, err, nil) + namespace := mustNamespaceName(t, "public/default") // Namespace properties are expected to be set and retrieved successfully properties := map[string]string{ @@ -518,12 +538,236 @@ func TestNamespaces_Properties(t *testing.T) { assert.Equal(t, err, nil) assert.Equal(t, actualProperties, properties) + propertyValue, err := admin.Namespaces().GetProperty(*namespace, "key-1") + assert.NoError(t, err) + require.NotNil(t, propertyValue) + assert.Equal(t, "value-1", *propertyValue) + + err = admin.Namespaces().SetProperty(*namespace, "key-2", "value-2") + assert.NoError(t, err) + + propertyValue, err = admin.Namespaces().GetProperty(*namespace, "key-2") + assert.NoError(t, err) + require.NotNil(t, propertyValue) + assert.Equal(t, "value-2", *propertyValue) + + err = admin.Namespaces().SetProperty(*namespace, "key-2", "value-2-updated") + assert.NoError(t, err) + + propertyValue, err = admin.Namespaces().GetProperty(*namespace, "key-2") + assert.NoError(t, err) + require.NotNil(t, propertyValue) + assert.Equal(t, "value-2-updated", *propertyValue) + + // Single-key property endpoints follow the upstream path-based API, which does + // not guarantee round-tripping empty string values. Validate empty values via + // the full properties endpoint instead. + err = admin.Namespaces().UpdateProperties(*namespace, map[string]string{ + "key-empty": "", + }) + assert.NoError(t, err) + + actualProperties, err = admin.Namespaces().GetProperties(*namespace) + assert.NoError(t, err) + assert.Equal(t, map[string]string{ + "key-1": "value-1", + "key-2": "value-2-updated", + "key-empty": "", + }, actualProperties) + + removedValue, err := admin.Namespaces().RemoveProperty(*namespace, "key-2") + assert.NoError(t, err) + require.NotNil(t, removedValue) + assert.Equal(t, "value-2-updated", *removedValue) + + propertyValue, err = admin.Namespaces().GetProperty(*namespace, "key-2") + assert.NoError(t, err) + assert.Nil(t, propertyValue) + + actualProperties, err = admin.Namespaces().GetProperties(*namespace) + assert.NoError(t, err) + assert.Equal(t, map[string]string{ + "key-1": "value-1", + "key-empty": "", + }, actualProperties) + // All namespace properties are expected to be deleted successfully err = admin.Namespaces().RemoveProperties(*namespace) assert.Equal(t, err, nil) actualPropertiesAfterRemoveCall, err := admin.Namespaces().GetProperties(*namespace) assert.Equal(t, err, nil) assert.Equal(t, actualPropertiesAfterRemoveCall, map[string]string{}) + + propertyValue, err = admin.Namespaces().GetProperty(*namespace, "key-1") + assert.NoError(t, err) + assert.Nil(t, propertyValue) +} + +func TestNamespaces_SinglePropertyEndpointsAndDecoding(t *testing.T) { + requests := make([]namespacePropertyRequest, 0, 5) + client, pulsarClient := newTopicPolicyTestClient(t, func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + require.NoError(t, err) + + requests = append(requests, namespacePropertyRequest{ + method: r.Method, + path: r.URL.EscapedPath(), + body: string(body), + }) + + switch len(requests) { + case 1: + w.WriteHeader(http.StatusNoContent) + case 2: + _, err = w.Write([]byte(`"json-value"`)) + require.NoError(t, err) + case 3: + _, err = w.Write([]byte(`"json-value"`)) + require.NoError(t, err) + case 4, 5: + w.WriteHeader(http.StatusNotFound) + _, err = w.Write([]byte(`{"reason":"Property not found"}`)) + require.NoError(t, err) + default: + t.Fatalf("unexpected request %d", len(requests)) + } + }) + + namespace := mustNamespaceName(t, "public/default") + + err := client.Namespaces().SetProperty(*namespace, "json-key", "json-value") + require.NoError(t, err) + + value, err := client.Namespaces().GetProperty(*namespace, "json-key") + require.NoError(t, err) + require.NotNil(t, value) + assert.Equal(t, "json-value", *value) + + removed, err := client.Namespaces().RemoveProperty(*namespace, "json-key") + require.NoError(t, err) + require.NotNil(t, removed) + assert.Equal(t, "json-value", *removed) + + missing, err := client.Namespaces().GetProperty(*namespace, "missing-key") + require.NoError(t, err) + assert.Nil(t, missing) + + removedMissing, err := client.Namespaces().RemoveProperty(*namespace, "missing-key") + require.NoError(t, err) + assert.Nil(t, removedMissing) + + expectedSetPath := pulsarClient.endpoint("/namespaces", namespace.String(), "property", "json-key", "json-value") + expectedJSONPath := pulsarClient.endpoint("/namespaces", namespace.String(), "property", "json-key") + expectedMissingPath := pulsarClient.endpoint("/namespaces", namespace.String(), "property", "missing-key") + + decodedExpectedSetPath, err := url.PathUnescape(expectedSetPath) + require.NoError(t, err) + decodedExpectedJSONPath, err := url.PathUnescape(expectedJSONPath) + require.NoError(t, err) + decodedExpectedMissingPath, err := url.PathUnescape(expectedMissingPath) + require.NoError(t, err) + + require.Len(t, requests, 5) + + assert.Equal(t, http.MethodPut, requests[0].method) + assert.Equal(t, decodedExpectedSetPath, requests[0].path) + assert.Empty(t, requests[0].body) + + assert.Equal(t, http.MethodGet, requests[1].method) + assert.Equal(t, decodedExpectedJSONPath, requests[1].path) + + assert.Equal(t, http.MethodDelete, requests[2].method) + assert.Equal(t, decodedExpectedJSONPath, requests[2].path) + + assert.Equal(t, http.MethodGet, requests[3].method) + assert.Equal(t, decodedExpectedMissingPath, requests[3].path) + + assert.Equal(t, http.MethodDelete, requests[4].method) + assert.Equal(t, decodedExpectedMissingPath, requests[4].path) +} + +func TestNamespaces_SinglePropertyPlainTextFallback(t *testing.T) { + callCount := 0 + client, _ := newTopicPolicyTestClient(t, func(w http.ResponseWriter, r *http.Request) { + callCount++ + + switch callCount { + case 1: + assert.Equal(t, http.MethodGet, r.Method) + _, err := w.Write([]byte("plain-value")) + require.NoError(t, err) + case 2: + assert.Equal(t, http.MethodDelete, r.Method) + _, err := w.Write([]byte("removed-plain-value")) + require.NoError(t, err) + default: + t.Fatalf("unexpected request %d", callCount) + } + }) + + namespace := mustNamespaceName(t, "public/default") + + value, err := client.Namespaces().GetProperty(*namespace, "plain-key") + require.NoError(t, err) + require.NotNil(t, value) + assert.Equal(t, "plain-value", *value) + + removed, err := client.Namespaces().RemoveProperty(*namespace, "plain-key") + require.NoError(t, err) + require.NotNil(t, removed) + assert.Equal(t, "removed-plain-value", *removed) +} + +func TestDecodeNamespacePropertyValue(t *testing.T) { + tests := []struct { + name string + body []byte + want *string + }{ + { + name: "empty body is unset", + body: []byte(""), + want: nil, + }, + { + name: "whitespace body is unset", + body: []byte(" \n\t "), + want: nil, + }, + { + name: "null body is unset", + body: []byte("null"), + want: nil, + }, + { + name: "json string", + body: []byte(`"json-value"`), + want: strPtr("json-value"), + }, + { + name: "empty json string", + body: []byte(`""`), + want: strPtr(""), + }, + { + name: "plain text fallback", + body: []byte("plain-value"), + want: strPtr("plain-value"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := decodeNamespacePropertyValue(tt.body) + if tt.want == nil { + assert.Nil(t, got) + return + } + + require.NotNil(t, got) + assert.Equal(t, *tt.want, *got) + }) + } } func TestNamespaces_SetMaxTopicsPerNamespace(t *testing.T) { diff --git a/pulsaradmin/pkg/admin/response_decode.go b/pulsaradmin/pkg/admin/response_decode.go new file mode 100644 index 000000000..4626f1768 --- /dev/null +++ b/pulsaradmin/pkg/admin/response_decode.go @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package admin + +import ( + "encoding/json" + "strings" +) + +func decodeOptionalJSON[T any](body []byte) (*T, error) { + if isUnsetPolicyBody(body) { + return nil, nil + } + + var out T + if err := json.Unmarshal(body, &out); err != nil { + return nil, err + } + return &out, nil +} + +func isUnsetPolicyBody(body []byte) bool { + trimmed := strings.TrimSpace(string(body)) + return trimmed == "" || trimmed == "null" +} diff --git a/pulsaradmin/pkg/admin/topic_policies.go b/pulsaradmin/pkg/admin/topic_policies.go index a754e4287..b94277bd4 100644 --- a/pulsaradmin/pkg/admin/topic_policies.go +++ b/pulsaradmin/pkg/admin/topic_policies.go @@ -19,7 +19,6 @@ package admin import ( "context" - "encoding/json" "fmt" "strconv" "strings" @@ -227,18 +226,6 @@ func (t *topicPolicies) scopedDeleteWithContext( return t.pulsar.Client.DeleteWithQueryParamsWithContext(ctx, endpoint, t.scopedQueryParams(params)) } -func decodeOptionalJSON[T any](body []byte) (*T, error) { - if isUnsetPolicyBody(body) { - return nil, nil - } - - var out T - if err := json.Unmarshal(body, &out); err != nil { - return nil, err - } - return &out, nil -} - func decodeOptionalSchemaCompatibilityStrategy(body []byte) (*utils.SchemaCompatibilityStrategy, error) { if isUnsetPolicyBody(body) { return nil, nil @@ -257,11 +244,6 @@ func decodeOptionalSchemaCompatibilityStrategy(body []byte) (*utils.SchemaCompat return &strategy, nil } -func isUnsetPolicyBody(body []byte) bool { - trimmed := strings.TrimSpace(string(body)) - return trimmed == "" || trimmed == "null" -} - func (t *topicPolicies) GetMessageTTL( ctx context.Context, topic utils.TopicName,