-
Notifications
You must be signed in to change notification settings - Fork 726
CORE-14251 Client Monitoring: Implement a cluster-wide endpoint for client connections #28137
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
michael-redpanda
merged 7 commits into
redpanda-data:dev
from
pgellert:cm/cluster-wide-view
Oct 23, 2025
Merged
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
1cd7080
proto: minor improvements to ListKafkaConnections
pgellert 9e85ce3
admin: extract kafka_connections_service
pgellert 8c4c142
admin: refactor connection_collector interface
pgellert e68ccd9
admin: use optimized add_all method
pgellert fbd9b1c
admin: extract license utils
pgellert c2d3bd2
admin: ListKafkaConnections as ClusterService rpc
pgellert 743657b
admin: implement cluster-wide collection of ListKafkaConnections
pgellert File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,105 @@ | ||
| // Copyright 2025 Redpanda Data, Inc. | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| syntax = "proto3"; | ||
|
|
||
| package redpanda.core.admin.v2; | ||
|
|
||
| import "proto/redpanda/core/pbgen/options.proto"; | ||
| import "proto/redpanda/core/pbgen/rpc.proto"; | ||
| import "proto/redpanda/core/admin/v2/kafka_connections.proto"; | ||
|
|
||
| option (redpanda.core.pbgen.cpp_namespace) = "proto::admin"; | ||
|
|
||
| // The ClusterService gives information about the cluster. | ||
| service ClusterService { | ||
| // ListKafkaConnections returns information about the cluster's Kafka | ||
| // connections, collected and ordered across all brokers. | ||
| rpc ListKafkaConnections(ListKafkaConnectionsRequest) | ||
| returns (ListKafkaConnectionsResponse) { | ||
| option (redpanda.core.pbgen.rpc) = { | ||
| authz: SUPERUSER | ||
| }; | ||
| } | ||
| } | ||
|
|
||
| // ListKafkaConnectionsRequest return information about the broker's Kafka | ||
| // connections. | ||
| message ListKafkaConnectionsRequest { | ||
| // The maximum number of connections to return. If unspecified or 0, a | ||
| // default value may be applied. Note that paging is currently not fully | ||
| // supported, and this field only acts as a limit for the first page of data | ||
| // returned. Subsequent pages of data cannot be requested. | ||
| int32 page_size = 1; | ||
|
|
||
| // Filter expression to apply to the connection list. | ||
| // Uses a subset of AIP-160 filter syntax supporting: | ||
| // - Field comparisons (`=`, `!=`, `<`, `>`, `<=`, `>=`) | ||
| // - Logical AND chaining: condition1 AND condition2 | ||
| // - Nested field access: parent.child = value | ||
| // - Escape sequences: field = "string with \"quotes\"" | ||
| // - Enum types | ||
| // - RFC3339 timestamps and ISO-like duration | ||
| // | ||
| // Limitations (not supported): | ||
| // - Logical OR chaining | ||
| // - Parentheses `(` `)` for grouping | ||
| // - Map and repeated types | ||
| // - HAS (:) operator | ||
| // - Negation (-, NOT) | ||
| // - Bare literal matching | ||
| // | ||
| // Example filters: | ||
| // - `state = KAFKA_CONNECTION_STATE_OPEN` | ||
| // - `idle_duration > 30s AND total_request_statistics.request_count > 100` | ||
| // - `authentication_info.user_principal = "my-producer"` | ||
| // - `recent_request_statistics.produce_bytes > 1000 AND | ||
| // client_software_name = "kgo"` | ||
| // - `open_time >= 2025-09-01T10:22:54Z` | ||
| // | ||
| // Reference: https://google.aip.dev/160 | ||
| string filter = 2; | ||
|
|
||
| // Field-based ordering specification following AIP-132 syntax. | ||
| // Supports multiple fields with `asc`/`desc` direction indicators. | ||
| // Examples: | ||
| // - `idle_duration desc` - longest idle connections first | ||
| // - `open_time desc, total_request_statistics.request_count desc` - newest | ||
| // connections first, then most active | ||
| // - `recent_request_statistics.produce_bytes desc` - connections with | ||
| // highest current produce throughput first | ||
| // | ||
| // Reference: https://google.aip.dev/132#ordering | ||
| string order_by = 3; | ||
| } | ||
|
|
||
| // ListKafkaConnectionsResponse is the response from the ListKafkaConnections | ||
| // RPC. | ||
| message ListKafkaConnectionsResponse { | ||
| // The list of connections matching the request. | ||
| // Note that in addition to open connections, some recently-closed | ||
| // connections may also be included here. If you don't want to include | ||
| // closed connections, set the filter in the request to `state = | ||
| // KAFKA_CONNECTION_STATE_OPEN`. | ||
| repeated KafkaConnection connections = 1; | ||
|
|
||
| // Total number of connections matching the request. | ||
| // This may be greater than `len(connections)` if some connections were | ||
| // omitted from the response due to the specified (or default) `page_size`. | ||
| // Example: | ||
| // request.page_size = 10 | ||
| // response.connections = [<10 items>] | ||
| // response.total_size = 13 | ||
| uint64 total_size = 2; | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rockwotj should all messages also have an overall comment? I realize this one is probably self explanatory, but just asking
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, we want all protobuf to have documentation even if it seems self-explanatory (because it's probably not for users).
Please follow the guide Kat put together: https://redpandadata.slack.com/archives/C09C27KTCPK/p1759766098707339
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added a comment here now as part of the first commit.