Skip to content

Commit

Permalink
Collect metrics for a set of CG states ADDENDUM (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
dobrerazvan authored Jul 27, 2023
1 parent 0e82c7c commit b201f8c
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 13 deletions.
10 changes: 1 addition & 9 deletions minion/consumer_group_offsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,9 @@ func (s *Service) ListAllConsumerGroupOffsetsAdminAPI(ctx context.Context) (map[
return nil, fmt.Errorf("failed to list groupsRes: %w", err)
}
groupIDs := make([]string, len(groupsRes.AllowedGroups.Groups))
groupStatesMap := s.Cfg.ConsumerGroups.GetAllowedConsumerGroupStates()

for i, group := range groupsRes.AllowedGroups.Groups {
if len(groupStatesMap) == 0 {
groupIDs[i] = group.Group
} else {
// only add group if it's state is allowed
if _, ok := groupStatesMap[group.GroupState]; ok {
groupIDs[i] = group.Group
}
}
groupIDs[i] = group.Group
}

return s.listConsumerGroupOffsetsBulk(ctx, groupIDs)
Expand Down
3 changes: 2 additions & 1 deletion minion/describe_consumer_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ func (s *Service) listConsumerGroupsCached(ctx context.Context) (*GroupsInfo, er
return nil, err
}
allowedGroups := make([]kmsg.ListGroupsResponseGroup, 0)

for i := range res.Groups {
if s.IsGroupAllowed(res.Groups[i].Group) {
if s.IsGroupAllowed(res.Groups[i].Group, res.Groups[i].GroupState) {
allowedGroups = append(allowedGroups, res.Groups[i])
}
}
Expand Down
4 changes: 2 additions & 2 deletions minion/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (s *Storage) getNumberOfConsumedRecords() float64 {
return s.consumedRecords.Load()
}

func (s *Storage) getGroupOffsets(isAllowed func(groupName string) bool) map[string]map[string]map[int32]OffsetCommit {
func (s *Storage) getGroupOffsets(isAllowed func(groupName string, groupState string) bool) map[string]map[string]map[int32]OffsetCommit {
// Offsets by group, topic, partition
offsetsByGroup := make(map[string]map[string]map[int32]OffsetCommit)

Expand All @@ -121,7 +121,7 @@ func (s *Storage) getGroupOffsets(isAllowed func(groupName string) bool) map[str
for _, offset := range offsets {
val := offset.(OffsetCommit)

if !isAllowed(val.Key.Group) {
if !isAllowed(val.Key.Group, "") {
continue
}

Expand Down
11 changes: 10 additions & 1 deletion minion/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"strings"
)

func (s *Service) IsGroupAllowed(groupName string) bool {
func (s *Service) IsGroupAllowed(groupName string, groupState string) bool {
isAllowed := false
for _, regex := range s.AllowedGroupIDsExpr {
if regex.MatchString(groupName) {
Expand All @@ -21,6 +21,15 @@ func (s *Service) IsGroupAllowed(groupName string) bool {
break
}
}

if isAllowed && groupState != "" {
groupStatesMap := s.Cfg.ConsumerGroups.GetAllowedConsumerGroupStates()
if len(groupStatesMap) > 0 {
if _, ok := groupStatesMap[groupState]; !ok {
isAllowed = false
}
}
}
return isAllowed
}

Expand Down

0 comments on commit b201f8c

Please sign in to comment.