Skip to content

kafka: add kafka_produce_batch_validation and rework timestamp validation#27529

Merged
rockwotj merged 4 commits intoredpanda-data:devfrom
WillemKauf:kba_set_max_ts
Sep 15, 2025
Merged

kafka: add kafka_produce_batch_validation and rework timestamp validation#27529
rockwotj merged 4 commits intoredpanda-data:devfrom
WillemKauf:kba_set_max_ts

Conversation

@WillemKauf
Copy link
Copy Markdown
Contributor

@WillemKauf WillemKauf commented Sep 11, 2025

redpanda cannot leave max_timestamp unset in record batches - several subsystems (timequeries, retention, archival) depend heavily on the max_timestamp of batches being properly set.

Set the max_timestamp manually by iterating over the records in the record batch, taking the maximum timestamp_delta and adding it to the batch's first_timestamp (which must be set by clients, as a hard rule). This may incur additional cost for compressed batches, and is gated by the level of kafka_produce_batch_validation.

kafka_produce_batch_validation is a new cluster property which shall be used in the future for more extensive batch validation in addition to the timestamp fixes here.

Log lines are added to alert the client in case of a batch accepted without a maximum timestamp set for kafka_produce_batch_validation == legacy or in the case of expensive decompression operation in the produce path for kafka_produce_batch_validation == relaxed, letting them know to update their client to set the max_timestamp.

Opened for backports.

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v25.2.x
  • v25.1.x
  • v24.3.x

Release Notes

Features

  • Adds a new cluster config kafka_produce_batch_validation which controls the level of validation performed on batches in the redpanda produce path.

@andrwng
Copy link
Copy Markdown
Contributor

andrwng commented Sep 11, 2025

Fixes a bug in which max_timestamp could be left unset in batches, leading to buggy behavior with a number of subsystems in redpanda.

Maybe mention timequeries and time-based retention specifically?

Copy link
Copy Markdown
Contributor

@andrwng andrwng left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, pretty much LGTM, just a couple thoughts

Comment on lines +225 to +226
auto max_timestamp_missing = header.max_timestamp
== model::timestamp::missing()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagine this is maybe a gray area, if the value is some arbitrary negative number, should we also treat it as missing?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagine this is maybe a gray area, if the value is some arbitrary negative number, should we also treat it as missing?

Negative timestamps are supported by Kafka. https://cwiki.apache.org/confluence/display/KAFKA/KIP-228+Negative+record+timestamp+support

Copy link
Copy Markdown
Contributor

@nvartolomei nvartolomei Sep 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that's just a proposal but indeed, just checked, kafka broker will happily accept negative timestamps.

@vbotbuildovich
Copy link
Copy Markdown
Collaborator

vbotbuildovich commented Sep 11, 2025

CI test results

test results on build#72099
test_class test_method test_arguments test_kind job_url test_status passed reason
AuditLogTestOauth test_kafka_oauth {"authz_match": "acl"} integration https://buildkite.com/redpanda/redpanda/builds/72099#01993a0e-8205-4bc4-9617-7564b4e85b7a FLAKY 14/21 upstream reliability is '97.9020979020979'. current run reliability is '66.66666666666666'. drift is 31.23543 and the allowed drift is set to 50. The test should PASS
PartitionBalancerTest test_recovery_mode_rebalance_finish null integration https://buildkite.com/redpanda/redpanda/builds/72099#01993a0b-aaa5-4a58-9df5-171da74b02c7 FLAKY 17/21 upstream reliability is '96.62921348314607'. current run reliability is '80.95238095238095'. drift is 15.67683 and the allowed drift is set to 50. The test should PASS
test results on build#72124
test_class test_method test_arguments test_kind job_url test_status passed reason
AuditLogTestOauth test_kafka_oauth {"authz_match": "acl"} integration https://buildkite.com/redpanda/redpanda/builds/72124#01993c19-ea7c-4852-b017-3527a45379eb FLAKY 8/21 upstream reliability is '97.87234042553192'. current run reliability is '38.095238095238095'. drift is 59.7771 and the allowed drift is set to 50. The test should FAIL
CloudStorageScrubberTest test_scrubber {"cloud_storage_type": 2} integration https://buildkite.com/redpanda/redpanda/builds/72124#01993c20-7bf2-475a-b9f6-ed225314d51a FLAKY 20/21 upstream reliability is '100.0'. current run reliability is '95.23809523809523'. drift is 4.7619 and the allowed drift is set to 50. The test should PASS
SchemaEvolutionE2ETests test_legal_schema_evolution {"catalog_type": "rest_hadoop", "cloud_storage_type": 1, "produce_mode": "avro", "query_engine": "trino", "test_case": "promote_column"} integration https://buildkite.com/redpanda/redpanda/builds/72124#01993c5c-beb0-44d8-8ac2-fed0ad152305 FLAKY 20/21 upstream reliability is '100.0'. current run reliability is '95.23809523809523'. drift is 4.7619 and the allowed drift is set to 50. The test should PASS
SimpleEndToEndTest test_consumer_interruption null integration https://buildkite.com/redpanda/redpanda/builds/72124#01993c19-ea84-4f5a-8b3c-9b136fa0851f FLAKY 19/21 upstream reliability is '99.29328621908127'. current run reliability is '90.47619047619048'. drift is 8.8171 and the allowed drift is set to 50. The test should PASS
test results on build#72154
test_class test_method test_arguments test_kind job_url test_status passed reason
SchemaEvolutionE2ETests test_old_schema_writer {"catalog_type": "rest_hadoop", "cloud_storage_type": 1, "produce_mode": "avro", "query_engine": "trino", "test_case": "add_column"} integration https://buildkite.com/redpanda/redpanda/builds/72154#01993e79-b7b9-4a3c-b229-a8a4edf40e9b FLAKY 20/21 upstream reliability is '100.0'. current run reliability is '95.23809523809523'. drift is 4.7619 and the allowed drift is set to 50. The test should PASS
test results on build#72220
test_class test_method test_arguments test_kind job_url test_status passed reason
AuditLogTestOauth test_kafka_oauth {"audit_transport_mode": "rpc", "authz_match": "acl"} integration https://buildkite.com/redpanda/redpanda/builds/72220#019941ba-36b8-4d25-a271-74233220698f FLAKY 19/21 upstream reliability is '100.0'. current run reliability is '90.47619047619048'. drift is 9.52381 and the allowed drift is set to 50. The test should PASS
PartitionBalancerTest test_recovery_mode_rebalance_finish null integration https://buildkite.com/redpanda/redpanda/builds/72220#019941bf-3e12-48c0-9601-550ebac8d222 FLAKY 15/21 upstream reliability is '94.9238578680203'. current run reliability is '71.42857142857143'. drift is 23.49529 and the allowed drift is set to 50. The test should PASS
test results on build#72221
test_class test_method test_arguments test_kind job_url test_status passed reason
MasterTestSuite test_chunk_dl_with_random_http_errors unit https://buildkite.com/redpanda/redpanda/builds/72221#019943b2-c58f-4e6e-826c-d287d8f5df89 FAIL 0/1
AuditLogTestOauth test_kafka_oauth {"audit_transport_mode": "rpc", "authz_match": "acl"} integration https://buildkite.com/redpanda/redpanda/builds/72221#019943cf-9352-411c-b742-7aab7a9bcfbd FLAKY 20/21 upstream reliability is '100.0'. current run reliability is '95.23809523809523'. drift is 4.7619 and the allowed drift is set to 50. The test should PASS
test results on build#72226
test_class test_method test_arguments test_kind job_url test_status passed reason
CloudStorageScrubberTest test_scrubber {"cloud_storage_type": 2} integration https://buildkite.com/redpanda/redpanda/builds/72226#01994474-2a82-43d9-a396-39e8611b73a6 FLAKY 20/21 upstream reliability is '100.0'. current run reliability is '95.23809523809523'. drift is 4.7619 and the allowed drift is set to 50. The test should PASS
test results on build#72227
test_class test_method test_arguments test_kind job_url test_status passed reason
AuditLogTestEscapeHatch test_escape_hatch {"audit_transport_mode": "kclient"} integration https://buildkite.com/redpanda/redpanda/builds/72227#019944da-8e2b-4387-a73a-231af4f5b05d FAIL 0/1
IncrementalFollowerFetchingTest test_incremental_fetch_from_follower {"follower_offline": true} integration https://buildkite.com/redpanda/redpanda/builds/72227#019944da-8e2c-48c2-b5ba-e67ef4da36a4 FAIL 0/1
test results on build#72262
test_class test_method test_arguments test_kind job_url test_status passed reason
DatalakeBlockedCatalogTest test_block_cloud_retention_before_translation {"cloud_storage_type": 2, "with_spillover": true} integration https://buildkite.com/redpanda/redpanda/builds/72262#01994db4-b2a3-40eb-892e-9087904604f4 FAIL 0/1
SchemaEvolutionE2ETests test_old_schema_writer {"catalog_type": "nessie", "cloud_storage_type": 2, "produce_mode": "proto3", "query_engine": "spark", "test_case": "reorder_columns"} integration https://buildkite.com/redpanda/redpanda/builds/72262#01994d9b-ac6f-436a-98ae-9cbac23b385d FAIL 0/1
RandomNodeOperationsTest test_node_operations {"cloud_storage_type": 1, "compaction_mode": "chunked_sliding_window", "enable_failures": true, "mixed_versions": true, "with_iceberg": false} integration https://buildkite.com/redpanda/redpanda/builds/72262#01994dc0-0351-4179-b137-472cfafdb775 FLAKY 20/21 upstream reliability is '100.0'. current run reliability is '95.23809523809523'. drift is 4.7619 and the allowed drift is set to 50. The test should PASS
SimpleEndToEndTest test_consumer_interruption null integration https://buildkite.com/redpanda/redpanda/builds/72262#01994dc0-d6f0-4c34-9da1-35d5ad832db9 FLAKY 19/21 upstream reliability is '98.99665551839465'. current run reliability is '90.47619047619048'. drift is 8.52047 and the allowed drift is set to 50. The test should PASS
IdempotentProducerRecoveryTest test_java_client_recovery_on_producer_eviction null integration https://buildkite.com/redpanda/redpanda/builds/72262#01994ddb-4bc0-4edc-bab7-6c72f5c70f33 FAIL 0/1

@WillemKauf WillemKauf requested a review from a team as a code owner September 12, 2025 03:07
@WillemKauf WillemKauf changed the title kafka: set max_timestamp if == missing() in batch_adapter kafka: set or reject batches if max_timestamp is missing Sep 12, 2025
@WillemKauf WillemKauf requested a review from andrwng September 12, 2025 03:15
@vbotbuildovich
Copy link
Copy Markdown
Collaborator

Retry command for Build#72124

please wait until all jobs are finished before running the slash command

/ci-repeat 1
tests/rptest/tests/audit_log_test.py::AuditLogTestOauth.test_kafka_oauth@{"authz_match":"acl"}

, log_message_timestamp_reject_unset(
*this,
"log_message_timestamp_reject_unset",
"Controls whether batches produced to Redpanda with an unset "
Copy link
Copy Markdown
Contributor

@asimms41 asimms41 Sep 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe something like this:

Controls the handling of batches produced to Redpanda when the max_timestamp value is unset.

  • When set to false, each batch is accepted and the max_timestamp is derived from the maximum timestamp of records in the batch. A WARN level log line is displayed to highlight potential additional costs when a batch is compressed and Redpanda must decompress it.
  • When set to true, the batch is rejected and an ERROR level log line is displayed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Edited, thanks!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Repushed with a new cluster property. Can I get a re-review please @asimms41?

Copy link
Copy Markdown
Contributor

@asimms41 asimms41 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added suggestions for rewording.

@WillemKauf
Copy link
Copy Markdown
Contributor Author

@nvartolomei I have added a commit which rejects batches with first_timestamp > max_timestamp as requested.

@WillemKauf WillemKauf force-pushed the kba_set_max_ts branch 2 times, most recently from 9ff45ac to 9834600 Compare September 12, 2025 19:08
@WillemKauf WillemKauf changed the title kafka: set or reject batches if max_timestamp is missing kafka: set max_timestamp if missing Sep 12, 2025
Copy link
Copy Markdown
Contributor

@rockwotj rockwotj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, LGTM, just one comment.

};

template<>
struct convert<model::kafka_batch_validation_mode> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so much boilerplate 😢

Thanks for doing this!

@rockwotj rockwotj requested review from a team, StephanDollberg and piyushredpanda and removed request for a team and piyushredpanda September 15, 2025 13:54
@WillemKauf
Copy link
Copy Markdown
Contributor Author

/microbench

@vbotbuildovich
Copy link
Copy Markdown
Collaborator

Performance change detected in https://buildkite.com/redpanda/redpanda/builds/72274#01994e66-f605-4766-a1ff-09c3196414cc:

Performance changes detected in 188 tests
serde_rpbench_big_10mb.serialize: inst -> -2.60pct
serde_rpbench_big_1mb.deserialize: inst -> +0.31pct
serde_rpbench_big_1mb.serialize: inst -> -4.88pct
serde_rpbench_small.deserialize: inst -> +0.76pct
serde_rpbench_small.serialize: inst -> +0.26pct
delta_for_rpbench_deltafor_bench.delta_column_append_tx: inst -> +0.12pct
delta_for_rpbench_deltafor_bench.delta_column_at_4K: inst -> +0.29pct
delta_for_rpbench_deltafor_bench.delta_column_at_4M: inst -> +0.13pct
delta_for_rpbench_deltafor_bench.delta_column_find_4K: inst -> +0.21pct
delta_for_rpbench_deltafor_bench.delta_column_find_4M: inst -> +0.13pct
delta_for_rpbench_deltafor_bench.delta_frame_append_tx: inst -> +0.11pct
delta_for_rpbench_deltafor_bench.delta_frame_at_4K: inst -> +0.38pct
delta_for_rpbench_deltafor_bench.delta_frame_find_4K: inst -> +0.36pct
delta_for_rpbench_deltafor_bench.xor_column_append_tx: inst -> +0.13pct
delta_for_rpbench_deltafor_bench.xor_column_at_4K: inst -> +0.30pct
delta_for_rpbench_deltafor_bench.xor_column_at_with_index_4K: inst -> -0.87pct
delta_for_rpbench_deltafor_bench.xor_column_find_4K: inst -> +0.20pct
delta_for_rpbench_deltafor_bench.xor_frame_append_tx: inst -> +0.10pct
delta_for_rpbench_deltafor_bench.xor_frame_at_4K: inst -> +0.35pct
delta_for_rpbench_deltafor_bench.xor_frame_at_with_index_4K: inst -> +0.15pct
delta_for_rpbench_deltafor_bench.xor_frame_find_4K: inst -> +0.33pct
zstd_stream_rpbench_async_stream_zstd.10mb_compress: allocs -> -0.03pct
zstd_stream_rpbench_async_stream_zstd.10mb_compress: inst -> -0.28pct
zstd_stream_rpbench_async_stream_zstd.1mb_compress: inst -> -0.27pct
zstd_stream_rpbench_async_stream_zstd.1mb_uncompress: inst -> -0.11pct
zstd_stream_rpbench_gzip_10mb.compress: inst -> +0.01pct
zstd_stream_rpbench_gzip_1mb.compress: inst -> +0.01pct
zstd_stream_rpbench_lz4_10mb.uncompress: inst -> +0.01pct
zstd_stream_rpbench_lz4_1mb.compress: inst -> +0.05pct
zstd_stream_rpbench_lz4_1mb.uncompress: inst -> -0.01pct
zstd_stream_rpbench_streaming_zstd_10mb.compress: inst -> -0.25pct
zstd_stream_rpbench_streaming_zstd_1mb.compress: inst -> -0.25pct
zstd_stream_rpbench_streaming_zstd_1mb.uncompress: inst -> +0.18pct
cloud_storage_rpbench_cstore_bench.column_store_deserialize_baseline: inst -> -1.80pct
cloud_storage_rpbench_cstore_bench.column_store_deserialize_result: inst -> +0.94pct
cloud_storage_rpbench_cstore_bench.column_store_find_no_hints: inst -> -0.16pct
cloud_storage_rpbench_cstore_bench.column_store_find_result: inst -> -0.99pct
cloud_storage_rpbench_cstore_bench.column_store_lower_bound_result: inst -> -0.95pct
cloud_storage_rpbench_cstore_bench.column_store_serialize_result: inst -> -0.74pct
cloud_storage_rpbench_cstore_bench.column_store_upper_bound_result: inst -> -0.95pct
cloud_storage_rpbench_cstore_bench.cs_iteration_precompute_end_test_1000: inst -> -0.29pct
cloud_storage_rpbench_cstore_bench.cs_iteration_recompute_end_test_1000: inst -> -14.84pct
cloud_storage_rpbench_cstore_bench.cs_iteration_recompute_end_test_10000: inst -> -14.97pct
hashing_bench_rpbench_murmur_hash_x86_32.iobuf: inst -> +0.41pct
wasm_transform_rpbench_Memset.Speed_100_MiB: inst -> -1.60pct
wasm_transform_rpbench_Memset.Speed_50_MiB: inst -> -0.76pct
wasm_transform_rpbench_Memset.Speed_80_MiB: inst -> -1.28pct
wasm_transform_rpbench_WasmBenchTest_BatchSize10_RecordSize1_KiB.IdentityTransform: inst -> +1.72pct
wasm_transform_rpbench_WasmBenchTest_BatchSize10_RecordSize512.IdentityTransform: inst -> +3.50pct
heartbeat_bench_rpbench_fixture.test_new_hb_reply_full: allocs -> -1.05pct
heartbeat_bench_rpbench_fixture.test_new_hb_reply_full: inst -> -0.04pct
heartbeat_bench_rpbench_fixture.test_new_hb_reply_lw: inst -> +36.68pct
heartbeat_bench_rpbench_fixture.test_new_hb_request_full: allocs -> -1.12pct
heartbeat_bench_rpbench_fixture.test_new_hb_request_full: inst -> -0.04pct
heartbeat_bench_rpbench_fixture.test_new_hb_request_lw: inst -> +15.87pct
heartbeat_bench_rpbench_fixture.test_old_hb_reply: inst -> +0.85pct
heartbeat_bench_rpbench_fixture.test_old_hb_request: inst -> +0.83pct
kafka_fetch_plan_rpbench_fetch_plan.t100p1_no_auth: inst -> -0.81pct
kafka_fetch_plan_rpbench_fetch_plan.t100p1_yes_auth: inst -> -0.24pct
kafka_fetch_plan_rpbench_fetch_plan.t1p100_no_auth: inst -> -1.87pct
kafka_fetch_plan_rpbench_fetch_plan.t1p100_yes_auth: inst -> -1.64pct
kafka_fetch_plan_rpbench_fetch_plan.t1p1_no_auth: inst -> -0.44pct
kafka_fetch_plan_rpbench_fetch_plan.t1p1_no_auth_ids: inst -> +0.10pct
kafka_fetch_plan_rpbench_fetch_plan.t1p1_yes_auth: inst -> -0.18pct
kafka_fetch_plan_rpbench_fetch_plan.t1p1_yes_auth_ids: inst -> +0.10pct
rpc_serialization_rpbench_big_10mb.serialize: inst -> +2.19pct
rpc_serialization_rpbench_big_1mb.deserialize: inst -> +0.23pct
rpc_serialization_rpbench_big_1mb.serialize: inst -> +9.76pct
rpc_serialization_rpbench_small.deserialize: inst -> +0.77pct
rpc_serialization_rpbench_small.serialize: inst -> +0.30pct
container_rpbench_VectorBenchTest_chunked_vector_int64_t_10000.Fill: inst -> -0.02pct
container_rpbench_VectorBenchTest_chunked_vector_int64_t_64.Fifo: inst -> -0.50pct
container_rpbench_VectorBenchTest_chunked_vector_int64_t_64.Fill: inst -> -0.65pct
container_rpbench_VectorBenchTest_chunked_vector_int64_t_64.Lifo: inst -> -0.50pct
container_rpbench_VectorBenchTest_chunked_vector_sstring_64.Fifo: inst -> +0.32pct
container_rpbench_VectorBenchTest_chunked_vector_sstring_64.Fill: inst -> +0.35pct
container_rpbench_VectorBenchTest_chunked_vector_sstring_64.Lifo: inst -> +0.23pct
container_rpbench_VectorBenchTest_std_vector_int64_t_10000.Fifo: inst -> -0.02pct
container_rpbench_VectorBenchTest_std_vector_int64_t_10000.Fill: inst -> -0.03pct
container_rpbench_VectorBenchTest_std_vector_int64_t_10000.Lifo: inst -> -0.03pct
container_rpbench_VectorBenchTest_std_vector_int64_t_64.Fifo: inst -> -0.61pct
container_rpbench_VectorBenchTest_std_vector_int64_t_64.Fill: inst -> -0.71pct
container_rpbench_VectorBenchTest_std_vector_int64_t_64.Lifo: inst -> -0.71pct
container_rpbench_VectorBenchTest_std_vector_sstring_64.Fifo: inst -> +0.36pct
container_rpbench_VectorBenchTest_std_vector_sstring_64.Fill: inst -> +0.37pct
container_rpbench_VectorBenchTest_std_vector_sstring_64.Lifo: inst -> +0.32pct
kafka_produce_partition_rpbench_produce_partition_fixture.1_KiB_dispatched: allocs -> +42.71pct
kafka_produce_partition_rpbench_produce_partition_fixture.1_KiB_dispatched: inst -> +59.23pct
kafka_produce_partition_rpbench_produce_partition_fixture.1_KiB_produced: allocs -> +40.23pct
kafka_produce_partition_rpbench_produce_partition_fixture.1_KiB_produced: inst -> +47.36pct
kafka_produce_partition_rpbench_produce_partition_fixture.1_KiB_submitted: allocs -> +45.11pct
kafka_produce_partition_rpbench_produce_partition_fixture.1_KiB_submitted: inst -> +67.02pct
kafka_produce_partition_rpbench_produce_partition_fixture.1_dispatched: allocs -> +207.02pct
kafka_produce_partition_rpbench_produce_partition_fixture.1_dispatched: inst -> +300.16pct
kafka_produce_partition_rpbench_produce_partition_fixture.1_produced: allocs -> +172.04pct
kafka_produce_partition_rpbench_produce_partition_fixture.1_produced: inst -> +176.09pct
kafka_produce_partition_rpbench_produce_partition_fixture.1_submitted: allocs -> +213.09pct
kafka_produce_partition_rpbench_produce_partition_fixture.1_submitted: inst -> +311.10pct
kafka_produce_partition_rpbench_produce_partition_fixture.4_KiB_dispatched: allocs -> +30.73pct
kafka_produce_partition_rpbench_produce_partition_fixture.4_KiB_dispatched: inst -> +30.52pct
kafka_produce_partition_rpbench_produce_partition_fixture.4_KiB_produced: allocs -> +28.51pct
kafka_produce_partition_rpbench_produce_partition_fixture.4_KiB_produced: inst -> +24.59pct
kafka_produce_partition_rpbench_produce_partition_fixture.4_KiB_submitted: allocs -> +37.66pct
kafka_produce_partition_rpbench_produce_partition_fixture.4_KiB_submitted: inst -> +50.67pct
kafka_produce_partition_rpbench_produce_partition_fixture.8_KiB_dispatched: allocs -> +22.24pct
kafka_produce_partition_rpbench_produce_partition_fixture.8_KiB_dispatched: inst -> +53.49pct
kafka_produce_partition_rpbench_produce_partition_fixture.8_KiB_produced: allocs -> +20.63pct
kafka_produce_partition_rpbench_produce_partition_fixture.8_KiB_produced: inst -> +43.50pct
kafka_produce_partition_rpbench_produce_partition_fixture.8_KiB_submitted: allocs -> +38.03pct
kafka_produce_partition_rpbench_produce_partition_fixture.8_KiB_submitted: inst -> +97.51pct
vint_rpbench_vint_bench.make_stream: inst -> +0.84pct
crypto_bench_rpbench_openssl_perf_test.hmac_sha256_1k: inst -> -73.83pct
crypto_bench_rpbench_openssl_perf_test.hmac_sha512_1k: inst -> -0.07pct
crypto_bench_rpbench_openssl_perf_test.md5_1k: inst -> +0.03pct
crypto_bench_rpbench_openssl_perf_test.sha256_1k: inst -> -86.80pct
crypto_bench_rpbench_openssl_perf_test.sha512_1k: inst -> -0.03pct
crypto_bench_fips_rpbench_openssl_perf_test.hmac_sha256_1k: inst -> -72.47pct
crypto_bench_fips_rpbench_openssl_perf_test.hmac_sha512_1k: inst -> -0.15pct
crypto_bench_fips_rpbench_openssl_perf_test.md5_1k: inst -> +0.03pct
crypto_bench_fips_rpbench_openssl_perf_test.sha256_1k: inst -> -85.87pct
crypto_bench_fips_rpbench_openssl_perf_test.sha512_1k: inst -> -0.06pct
health_monitor_rpbench_node_health_report.deserialize_many_partitions: inst -> +1.66pct
health_monitor_rpbench_node_health_report.deserialize_many_topics: inst -> +1.53pct
health_monitor_rpbench_node_health_report.deserialize_many_topics_replicated_partitions: inst -> +1.59pct
health_monitor_rpbench_node_health_report.serialize_many_topics: inst -> +0.24pct
health_monitor_rpbench_node_health_report.serialize_many_topics_replicated_partitions: inst -> +0.14pct
coro_rpbench_coro_bench.chain_then5: allocs -> -44.80pct
coro_rpbench_coro_bench.chain_then5: inst -> -0.02pct
coro_rpbench_coro_bench.co_await_ready: inst -> -0.03pct
coro_rpbench_coro_bench.co_await_ready_collect: inst -> +5.39pct
coro_rpbench_coro_bench.empty_coro: inst -> -0.04pct
coro_rpbench_coro_bench.nested_then5: allocs -> -44.97pct
coro_rpbench_coro_bench.nested_then5: inst -> -0.03pct
coro_rpbench_coro_bench.ss_now: allocs -> -46.20pct
coro_rpbench_coro_bench.ss_now: inst -> -0.03pct
kafka_fetch_rpbench_large_fetch_t.multi_partition_fetch_version_max: allocs -> -0.07pct
kafka_fetch_rpbench_small_fetch_t.multi_partition_fetch_version_with_epoch_validation: allocs -> -0.03pct
iceberg_uri_rpbench_from_uri.s3_compat: inst -> +0.46pct
record_multiplexer_rpbench_record_multiplexer_bench_fixture.protobuf_381_byte_message_linear_40_fields: inst -> +0.05pct
record_multiplexer_rpbench_record_multiplexer_bench_fixture.protobuf_384_byte_message_nested_24_levels: inst -> +0.09pct
iceberg_compatibility_rpbench_StructVisitation.ApplyTransform: inst -> +0.14pct
iceberg_compatibility_rpbench_StructVisitation.ApplyTransformNested_100_20: inst -> -0.19pct
iceberg_compatibility_rpbench_StructVisitation.ApplyTransformNested_100_5: inst -> -0.20pct
iceberg_compatibility_rpbench_StructVisitation.ApplyTransformNested_10_1: inst -> +0.53pct
iceberg_compatibility_rpbench_StructVisitation.ApplyTransformNested_10_5: inst -> -0.12pct
iceberg_compatibility_rpbench_StructVisitation.ApplyTransformNested_20_1: inst -> +0.55pct
iceberg_compatibility_rpbench_StructVisitation.ApplyTransformNested_20_5: inst -> -0.28pct
iceberg_compatibility_rpbench_StructVisitation.ApplyTransformNested_5_1: inst -> +0.51pct
iceberg_compatibility_rpbench_StructVisitation.ApplyTransformNested_5_5: inst -> -0.09pct
iceberg_compatibility_rpbench_StructVisitation.BasicFieldAssign: inst -> +0.17pct
iceberg_compatibility_rpbench_StructVisitation.BasicFieldAssignNested_100_20: inst -> -0.07pct
iceberg_compatibility_rpbench_StructVisitation.BasicFieldAssignNested_100_5: inst -> -0.44pct
iceberg_compatibility_rpbench_StructVisitation.BasicFieldAssignNested_10_1: inst -> +0.62pct
iceberg_compatibility_rpbench_StructVisitation.BasicFieldAssignNested_10_5: inst -> -0.76pct
iceberg_compatibility_rpbench_StructVisitation.BasicFieldAssignNested_20_1: inst -> +0.65pct
iceberg_compatibility_rpbench_StructVisitation.BasicFieldAssignNested_20_5: inst -> -0.60pct
iceberg_compatibility_rpbench_StructVisitation.BasicFieldAssignNested_5_1: inst -> +0.59pct
iceberg_compatibility_rpbench_StructVisitation.BasicFieldAssignNested_5_5: inst -> -0.43pct
iobuf_rpbench_iobuf.append_bench_large: inst -> +29.25pct
iobuf_rpbench_iobuf.append_bench_medium: inst -> +18.35pct
iobuf_rpbench_iobuf.append_bench_small: inst -> +1.36pct
tx_range_manifest_rpbench_sax_deserializer.100k: inst -> +0.67pct
column_writer_rpbench_column_writer_bench.bool_type_compressed: inst -> -0.08pct
column_writer_rpbench_column_writer_bench.bool_type_uncompressed: inst -> -0.04pct
column_writer_rpbench_column_writer_bench.byte_array_type_400_byte_uncompressed: inst -> -1.81pct
column_writer_rpbench_column_writer_bench.byte_array_type_4_byte_compressed: inst -> +0.18pct
column_writer_rpbench_column_writer_bench.byte_array_type_4_byte_uncompressed: inst -> +0.32pct
column_writer_rpbench_column_writer_bench.f32_type_compressed: inst -> -0.07pct
column_writer_rpbench_column_writer_bench.f32_type_uncompressed: inst -> -0.06pct
column_writer_rpbench_column_writer_bench.f64_type_compressed: inst -> -0.04pct
column_writer_rpbench_column_writer_bench.f64_type_uncompressed: inst -> -0.03pct
column_writer_rpbench_column_writer_bench.i32_type_compressed: inst -> -0.07pct
column_writer_rpbench_column_writer_bench.i32_type_uncompressed: inst -> -0.06pct
column_writer_rpbench_column_writer_bench.i64_type_compressed: inst -> -0.04pct
column_writer_rpbench_column_writer_bench.i64_type_uncompressed: inst -> -0.02pct
api_rpbench_sr_bench_fixture.lookup_x1_1: allocs -> -0.01pct
api_rpbench_sr_bench_fixture.post_x1_1: allocs -> -0.01pct
parse_rpbench_json_parse_test.binary: inst -> +0.09pct
async_algorithm_rpbench_algo_bench.async_for_each_big: inst -> -0.02pct
async_algorithm_rpbench_algo_bench.coro_maybe_yield_loop_big: inst -> -0.04pct
async_algorithm_rpbench_algo_bench.maybe_yield_loop_big: allocs -> -0.25pct
async_algorithm_rpbench_algo_bench.maybe_yield_loop_big: inst -> -0.03pct
async_algorithm_rpbench_algo_bench.std_for_each_big: allocs -> -0.03pct
async_algorithm_rpbench_algo_bench.std_for_each_big: inst -> -7.41pct
async_algorithm_rpbench_algo_bench.sync_std_for_each_big: allocs -> -25.67pct
parser_rpbench_json_parser_bench.rapidjson: inst -> +0.95pct
parser_rpbench_json_parser_bench.serde: inst -> +0.35pct
string_rpbench_string_parser.1MiB_unicode: inst -> +0.06pct

See https://redpandadata.atlassian.net/wiki/x/LQAqLg for docs

@StephanDollberg
Copy link
Copy Markdown
Member

Note there will be a lot of noise in these as we just changed instance type for microbenches. The kafka produce ones are probably real though. Best to confirm locally.

@rockwotj
Copy link
Copy Markdown
Contributor

I think the change is mostly because the benchmark did not previously measure the work we were doing in the Kafka protocol layer to iterate over the batches.

@rockwotj
Copy link
Copy Markdown
Contributor

Maybe e2e benchmarks is what we want? I guess we can see how those shake out in the nightly rights right @StephanDollberg? Is there a way to do the OMB benchmarking as a slash command?

@WillemKauf
Copy link
Copy Markdown
Contributor Author

WillemKauf commented Sep 15, 2025

bazel run --config=release //src/v/kafka/server/tests:kafka_produce_partition_rpbench, before and after, ran locally on my machine:

Before

test                                        iterations      median         mad         min         max      allocs       tasks        inst      cycles
produce_partition_fixture.1_submitted            53567     3.510us     0.000ns     3.510us     3.510us      96.759       0.000     31427.8     16253.6
produce_partition_fixture.1_KiB_submitted         2592    29.482us     0.000ns    29.482us    29.482us     468.587       0.000    161112.7    117567.9
produce_partition_fixture.4_KiB_submitted          686    71.870us     0.000ns    71.870us    71.870us     641.249       0.000    184896.3    362959.4
produce_partition_fixture.8_KiB_submitted          346   129.756us     0.000ns   129.756us   129.756us     863.829       0.000    241310.9    671726.5
produce_partition_fixture.1_dispatched           52992     3.520us     0.000ns     3.520us     3.520us      97.011       0.067     31495.9     16600.8
produce_partition_fixture.1_KiB_dispatched        2624    26.351us     0.000ns    26.351us    26.351us     472.426       1.381    165246.8    134053.1
produce_partition_fixture.4_KiB_dispatched         687    83.723us     0.000ns    83.723us    83.723us     663.881      14.870    201365.1    399629.7
produce_partition_fixture.8_KiB_dispatched         346   176.524us     0.000ns   176.524us   176.524us     920.910      47.751    280598.9    776424.3
produce_partition_fixture.1_produced             53690     9.271us     0.000ns     9.271us     9.271us     116.117      39.364     55521.2     28706.0
produce_partition_fixture.1_KiB_produced          2621    53.113us     0.000ns    53.113us    53.113us     493.312      71.718    201828.3    160024.4
produce_partition_fixture.4_KiB_produced           686   171.362us     0.000ns   171.362us   171.362us     691.574     168.948    279893.0    470416.3
produce_partition_fixture.8_KiB_produced           347   344.720us     0.000ns   344.720us   344.720us     956.686     297.461    416435.6    903877.9

After

test                                        iterations      median         mad         min         max      allocs       tasks        inst      cycles
produce_partition_fixture.1_submitted            43213     7.520us     0.000ns     7.520us     7.520us     297.206       0.000    125819.8     37329.4
produce_partition_fixture.1_KiB_submitted         2545    34.937us     0.000ns    34.937us    34.937us     663.838       0.000    270507.4    146428.4
produce_partition_fixture.4_KiB_submitted          681    80.163us     0.000ns    80.163us    80.163us     838.828       0.000    289754.1    408927.5
produce_partition_fixture.8_KiB_submitted          344   137.049us     0.000ns   137.049us   137.049us    1034.544       0.000    350855.8    698763.5
produce_partition_fixture.1_dispatched           43926     7.425us     0.000ns     7.425us     7.425us     298.023       0.094    126256.9     37190.6
produce_partition_fixture.1_KiB_dispatched        2581    31.746us     0.000ns    31.746us    31.746us     673.347       1.698    275771.2    161612.5
produce_partition_fixture.4_KiB_dispatched         682    94.695us     0.000ns    94.695us    94.695us     864.642      16.491    306432.5    447595.4
produce_partition_fixture.8_KiB_dispatched         345   191.242us     0.000ns   191.242us   191.242us    1120.814      51.078    393689.0    844941.7
produce_partition_fixture.1_produced             44815    13.155us     0.000ns    13.155us    13.155us     317.160      39.419    150126.5     48958.2
produce_partition_fixture.1_KiB_produced          2588    59.333us     0.000ns    59.333us    59.333us     694.475      71.921    315944.7    189996.8
produce_partition_fixture.4_KiB_produced           682   182.601us     0.000ns   182.601us   182.601us     892.974     169.116    384790.6    523774.2
produce_partition_fixture.8_KiB_produced           345   367.543us     0.000ns   367.543us   367.543us    1157.768     297.551    522419.7   1008136.0

@WillemKauf
Copy link
Copy Markdown
Contributor Author

Worth noting that this benchmark does not test the now compressed path (max_timestamp is set in the produced batches), this extra cost is perhaps from the previously unmeasured iteration (as Tyler notes).

@rockwotj
Copy link
Copy Markdown
Contributor

If we had a benchmark that measured the work done in kafka_batch_adapter and the produce path, I'd expect that delta to be much more reasonable.

@rockwotj
Copy link
Copy Markdown
Contributor

@WillemKauf if you change the benchmark to use compressed batches with the default cluster config I'd expect the changes to go back to normal. Can you confirm?

@StephanDollberg
Copy link
Copy Markdown
Member

Maybe e2e benchmarks is what we want? I guess we can see how those shake out in the nightly rights right @StephanDollberg? Is there a way to do the OMB benchmarking as a slash command?

Slash command OMB is not a yet thing.

@WillemKauf
Copy link
Copy Markdown
Contributor Author

WillemKauf commented Sep 15, 2025

use compressed batches with the default cluster config

I assume you mean compressed batches with legacy cluster config. Here are those results:

After with diff:

ss::future<>
produce_partition_fixture::run_test(size_t data_size, measured_region region) {
    BOOST_TEST_CHECKPOINT("HERE");
+   scoped_config cfg;
+   cfg.get("kafka_produce_batch_validation")
+    .set_value(model::kafka_batch_validation_mode::legacy);
    model::topic_partition tp = model::topic_partition(
      t, model::partition_id(0));

    storage::record_batch_builder builder(
      model::record_batch_type::raft_data, model::offset{0});

    constexpr size_t num_records = 100;
    for (size_t i = 0; i < num_records; ++i) {
        builder.add_raw_kv(iobuf{}, rand_iobuf(data_size));
    }
+   builder.set_compression(model::compression::lz4);
    auto batch = std::move(builder).build();
test                                        iterations      median         mad         min         max      allocs       tasks        inst      cycles
produce_partition_fixture.1_submitted            47819     3.448us     0.000ns     3.448us     3.448us      93.629       0.000     30724.4     16247.5
produce_partition_fixture.1_KiB_submitted         2496    16.487us     0.000ns    16.487us    16.487us     173.260       0.000     49381.5     83222.3
produce_partition_fixture.4_KiB_submitted          670    57.014us     0.000ns    57.014us    57.014us     357.349       0.000     98229.7    290247.8
produce_partition_fixture.8_KiB_submitted          337   107.950us     0.000ns   107.950us   107.950us     596.018       0.000    164555.6    562021.7
produce_partition_fixture.1_dispatched           48175     3.496us     0.000ns     3.496us     3.496us      93.839       0.061     30820.0     16506.8
produce_partition_fixture.1_KiB_dispatched        2540    17.853us     0.000ns    17.853us    17.853us     175.319       1.056     50845.1     89510.6
produce_partition_fixture.4_KiB_dispatched         672    71.034us     0.000ns    71.034us    71.034us     372.649      18.542    110561.9    316201.7
produce_partition_fixture.8_KiB_dispatched         337   143.879us     0.000ns   143.879us   143.879us     639.519      40.092    189628.7    633117.4
produce_partition_fixture.1_produced             47857     9.549us     0.000ns     9.549us     9.549us     112.973      39.296     55997.9     29780.0
produce_partition_fixture.1_KiB_produced          2534    45.168us     0.000ns    45.168us    45.168us     195.693      71.230     89571.4    116596.8
produce_partition_fixture.4_KiB_produced           671   158.839us     0.000ns   158.839us   158.839us     401.052     169.653    183224.0    392845.3
produce_partition_fixture.8_KiB_produced           338   322.000us     0.000ns   322.000us   322.000us     674.553     296.775    317244.3    782710.2

@rockwotj
Copy link
Copy Markdown
Contributor

I'm a bit confused how those results are better than the baseline... Anyways, my recommendation is as follows:

  • We merge this PR as is
  • Watch the OMB benchmarks closely
  • After a few days we backport the changes to 25.2.x for cloud clusters. I don't think we need the 25.1.x backport unless someone asks? This also prevents the issue with cluster configs Willem noted in slack.

@WillemKauf
Copy link
Copy Markdown
Contributor Author

We merge this PR as is

:shipit:

@rockwotj rockwotj merged commit fe4ef27 into redpanda-data:dev Sep 15, 2025
25 checks passed
@vbotbuildovich
Copy link
Copy Markdown
Collaborator

/backport v25.2.x

@vbotbuildovich
Copy link
Copy Markdown
Collaborator

Failed to create a backport PR to v25.2.x branch. I tried:

git remote add upstream https://github.com/redpanda-data/redpanda.git
git fetch --all
git checkout -b backport-pr-27529-v25.2.x-331 remotes/upstream/v25.2.x
git cherry-pick -x 146f1b1c1d 7f148e955c 58731ae695 c87baf0fd5

Workflow run logs.

@StephanDollberg
Copy link
Copy Markdown
Member

StephanDollberg commented Sep 16, 2025

Watch the OMB benchmarks closely

Note the OMB benchmarks don't use compressed batches which to my understanding is the main regression path (besides from the normal one shown in the microbenchmark but we think that's only because of it now measuring more of what normally is on the produce path)?

@rockwotj
Copy link
Copy Markdown
Contributor

Watch the OMB benchmarks closely

Note the OMB benchmarks don't use compressed batches which to my understanding is the main regression path (besides from the normal one shown in the microbenchmark but we think that's only because of it now measuring more of what normally is on the produce path)?

I mean the java client also sets max timestamp so it also won't hit a perf impact unless strict validation is enabled. I mostly want to ensure that "well behaved clients" don't see a perf impact.

@c4milo
Copy link
Copy Markdown
Contributor

c4milo commented Sep 16, 2025

@WillemKauf, is it accurate to say this PR implements KIP-31 & KIP-32?

@WillemKauf
Copy link
Copy Markdown
Contributor Author

is it accurate to say this PR implements KIP-31 & KIP-32?

No, it is not.

@c4milo
Copy link
Copy Markdown
Contributor

c4milo commented Sep 16, 2025

@WillemKauf, sorry, I should have posted a link: https://issues.apache.org/jira/browse/KAFKA-2511

@rockwotj
Copy link
Copy Markdown
Contributor

No it's not the same as that issue either. This is adding validation that isn't in a KIP but mirrors upstream Apache Kafka.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants