Expected behavior
No message loss.
Actual behavior
During consumer reconnection, two back-to-back reconnectToBroker calls lead to message loss. Specifically: messages received from the broker during the first reconnection (e.g., M1, M2) are cleared by the second reconnection, so the application ultimately only sees the subsequent messages (e.g., M3, M4).
Steps to reproduce
Prerequisites:
- Consumer uses Durable subscription mode (default mode)
- Messages have not been acked yet
- Broker owner changed
Trigger Conditions:
- Broker sends CommandCloseConsumer to close the Consumer
- Before the first reconnection completes, TCP connection breaks triggering a second reconnection
Likely race condition causing the duplicate reconnection:
handleCloseConsumer calls ConnectionClosed before deleting the handler. If the TCP connection breaks before the deletion happens, the Close method calls ConnectionClosed a second time:
```go
// First call: the broker actively closes the consumer
func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer) {
	consumerID := closeConsumer.GetConsumerId()
	c.log.Infof("Broker notification of Closed consumer: %d", consumerID)
	if consumer, ok := c.consumerHandler(consumerID); ok {
		consumer.ConnectionClosed(closeConsumer) // first call
		c.DeleteConsumeHandler(consumerID)       // handler is deleted only afterwards
	}
}

// Second call: the TCP connection breaks
func (c *connection) Close() {
	c.closeOnce.Do(func() {
		listeners, consumerHandlers, cnx := c.closeAndEmptyObservers()
		// If DeleteConsumeHandler has not executed yet, the snapshot
		// still contains the consumer.
		for _, handler := range consumerHandlers {
			handler.ConnectionClosed(nil) // second call
		}
	})
}
```
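The interleaving above can be reproduced deterministically with a toy model (plain maps and closures, not the actual client types), showing the handler's ConnectionClosed callback firing twice when Close runs before DeleteConsumeHandler:

```go
package main

import "fmt"

func main() {
	// Toy model of the race (simplified; not the real client code):
	// handlers maps consumer IDs to their ConnectionClosed callbacks.
	calls := 0
	handlers := map[uint64]func(){1: func() { calls++ }}

	// Step 1: handleCloseConsumer looks up the handler and invokes
	// ConnectionClosed, but has NOT yet deleted it from the map.
	if h, ok := handlers[1]; ok {
		h() // first ConnectionClosed
		// ...TCP connection breaks here, before DeleteConsumeHandler runs...
	}

	// Step 2: Close() snapshots the handler map; the consumer is still
	// registered, so ConnectionClosed fires a second time.
	for _, h := range handlers {
		h() // second ConnectionClosed -> duplicate reconnectToBroker
	}

	fmt.Println("ConnectionClosed calls:", calls) // prints 2
}
```

Deleting the handler before invoking the callback, or guarding ConnectionClosed with a per-consumer sync.Once, would presumably make the Close snapshot miss the consumer and avoid the duplicate call.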
Then, the key problem point seems to be:
The second reconnection clears the local queue filled by the first one: pc.clearReceiverQueue drains dispatcher.messages, which appears to be what discards M1 and M2 during the second reconnection.
```go
// pulsar/consumer_partition.go
func (pc *partitionConsumer) grabConn() error {
	// ...
	if seekMsgID := pc.seekMessageID.get(); seekMsgID != nil {
		pc.startMessageID.set(seekMsgID)
		pc.seekMessageID.set(nil)
	} else {
		pc.startMessageID.set(pc.clearReceiverQueue())
	}
	// In Durable mode, the StartMessageId is not sent to the broker
	if pc.options.subscriptionMode != Durable {
		cmdSubscribe.StartMessageId = convertToMessageIDData(pc.startMessageID.get())
	}
	// ...
}
```
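Why the clear is lossy in Durable mode can be sketched the same way (a toy queue, not the real dispatcher): the messages buffered during the first reconnection are drained, and because StartMessageId is not sent for Durable subscriptions, the broker is never asked to resume from before them:

```go
package main

import "fmt"

func main() {
	// Toy receiver queue (not the real dispatcher.messages channel):
	// the first reconnection already delivered M1 and M2 into it.
	queue := make(chan string, 10)
	queue <- "M1"
	queue <- "M2"

	// Second reconnection: a clearReceiverQueue-style drain discards
	// everything buffered but not yet handed to the application.
	var cleared []string
drain:
	for {
		select {
		case m := <-queue:
			cleared = append(cleared, m)
		default:
			break drain
		}
	}
	fmt.Println("discarded by second reconnection:", cleared) // [M1 M2]

	// In Durable mode the computed start position is NOT included in
	// the subscribe command, so the broker resumes from its own cursor
	// and the discarded messages are never redelivered.
	const durable = true
	if durable {
		fmt.Println("StartMessageId omitted; application next sees M3, M4 ...")
	}
}
```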
Duplicate reconnection log:
```
time="2026-01-22T20:49:33+08:00" level=info msg="Reconnected consumer to broker" consumerID=696 name=qmjlt subscription=sub-2 topic="persistent://public/default/test-partition-68"
time="2026-01-22T20:49:33+08:00" level=info msg="Reconnected consumer to broker" consumerID=792 name=xicnk subscription=sub-2 topic="persistent://public/default/test-partition-68"
time="2026-01-22T20:49:33+08:00" level=info msg="Reconnected consumer to broker" consumerID=861 name=keabn subscription=sub-2 topic="persistent://public/default/test-partition-68"
time="2026-01-22T20:49:33+08:00" level=info msg="Reconnected consumer to broker" consumerID=547 name=wxznx subscription=sub-2 topic="persistent://public/default/test-partition-68"
time="2026-01-22T20:49:33+08:00" level=info msg="Reconnected consumer to broker" consumerID=996 name=ebecn subscription=sub-2 topic="persistent://public/default/test-partition-68"
time="2026-01-22T20:49:33+08:00" level=info msg="Reconnected consumer to broker" consumerID=696 name=qmjlt subscription=sub-2 topic="persistent://public/default/test-partition-68"
time="2026-01-22T20:49:33+08:00" level=info msg="Reconnected consumer to broker" consumerID=792 name=xicnk subscription=sub-2 topic="persistent://public/default/test-partition-68"
time="2026-01-22T20:49:33+08:00" level=info msg="Reconnected consumer to broker" consumerID=861 name=keabn subscription=sub-2 topic="persistent://public/default/test-partition-68"
time="2026-01-22T20:49:33+08:00" level=info msg="Reconnected consumer to broker" consumerID=547 name=wxznx subscription=sub-2 topic="persistent://public/default/test-partition-68"
time="2026-01-22T20:49:33+08:00" level=info msg="Reconnected consumer to broker" consumerID=996 name=ebecn subscription=sub-2 topic="persistent://public/default/test-partition-68"
```
System configuration
Pulsar version: 4.1
pulsar-client-go: 0.18.0