kafka: add kafka_produce_batch_validation and rework timestamp validation
#27529
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -188,40 +188,6 @@ iobuf kafka_batch_adapter::adapt(iobuf&& kbatch) { | |
| auto new_batch = model::record_batch( | ||
| header, std::move(records), model::record_batch::tag_ctor_ng{}); | ||
|
|
||
| /** | ||
| * Perform some type of validation on the uncompressed input. In this case | ||
| * we make sure that the records can be materialized but we avoid | ||
| * re-encoding them using the lazy-record optimization. | ||
| * | ||
| * TODO(kafka): we should also be validating compressed batches - the | ||
| * reference implementation does this, and this also means they are able to | ||
| * set the max_timestamp when using timestamp_type::create_time. | ||
| */ | ||
| if (!new_batch.compressed()) { | ||
| int64_t max_timestamp_delta = 0; | ||
| try { | ||
| new_batch.for_each_record([&max_timestamp_delta](model::record r) { | ||
| max_timestamp_delta = std::max( | ||
| r.timestamp_delta(), max_timestamp_delta); | ||
| }); | ||
| } catch (const std::exception& e) { | ||
| vlog(klog.error, "Parsing uncompressed records: {}", e.what()); | ||
| return remainder; | ||
| } | ||
| // If using append_time, we'll override the timestamp in the produce | ||
| // handler. Otherwise if the client does not set the max_timestamp we | ||
| // need to so that timequeries work correctly. | ||
| if ( | ||
| header.max_timestamp == model::timestamp::missing() | ||
| && header.attrs.timestamp_type() | ||
| == model::timestamp_type::create_time) { | ||
| model::timestamp max_timestamp( | ||
| header.first_timestamp() + max_timestamp_delta); | ||
| new_batch.set_max_timestamp( | ||
| model::timestamp_type::create_time, max_timestamp); | ||
| } | ||
| } | ||
|
Comment on lines -191 to -223
Contributor
IIRC our kafka client uses this logic too. I think this is OK to remove for our use cases outside of DR. In DR we probably want this validation, etc. Can we flag to the DR team to look into this?
Contributor, Author
This bit of code is removed but the logic of setting timestamps in the uncompressed case is still present in
Contributor
No, what I am saying is that when consuming, our kafka client was using this code to do some light validation. Now that this code is moved into the produce handler, our consume API handling in the kafka client no longer has any of this validation applied. I will post in the DR channel about this.
Contributor, Author
Ah, sorry, that totally went over my head. I see what you are referring to now. Happy to fix this in this PR if it's a regression.
Contributor
No it's OK, let's leave as is. |
||
|
|
||
| batch = std::move(new_batch); | ||
| return remainder; | ||
| } | ||
|
|
||
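For readers following the thread, the removed block's timestamp fallback can be sketched in isolation. This is a minimal illustration, not the code in the PR: `batch_header_sketch`, `effective_max_timestamp`, and the `-1` sentinel for a missing timestamp are hypothetical stand-ins for `model::record_batch_header`, the removed logic, and `model::timestamp::missing()`.

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical stand-ins for the model types referenced in the diff.
enum class timestamp_type { create_time, append_time };

// Assumption: a missing timestamp is encoded as -1.
constexpr int64_t missing_timestamp = -1;

struct batch_header_sketch {
    int64_t first_timestamp;
    int64_t max_timestamp;
    timestamp_type type;
};

// Returns the max timestamp the batch should carry. If the client left
// max_timestamp unset and the batch uses create_time, derive it from
// first_timestamp plus the largest per-record delta (floored at 0, as in
// the removed code). Otherwise the header value is returned untouched;
// per the removed comment, append_time is overridden later in the
// produce handler.
int64_t effective_max_timestamp(
  const batch_header_sketch& h, const std::vector<int64_t>& deltas) {
    if (
      h.max_timestamp != missing_timestamp
      || h.type != timestamp_type::create_time) {
        return h.max_timestamp;
    }
    int64_t max_delta = 0;
    for (int64_t d : deltas) {
        max_delta = std::max(d, max_delta);
    }
    return h.first_timestamp + max_delta;
}
```

With `first_timestamp = 1000` and deltas `{0, 5, 3}`, a `create_time` batch with no client-supplied max timestamp would get `1005`. The sketch only covers the fallback; the removed block also used the record walk as a parse check on uncompressed batches, which is the validation the reviewers discuss above.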
so much boilerplate 😢
Thanks for doing this!