diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 08c825e3e9..45c0a0e72e 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -150,32 +150,37 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { oldRetryTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + RetryTopicSuffix oldDlqTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + DlqTopicSuffix - if r, err := client.lookupService.GetPartitionedTopicMetadata(oldRetryTopic); err == nil && - r != nil && - r.Partitions > 0 { - retryTopic = oldRetryTopic - } - - if r, err := client.lookupService.GetPartitionedTopicMetadata(oldDlqTopic); err == nil && - r != nil && - r.Partitions > 0 { - dlqTopic = oldDlqTopic + // Check for old topic naming format. + // When DLQ policy is not provided, check both old topics for backward compatibility. + checkTopicIsExists := func(topic string) bool { + r, err := client.lookupService.GetPartitionedTopicMetadata(topic) + return err == nil && r != nil && r.Partitions > 0 + } + resolveTopic := func(current, old, defaultTopic string) string { + if current != "" { + return current + } + if checkTopicIsExists(old) { + return old + } + return defaultTopic } - if options.DLQ == nil { options.DLQ = &DLQPolicy{ - MaxDeliveries: MaxReconsumeTimes, - DeadLetterTopic: dlqTopic, - RetryLetterTopic: retryTopic, - } - } else { - if options.DLQ.DeadLetterTopic == "" { - options.DLQ.DeadLetterTopic = dlqTopic - } - if options.DLQ.RetryLetterTopic == "" { - options.DLQ.RetryLetterTopic = retryTopic + MaxDeliveries: MaxReconsumeTimes, } } + options.DLQ.DeadLetterTopic = resolveTopic( + options.DLQ.DeadLetterTopic, + oldDlqTopic, + dlqTopic, + ) + options.DLQ.RetryLetterTopic = resolveTopic( + options.DLQ.RetryLetterTopic, + oldRetryTopic, + retryTopic, + ) + if options.Topic != "" && len(options.Topics) == 0 { options.Topics = []string{options.Topic, options.DLQ.RetryLetterTopic} options.Topic = "" diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 9f20ed8417..144ea9c6fa 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -5710,3 +5710,70 @@ func TestSelectConnectionForSameConsumer(t *testing.T) { "The consumer uses a different connection when reconnecting") } } + +// lookupServiceWrapper embeds the original LookupService and only overrides +// GetPartitionedTopicMetadata to record the topics that were queried. +type lookupServiceWrapper struct { + internal.LookupService + mu sync.Mutex + calledTopics []string +} + +func (w *lookupServiceWrapper) GetPartitionedTopicMetadata(topic string) (*internal.PartitionedTopicMetadata, error) { + w.mu.Lock() + w.calledTopics = append(w.calledTopics, topic) + w.mu.Unlock() + return &internal.PartitionedTopicMetadata{Partitions: 0}, nil +} + +// TestConsumerWithDLQRetryTopicNoGetPartitionedTopicMetadata verifies that when custom DLQ and Retry +// topics are provided in DLQPolicy, the resolveTopic function should NOT call GetPartitionedTopicMetadata +// to check old-format DLQ/Retry topics. This ensures the optimization path works correctly. +func TestConsumerWithDLQRetryTopicNoGetPartitionedTopicMetadata(t *testing.T) { + // Create a real client with a short operation timeout so that the subsequent + // consumer creation (which requires a broker connection) fails quickly. + c, err := NewClient(ClientOptions{ + URL: serviceURL, + MaxConnectionsPerBroker: 10, + OperationTimeout: 1 * time.Second, + }) + assert.NoError(t, err) + defer c.Close() + + // Replace the client's internal lookupService with our wrapper to intercept + // and record all GetPartitionedTopicMetadata calls. + realClient := c.(*client) + wrapper := &lookupServiceWrapper{LookupService: realClient.lookupService} + realClient.lookupService = wrapper + + // Subscribe with custom DLQ and Retry topics specified. + // The Subscribe call will fail due to no broker connection, but the + // resolveTopic logic executes before the connection attempt. + _, _ = c.Subscribe(ConsumerOptions{ + Topic: "persistent://public/default/test-topic", + SubscriptionName: "test-subscription", + RetryEnable: true, + DLQ: &DLQPolicy{ + MaxDeliveries: 3, + DeadLetterTopic: "persistent://public/default/my-dlq-topic", + RetryLetterTopic: "persistent://public/default/my-retry-topic", + }, + }) + + // These are the old-format topics that resolveTopic would check via + // GetPartitionedTopicMetadata if no custom topics were provided. + oldDlqTopic := "persistent://public/default/test-subscription" + DlqTopicSuffix + oldRetryTopic := "persistent://public/default/test-subscription" + RetryTopicSuffix + + // Verify that GetPartitionedTopicMetadata was never called with old-format topics. + // When custom DLQ/Retry topics are provided, resolveTopic should return them directly + // without checking whether old-format topics exist. + wrapper.mu.Lock() + defer wrapper.mu.Unlock() + for _, topic := range wrapper.calledTopics { + assert.NotEqual(t, oldDlqTopic, topic, + "GetPartitionedTopicMetadata should not be called with old DLQ topic when custom DLQ topic is provided") + assert.NotEqual(t, oldRetryTopic, topic, + "GetPartitionedTopicMetadata should not be called with old Retry topic when custom Retry topic is provided") + } +}