config: add support for message.timestamp.{before/after}.max.ms #27419

WillemKauf merged 8 commits into redpanda-data:dev

Conversation
CI test results: build#71583, build#71645
rockwotj left a comment:

Nice, just a few small bits on the validation itself (also CI failures)
```
        return std::nullopt;
    batch.set_max_timestamp(
      model::timestamp_type::append_time, broker_time);
}
```
Oops, this disappeared, thanks
```
auto broker_timepoint = model::duration_since_epoch(broker_time);

// reject if first_timestamp is too far in the past
auto first_timepoint = model::duration_since_epoch(header.first_timestamp);
if (
  broker_timepoint > first_timepoint
  && std::chrono::duration_cast<std::chrono::milliseconds>(
       broker_timepoint - first_timepoint)
       > message_timestamp_before_max_ms) {
```
Remind me why this check can't be:

```
auto min_timepoint = broker_time - message_timestamp_before_max_ms;
if (first_timestamp < min_timepoint) {
    // error
}
```

Is it something about overflow?
Yes, message_timestamp_before_max_ms is a bounded property that defaults to serde::max_serializable_ms (9223372036854), so subtracting this value from broker time would be an underflow (until Friday, April 11, 2262 11:47:16.854 PM 😄 )
The checks here are safe to under/overflow in their current implementation.
ugh I miss absl::Time that has saturating logic instead...
```
auto max_timepoint = model::duration_since_epoch(header.max_timestamp);
if (
  broker_timepoint < max_timepoint
  && std::chrono::duration_cast<std::chrono::milliseconds>(
       max_timepoint - broker_timepoint)
       > message_timestamp_after_max_ms) {
```
ditto, why can't this be:

```
auto max_timepoint = broker_timepoint + message_timestamp_after_max_ms;
if (header_max_timestamp > max_timepoint) {
    // error
}
```
Potential overflow, better to subtract.
Also please mention the property deprecation in the release notes

Done, I'll wait for a docs review of the cluster property description as well before merging
…time

Some clients (looking at you Sarama!) don't set a max_timestamp for batches when being produced. In Apache Kafka, all incoming batches have the max_timestamp batch header field set BY THE BROKER.

Code references:

Here for uncompressed batches:
https://github.com/apache/kafka/blob/e124d3975bdb3a9ec85eee2fba7a1b0a6967d3a6/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java#L275

Here for compressed batches:
https://github.com/apache/kafka/blob/e124d3975bdb3a9ec85eee2fba7a1b0a6967d3a6/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java#L404

When the timestamp_type is log_append, we do set the max_timestamp in the broker:
https://github.com/redpanda-data/redpanda/blob/be699729fbbb0b48cc684d4417383ad1320103b7/src/v/kafka/server/handlers/produce.cc#L309

but we don't do anything in the create_time case. This has the effect of breaking timequeries, which only look at the max_timestamp in many places of the storage layer (similar to the Kafka behavior, except Kafka has the max_timestamp set correctly). However, historically we have not decompressed batches on the produce path (which is technically a validation bug). Since decompressing has many perf implications and would presumably need at least a cluster config flag, we only partially fix this bug when the batch is uncompressed. For compressed batches we should really just do the right thing and validate the batch (but maybe behind a flag?).
We now have topic properties that will actively reject messages, so these alerts are unneeded and would only serve to confuse users due to a mismatched configuration between the alert properties and the properties which reject the records.
…ax_ms` This commit properly enforces these configs within the produce path. There is also a subtle reworking of `validate_batch_timestamps()`, though there are no other functional changes in the reworking.
Adds a variety of test cases that produce records with timestamps in the
past, future, and present, asserting that `message.timestamp.{before/after}.max.ms`
are correctly enforced.
…s()`
Kafka records can be produced with timestamps up to `int64_t::max()`.
Before, validation checks for timestamps in the produce path were
implemented via the following:
```
auto broker_time = model::timestamp::now();
auto broker_timepoint = model::duration_since_epoch(broker_time);
auto max_timepoint = model::duration_since_epoch(header.max_timestamp);
if (broker_timepoint < max_timepoint
&& std::chrono::duration_cast<std::chrono::milliseconds>(
max_timepoint - broker_timepoint)
> message_timestamp_after_max_ms) {
...
}
```
where `model::duration_since_epoch()` is defined as:
```
inline timestamp_clock::duration duration_since_epoch(timestamp ts) {
return std::chrono::duration_cast<timestamp_clock::duration>(
std::chrono::milliseconds{ts.value()});
}
```
where `timestamp_clock::duration` is `std::chrono::system_clock::duration`,
which can be `microseconds` or `nanoseconds`. Uh oh.
So, in the case that the `max_timepoint` is near `int64_t::max()`, we
effectively `duration_cast` a `std::chrono::milliseconds` type to a
`std::chrono::microseconds` type (multiplying underlying value by 1000),
subtract two values, and then attempt to cast back to a
`std::chrono::milliseconds` type, which is only guaranteed to be a signed
integer of at least 45 bits [1].
This can lead to overflow. Fix the issue by keeping everything as a
`model::timestamp`, with `int64_t` comparisons and no opportunity
for narrowing conversions.
[1]:
https://en.cppreference.com/w/cpp/chrono/duration.html
Force push to:

@rockwotj please take a look, I can squash the commit in with the original change if you'd like
```
       > message_timestamp_before_max_ms) {
  broker_time > header.first_timestamp
  && (broker_time - header.first_timestamp)
       > model::timestamp(message_timestamp_before_max_ms.count())) {
```
this is now an int64_t comparison. Using .count() is fine since model::timestamp uses int64_t and expects milliseconds.
duration_since_epoch() is a pretty dangerous function, I think.
Agreed, I am actually going to remove it after this PR goes in.
I'll think about adding some lower level kafka tests here, it would be easier to ensure we aren't getting overflows in a fixture test than in Ducktape. I'll look at a quick follow-up here.
Implements KIP-937 as described here.

Introduces `message.timestamp.{before/after}.max.ms` and `log_message_timestamp_{before/after}_max_ms` at the topic and cluster level. These can be used to reject records produced with timestamps outside the set bounds by returning Kafka error code 32 (`INVALID_TIMESTAMP`).

Backports Required

Release Notes

Features

- Adds support for `message.timestamp.{before/after}.max.ms`.
- Deprecates the `log_message_timestamp_alert_{before/after}_ms` cluster properties.