Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 144 additions & 1 deletion src/go/rpk/pkg/cli/group/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ Describe any one-character group:
`,
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, groups []string) {
f := p.Formatter
if h, ok := f.Help(groupDescription{}); ok {
out.Exit(h)
}
p, err := p.LoadVirtualProfile(fs)
out.MaybeDie(err, "rpk unable to load config: %v", err)

Expand Down Expand Up @@ -86,6 +90,16 @@ Describe any one-character group:
if err != nil {
out.Die("unable to describe groups: %v", err)
}

groupDescriptions := buildDescribed(lags, partitionCount)
formatOutput, isText, err := formatDescribedJSON(groupDescriptions, f)
if err != nil {
out.Die("Error formatting group descriptions: %v", err)
}
if !isText {
fmt.Print(formatOutput)
return
}
if lagPerTopic {
printLagPerTopic(lags, partitionCount)
return
Expand All @@ -97,6 +111,9 @@ Describe any one-character group:
printDescribed(commitsOnly, lags, useInstanceID, partitionCount)
},
}

p.InstallFormatFlag(cmd)

cmd.Flags().BoolVarP(&lagPerTopic, "print-lag-per-topic", "t", false, "Print the aggregated lag per topic")
cmd.Flags().BoolVarP(&summary, "print-summary", "s", false, "Print only the group summary section")
cmd.Flags().BoolVarP(&commitsOnly, "print-commits", "c", false, "Print only the group commits section")
Expand All @@ -107,8 +124,134 @@ Describe any one-character group:
return cmd
}

// Below here lies printing the output in json/yaml.

// groupDescription provides details about the consumer group.
type groupDescription struct {
GroupName string `json:"group_name" yaml:"group_name"`
CoordinatorPartition string `json:"coordinator_partition" yaml:"coordinator_partition"`
State string `json:"state" yaml:"state"`
Balancer string `json:"balancer" yaml:"balancer"`
Members int `json:"members" yaml:"members"`
CoordinatorNode int `json:"coordinator_node" yaml:"coordinator_node"`
TotalLag int `json:"total_lag" yaml:"total_lag"`
Partitions []partitionInfo `json:"partitions" yaml:"partitions"`
MembersDetails []memberInfo `json:"members_details" yaml:"members_details"`
}

// partitionInfo is use to provide details about the partition.
type partitionInfo struct {
Partition int `json:"partition" yaml:"partition"`
CurrentOffset int `json:"current_offset" yaml:"current_offset"`
LogStartOffset int `json:"log_start_offset" yaml:"log_start_offset"`
LogEndOffset int `json:"log_end_offset" yaml:"log_end_offset"`
Lag int `json:"lag" yaml:"lag"`
Topic string `json:"topic" yaml:"topic"`
MemberID string `json:"member_id" yaml:"member_id"`
ClientID string `json:"client_id" yaml:"client_id"`
Host string `json:"host" yaml:"host"`
}

// memberInfo is used to provide details about the members.
type memberInfo struct {
MemberID string `json:"member_id" yaml:"member_id"`
ClientID string `json:"client_id" yaml:"client_id"`
Host string `json:"host" yaml:"host"`
TopicPartitions []topicPartition `json:"topic_partitions" yaml:"topic_partitions"`
}

// topicPartition will be used for Topic.
type topicPartition struct {
Topic string `json:"topic" yaml:"topic"`
Partition int `json:"partition" yaml:"partition"`
}

// BuildDescribed gathers all the group information and returns
// it as a groupDescription.
func buildDescribed(lags kadm.DescribedGroupLags, partitionCount int) []groupDescription {
var groupDescriptions []groupDescription
var partitionID int32

for _, group := range lags.Sorted() {
var groupDescription groupDescription
groupDescription.GroupName = group.Group
groupDescription.CoordinatorNode = int(group.Coordinator.NodeID)
groupDescription.State = group.State
groupDescription.Balancer = group.Protocol
groupDescription.Members = len(group.Members)
groupDescription.TotalLag = int(group.Lag.Total())
if partitionCount > 0 {
// We do calculate the partition ID for the group with the following
// hash-based algorithm, which is Redpanda specific at least at 25.1.x, https://github.com/redpanda-data/redpanda/blob/v25.1.1/src/v/kafka/server/coordinator_ntp_mapper.h#L50-L60
// Apache Kafka does use a different algorithm, https://github.com/apache/kafka/blob/4.0.0/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L192
hash := xxhash.Sum64String(group.Group)
partitionID = jump.Hash(hash, int32(partitionCount))
groupDescription.CoordinatorPartition = fmt.Sprintf("__consumer_offsets/%d\n", partitionID)
}

var partitionsInfo []partitionInfo
for _, l := range group.Lag.Sorted() {
pi := partitionInfo{
Topic: l.Topic,
Partition: int(l.Partition),
LogStartOffset: int(l.Start.Offset),
LogEndOffset: int(l.End.Offset),
Lag: int(l.Lag),
}
if l.Commit.At == -1 {
pi.CurrentOffset = -1
} else {
pi.CurrentOffset = int(l.Commit.At)
}
if l.Member != nil {
pi.MemberID = l.Member.MemberID
pi.ClientID = l.Member.ClientID
pi.Host = l.Member.ClientHost
} else {
pi.MemberID = ""
pi.ClientID = ""
pi.Host = ""
}
partitionsInfo = append(partitionsInfo, pi)
}
groupDescription.Partitions = partitionsInfo
var membersInfo []memberInfo
for _, m := range group.Members {
mi := memberInfo{
MemberID: m.MemberID,
ClientID: m.ClientID,
Host: m.ClientHost,
}

// topic partitions for members
mi.TopicPartitions = []topicPartition{}
membersInfo = append(membersInfo, mi)
}
groupDescription.MembersDetails = membersInfo

groupDescriptions = append(groupDescriptions, groupDescription)
}
return groupDescriptions
}

// formatDescribedJSON is used to verify the type of format that
// should be outputted. If it is suppose to be text, then return true
// so that main run function can continue with printing in text.
// if it returns false, then print in requested format (json or yaml)
// and then return after.
func formatDescribedJSON(groupDescriptions []groupDescription, f config.OutFormatter) (string, bool, error) {
isText, _, s, err := f.Format(groupDescriptions)
if err != nil {
return "", false, fmt.Errorf("unable to print in the required format %q: %v", f.Kind, err)
}
// handle text printing else where
if isText {
return "", true, nil
}
return s, false, nil
}

// Below here lies printing the output of everything we have done.
//
// There is not much logic; the main thing to note is that we use dashes when
// some fields do not apply yet, and we only output the instance id or error
// columns if any member in the group has an instance id / error.
Expand Down