diff --git a/proto/redpanda/core/admin/v2/BUILD b/proto/redpanda/core/admin/v2/BUILD index df3c8d6b6393b..fd7d21bfded2b 100644 --- a/proto/redpanda/core/admin/v2/BUILD +++ b/proto/redpanda/core/admin/v2/BUILD @@ -2,16 +2,14 @@ load("@rules_proto//proto:defs.bzl", "proto_library") load("//bazel/pbgen:pbgen.bzl", "redpanda_proto_library") proto_library( - name = "broker_proto", + name = "kafka_connections_proto", srcs = [ - "broker.proto", "kafka_connections.proto", ], visibility = ["//visibility:public"], deps = [ "//proto/redpanda/core/common:common_proto", "//proto/redpanda/core/pbgen:options_proto", - "//proto/redpanda/core/pbgen:rpc_proto", "@googleapis//google/api:field_info_proto", "@protobuf//:duration_proto", "@protobuf//:timestamp_proto", @@ -19,14 +17,57 @@ proto_library( ) redpanda_proto_library( - name = "broker_redpanda_proto", - protos = [":broker_proto"], + name = "kafka_connections_redpanda_proto", + protos = [ + ":kafka_connections_proto", + ], visibility = ["//visibility:public"], deps = [ "@abseil-cpp//absl/time:time", ], ) +proto_library( + name = "broker_proto", + srcs = ["broker.proto"], + visibility = ["//visibility:public"], + deps = [ + "//proto/redpanda/core/pbgen:options_proto", + "//proto/redpanda/core/pbgen:rpc_proto", + ], +) + +redpanda_proto_library( + name = "broker_redpanda_proto", + protos = [ + ":broker_proto", + ], + visibility = ["//visibility:public"], + deps = [], +) + +proto_library( + name = "cluster_proto", + srcs = ["cluster.proto"], + visibility = ["//visibility:public"], + deps = [ + ":kafka_connections_proto", + "//proto/redpanda/core/pbgen:options_proto", + "//proto/redpanda/core/pbgen:rpc_proto", + ], +) + +redpanda_proto_library( + name = "cluster_redpanda_proto", + protos = [ + ":cluster_proto", + ], + visibility = ["//visibility:public"], + deps = [ + ":kafka_connections_redpanda_proto", + ], +) + proto_library( name = "shadow_link_proto", srcs = ["shadow_link.proto"], diff --git a/proto/redpanda/core/admin/v2/broker.proto b/proto/redpanda/core/admin/v2/broker.proto index 8e7237e05458a..fb7e1be1ee854 100644 --- a/proto/redpanda/core/admin/v2/broker.proto +++ b/proto/redpanda/core/admin/v2/broker.proto @@ -18,7 +18,6 @@ package redpanda.core.admin.v2; import "proto/redpanda/core/pbgen/options.proto"; import "proto/redpanda/core/pbgen/rpc.proto"; -import "proto/redpanda/core/admin/v2/kafka_connections.proto"; option (redpanda.core.pbgen.cpp_namespace) = "proto::admin"; @@ -37,15 +36,6 @@ service BrokerService { authz: SUPERUSER }; } - - // ListKafkaConnections returns information about the broker's Kafka - // connections. - rpc ListKafkaConnections(ListKafkaConnectionsRequest) - returns (ListKafkaConnectionsResponse) { - option (redpanda.core.pbgen.rpc) = { - authz: SUPERUSER - }; - } } // GetBrokerRequest returns information about a single broker in the cluster @@ -70,76 +60,6 @@ message ListBrokersResponse { repeated Broker brokers = 1; } -message ListKafkaConnectionsRequest { - // The node ID for the broker. If set to -1, the broker handling the RPC - // request returns information about itself. - int32 node_id = 1; - - // The maximum number of connections to return. If unspecified or 0, a - // default value may be applied. Note that paging is currently not fully - // supported, and this field only acts as a limit for the first page of data - // returned. Subsequent pages of data cannot be requested. - int32 page_size = 2; - - // Filter expression to apply to the connection list. - // Uses a subset of AIP-160 filter syntax supporting: - // - Field comparisons (`=`, `!=`, `<`, `>`, `<=`, `>=`) - // - Logical AND chaining: condition1 AND condition2 - // - Nested field access: parent.child = value - // - Escape sequences: field = "string with \"quotes\"" - // - Enum types - // - RFC3339 timestamps and ISO-like duration - // - // Limitations (not supported): - // - Logical OR chaining - // - Parentheses `(` `)` for grouping - // - Map and repeated types - // - HAS (:) operator - // - Negation (-, NOT) - // - Bare literal matching - // - // Example filters: - // - `state = KAFKA_CONNECTION_STATE_OPEN` - // - `idle_duration > 30s AND request_count_total > 100` - // - `authentication_info.user_principal = "my-producer"` - // - `recent_request_statistics.produce_bytes > 1000 AND - // client_software_name = "kgo"` - // - `open_time >= 2025-09-01T10:22:54Z` - // - // Reference: https://google.aip.dev/160 - string filter = 3; - - // Field-based ordering specification following AIP-132 syntax. - // Supports multiple fields with `asc`/`desc` direction indicators. - // Examples: - // - `idle_duration desc` - longest idle connections first - // - `open_time desc, total_request_statistics.request_count desc` - newest - // connections first, then most active - // - `recent_request_statistics.produce_bytes desc` - connections with - // highest current produce throughput first - // - // Reference: https://google.aip.dev/132#ordering - string order_by = 4; -} - -message ListKafkaConnectionsResponse { - // The list of connections matching the request. - // Note that in addition to open connections, some recently-closed - // connections may also be included here. If you don't want to include - // closed connections, set the filter in the request to `state = - // KAFKA_CONNECTION_STATE_OPEN`. - repeated KafkaConnection connections = 1; - - // Total number of connections matching the request. - // This may be greater than `len(connections)` if some connections were - // omitted from the response due to the specified (or default) `page_size`. - // Example: - // request.page_size = 10 - // response.connections = [<10 items>] - // response.total_size = 13 - uint64 total_size = 2; -} - // The resource for an individual broker within the Kafka Cluster. message Broker { // This broker's node ID. diff --git a/proto/redpanda/core/admin/v2/cluster.proto b/proto/redpanda/core/admin/v2/cluster.proto new file mode 100644 index 0000000000000..d4bc5820e3028 --- /dev/null +++ b/proto/redpanda/core/admin/v2/cluster.proto @@ -0,0 +1,105 @@ +// Copyright 2025 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package redpanda.core.admin.v2; + +import "proto/redpanda/core/pbgen/options.proto"; +import "proto/redpanda/core/pbgen/rpc.proto"; +import "proto/redpanda/core/admin/v2/kafka_connections.proto"; + +option (redpanda.core.pbgen.cpp_namespace) = "proto::admin"; + +// The ClusterService gives information about the cluster. +service ClusterService { + // ListKafkaConnections returns information about the cluster's Kafka + // connections, collected and ordered across all brokers. + rpc ListKafkaConnections(ListKafkaConnectionsRequest) + returns (ListKafkaConnectionsResponse) { + option (redpanda.core.pbgen.rpc) = { + authz: SUPERUSER + }; + } +} + +// ListKafkaConnectionsRequest return information about the broker's Kafka +// connections. +message ListKafkaConnectionsRequest { + // The maximum number of connections to return. If unspecified or 0, a + // default value may be applied. Note that paging is currently not fully + // supported, and this field only acts as a limit for the first page of data + // returned. Subsequent pages of data cannot be requested. + int32 page_size = 1; + + // Filter expression to apply to the connection list. + // Uses a subset of AIP-160 filter syntax supporting: + // - Field comparisons (`=`, `!=`, `<`, `>`, `<=`, `>=`) + // - Logical AND chaining: condition1 AND condition2 + // - Nested field access: parent.child = value + // - Escape sequences: field = "string with \"quotes\"" + // - Enum types + // - RFC3339 timestamps and ISO-like duration + // + // Limitations (not supported): + // - Logical OR chaining + // - Parentheses `(` `)` for grouping + // - Map and repeated types + // - HAS (:) operator + // - Negation (-, NOT) + // - Bare literal matching + // + // Example filters: + // - `state = KAFKA_CONNECTION_STATE_OPEN` + // - `idle_duration > 30s AND total_request_statistics.request_count > 100` + // - `authentication_info.user_principal = "my-producer"` + // - `recent_request_statistics.produce_bytes > 1000 AND + // client_software_name = "kgo"` + // - `open_time >= 2025-09-01T10:22:54Z` + // + // Reference: https://google.aip.dev/160 + string filter = 2; + + // Field-based ordering specification following AIP-132 syntax. + // Supports multiple fields with `asc`/`desc` direction indicators. + // Examples: + // - `idle_duration desc` - longest idle connections first + // - `open_time desc, total_request_statistics.request_count desc` - newest + // connections first, then most active + // - `recent_request_statistics.produce_bytes desc` - connections with + // highest current produce throughput first + // + // Reference: https://google.aip.dev/132#ordering + string order_by = 3; +} + +// ListKafkaConnectionsResponse is the response from the ListKafkaConnections +// RPC. +message ListKafkaConnectionsResponse { + // The list of connections matching the request. + // Note that in addition to open connections, some recently-closed + // connections may also be included here. If you don't want to include + // closed connections, set the filter in the request to `state = + // KAFKA_CONNECTION_STATE_OPEN`. + repeated KafkaConnection connections = 1; + + // Total number of connections matching the request. + // This may be greater than `len(connections)` if some connections were + // omitted from the response due to the specified (or default) `page_size`. + // Example: + // request.page_size = 10 + // response.connections = [<10 items>] + // response.total_size = 13 + uint64 total_size = 2; +} diff --git a/src/v/kafka/server/BUILD b/src/v/kafka/server/BUILD index 17bada09ba297..790eda44a9ed7 100644 --- a/src/v/kafka/server/BUILD +++ b/src/v/kafka/server/BUILD @@ -259,7 +259,7 @@ redpanda_cc_library( ":qdc_monitor_config", ":security", ":topic_config_utils", - "//proto/redpanda/core/admin/v2:broker_redpanda_proto", + "//proto/redpanda/core/admin/v2:kafka_connections_redpanda_proto", "//src/v/base", "//src/v/bytes", "//src/v/bytes:iobuf", diff --git a/src/v/redpanda/BUILD b/src/v/redpanda/BUILD index c0e5c5a70d15c..f7a2b5cb50f56 100644 --- a/src/v/redpanda/BUILD +++ b/src/v/redpanda/BUILD @@ -68,11 +68,13 @@ redpanda_cc_library( "//src/v/pandaproxy/rest:server", "//src/v/raft", "//src/v/redpanda/admin", + "//src/v/redpanda/admin:kafka_connections_service", "//src/v/redpanda/admin/proxy:service", "//src/v/redpanda/admin/proxy:client", "//src/v/redpanda/admin/services/shadow_link", "//src/v/redpanda/admin/services/datalake", "//src/v/redpanda/admin/services/internal:debug", + "//src/v/redpanda/admin/services:cluster", "//src/v/resource_mgmt:cpu_profiler", "//src/v/resource_mgmt:cpu_scheduling", "//src/v/resource_mgmt:memory_groups", diff --git a/src/v/redpanda/admin/BUILD b/src/v/redpanda/admin/BUILD index 342d5585d0b37..8187d2d5b704e 100644 --- a/src/v/redpanda/admin/BUILD +++ b/src/v/redpanda/admin/BUILD @@ -200,6 +200,30 @@ redpanda_cc_library( ], ) +redpanda_cc_library( + name = "kafka_connections_service", + srcs = [ + "kafka_connections_service.cc", + ], + hdrs = [ + "kafka_connections_service.h", + ], + visibility = ["//visibility:public"], + deps = [ + ":aip_filter", + ":aip_ordering", + "//proto/redpanda/core/admin/v2:cluster_redpanda_proto", + "//proto/redpanda/core/admin/v2:kafka_connections_redpanda_proto", + "//src/v/base", + "//src/v/container:priority_queue", + "//src/v/kafka/server", + "//src/v/redpanda/admin/proxy:client", + "//src/v/serde/protobuf:rpc", + "//src/v/ssx:async_algorithm", + "@seastar", + ], +) + redpanda_cc_library( name = "admin", srcs = [ @@ -258,6 +282,7 @@ redpanda_cc_library( ":debug_bundle", ":features", ":hbadger", + ":kafka_connections_service", ":migration", ":partition", ":raft", diff --git a/src/v/redpanda/admin/kafka_connections_service.cc b/src/v/redpanda/admin/kafka_connections_service.cc new file mode 100644 index 0000000000000..a9a7a68481d4f --- /dev/null +++ b/src/v/redpanda/admin/kafka_connections_service.cc @@ -0,0 +1,311 @@ +/* + * Copyright 2025 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "redpanda/admin/kafka_connections_service.h" + +#include "container/priority_queue.h" +#include "kafka/server/server.h" +#include "proto/redpanda/core/admin/v2/cluster.proto.h" +#include "proto/redpanda/core/admin/v2/kafka_connections.proto.h" +#include "redpanda/admin/aip_filter.h" +#include "redpanda/admin/aip_ordering.h" +#include "ssx/async_algorithm.h" + +#include + +#include + +namespace proto { +using namespace proto::admin; +} // namespace proto + +namespace admin { + +namespace { +struct connection_collector { + virtual ~connection_collector() = default; + virtual void add(proto::admin::kafka_connection conn) = 0; + virtual ss::future<> + add_all(chunked_vector conns) = 0; + virtual chunked_vector + extract_unordered() && = 0; + virtual ss::future> + extract() && = 0; + virtual size_t size() const = 0; +}; + +class unordered_collector : public connection_collector { + chunked_vector _connections; + size_t _limit; + +public: + explicit unordered_collector(size_t limit) + : _limit(limit) {} + + void add(proto::admin::kafka_connection conn) final { + if (_connections.size() < _limit) { + _connections.emplace_back(std::move(conn)); + } + } + + ss::future<> + add_all(chunked_vector conns) final { + auto to_add_count = std::min( + conns.size(), _limit - _connections.size()); + auto insert_range = std::ranges::subrange( + conns.begin(), conns.begin() + to_add_count); + _connections.reserve(_connections.size() + insert_range.size()); + co_await ssx::async_for_each(insert_range, [this](auto& conn) { + _connections.emplace_back(std::move(conn)); + }); + } + + chunked_vector extract_unordered() + && final { + return std::move(_connections); + } + + ss::future> extract() + && final { + co_return std::move(_connections); + }; + + size_t size() const final { return _connections.size(); } +}; + +template +class ordered_collector : public connection_collector { + // Invert the order here to get the min-k instead of the max-k + chunked_bounded_priority_queue< + proto::admin::kafka_connection, + detail::invert_comparator> + _pq; + +public: + ordered_collector(size_t limit, Comparator comp) + : _pq(limit, detail::invert_comparator(std::move(comp))) {} + + void add(proto::admin::kafka_connection conn) final { + _pq.push(std::move(conn)); + } + + ss::future<> + add_all(chunked_vector conns) final { + co_await _pq.async_push_range(std::move(conns)); + } + + chunked_vector extract_unordered() + && final { + return std::move(_pq).extract_heap(); + } + + ss::future> extract() + && final { + return std::move(_pq).async_extract_sorted(); + } + + size_t size() const final { return _pq.size(); } +}; + +using make_local_collector_t + = ss::noncopyable_function(size_t)>; + +struct connection_gather_result { + chunked_vector connections; + size_t total_matching_count; +}; + +ss::future gather_connections( + const kafka::server& server, + const filter_predicate& filter, + std::unique_ptr collector) { + auto result = connection_gather_result{}; + + auto conn_ptrs = server.list_connections(); + co_await ss::coroutine::maybe_yield(); + + auto process_conn = [&result, &collector, &filter]( + proto::admin::kafka_connection&& conn_proto) { + bool matches_filter = filter(conn_proto); + + if (matches_filter) { + result.total_matching_count++; + collector->add(std::move(conn_proto)); + } + }; + + co_await ssx::async_for_each( + conn_ptrs, [&process_conn](const auto& conn_ptr) { + process_conn(conn_ptr->to_proto()); + }); + + auto closed_conns = server.list_closed_connections(); + for (auto& elem : closed_conns) { + auto elem_copy = co_await proto::admin::kafka_connection::from_proto( + co_await elem->to_proto()); + process_conn(std::move(elem_copy)); + } + + result.connections = std::move(*collector).extract_unordered(); + co_return result; +} + +ss::future gather_all_shards( + ss::sharded& kafka_server, + const filter_predicate& filter, + const make_local_collector_t& make_local_collector, + connection_collector& global_collector) { + size_t total_matching_connections = 0; + + for (ss::shard_id shard = 0; shard < ss::smp::count; ++shard) { + auto accumulated_count = global_collector.size(); + auto shard_result = co_await kafka_server.invoke_on( + shard, + [accumulated_count, &filter, &make_local_collector]( + kafka::server& server) { + return gather_connections( + server, filter, make_local_collector(accumulated_count)); + }); + + total_matching_connections += shard_result.total_matching_count; + co_await global_collector.add_all(std::move(shard_result.connections)); + + co_await ss::coroutine::maybe_yield(); + } + + co_return total_matching_connections; +} + +} // namespace + +ss::future +kafka_connections_service::list_kafka_connections_local( + proto::admin::list_kafka_connections_request req) { + auto resp = proto::admin::list_kafka_connections_response{}; + + auto limit = get_effective_limit(req.get_page_size()); + + auto filter_cfg = make_aip_filter_config( + req.get_filter()); + auto filter = aip_filter_parser::create_aip_filter(std::move(filter_cfg)); + + auto [global_collector, make_local_collector] = + [&req, limit]() -> std::pair< + std::unique_ptr, + make_local_collector_t> { + if (req.get_order_by().empty()) { + auto global_collector = std::make_unique( + limit); + + auto make_local_collector = [limit](size_t accumulated_count) { + return std::make_unique( + limit - accumulated_count); + }; + + return std::make_pair( + std::move(global_collector), std::move(make_local_collector)); + } else { + auto ordering_conf + = make_ordering_config( + req.get_order_by()); + auto comp = sort_order::parse(ordering_conf); + + auto global_collector + = std::make_unique>(limit, comp); + + auto make_local_collector = [limit, comp](size_t) { + return std::make_unique>( + limit, comp); + }; + + return std::make_pair( + std::move(global_collector), std::move(make_local_collector)); + } + }(); + + auto total_matching_connections = co_await gather_all_shards( + _kafka_server, filter, make_local_collector, *global_collector); + + resp.set_connections(co_await std::move(*global_collector).extract()); + resp.set_total_size(total_matching_connections); + co_return resp; +} + +ss::future +kafka_connections_service::list_kafka_connections_cluster_wide( + admin::proxy::client& proxy_client, + const serde::pb::rpc::context& ctx, + proto::admin::list_kafka_connections_request req) { + auto limit = get_effective_limit(req.get_page_size()); + + auto collector = [&req, limit]() -> std::unique_ptr { + if (req.get_order_by().empty()) { + return std::make_unique(limit); + } else { + auto ordering_conf + = make_ordering_config( + req.get_order_by()); + auto comp = sort_order::parse(ordering_conf); + + return std::make_unique>(limit, comp); + } + }(); + + auto total_count = size_t{0}; + + auto add_to_response = + [&collector, &total_count]( + proto::admin::list_kafka_connections_response client_resp) { + total_count += client_resp.get_total_size(); + return collector->add_all(std::move(client_resp.get_connections())); + }; + + // TODO: we could optimize here further by inspecting the filter and if we + // can detect that it is for a single broker by parsing the filtering AST + // (e.g.; "node_id = X AND ..."), then we could avoid querying nodes other + // than X. + + auto make_broker_req = [&req]() { + auto client_req = proto::admin::list_kafka_connections_request{}; + client_req.set_filter(ss::sstring{req.get_filter()}); + client_req.set_order_by(ss::sstring{req.get_order_by()}); + client_req.set_page_size(req.get_page_size()); + return client_req; + }; + + // Iterate one by one for now to limit memory usage to be approximately in + // the order of 2 x page_size. We could optimize here to issue requests in + // parallel when page_size is small. + auto other_node_clients + = proxy_client + .make_clients_for_other_nodes(); + for (auto& [node_id, client] : other_node_clients) { + auto client_resp = co_await client.list_kafka_connections( + ctx, make_broker_req()); + + co_await add_to_response(std::move(client_resp)); + } + + auto local_resp = co_await list_kafka_connections_local(make_broker_req()); + co_await add_to_response(std::move(local_resp)); + + auto resp = proto::admin::list_kafka_connections_response{}; + resp.set_connections(co_await std::move(*collector).extract()); + resp.set_total_size(total_count); + co_return resp; +} + +size_t kafka_connections_service::get_effective_limit(size_t page_size) { + constexpr size_t default_limit = 1000; + return page_size == 0 ? default_limit : page_size; +} + +} // namespace admin diff --git a/src/v/redpanda/admin/kafka_connections_service.h b/src/v/redpanda/admin/kafka_connections_service.h new file mode 100644 index 0000000000000..cf3c58f56465d --- /dev/null +++ b/src/v/redpanda/admin/kafka_connections_service.h @@ -0,0 +1,49 @@ +/* + * Copyright 2025 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "base/seastarx.h" +#include "kafka/server/fwd.h" +#include "proto/redpanda/core/admin/v2/cluster.proto.h" +#include "redpanda/admin/proxy/client.h" + +#include +#include + +#include + +namespace admin { + +class kafka_connections_service { +public: + explicit kafka_connections_service(ss::sharded& kafka_server) + : _kafka_server(kafka_server) {} + + // List connections from all shards on this node + ss::future + list_kafka_connections_local( + proto::admin::list_kafka_connections_request req); + + // List connections from all nodes and shard in the cluster + ss::future + list_kafka_connections_cluster_wide( + admin::proxy::client& proxy_client, + const serde::pb::rpc::context& ctx, + proto::admin::list_kafka_connections_request req); + + static size_t get_effective_limit(size_t page_size); + +private: + ss::sharded& _kafka_server; +}; + +} // namespace admin diff --git a/src/v/redpanda/admin/server.cc b/src/v/redpanda/admin/server.cc index 0a1edfb7e420b..15651972f8e59 100644 --- a/src/v/redpanda/admin/server.cc +++ b/src/v/redpanda/admin/server.cc @@ -307,7 +307,8 @@ admin_server::admin_server( std::unique_ptr& tx_manager_migrator, ss::sharded& kafka_server, ss::sharded& tx_gateway_frontend, - ss::sharded& debug_bundle_service) + ss::sharded& debug_bundle_service, + ss::sharded& kafka_connections_service) : _log_level_timer([this] { log_level_timer_handler(); }) , _server("admin") , _cfg(std::move(cfg)) @@ -339,6 +340,7 @@ admin_server::admin_server( , _kafka_server(kafka_server) , _tx_gateway_frontend(tx_gateway_frontend) , _debug_bundle_service(debug_bundle_service) + , _kafka_connections_service(kafka_connections_service) , _default_blocked_reactor_notify( ss::engine().get_blocked_reactor_notify_ms()) { _server.set_content_streaming(true); @@ -524,10 +526,7 @@ ss::future<> admin_server::start() { }); add_service( std::make_unique( - std::move(client), - &_services, - _kafka_server, - _controller->get_feature_table())); + std::move(client), &_services)); co_await _debug_bundle_file_handler.start(); diff --git a/src/v/redpanda/admin/server.h b/src/v/redpanda/admin/server.h index b26e30cbfcadc..a7d119146c740 100644 --- a/src/v/redpanda/admin/server.h +++ b/src/v/redpanda/admin/server.h @@ -26,6 +26,7 @@ #include "pandaproxy/rest/fwd.h" #include "pandaproxy/schema_registry/fwd.h" #include "redpanda/admin/debug_bundle.h" +#include "redpanda/admin/kafka_connections_service.h" #include "resource_mgmt/cpu_profiler.h" #include "resource_mgmt/memory_sampling.h" #include "rpc/connection_cache.h" @@ -95,7 +96,8 @@ class admin_server : public ss::peering_sharded_service { std::unique_ptr&, ss::sharded&, ss::sharded&, - ss::sharded&); + ss::sharded&, + ss::sharded&); // Add a ConnectRPC service to the admin server. void add_service(std::unique_ptr); @@ -801,6 +803,8 @@ class admin_server : public ss::peering_sharded_service { std::vector> _services; + ss::sharded& _kafka_connections_service; + // Value before the temporary override std::chrono::milliseconds _default_blocked_reactor_notify; ss::timer<> _blocked_reactor_notify_reset_timer; diff --git a/src/v/redpanda/admin/services/BUILD b/src/v/redpanda/admin/services/BUILD index 00d81fd875280..c1eaaf0d72215 100644 --- a/src/v/redpanda/admin/services/BUILD +++ b/src/v/redpanda/admin/services/BUILD @@ -1,22 +1,42 @@ load("//bazel:build.bzl", "redpanda_cc_library") -package(default_visibility = ["//src/v/redpanda/admin:__subpackages__"]) +package(default_visibility = ["//src/v/redpanda:__subpackages__"]) + +redpanda_cc_library( + name = "utils", + srcs = ["utils.cc"], + hdrs = ["utils.h"], + deps = [ + "//src/v/features", + "//src/v/serde/protobuf:rpc", + ], +) redpanda_cc_library( name = "broker", srcs = ["broker.cc"], hdrs = ["broker.h"], deps = [ + ":utils", "//proto/redpanda/core/admin/v2:broker_redpanda_proto", - "//src/v/container:priority_queue", - "//src/v/features", - "//src/v/kafka/server", - "//src/v/redpanda/admin:aip_filter", - "//src/v/redpanda/admin:aip_ordering", "//src/v/redpanda/admin/proxy:client", "//src/v/serde/protobuf:rpc", - "//src/v/ssx:async_algorithm", "//src/v/version", "@seastar", ], ) + +redpanda_cc_library( + name = "cluster", + srcs = ["cluster.cc"], + hdrs = ["cluster.h"], + deps = [ + ":utils", + "//proto/redpanda/core/admin/v2:cluster_redpanda_proto", + "//src/v/features", + "//src/v/redpanda/admin:kafka_connections_service", + "//src/v/redpanda/admin/proxy:client", + "//src/v/serde/protobuf:rpc", + "@seastar", + ], +) diff --git a/src/v/redpanda/admin/services/broker.cc b/src/v/redpanda/admin/services/broker.cc index c73ce46599c1a..e8107edd02066 100644 --- a/src/v/redpanda/admin/services/broker.cc +++ b/src/v/redpanda/admin/services/broker.cc @@ -11,15 +11,8 @@ #include "redpanda/admin/services/broker.h" -#include "container/priority_queue.h" -#include "features/feature_table.h" -#include "kafka/server/connection_context.h" -#include "kafka/server/server.h" -#include "proto/redpanda/core/admin/v2/kafka_connections.proto.h" -#include "redpanda/admin/aip_filter.h" -#include "redpanda/admin/aip_ordering.h" +#include "redpanda/admin/services/utils.h" #include "serde/protobuf/rpc.h" -#include "ssx/async_algorithm.h" #include "version/version.h" #include @@ -38,13 +31,9 @@ ss::logger brlog{"admin_api_server/broker_service"}; broker_service_impl::broker_service_impl( admin::proxy::client client, - std::vector>* services, - ss::sharded& kafka_server, - ss::sharded& feature_table) + std::vector>* services) : _proxy_client(std::move(client)) - , _services(services) - , _kafka_server(kafka_server) - , _feature_table(feature_table) {} + , _services(services) {} ss::future broker_service_impl::get_broker( serde::pb::rpc::context ctx, proto::admin::get_broker_request req) { @@ -93,215 +82,4 @@ proto::admin::broker broker_service_impl::self_broker() const { return b; } -namespace { - -struct connection_collector { - virtual ~connection_collector() = default; - virtual void add(proto::kafka_connection conn) = 0; - virtual chunked_vector extract() && = 0; - virtual size_t size() const = 0; -}; - -class unordered_collector : public connection_collector { - chunked_vector _connections; - size_t _limit; - -public: - explicit unordered_collector(size_t limit) - : _limit(limit) {} - - void add(proto::kafka_connection conn) final { - if (_connections.size() < _limit) { - _connections.emplace_back(std::move(conn)); - } - } - - chunked_vector extract() && final { - return std::move(_connections); - } - - size_t size() const final { return _connections.size(); } -}; - -template -class ordered_collector : public connection_collector { - // Invert the order here to get the min-k instead of the max-k - chunked_bounded_priority_queue< - proto::kafka_connection, - detail::invert_comparator> - _pq; - -public: - ordered_collector(size_t limit, Comparator comp) - : _pq(limit, detail::invert_comparator(std::move(comp))) {} - - void add(proto::kafka_connection conn) final { _pq.push(std::move(conn)); } - - chunked_vector extract() && final { - return std::move(_pq).extract_heap(); - } - - size_t size() const final { return _pq.size(); } - - ss::future> extract_sorted() && { - return std::move(_pq).async_extract_sorted(); - } -}; - -struct connection_gather_result { - chunked_vector connections; - size_t total_matching_count; -}; - -ss::future gather_connections( - const kafka::server& server, - const filter_predicate& filter, - ss::shared_ptr collector) { - auto result = connection_gather_result{}; - - auto conn_ptrs = server.list_connections(); - co_await ss::coroutine::maybe_yield(); - - auto process_conn = [&result, &collector, &filter]( - proto::admin::kafka_connection&& conn_proto) { - bool matches_filter = filter(conn_proto); - - if (matches_filter) { - result.total_matching_count++; - collector->add(std::move(conn_proto)); - } - }; - - co_await ssx::async_for_each( - conn_ptrs, [&process_conn](const auto& conn_ptr) { - process_conn(conn_ptr->to_proto()); - }); - - auto closed_conns = server.list_closed_connections(); - for (auto& elem : closed_conns) { - auto elem_copy = co_await proto::admin::kafka_connection::from_proto( - co_await elem->to_proto()); - process_conn(std::move(elem_copy)); - } - - result.connections = std::move(*collector).extract(); - co_return result; -} - -ss::future gather_all_shards( - ss::sharded& kafka_server, - const filter_predicate& filter, - const auto& make_local_collector, - connection_collector& global_collector) { - size_t total_matching_connections = 0; - - for (ss::shard_id shard = 0; shard < ss::smp::count; ++shard) { - auto accumulated_count = global_collector.size(); - auto shard_result = co_await kafka_server.invoke_on( - shard, - [accumulated_count, &filter, &make_local_collector]( - kafka::server& server) { - return gather_connections( - server, filter, make_local_collector(accumulated_count)); - }); - - total_matching_connections += shard_result.total_matching_count; - - for (auto& conn : shard_result.connections) { - global_collector.add(std::move(conn)); - } - - co_await ss::coroutine::maybe_yield(); - } - - co_return total_matching_connections; -} - -void check_license(const features::feature_table& ft) { - if (ft.should_sanction()) { - const auto& license = ft.get_license(); - auto status = [&license]() { - return !license.has_value() ? "not present" - : license->is_expired() ? "expired" - : "unknown error"; - }; - throw serde::pb::rpc::failed_precondition_exception( - fmt::format("Invalid license: {}", status())); - } -} - -} // namespace - -ss::future -broker_service_impl::list_kafka_connections( - serde::pb::rpc::context ctx, - proto::admin::list_kafka_connections_request req) { - vlog(brlog.trace, "list_kafka_connections: {}", req); - - check_license(_feature_table.local()); - - // Proxy to the target node id specified in the request - auto target = model::node_id{req.get_node_id()}; - if (target != -1 && target != _proxy_client.self_node_id()) { - vlog(brlog.debug, "Redirecting to target node id {}", target); - co_return co_await _proxy_client - .make_client_for_node(target) - .list_kafka_connections(ctx, std::move(req)); - } - - auto resp = proto::admin::list_kafka_connections_response{}; - - constexpr size_t default_limit = 1000; - auto limit = (req.get_page_size() == 0) ? default_limit - : req.get_page_size(); - - auto filter_cfg = make_aip_filter_config( - req.get_filter()); - auto filter = aip_filter_parser::create_aip_filter(std::move(filter_cfg)); - - if (req.get_order_by().empty()) { - auto global_collector = unordered_collector{limit}; - - auto make_local_collector = [limit](size_t accumulated_count) { - return ss::make_shared( - limit - accumulated_count); - }; - - auto total_matching_connections = co_await gather_all_shards( - _kafka_server, filter, make_local_collector, global_collector); - - resp.set_connections(std::move(global_collector).extract()); - resp.set_total_size(total_matching_connections); - } else { - auto ordering_conf - = make_ordering_config( - req.get_order_by()); - auto comp = sort_order::parse(ordering_conf); - - auto global_collector = ordered_collector{limit, comp}; - - auto make_local_collector = [limit, &comp](size_t) { - return ss::make_shared>( - limit, comp); - }; - - auto total_matching_connections = co_await gather_all_shards( - _kafka_server, filter, make_local_collector, global_collector); - - resp.set_connections( - co_await std::move(global_collector).extract_sorted()); - resp.set_total_size(total_matching_connections); - } - - vlog( - brlog.trace, - "list_kafka_connections: response connections: {} ({}b), total matching: " - "{}", - resp.get_connections().size(), - resp.get_connections().memory_size(), - resp.get_total_size()); - - co_return resp; -} - } // namespace admin diff --git a/src/v/redpanda/admin/services/broker.h b/src/v/redpanda/admin/services/broker.h index 40e33ce3dd934..579e5396e016b 100644 --- a/src/v/redpanda/admin/services/broker.h +++ b/src/v/redpanda/admin/services/broker.h @@ -11,8 +11,6 @@ #pragma once -#include "features/fwd.h" -#include "kafka/server/fwd.h" #include "proto/redpanda/core/admin/v2/broker.proto.h" #include "redpanda/admin/proxy/client.h" @@ -25,27 +23,18 @@ class broker_service_impl : public proto::admin::broker_service { public: broker_service_impl( admin::proxy::client, - std::vector>* services, - ss::sharded& kafka_server, - ss::sharded& _feature_table); + std::vector>* services); ss::future get_broker( serde::pb::rpc::context, proto::admin::get_broker_request) override; ss::future list_brokers( serde::pb::rpc::context, proto::admin::list_brokers_request) override; - ss::future - list_kafka_connections( - serde::pb::rpc::context, - proto::admin::list_kafka_connections_request) override; - private: proto::admin::broker self_broker() const; admin::proxy::client _proxy_client; std::vector>* _services; - ss::sharded& _kafka_server; - ss::sharded& _feature_table; }; } // namespace admin diff --git a/src/v/redpanda/admin/services/cluster.cc b/src/v/redpanda/admin/services/cluster.cc new file mode 100644 index 0000000000000..47655a0532a70 --- /dev/null +++ b/src/v/redpanda/admin/services/cluster.cc @@ -0,0 +1,67 @@ +/* + * Copyright 2025 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "redpanda/admin/services/cluster.h" + +#include "features/feature_table.h" +#include "proto/redpanda/core/admin/v2/cluster.proto.h" +#include "redpanda/admin/kafka_connections_service.h" +#include "redpanda/admin/services/utils.h" +#include "serde/protobuf/rpc.h" + +#include + +namespace proto { +using namespace proto::admin; +} // namespace proto + +namespace admin { + +namespace { +// NOLINTNEXTLINE(*-non-const-global-variables,cert-err58-*) +ss::logger brlog{"admin_api_server/cluster_service"}; + +} // namespace + +cluster_service_impl::cluster_service_impl( + admin::proxy::client client, + ss::sharded& kafka_connections_service, + ss::sharded& feature_table) + : _proxy_client(std::move(client)) + , _kafka_connections_service(kafka_connections_service) + , _feature_table(feature_table) {} + +ss::future +cluster_service_impl::list_kafka_connections( + serde::pb::rpc::context ctx, + proto::admin::list_kafka_connections_request req) { + vlog(brlog.trace, "list_kafka_connections: {}", req); + + utils::check_license(_feature_table.local()); + + auto& kcs = _kafka_connections_service.local(); + auto resp = ctx.is_proxied() + ? co_await kcs.list_kafka_connections_local(std::move(req)) + : co_await kcs.list_kafka_connections_cluster_wide( + _proxy_client, ctx, std::move(req)); + + vlog( + brlog.trace, + "list_kafka_connections: response connections: {} ({}b), total matching: " + "{}", + resp.get_connections().size(), + resp.get_connections().memory_size(), + resp.get_total_size()); + + co_return resp; +} + +} // namespace admin diff --git a/src/v/redpanda/admin/services/cluster.h b/src/v/redpanda/admin/services/cluster.h new file mode 100644 index 0000000000000..2df2a15be1659 --- /dev/null +++ b/src/v/redpanda/admin/services/cluster.h @@ -0,0 +1,39 @@ +/* + * Copyright 2025 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "features/fwd.h" +#include "proto/redpanda/core/admin/v2/cluster.proto.h" +#include "redpanda/admin/kafka_connections_service.h" +#include "redpanda/admin/proxy/client.h" + +namespace admin { + +class cluster_service_impl : public proto::admin::cluster_service { +public: + cluster_service_impl( + admin::proxy::client, + ss::sharded& kafka_connections_service, + ss::sharded& feature_table); + + ss::future + list_kafka_connections( + serde::pb::rpc::context, + proto::admin::list_kafka_connections_request) override; + +private: + admin::proxy::client _proxy_client; + ss::sharded& _kafka_connections_service; + ss::sharded& _feature_table; +}; + +} // namespace admin diff --git a/src/v/redpanda/admin/services/utils.cc b/src/v/redpanda/admin/services/utils.cc new file mode 100644 index 0000000000000..39786bdce7f15 --- /dev/null +++ b/src/v/redpanda/admin/services/utils.cc @@ -0,0 +1,32 @@ +/* + * Copyright 2025 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "redpanda/admin/services/utils.h" + +#include "features/feature_table.h" +#include "serde/protobuf/rpc.h" + +namespace admin::utils { + +void check_license(const features::feature_table& ft) { + if (ft.should_sanction()) { + const auto& license = ft.get_license(); + auto status = [&license]() { + return !license.has_value() ? "not present" + : license->is_expired() ? "expired" + : "unknown error"; + }; + throw serde::pb::rpc::failed_precondition_exception( + fmt::format("Invalid license: {}", status())); + } +} + +} // namespace admin::utils diff --git a/src/v/redpanda/admin/services/utils.h b/src/v/redpanda/admin/services/utils.h new file mode 100644 index 0000000000000..41082b8037b6d --- /dev/null +++ b/src/v/redpanda/admin/services/utils.h @@ -0,0 +1,20 @@ +/* + * Copyright 2025 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "features/feature_table.h" + +namespace admin::utils { + +void check_license(const features::feature_table& ft); + +} // namespace admin::utils diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index c5f8ccc00e9fc..af5d4e0924f41 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -138,9 +138,11 @@ #include "raft/coordinated_recovery_throttle.h" #include "raft/group_manager.h" #include "raft/service.h" +#include "redpanda/admin/kafka_connections_service.h" #include "redpanda/admin/proxy/client.h" #include "redpanda/admin/proxy/service.h" #include "redpanda/admin/server.h" +#include "redpanda/admin/services/cluster.h" #include "redpanda/admin/services/datalake/datalake.h" #include "redpanda/admin/services/internal/debug.h" #include "redpanda/admin/services/shadow_link/shadow_link.h" @@ -1151,7 +1153,8 @@ void application::configure_admin_server(model::node_id node_id) { std::ref(_tx_manager_migrator), std::ref(_kafka_server.ref()), std::ref(tx_gateway_frontend), - std::ref(_debug_bundle_service)) + std::ref(_debug_bundle_service), + std::ref(_kafka_connections_service)) .get(); _admin .invoke_on_all([this, node_id](admin_server& s) { @@ -1170,6 +1173,11 @@ void application::configure_admin_server(model::node_id node_id) { s.add_service( std::make_unique( create_client(), &_datalake_coordinator_fe)); + s.add_service( + std::make_unique( + create_client(), + std::ref(_kafka_connections_service), + controller->get_feature_table())); }) .get(); } @@ -1567,6 +1575,9 @@ void application::wire_up_runtime_services( construct_single_service(_host_metrics_watcher, std::ref(_log)); + construct_service(_kafka_connections_service, std::ref(_kafka_server.ref())) + .get(); + configure_admin_server(node_id); } diff --git a/src/v/redpanda/application.h b/src/v/redpanda/application.h index fd0a06b51dff9..228c42d80e442 100644 --- a/src/v/redpanda/application.h +++ b/src/v/redpanda/application.h @@ -54,6 +54,7 @@ #include "pandaproxy/schema_registry/configuration.h" #include "pandaproxy/schema_registry/fwd.h" #include "raft/fwd.h" +#include "redpanda/admin/kafka_connections_service.h" #include "redpanda/monitor_unsafe.h" #include "resource_mgmt/cpu_profiler.h" #include "resource_mgmt/cpu_scheduling.h" @@ -343,6 +344,8 @@ class application : public ssx::sharded_service_container { std::unique_ptr _host_metrics_watcher; + ss::sharded _kafka_connections_service; + ss::sharded _as; }; diff --git a/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/broker_pb2.py b/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/broker_pb2.py index 758b43bb9bbfe..f5bb794ce4757 100644 --- a/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/broker_pb2.py +++ b/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/broker_pb2.py @@ -8,8 +8,7 @@ _sym_db = _symbol_database.Default() from ......proto.redpanda.core.pbgen import options_pb2 as proto_dot_redpanda_dot_core_dot_pbgen_dot_options__pb2 from ......proto.redpanda.core.pbgen import rpc_pb2 as proto_dot_redpanda_dot_core_dot_pbgen_dot_rpc__pb2 -from ......proto.redpanda.core.admin.v2 import kafka_connections_pb2 as proto_dot_redpanda_dot_core_dot_admin_dot_v2_dot_kafka__connections__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n)proto/redpanda/core/admin/v2/broker.proto\x12\x16redpanda.core.admin.v2\x1a\'proto/redpanda/core/pbgen/options.proto\x1a#proto/redpanda/core/pbgen/rpc.proto\x1a4proto/redpanda/core/admin/v2/kafka_connections.proto"#\n\x10GetBrokerRequest\x12\x0f\n\x07node_id\x18\x01 \x01(\x05"C\n\x11GetBrokerResponse\x12.\n\x06broker\x18\x01 \x01(\x0b2\x1e.redpanda.core.admin.v2.Broker"\x14\n\x12ListBrokersRequest"F\n\x13ListBrokersResponse\x12/\n\x07brokers\x18\x01 \x03(\x0b2\x1e.redpanda.core.admin.v2.Broker"c\n\x1bListKafkaConnectionsRequest\x12\x0f\n\x07node_id\x18\x01 \x01(\x05\x12\x11\n\tpage_size\x18\x02 \x01(\x05\x12\x0e\n\x06filter\x18\x03 \x01(\t\x12\x10\n\x08order_by\x18\x04 \x01(\t"p\n\x1cListKafkaConnectionsResponse\x12<\n\x0bconnections\x18\x01 \x03(\x0b2\'.redpanda.core.admin.v2.KafkaConnection\x12\x12\n\ntotal_size\x18\x02 \x01(\x04"\x8b\x01\n\x06Broker\x12\x0f\n\x07node_id\x18\x01 \x01(\x05\x125\n\nbuild_info\x18\x02 \x01(\x0b2!.redpanda.core.admin.v2.BuildInfo\x129\n\x0cadmin_server\x18\x03 \x01(\x0b2#.redpanda.core.admin.v2.AdminServer"/\n\tBuildInfo\x12\x0f\n\x07version\x18\x01 \x01(\t\x12\x11\n\tbuild_sha\x18\x02 \x01(\t"?\n\x0bAdminServer\x120\n\x06routes\x18\x01 \x03(\x0b2 .redpanda.core.admin.v2.RPCRoute",\n\x08RPCRoute\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x12\n\nhttp_route\x18\x02 \x01(\t2\xf5\x02\n\rBrokerService\x12h\n\tGetBroker\x12(.redpanda.core.admin.v2.GetBrokerRequest\x1a).redpanda.core.admin.v2.GetBrokerResponse"\x06\xea\x92\x19\x02\x10\x03\x12n\n\x0bListBrokers\x12*.redpanda.core.admin.v2.ListBrokersRequest\x1a+.redpanda.core.admin.v2.ListBrokersResponse"\x06\xea\x92\x19\x02\x10\x03\x12\x89\x01\n\x14ListKafkaConnections\x123.redpanda.core.admin.v2.ListKafkaConnectionsRequest\x1a4.redpanda.core.admin.v2.ListKafkaConnectionsResponse"\x06\xea\x92\x19\x02\x10\x03B\x10\xea\x92\x19\x0cproto::adminb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n)proto/redpanda/core/admin/v2/broker.proto\x12\x16redpanda.core.admin.v2\x1a\'proto/redpanda/core/pbgen/options.proto\x1a#proto/redpanda/core/pbgen/rpc.proto"#\n\x10GetBrokerRequest\x12\x0f\n\x07node_id\x18\x01 \x01(\x05"C\n\x11GetBrokerResponse\x12.\n\x06broker\x18\x01 \x01(\x0b2\x1e.redpanda.core.admin.v2.Broker"\x14\n\x12ListBrokersRequest"F\n\x13ListBrokersResponse\x12/\n\x07brokers\x18\x01 \x03(\x0b2\x1e.redpanda.core.admin.v2.Broker"\x8b\x01\n\x06Broker\x12\x0f\n\x07node_id\x18\x01 \x01(\x05\x125\n\nbuild_info\x18\x02 \x01(\x0b2!.redpanda.core.admin.v2.BuildInfo\x129\n\x0cadmin_server\x18\x03 \x01(\x0b2#.redpanda.core.admin.v2.AdminServer"/\n\tBuildInfo\x12\x0f\n\x07version\x18\x01 \x01(\t\x12\x11\n\tbuild_sha\x18\x02 \x01(\t"?\n\x0bAdminServer\x120\n\x06routes\x18\x01 \x03(\x0b2 .redpanda.core.admin.v2.RPCRoute",\n\x08RPCRoute\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x12\n\nhttp_route\x18\x02 \x01(\t2\xe9\x01\n\rBrokerService\x12h\n\tGetBroker\x12(.redpanda.core.admin.v2.GetBrokerRequest\x1a).redpanda.core.admin.v2.GetBrokerResponse"\x06\xea\x92\x19\x02\x10\x03\x12n\n\x0bListBrokers\x12*.redpanda.core.admin.v2.ListBrokersRequest\x1a+.redpanda.core.admin.v2.ListBrokersResponse"\x06\xea\x92\x19\x02\x10\x03B\x10\xea\x92\x19\x0cproto::adminb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'proto.redpanda.core.admin.v2.broker_pb2', _globals) @@ -20,27 +19,21 @@ _globals['_BROKERSERVICE'].methods_by_name['GetBroker']._serialized_options = b'\xea\x92\x19\x02\x10\x03' _globals['_BROKERSERVICE'].methods_by_name['ListBrokers']._loaded_options = None _globals['_BROKERSERVICE'].methods_by_name['ListBrokers']._serialized_options = b'\xea\x92\x19\x02\x10\x03' - _globals['_BROKERSERVICE'].methods_by_name['ListKafkaConnections']._loaded_options = None - _globals['_BROKERSERVICE'].methods_by_name['ListKafkaConnections']._serialized_options = b'\xea\x92\x19\x02\x10\x03' - _globals['_GETBROKERREQUEST']._serialized_start = 201 - _globals['_GETBROKERREQUEST']._serialized_end = 236 - _globals['_GETBROKERRESPONSE']._serialized_start = 238 - _globals['_GETBROKERRESPONSE']._serialized_end = 305 - _globals['_LISTBROKERSREQUEST']._serialized_start = 307 - _globals['_LISTBROKERSREQUEST']._serialized_end = 327 - _globals['_LISTBROKERSRESPONSE']._serialized_start = 329 - _globals['_LISTBROKERSRESPONSE']._serialized_end = 399 - _globals['_LISTKAFKACONNECTIONSREQUEST']._serialized_start = 401 - _globals['_LISTKAFKACONNECTIONSREQUEST']._serialized_end = 500 - _globals['_LISTKAFKACONNECTIONSRESPONSE']._serialized_start = 502 - _globals['_LISTKAFKACONNECTIONSRESPONSE']._serialized_end = 614 - _globals['_BROKER']._serialized_start = 617 - _globals['_BROKER']._serialized_end = 756 - _globals['_BUILDINFO']._serialized_start = 758 - _globals['_BUILDINFO']._serialized_end = 805 - _globals['_ADMINSERVER']._serialized_start = 807 - _globals['_ADMINSERVER']._serialized_end = 870 - _globals['_RPCROUTE']._serialized_start = 872 - _globals['_RPCROUTE']._serialized_end = 916 - _globals['_BROKERSERVICE']._serialized_start = 919 - _globals['_BROKERSERVICE']._serialized_end = 1292 \ No newline at end of file + _globals['_GETBROKERREQUEST']._serialized_start = 147 + _globals['_GETBROKERREQUEST']._serialized_end = 182 + _globals['_GETBROKERRESPONSE']._serialized_start = 184 + _globals['_GETBROKERRESPONSE']._serialized_end = 251 + _globals['_LISTBROKERSREQUEST']._serialized_start = 253 + _globals['_LISTBROKERSREQUEST']._serialized_end = 273 + _globals['_LISTBROKERSRESPONSE']._serialized_start = 275 + _globals['_LISTBROKERSRESPONSE']._serialized_end = 345 + _globals['_BROKER']._serialized_start = 348 + _globals['_BROKER']._serialized_end = 487 + _globals['_BUILDINFO']._serialized_start = 489 + _globals['_BUILDINFO']._serialized_end = 536 + _globals['_ADMINSERVER']._serialized_start = 538 + _globals['_ADMINSERVER']._serialized_end = 601 + _globals['_RPCROUTE']._serialized_start = 603 + _globals['_RPCROUTE']._serialized_end = 647 + _globals['_BROKERSERVICE']._serialized_start = 650 + _globals['_BROKERSERVICE']._serialized_end = 883 \ No newline at end of file diff --git a/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/broker_pb2.pyi b/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/broker_pb2.pyi index 2b39d47acc35d..98aba15b4bc8e 100644 --- a/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/broker_pb2.pyi +++ b/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/broker_pb2.pyi @@ -20,7 +20,6 @@ import collections.abc import google.protobuf.descriptor import google.protobuf.internal.containers import google.protobuf.message -from ...... import proto import typing DESCRIPTOR: google.protobuf.descriptor.FileDescriptor @@ -85,53 +84,6 @@ class ListBrokersResponse(google.protobuf.message.Message): ... global___ListBrokersResponse = ListBrokersResponse -@typing.final -class ListKafkaConnectionsRequest(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - NODE_ID_FIELD_NUMBER: builtins.int - PAGE_SIZE_FIELD_NUMBER: builtins.int - FILTER_FIELD_NUMBER: builtins.int - ORDER_BY_FIELD_NUMBER: builtins.int - node_id: builtins.int - 'The node ID for the broker. If set to -1, the broker handling the RPC\n request returns information about itself.\n ' - page_size: builtins.int - 'The maximum number of connections to return. If unspecified or 0, a\n default value may be applied. Note that paging is currently not fully\n supported, and this field only acts as a limit for the first page of data\n returned. Subsequent pages of data cannot be requested.\n ' - filter: builtins.str - 'Filter expression to apply to the connection list.\n Uses a subset of AIP-160 filter syntax supporting:\n - Field comparisons (`=`, `!=`, `<`, `>`, `<=`, `>=`)\n - Logical AND chaining: condition1 AND condition2\n - Nested field access: parent.child = value\n - Escape sequences: field = "string with \\"quotes\\""\n - Enum types\n - RFC3339 timestamps and ISO-like duration\n\n Limitations (not supported):\n - Logical OR chaining\n - Parentheses `(` `)` for grouping\n - Map and repeated types\n - HAS (:) operator\n - Negation (-, NOT)\n - Bare literal matching\n\n Example filters:\n - `state = KAFKA_CONNECTION_STATE_OPEN`\n - `idle_duration > 30s AND request_count_total > 100`\n - `authentication_info.user_principal = "my-producer"`\n - `recent_request_statistics.produce_bytes > 1000 AND\n client_software_name = "kgo"`\n - `open_time >= 2025-09-01T10:22:54Z`\n\n Reference: https://google.aip.dev/160\n ' - order_by: builtins.str - 'Field-based ordering specification following AIP-132 syntax.\n Supports multiple fields with `asc`/`desc` direction indicators.\n Examples:\n - `idle_duration desc` - longest idle connections first\n - `open_time desc, total_request_statistics.request_count desc` - newest\n connections first, then most active\n - `recent_request_statistics.produce_bytes desc` - connections with\n highest current produce throughput first\n\n Reference: https://google.aip.dev/132#ordering\n ' - - def __init__(self, *, node_id: builtins.int=..., page_size: builtins.int=..., filter: builtins.str=..., order_by: builtins.str=...) -> None: - ... - - def ClearField(self, field_name: typing.Literal['filter', b'filter', 'node_id', b'node_id', 'order_by', b'order_by', 'page_size', b'page_size']) -> None: - ... -global___ListKafkaConnectionsRequest = ListKafkaConnectionsRequest - -@typing.final -class ListKafkaConnectionsResponse(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - CONNECTIONS_FIELD_NUMBER: builtins.int - TOTAL_SIZE_FIELD_NUMBER: builtins.int - total_size: builtins.int - 'Total number of connections matching the request.\n This may be greater than `len(connections)` if some connections were\n omitted from the response due to the specified (or default) `page_size`.\n Example:\n request.page_size = 10\n response.connections = [<10 items>]\n response.total_size = 13\n ' - - @property - def connections(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[proto.redpanda.core.admin.v2.kafka_connections_pb2.KafkaConnection]: - """The list of connections matching the request. - Note that in addition to open connections, some recently-closed - connections may also be included here. If you don't want to include - closed connections, set the filter in the request to `state = - KAFKA_CONNECTION_STATE_OPEN`. - """ - - def __init__(self, *, connections: collections.abc.Iterable[proto.redpanda.core.admin.v2.kafka_connections_pb2.KafkaConnection] | None=..., total_size: builtins.int=...) -> None: - ... - - def ClearField(self, field_name: typing.Literal['connections', b'connections', 'total_size', b'total_size']) -> None: - ... -global___ListKafkaConnectionsResponse = ListKafkaConnectionsResponse - @typing.final class Broker(google.protobuf.message.Message): """The resource for an individual broker within the Kafka Cluster.""" diff --git a/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/broker_pb2_connect.py b/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/broker_pb2_connect.py index ca976ae429771..c06586783aa6f 100644 --- a/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/broker_pb2_connect.py +++ b/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/broker_pb2_connect.py @@ -64,21 +64,6 @@ def list_brokers(self, req: proto.redpanda.core.admin.v2.broker_pb2.ListBrokersR raise ConnectProtocolError('missing response message') return msg - def call_list_kafka_connections(self, req: proto.redpanda.core.admin.v2.broker_pb2.ListKafkaConnectionsRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> UnaryOutput[proto.redpanda.core.admin.v2.broker_pb2.ListKafkaConnectionsResponse]: - """Low-level method to call ListKafkaConnections, granting access to errors and metadata""" - url = self.base_url + '/redpanda.core.admin.v2.BrokerService/ListKafkaConnections' - return self._connect_client.call_unary(url, req, proto.redpanda.core.admin.v2.broker_pb2.ListKafkaConnectionsResponse, extra_headers, timeout_seconds) - - def list_kafka_connections(self, req: proto.redpanda.core.admin.v2.broker_pb2.ListKafkaConnectionsRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> proto.redpanda.core.admin.v2.broker_pb2.ListKafkaConnectionsResponse: - response = self.call_list_kafka_connections(req, extra_headers, timeout_seconds) - err = response.error() - if err is not None: - raise err - msg = response.message() - if msg is None: - raise ConnectProtocolError('missing response message') - return msg - class AsyncBrokerServiceClient: def __init__(self, base_url: str, http_client: aiohttp.ClientSession, protocol: ConnectProtocol=ConnectProtocol.CONNECT_PROTOBUF): @@ -115,21 +100,6 @@ async def list_brokers(self, req: proto.redpanda.core.admin.v2.broker_pb2.ListBr raise ConnectProtocolError('missing response message') return msg - async def call_list_kafka_connections(self, req: proto.redpanda.core.admin.v2.broker_pb2.ListKafkaConnectionsRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> UnaryOutput[proto.redpanda.core.admin.v2.broker_pb2.ListKafkaConnectionsResponse]: - """Low-level method to call ListKafkaConnections, granting access to errors and metadata""" - url = self.base_url + '/redpanda.core.admin.v2.BrokerService/ListKafkaConnections' - return await self._connect_client.call_unary(url, req, proto.redpanda.core.admin.v2.broker_pb2.ListKafkaConnectionsResponse, extra_headers, timeout_seconds) - - async def list_kafka_connections(self, req: proto.redpanda.core.admin.v2.broker_pb2.ListKafkaConnectionsRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> proto.redpanda.core.admin.v2.broker_pb2.ListKafkaConnectionsResponse: - response = await self.call_list_kafka_connections(req, extra_headers, timeout_seconds) - err = response.error() - if err is not None: - raise err - msg = response.message() - if msg is None: - raise ConnectProtocolError('missing response message') - return msg - @typing.runtime_checkable class BrokerServiceProtocol(typing.Protocol): @@ -138,14 +108,10 @@ def get_broker(self, req: ClientRequest[proto.redpanda.core.admin.v2.broker_pb2. def list_brokers(self, req: ClientRequest[proto.redpanda.core.admin.v2.broker_pb2.ListBrokersRequest]) -> ServerResponse[proto.redpanda.core.admin.v2.broker_pb2.ListBrokersResponse]: ... - - def list_kafka_connections(self, req: ClientRequest[proto.redpanda.core.admin.v2.broker_pb2.ListKafkaConnectionsRequest]) -> ServerResponse[proto.redpanda.core.admin.v2.broker_pb2.ListKafkaConnectionsResponse]: - ... BROKER_SERVICE_PATH_PREFIX = '/redpanda.core.admin.v2.BrokerService' def wsgi_broker_service(implementation: BrokerServiceProtocol) -> WSGIApplication: app = ConnectWSGI() app.register_unary_rpc('/redpanda.core.admin.v2.BrokerService/GetBroker', implementation.get_broker, proto.redpanda.core.admin.v2.broker_pb2.GetBrokerRequest) app.register_unary_rpc('/redpanda.core.admin.v2.BrokerService/ListBrokers', implementation.list_brokers, proto.redpanda.core.admin.v2.broker_pb2.ListBrokersRequest) - app.register_unary_rpc('/redpanda.core.admin.v2.BrokerService/ListKafkaConnections', implementation.list_kafka_connections, proto.redpanda.core.admin.v2.broker_pb2.ListKafkaConnectionsRequest) return app \ No newline at end of file diff --git a/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/cluster_pb2.py b/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/cluster_pb2.py new file mode 100644 index 0000000000000..4b6f7303872f9 --- /dev/null +++ b/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/cluster_pb2.py @@ -0,0 +1,26 @@ +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion(_runtime_version.Domain.PUBLIC, 5, 29, 0, '', 'proto/redpanda/core/admin/v2/cluster.proto') +_sym_db = _symbol_database.Default() +from ......proto.redpanda.core.pbgen import options_pb2 as proto_dot_redpanda_dot_core_dot_pbgen_dot_options__pb2 +from ......proto.redpanda.core.pbgen import rpc_pb2 as proto_dot_redpanda_dot_core_dot_pbgen_dot_rpc__pb2 +from ......proto.redpanda.core.admin.v2 import kafka_connections_pb2 as proto_dot_redpanda_dot_core_dot_admin_dot_v2_dot_kafka__connections__pb2 +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n*proto/redpanda/core/admin/v2/cluster.proto\x12\x16redpanda.core.admin.v2\x1a\'proto/redpanda/core/pbgen/options.proto\x1a#proto/redpanda/core/pbgen/rpc.proto\x1a4proto/redpanda/core/admin/v2/kafka_connections.proto"R\n\x1bListKafkaConnectionsRequest\x12\x11\n\tpage_size\x18\x01 \x01(\x05\x12\x0e\n\x06filter\x18\x02 \x01(\t\x12\x10\n\x08order_by\x18\x03 \x01(\t"p\n\x1cListKafkaConnectionsResponse\x12<\n\x0bconnections\x18\x01 \x03(\x0b2\'.redpanda.core.admin.v2.KafkaConnection\x12\x12\n\ntotal_size\x18\x02 \x01(\x042\x9c\x01\n\x0eClusterService\x12\x89\x01\n\x14ListKafkaConnections\x123.redpanda.core.admin.v2.ListKafkaConnectionsRequest\x1a4.redpanda.core.admin.v2.ListKafkaConnectionsResponse"\x06\xea\x92\x19\x02\x10\x03B\x10\xea\x92\x19\x0cproto::adminb\x06proto3') +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'proto.redpanda.core.admin.v2.cluster_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + _globals['DESCRIPTOR']._loaded_options = None + _globals['DESCRIPTOR']._serialized_options = b'\xea\x92\x19\x0cproto::admin' + _globals['_CLUSTERSERVICE'].methods_by_name['ListKafkaConnections']._loaded_options = None + _globals['_CLUSTERSERVICE'].methods_by_name['ListKafkaConnections']._serialized_options = b'\xea\x92\x19\x02\x10\x03' + _globals['_LISTKAFKACONNECTIONSREQUEST']._serialized_start = 202 + _globals['_LISTKAFKACONNECTIONSREQUEST']._serialized_end = 284 + _globals['_LISTKAFKACONNECTIONSRESPONSE']._serialized_start = 286 + _globals['_LISTKAFKACONNECTIONSRESPONSE']._serialized_end = 398 + _globals['_CLUSTERSERVICE']._serialized_start = 401 + _globals['_CLUSTERSERVICE']._serialized_end = 557 \ No newline at end of file diff --git a/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/cluster_pb2.pyi b/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/cluster_pb2.pyi new file mode 100644 index 0000000000000..4df186c504185 --- /dev/null +++ b/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/cluster_pb2.pyi @@ -0,0 +1,75 @@ +""" +@generated by mypy-protobuf. Do not edit manually! +isort:skip_file +Copyright 2025 Redpanda Data, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import builtins +import collections.abc +import google.protobuf.descriptor +import google.protobuf.internal.containers +import google.protobuf.message +from ...... import proto +import typing +DESCRIPTOR: google.protobuf.descriptor.FileDescriptor + +@typing.final +class ListKafkaConnectionsRequest(google.protobuf.message.Message): + """ListKafkaConnectionsRequest return information about the broker's Kafka + connections. + """ + DESCRIPTOR: google.protobuf.descriptor.Descriptor + PAGE_SIZE_FIELD_NUMBER: builtins.int + FILTER_FIELD_NUMBER: builtins.int + ORDER_BY_FIELD_NUMBER: builtins.int + page_size: builtins.int + 'The maximum number of connections to return. If unspecified or 0, a\n default value may be applied. Note that paging is currently not fully\n supported, and this field only acts as a limit for the first page of data\n returned. Subsequent pages of data cannot be requested.\n ' + filter: builtins.str + 'Filter expression to apply to the connection list.\n Uses a subset of AIP-160 filter syntax supporting:\n - Field comparisons (`=`, `!=`, `<`, `>`, `<=`, `>=`)\n - Logical AND chaining: condition1 AND condition2\n - Nested field access: parent.child = value\n - Escape sequences: field = "string with \\"quotes\\""\n - Enum types\n - RFC3339 timestamps and ISO-like duration\n\n Limitations (not supported):\n - Logical OR chaining\n - Parentheses `(` `)` for grouping\n - Map and repeated types\n - HAS (:) operator\n - Negation (-, NOT)\n - Bare literal matching\n\n Example filters:\n - `state = KAFKA_CONNECTION_STATE_OPEN`\n - `idle_duration > 30s AND total_request_statistics.request_count > 100`\n - `authentication_info.user_principal = "my-producer"`\n - `recent_request_statistics.produce_bytes > 1000 AND\n client_software_name = "kgo"`\n - `open_time >= 2025-09-01T10:22:54Z`\n\n Reference: https://google.aip.dev/160\n ' + order_by: builtins.str + 'Field-based ordering specification following AIP-132 syntax.\n Supports multiple fields with `asc`/`desc` direction indicators.\n Examples:\n - `idle_duration desc` - longest idle connections first\n - `open_time desc, total_request_statistics.request_count desc` - newest\n connections first, then most active\n - `recent_request_statistics.produce_bytes desc` - connections with\n highest current produce throughput first\n\n Reference: https://google.aip.dev/132#ordering\n ' + + def __init__(self, *, page_size: builtins.int=..., filter: builtins.str=..., order_by: builtins.str=...) -> None: + ... + + def ClearField(self, field_name: typing.Literal['filter', b'filter', 'order_by', b'order_by', 'page_size', b'page_size']) -> None: + ... +global___ListKafkaConnectionsRequest = ListKafkaConnectionsRequest + +@typing.final +class ListKafkaConnectionsResponse(google.protobuf.message.Message): + """ListKafkaConnectionsResponse is the response from the ListKafkaConnections + RPC. + """ + DESCRIPTOR: google.protobuf.descriptor.Descriptor + CONNECTIONS_FIELD_NUMBER: builtins.int + TOTAL_SIZE_FIELD_NUMBER: builtins.int + total_size: builtins.int + 'Total number of connections matching the request.\n This may be greater than `len(connections)` if some connections were\n omitted from the response due to the specified (or default) `page_size`.\n Example:\n request.page_size = 10\n response.connections = [<10 items>]\n response.total_size = 13\n ' + + @property + def connections(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[proto.redpanda.core.admin.v2.kafka_connections_pb2.KafkaConnection]: + """The list of connections matching the request. + Note that in addition to open connections, some recently-closed + connections may also be included here. If you don't want to include + closed connections, set the filter in the request to `state = + KAFKA_CONNECTION_STATE_OPEN`. + """ + + def __init__(self, *, connections: collections.abc.Iterable[proto.redpanda.core.admin.v2.kafka_connections_pb2.KafkaConnection] | None=..., total_size: builtins.int=...) -> None: + ... + + def ClearField(self, field_name: typing.Literal['connections', b'connections', 'total_size', b'total_size']) -> None: + ... +global___ListKafkaConnectionsResponse = ListKafkaConnectionsResponse \ No newline at end of file diff --git a/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/cluster_pb2_connect.py b/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/cluster_pb2_connect.py new file mode 100644 index 0000000000000..d76592ca89558 --- /dev/null +++ b/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/cluster_pb2_connect.py @@ -0,0 +1,83 @@ +from __future__ import annotations +from collections.abc import AsyncIterator +from collections.abc import Iterator +from collections.abc import Iterable +import aiohttp +import urllib3 +import typing +import sys +from connectrpc.client_async import AsyncConnectClient +from connectrpc.client_sync import ConnectClient +from connectrpc.client_protocol import ConnectProtocol +from connectrpc.client_connect import ConnectProtocolError +from connectrpc.headers import HeaderInput +from connectrpc.server import ClientRequest +from connectrpc.server import ClientStream +from connectrpc.server import ServerResponse +from connectrpc.server import ServerStream +from connectrpc.server_sync import ConnectWSGI +from connectrpc.streams import StreamInput +from connectrpc.streams import AsyncStreamOutput +from connectrpc.streams import StreamOutput +from connectrpc.unary import UnaryOutput +from connectrpc.unary import ClientStreamingOutput +if typing.TYPE_CHECKING: + if sys.version_info >= (3, 11): + from wsgiref.types import WSGIApplication + else: + from _typeshed.wsgi import WSGIApplication +from ...... import proto + +class ClusterServiceClient: + + def __init__(self, base_url: str, http_client: urllib3.PoolManager | None=None, protocol: ConnectProtocol=ConnectProtocol.CONNECT_PROTOBUF): + self.base_url = base_url + self._connect_client = ConnectClient(http_client, protocol) + + def call_list_kafka_connections(self, req: proto.redpanda.core.admin.v2.cluster_pb2.ListKafkaConnectionsRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> UnaryOutput[proto.redpanda.core.admin.v2.cluster_pb2.ListKafkaConnectionsResponse]: + """Low-level method to call ListKafkaConnections, granting access to errors and metadata""" + url = self.base_url + '/redpanda.core.admin.v2.ClusterService/ListKafkaConnections' + return self._connect_client.call_unary(url, req, proto.redpanda.core.admin.v2.cluster_pb2.ListKafkaConnectionsResponse, extra_headers, timeout_seconds) + + def list_kafka_connections(self, req: proto.redpanda.core.admin.v2.cluster_pb2.ListKafkaConnectionsRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> proto.redpanda.core.admin.v2.cluster_pb2.ListKafkaConnectionsResponse: + response = self.call_list_kafka_connections(req, extra_headers, timeout_seconds) + err = response.error() + if err is not None: + raise err + msg = response.message() + if msg is None: + raise ConnectProtocolError('missing response message') + return msg + +class AsyncClusterServiceClient: + + def __init__(self, base_url: str, http_client: aiohttp.ClientSession, protocol: ConnectProtocol=ConnectProtocol.CONNECT_PROTOBUF): + self.base_url = base_url + self._connect_client = AsyncConnectClient(http_client, protocol) + + async def call_list_kafka_connections(self, req: proto.redpanda.core.admin.v2.cluster_pb2.ListKafkaConnectionsRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> UnaryOutput[proto.redpanda.core.admin.v2.cluster_pb2.ListKafkaConnectionsResponse]: + """Low-level method to call ListKafkaConnections, granting access to errors and metadata""" + url = self.base_url + '/redpanda.core.admin.v2.ClusterService/ListKafkaConnections' + return await self._connect_client.call_unary(url, req, proto.redpanda.core.admin.v2.cluster_pb2.ListKafkaConnectionsResponse, extra_headers, timeout_seconds) + + async def list_kafka_connections(self, req: proto.redpanda.core.admin.v2.cluster_pb2.ListKafkaConnectionsRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> proto.redpanda.core.admin.v2.cluster_pb2.ListKafkaConnectionsResponse: + response = await self.call_list_kafka_connections(req, extra_headers, timeout_seconds) + err = response.error() + if err is not None: + raise err + msg = response.message() + if msg is None: + raise ConnectProtocolError('missing response message') + return msg + +@typing.runtime_checkable +class ClusterServiceProtocol(typing.Protocol): + + def list_kafka_connections(self, req: ClientRequest[proto.redpanda.core.admin.v2.cluster_pb2.ListKafkaConnectionsRequest]) -> ServerResponse[proto.redpanda.core.admin.v2.cluster_pb2.ListKafkaConnectionsResponse]: + ... +CLUSTER_SERVICE_PATH_PREFIX = '/redpanda.core.admin.v2.ClusterService' + +def wsgi_cluster_service(implementation: ClusterServiceProtocol) -> WSGIApplication: + app = ConnectWSGI() + app.register_unary_rpc('/redpanda.core.admin.v2.ClusterService/ListKafkaConnections', implementation.list_kafka_connections, proto.redpanda.core.admin.v2.cluster_pb2.ListKafkaConnectionsRequest) + return app \ No newline at end of file diff --git a/tests/rptest/clients/admin/v2.py b/tests/rptest/clients/admin/v2.py index f887aad887740..2f0bb14f2f3c9 100644 --- a/tests/rptest/clients/admin/v2.py +++ b/tests/rptest/clients/admin/v2.py @@ -8,15 +8,17 @@ from rptest.clients.admin.proto.redpanda.core.admin.v2 import ( broker_pb2, broker_pb2_connect, + cluster_pb2, + cluster_pb2_connect, kafka_connections_pb2, shadow_link_pb2, shadow_link_pb2_connect, ) from rptest.clients.admin.proto.redpanda.core.admin.v2.internal import ( - debug_pb2, - debug_pb2_connect, datalake_pb2, datalake_pb2_connect, + debug_pb2, + debug_pb2_connect, ) @@ -26,6 +28,7 @@ def started_nodes(self) -> list[ClusterNode]: ... # Re-export some protobufs for convenience broker_pb = broker_pb2 +cluster_pb = cluster_pb2 datalake_pb = datalake_pb2 shadow_link_pb = shadow_link_pb2 debug_pb = debug_pb2 @@ -94,6 +97,9 @@ def _make_service(self, service_clazz): def broker(self) -> broker_pb2_connect.BrokerServiceClient: return self._make_service(broker_pb2_connect.BrokerServiceClient) + def cluster(self) -> cluster_pb2_connect.ClusterServiceClient: + return self._make_service(cluster_pb2_connect.ClusterServiceClient) + def datalake(self) -> datalake_pb2_connect.DatalakeServiceClient: return self._make_service(datalake_pb2_connect.DatalakeServiceClient) diff --git a/tests/rptest/tests/list_kafka_connections_test.py b/tests/rptest/tests/list_kafka_connections_test.py index 3275b1a564819..1b9943edef874 100644 --- a/tests/rptest/tests/list_kafka_connections_test.py +++ b/tests/rptest/tests/list_kafka_connections_test.py @@ -12,7 +12,7 @@ from ducktape.tests.test import TestContext from rptest.clients.admin.v2 import Admin as AdminV2 -from rptest.clients.admin.v2 import broker_pb, kafka_connections_pb +from rptest.clients.admin.v2 import cluster_pb, kafka_connections_pb from rptest.clients.rpk import RpkTool from rptest.services.admin import Admin from rptest.services.cluster import cluster @@ -60,7 +60,7 @@ def setUp(self): self.super_rpk.create_topic(self.test_topic) - @cluster(num_nodes=2) + @cluster(num_nodes=4) def test_list_kafka_connections(self): """ Tests the AdminV2 list_connections endpoint by verifying active Kafka connections are correctly reported @@ -73,13 +73,12 @@ def test_list_kafka_connections(self): self.redpanda, auth=(self.superuser.username, self.superuser.password), ) - node_id = self.redpanda.node_id(self.redpanda.nodes[0]) - req = broker_pb.ListKafkaConnectionsRequest( - node_id=node_id, page_size=10, order_by="source.port desc" + req = cluster_pb.ListKafkaConnectionsRequest( + page_size=10, order_by="source.port desc" ) def valid_response() -> bool: - resp = admin_v2.broker().list_kafka_connections(req) + resp = admin_v2.cluster().list_kafka_connections(req) self.logger.info( f"ListKafkaConnectionsResponse: total_size={resp.total_size}, connections={len(resp.connections)}" @@ -98,7 +97,6 @@ def valid_response() -> bool: conn for conn in resp.connections if conn.group_id == self.test_group - and conn.node_id == node_id and conn.state == kafka_connections_pb.KAFKA_CONNECTION_STATE_OPEN and conn.open_time.ToDatetime() > datetime(year=2025, month=1, day=1) and len(conn.source.ip_address) > 0 @@ -125,7 +123,7 @@ def valid_response() -> bool: wait_until( valid_response, - timeout_sec=15, + timeout_sec=30, retry_on_exc=True, err_msg="Did not observe a valid ListKafkaConnectionsResponse", ) @@ -133,9 +131,8 @@ def valid_response() -> bool: self.logger.info( "Test the filtering integration by filtering for an unknown UUID, expect an empty response" ) - filtered_resp = admin_v2.broker().list_kafka_connections( - broker_pb.ListKafkaConnectionsRequest( - node_id=-1, + filtered_resp = admin_v2.cluster().list_kafka_connections( + cluster_pb.ListKafkaConnectionsRequest( filter='uid = "ba26cadd-90f6-4999-b2c9-a89b5f033507"', ) ) @@ -148,9 +145,8 @@ def valid_response() -> bool: self.logger.info( "Test that closed connections can also be included in the response" ) - closed_conns_resp = admin_v2.broker().list_kafka_connections( - broker_pb.ListKafkaConnectionsRequest( - node_id=-1, + closed_conns_resp = admin_v2.cluster().list_kafka_connections( + cluster_pb.ListKafkaConnectionsRequest( filter="state = KAFKA_CONNECTION_STATE_CLOSED", ) ) @@ -178,8 +174,8 @@ def setUp(self): @cluster(num_nodes=1) def test_without_license(self): admin = AdminV2(self.redpanda) - resp = admin.broker().call_list_kafka_connections( - broker_pb.ListKafkaConnectionsRequest(node_id=-1) + resp = admin.cluster().call_list_kafka_connections( + cluster_pb.ListKafkaConnectionsRequest() ) err = resp.error() assert err is not None, f"expected error response without license, got {err}" diff --git a/tools/type-checking/type-check-strictness.json b/tools/type-checking/type-check-strictness.json index 71f85fcfbab98..625394ccd9eae 100644 --- a/tools/type-checking/type-check-strictness.json +++ b/tools/type-checking/type-check-strictness.json @@ -13,6 +13,7 @@ "rptest/clients/admin/google/api/field_info_pb2.py", "rptest/clients/admin/google/api/resource_pb2.py", "rptest/clients/admin/proto/redpanda/core/admin/v2/broker_pb2.py", + "rptest/clients/admin/proto/redpanda/core/admin/v2/cluster_pb2.py", "rptest/clients/admin/proto/redpanda/core/admin/v2/internal/datalake_pb2.py", "rptest/clients/admin/proto/redpanda/core/admin/v2/internal/debug_pb2.py", "rptest/clients/admin/proto/redpanda/core/admin/v2/kafka_connections_pb2.py", @@ -285,6 +286,7 @@ "off": [ "rptest/chaos_tests/single_fault_test.py", "rptest/clients/admin/proto/redpanda/core/admin/v2/broker_pb2_connect.py", + "rptest/clients/admin/proto/redpanda/core/admin/v2/cluster_pb2_connect.py", "rptest/clients/admin/proto/redpanda/core/admin/v2/internal/datalake_pb2_connect.py", "rptest/clients/admin/proto/redpanda/core/admin/v2/internal/debug_pb2_connect.py", "rptest/clients/admin/proto/redpanda/core/admin/v2/shadow_link_pb2_connect.py",