Skip to content

Commit 0e2b837

Browse files
authored
Merge pull request #27647 from alextreichler/feat/add-group-describe-formatter
[UX-508] added json/yaml formatting to rpk group describe command
2 parents a38c665 + c0846b8 commit 0e2b837

1 file changed

Lines changed: 144 additions & 1 deletion

File tree

src/go/rpk/pkg/cli/group/describe.go

Lines changed: 144 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ Describe any one-character group:
5757
`,
5858
Args: cobra.MinimumNArgs(1),
5959
Run: func(cmd *cobra.Command, groups []string) {
60+
f := p.Formatter
61+
if h, ok := f.Help(groupDescription{}); ok {
62+
out.Exit(h)
63+
}
6064
p, err := p.LoadVirtualProfile(fs)
6165
out.MaybeDie(err, "rpk unable to load config: %v", err)
6266

@@ -86,6 +90,16 @@ Describe any one-character group:
8690
if err != nil {
8791
out.Die("unable to describe groups: %v", err)
8892
}
93+
94+
groupDescriptions := buildDescribed(lags, partitionCount)
95+
formatOutput, isText, err := formatDescribedJSON(groupDescriptions, f)
96+
if err != nil {
97+
out.Die("Error formatting group descriptions: %v", err)
98+
}
99+
if !isText {
100+
fmt.Print(formatOutput)
101+
return
102+
}
89103
if lagPerTopic {
90104
printLagPerTopic(lags, partitionCount)
91105
return
@@ -97,6 +111,9 @@ Describe any one-character group:
97111
printDescribed(commitsOnly, lags, useInstanceID, partitionCount)
98112
},
99113
}
114+
115+
p.InstallFormatFlag(cmd)
116+
100117
cmd.Flags().BoolVarP(&lagPerTopic, "print-lag-per-topic", "t", false, "Print the aggregated lag per topic")
101118
cmd.Flags().BoolVarP(&summary, "print-summary", "s", false, "Print only the group summary section")
102119
cmd.Flags().BoolVarP(&commitsOnly, "print-commits", "c", false, "Print only the group commits section")
@@ -107,8 +124,134 @@ Describe any one-character group:
107124
return cmd
108125
}
109126

127+
// Below here lies printing the output in json/yaml.
128+
129+
// groupDescription provides details about the consumer group.
130+
type groupDescription struct {
131+
GroupName string `json:"group_name" yaml:"group_name"`
132+
CoordinatorPartition string `json:"coordinator_partition" yaml:"coordinator_partition"`
133+
State string `json:"state" yaml:"state"`
134+
Balancer string `json:"balancer" yaml:"balancer"`
135+
Members int `json:"members" yaml:"members"`
136+
CoordinatorNode int `json:"coordinator_node" yaml:"coordinator_node"`
137+
TotalLag int `json:"total_lag" yaml:"total_lag"`
138+
Partitions []partitionInfo `json:"partitions" yaml:"partitions"`
139+
MembersDetails []memberInfo `json:"members_details" yaml:"members_details"`
140+
}
141+
142+
// partitionInfo is use to provide details about the partition.
143+
type partitionInfo struct {
144+
Partition int `json:"partition" yaml:"partition"`
145+
CurrentOffset int `json:"current_offset" yaml:"current_offset"`
146+
LogStartOffset int `json:"log_start_offset" yaml:"log_start_offset"`
147+
LogEndOffset int `json:"log_end_offset" yaml:"log_end_offset"`
148+
Lag int `json:"lag" yaml:"lag"`
149+
Topic string `json:"topic" yaml:"topic"`
150+
MemberID string `json:"member_id" yaml:"member_id"`
151+
ClientID string `json:"client_id" yaml:"client_id"`
152+
Host string `json:"host" yaml:"host"`
153+
}
154+
155+
// memberInfo is used to provide details about the members.
156+
type memberInfo struct {
157+
MemberID string `json:"member_id" yaml:"member_id"`
158+
ClientID string `json:"client_id" yaml:"client_id"`
159+
Host string `json:"host" yaml:"host"`
160+
TopicPartitions []topicPartition `json:"topic_partitions" yaml:"topic_partitions"`
161+
}
162+
163+
// topicPartition will be used for Topic.
164+
type topicPartition struct {
165+
Topic string `json:"topic" yaml:"topic"`
166+
Partition int `json:"partition" yaml:"partition"`
167+
}
168+
169+
// BuildDescribed gathers all the group information and returns
170+
// it as a groupDescription.
171+
func buildDescribed(lags kadm.DescribedGroupLags, partitionCount int) []groupDescription {
172+
var groupDescriptions []groupDescription
173+
var partitionID int32
174+
175+
for _, group := range lags.Sorted() {
176+
var groupDescription groupDescription
177+
groupDescription.GroupName = group.Group
178+
groupDescription.CoordinatorNode = int(group.Coordinator.NodeID)
179+
groupDescription.State = group.State
180+
groupDescription.Balancer = group.Protocol
181+
groupDescription.Members = len(group.Members)
182+
groupDescription.TotalLag = int(group.Lag.Total())
183+
if partitionCount > 0 {
184+
// We do calculate the partition ID for the group with the following
185+
// hash-based algorithm, which is Redpanda specific at least at 25.1.x, https://github.com/redpanda-data/redpanda/blob/v25.1.1/src/v/kafka/server/coordinator_ntp_mapper.h#L50-L60
186+
// Apache Kafka does use a different algorithm, https://github.com/apache/kafka/blob/4.0.0/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L192
187+
hash := xxhash.Sum64String(group.Group)
188+
partitionID = jump.Hash(hash, int32(partitionCount))
189+
groupDescription.CoordinatorPartition = fmt.Sprintf("__consumer_offsets/%d\n", partitionID)
190+
}
191+
192+
var partitionsInfo []partitionInfo
193+
for _, l := range group.Lag.Sorted() {
194+
pi := partitionInfo{
195+
Topic: l.Topic,
196+
Partition: int(l.Partition),
197+
LogStartOffset: int(l.Start.Offset),
198+
LogEndOffset: int(l.End.Offset),
199+
Lag: int(l.Lag),
200+
}
201+
if l.Commit.At == -1 {
202+
pi.CurrentOffset = -1
203+
} else {
204+
pi.CurrentOffset = int(l.Commit.At)
205+
}
206+
if l.Member != nil {
207+
pi.MemberID = l.Member.MemberID
208+
pi.ClientID = l.Member.ClientID
209+
pi.Host = l.Member.ClientHost
210+
} else {
211+
pi.MemberID = ""
212+
pi.ClientID = ""
213+
pi.Host = ""
214+
}
215+
partitionsInfo = append(partitionsInfo, pi)
216+
}
217+
groupDescription.Partitions = partitionsInfo
218+
var membersInfo []memberInfo
219+
for _, m := range group.Members {
220+
mi := memberInfo{
221+
MemberID: m.MemberID,
222+
ClientID: m.ClientID,
223+
Host: m.ClientHost,
224+
}
225+
226+
// topic partitions for members
227+
mi.TopicPartitions = []topicPartition{}
228+
membersInfo = append(membersInfo, mi)
229+
}
230+
groupDescription.MembersDetails = membersInfo
231+
232+
groupDescriptions = append(groupDescriptions, groupDescription)
233+
}
234+
return groupDescriptions
235+
}
236+
237+
// formatDescribedJSON is used to verify the type of format that
238+
// should be outputted. If it is suppose to be text, then return true
239+
// so that main run function can continue with printing in text.
240+
// if it returns false, then print in requested format (json or yaml)
241+
// and then return after.
242+
func formatDescribedJSON(groupDescriptions []groupDescription, f config.OutFormatter) (string, bool, error) {
243+
isText, _, s, err := f.Format(groupDescriptions)
244+
if err != nil {
245+
return "", false, fmt.Errorf("unable to print in the required format %q: %v", f.Kind, err)
246+
}
247+
// handle text printing else where
248+
if isText {
249+
return "", true, nil
250+
}
251+
return s, false, nil
252+
}
253+
110254
// Below here lies printing the output of everything we have done.
111-
//
112255
// There is not much logic; the main thing to note is that we use dashes when
113256
// some fields do not apply yet, and we only output the instance id or error
114257
// columns if any member in the group has an instance id / error.

0 commit comments

Comments
 (0)