Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/v/cluster_link/group_mirroring_task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,14 @@ build_offset_commit_request(group_mirroring_task::group_offsets g_offsets) {
template<typename ApiT>
kafka::api_version
get_max_supported(kafka::client::api_version_range remote_supported) {
return std::min(remote_supported.max, ApiT::max_valid);
constexpr auto client_supported_max = [] {
if constexpr (std::same_as<ApiT, kafka::offset_fetch_api>) {
return kafka::api_version{7};
} else {
return ApiT::max_valid;
}
}();
return std::min(remote_supported.max, client_supported_max);
}

kafka::list_groups_request make_list_groups_request() {
Expand Down
3 changes: 2 additions & 1 deletion src/v/cluster_link/tests/group_mirroring_task_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,8 @@ class group_mirroring_task_test : public seastar_test {

auto it = source_cluster_group.find(of_req.data.group_id);
if (it == source_cluster_group.end()) {
co_return kafka::offset_fetch_response(of_req.data.topics);
co_return kafka::offset_fetch_response(
std::move(of_req.data.topics));
}

if (it->second.coordinator_id != id) {
Expand Down
4 changes: 2 additions & 2 deletions src/v/kafka/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -604,11 +604,11 @@ client::consumer_assignment(const group_id& g_id, const member_id& name) {
ss::future<offset_fetch_response> client::consumer_offset_fetch(
const group_id& g_id,
const member_id& name,
std::vector<offset_fetch_request_topic> topics) {
chunked_vector<offset_fetch_request_topic> topics) {
return get_consumer(g_id, name)
.then([this, topics{std::move(topics)}](shared_consumer_t c) mutable {
return gated_retry_with_mitigation([c, topics{std::move(topics)}]() {
return c->offset_fetch(topics);
return c->offset_fetch(topics.copy());
});
});
}
Expand Down
2 changes: 1 addition & 1 deletion src/v/kafka/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ class client {
ss::future<offset_fetch_response> consumer_offset_fetch(
const group_id& g_id,
const member_id& m_id,
std::vector<offset_fetch_request_topic> topics);
chunked_vector<offset_fetch_request_topic> topics);

ss::future<offset_commit_response> consumer_offset_commit(
const group_id& g_id,
Expand Down
4 changes: 2 additions & 2 deletions src/v/kafka/client/consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -369,11 +369,11 @@ ss::future<describe_groups_response> consumer::describe_group() {
}

ss::future<offset_fetch_response>
consumer::offset_fetch(std::vector<offset_fetch_request_topic> topics) {
consumer::offset_fetch(chunked_vector<offset_fetch_request_topic> topics) {
refresh_inactivity_timer();
auto req_builder = [topics{std::move(topics)}, group_id{_group_id}]() {
return offset_fetch_request{
.data{.group_id = group_id, .topics = topics}};
.data{.group_id = group_id, .topics = topics.copy()}};
};
return req_res(std::move(req_builder))
.then([this](offset_fetch_response res) {
Expand Down
2 changes: 1 addition & 1 deletion src/v/kafka/client/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class consumer final : public ss::enable_lw_shared_from_this<consumer> {
ss::future<leave_group_response> leave();
ss::future<> subscribe(chunked_vector<model::topic> topics);
ss::future<offset_fetch_response>
offset_fetch(std::vector<offset_fetch_request_topic> topics);
offset_fetch(chunked_vector<offset_fetch_request_topic> topics);
ss::future<offset_commit_response>
offset_commit(chunked_vector<offset_commit_request_topic> topics);
ss::future<fetch_response>
Expand Down
4 changes: 2 additions & 2 deletions src/v/kafka/client/test/consumer_group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@

namespace {

std::vector<kafka::offset_fetch_request_topic>
chunked_vector<kafka::offset_fetch_request_topic>
offset_request_from_assignment(kc::assignment assignment) {
auto topics = std::vector<kafka::offset_fetch_request_topic>{};
auto topics = chunked_vector<kafka::offset_fetch_request_topic>{};
topics.reserve(assignment.size());
std::transform(
std::make_move_iterator(assignment.begin()),
Expand Down
58 changes: 39 additions & 19 deletions src/v/kafka/protocol/offset_fetch.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,35 +50,55 @@ struct offset_fetch_response final {

offset_fetch_response_data data;

offset_fetch_response() = default;

offset_fetch_response(error_code error) { data.error_code = error; }

offset_fetch_response(const offset_fetch_request&, error_code error)
: offset_fetch_response(error) {}

offset_fetch_response(
std::optional<std::vector<offset_fetch_request_topic>>& topics) {
data.error_code = error_code::none;
if (topics) {
template<typename Topics>
static auto get_topics(auto topics) {
Topics result;
if (topics.has_value()) {
result.reserve(topics->size());
for (auto& topic : *topics) {
chunked_vector<offset_fetch_response_partition> partitions;
decltype(Topics::value_type::partitions) partitions;
partitions.reserve(topic.partition_indexes.size());
for (auto id : topic.partition_indexes) {
offset_fetch_response_partition p = {
partitions.push_back({
.partition_index = id,
.committed_offset = model::offset(-1),
.committed_leader_epoch = kafka::leader_epoch{-1},
.metadata = "",
.error_code = error_code::none,
};
partitions.push_back(std::move(p));
});
}
offset_fetch_response_topic t = {
.name = topic.name,
result.push_back({
.name = std::move(topic.name),
.partitions = std::move(partitions),
};
data.topics.push_back(std::move(t));
});
}
}
return result;
}

static offset_fetch_response_group
make_group(offset_fetch_request_group request) {
using topics = decltype(offset_fetch_response_group::topics);
return offset_fetch_response_group{
.group_id = std::move(request.group_id),
.topics = get_topics<topics>(std::move(request.topics)),
.error_code = error_code::none};
}

offset_fetch_response() = default;

explicit offset_fetch_response(error_code error) {
data.error_code = error;
}

explicit offset_fetch_response(
const offset_fetch_request&, error_code error)
: offset_fetch_response(error) {}

explicit offset_fetch_response(
std::optional<chunked_vector<offset_fetch_request_topic>> topics) {
data.error_code = error_code::none;
data.topics = get_topics<decltype(data.topics)>(std::move(topics));
}

void encode(protocol::encoder& writer, api_version version) {
Expand Down
27 changes: 22 additions & 5 deletions src/v/kafka/protocol/schemata/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,30 @@
"OffsetFetchRequestData": {
"Topics": {
"PartitionIndexes": ("model::partition_id", "int32"),
}
},
"Groups": {
"Topics": {
"PartitionIndexes": ("model::partition_id", "int32"),
}
},
},
"OffsetFetchResponseData": {
"Topics": {
"Partitions": {
"PartitionIndex": ("model::partition_id", "int32"),
"CommittedOffset": ("model::offset", "int64"),
"CommittedLeaderEpoch": ("kafka::leader_epoch", "int32"),
},
}
}
},
"Groups": {
"Topics": {
"Partitions": {
"PartitionIndex": ("model::partition_id", "int32"),
"CommittedOffset": ("model::offset", "int64"),
"CommittedLeaderEpoch": ("kafka::leader_epoch", "int32"),
}
}
},
},
"OffsetCommitRequestData": {
"Topics": {
Expand Down Expand Up @@ -567,7 +581,6 @@
"metadata_response_partition": "chunked_vector",
"metadata_response_topic": "chunked_vector",
"partition_data": "chunked_vector",
"offset_fetch_response_partition": "chunked_vector",
"int32_t": "std::vector",
"model::node_id": "std::vector",
"model::partition_id": "std::vector",
Expand All @@ -577,7 +590,6 @@
"createable_topic_config": "std::vector",
"creatable_topic_configs": "std::vector",
"creatable_replica_assignment": "std::vector",
"offset_fetch_request_topic": "std::vector",
"partition_produce_response": "std::vector",
"creatable_acl_result": "std::vector",
"offset_delete_request_partition": "std::vector",
Expand All @@ -599,9 +611,14 @@ def make_context_field(path):
# a listing of expected struct types
STRUCT_TYPES = [
"ApiVersion",
"OffsetFetchRequestGroup",
"OffsetFetchRequestTopic",
"OffsetFetchRequestTopics",
"OffsetFetchResponseGroup",
"OffsetFetchResponseTopic",
"OffsetFetchResponseTopics",
"OffsetFetchResponsePartition",
"OffsetFetchResponsePartitions",
"OffsetCommitRequestTopic",
"OffsetCommitRequestPartition",
"OffsetCommitResponseTopic",
Expand Down
27 changes: 21 additions & 6 deletions src/v/kafka/protocol/schemata/offset_fetch_request.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
{
"apiKey": 9,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "OffsetFetchRequest",
// In version 0, the request read offsets from ZK.
//
Expand All @@ -30,19 +31,33 @@
// Version 6 is the first flexible version.
//
// Version 7 is adding the require stable flag.
"validVersions": "0-7",
//
// Version 8 is adding support for fetching offsets for multiple groups at a time
"validVersions": "0-8",
"flexibleVersions": "6+",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
{ "name": "GroupId", "type": "string", "versions": "0-7", "entityType": "groupId",
"about": "The group to fetch offsets for." },
{ "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0+", "nullableVersions": "2+",
{ "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0-7", "nullableVersions": "2-7",
"about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
{ "name": "Name", "type": "string", "versions": "0-7", "entityType": "topicName",
"about": "The topic name."},
{ "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
{ "name": "PartitionIndexes", "type": "[]int32", "versions": "0-7",
"about": "The partition indexes we would like to fetch offsets for." }
]},
{ "name": "Groups", "type": "[]OffsetFetchRequestGroup", "versions": "8+",
"about": "Each group we would like to fetch offsets for", "fields": [
{ "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId",
"about": "The group ID."},
{ "name": "Topics", "type": "[]OffsetFetchRequestTopics", "versions": "8+", "nullableVersions": "8+",
"about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
{ "name": "Name", "type": "string", "versions": "8+", "entityType": "topicName",
"about": "The topic name."},
{ "name": "PartitionIndexes", "type": "[]int32", "versions": "8+",
"about": "The partition indexes we would like to fetch offsets for." }
]}
]},
{"name": "RequireStable", "type": "bool", "versions": "7+", "default": "false",
"about": "Whether broker should hold on returning unstable offsets but set a retriable error code for the partition."}
"about": "Whether broker should hold on returning unstable offsets but set a retriable error code for the partitions."}
]
}
49 changes: 38 additions & 11 deletions src/v/kafka/protocol/schemata/offset_fetch_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,30 +30,57 @@
// Version 6 is the first flexible version.
//
// Version 7 adds pending offset commit as new error response on partition level.
"validVersions": "0-7",
//
// Version 8 is adding support for fetching offsets for multiple groups
"validVersions": "0-8",
"flexibleVersions": "6+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": "0+",
{ "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": "0-7",
"about": "The responses per topic.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
{ "name": "Name", "type": "string", "versions": "0-7", "entityType": "topicName",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]OffsetFetchResponsePartition", "versions": "0+",
{ "name": "Partitions", "type": "[]OffsetFetchResponsePartition", "versions": "0-7",
"about": "The responses per partition", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
{ "name": "PartitionIndex", "type": "int32", "versions": "0-7",
"about": "The partition index." },
{ "name": "CommittedOffset", "type": "int64", "versions": "0+",
{ "name": "CommittedOffset", "type": "int64", "versions": "0-7",
"about": "The committed message offset." },
{ "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5+", "default": "-1",
{ "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5-7", "default": "-1",
"ignorable": true, "about": "The leader epoch." },
{ "name": "Metadata", "type": "string", "versions": "0+", "nullableVersions": "0+",
{ "name": "Metadata", "type": "string", "versions": "0-7", "nullableVersions": "0-7",
"about": "The partition metadata." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
{ "name": "ErrorCode", "type": "int16", "versions": "0-7",
"about": "The error code, or 0 if there was no error." }
]}
]},
{ "name": "ErrorCode", "type": "int16", "versions": "2+", "default": "0", "ignorable": true,
"about": "The top-level error code, or 0 if there was no error." }
{ "name": "ErrorCode", "type": "int16", "versions": "2-7", "default": "0", "ignorable": true,
"about": "The top-level error code, or 0 if there was no error." },
{ "name": "Groups", "type": "[]OffsetFetchResponseGroup", "versions": "8+",
"about": "The responses per group id.", "fields": [
{ "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId",
"about": "The group ID." },
{ "name": "Topics", "type": "[]OffsetFetchResponseTopics", "versions": "8+",
"about": "The responses per topic.", "fields": [
{ "name": "Name", "type": "string", "versions": "8+", "entityType": "topicName",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]OffsetFetchResponsePartitions", "versions": "8+",
"about": "The responses per partition", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "8+",
"about": "The partition index." },
{ "name": "CommittedOffset", "type": "int64", "versions": "8+",
"about": "The committed message offset." },
{ "name": "CommittedLeaderEpoch", "type": "int32", "versions": "8+", "default": "-1",
"ignorable": true, "about": "The leader epoch." },
{ "name": "Metadata", "type": "string", "versions": "8+", "nullableVersions": "8+",
"about": "The partition metadata." },
{ "name": "ErrorCode", "type": "int16", "versions": "8+",
"about": "The partition-level error code, or 0 if there was no error." }
]}
]},
{ "name": "ErrorCode", "type": "int16", "versions": "8+", "default": "0",
"about": "The group-level error code, or 0 if there was no error." }
]}
]
}
Loading