Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: handle tags for old consumer-groups implementation #1088

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ func syncMain(ctx context.Context, filenames []string, dry bool, parallelism,
// read the current state
var currentState *state.KongState
if workspaceExists {
ctx = context.WithValue(ctx, utils.KongVersionContextKey, parsedKongVersion)
currentState, err = fetchCurrentState(ctx, kongClient, dumpConfig)
if err != nil {
return err
Expand Down
38 changes: 36 additions & 2 deletions dump/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,14 @@ func getConsumerGroupsConfiguration(ctx context.Context, group *errgroup.Group,
client *kong.Client, config Config, state *utils.KongRawState,
) {
group.Go(func() error {
consumerGroups, err := GetAllConsumerGroups(ctx, client, config.SelectorTags)
var handleTags bool
kongVersionFromContext := utils.GetKongVersionFromContext(ctx)
if kongVersionFromContext.LT(utils.Kong320Version) {
handleTags = true
}
consumerGroups, err := GetAllConsumerGroups(
ctx, client, config.SelectorTags, handleTags,
)
if err != nil {
if kong.IsNotFoundErr(err) || kong.IsForbiddenErr(err) {
return nil
Expand Down Expand Up @@ -523,12 +530,35 @@ func GetAllUpstreams(ctx context.Context,
return upstreams, nil
}

func containsTags(inputTagsMap map[string]bool, tags []*string) bool {
if len(inputTagsMap) == 0 {
// no tags are present in input, so no filtering is expected.
return true
}
if len(tags) == 0 {
// entity is not tagged, but tags were present in the request.
return false
}
for _, cgTag := range tags {
if inputTagsMap[*cgTag] {
return true
}
}
return false
}

// GetAllConsumerGroups queries Kong for all the ConsumerGroups using client.
func GetAllConsumerGroups(ctx context.Context,
client *kong.Client, tags []string,
client *kong.Client, tags []string, handleTags bool,
) ([]*kong.ConsumerGroupObject, error) {
var consumerGroupObjects []*kong.ConsumerGroupObject
opt := newOpt(tags)
tagsLen := len(tags)

tagsMap := make(map[string]bool, tagsLen)
for _, tag := range tags {
tagsMap[tag] = true
}

for {
cgs, nextopt, err := client.ConsumerGroups.List(ctx, opt)
Expand All @@ -540,6 +570,10 @@ func GetAllConsumerGroups(ctx context.Context,
}

for _, cg := range cgs {
if handleTags && !containsTags(tagsMap, cg.Tags) {
continue
}

r, err := client.ConsumerGroups.Get(ctx, cg.Name)
if err != nil {
return nil, err
Expand Down
47 changes: 47 additions & 0 deletions tests/integration/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4814,3 +4814,50 @@ func Test_Sync_DoNotUpdateCreatedAt(t *testing.T) {
// plugins do not have an updated_at field
// consumers do not have an updated_at field
}

// test scope:
// - 2.8.0+
func Test_Sync_ConsumerGroupsWithTags(t *testing.T) {
runWhen(t, "enterprise", ">=2.8.0")
setup(t)

client, err := getTestClient()
if err != nil {
t.Fatalf(err.Error())
}

const (
noTags = "testdata/sync/028-consumer-groups-tags/no_tags.yaml"
withTags = "testdata/sync/028-consumer-groups-tags/with_tags.yaml"
)

// create a consumer-group and a plugin with no tags
require.NoError(t, sync(noTags))

// get the current state
ctx := context.Background()
oldKongState, err := deckDump.Get(ctx, client, deckDump.Config{})
if err != nil {
t.Errorf(err.Error())
}

// create entities with select_tags
time.Sleep(time.Second)
require.NoError(t, sync(withTags))

// get the new state
newKongState, err := deckDump.Get(ctx, client, deckDump.Config{})
if err != nil {
t.Errorf(err.Error())
}

// verify that entities with no tags were not wiped out by the sync with select_tags
require.Equal(t, len(oldKongState.ConsumerGroups), len(newKongState.ConsumerGroups))
require.Equal(
t,
oldKongState.ConsumerGroups[0].ConsumerGroup.CreatedAt,
newKongState.ConsumerGroups[0].ConsumerGroup.CreatedAt,
)
require.Equal(t, len(oldKongState.Plugins), len(newKongState.Plugins))
require.Equal(t, oldKongState.Plugins[0].CreatedAt, newKongState.Plugins[0].CreatedAt)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
_format_version: "3.0"
consumer_groups:
- name: my-cg
consumers:
- username: foo
plugins:
- name: prometheus
consumers:
- username: foo
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
_format_version: "3.0"
_info:
defaults: {}
select_tags:
- foo
services:
- connect_timeout: 60000
enabled: true
host: mockbin.org
name: svc1
port: 80
protocol: http
read_timeout: 60000
retries: 5
write_timeout: 60000
17 changes: 17 additions & 0 deletions utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,26 @@ var (

Kong140Version = semver.MustParse("1.4.0")
Kong300Version = semver.MustParse("3.0.0")
Kong320Version = semver.MustParse("3.2.0")
Kong340Version = semver.MustParse("3.4.0")
)

type ContextKey int

const (
// KongVersionContextKey is the context key used to store the Kong version
// in the context.
KongVersionContextKey ContextKey = iota
)

func GetKongVersionFromContext(ctx context.Context) semver.Version {
v, ok := ctx.Value(KongVersionContextKey).(semver.Version)
if !ok {
return semver.Version{}
}
return v
}

var ErrorConsumerGroupUpgrade = errors.New(
"a rate-limiting-advanced plugin with config.consumer_groups\n" +
"and/or config.enforce_consumer_groups was found. Please use Consumer Groups scoped\n" +
Expand Down
Loading