
config: add support for message.timestamp.{before/after}.max.ms #27419

Merged
WillemKauf merged 8 commits into redpanda-data:dev from WillemKauf:kafka_time_properties
Sep 2, 2025

Conversation

@WillemKauf
Contributor

@WillemKauf WillemKauf commented Aug 29, 2025

Implements KIP-937 (Improve Message Timestamp Validation).

Introduces message.timestamp.{before/after}.max.ms and log_message_timestamp_{before/after}_max_ms at the topic and cluster level.

These can be used to reject records produced with timestamps outside the set bounds by returning Kafka error code 32 (INVALID_TIMESTAMP).
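For illustration, the new bounds might be applied like so. This is a hedged sketch: the topic and cluster property names come from this PR, but the topic name, values, and exact rpk flag syntax are assumptions, not taken from the change itself.

```shell
# Hypothetical topic: reject records stamped more than 1h in the past
# or more than 5min in the future.
rpk topic create my-topic \
  -c message.timestamp.before.max.ms=3600000 \
  -c message.timestamp.after.max.ms=300000

# Cluster-level defaults (property names introduced by this PR):
rpk cluster config set log_message_timestamp_before_max_ms 3600000
rpk cluster config set log_message_timestamp_after_max_ms 300000
```

A produce request whose record timestamps fall outside these windows would then be rejected with INVALID_TIMESTAMP (error code 32).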

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v25.2.x
  • v25.1.x
  • v24.3.x

Release Notes

Features

  • Adds support for KIP-937 by implementing message.timestamp.{before/after}.max.ms.
  • Deprecates log_message_timestamp_alert_{before/after}_ms cluster properties.

@WillemKauf WillemKauf requested a review from a team as a code owner August 29, 2025 00:02
@WillemKauf WillemKauf force-pushed the kafka_time_properties branch 3 times, most recently from 259c469 to c3bd8b5, on August 29, 2025 00:07
@rockwotj rockwotj self-requested a review August 29, 2025 00:11
@WillemKauf WillemKauf force-pushed the kafka_time_properties branch 2 times, most recently from 78e73a3 to d38c385, on August 29, 2025 02:34
@vbotbuildovich
Collaborator

vbotbuildovich commented Aug 29, 2025

Retry command for Build#71583

please wait until all jobs are finished before running the slash command

/ci-repeat 1
tests/rptest/tests/retention_policy_test.py::BogusTimestampTest.test_bogus_timestamps@{"mixed_timestamps":true,"use_broker_timestamps":false}
tests/rptest/tests/retention_policy_test.py::BogusTimestampTest.test_bogus_timestamps@{"mixed_timestamps":true,"use_broker_timestamps":true}
tests/rptest/tests/retention_policy_test.py::BogusTimestampTest.test_bogus_timestamps@{"mixed_timestamps":false,"use_broker_timestamps":false}
tests/rptest/tests/retention_policy_test.py::BogusTimestampTest.test_bogus_timestamps@{"mixed_timestamps":false,"use_broker_timestamps":true}
tests/rptest/tests/compatibility/kafka_streams_test.py::KafkaStreamsSessionWindow.test_kafka_streams

@vbotbuildovich
Collaborator

vbotbuildovich commented Aug 29, 2025

CI test results

test results on build#71583
test_class test_method test_arguments test_kind job_url test_status passed reason
KafkaStreamsSessionWindow test_kafka_streams null integration https://buildkite.com/redpanda/redpanda/builds/71583#0198f3cb-96c5-4af5-9987-524f78dd0b93 FAIL 0/21 The test has failed across all retries
KafkaStreamsSessionWindow test_kafka_streams null integration https://buildkite.com/redpanda/redpanda/builds/71583#0198f3cd-2b65-4142-b267-4ab9a2f78c94 FAIL 0/21 The test has failed across all retries
BogusTimestampTest test_bogus_timestamps {"mixed_timestamps": false, "use_broker_timestamps": false} integration https://buildkite.com/redpanda/redpanda/builds/71583#0198f3cb-96c4-4b4d-97d3-f1b6294dbddb FAIL 0/21 The test has failed across all retries
BogusTimestampTest test_bogus_timestamps {"mixed_timestamps": false, "use_broker_timestamps": false} integration https://buildkite.com/redpanda/redpanda/builds/71583#0198f3cd-2b65-4b54-8361-feb77bda5753 FAIL 0/21 The test has failed across all retries
BogusTimestampTest test_bogus_timestamps {"mixed_timestamps": false, "use_broker_timestamps": true} integration https://buildkite.com/redpanda/redpanda/builds/71583#0198f3cb-96c5-4af5-9987-524f78dd0b93 FAIL 0/21 The test has failed across all retries
BogusTimestampTest test_bogus_timestamps {"mixed_timestamps": false, "use_broker_timestamps": true} integration https://buildkite.com/redpanda/redpanda/builds/71583#0198f3cd-2b65-4142-b267-4ab9a2f78c94 FAIL 0/21 The test has failed across all retries
BogusTimestampTest test_bogus_timestamps {"mixed_timestamps": true, "use_broker_timestamps": false} integration https://buildkite.com/redpanda/redpanda/builds/71583#0198f3cb-96be-4c38-9586-77501cea24d5 FAIL 0/21 The test has failed across all retries
BogusTimestampTest test_bogus_timestamps {"mixed_timestamps": true, "use_broker_timestamps": false} integration https://buildkite.com/redpanda/redpanda/builds/71583#0198f3cd-2b5f-4696-bbd0-e2789096dec7 FAIL 0/21 The test has failed across all retries
BogusTimestampTest test_bogus_timestamps {"mixed_timestamps": true, "use_broker_timestamps": true} integration https://buildkite.com/redpanda/redpanda/builds/71583#0198f3cb-96bf-4220-b0da-3eb6987c0546 FAIL 0/21 The test has failed across all retries
BogusTimestampTest test_bogus_timestamps {"mixed_timestamps": true, "use_broker_timestamps": true} integration https://buildkite.com/redpanda/redpanda/builds/71583#0198f3cd-2b60-4e1d-b037-f39be9a5a603 FAIL 0/21 The test has failed across all retries
test results on build#71645
test_class test_method test_arguments test_kind job_url test_status passed reason
RandomNodeOperationsTest test_node_operations {"cloud_storage_type": 1, "compaction_mode": "sliding_window", "enable_failures": false, "mixed_versions": true, "with_iceberg": false} integration https://buildkite.com/redpanda/redpanda/builds/71645#0198f7dd-7cd2-4cb0-be78-28f42a0c41aa FLAKY 20/21 upstream reliability is '99.13419913419914'. current run reliability is '95.23809523809523'. drift is 3.8961 and the allowed drift is set to 50. The test should PASS

Contributor

@rockwotj rockwotj left a comment


Nice, just a few small bits on the validation itself (also CI failures)

return std::nullopt;
batch.set_max_timestamp(
model::timestamp_type::append_time, broker_time);
}
Contributor


Missing return

Contributor Author


Oops, this disappeared, thanks

Comment on lines +231 to +240
auto broker_timepoint = model::duration_since_epoch(broker_time);

// reject if first_timestamp is too far in the past
auto first_timepoint = model::duration_since_epoch(header.first_timestamp);
if (
broker_timepoint > first_timepoint
&& std::chrono::duration_cast<std::chrono::milliseconds>(
broker_timepoint - first_timepoint)
> message_timestamp_before_max_ms) {
Contributor


Remind me why this check can't be:

auto min_timepoint = broker_time - message_timestamp_before_max_ms;
if (first_timestamp < min_timepoint) {
  // error
}

Is it something about overflow?

Contributor Author

@WillemKauf WillemKauf Aug 29, 2025


Yes, message_timestamp_before_max_ms is a bounded property that defaults to serde::max_serializable_ms (9223372036854), so subtracting this value from broker time would be an underflow (until Friday, April 11, 2262 11:47:16.854 PM 😄 )

The checks here are safe to under/overflow in their current implementation.

Contributor


ugh I miss absl::Time that has saturating logic instead...

Comment on lines +263 to +270
auto max_timepoint = model::duration_since_epoch(header.max_timestamp);
if (
broker_timepoint < max_timepoint
&& std::chrono::duration_cast<std::chrono::milliseconds>(
max_timepoint - broker_timepoint)
> message_timestamp_after_max_ms) {
Contributor


ditto, why can't this be:

auto max_timepoint = broker_timepoint + message_timestamp_after_max_ms;
if (header_max_timestamp > max_timepoint) {
  // error
}

Contributor Author


Potential overflow, better to subtract.

@WillemKauf WillemKauf force-pushed the kafka_time_properties branch 2 times, most recently from 9616654 to ec487f6, on August 29, 2025 16:20
@WillemKauf WillemKauf requested a review from rockwotj August 29, 2025 16:34
rockwotj
rockwotj previously approved these changes Aug 29, 2025
Contributor

@rockwotj rockwotj left a comment


LGTM

@rockwotj
Contributor

Also please mention the property deprecation in the release notes

@WillemKauf
Contributor Author

Also please mention the property deprecation in the release notes

Done, I'll wait for a docs review of the cluster property description as well before merging

WillemKauf and others added 8 commits August 29, 2025 16:08
…time

Some clients (looking at you Sarama!) don't set a max_timestamp for
batches when being produced. In Apache Kafka, all incoming batches have
the max_timestamp batch header field set BY THE BROKER. Code references:

Here for uncompressed batches:
https://github.com/apache/kafka/blob/e124d3975bdb3a9ec85eee2fba7a1b0a6967d3a6/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java#L275

Here for compressed batches:
https://github.com/apache/kafka/blob/e124d3975bdb3a9ec85eee2fba7a1b0a6967d3a6/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java#L404

When the timestamp_type is log_append then we do set the max_timestamp
in the broker:
https://github.com/redpanda-data/redpanda/blob/be699729fbbb0b48cc684d4417383ad1320103b7/src/v/kafka/server/handlers/produce.cc#L309

But don't do anything in the create_time case. This has the effect of
breaking timequeries, which only look at the max_timestamp in many
places of the storage layer (which is similar to the Kafka behavior, but
they have the max_timestamp set correctly).

However, historically we've not decompressed batches on the produce path
(which is technically a validation bug). Since that has many perf
implications, and doing that decompression would presumably need at
least a cluster config flag, we only partially fix this bug when the
batch is uncompressed. For compressed batches we should really just
do the right thing and validate the batch (but maybe behind a flag?).
We now have topic properties that will actively reject records,
so these alerts are unneeded and would only serve to confuse users
due to a mismatched configuration between the alert and the other
properties which reject the records.
…ax_ms`

This commit properly enforces these configs within the produce path.

There is also a subtle reworking of `validate_batch_timestamps()`,
though there are no other functional changes in the reworking.
Adds a variety of test cases that produce records with timestamps in the past,
future, and present, to assert that `message.timestamp.{before/after}.max.ms`
are being correctly enforced.
…s()`

Kafka records can be produced with timestamps up to `int64_t::max()`.

Before, validation checks for timestamps in the produce path were
implemented via the following:

```
auto broker_time = model::timestamp::now();
auto broker_timepoint = model::duration_since_epoch(broker_time);
auto max_timepoint = model::duration_since_epoch(header.max_timestamp);
if (broker_timepoint < max_timepoint
  && std::chrono::duration_cast<std::chrono::milliseconds>(
       max_timepoint - broker_timepoint)
       > message_timestamp_after_max_ms) {
   ...
}
```

where `model::duration_since_epoch()` is defined as:

```
inline timestamp_clock::duration duration_since_epoch(timestamp ts) {
    return std::chrono::duration_cast<timestamp_clock::duration>(
      std::chrono::milliseconds{ts.value()});
}
```

where `timestamp_clock::duration` is `std::chrono::system_clock::duration`,
which can be `microseconds` or `nanoseconds`. Uh oh.

So, in the case that the `max_timepoint` is near `int64_t::max()`, we
effectively `duration_cast` a `std::chrono::milliseconds` type to a
`std::chrono::microseconds` type (multiplying underlying value by 1000),
subtract two values, and then attempt to cast back to a
`std::chrono::milliseconds` type, which is only guaranteed to be a signed
integer of at least 45 bits [1].

This can lead to overflow. Fix the issue by keeping everything as a
`model::timestamp`, with `int64_t` comparisons and no opportunity
for narrowing conversions.

[1]: https://en.cppreference.com/w/cpp/chrono/duration.html
@WillemKauf
Contributor Author

Force push to:

  • Fix cluster property description per docs review
  • Add a new commit that better handles under/overflow in validate_batch_timestamps()

@rockwotj please take a look, I can squash the commit in with the original change if you'd like

@WillemKauf WillemKauf requested a review from rockwotj August 29, 2025 21:26
> message_timestamp_before_max_ms) {
broker_time > header.first_timestamp
&& (broker_time - header.first_timestamp)
> model::timestamp(message_timestamp_before_max_ms.count())) {
Contributor Author


This is now an int64_t comparison. Using .count() is fine since model::timestamp uses int64_t and expects milliseconds.

Contributor Author


duration_since_epoch() is a pretty dangerous function, I think.

Contributor


Agreed, I am actually going to remove it after this PR goes in.

Contributor

@rockwotj rockwotj left a comment


LGTM

Contributor


Any tests we should add for 99c7a2b?

Contributor Author

@WillemKauf WillemKauf Sep 2, 2025


I'll think about adding some lower-level Kafka tests here; it would be easier to ensure we aren't getting overflows in a fixture test than in Ducktape. I'll look at a quick follow-up here.

Contributor

@rockwotj rockwotj left a comment


LGTM

@WillemKauf WillemKauf merged commit 55adcf1 into redpanda-data:dev Sep 2, 2025
18 checks passed