Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,21 @@ configuration::configuration()
{.needs_restart = needs_restart::no, .visibility = visibility::user},
1h,
{.min = 0ms, .max = serde::max_serializable_ms})
, kafka_produce_batch_validation(
*this,
"kafka_produce_batch_validation",
"Controls the level of validation performed on batches produced to "
"Redpanda. When set to `legacy`, there is minimal validation performed "
"on the produce path. When set to `relaxed`, full validation is "
"performed on uncompressed batches and on compressed batches with the "
"`max_timestamp` value left unset. When set to `strict`, full validation "
"of uncompressed and compressed batches is performed. This should be the "
"default in environments where producing clients are not trusted.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
model::kafka_batch_validation_mode::relaxed,
{model::kafka_batch_validation_mode::legacy,
model::kafka_batch_validation_mode::relaxed,
model::kafka_batch_validation_mode::strict})
, log_compression_type(
*this,
"log_compression_type",
Expand Down
2 changes: 2 additions & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ struct configuration final : public config_store {
log_message_timestamp_before_max_ms;
bounded_property<std::chrono::milliseconds>
log_message_timestamp_after_max_ms;
enum_property<model::kafka_batch_validation_mode>
kafka_produce_batch_validation;
enum_property<model::compression> log_compression_type;
property<size_t> fetch_max_bytes;
property<bool> use_fetch_scheduler_group;
Expand Down
17 changes: 17 additions & 0 deletions src/v/config/convert.h
Original file line number Diff line number Diff line change
Expand Up @@ -741,4 +741,21 @@ struct convert<config::audit_failure_policy> {
}
};

template<>
struct convert<model::kafka_batch_validation_mode> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so much boilerplate 😢

Thanks for doing this!

using type = model::kafka_batch_validation_mode;

static Node encode(const type& rhs) { return Node(fmt::format("{}", rhs)); }

static bool decode(const Node& node, type& rhs) {
auto value = node.as<std::string>();
auto mode = model::kafka_batch_validation_mode_from_string(value);
if (!mode) {
return false;
}
rhs = mode.value();
return true;
}
};

} // namespace YAML
4 changes: 4 additions & 0 deletions src/v/config/property.h
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,10 @@ consteval std::string_view property_type_name() {
return "string";
} else if constexpr (std::is_same_v<type, config::audit_failure_policy>) {
return "string";
} else if constexpr (std::is_same_v<
type,
model::kafka_batch_validation_mode>) {
return "string";
} else {
static_assert(
base::unsupported_type<T>::value, "Type name not defined");
Expand Down
6 changes: 6 additions & 0 deletions src/v/config/rjson_serialization.cc
Original file line number Diff line number Diff line change
Expand Up @@ -289,4 +289,10 @@ void rjson_serialize(
stringize(w, policy);
}

// Serialize a kafka_batch_validation_mode as its string representation.
void rjson_serialize(
  json::Writer<json::StringBuffer>& w,
  const model::kafka_batch_validation_mode& mode) {
    stringize(w, mode);
}

} // namespace json
3 changes: 3 additions & 0 deletions src/v/config/rjson_serialization.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,7 @@ void rjson_serialize(
void rjson_serialize(
json::Writer<json::StringBuffer>&, config::audit_failure_policy);

void rjson_serialize(
json::Writer<json::StringBuffer>&, const model::kafka_batch_validation_mode&);

} // namespace json
34 changes: 0 additions & 34 deletions src/v/kafka/protocol/kafka_batch_adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -188,40 +188,6 @@ iobuf kafka_batch_adapter::adapt(iobuf&& kbatch) {
auto new_batch = model::record_batch(
header, std::move(records), model::record_batch::tag_ctor_ng{});

/**
* Perform some type of validation on the uncompressed input. In this case
* we make sure that the records can be materialized but we avoid
* re-encoding them using the lazy-record optimization.
*
* TODO(kafka): we should also be validating compressed batches - the
* reference implementation does this, and this also means they are able to
* set the max_timestamp when using timestamp_type::create_time.
*/
if (!new_batch.compressed()) {
int64_t max_timestamp_delta = 0;
try {
new_batch.for_each_record([&max_timestamp_delta](model::record r) {
max_timestamp_delta = std::max(
r.timestamp_delta(), max_timestamp_delta);
});
} catch (const std::exception& e) {
vlog(klog.error, "Parsing uncompressed records: {}", e.what());
return remainder;
}
// If using append_time, we'll override the timestamp in the produce
// handler. Otherwise if the client does not set the max_timestamp we
// need to so that timequeries work correctly.
if (
header.max_timestamp == model::timestamp::missing()
&& header.attrs.timestamp_type()
== model::timestamp_type::create_time) {
model::timestamp max_timestamp(
header.first_timestamp() + max_timestamp_delta);
new_batch.set_max_timestamp(
model::timestamp_type::create_time, max_timestamp);
}
}
Comment on lines -191 to -223
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC our kafka client uses this logic too. I think this is OK to remove for our use cases outside of DR. In DR we probably want this validation, etc. Can we flag to the DR team to look into this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This bit of code is removed but the logic of setting timestamps in the uncompressed case is still present in produce_validation.cc L243-244 IIRC (apologies, on phone so linking is difficult)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now what I am saying is that when consuming our kafka client was using this code to do some light validation. Now that this code is moved into the produce handler our consume API handling in the kafka client no longer has any of this validation applied. I will post in the DR channel about this.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, sorry, that totally went over my head. I see what you are referring to now.

Happy to fix this in this PR if it's a regression.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it's OK, let's leave as is.


batch = std::move(new_batch);
return remainder;
}
Expand Down
3 changes: 3 additions & 0 deletions src/v/kafka/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ redpanda_cc_library(
"handlers/metadata.cc",
"handlers/offset_for_leader_epoch.cc",
"handlers/produce.cc",
"handlers/produce_validation.cc",
"handlers/txn_offset_commit.cc",
"member.cc",
"protocol_utils.cc",
Expand Down Expand Up @@ -206,6 +207,7 @@ redpanda_cc_library(
"handlers/offset_fetch.h",
"handlers/offset_for_leader_epoch.h",
"handlers/produce.h",
"handlers/produce_validation.h",
"handlers/sasl_authenticate.h",
"handlers/sasl_handshake.h",
"handlers/sync_group.h",
Expand Down Expand Up @@ -339,6 +341,7 @@ redpanda_cc_library(
"//src/v/kafka/protocol/schemata:produce_request",
"//src/v/metrics",
"//src/v/model",
"//src/v/model:batch_compression",
"//src/v/net",
"//src/v/pandaproxy/schema_registry:config",
"//src/v/pandaproxy/schema_registry:server",
Expand Down
113 changes: 17 additions & 96 deletions src/v/kafka/server/handlers/produce.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "kafka/data/replicated_partition.h"
#include "kafka/protocol/errors.h"
#include "kafka/protocol/kafka_batch_adapter.h"
#include "kafka/server/handlers/produce_validation.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/namespace.h"
Expand All @@ -36,6 +37,9 @@
#include <fmt/ostream.h>

#include <chrono>
#include <exception>
#include <expected>
#include <functional>

namespace kafka {
namespace {
Expand Down Expand Up @@ -201,98 +205,14 @@ ss::future<produce_response::partition> finalize_request_with_error_code(
.error_message = std::move(err_msg)});
}

/**
 * @brief Validate a batch's timestamps against the broker's clock.
 *
 * When the topic uses `CreateTime`, the batch is rejected (an error message
 * is returned) if its `first_timestamp` is further in the past than
 * `message.timestamp.before.max.ms`, or its `max_timestamp` is further in
 * the future than `message.timestamp.after.max.ms`. When the topic uses
 * `AppendTime`, no validation is performed; instead the batch's
 * `max_timestamp` is overwritten with the current broker time.
 *
 * @param ntp the partition being produced to (used for log messages)
 * @param batch the batch to validate; may be mutated (max_timestamp)
 * @param timestamp_type the topic's timestamp type (CreateTime/AppendTime)
 * @param message_timestamp_before_max_ms max allowed skew into the past
 * @param message_timestamp_after_max_ms max allowed skew into the future
 * @param probe metrics probe; bad-create-time counter is bumped on reject
 * @return std::nullopt on success, otherwise an error message for the client
 */
auto validate_batch_timestamps(
  const model::ntp& ntp,
  model::record_batch& batch,
  model::timestamp_type timestamp_type,
  std::chrono::milliseconds message_timestamp_before_max_ms,
  std::chrono::milliseconds message_timestamp_after_max_ms,
  kafka::kafka_probe& probe) -> std::optional<ss::sstring> {
    auto broker_time = model::timestamp::now();
    const auto& header = batch.header();
    /*
     * For the append-time setting the broker stamps the batch itself, so
     * there is nothing to validate; set_max_timestamp rewrites the header
     * (presumably recomputing the CRC — TODO confirm) and we accept.
     */
    if (timestamp_type == model::timestamp_type::append_time) {
        batch.set_max_timestamp(
          model::timestamp_type::append_time, broker_time);
        return std::nullopt;
    }

    // `message.timestamp_type` is `CreateTime`. Validate record timestamps
    // using `message.timestamp.before.max.ms` and
    // `message.timestamp.after.max.ms`.

    // reject if first_timestamp is too far in the past
    if (
      broker_time > header.first_timestamp
      && (broker_time - header.first_timestamp)
           > model::timestamp(message_timestamp_before_max_ms.count())) {
        // rate-limit the warning so a misbehaving client cannot spam the log
        thread_local static ss::logger::rate_limit rate(despam_interval);
        klog.log(
          ss::log_level::warn,
          rate,
          "produce request timestamp for {} was before the threshold set by "
          "message.timestamp.before.max.ms: {}, broker time: {}, timestamp: "
          "{}. Rejecting batch {}",
          ntp,
          message_timestamp_before_max_ms,
          broker_time,
          header.first_timestamp,
          header);
        probe.produce_bad_create_time();
        return ssx::sformat(
          "Timestamp {} of message with offset {} is out of range. The "
          "timestamp should be within [{}, {}] of the broker time.",
          header.first_timestamp,
          header.base_offset,
          message_timestamp_before_max_ms,
          message_timestamp_after_max_ms);
    }

    // reject if max_timestamp is too far in the future.
    if (
      broker_time < header.max_timestamp
      && (header.max_timestamp - broker_time)
           > model::timestamp(message_timestamp_after_max_ms.count())) {
        // rate-limit the warning so a misbehaving client cannot spam the log
        thread_local static ss::logger::rate_limit rate(despam_interval);
        klog.log(
          ss::log_level::warn,
          rate,
          "produce request timestamp for {} was past the threshold set by "
          "message.timestamp.after.max.ms: {}, broker time: {}, timestamp: {}. "
          "Rejecting batch {}",
          ntp,
          message_timestamp_after_max_ms,
          broker_time,
          header.max_timestamp,
          header);
        probe.produce_bad_create_time();
        return ssx::sformat(
          "Timestamp {} of message with offset {} is out of range. The "
          "timestamp should be within [{}, {}] of the broker time.",
          header.max_timestamp,
          header.last_offset(),
          message_timestamp_before_max_ms,
          message_timestamp_after_max_ms);
    }

    return std::nullopt;
}
// Per-topic configuration snapshot used while producing to a partition.
// Bundles the handful of topic-level settings the produce path needs so
// they are resolved once rather than per batch.
struct topic_configuration_context {
    // maximum allowed size of a single batch, in bytes
    size_t batch_max_bytes;
    // topic's timestamp semantics (CreateTime vs AppendTime)
    model::timestamp_type timestamp_type;
    // max allowed timestamp skew into the past (message.timestamp.before.max.ms)
    std::chrono::milliseconds message_timestamp_before_max_ms;
    // max allowed timestamp skew into the future (message.timestamp.after.max.ms)
    std::chrono::milliseconds message_timestamp_after_max_ms;
    // non-owning pointer to the full topic properties — lifetime is managed
    // by the caller; presumably outlives this context (TODO confirm)
    const cluster::topic_properties* properties;
};

/**
* \brief handle writing to a single topic partition.
*/
Expand Down Expand Up @@ -321,21 +241,21 @@ partition_produce_stages produce_topic_partition(
auto batch = std::make_unique<model::record_batch>(
std::move(part.records->adapter.batch.value()));

// validate the batch timestamps by checking skew against broker time
auto validate_timestamp = validate_batch_timestamps(
ntp,
*batch,
cfg_ctx.timestamp_type,
cfg_ctx.message_timestamp_before_max_ms,
cfg_ctx.message_timestamp_after_max_ms,
octx.rctx.probe());
if (validate_timestamp.has_value()) {
auto validate_batch_res = validate_batch(
{.batch = *batch,
.timestamp_type = cfg_ctx.timestamp_type,
.message_timestamp_before_max_ms
= cfg_ctx.message_timestamp_before_max_ms,
.message_timestamp_after_max_ms = cfg_ctx.message_timestamp_after_max_ms,
.probe = octx.rctx.probe()});

if (validate_batch_res.has_value()) {
auto dispatch_f = ss::now();
auto f = ss::make_ready_future<produce_response::partition>(
produce_response::partition{
.partition_index = ntp.tp.partition,
.error_code = error_code::invalid_timestamp,
.error_message = std::move(validate_timestamp).value()});
.error_code = validate_batch_res->err,
.error_message = std::move(validate_batch_res->msg)});
return partition_produce_stages{
.dispatched = std::move(dispatch_f), .produced = std::move(f)};
}
Expand Down Expand Up @@ -656,6 +576,7 @@ std::vector<topic_produce_stages> produce_topics(produce_ctx& octx) {

return topics;
}

} // namespace

produce_response produce_request::make_error_response(
Expand Down
Loading