Consumer message loss due to duplicate reconnection #1461

@bluecucumber1989-commits

Description

Expected behavior

No message loss.

Actual behavior

If reconnectToBroker is called twice in a row while the Consumer is reconnecting, messages are lost. Specifically, messages received from the Broker during the first reconnection (e.g., M1, M2) are cleared during the second reconnection, so users only see the subsequent messages (e.g., M3, M4).

Steps to reproduce

Prerequisites:

  1. Consumer uses Durable subscription mode (default mode)
  2. Messages have not been acked yet
  3. The Broker that owns the topic has changed

Trigger Conditions:

  1. Broker sends CommandCloseConsumer to close the Consumer
  2. Before the first reconnection completes, the TCP connection breaks, triggering a second reconnection

Possible race condition behind the duplicate reconnection:
handleCloseConsumer calls ConnectionClosed before it deletes the handler. If the TCP connection breaks before the handler is deleted, the Close method calls ConnectionClosed on the same handler a second time.

// First call: Broker actively closes
func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer) {
    consumerID := closeConsumer.GetConsumerId()
    c.log.Infof("Broker notification of Closed consumer: %d", consumerID)

    if consumer, ok := c.consumerHandler(consumerID); ok {
        consumer.ConnectionClosed(closeConsumer)  // First call
        c.DeleteConsumeHandler(consumerID)        // Delete handler afterwards
    }
}

// Second call: TCP connection breaks
func (c *connection) Close() {
    c.closeOnce.Do(func() {
        listeners, consumerHandlers, cnx := c.closeAndEmptyObservers()
        
        // If DeleteConsumeHandler hasn't executed, snapshot still contains the consumer
        for _, handler := range consumerHandlers {
            handler.ConnectionClosed(nil)  // Second call
        }
    })
} 
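
One way to make a duplicate close notification harmless is to tie each notification to the connection it came from and react only once per connection. The following is a minimal sketch of that idea, not code from pulsar-client-go: reconnectGuard, attach, and shouldReconnect are hypothetical names introduced here for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// reconnectGuard is a hypothetical helper (not part of pulsar-client-go).
// It lets only the first close notification for the current connection
// trigger a reconnect; repeats and stale notifications are ignored.
type reconnectGuard struct {
	mu         sync.Mutex
	generation uint64 // identifies the connection currently in use
	handled    uint64 // last generation whose close was already handled
}

// attach records that the consumer is now bound to a new connection
// and returns the generation that identifies it.
func (g *reconnectGuard) attach() uint64 {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.generation++
	return g.generation
}

// shouldReconnect reports whether a close notification for connection
// generation gen should start a reconnect.
func (g *reconnectGuard) shouldReconnect(gen uint64) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	if gen != g.generation || gen == g.handled {
		return false // stale connection, or close already handled
	}
	g.handled = gen
	return true
}

func main() {
	g := &reconnectGuard{}
	gen := g.attach()

	// First notification, e.g. from the broker's close command.
	fmt.Println(g.shouldReconnect(gen)) // true
	// Second notification for the same connection, e.g. from Close().
	fmt.Println(g.shouldReconnect(gen)) // false
}
```

In this sketch the caller would have to pass the connection generation along with each close notification; whether that fits the real ConnectionClosed interface is an open question.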

Possible key problem:
The second reconnection clears the local receiver queue. pc.clearReceiverQueue empties dispatcher.messages, so M1 and M2 would be discarded during the second reconnection. Is this the cause?

// pulsar/consumer_partition.go
func (pc *partitionConsumer) grabConn() error {
    // ...
    if seekMsgID := pc.seekMessageID.get(); seekMsgID != nil {
        pc.startMessageID.set(seekMsgID)
        pc.seekMessageID.set(nil)
    } else {
        pc.startMessageID.set(pc.clearReceiverQueue())
    }

    // In Durable mode, the StartMessageId is not sent to the broker
    if pc.options.subscriptionMode != Durable {
        cmdSubscribe.StartMessageId = convertToMessageIDData(pc.startMessageID.get())
    }
    // ...
} 
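
The suspected sequence can be replayed with a toy model. This is only a sketch of the scenario described in this report, not the client's actual code: toyConsumer, reconnect, and receive are hypothetical names, and the model assumes, as the report does, that the broker does not deliver M1 and M2 again after the second reconnection.

```go
package main

import "fmt"

// toyConsumer is a hypothetical stand-in for the partition consumer's
// local receiver queue.
type toyConsumer struct {
	queue []string
}

// reconnect mimics the non-seek branch of grabConn: it clears whatever
// is buffered locally and returns the messages that were dropped.
func (c *toyConsumer) reconnect() []string {
	dropped := c.queue
	c.queue = nil
	return dropped
}

// receive appends messages delivered by the broker to the local queue.
func (c *toyConsumer) receive(msgs ...string) {
	c.queue = append(c.queue, msgs...)
}

func main() {
	c := &toyConsumer{}

	c.reconnect()            // first reconnection (broker closed the consumer)
	c.receive("M1", "M2")    // delivered after the first reconnection
	dropped := c.reconnect() // second reconnection (TCP connection broke)
	c.receive("M3", "M4")    // assumed: delivery continues after M2

	fmt.Println("dropped:", dropped) // dropped: [M1 M2]
	fmt.Println("visible:", c.queue) // visible: [M3 M4]
}
```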

Log showing duplicate reconnections

time="2026-01-22T20:49:33+08:00" level=info msg="Reconnected consumer to broker" consumerID=696 name=qmjlt subscription=sub-2 topic="persistent://public/default/test-partition-68"
time="2026-01-22T20:49:33+08:00" level=info msg="Reconnected consumer to broker" consumerID=792 name=xicnk subscription=sub-2 topic="persistent://public/default/test-partition-68"
time="2026-01-22T20:49:33+08:00" level=info msg="Reconnected consumer to broker" consumerID=861 name=keabn subscription=sub-2 topic="persistent://public/default/test-partition-68"
time="2026-01-22T20:49:33+08:00" level=info msg="Reconnected consumer to broker" consumerID=547 name=wxznx subscription=sub-2 topic="persistent://public/default/test-partition-68"
time="2026-01-22T20:49:33+08:00" level=info msg="Reconnected consumer to broker" consumerID=996 name=ebecn subscription=sub-2 topic="persistent://public/default/test-partition-68"
time="2026-01-22T20:49:33+08:00" level=info msg="Reconnected consumer to broker" consumerID=696 name=qmjlt subscription=sub-2 topic="persistent://public/default/test-partition-68"
time="2026-01-22T20:49:33+08:00" level=info msg="Reconnected consumer to broker" consumerID=792 name=xicnk subscription=sub-2 topic="persistent://public/default/test-partition-68"
time="2026-01-22T20:49:33+08:00" level=info msg="Reconnected consumer to broker" consumerID=861 name=keabn subscription=sub-2 topic="persistent://public/default/test-partition-68"
time="2026-01-22T20:49:33+08:00" level=info msg="Reconnected consumer to broker" consumerID=547 name=wxznx subscription=sub-2 topic="persistent://public/default/test-partition-68"
time="2026-01-22T20:49:33+08:00" level=info msg="Reconnected consumer to broker" consumerID=996 name=ebecn subscription=sub-2 topic="persistent://public/default/test-partition-68"

System configuration

Pulsar version: 4.1
pulsar-client-go: 0.18.0
