kafka: add kafka_produce_batch_validation and rework timestamp validation #27529

rockwotj merged 4 commits into redpanda-data:dev

Conversation
Force-pushed from c587808 to 3f44c51
Maybe mention timequeries and time-based retention specifically?
andrwng
left a comment
Nice, pretty much LGTM, just a couple thoughts
```cpp
auto max_timestamp_missing = header.max_timestamp
                             == model::timestamp::missing();
```
I imagine this is maybe a gray area, if the value is some arbitrary negative number, should we also treat it as missing?
Negative timestamps are supported by Kafka. https://cwiki.apache.org/confluence/display/KAFKA/KIP-228+Negative+record+timestamp+support
I thought that was just a proposal but indeed, I just checked: the Kafka broker will happily accept negative timestamps.
CI test results

- test results on build #72099
- test results on build #72124
- test results on build #72154
- test results on build #72220
- test results on build #72221
- test results on build #72226
- test results on build #72227
- test results on build #72262
Force-pushed from 3f44c51 to a594333
Title changed from "kafka: set max_timestamp if == missing() in batch_adapter" to "kafka: set or reject batches if max_timestamp is missing"
Retry command for build #72124: please wait until all jobs are finished before running the slash command.
src/v/config/configuration.cc (outdated)
```cpp
, log_message_timestamp_reject_unset(
    *this,
    "log_message_timestamp_reject_unset",
    "Controls whether batches produced to Redpanda with an unset "
```
Maybe something like this:

> Controls the handling of batches produced to Redpanda when the `max_timestamp` value is unset.
>
> - When set to `false`, each batch is accepted and the `max_timestamp` is derived from the maximum timestamp of records in the batch. A WARN level log line is displayed to highlight potential additional costs when a batch is compressed and Redpanda must decompress it.
> - When set to `true`, the batch is rejected and an ERROR level log line is displayed.
Repushed with a new cluster property. Can I get a re-review please @asimms41?
asimms41
left a comment
Added suggestions for rewording.
@nvartolomei I have added a commit which rejects batches with […]
Force-pushed from 9ff45ac to 9834600
Title changed from "kafka: set or reject batches if max_timestamp is missing" to "kafka: set max_timestamp if missing"
rockwotj
left a comment
Nice, LGTM, just one comment.
```cpp
};

template<>
struct convert<model::kafka_batch_validation_mode> {
```
so much boilerplate 😢
Thanks for doing this!
Force-pushed from 9834600 to c00e5e2
/microbench
Performance change detected in https://buildkite.com/redpanda/redpanda/builds/72274#01994e66-f605-4766-a1ff-09c3196414cc. See https://redpandadata.atlassian.net/wiki/x/LQAqLg for docs.
Note there will be a lot of noise in these, as we just changed the instance type for microbenchmarks. The kafka produce ones are probably real though. Best to confirm locally.
I think the change is mostly because the benchmark did not previously measure the work we were doing in the Kafka protocol layer to iterate over the batches.
Maybe e2e benchmarks are what we want? I guess we can see how those shake out in the nightly runs, right @StephanDollberg? Is there a way to do the OMB benchmarking as a slash command?
[Benchmark screenshots: before / after]
Worth noting that this benchmark does not test the now-compressed path (…)
If we had a benchmark that measured the work done in `kafka_batch_adapter` and the produce path, I'd expect that delta to be much more reasonable.
@WillemKauf if you change the benchmark to use compressed batches with the default cluster config, I'd expect the changes to go back to normal. Can you confirm?
The OMB slash command is not a thing yet.
I assume you mean compressed batches with […]. After, with this diff:

```diff
 ss::future<>
 produce_partition_fixture::run_test(size_t data_size, measured_region region) {
     BOOST_TEST_CHECKPOINT("HERE");
+    scoped_config cfg;
+    cfg.get("kafka_produce_batch_validation")
+      .set_value(model::kafka_batch_validation_mode::legacy);
     model::topic_partition tp = model::topic_partition(
       t, model::partition_id(0));
     storage::record_batch_builder builder(
       model::record_batch_type::raft_data, model::offset{0});
     constexpr size_t num_records = 100;
     for (size_t i = 0; i < num_records; ++i) {
         builder.add_raw_kv(iobuf{}, rand_iobuf(data_size));
     }
+    builder.set_compression(model::compression::lz4);
     auto batch = std::move(builder).build();
```
I'm a bit confused how those results are better than the baseline... Anyways, my recommendation is as follows: […]
/backport v25.2.x
Failed to create a backport PR to v25.2.x branch. I tried: […]
Note the OMB benchmarks don't use compressed batches, which to my understanding is the main regression path (besides the normal one shown in the microbenchmark, but we think that's only because it now measures more of what is normally on the produce path)?
I mean the Java client also sets the max timestamp, so it won't hit a perf impact unless strict validation is enabled. I mostly want to ensure that "well behaved clients" don't see a perf impact.
@WillemKauf, is it accurate to say this PR implements KIP-31 & KIP-32?
No, it is not.
@WillemKauf, sorry, I should have posted a link: https://issues.apache.org/jira/browse/KAFKA-2511
No, it's not the same as that issue either. This is adding validation that isn't in a KIP but mirrors upstream Apache Kafka.
Redpanda cannot leave `max_timestamp` unset in record batches: several subsystems (timequeries, retention, archival) depend heavily on the `max_timestamp` of batches being properly set.

Set the `max_timestamp` manually by iterating over the records in the record batch, taking the maximum `timestamp_delta` and adding it to the batch's `first_timestamp` (which must be set by clients, as a hard rule). This may incur additional cost for compressed batches, and is gated by the level of `kafka_produce_batch_validation`.

`kafka_produce_batch_validation` is a new cluster property which shall be used in the future for more extensive batch validation in addition to the timestamp fixes here.

Log lines are added to alert the client in case a batch is accepted without a maximum timestamp set (for `kafka_produce_batch_validation == legacy`), or in case of an expensive decompression operation in the produce path (for `kafka_produce_batch_validation == relaxed`), letting them know to update their client to set the `max_timestamp`.

Opened for backports.
Backports Required
Release Notes
Features
- `kafka_produce_batch_validation`, a new cluster property which controls the level of validation performed on batches in the `redpanda` produce path.