diff --git a/src/v/cluster_link/group_mirroring_task.cc b/src/v/cluster_link/group_mirroring_task.cc index 28c6aa7d9c798..e87a74259df4b 100644 --- a/src/v/cluster_link/group_mirroring_task.cc +++ b/src/v/cluster_link/group_mirroring_task.cc @@ -313,7 +313,14 @@ build_offset_commit_request(group_mirroring_task::group_offsets g_offsets) { template kafka::api_version get_max_supported(kafka::client::api_version_range remote_supported) { - return std::min(remote_supported.max, ApiT::max_valid); + constexpr auto client_supported_max = [] { + if constexpr (std::same_as) { + return kafka::api_version{7}; + } else { + return ApiT::max_valid; + } + }(); + return std::min(remote_supported.max, client_supported_max); } kafka::list_groups_request make_list_groups_request() { diff --git a/src/v/cluster_link/tests/group_mirroring_task_test.cc b/src/v/cluster_link/tests/group_mirroring_task_test.cc index 1e50de67118be..925f7e406e176 100644 --- a/src/v/cluster_link/tests/group_mirroring_task_test.cc +++ b/src/v/cluster_link/tests/group_mirroring_task_test.cc @@ -326,7 +326,8 @@ class group_mirroring_task_test : public seastar_test { auto it = source_cluster_group.find(of_req.data.group_id); if (it == source_cluster_group.end()) { - co_return kafka::offset_fetch_response(of_req.data.topics); + co_return kafka::offset_fetch_response( + std::move(of_req.data.topics)); } if (it->second.coordinator_id != id) { diff --git a/src/v/kafka/client/client.cc b/src/v/kafka/client/client.cc index 181444fe7ff1f..e531245e5896a 100644 --- a/src/v/kafka/client/client.cc +++ b/src/v/kafka/client/client.cc @@ -604,11 +604,11 @@ client::consumer_assignment(const group_id& g_id, const member_id& name) { ss::future client::consumer_offset_fetch( const group_id& g_id, const member_id& name, - std::vector topics) { + chunked_vector topics) { return get_consumer(g_id, name) .then([this, topics{std::move(topics)}](shared_consumer_t c) mutable { return gated_retry_with_mitigation([c, topics{std::move(topics)}]() { - return c->offset_fetch(topics); + return c->offset_fetch(topics.copy()); }); }); } diff --git a/src/v/kafka/client/client.h b/src/v/kafka/client/client.h index 3ccb644faf49a..8b195d1419425 100644 --- a/src/v/kafka/client/client.h +++ b/src/v/kafka/client/client.h @@ -147,7 +147,7 @@ class client { ss::future consumer_offset_fetch( const group_id& g_id, const member_id& m_id, - std::vector topics); + chunked_vector topics); ss::future consumer_offset_commit( const group_id& g_id, diff --git a/src/v/kafka/client/consumer.cc b/src/v/kafka/client/consumer.cc index ff6d558d4a706..4ed82f7c13223 100644 --- a/src/v/kafka/client/consumer.cc +++ b/src/v/kafka/client/consumer.cc @@ -369,11 +369,11 @@ ss::future consumer::describe_group() { } ss::future -consumer::offset_fetch(std::vector topics) { +consumer::offset_fetch(chunked_vector topics) { refresh_inactivity_timer(); auto req_builder = [topics{std::move(topics)}, group_id{_group_id}]() { return offset_fetch_request{ - .data{.group_id = group_id, .topics = topics}}; + .data{.group_id = group_id, .topics = topics.copy()}}; }; return req_res(std::move(req_builder)) .then([this](offset_fetch_response res) { diff --git a/src/v/kafka/client/consumer.h b/src/v/kafka/client/consumer.h index 131f83b723d07..b640f561546c4 100644 --- a/src/v/kafka/client/consumer.h +++ b/src/v/kafka/client/consumer.h @@ -74,7 +74,7 @@ class consumer final : public ss::enable_lw_shared_from_this { ss::future leave(); ss::future<> subscribe(chunked_vector topics); ss::future - offset_fetch(std::vector topics); + offset_fetch(chunked_vector topics); ss::future offset_commit(chunked_vector topics); ss::future diff --git a/src/v/kafka/client/test/consumer_group.cc b/src/v/kafka/client/test/consumer_group.cc index 91a8928e6e946..d5f64bf371328 100644 --- a/src/v/kafka/client/test/consumer_group.cc +++ b/src/v/kafka/client/test/consumer_group.cc @@ -59,9 +59,9 @@ namespace { -std::vector +chunked_vector offset_request_from_assignment(kc::assignment assignment) { - auto topics = std::vector{}; + auto topics = chunked_vector{}; topics.reserve(assignment.size()); std::transform( std::make_move_iterator(assignment.begin()), diff --git a/src/v/kafka/protocol/offset_fetch.h b/src/v/kafka/protocol/offset_fetch.h index f19b5747156a1..5b69bb0b8ea17 100644 --- a/src/v/kafka/protocol/offset_fetch.h +++ b/src/v/kafka/protocol/offset_fetch.h @@ -50,35 +50,55 @@ struct offset_fetch_response final { offset_fetch_response_data data; - offset_fetch_response() = default; - - offset_fetch_response(error_code error) { data.error_code = error; } - - offset_fetch_response(const offset_fetch_request&, error_code error) - : offset_fetch_response(error) {} - - offset_fetch_response( - std::optional>& topics) { - data.error_code = error_code::none; - if (topics) { + template + static auto get_topics(auto topics) { + Topics result; + if (topics.has_value()) { + result.reserve(topics->size()); for (auto& topic : *topics) { - chunked_vector partitions; + decltype(Topics::value_type::partitions) partitions; + partitions.reserve(topic.partition_indexes.size()); for (auto id : topic.partition_indexes) { - offset_fetch_response_partition p = { + partitions.push_back({ .partition_index = id, .committed_offset = model::offset(-1), + .committed_leader_epoch = kafka::leader_epoch{-1}, .metadata = "", .error_code = error_code::none, - }; - partitions.push_back(std::move(p)); + }); } - offset_fetch_response_topic t = { - .name = topic.name, + result.push_back({ + .name = std::move(topic.name), .partitions = std::move(partitions), - }; - data.topics.push_back(std::move(t)); + }); } } + return result; + } + + static offset_fetch_response_group + make_group(offset_fetch_request_group request) { + using topics = decltype(offset_fetch_response_group::topics); + return offset_fetch_response_group{ + .group_id = std::move(request.group_id), + .topics = get_topics(std::move(request.topics)), + .error_code = error_code::none}; + } + + offset_fetch_response() = default; + + explicit offset_fetch_response(error_code error) { + data.error_code = error; + } + + explicit offset_fetch_response( + const offset_fetch_request&, error_code error) + : offset_fetch_response(error) {} + + explicit offset_fetch_response( + std::optional> topics) { + data.error_code = error_code::none; + data.topics = get_topics(std::move(topics)); } void encode(protocol::encoder& writer, api_version version) { diff --git a/src/v/kafka/protocol/schemata/generator.py b/src/v/kafka/protocol/schemata/generator.py index db27de3077cb2..bc043587b4f5a 100755 --- a/src/v/kafka/protocol/schemata/generator.py +++ b/src/v/kafka/protocol/schemata/generator.py @@ -63,7 +63,12 @@ "OffsetFetchRequestData": { "Topics": { "PartitionIndexes": ("model::partition_id", "int32"), - } + }, + "Groups": { + "Topics": { + "PartitionIndexes": ("model::partition_id", "int32"), + } + }, }, "OffsetFetchResponseData": { "Topics": { @@ -71,8 +76,17 @@ "PartitionIndex": ("model::partition_id", "int32"), "CommittedOffset": ("model::offset", "int64"), "CommittedLeaderEpoch": ("kafka::leader_epoch", "int32"), - }, - } + } + }, + "Groups": { + "Topics": { + "Partitions": { + "PartitionIndex": ("model::partition_id", "int32"), + "CommittedOffset": ("model::offset", "int64"), + "CommittedLeaderEpoch": ("kafka::leader_epoch", "int32"), + } + } + }, }, "OffsetCommitRequestData": { "Topics": { @@ -567,7 +581,6 @@ "metadata_response_partition": "chunked_vector", "metadata_response_topic": "chunked_vector", "partition_data": "chunked_vector", - "offset_fetch_response_partition": "chunked_vector", "int32_t": "std::vector", "model::node_id": "std::vector", "model::partition_id": "std::vector", @@ -577,7 +590,6 @@ "createable_topic_config": "std::vector", "creatable_topic_configs": "std::vector", "creatable_replica_assignment": "std::vector", - "offset_fetch_request_topic": "std::vector", "partition_produce_response": "std::vector", "creatable_acl_result": "std::vector", "offset_delete_request_partition": "std::vector", @@ -599,9 +611,14 @@ def make_context_field(path): # a listing of expected struct types STRUCT_TYPES = [ "ApiVersion", + "OffsetFetchRequestGroup", "OffsetFetchRequestTopic", + "OffsetFetchRequestTopics", + "OffsetFetchResponseGroup", "OffsetFetchResponseTopic", + "OffsetFetchResponseTopics", "OffsetFetchResponsePartition", + "OffsetFetchResponsePartitions", "OffsetCommitRequestTopic", "OffsetCommitRequestPartition", "OffsetCommitResponseTopic", diff --git a/src/v/kafka/protocol/schemata/offset_fetch_request.json b/src/v/kafka/protocol/schemata/offset_fetch_request.json index ddd53fc859600..8f3c4144ab078 100644 --- a/src/v/kafka/protocol/schemata/offset_fetch_request.json +++ b/src/v/kafka/protocol/schemata/offset_fetch_request.json @@ -16,6 +16,7 @@ { "apiKey": 9, "type": "request", + "listeners": ["zkBroker", "broker"], "name": "OffsetFetchRequest", // In version 0, the request read offsets from ZK. // @@ -30,19 +31,33 @@ // Version 6 is the first flexible version. // // Version 7 is adding the require stable flag. - "validVersions": "0-7", + // + // Version 8 is adding support for fetching offsets for multiple groups at a time + "validVersions": "0-8", "flexibleVersions": "6+", "fields": [ - { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", + { "name": "GroupId", "type": "string", "versions": "0-7", "entityType": "groupId", "about": "The group to fetch offsets for." }, - { "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0+", "nullableVersions": "2+", + { "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0-7", "nullableVersions": "2-7", "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", + { "name": "Name", "type": "string", "versions": "0-7", "entityType": "topicName", "about": "The topic name."}, - { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+", + { "name": "PartitionIndexes", "type": "[]int32", "versions": "0-7", "about": "The partition indexes we would like to fetch offsets for." } ]}, + { "name": "Groups", "type": "[]OffsetFetchRequestGroup", "versions": "8+", + "about": "Each group we would like to fetch offsets for", "fields": [ + { "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId", + "about": "The group ID."}, + { "name": "Topics", "type": "[]OffsetFetchRequestTopics", "versions": "8+", "nullableVersions": "8+", + "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [ + { "name": "Name", "type": "string", "versions": "8+", "entityType": "topicName", + "about": "The topic name."}, + { "name": "PartitionIndexes", "type": "[]int32", "versions": "8+", + "about": "The partition indexes we would like to fetch offsets for." } + ]} + ]}, {"name": "RequireStable", "type": "bool", "versions": "7+", "default": "false", - "about": "Whether broker should hold on returning unstable offsets but set a retriable error code for the partition."} + "about": "Whether broker should hold on returning unstable offsets but set a retriable error code for the partitions."} ] } diff --git a/src/v/kafka/protocol/schemata/offset_fetch_response.json b/src/v/kafka/protocol/schemata/offset_fetch_response.json index b9777017ed9ac..71acf0b4d2e99 100644 --- a/src/v/kafka/protocol/schemata/offset_fetch_response.json +++ b/src/v/kafka/protocol/schemata/offset_fetch_response.json @@ -30,30 +30,57 @@ // Version 6 is the first flexible version. // // Version 7 adds pending offset commit as new error response on partition level. - "validVersions": "0-7", + // + // Version 8 is adding support for fetching offsets for multiple groups + "validVersions": "0-8", "flexibleVersions": "6+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, - { "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": "0+", + { "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": "0-7", "about": "The responses per topic.", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", + { "name": "Name", "type": "string", "versions": "0-7", "entityType": "topicName", "about": "The topic name." }, - { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", "versions": "0+", + { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", "versions": "0-7", "about": "The responses per partition", "fields": [ - { "name": "PartitionIndex", "type": "int32", "versions": "0+", + { "name": "PartitionIndex", "type": "int32", "versions": "0-7", "about": "The partition index." }, - { "name": "CommittedOffset", "type": "int64", "versions": "0+", + { "name": "CommittedOffset", "type": "int64", "versions": "0-7", "about": "The committed message offset." }, - { "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5+", "default": "-1", + { "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5-7", "default": "-1", "ignorable": true, "about": "The leader epoch." }, - { "name": "Metadata", "type": "string", "versions": "0+", "nullableVersions": "0+", + { "name": "Metadata", "type": "string", "versions": "0-7", "nullableVersions": "0-7", "about": "The partition metadata." }, - { "name": "ErrorCode", "type": "int16", "versions": "0+", + { "name": "ErrorCode", "type": "int16", "versions": "0-7", "about": "The error code, or 0 if there was no error." } ]} ]}, - { "name": "ErrorCode", "type": "int16", "versions": "2+", "default": "0", "ignorable": true, - "about": "The top-level error code, or 0 if there was no error." } + { "name": "ErrorCode", "type": "int16", "versions": "2-7", "default": "0", "ignorable": true, + "about": "The top-level error code, or 0 if there was no error." }, + { "name": "Groups", "type": "[]OffsetFetchResponseGroup", "versions": "8+", + "about": "The responses per group id.", "fields": [ + { "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId", + "about": "The group ID." }, + { "name": "Topics", "type": "[]OffsetFetchResponseTopics", "versions": "8+", + "about": "The responses per topic.", "fields": [ + { "name": "Name", "type": "string", "versions": "8+", "entityType": "topicName", + "about": "The topic name." }, + { "name": "Partitions", "type": "[]OffsetFetchResponsePartitions", "versions": "8+", + "about": "The responses per partition", "fields": [ + { "name": "PartitionIndex", "type": "int32", "versions": "8+", + "about": "The partition index." }, + { "name": "CommittedOffset", "type": "int64", "versions": "8+", + "about": "The committed message offset." }, + { "name": "CommittedLeaderEpoch", "type": "int32", "versions": "8+", "default": "-1", + "ignorable": true, "about": "The leader epoch." }, + { "name": "Metadata", "type": "string", "versions": "8+", "nullableVersions": "8+", + "about": "The partition metadata." }, + { "name": "ErrorCode", "type": "int16", "versions": "8+", + "about": "The partition-level error code, or 0 if there was no error." } + ]} + ]}, + { "name": "ErrorCode", "type": "int16", "versions": "8+", "default": "0", + "about": "The group-level error code, or 0 if there was no error." } + ]} ] } diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index cddd91bb80120..835a480abe506 100644 --- a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -2500,31 +2500,30 @@ group::handle_offset_commit(offset_commit_request&& r) { } } -ss::future -group::handle_offset_fetch(offset_fetch_request&& r) { +ss::future +group::handle_offset_fetch(offset_fetch_request_group r, bool require_stable) { if (in_state(group_state::dead)) { - return ss::make_ready_future( - offset_fetch_response(r.data.topics)); + co_return offset_fetch_response::make_group(std::move(r)); } - offset_fetch_response resp; - resp.data.error_code = error_code::none; + offset_fetch_response_group resp{ + .group_id = r.group_id, .error_code = error_code::none}; // retrieve all topics available - if (!r.data.topics) { - absl::flat_hash_map< + if (!r.topics) { + chunked_hash_map< model::topic, - chunked_vector> + chunked_vector> tmp; for (const auto& e : _offsets) { - offset_fetch_response_partition p = { + offset_fetch_response_partitions p = { .partition_index = e.first.partition, .committed_offset = model::offset(-1), .metadata = "", .error_code = error_code::none, }; - if (r.data.require_stable && has_pending_transaction(e.first)) { + if (require_stable && has_pending_transaction(e.first)) { p.error_code = error_code::unstable_offset_commit; } else { p.committed_offset = e.second->metadata.offset; @@ -2536,28 +2535,28 @@ group::handle_offset_fetch(offset_fetch_request&& r) { } for (auto& e : tmp) { - resp.data.topics.push_back( + resp.topics.push_back( {.name = e.first, .partitions = std::move(e.second)}); } - return ss::make_ready_future(std::move(resp)); + co_return resp; } // retrieve for the topics specified in the request - for (const auto& topic : *r.data.topics) { - offset_fetch_response_topic t; + for (const auto& topic : *r.topics) { + offset_fetch_response_topics t; t.name = topic.name; for (auto id : topic.partition_indexes) { model::topic_partition tp(topic.name, id); - offset_fetch_response_partition p = { + offset_fetch_response_partitions p = { .partition_index = id, .committed_offset = model::offset(-1), .metadata = "", .error_code = error_code::none, }; - if (r.data.require_stable && has_pending_transaction(tp)) { + if (require_stable && has_pending_transaction(tp)) { p.error_code = error_code::unstable_offset_commit; } else { auto res = offset(tp); @@ -2571,10 +2570,10 @@ group::handle_offset_fetch(offset_fetch_request&& r) { } t.partitions.push_back(std::move(p)); } - resp.data.topics.push_back(std::move(t)); + resp.topics.push_back(std::move(t)); } - return ss::make_ready_future(std::move(resp)); + co_return resp; } kafka::member_id group::generate_member_id(const join_group_request& r) { diff --git a/src/v/kafka/server/group.h b/src/v/kafka/server/group.h index 013115d2dcac0..7d57396a0fe2e 100644 --- a/src/v/kafka/server/group.h +++ b/src/v/kafka/server/group.h @@ -25,6 +25,7 @@ #include "features/feature_table.h" #include "kafka/protocol/fwd.h" #include "kafka/protocol/offset_commit.h" +#include "kafka/protocol/offset_fetch.h" #include "kafka/server/group_metadata.h" #include "kafka/server/group_probe.h" #include "kafka/server/member.h" @@ -632,8 +633,8 @@ class group final : public ss::enable_lw_shared_from_this { ss::future handle_commit_tx(cluster::commit_group_tx_request r); - ss::future - handle_offset_fetch(offset_fetch_request&& r); + ss::future + handle_offset_fetch(offset_fetch_request_group r, bool require_stable); void insert_offset(const model::topic_partition& tp, offset_metadata md) { if (auto o_it = _offsets.find(tp); o_it != _offsets.end()) { diff --git a/src/v/kafka/server/group_manager.cc b/src/v/kafka/server/group_manager.cc index 26023b037b22d..6ba75f284c410 100644 --- a/src/v/kafka/server/group_manager.cc +++ b/src/v/kafka/server/group_manager.cc @@ -1726,21 +1726,29 @@ group_manager::offset_commit(offset_commit_request&& r) { } ss::future -group_manager::offset_fetch(offset_fetch_request&& r) { - auto error = validate_group_status( - r.ntp, r.data.group_id, offset_fetch_api::key, true); - if (error != error_code::none) { - return ss::make_ready_future( - offset_fetch_response(error)); - } +group_manager::offset_fetch(offset_fetch_request r) { + const auto require_stable = r.data.require_stable; + offset_fetch_response response; - auto group = get_group(r.data.group_id); - if (!group) { - return ss::make_ready_future( - offset_fetch_response(r.data.topics)); - } + for (auto& g_req : r.data.groups) { + auto& g_res = response.data.groups.emplace_back(); + auto error = validate_group_status( + r.ntp, g_req.group_id, offset_fetch_api::key, true); + if (error != error_code::none) { + g_res.group_id = std::move(g_req.group_id); + g_res.error_code = error; + continue; + } - return group->handle_offset_fetch(std::move(r)).finally([group] {}); + auto group = get_group(g_req.group_id); + if (!group) { + g_res = offset_fetch_response::make_group(std::move(g_req)); + } else { + g_res = co_await group->handle_offset_fetch( + std::move(g_req), require_stable); + } + } + co_return response; } ss::future diff --git a/src/v/kafka/server/group_manager.h b/src/v/kafka/server/group_manager.h index cd9e308e5650f..e9d11c6fe2a5b 100644 --- a/src/v/kafka/server/group_manager.h +++ b/src/v/kafka/server/group_manager.h @@ -169,7 +169,7 @@ class group_manager { /// \brief Handle a OffsetFetch request ss::future - offset_fetch(offset_fetch_request&& request); + offset_fetch(offset_fetch_request request); ss::future offset_delete(offset_delete_request&& request); diff --git a/src/v/kafka/server/group_router.cc b/src/v/kafka/server/group_router.cc index c379dc623c8d8..c6f66238f8d2b 100644 --- a/src/v/kafka/server/group_router.cc +++ b/src/v/kafka/server/group_router.cc @@ -10,6 +10,7 @@ */ #include "kafka/server/group_router.h" +#include "kafka/protocol/offset_fetch.h" #include "kafka/server/logger.h" #include @@ -260,8 +261,46 @@ group_router::leave_group(leave_group_request&& request) { } ss::future -group_router::offset_fetch(offset_fetch_request&& request) { - return route(std::move(request), &group_manager::offset_fetch); +group_router::offset_fetch(offset_fetch_request request) { + chunked_hash_map< + std::optional>, + offset_fetch_request> + requests_by_shard; + + offset_fetch_response response; + // Collect requests by shard + for (auto& group : request.data.groups) { + requests_by_shard + .try_emplace(shard_for(group.group_id), offset_fetch_request{}) + .first->second.data.groups.push_back(std::move(group)); + } + // Collect responses + for (auto& [key, request] : requests_by_shard) { + if (key.has_value()) { + auto& [ntp, shard] = key.value(); + request.ntp = std::move(ntp); + auto shard_res = co_await with_scheduling_group( + _sg, [this, shard, r = std::move(request)]() mutable { + return get_group_manager().invoke_on( + shard, _ssg, &group_manager::offset_fetch, std::move(r)); + }); + std::ranges::move( + shard_res.data.groups, std::back_inserter(response.data.groups)); + } else { + // Not coordinator for these groups + for (auto& group : request.data.groups) { + vlog( + cg_klog.trace, + "in route() not coordinator for {}", + group.group_id); + response.data.groups.push_back( + offset_fetch_response_group{ + .group_id = std::move(group.group_id), + .error_code = error_code::not_coordinator}); + } + } + } + co_return response; } ss::future diff --git a/src/v/kafka/server/group_router.h b/src/v/kafka/server/group_router.h index 2bb16cacabde6..fc2ce0b25bb6d 100644 --- a/src/v/kafka/server/group_router.h +++ b/src/v/kafka/server/group_router.h @@ -67,7 +67,7 @@ class group_router final { ss::future leave_group(leave_group_request&& request); ss::future - offset_fetch(offset_fetch_request&& request); + offset_fetch(offset_fetch_request request); ss::future offset_delete(offset_delete_request&& request); diff --git a/src/v/kafka/server/handlers/offset_fetch.h b/src/v/kafka/server/handlers/offset_fetch.h index 64ff13891db5e..f6744484a5ebd 100644 --- a/src/v/kafka/server/handlers/offset_fetch.h +++ b/src/v/kafka/server/handlers/offset_fetch.h @@ -17,6 +17,6 @@ namespace kafka { // in version 0 kafka stores offsets in zookeeper. if we ever need to // support version 0 then we need to do some code review to see if this has // any implications on semantics. -using offset_fetch_handler = single_stage_handler; +using offset_fetch_handler = single_stage_handler; } // namespace kafka diff --git a/src/v/kafka/server/server.cc b/src/v/kafka/server/server.cc index ec2c6e6c5de33..65c6b1fc23318 100644 --- a/src/v/kafka/server/server.cc +++ b/src/v/kafka/server/server.cc @@ -24,6 +24,7 @@ #include "features/enterprise_feature_messages.h" #include "features/feature_table.h" #include "kafka/protocol/errors.h" +#include "kafka/protocol/offset_fetch.h" #include "kafka/protocol/produce.h" #include "kafka/protocol/schemata/list_groups_response.h" #include "kafka/server/connection_context.h" @@ -1085,92 +1086,213 @@ ss::future add_partitions_to_txn_handler::handle( }); } +namespace { + +auto partition_auth_describe( + request_context& ctx, + auto& resources, + auto projection, + authz_quiet quiet_authz) { + return std::ranges::partition( + resources, + [&ctx, quiet_authz](const auto& resource) { + return ctx.authorized( + security::acl_operation::describe, resource, quiet_authz); + }, + projection); +}; + +void convert_to_latest(offset_fetch_request& req, api_version current_version) { + if (current_version >= api_version{8}) { + return; + } + auto& group = req.data.groups.emplace_back(); + group.group_id = std::move(req.data.group_id); + if (req.data.topics.has_value()) { + group.topics = chunked_vector{}; + group.topics->reserve(req.data.topics->size()); + for (auto& topic : *req.data.topics) { + group.topics->push_back( + offset_fetch_request_topics{ + .name = std::move(topic.name), + .partition_indexes{ + topic.partition_indexes.begin(), + topic.partition_indexes.end()}, + .unknown_tags = std::move(topic.unknown_tags)}); + } + req.data.topics.reset(); + } +} + +void convert_to(offset_fetch_response& res, api_version target_version) { + if (target_version >= api_version{8}) { + return; + } + if (res.data.groups.size() != 1) { + res.data.groups.clear(); + res.data.topics.clear(); + res.data.error_code = error_code::invalid_request; + vlog( + klog.warn, + "offset_fetch v{} only supports a single group, got: {}", + target_version, + fmt::join( + res.data.groups + | std::views::transform(&offset_fetch_response_group::group_id), + ", ")); + return; + } + auto& group = res.data.groups[0]; + res.data.error_code = group.error_code; + res.data.topics.reserve(group.topics.size()); + for (auto& topic : group.topics) { + chunked_vector partitions; + partitions.reserve(topic.partitions.size()); + for (auto& partition : topic.partitions) { + partitions.push_back( + offset_fetch_response_partition{ + .partition_index = partition.partition_index, + .committed_offset = partition.committed_offset, + .committed_leader_epoch = partition.committed_leader_epoch, + .metadata = std::move(partition.metadata), + .error_code = partition.error_code, + .unknown_tags = std::move(partition.unknown_tags)}); + } + res.data.topics.push_back( + offset_fetch_response_topic{ + .name = std::move(topic.name), + .partitions = std::move(partitions), + .unknown_tags = std::move(topic.unknown_tags)}); + } + res.data.groups.clear(); +} + +} // namespace + template<> ss::future offset_fetch_handler::handle(request_context ctx, ss::smp_service_group) { offset_fetch_request request; request.decode(ctx.reader(), ctx.header().version); log_request(ctx.header(), request); - if (!ctx.authorized( - security::acl_operation::describe, request.data.group_id)) { + convert_to_latest(request, ctx.header().version); + + constexpr auto has_topics = [](const auto& g) { + return g.topics.has_value(); + }; + + constexpr auto pre_filter_authorized_topics = [](auto& ctx, auto& group) { + offset_fetch_response_group response{.group_id = group.group_id}; + + auto unauthorized_rng = partition_auth_describe( + ctx, + group.topics.value(), + &offset_fetch_request_topics::name, + authz_quiet::no); + if (!ctx.audit()) { - co_return co_await ctx.respond( - offset_fetch_response(error_code::broker_not_available)); - } else { - co_return co_await ctx.respond( - offset_fetch_response(error_code::group_authorization_failed)); + response.error_code = error_code::broker_not_available; + return response; } - } - /* - * request is for all group offsets - */ - if (!request.data.topics) { - auto resp = co_await ctx.groups().offset_fetch(std::move(request)); - if (resp.data.error_code != error_code::none) { - co_return co_await ctx.respond(std::move(resp)); + chunked_vector unauthorized{ + std::from_range, unauthorized_rng | std::views::as_rvalue}; + + // remove unauthorized topics from request + group.topics->erase_to_end(unauthorized_rng.begin()); + + // add requested (but unauthorized) topics into response + for (auto& req_topic : unauthorized) { + auto& topic = response.topics.emplace_back(); + topic.name = std::move(req_topic.name); + for (auto partition_index : req_topic.partition_indexes) { + auto& partition = topic.partitions.emplace_back(); + partition.partition_index = partition_index; + partition.committed_offset = model::offset{-1}; + partition.error_code = error_code::topic_authorization_failed; + } } + return response; + }; - // remove unauthorized topics from response - auto unauthorized = std::partition( - resp.data.topics.begin(), - resp.data.topics.end(), - [&ctx](const offset_fetch_response_topic& topic) { - /* - * quiet authz failures. this is checking for visibility across - * all topics not specifically requested topics. - */ - return ctx.authorized( - security::acl_operation::describe, - topic.name, - authz_quiet{true}); - }); + constexpr auto post_filter_authorized_topics = [](auto& ctx, auto& group) { + if (group.error_code != error_code::none) { + return; + } + /* + * quiet authz failures. this is checking for visibility across + * all topics not specifically requested topics. + */ + auto unauthorized_rng = partition_auth_describe( + ctx, + group.topics, + &offset_fetch_response_topics::name, + authz_quiet::yes); if (!ctx.audit()) { - resp.data.topics.clear(); - resp.data.error_code = error_code::broker_not_available; - co_return co_await ctx.respond(std::move(resp)); + group.topics.clear(); + group.error_code = error_code::broker_not_available; + return; } - resp.data.topics.erase_to_end(unauthorized); + group.topics.erase_to_end(unauthorized_rng.begin()); + return; + }; - co_return co_await ctx.respond(std::move(resp)); - } + chunked_hash_map unauthorized; /* - * pre-filter authorized topics in request + * pre-filter authorized groups in request */ - auto unauthorized_it = std::partition( - request.data.topics->begin(), - request.data.topics->end(), - [&ctx](const offset_fetch_request_topic& topic) { - return ctx.authorized(security::acl_operation::describe, topic.name); - }); + auto unauthorized_rng = partition_auth_describe( + ctx, + request.data.groups, + &offset_fetch_request_group::group_id, + authz_quiet::no); - if (!ctx.audit()) { - co_return co_await ctx.respond( - offset_fetch_response(error_code::broker_not_available)); + for (auto& group : unauthorized_rng) { + auto& response = unauthorized[group.group_id]; + response.group_id = std::move(group.group_id); + response.error_code = !ctx.audit() + ? error_code::broker_not_available + : error_code::group_authorization_failed; } + request.data.groups.erase_to_end(unauthorized_rng.begin()); - std::vector unauthorized( - std::make_move_iterator(unauthorized_it), - std::make_move_iterator(request.data.topics->end())); + /* + * pre-filter authorized topics in request + */ + for (auto& group : request.data.groups | std::views::filter(has_topics)) { + unauthorized.emplace( + group.group_id, pre_filter_authorized_topics(ctx, group)); + } - // remove unauthorized topics from request - request.data.topics->erase(unauthorized_it, request.data.topics->end()); auto resp = co_await ctx.groups().offset_fetch(std::move(request)); - // add requested (but unauthorized) topics into response - for (auto& req_topic : unauthorized) { - auto& topic = resp.data.topics.emplace_back(); - topic.name = std::move(req_topic.name); - for (auto partition_index : req_topic.partition_indexes) { - auto& partition = topic.partitions.emplace_back(); - partition.partition_index = partition_index; - partition.error_code = error_code::group_authorization_failed; + for (auto& group : resp.data.groups) { + auto it = unauthorized.find(group.group_id); + if (it != unauthorized.end()) { + /* + * merge pre-filtered unauthorized topics into response + */ + std::ranges::move( + it->second.topics, std::back_inserter(group.topics)); + unauthorized.erase(it); + } else { + /* + * post-filter unauthorized topics in response + */ + post_filter_authorized_topics(ctx, group); } } + /* + * merge pre-filtered unauthorized groups into response + */ + std::ranges::move( + unauthorized | std::views::values, std::back_inserter(resp.data.groups)); + + convert_to(resp, ctx.header().version); co_return co_await ctx.respond(std::move(resp)); } diff --git a/src/v/kafka/server/tests/BUILD b/src/v/kafka/server/tests/BUILD index 1eee066e5c781..03777a6bdd61a 100644 --- a/src/v/kafka/server/tests/BUILD +++ b/src/v/kafka/server/tests/BUILD @@ -276,7 +276,7 @@ redpanda_cc_btest( cpu = 1, deps = [ "//src/v/cluster", - "//src/v/kafka/client", + "//src/v/kafka/client:transport", "//src/v/kafka/protocol", "//src/v/kafka/protocol:describe_groups", "//src/v/kafka/protocol:find_coordinator", @@ -287,6 +287,7 @@ redpanda_cc_btest( "//src/v/model", "//src/v/model:timeout_clock", "//src/v/redpanda/tests:fixture", + "//src/v/security", "//src/v/test_utils:scoped_config", "//src/v/test_utils:seastar_boost", "//src/v/utils:base64", @@ -370,6 +371,7 @@ redpanda_cc_btest( ], deps = [ "//src/v/kafka/protocol:offset_fetch", + "//src/v/kafka/server", "//src/v/redpanda/tests:fixture", "//src/v/test_utils:seastar_boost", "@boost//:test", diff --git a/src/v/kafka/server/tests/consumer_groups_test.cc b/src/v/kafka/server/tests/consumer_groups_test.cc index 5bee6780bf7e3..42a9b2f7750ce 100644 --- a/src/v/kafka/server/tests/consumer_groups_test.cc +++ b/src/v/kafka/server/tests/consumer_groups_test.cc @@ -9,7 +9,8 @@ #include "cluster/controller.h" #include "cluster/controller_api.h" -#include "kafka/client/client.h" +#include "cluster/security_frontend.h" +#include "kafka/client/transport.h" #include "kafka/protocol/describe_groups.h" #include "kafka/protocol/errors.h" #include "kafka/protocol/find_coordinator.h" @@ -19,10 +20,12 @@ #include "kafka/server/coordinator_ntp_mapper.h" #include "kafka/server/group.h" #include "kafka/server/group_manager.h" +#include "kafka/server/handlers/offset_fetch.h" #include "model/fundamental.h" #include "model/namespace.h" #include "model/timeout_clock.h" #include "redpanda/tests/fixture.h" +#include "security/acl.h" #include "test_utils/async.h" #include "test_utils/boost_fixture.h" #include "test_utils/scoped_config.h" @@ -80,6 +83,45 @@ struct consumer_offsets_fixture : public redpanda_thread_fixture { client.stop().get(); client.shutdown(); } + + void create_user(const ss::sstring& user, const ss::sstring& pass) { + auto creds = security::scram_sha256::make_credentials( + pass, security::scram_sha256::min_iterations); + app.controller->get_security_frontend() + .local() + .create_user( + security::credential_user(user), + std::move(creds), + model::timeout_clock::now() + 5s) + .get(); + } + + template + void authorize( + const ss::sstring& user, + security::acl_operation op, + const std::vector& resources) { + const auto make_binding = [&user, op](const T& resource) { + return security::acl_binding{ + security::resource_pattern{ + security::get_resource_type(), + resource, + security::pattern_type::literal}, + security::acl_entry{ + security::acl_principal{security::principal_type::user, user}, + security::acl_host::wildcard_host(), + op, + security::acl_permission::allow}}; + }; + + app.controller->get_security_frontend() + .local() + .create_acls( + resources | std::views::transform(make_binding) + | std::ranges::to(), + 5s) + .get(); + } }; FIXTURE_TEST(join_empty_group_static_member, consumer_offsets_fixture) { @@ -147,6 +189,187 @@ FIXTURE_TEST(empty_offset_commit_request, consumer_offsets_fixture) { } } +FIXTURE_TEST(offset_commit_and_fetch_request, consumer_offsets_fixture) { + scoped_config cfg; + cfg.get("group_topic_partitions").set_value(1); + + static const model::topic topic_foo{"foo"}; + static const model::topic topic_bar{"bar"}; + static const model::topic topic_no_auth{"no_auth"}; + + static const kafka::group_id group_foo{"foo-partitions"}; + static const kafka::group_id group_bar{"bar-partitions"}; + static const kafka::group_id group_no_auth{"no_auth-partitions"}; + + // v2 supports an error_code + static constexpr api_version version_with_error{2}; + // v8 supports multiple groups + static constexpr api_version version_with_groups{8}; + + const std::map< + kafka::group_id, + std::map>> + committed_offsets{ + {group_foo, + { + {topic_foo, {{model::partition_id{2}, model::offset{42}}}}, + {topic_no_auth, {{model::partition_id{1}, model::offset{24}}}}, + }}, + {group_bar, + { + {topic_bar, {{model::partition_id{1}, model::offset{24}}}}, + {topic_no_auth, {{model::partition_id{0}, model::offset{36}}}}, + }}, + {group_no_auth, + { + {topic_no_auth, {{model::partition_id{0}, model::offset{12}}}}, + }}, + }; + + for (const auto& topic : {topic_foo, topic_bar, topic_no_auth}) { + add_topic({model::kafka_namespace, topic}, 3).get(); + } + + kafka::group_instance_id gr("instance-1"); + wait_for_consumer_offsets_topic(gr); + auto client = make_kafka_client().get(); + auto deferred = ss::defer([&client] { + client.stop().then([&client] { client.shutdown(); }).get(); + }); + client.connect().get(); + + // Commit offsets + for (const auto& [group_id, topics] : committed_offsets) { + offset_commit_request req{.data{.group_id = group_id}}; + req.data.group_instance_id = gr; + for (const auto& [topic, partitions] : topics) { + auto& c_topic = req.data.topics.emplace_back( + offset_commit_request_topic{.name = topic}); + for (const auto& [partition, offset] : partitions) { + c_topic.partitions.push_back( + {.partition_index = partition, .committed_offset = offset}); + } + } + auto resp + = client.dispatch(std::move(req), kafka::api_version(7)).get(); + BOOST_REQUIRE(!resp.data.errored()); + } + + const auto fill_request = + [&committed_offsets]( + auto& req, const kafka::group_id& group_id, bool all_topics) { + req.group_id = group_id; + if (!all_topics) { + req.topics.emplace(); + for (const auto& [topic, partitions] : + committed_offsets.at(group_id)) { + req.topics->push_back( + {.name{topic}, + .partition_indexes{ + std::from_range, partitions | std::views::keys}}); + } + } + }; + + const auto check_response_topics = [&committed_offsets]( + kafka::api_version api_version, + auto& resp, + const kafka::group_id& group_id, + bool all_topics) { + if (group_id == group_no_auth) { + BOOST_REQUIRE(resp.topics.empty()); + if (api_version >= version_with_error) { + BOOST_REQUIRE_EQUAL( + resp.error_code, error_code::group_authorization_failed); + } + return; + } + const auto& c_offsets = committed_offsets.at(group_id); + BOOST_REQUIRE_EQUAL( + resp.topics.size(), all_topics ? 1 : c_offsets.size()); + for (const auto& topic : resp.topics) { + const auto t_it = c_offsets.find(topic.name); + BOOST_REQUIRE(t_it != c_offsets.end()); + BOOST_REQUIRE_EQUAL(topic.partitions.size(), t_it->second.size()); + for (const auto& p : topic.partitions) { + if (topic.name == topic_no_auth) { + BOOST_REQUIRE(resp.errored()); + BOOST_REQUIRE_EQUAL(p.committed_offset, model::offset{-1}); + BOOST_REQUIRE_EQUAL( + p.error_code, error_code::topic_authorization_failed); + } else { + const auto p_it = t_it->second.find(p.partition_index); + BOOST_REQUIRE(p_it != t_it->second.end()); + BOOST_REQUIRE_EQUAL(p_it->second, p.committed_offset); + } + } + } + }; + + ss::sstring user{"user_name_256"}; + ss::sstring password{"password_256"}; + create_user(user, password); + authorize( + user, + security::acl_operation::describe, + std::vector{topic_foo, topic_bar}); + authorize( + user, + security::acl_operation::describe, + std::vector{group_bar, group_foo}); + + enable_sasl(); + auto disable_sasl_defer = ss::defer([this] { disable_sasl(); }); + + auto auth_client = make_kafka_client().get(); + auto auth_deferred = ss::defer([&auth_client] { + auth_client.stop() + .then([&auth_client] { auth_client.shutdown(); }) + .get(); + }); + auth_client.connect().get(); + + authn_kafka_client( + auth_client, user, password); + + for (auto api_version = kafka::offset_fetch_handler::min_supported; + api_version <= kafka::offset_fetch_handler::max_supported; + ++api_version) { + for (bool all_topics : {true, false}) { + if (api_version < version_with_groups) { + for (const auto& g : committed_offsets | std::views::keys) { + offset_fetch_request req; + fill_request(req.data, g, all_topics); + auto resp + = auth_client.dispatch(std::move(req), api_version).get(); + if (api_version >= version_with_error) { + BOOST_REQUIRE_EQUAL( + resp.data.errored(), + !all_topics || g == group_no_auth); + } + BOOST_REQUIRE_EQUAL(resp.data.groups.size(), 0); + check_response_topics( + api_version, resp.data, g, all_topics); + } + } else { + offset_fetch_request req; + for (const auto& g : committed_offsets | std::views::keys) { + fill_request(req.data.groups.emplace_back(), g, all_topics); + } + auto resp + = auth_client.dispatch(std::move(req), api_version).get(); + BOOST_REQUIRE(resp.data.errored()); + BOOST_REQUIRE_EQUAL( + resp.data.groups.size(), committed_offsets.size()); + for (const auto& g : resp.data.groups) { + check_response_topics( + api_version, g, g.group_id, all_topics); + } + } + } + } +} + FIXTURE_TEST(block_test, consumer_offsets_fixture) { scoped_config cfg; cfg.get("group_topic_partitions").set_value(1); diff --git a/src/v/kafka/server/tests/offset_fetch_test.cc b/src/v/kafka/server/tests/offset_fetch_test.cc index 96979e131310c..1d7ae0ddc8898 100644 --- a/src/v/kafka/server/tests/offset_fetch_test.cc +++ b/src/v/kafka/server/tests/offset_fetch_test.cc @@ -24,7 +24,7 @@ FIXTURE_TEST(offset_fetch, redpanda_thread_fixture) { kafka::offset_fetch_request req; req.data.group_id = kafka::group_id("g"); - auto resp = client.dispatch(req, kafka::api_version(2)).get(); + auto resp = client.dispatch(std::move(req), kafka::api_version(2)).get(); client.stop().then([&client] { client.shutdown(); }).get(); BOOST_TEST(resp.data.error_code == kafka::error_code::not_coordinator); diff --git a/src/v/pandaproxy/json/requests/offset_fetch.h b/src/v/pandaproxy/json/requests/offset_fetch.h index f04d7a71b7ac2..4bf990c2daf14 100644 --- a/src/v/pandaproxy/json/requests/offset_fetch.h +++ b/src/v/pandaproxy/json/requests/offset_fetch.h @@ -19,9 +19,9 @@ namespace pandaproxy::json { -inline std::vector +inline chunked_vector partitions_request_to_offset_request(std::vector tps) { - std::vector res; + chunked_vector res; if (tps.empty()) { return res; }