Skip to content

Commit 2f6a679

Browse files
graham-rppgellert
andcommitted
Add cluster connections list
This adds a basic `cluster connections list` command that uses the adminv2 API to list open and recently closed client connections inside the cluster. Co-authored-by: Gellért Peresztegi-Nagy <pereszteginagy.gellert@gmail.com>
1 parent 92d4df4 commit 2f6a679

10 files changed

Lines changed: 885 additions & 9 deletions

File tree

src/go/rpk/go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ require (
99
buf.build/gen/go/redpandadata/cloud/connectrpc/go v1.18.1-20250806153840-184e270a51f5.1
1010
buf.build/gen/go/redpandadata/cloud/protocolbuffers/go v1.36.7-20250806153840-184e270a51f5.1
1111
buf.build/gen/go/redpandadata/common/protocolbuffers/go v1.36.7-20250718021421-04f2daa29ad9.1
12-
buf.build/gen/go/redpandadata/core/protocolbuffers/go v1.36.10-20251021160320-d44d783528c8.1
12+
buf.build/gen/go/redpandadata/core/protocolbuffers/go v1.36.10-20251023115415-1f602d02339b.1
1313
buf.build/gen/go/redpandadata/dataplane/connectrpc/go v1.18.1-20250819145731-621dc775ffe4.1
1414
buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go v1.36.7-20250819145731-621dc775ffe4.1
1515
buf.build/gen/go/redpandadata/gatekeeper/connectrpc/go v1.18.1-20250813192242-efcc93bcd794.1
@@ -47,7 +47,7 @@ require (
4747
github.com/prometheus/client_model v0.6.2
4848
github.com/prometheus/common v0.65.0
4949
github.com/redpanda-data/common-go/proto v0.0.0-20250820120127-9b518fca5ecf
50-
github.com/redpanda-data/common-go/rpadmin v0.1.17-0.20251020144719-b6a2aa280c0b
50+
github.com/redpanda-data/common-go/rpadmin v0.1.17-0.20251023170853-0d063f822858
5151
github.com/redpanda-data/common-go/rpsr v0.1.2
5252
github.com/redpanda-data/protoc-gen-go-mcp v0.0.0-20250812151819-7e5d5fef8241
5353
github.com/rs/xid v1.6.0
@@ -85,7 +85,7 @@ require (
8585
require (
8686
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.7-20250613105001-9f2d3c737feb.1 // indirect
8787
buf.build/gen/go/grpc-ecosystem/grpc-gateway/protocolbuffers/go v1.36.7-20240617172850-a48fcebcf8f1.1 // indirect
88-
buf.build/gen/go/redpandadata/core/connectrpc/go v1.19.1-20251021160320-d44d783528c8.2 // indirect
88+
buf.build/gen/go/redpandadata/core/connectrpc/go v1.19.1-20251023115415-1f602d02339b.2 // indirect
8989
github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c // indirect
9090
github.com/BurntSushi/toml v1.1.0 // indirect
9191
github.com/Microsoft/go-winio v0.4.14 // indirect

src/go/rpk/go.sum

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ buf.build/gen/go/redpandadata/cloud/protocolbuffers/go v1.36.7-20250806153840-18
88
buf.build/gen/go/redpandadata/cloud/protocolbuffers/go v1.36.7-20250806153840-184e270a51f5.1/go.mod h1:J6+tKG3WghdUGXXD5lYp/G15vFpzMRvAsbF6+L+YZBQ=
99
buf.build/gen/go/redpandadata/common/protocolbuffers/go v1.36.7-20250718021421-04f2daa29ad9.1 h1:mE9lPhqy3ghMKY6fAot31NySa8aI22sUjKbJuMlmDGA=
1010
buf.build/gen/go/redpandadata/common/protocolbuffers/go v1.36.7-20250718021421-04f2daa29ad9.1/go.mod h1:pUNfu5CqCYitRtXrJ69rHyB7SXdTkDuqqnNwQYI2H1Q=
11-
buf.build/gen/go/redpandadata/core/connectrpc/go v1.19.1-20251021160320-d44d783528c8.2 h1:1Lt/PN3/ZJ9TiGIfmoQI1r1K2l3UQmHGpQ3pgxuwDxc=
12-
buf.build/gen/go/redpandadata/core/connectrpc/go v1.19.1-20251021160320-d44d783528c8.2/go.mod h1:3kUjX0N7isNP+EIkTWWht2ATgy8f4/+0feYvn65UgCs=
13-
buf.build/gen/go/redpandadata/core/protocolbuffers/go v1.36.10-20251021160320-d44d783528c8.1 h1:Ynt6p41TwmjUyG7H3jgXKoS+g7cC7ZED/zkOpzOgr/Y=
14-
buf.build/gen/go/redpandadata/core/protocolbuffers/go v1.36.10-20251021160320-d44d783528c8.1/go.mod h1:QenSPzqxZpyo9hHIpRzTetvDchelVDzimnmaggHKenc=
11+
buf.build/gen/go/redpandadata/core/connectrpc/go v1.19.1-20251023115415-1f602d02339b.2 h1:mvC5Q7Fz56di8es2aftS5/WskFyNs2gBC9kO6LyHxQo=
12+
buf.build/gen/go/redpandadata/core/connectrpc/go v1.19.1-20251023115415-1f602d02339b.2/go.mod h1:8RVo+ufXGClw5quRMTfAj0N3mTB2lKk+CJzlYc7QO0o=
13+
buf.build/gen/go/redpandadata/core/protocolbuffers/go v1.36.10-20251023115415-1f602d02339b.1 h1:faNTh8GShHh+aoonkwH9fISnvHxcbuPflygpqdWXu9w=
14+
buf.build/gen/go/redpandadata/core/protocolbuffers/go v1.36.10-20251023115415-1f602d02339b.1/go.mod h1:QenSPzqxZpyo9hHIpRzTetvDchelVDzimnmaggHKenc=
1515
buf.build/gen/go/redpandadata/dataplane/connectrpc/go v1.18.1-20250819145731-621dc775ffe4.1 h1:CiTv2Rme+0H/2IQpyZFK8SQYRWgRWEvFNxZIfDMYOQQ=
1616
buf.build/gen/go/redpandadata/dataplane/connectrpc/go v1.18.1-20250819145731-621dc775ffe4.1/go.mod h1:HJnvbiwx2qZN8HcD30Dijm5Aamjcrk/8/ffTOFrF5Go=
1717
buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go v1.36.7-20250819145731-621dc775ffe4.1 h1:31r1Sv/BoVyGzzBfBHcUcyQJz/+4i/fr8wc7gQVihEM=
@@ -251,8 +251,8 @@ github.com/redpanda-data/common-go/net v0.1.0 h1:JnJioRJuL961r1QXiJQ1tW9+yEaJfu8
251251
github.com/redpanda-data/common-go/net v0.1.0/go.mod h1:iOdNkjxM7a1T8F3cYHTaKIPFCHzzp/ia6TN+Z+7Tt5w=
252252
github.com/redpanda-data/common-go/proto v0.0.0-20250820120127-9b518fca5ecf h1:Oe0Sc3+/37Xgdpze7klkWyvB6eAt/nuUhrn93Rd4ESA=
253253
github.com/redpanda-data/common-go/proto v0.0.0-20250820120127-9b518fca5ecf/go.mod h1:GMoRcdMz6CIpr+8vKrSJvr8fJDlojETRV45BD6A0DvU=
254-
github.com/redpanda-data/common-go/rpadmin v0.1.17-0.20251020144719-b6a2aa280c0b h1:1IYchooFCXdUog5fu+ytG/Rnk0/B9amqMhGvs+1mxZw=
255-
github.com/redpanda-data/common-go/rpadmin v0.1.17-0.20251020144719-b6a2aa280c0b/go.mod h1:cmQgsLSFmrrt/9Ztu+zKz8+t+iGTvWVyyBz9nKiimts=
254+
github.com/redpanda-data/common-go/rpadmin v0.1.17-0.20251023170853-0d063f822858 h1:GIuEsarF54b8bN/pefXP8b39uI0hVW/EuZE+nM0Wi18=
255+
github.com/redpanda-data/common-go/rpadmin v0.1.17-0.20251023170853-0d063f822858/go.mod h1:9d2Rtwy5jz26ejcgVi3Dla4vjsjX4OHVYSbRzuKt+Qw=
256256
github.com/redpanda-data/common-go/rpsr v0.1.2 h1:DThUeyfBH8fkL9WoP1sEbRhT2NVV22zmsTpcCtzfusQ=
257257
github.com/redpanda-data/common-go/rpsr v0.1.2/go.mod h1:2j2416onosg5FKaKz52NooRE+q/9EJqQn0kyTcTXWHc=
258258
github.com/redpanda-data/go-avro/v2 v2.0.0-20240405204525-77b1144dc525 h1:vskZrV6q8W8flL0Ud23AJUYAd8ZgTadO45+loFnG2G0=

src/go/rpk/pkg/cli/cluster/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ go_library(
1313
deps = [
1414
"//src/go/rpk/pkg/adminapi",
1515
"//src/go/rpk/pkg/cli/cluster/config",
16+
"//src/go/rpk/pkg/cli/cluster/connections",
1617
"//src/go/rpk/pkg/cli/cluster/license",
1718
"//src/go/rpk/pkg/cli/cluster/maintenance",
1819
"//src/go/rpk/pkg/cli/cluster/partitions",

src/go/rpk/pkg/cli/cluster/cluster.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ package cluster
1111

1212
import (
1313
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/cluster/config"
14+
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/cluster/connections"
1415
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/cluster/license"
1516
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/cluster/maintenance"
1617
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/cluster/partitions"
@@ -49,6 +50,7 @@ func NewCommand(fs afero.Fs, p *pkgconfig.Params) *cobra.Command {
4950
storage.NewCommand(fs, p),
5051
txn.NewCommand(fs, p),
5152
quotas.NewCommand(fs, p),
53+
connections.NewCommand(fs, p),
5254
offsets,
5355
)
5456

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
load("@rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "connections",
5+
srcs = [
6+
"connections.go",
7+
"list.go",
8+
],
9+
importpath = "github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/cluster/connections",
10+
visibility = ["//visibility:public"],
11+
deps = [
12+
"//src/go/rpk/pkg/adminapi",
13+
"//src/go/rpk/pkg/config",
14+
"//src/go/rpk/pkg/out",
15+
"@build_buf_gen_go_redpandadata_core_protocolbuffers_go//redpanda/core/admin/v2:admin",
16+
"@com_connectrpc_connect//:connect",
17+
"@com_github_docker_go_units//:go-units",
18+
"@com_github_spf13_afero//:afero",
19+
"@com_github_spf13_cobra//:cobra",
20+
],
21+
)
22+
23+
go_test(
24+
name = "connections_test",
25+
srcs = ["list_test.go"],
26+
embed = [":connections"],
27+
deps = [
28+
"@build_buf_gen_go_redpandadata_core_protocolbuffers_go//redpanda/core/admin/v2:admin",
29+
"@com_github_stretchr_testify//require",
30+
"@org_golang_google_protobuf//types/known/durationpb",
31+
"@org_golang_google_protobuf//types/known/timestamppb",
32+
],
33+
)
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright 2025 Redpanda Data, Inc.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the file licenses/BSL.md
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0
9+
10+
// Package connections deals with listing current connections in the cluster
11+
package connections
12+
13+
import (
14+
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
15+
"github.com/spf13/afero"
16+
"github.com/spf13/cobra"
17+
)
18+
19+
func NewCommand(fs afero.Fs, p *config.Params) *cobra.Command {
20+
cmd := &cobra.Command{
21+
Use: "connections",
22+
Aliases: []string{"connection"},
23+
Short: "Manage and monitor cluster connections",
24+
Long: `Manage and monitor cluster connections.`,
25+
}
26+
27+
cmd.AddCommand(newConnectionList(fs, p))
28+
return cmd
29+
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
// Copyright 2025 Redpanda Data, Inc.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the file licenses/BSL.md
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0
9+
10+
package connections
11+
12+
import (
13+
"testing"
14+
"time"
15+
16+
adminv2 "buf.build/gen/go/redpandadata/core/protocolbuffers/go/redpanda/core/admin/v2"
17+
18+
"github.com/stretchr/testify/require"
19+
"google.golang.org/protobuf/types/known/durationpb"
20+
"google.golang.org/protobuf/types/known/timestamppb"
21+
)
22+
23+
func TestParseConnection(t *testing.T) {
24+
protoConn := &adminv2.KafkaConnection{
25+
NodeId: 2,
26+
ShardId: 4,
27+
Uid: "36338ca5-86b7-4478-ad23-32d49cfaef61",
28+
State: adminv2.KafkaConnectionState_KAFKA_CONNECTION_STATE_OPEN,
29+
OpenTime: timestamppb.New(time.Date(2025, 10, 23, 1, 2, 3, 0, time.UTC)),
30+
CloseTime: nil,
31+
AuthenticationInfo: &adminv2.AuthenticationInfo{
32+
State: adminv2.AuthenticationState_AUTHENTICATION_STATE_SUCCESS,
33+
Mechanism: adminv2.AuthenticationMechanism_AUTHENTICATION_MECHANISM_MTLS,
34+
UserPrincipal: "someone",
35+
},
36+
TlsInfo: &adminv2.TLSInfo{
37+
Enabled: true,
38+
},
39+
ListenerName: "external",
40+
Source: &adminv2.Source{
41+
IpAddress: "4.2.2.1",
42+
Port: 49722,
43+
},
44+
ClientId: "a-unique-client-id",
45+
ClientSoftwareName: "some-library",
46+
ClientSoftwareVersion: "v0.0.1",
47+
GroupId: "group-a",
48+
ApiVersions: map[int32]int32{0: 4, 9: 11},
49+
IdleDuration: durationpb.New(100 * time.Millisecond),
50+
InFlightRequests: &adminv2.InFlightRequests{
51+
SampledInFlightRequests: []*adminv2.InFlightRequests_Request{
52+
{
53+
ApiKey: 0, // PRODUCE
54+
InFlightDuration: durationpb.New(40 * time.Millisecond),
55+
},
56+
},
57+
HasMoreRequests: true,
58+
},
59+
TotalRequestStatistics: &adminv2.RequestStatistics{
60+
ProduceBytes: 10000,
61+
FetchBytes: 2000,
62+
RequestCount: 200,
63+
ProduceBatchCount: 10,
64+
},
65+
RecentRequestStatistics: &adminv2.RequestStatistics{
66+
ProduceBytes: 1000,
67+
FetchBytes: 200,
68+
RequestCount: 20,
69+
ProduceBatchCount: 1,
70+
},
71+
}
72+
73+
out := parseConnection(protoConn)
74+
75+
// Verify basic fields
76+
require.Equal(t, int32(2), out.NodeID)
77+
require.Equal(t, uint32(4), out.ShardID)
78+
require.Equal(t, "36338ca5-86b7-4478-ad23-32d49cfaef61", out.UID)
79+
require.Equal(t, "OPEN", out.State)
80+
require.Equal(t, "2025-10-23T01:02:03Z", out.OpenTime)
81+
require.Nil(t, out.CloseTime)
82+
require.NotEmpty(t, out.ConnectionDuration)
83+
require.True(t, out.TLSEnabled)
84+
require.Equal(t, "group-a", out.GroupID)
85+
require.Equal(t, "100ms", out.IdleDuration)
86+
87+
// Verify authentication
88+
require.Equal(t, &Authentication{
89+
State: "SUCCESS",
90+
Mechanism: "MTLS",
91+
UserPrincipal: "someone",
92+
}, out.Authentication)
93+
94+
// Verify client
95+
require.Equal(t, &Client{
96+
IP: "4.2.2.1",
97+
Port: 49722,
98+
ID: "a-unique-client-id",
99+
SoftwareName: "some-library",
100+
SoftwareVersion: "v0.0.1",
101+
}, out.Client)
102+
103+
// Verify API versions (order-independent check)
104+
require.ElementsMatch(t, []APIVersion{
105+
{API: "PRODUCE", Version: 4},
106+
{API: "OFFSET_FETCH", Version: 11},
107+
}, out.APIVersions)
108+
109+
// Verify active requests
110+
require.Equal(t, &ActiveRequests{
111+
SampledRequests: []SampledRequest{
112+
{API: "PRODUCE", Duration: "40ms"},
113+
},
114+
HasMoreRequests: true,
115+
}, out.ActiveRequests)
116+
117+
// Verify statistics
118+
require.Equal(t, &RequestStatistics{
119+
ProduceBytes: "10000",
120+
FetchBytes: "2000",
121+
RequestCount: "200",
122+
ProduceBatchCount: "10",
123+
}, out.RequestStatisticsAll)
124+
125+
require.Equal(t, &RequestStatistics{
126+
ProduceBytes: "1000",
127+
FetchBytes: "200",
128+
RequestCount: "20",
129+
ProduceBatchCount: "1",
130+
}, out.RequestStatistics1m)
131+
}

0 commit comments

Comments
 (0)