
MultiTopicsReaderImpl – Start from separate MessageId for each topic/partition #9301

@aryemazouz

Description


Currently, `ReaderConfigurationData` only lets you `setStartMessageId` to a single message ID, and that ID applies to all consumers in the `MultiTopicsReaderImpl`.

Would it be possible to set a start message ID per partition/topic?

Today, to avoid losing data, I have to pass `MessageId.earliest` and then execute a seek afterwards (as the code below does).
The annoying part is that once the reader is created it starts receiving messages from Pulsar, and until the seek actually executes I keep consuming events. I can avoid handling them because I track the cursor per topic/partition, but I still have to consume them (to ensure exactly-once processing).
It would be nicer if I could pass a map from topic to start message ID to avoid this (and if a topic is missing from my map, it would default to `MessageId.earliest`).
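The per-partition cursor tracking described above amounts to a skip filter: until the seek takes effect, each delivered message is compared with the last position already processed on its partition and dropped if it is not newer. A minimal sketch, assuming a position can be modelled as a `(ledgerId, entryId)` pair ordered ledger-first (batch indexes are ignored); `PartitionSkipFilter` and `Position` are hypothetical names, not Pulsar API.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical filter for the "start at earliest, then seek" workaround:
 * drops messages that were already processed on their partition.
 */
class PartitionSkipFilter {

    /** Hypothetical stand-in for a message ID within one partition (batch index ignored). */
    public record Position(long ledgerId, long entryId) implements Comparable<Position> {
        @Override
        public int compareTo(Position other) {
            // Order by ledger first, then by entry within the ledger
            int byLedger = Long.compare(ledgerId, other.ledgerId);
            return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
        }
    }

    // Last position already processed, keyed by partition topic name
    private final Map<String, Position> lastProcessed = new HashMap<>();

    /** Records that everything up to and including {@code position} was handled for {@code topic}. */
    public void markProcessed(String topic, Position position) {
        // Keep the larger of the stored and the new position
        lastProcessed.merge(topic, position, (old, cur) -> old.compareTo(cur) >= 0 ? old : cur);
    }

    /** Returns true if the message is newer than anything processed on its partition. */
    public boolean shouldHandle(String topic, Position position) {
        Position last = lastProcessed.get(topic);
        return last == null || position.compareTo(last) > 0;
    }
}
```

The messages still have to be received and discarded, which is exactly the overhead a per-topic start message ID would remove.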

Ideally, the reader configuration would accept a `Function<String, MessageId>` that maps `Consumer.getTopic()` to the start message ID.
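Such a function could be derived from the per-topic map mentioned above, with `MessageId.earliest` as the fallback for unknown topics. A minimal sketch, kept generic (`M` stands in for `MessageId`) so it compiles without the Pulsar client; `StartPositions.fromMap` is a hypothetical helper, not an existing configuration option.

```java
import java.util.Map;
import java.util.function.Function;

/** Hypothetical helper for building a topic -> start-position resolver. */
final class StartPositions {
    private StartPositions() {}

    /**
     * Returns a resolver backed by a snapshot of {@code known}; topics missing from
     * the snapshot resolve to {@code fallback} (MessageId.earliest in the proposal).
     * Map.copyOf rejects null keys and values.
     */
    static <M> Function<String, M> fromMap(Map<String, M> known, M fallback) {
        // Defensive copy: later changes to the caller's map are not visible to the resolver
        Map<String, M> snapshot = Map.copyOf(known);
        return topic -> snapshot.getOrDefault(topic, fallback);
    }
}
```

With the Pulsar client on the classpath this would be called as `StartPositions.fromMap(lastIds, MessageId.earliest)`. Snapshotting fixes the start positions at reader creation time; if the tracking map must stay live, pass a lambda over it instead.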

  • I am using the key hash feature (Pulsar broker/client version 2.7.0).
  • It would also help to add support for multiple topics (by pattern/list) to `MultiTopicsReaderImpl`.
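With the key hash feature, each reader is given a slice of the key hash space, as the `setKeyHashRanges(...)` call in the reader code does with `myRange`. A minimal sketch of splitting the space into contiguous inclusive slices, assuming the hash space is `0..65535` (Pulsar's default hash range size of 65536); `HashRangeSplitter` and `HashRange` are hypothetical names, and `HashRange` only mirrors the start/end pair passed to `Range`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical splitter of the key hash space into per-reader slices. */
final class HashRangeSplitter {
    // Assumption: 65536 hash slots (0..65535), matching Pulsar's default hash range size
    static final int HASH_RANGE_SIZE = 65536;

    /** Hypothetical inclusive range, mirroring the start/end pair passed to Range. */
    record HashRange(int start, int end) {}

    /** Splits the full hash space into {@code readers} contiguous, non-overlapping inclusive ranges. */
    static List<HashRange> split(int readers) {
        if (readers <= 0 || readers > HASH_RANGE_SIZE) {
            throw new IllegalArgumentException("readers must be in 1.." + HASH_RANGE_SIZE);
        }
        List<HashRange> ranges = new ArrayList<>(readers);
        int base = HASH_RANGE_SIZE / readers;
        int remainder = HASH_RANGE_SIZE % readers;
        int start = 0;
        for (int i = 0; i < readers; i++) {
            // The first `remainder` slices get one extra slot so the whole space is covered
            int size = base + (i < remainder ? 1 : 0);
            ranges.add(new HashRange(start, start + size - 1));
            start += size;
        }
        return ranges;
    }
}
```

Each reader would then use one slice as its `myRange`, so every key hash is owned by exactly one reader.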

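```java
import java.util.Arrays;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.impl.MultiTopicsReaderImpl;
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;

/**
 * Create reader
 */
private Reader<byte[]> createReader() throws PulsarClientException {
    ReaderConfigurationData<byte[]> cloned = ...;
    cloned.setStartMessageId(MessageId.earliest);
    cloned.setReaderName(...);
    cloned.setKeyHashRanges(Arrays.asList(new Range(myRange.getStart(), myRange.getEnd()))); // Using key hash feature
    Reader<byte[]> reader = client.getShared().createReaderAsync(cloned).join();
    seek(reader); // Until this runs, messages from MessageId.earliest are already being delivered
    return reader;
}

/**
 * Execute seek by partitions
 */
private void seek(Reader<?> reader) throws PulsarClientException {
    if (reader instanceof MultiTopicsReaderImpl) {
        MultiTopicsReaderImpl<?> multiTopicsReader = (MultiTopicsReaderImpl<?>) reader;
        // One internal consumer per topic/partition: seek each one to its own start position
        for (Consumer<?> consumer : multiTopicsReader.getMultiTopicsConsumer().getConsumers()) {
            seek(consumer.getTopic(), messageId -> consumer.seek(messageId));
        }
    } else if (reader instanceof ReaderImpl) {
        seek(reader.getTopic(), messageId -> reader.seek(messageId));
    } else {
        throw new IllegalArgumentException("Unknown reader type: " + reader.getClass());
    }
}

/**
 * Find the relevant message ID to seek on the given consumer / reader.
 * ExceptionalConsumer is assumed to be our own functional interface whose accept() may throw PulsarClientException.
 */
private void seek(String topic, ExceptionalConsumer<MessageId> seek) throws PulsarClientException {
    PartitionedTopicCursor<MessageId, Message<T>> partitionCursor = cursors.get(topic); // Our internal cursor tracking
    if (partitionCursor == null) {
        cursors.put(topic, new PartitionedTopicCursor(.../*Init new one to keep tracking on this partition*/));
        return; // First time we see this partition: keep reading from MessageId.earliest, no seek needed
    }

    MessageId seekTo = partitionCursor.getStartFrom();
    seek.accept(seekTo);
}
```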


Labels: help wanted, type/enhancement