CORE-14251 Client Monitoring: Implement a cluster-wide endpoint for client connections#28137
Conversation
Pull Request Overview
This PR implements a cluster-wide endpoint for listing Kafka connections by introducing a new ClusterService that aggregates connection data from all brokers. The implementation refactors existing connection gathering logic into a reusable service (kafka_connections_service) and adds new cluster-level protobuf definitions.
Key Changes
- Introduced `kafka_connections_service` to centralize connection gathering logic previously embedded in `broker_service_impl`
- Added new `cluster_service_impl` with `ListKafkaConnections` endpoint for cluster-wide aggregation
- Refactored protobuf package structure to use namespaces (`broker_service`, `cluster_service`) for better organization
Reviewed Changes
Copilot reviewed 28 out of 28 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| proto/redpanda/core/admin/v2/cluster.proto | New protobuf definition for ClusterService with ListKafkaConnections RPC |
| proto/redpanda/core/admin/v2/broker.proto | Updated package namespace and fixed comment example |
| proto/redpanda/core/admin/v2/BUILD | Restructured build to separate kafka_connections as independent proto library |
| src/v/redpanda/admin/kafka_connections_service.{h,cc} | New service extracting connection gathering logic from broker_service |
| src/v/redpanda/admin/services/cluster.{h,cc} | New cluster service implementing cluster-wide connection aggregation |
| src/v/redpanda/admin/services/broker.cc | Refactored to delegate connection gathering to kafka_connections_service |
| src/v/redpanda/admin/services/utils.{h,cc} | Extracted shared license checking logic |
| src/v/redpanda/application.{h,cc} | Wired up kafka_connections_service and cluster_service |
| tests/rptest/tests/list_kafka_connections_test.py | Added test for cluster-wide endpoint |
| Generated Python stubs | Auto-generated protobuf and connect client code |
```diff
 syntax = "proto3";

-package redpanda.core.admin.v2;
+package redpanda.core.admin.v2.broker_service;
```
This should match the directory, and the version number is supposed to go last.
I opted to back out of the namespacing and use an optional node_id field on the request instead, as discussed.
```diff
 }
 }

+message ListKafkaConnectionsRequest {
```
@rockwotj should all messages also have an overall comment? I realize this one is probably self-explanatory, but just asking.
Yeah, we want all protobuf to have documentation even if it seems self-explanatory (because it's probably not for users).
Please follow the guide Kat put together: https://redpandadata.slack.com/archives/C09C27KTCPK/p1759766098707339
I've added a comment here now as part of the first commit.
ffef9fe to db607a0 (compare)

db607a0 to 3fc8095 (compare)

force-push:

3fc8095 to e16d32e (compare)

force-push: address copilot code review
```cpp
auto global_collector = ss::make_shared<unordered_collector>(limit);

auto make_local_collector = [limit](size_t accumulated_count) {
    return ss::make_shared<unordered_collector>(
      limit - accumulated_count);
```
It's not obvious to me why these are shared_ptr; is it to avoid taking/capturing references in a coroutine?
As discussed, it's to be able to pass around a pointer to a polymorphic connection_collector. I've replaced the ss::make_shared with a std::unique_ptr though as it doesn't need to be shared.
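The motivation above can be shown with a minimal, self-contained sketch (hypothetical simplified types, no seastar): a factory returning the base `connection_collector` type needs a pointer for polymorphism, and `std::unique_ptr` is enough when ownership is not shared.

```cpp
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-ins for the collector hierarchy.
struct connection_collector {
    virtual ~connection_collector() = default;
    virtual void add(int conn_id) = 0;
    virtual std::vector<int> extract() = 0;
};

struct unordered_collector : connection_collector {
    explicit unordered_collector(std::size_t limit)
      : _limit(limit) {}
    void add(int conn_id) override {
        // Respect the collection limit.
        if (_conns.size() < _limit) { _conns.push_back(conn_id); }
    }
    std::vector<int> extract() override { return std::move(_conns); }
    std::size_t _limit;
    std::vector<int> _conns;
};

// Returning the base type requires a pointer; unique_ptr suffices
// because each collector has a single owner.
std::unique_ptr<connection_collector> make_collector(std::size_t limit) {
    return std::make_unique<unordered_collector>(limit);
}
```
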
```proto
  // If specified, the results are returned only for the broker with the
  // specified node ID. If set to -1, the broker handling the RPC request
  // returns information about itself.
  optional int32 node_id = 1;
```
I prefer this optional field as a unification of broker/cluster compared to what was here previously.
Since I see scope for this becoming a common (map brokers, reduce) pattern, should there be something even more explicit that other endpoints could use? @rockwotj
> should there be something even more explicit

What do you have in mind in terms of something more explicit?
I would be up for making the requests use optional instead of -1; however, here an unset optional means that we want all brokers. TBH, I would rather remove this: rpk and console will always want a cluster-wide view, and I think we can optimize the filter to target specific nodes later on.
I don't expect to have many broker service endpoints, we should strongly be pushing for cluster wide endpoints.
> should there be something even more explicit

> What do you have in mind in terms of something more explicit?
I'm not really familiar with best practice here, but to me, sentinel values like -1, even with documentation, are just awkward. Perhaps some kind of "node_list OR cluster".
I've moved to removing the node_id field now. As discussed with @rockwotj, and as the TODO comment I added states, we can optimize later by parsing the filter string and not querying other brokers if we safely determine that it is unnecessary. We can rely on ctx.is_proxied() to implement the scatter-gather logic.
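The `ctx.is_proxied()` guard can be pictured with a rough, hypothetical sketch (plain synchronous C++, no RPC or coroutines): a request proxied from another broker is answered with local data only, while a direct request fans out to the peers.

```cpp
#include <vector>

// Hypothetical, simplified stand-ins for the real context and data sources.
struct context {
    bool proxied = false;
    bool is_proxied() const { return proxied; }
};

std::vector<int> local_connections() { return {1, 2}; }
std::vector<int> remote_connections() { return {3, 4}; } // one stand-in peer

std::vector<int> list_kafka_connections(const context& ctx) {
    std::vector<int> out = local_connections();
    // If the request was proxied from another broker, do not fan out
    // again; otherwise this broker is the aggregator and queries peers.
    if (!ctx.is_proxied()) {
        auto remote = remote_connections();
        out.insert(out.end(), remote.begin(), remote.end());
    }
    return out;
}
```

The key property is that proxied requests terminate the scatter-gather, so two brokers can never proxy to each other in a loop.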
```cpp
auto other_node_clients
  = _proxy_client
      .make_clients_for_other_nodes<proto::admin::cluster_service_client>();
for (auto& [node_id, client] : other_node_clients) {
    auto client_resp = co_await client.list_kafka_connections(
      ctx, make_broker_req(node_id));

    co_await add_to_response(std::move(client_resp));
}

auto local_resp = co_await _kafka_connections_service.local()
                    .list_kafka_connections_local(make_broker_req(-1));
co_await add_to_response(std::move(local_resp));
```
If there is an unordered limit, does it make sense to gather locally first?
We would still need to send a request to other nodes to collect the total count, but we could optimize by reducing the limit sent to other nodes. I haven't implemented that yet but we can keep that in mind as a follow up optimization.
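The follow-up optimization could look roughly like this (hypothetical, sequential sketch; total-count gathering is omitted): shrink the limit forwarded to each subsequent broker by what has already been accumulated, and stop requesting once the limit is met.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Hypothetical broker query: returns at most `limit` connections.
std::vector<int> query_broker(
  const std::vector<int>& conns, std::size_t limit) {
    return {conns.begin(),
            conns.begin() + std::min(limit, conns.size())};
}

std::vector<int> gather_with_limit(
  const std::vector<std::vector<int>>& brokers, std::size_t limit) {
    std::vector<int> acc;
    for (const auto& broker : brokers) {
        if (acc.size() >= limit) break; // nothing left to request
        // Only ask each broker for the remainder of the limit.
        auto part = query_broker(broker, limit - acc.size());
        acc.insert(acc.end(), part.begin(), part.end());
    }
    return acc;
}
```
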
```cpp
namespace admin {

struct connection_collector {
```
Can't these go into the implementation file instead of the header?
Yes, I've reworked this now
> extract_unordered() may be unordered even for the unordered_collector

You mean ordered_collector?
force-push: some more minor improvements based on feedback + self-review
```diff
 wait_until(
     valid_response,
-    timeout_sec=15,
+    timeout_sec=30,
```
little concerning we couldn't do all this within 15 seconds...
BenPope left a comment:
Very nice. No need to address the nitpick, it was more a PSA.
```cpp
auto insert_range = std::ranges::subrange(
  conns.begin(), conns.begin() + to_add_count);
```
nitpick: As of #28125 you can do:
```diff
-auto insert_range = std::ranges::subrange(
-  conns.begin(), conns.begin() + to_add_count);
+auto insert_range = conns | std::views::take(to_add_count);
```
Retry command for Build #74704: please wait until all jobs are finished before running the slash command

/ci-repeat 1
- fix request_count_total field name in doc comment
- add doc comment to request-response messages
- extract the kafka_connections proto into a separate library for easier sharing
To encapsulate the logic of connection gathering. The code is only moved; the only changes are extracting a `get_effective_limit` function for later reuse, and some minor fixups to make the code compile (e.g., type name adjustments due to moving across namespaces).
- Rename `extract` --> `extract_unordered()`
- Rename `ordered_collector::extract_sorted` --> `extract()`
- Define `unordered_collector::extract` --> same as `extract_unordered()`

`extract()` now means extract according to the orderedness of the collector, while `extract_unordered()` may be unordered even for the `ordered_collector`. This enables more code reuse across the ordered and unordered paths, and will also allow more code reuse in the cluster-level endpoint.
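Under that renaming, the relationship between the two accessors could be sketched like this (hypothetical, simplified types with no templates or inheritance):

```cpp
#include <algorithm>
#include <utility>
#include <vector>

// Hypothetical simplified ordered collector after the rename.
struct ordered_collector {
    void add(int v) { _conns.push_back(v); }

    // extract(): honours the collector's defined ordering.
    std::vector<int> extract() {
        auto out = extract_unordered();
        std::sort(out.begin(), out.end());
        return out;
    }

    // extract_unordered(): may return elements in any order,
    // even for the ordered collector (it skips the sort).
    std::vector<int> extract_unordered() { return std::move(_conns); }

    std::vector<int> _conns;
};
```

This split lets callers that do their own global ordering (e.g. the cluster-level aggregation) skip the per-collector sort.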
This is to allow using the faster `async_push_range` function.
Extract it to a utils file to allow sharing it with other admin endpoints as well later.
Move the endpoint from the BrokerService to the ClusterService, as we intend to make this endpoint return cluster-wide data to simplify rpk's logic of surfacing a cluster-wide view of "top" kafka connections. As part of this, the node_id field on the request is removed. The endpoint is always going to return cluster-wide data, and internal gathering is going to be implemented using `ctx.is_proxied()` in the next commit. This is purely a code-shifting commit. The collection of cluster-wide data is implemented in the next commit.
This endpoint queries all of the brokers one-by-one and uses the defined ordering to combine the results. The ducktape test is made multi-node now that connections are gathered across the whole cluster. The timeout is bumped to 30s because I saw some flakiness at the 15s timeout (~1/10 failures).
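Combining per-broker results under the defined ordering amounts to a repeated sorted merge capped at the limit; a rough, hypothetical sketch (plain C++, no RPC), assuming each broker already returns a sorted list:

```cpp
#include <algorithm>
#include <cstddef>
#include <iterator>
#include <utility>
#include <vector>

// Merge sorted per-broker connection lists, keeping at most `limit`
// entries in the combined ordering.
std::vector<int> merge_broker_results(
  const std::vector<std::vector<int>>& per_broker, std::size_t limit) {
    std::vector<int> merged;
    for (const auto& broker : per_broker) {
        std::vector<int> next;
        // std::merge preserves sortedness of both inputs.
        std::merge(merged.begin(), merged.end(),
                   broker.begin(), broker.end(),
                   std::back_inserter(next));
        merged = std::move(next);
    }
    if (merged.size() > limit) { merged.resize(limit); }
    return merged;
}
```
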
54581e6 to
743657b
Compare
force-push: rebase to dev to fix merge conflict on

This merge conflict seems strange to me, since the commit didn't go near it, as far as I can tell.
It did before. That's a generated file, and it was stale on dev because someone must have forgotten to run
Retry command for Build #74775: please wait until all jobs are finished before running the slash command

/ci-repeat 1
Retry command for Build #74778: please wait until all jobs are finished before running the slash command

/test-release-pipeline
This endpoint is to provide a cluster-wide aggregated view of the
per-broker ListKafkaConnections endpoint. This is to simplify rpk's logic
for surfacing a cluster-wide view of "top" kafka connections.
See the commits for implementation details.
Fixes https://redpandadata.atlassian.net/browse/CORE-14251
Backports Required
Release Notes