Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 26 additions & 21 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,32 +150,37 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
oldRetryTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + RetryTopicSuffix
oldDlqTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + DlqTopicSuffix

if r, err := client.lookupService.GetPartitionedTopicMetadata(oldRetryTopic); err == nil &&
r != nil &&
r.Partitions > 0 {
retryTopic = oldRetryTopic
}

if r, err := client.lookupService.GetPartitionedTopicMetadata(oldDlqTopic); err == nil &&
r != nil &&
r.Partitions > 0 {
dlqTopic = oldDlqTopic
// Check for old topic naming format.
// When DLQ policy is not provided, check both old topics for backward compatibility.
checkTopicIsExists := func(topic string) bool {
r, err := client.lookupService.GetPartitionedTopicMetadata(topic)
return err == nil && r != nil && r.Partitions > 0
}
resolveTopic := func(current, old, defaultTopic string) string {
if current != "" {
return current
}
if checkTopicIsExists(old) {
return old
}
return defaultTopic
}

if options.DLQ == nil {
options.DLQ = &DLQPolicy{
MaxDeliveries: MaxReconsumeTimes,
DeadLetterTopic: dlqTopic,
RetryLetterTopic: retryTopic,
}
} else {
if options.DLQ.DeadLetterTopic == "" {
options.DLQ.DeadLetterTopic = dlqTopic
}
if options.DLQ.RetryLetterTopic == "" {
options.DLQ.RetryLetterTopic = retryTopic
MaxDeliveries: MaxReconsumeTimes,
}
}
options.DLQ.DeadLetterTopic = resolveTopic(
options.DLQ.DeadLetterTopic,
oldDlqTopic,
dlqTopic,
)
options.DLQ.RetryLetterTopic = resolveTopic(
options.DLQ.RetryLetterTopic,
oldRetryTopic,
retryTopic,
)

if options.Topic != "" && len(options.Topics) == 0 {
options.Topics = []string{options.Topic, options.DLQ.RetryLetterTopic}
options.Topic = ""
Expand Down
67 changes: 67 additions & 0 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5710,3 +5710,70 @@ func TestSelectConnectionForSameConsumer(t *testing.T) {
"The consumer uses a different connection when reconnecting")
}
}

// lookupServiceWrapper embeds the original LookupService and only overrides
// GetPartitionedTopicMetadata to record the topics that were queried.
type lookupServiceWrapper struct {
internal.LookupService
mu sync.Mutex
calledTopics []string
}

func (w *lookupServiceWrapper) GetPartitionedTopicMetadata(topic string) (*internal.PartitionedTopicMetadata, error) {
w.mu.Lock()
w.calledTopics = append(w.calledTopics, topic)
w.mu.Unlock()
return &internal.PartitionedTopicMetadata{Partitions: 0}, nil
}

// TestConsumerWithDLQRetryTopicNoGetPartitionedTopicMetadata verifies that when custom DLQ and Retry
// topics are provided in DLQPolicy, the resolveTopic function should NOT call GetPartitionedTopicMetadata
// to check old-format DLQ/Retry topics. This ensures the optimization path works correctly.
func TestConsumerWithDLQRetryTopicNoGetPartitionedTopicMetadata(t *testing.T) {
// Create a real client with a short operation timeout so that the subsequent
// consumer creation (which requires a broker connection) fails quickly.
c, err := NewClient(ClientOptions{
URL: serviceURL,
MaxConnectionsPerBroker: 10,
OperationTimeout: 1 * time.Second,
})
assert.NoError(t, err)
defer c.Close()

// Replace the client's internal lookupService with our wrapper to intercept
// and record all GetPartitionedTopicMetadata calls.
realClient := c.(*client)
wrapper := &lookupServiceWrapper{LookupService: realClient.lookupService}
realClient.lookupService = wrapper

// Subscribe with custom DLQ and Retry topics specified.
// The Subscribe call will fail due to no broker connection, but the
// resolveTopic logic executes before the connection attempt.
_, _ = c.Subscribe(ConsumerOptions{
Topic: "persistent://public/default/test-topic",
SubscriptionName: "test-subscription",
RetryEnable: true,
DLQ: &DLQPolicy{
MaxDeliveries: 3,
DeadLetterTopic: "persistent://public/default/my-dlq-topic",
RetryLetterTopic: "persistent://public/default/my-retry-topic",
},
})

// These are the old-format topics that resolveTopic would check via
// GetPartitionedTopicMetadata if no custom topics were provided.
oldDlqTopic := "persistent://public/default/test-subscription" + DlqTopicSuffix
oldRetryTopic := "persistent://public/default/test-subscription" + RetryTopicSuffix

// Verify that GetPartitionedTopicMetadata was never called with old-format topics.
// When custom DLQ/Retry topics are provided, resolveTopic should return them directly
// without checking whether old-format topics exist.
wrapper.mu.Lock()
defer wrapper.mu.Unlock()
for _, topic := range wrapper.calledTopics {
assert.NotEqual(t, oldDlqTopic, topic,
"GetPartitionedTopicMetadata should not be called with old DLQ topic when custom DLQ topic is provided")
assert.NotEqual(t, oldRetryTopic, topic,
"GetPartitionedTopicMetadata should not be called with old Retry topic when custom Retry topic is provided")
}
}
Loading