kafka: set max_timestamp if missing and using timestamp_type::create_time #25200
Status: Closed
rockwotj wants to merge 1 commit into redpanda-data:dev
Conversation
…time Some clients (looking at you, Sarama!) don't set a max_timestamp for batches when producing. In Apache Kafka, all incoming batches have the max_timestamp batch header field set BY THE BROKER. Code references:

Here for uncompressed batches: https://github.com/apache/kafka/blob/e124d3975bdb3a9ec85eee2fba7a1b0a6967d3a6/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java#L275
Here for compressed batches: https://github.com/apache/kafka/blob/e124d3975bdb3a9ec85eee2fba7a1b0a6967d3a6/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java#L404

When the timestamp_type is log_append we do set the max_timestamp in the broker: https://github.com/redpanda-data/redpanda/blob/be699729fbbb0b48cc684d4417383ad1320103b7/src/v/kafka/server/handlers/produce.cc#L309

But we don't do anything in the create_time case. This breaks timequeries, which in many places of the storage layer only look at the max_timestamp (similar to the Kafka behavior, except there max_timestamp is set correctly).

However, historically we've not decompressed batches on the produce path (which is technically a validation bug). Since decompressing there has many performance implications and would presumably need at least a cluster config flag, we only partially fix this bug, for the case where the batch is uncompressed. For compressed batches we should really just do the right thing and validate the batch (but maybe behind a flag?).
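The uncompressed-batch fix described above can be sketched roughly as follows. All types and names here are simplified stand-ins, not the actual Redpanda code: for a create_time batch whose header carries the missing-timestamp sentinel (-1) and that is not compressed, scan the records and write the maximum record timestamp back into the header.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical, simplified stand-ins for the real batch types.
enum class timestamp_type { create_time, log_append_time };
constexpr int64_t missing_timestamp = -1; // sentinel left by clients that skip the field

struct record { int64_t timestamp_ms; };

struct batch {
    timestamp_type ts_type;
    int64_t max_timestamp = missing_timestamp;
    bool compressed = false;
    std::vector<record> records;
};

// Fill in max_timestamp on the broker when the client left it unset.
// Only uncompressed create_time batches are handled here, since
// decompressing on the produce path has performance implications.
void maybe_set_max_timestamp(batch& b, int64_t broker_now_ms) {
    if (b.ts_type == timestamp_type::log_append_time) {
        b.max_timestamp = broker_now_ms; // already done today
        return;
    }
    if (b.max_timestamp != missing_timestamp || b.compressed) {
        return; // client set it, or we can't cheaply inspect the records
    }
    for (const auto& r : b.records) {
        b.max_timestamp = std::max(b.max_timestamp, r.timestamp_ms);
    }
}
```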
rockwotj (Contributor, Author): Will add a test and send out for review.

rockwotj (Contributor, Author): Closing this, we fixed the client that was not setting max_timestamp, so this is lower priority for now.
WillemKauf added a commit to WillemKauf/redpanda that referenced this pull request on Aug 26, 2025
…ion_ms`
The motivating case for `broker_time_based_retention` was the
fact that records with bad timestamps produced in the future
could lead to time-based retention being stuck indefinitely [1].
However, using _only_ the `broker_ts` can lead to unexpected
behavior when e.g. replicating data from an existing cluster
using MM2, as the timestamps of the Kafka records themselves are
correctly preserved, but internally, `redpanda` data structures
are not.
To avoid the potentially curious behavior of a divergence in
retention enforcement, take the minimum of the record's timestamp
as written and the current broker time. This will achieve the
original goal of preventing future timestamps from blocking
retention enforcement, while also avoiding any unexpected
behavior with past record timestamps.
We also need to deal with the case where a client may have left
a batch's `max_timestamp` unset, in which case it is marked with `{-1}` [2].
Clamp any non-positive values for `max_ts` to `broker_ts` in this case.
[1]:
* redpanda-data#9820
* redpanda-data#12991
[2]:
* redpanda-data#25200
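The clamping rule in the commit message above can be sketched as a small helper (a hypothetical function, not the actual Redpanda code): take the minimum of the batch's max timestamp and the broker timestamp, treating the unset sentinel (-1, or any non-positive value) as "use the broker timestamp".

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical helper mirroring the commit message: pick the timestamp
// used for time-based retention. A non-positive max_ts means the client
// left the batch header unset, so fall back to broker_ts; otherwise
// clamp to the broker time so future timestamps can't block retention
// while correctly preserved past timestamps still apply.
int64_t effective_retention_ts(int64_t max_ts, int64_t broker_ts) {
    if (max_ts <= 0) {
        return broker_ts; // unset (-1) or bogus non-positive timestamp
    }
    return std::min(max_ts, broker_ts);
}
```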
WillemKauf added a commit to WillemKauf/redpanda that referenced this pull request on Aug 28, 2025
WillemKauf added a commit to WillemKauf/redpanda that referenced this pull request on Sep 9, 2025
WillemKauf added a commit to WillemKauf/redpanda that referenced this pull request on Sep 9, 2025
WillemKauf added a commit to WillemKauf/redpanda that referenced this pull request on Sep 11, 2025
Note that we only set the max_timestamp if the client doesn't set it, which deviates from Apache Kafka's behavior. I could be convinced to always set it, but then we'd do extra CRC computations if there was a client bug.
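One reason always rewriting max_timestamp isn't free: in the Kafka record batch format (magic v2), the batch CRC-32C covers everything from the attributes field to the end of the batch, and max_timestamp sits inside that region, so mutating it forces the broker to recompute the checksum. A minimal sketch (bitwise CRC-32C; the field offset here is hypothetical, not the real layout):

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Bitwise CRC-32C (Castagnoli), the checksum Kafka uses for record
// batches. Real brokers use table-driven or hardware-accelerated versions.
uint32_t crc32c(const uint8_t* data, size_t len) {
    uint32_t crc = 0xFFFFFFFFu;
    for (size_t i = 0; i < len; ++i) {
        crc ^= data[i];
        for (int b = 0; b < 8; ++b) {
            crc = (crc >> 1) ^ (0x82F63B78u & (0u - (crc & 1u)));
        }
    }
    return ~crc;
}

// In the v2 batch format the CRC covers the attributes field onward,
// which includes max_timestamp. Rewriting the field in the broker
// therefore means the stored CRC must be recomputed as well.
// max_ts_offset is a hypothetical offset within the covered region.
uint32_t recompute_batch_crc(std::vector<uint8_t>& covered_region,
                             size_t max_ts_offset, int64_t new_max_ts) {
    // Write the new timestamp big-endian at its offset.
    for (int i = 0; i < 8; ++i) {
        covered_region[max_ts_offset + i] =
            static_cast<uint8_t>(new_max_ts >> (56 - 8 * i));
    }
    return crc32c(covered_region.data(), covered_region.size());
}
```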
Backports Required

Release Notes

Bug Fixes

* Fixed timequeries when `message.timestamp.type=CreateTime` is set and the producer leaves the `max_timestamp` batch header as the missing timestamp (-1).