Expected behavior
When an error occurs and the partition consumer is closed, subsequent retries or partition expansion logic should recreate the closed partition consumer to ensure no partitions are permanently skipped.
Actual behavior
Once a partition consumer is closed due to an error, it is never recreated. This results in missed consumption on that partition.
Steps to reproduce
- Create a consumer on a partitioned topic.
- Trigger an error at consumer_impl.go#L418.
- Observe that the partition consumer is closed.
- Wait for retry or trigger partition expansion.
- Verify that the closed consumer is not recreated, and messages on that partition are no longer consumed.
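To make the steps above concrete without a broker, here is a toy Go model of the control flow quoted under "Relevant Code Snippets with Bug Notes". It is a sketch under stated assumptions, not the client's code. The names partitionConsumer, model, subscribe and healthy, and the failPartition parameter, are all hypothetical. Locking, goroutines and real consumer creation are left out.

```go
package main

import "errors"

// partitionConsumer is a hypothetical stand-in for the client's per-partition
// consumer; only the closed flag matters for this model.
type partitionConsumer struct {
	partition int
	closed    bool
}

func (pc *partitionConsumer) Close() { pc.closed = true }

// model holds the consumer slice, mirroring c.consumers in the client.
type model struct {
	consumers []*partitionConsumer
}

// subscribe follows the quoted control flow of internalTopicSubscribeToPartitions
// in simplified form. failPartition is a hypothetical knob that makes creation of
// that partition's consumer fail; pass -1 for no failure.
func (m *model) subscribe(newNumPartitions, failPartition int) error {
	oldNumPartitions := len(m.consumers)
	if oldNumPartitions == newNumPartitions {
		// Early return: slots holding closed consumers are never revisited.
		return nil
	}
	old := m.consumers
	m.consumers = make([]*partitionConsumer, newNumPartitions)
	copy(m.consumers, old) // existing instances, closed or not, are copied as-is

	// Only partitions that did not exist before get a new consumer.
	var err error
	for p := oldNumPartitions; p < newNumPartitions; p++ {
		if p == failPartition {
			err = errors.New("failed to create partition consumer")
			continue
		}
		m.consumers[p] = &partitionConsumer{partition: p}
	}

	if err != nil {
		// Error path: close every consumer in the slice, including the
		// previously healthy ones, but leave them in their slots.
		for _, pc := range m.consumers {
			if pc != nil {
				pc.Close()
			}
		}
		return err
	}
	return nil
}

// healthy counts partitions that currently have an open consumer.
func (m *model) healthy() int {
	n := 0
	for _, pc := range m.consumers {
		if pc != nil && !pc.closed {
			n++
		}
	}
	return n
}
```

In this model, expanding from 2 to 3 partitions with partition 2 failing closes the two healthy consumers. The next tick at 3 partitions returns early. A later expansion to 4 creates a consumer only for partition 3.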
System configuration
Pulsar Go client (pulsar-client-go) version: 0.14.0, 0.15.1, 0.16.0
Relevant Code Snippets with Bug Notes
func (c *consumer) runBackgroundPartitionDiscovery(period time.Duration) (cancel func()) {
	var wg sync.WaitGroup
	stopDiscoveryCh := make(chan struct{})
	ticker := time.NewTicker(period)

	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-stopDiscoveryCh:
				return
			case <-ticker.C:
				c.log.Debug("Auto discovering new partitions")
				c.internalTopicSubscribeToPartitions() // Bug: the returned error is silently discarded
			}
		}
	}()

	return func() {
		ticker.Stop()
		close(stopDiscoveryCh)
		wg.Wait()
	}
}
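// In internalTopicSubscribeToPartitions, after the partition consumers are created:
if err != nil {
	// Since there were some failures,
	// clean up all the partitions that succeeded in creating the consumer.
	// Bug: the closed consumers stay in c.consumers, so later passes treat
	// these partitions as already subscribed and never recreate them.
	for _, c := range c.consumers {
		if c != nil {
			c.Close()
		}
	}
	return err
}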
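// Earlier in internalTopicSubscribeToPartitions: partition-count check and copy-over.
if oldNumPartitions == newNumPartitions {
	// Bug: after the error path above the count is unchanged,
	// so this early return skips the closed consumers on every later tick.
	c.log.Debug("Number of partitions in topic has not changed")
	return nil
}

// When, for some reason (e.g. forced deletion of a sub-partition), oldNumPartitions > newNumPartitions,
// we need to rebuild the cache of new consumers; otherwise the index would be out of range.
if oldConsumers != nil && oldNumPartitions < newNumPartitions {
	// Copy over the existing consumer instances.
	// Bug: closed consumers are copied over as-is rather than recreated.
	for i := 0; i < oldNumPartitions; i++ {
		c.consumers[i] = oldConsumers[i]
	}
}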
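Bug note: runBackgroundPartitionDiscovery calls internalTopicSubscribeToPartitions on every ticker tick. Because the error path leaves the closed consumers in c.consumers and the partition count does not change, each later tick returns early and the closed partition consumers are never recreated.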