Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 46 additions & 5 deletions proto/redpanda/core/admin/v2/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,72 @@ load("@rules_proto//proto:defs.bzl", "proto_library")
load("//bazel/pbgen:pbgen.bzl", "redpanda_proto_library")

proto_library(
name = "broker_proto",
name = "kafka_connections_proto",
srcs = [
"broker.proto",
"kafka_connections.proto",
],
visibility = ["//visibility:public"],
deps = [
"//proto/redpanda/core/common:common_proto",
"//proto/redpanda/core/pbgen:options_proto",
"//proto/redpanda/core/pbgen:rpc_proto",
"@googleapis//google/api:field_info_proto",
"@protobuf//:duration_proto",
"@protobuf//:timestamp_proto",
],
)

redpanda_proto_library(
name = "broker_redpanda_proto",
protos = [":broker_proto"],
name = "kafka_connections_redpanda_proto",
protos = [
":kafka_connections_proto",
],
visibility = ["//visibility:public"],
deps = [
"@abseil-cpp//absl/time:time",
],
)

proto_library(
name = "broker_proto",
srcs = ["broker.proto"],
visibility = ["//visibility:public"],
deps = [
"//proto/redpanda/core/pbgen:options_proto",
"//proto/redpanda/core/pbgen:rpc_proto",
],
)

redpanda_proto_library(
name = "broker_redpanda_proto",
protos = [
":broker_proto",
],
visibility = ["//visibility:public"],
deps = [],
)

proto_library(
name = "cluster_proto",
srcs = ["cluster.proto"],
visibility = ["//visibility:public"],
deps = [
":kafka_connections_proto",
"//proto/redpanda/core/pbgen:options_proto",
"//proto/redpanda/core/pbgen:rpc_proto",
],
)

redpanda_proto_library(
name = "cluster_redpanda_proto",
protos = [
":cluster_proto",
],
visibility = ["//visibility:public"],
deps = [
":kafka_connections_redpanda_proto",
],
)

proto_library(
name = "shadow_link_proto",
srcs = ["shadow_link.proto"],
Expand Down
80 changes: 0 additions & 80 deletions proto/redpanda/core/admin/v2/broker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package redpanda.core.admin.v2;

import "proto/redpanda/core/pbgen/options.proto";
import "proto/redpanda/core/pbgen/rpc.proto";
import "proto/redpanda/core/admin/v2/kafka_connections.proto";

option (redpanda.core.pbgen.cpp_namespace) = "proto::admin";

Expand All @@ -37,15 +36,6 @@ service BrokerService {
authz: SUPERUSER
};
}

// ListKafkaConnections returns information about the broker's Kafka
// connections.
rpc ListKafkaConnections(ListKafkaConnectionsRequest)
returns (ListKafkaConnectionsResponse) {
option (redpanda.core.pbgen.rpc) = {
authz: SUPERUSER
};
}
}

// GetBrokerRequest returns information about a single broker in the cluster
Expand All @@ -70,76 +60,6 @@ message ListBrokersResponse {
repeated Broker brokers = 1;
}

message ListKafkaConnectionsRequest {
// The node ID for the broker. If set to -1, the broker handling the RPC
// request returns information about itself.
int32 node_id = 1;

// The maximum number of connections to return. If unspecified or 0, a
// default value may be applied. Note that paging is currently not fully
// supported, and this field only acts as a limit for the first page of data
// returned. Subsequent pages of data cannot be requested.
int32 page_size = 2;

// Filter expression to apply to the connection list.
// Uses a subset of AIP-160 filter syntax supporting:
// - Field comparisons (`=`, `!=`, `<`, `>`, `<=`, `>=`)
// - Logical AND chaining: condition1 AND condition2
// - Nested field access: parent.child = value
// - Escape sequences: field = "string with \"quotes\""
// - Enum types
// - RFC3339 timestamps and ISO-like duration
//
// Limitations (not supported):
// - Logical OR chaining
// - Parentheses `(` `)` for grouping
// - Map and repeated types
// - HAS (:) operator
// - Negation (-, NOT)
// - Bare literal matching
//
// Example filters:
// - `state = KAFKA_CONNECTION_STATE_OPEN`
// - `idle_duration > 30s AND request_count_total > 100`
// - `authentication_info.user_principal = "my-producer"`
// - `recent_request_statistics.produce_bytes > 1000 AND
// client_software_name = "kgo"`
// - `open_time >= 2025-09-01T10:22:54Z`
//
// Reference: https://google.aip.dev/160
string filter = 3;

// Field-based ordering specification following AIP-132 syntax.
// Supports multiple fields with `asc`/`desc` direction indicators.
// Examples:
// - `idle_duration desc` - longest idle connections first
// - `open_time desc, total_request_statistics.request_count desc` - newest
// connections first, then most active
// - `recent_request_statistics.produce_bytes desc` - connections with
// highest current produce throughput first
//
// Reference: https://google.aip.dev/132#ordering
string order_by = 4;
}

message ListKafkaConnectionsResponse {
// The list of connections matching the request.
// Note that in addition to open connections, some recently-closed
// connections may also be included here. If you don't want to include
// closed connections, set the filter in the request to `state =
// KAFKA_CONNECTION_STATE_OPEN`.
repeated KafkaConnection connections = 1;

// Total number of connections matching the request.
// This may be greater than `len(connections)` if some connections were
// omitted from the response due to the specified (or default) `page_size`.
// Example:
// request.page_size = 10
// response.connections = [<10 items>]
// response.total_size = 13
uint64 total_size = 2;
}

// The resource for an individual broker within the Kafka Cluster.
message Broker {
// This broker's node ID.
Expand Down
105 changes: 105 additions & 0 deletions proto/redpanda/core/admin/v2/cluster.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2025 Redpanda Data, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

package redpanda.core.admin.v2;

import "proto/redpanda/core/pbgen/options.proto";
import "proto/redpanda/core/pbgen/rpc.proto";
import "proto/redpanda/core/admin/v2/kafka_connections.proto";

option (redpanda.core.pbgen.cpp_namespace) = "proto::admin";

// The ClusterService gives information about the cluster.
service ClusterService {
// ListKafkaConnections returns information about the cluster's Kafka
// connections, collected and ordered across all brokers.
rpc ListKafkaConnections(ListKafkaConnectionsRequest)
returns (ListKafkaConnectionsResponse) {
option (redpanda.core.pbgen.rpc) = {
authz: SUPERUSER
};
}
}

// ListKafkaConnectionsRequest return information about the broker's Kafka
// connections.
message ListKafkaConnectionsRequest {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rockwotj should all messages also have an overall comment? I realize this one is probably self explanatory, but just asking

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we want all protobuf to have documentation even if it seems self-explanatory (because it's probably not for users).

Please follow the guide Kat put together: https://redpandadata.slack.com/archives/C09C27KTCPK/p1759766098707339

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a comment here now as part of the first commit.

// The maximum number of connections to return. If unspecified or 0, a
// default value may be applied. Note that paging is currently not fully
// supported, and this field only acts as a limit for the first page of data
// returned. Subsequent pages of data cannot be requested.
int32 page_size = 1;

// Filter expression to apply to the connection list.
// Uses a subset of AIP-160 filter syntax supporting:
// - Field comparisons (`=`, `!=`, `<`, `>`, `<=`, `>=`)
// - Logical AND chaining: condition1 AND condition2
// - Nested field access: parent.child = value
// - Escape sequences: field = "string with \"quotes\""
// - Enum types
// - RFC3339 timestamps and ISO-like duration
//
// Limitations (not supported):
// - Logical OR chaining
// - Parentheses `(` `)` for grouping
// - Map and repeated types
// - HAS (:) operator
// - Negation (-, NOT)
// - Bare literal matching
//
// Example filters:
// - `state = KAFKA_CONNECTION_STATE_OPEN`
// - `idle_duration > 30s AND total_request_statistics.request_count > 100`
// - `authentication_info.user_principal = "my-producer"`
// - `recent_request_statistics.produce_bytes > 1000 AND
// client_software_name = "kgo"`
// - `open_time >= 2025-09-01T10:22:54Z`
//
// Reference: https://google.aip.dev/160
string filter = 2;

// Field-based ordering specification following AIP-132 syntax.
// Supports multiple fields with `asc`/`desc` direction indicators.
// Examples:
// - `idle_duration desc` - longest idle connections first
// - `open_time desc, total_request_statistics.request_count desc` - newest
// connections first, then most active
// - `recent_request_statistics.produce_bytes desc` - connections with
// highest current produce throughput first
//
// Reference: https://google.aip.dev/132#ordering
string order_by = 3;
}

// ListKafkaConnectionsResponse is the response from the ListKafkaConnections
// RPC.
message ListKafkaConnectionsResponse {
// The list of connections matching the request.
// Note that in addition to open connections, some recently-closed
// connections may also be included here. If you don't want to include
// closed connections, set the filter in the request to `state =
// KAFKA_CONNECTION_STATE_OPEN`.
repeated KafkaConnection connections = 1;

// Total number of connections matching the request.
// This may be greater than `len(connections)` if some connections were
// omitted from the response due to the specified (or default) `page_size`.
// Example:
// request.page_size = 10
// response.connections = [<10 items>]
// response.total_size = 13
uint64 total_size = 2;
}
2 changes: 1 addition & 1 deletion src/v/kafka/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ redpanda_cc_library(
":qdc_monitor_config",
":security",
":topic_config_utils",
"//proto/redpanda/core/admin/v2:broker_redpanda_proto",
"//proto/redpanda/core/admin/v2:kafka_connections_redpanda_proto",
"//src/v/base",
"//src/v/bytes",
"//src/v/bytes:iobuf",
Expand Down
2 changes: 2 additions & 0 deletions src/v/redpanda/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,13 @@ redpanda_cc_library(
"//src/v/pandaproxy/rest:server",
"//src/v/raft",
"//src/v/redpanda/admin",
"//src/v/redpanda/admin:kafka_connections_service",
"//src/v/redpanda/admin/proxy:service",
"//src/v/redpanda/admin/proxy:client",
"//src/v/redpanda/admin/services/shadow_link",
"//src/v/redpanda/admin/services/datalake",
"//src/v/redpanda/admin/services/internal:debug",
"//src/v/redpanda/admin/services:cluster",
"//src/v/resource_mgmt:cpu_profiler",
"//src/v/resource_mgmt:cpu_scheduling",
"//src/v/resource_mgmt:memory_groups",
Expand Down
25 changes: 25 additions & 0 deletions src/v/redpanda/admin/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,30 @@ redpanda_cc_library(
],
)

redpanda_cc_library(
name = "kafka_connections_service",
srcs = [
"kafka_connections_service.cc",
],
hdrs = [
"kafka_connections_service.h",
],
visibility = ["//visibility:public"],
deps = [
":aip_filter",
":aip_ordering",
"//proto/redpanda/core/admin/v2:cluster_redpanda_proto",
"//proto/redpanda/core/admin/v2:kafka_connections_redpanda_proto",
"//src/v/base",
"//src/v/container:priority_queue",
"//src/v/kafka/server",
"//src/v/redpanda/admin/proxy:client",
"//src/v/serde/protobuf:rpc",
"//src/v/ssx:async_algorithm",
"@seastar",
],
)

redpanda_cc_library(
name = "admin",
srcs = [
Expand Down Expand Up @@ -258,6 +282,7 @@ redpanda_cc_library(
":debug_bundle",
":features",
":hbadger",
":kafka_connections_service",
":migration",
":partition",
":raft",
Expand Down
Loading