From e165d39807b39ebeab444fcda7d44f9c87044760 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20Ma=C5=82ek?= Date: Tue, 24 Sep 2024 15:00:38 +0200 Subject: [PATCH] fix(konnect): fix KongConsumer update when assigned group does not have a Konnect status or ID set (#638) * fix(konnect): fix KongConsumer update when assigned group does not have a Konnect status or ID set * Update controller/konnect/ops/ops_kongconsumer.go Co-authored-by: Jakub Warczarek * Update test/envtest/konnect_entities_kongconsumer_test.go Co-authored-by: Jakub Warczarek --------- Co-authored-by: Jakub Warczarek Co-authored-by: Tao Yi --- controller/konnect/ops/ops.go | 38 ++++-- controller/konnect/ops/ops_kongconsumer.go | 41 +++--- .../konnect_entities_kongconsumer_test.go | 125 +++++++++++------- 3 files changed, 123 insertions(+), 81 deletions(-) diff --git a/controller/konnect/ops/ops.go b/controller/konnect/ops/ops.go index c38121d94..3ce960497 100644 --- a/controller/konnect/ops/ops.go +++ b/controller/konnect/ops/ops.go @@ -127,18 +127,20 @@ func Delete[ } } -// Update updates a Konnect entity. -// It returns an error if the entity does not have a Konnect ID or if the operation fails. -func Update[ +func shouldUpdate[ T constraints.SupportedKonnectEntityType, TEnt constraints.EntityType[T], -](ctx context.Context, sdk SDKWrapper, syncPeriod time.Duration, cl client.Client, e *T) (ctrl.Result, error) { +]( + ctx context.Context, + ent TEnt, + syncPeriod time.Duration, + now time.Time, +) (bool, ctrl.Result) { var ( - ent = TEnt(e) condProgrammed, ok = k8sutils.GetCondition(conditions.KonnectEntityProgrammedConditionType, ent) - now = time.Now() timeFromLastUpdate = time.Since(condProgrammed.LastTransitionTime.Time) ) + // If the entity is already programmed and the last update was less than // the configured sync period, requeue after the remaining time. if ok && @@ -148,15 +150,33 @@ func Update[ timeFromLastUpdate <= syncPeriod { requeueAfter := syncPeriod - timeFromLastUpdate log.Debug(ctrllog.FromContext(ctx), - "no need for update, requeueing after configured sync period", e, + "no need for update, requeueing after configured sync period", ent, "last_update", condProgrammed.LastTransitionTime.Time, "time_from_last_update", timeFromLastUpdate, "requeue_after", requeueAfter, "requeue_at", now.Add(requeueAfter), ) - return ctrl.Result{ + return false, ctrl.Result{ RequeueAfter: requeueAfter, - }, nil + } + } + + return true, ctrl.Result{} +} + +// Update updates a Konnect entity. +// It returns an error if the entity does not have a Konnect ID or if the operation fails. +func Update[ + T constraints.SupportedKonnectEntityType, + TEnt constraints.EntityType[T], +](ctx context.Context, sdk SDKWrapper, syncPeriod time.Duration, cl client.Client, e *T) (ctrl.Result, error) { + var ( + ent = TEnt(e) + now = time.Now() + ) + + if ok, res := shouldUpdate(ctx, ent, syncPeriod, now); !ok { + return res, nil } if ent.GetKonnectStatus().GetKonnectID() == "" { diff --git a/controller/konnect/ops/ops_kongconsumer.go b/controller/konnect/ops/ops_kongconsumer.go index 2a80b0797..e50871573 100644 --- a/controller/konnect/ops/ops_kongconsumer.go +++ b/controller/konnect/ops/ops_kongconsumer.go @@ -121,37 +121,20 @@ func handleConsumerGroupAssignments( ) error { // Resolve the Konnect IDs of the ConsumerGroups referenced by the KongConsumer. desiredConsumerGroupsIDs, invalidConsumerGroups, err := resolveConsumerGroupsKonnectIDs(ctx, consumer, cl) - if err != nil { - k8sutils.SetCondition( - k8sutils.NewConditionWithGeneration( - conditions.KonnectEntityProgrammedConditionType, - metav1.ConditionFalse, - conditions.KonnectEntityProgrammedReasonFailedToResolveConsumerGroupRefs, - err.Error(), - consumer.GetGeneration(), - ), - consumer, - ) - return err - } // Even if we have invalid ConsumerGroup references, we carry on with the ones that are valid. Invalid ones will be // reported in the condition. populateConsumerGroupRefsValidCondition(invalidConsumerGroups, consumer) + if err != nil { + SetKonnectEntityProgrammedConditionFalse(consumer, conditions.KonnectEntityProgrammedReasonFailedToResolveConsumerGroupRefs, err.Error()) + return err + } + // Reconcile the ConsumerGroups assigned to the KongConsumer in Konnect (list the actual ConsumerGroups, calculate the // difference, and add/remove the Consumer from the ConsumerGroups accordingly). if err := reconcileConsumerGroupsWithKonnect(ctx, desiredConsumerGroupsIDs, cgSDK, cpID, consumer); err != nil { - k8sutils.SetCondition( - k8sutils.NewConditionWithGeneration( - conditions.KonnectEntityProgrammedConditionType, - metav1.ConditionFalse, - conditions.KonnectEntityProgrammedReasonFailedToReconcileConsumerGroupsWithKonnect, - err.Error(), - consumer.GetGeneration(), - ), - consumer, - ) + SetKonnectEntityProgrammedConditionFalse(consumer, conditions.KonnectEntityProgrammedReasonFailedToReconcileConsumerGroupsWithKonnect, err.Error()) return err } return nil @@ -198,6 +181,9 @@ func reconcileConsumerGroupsWithKonnect( // Adding consumer to consumer groups that it is not assigned to yet. for _, cgID := range consumerGroupsToBeAddedTo { + log.Debug(ctrllog.FromContext(ctx), "adding KongConsumer to group", consumer, + "group", cgID, + ) _, err := cgSDK.AddConsumerToGroup(ctx, sdkkonnectops.AddConsumerToGroupRequest{ ControlPlaneID: cpID, ConsumerGroupID: cgID, @@ -212,6 +198,9 @@ func reconcileConsumerGroupsWithKonnect( // Removing consumer from consumer groups that it is not assigned to anymore. for _, cgID := range consumerGroupsToBeRemovedFrom { + log.Debug(ctrllog.FromContext(ctx), "removing KongConsumer from group", consumer, + "group", cgID, + ) _, err := cgSDK.RemoveConsumerFromGroup(ctx, sdkkonnectops.RemoveConsumerFromGroupRequest{ ControlPlaneID: cpID, ConsumerGroupID: cgID, @@ -284,7 +273,7 @@ func resolveConsumerGroupsKonnectIDs( } return nil, nil, fmt.Errorf("failed to get KongConsumerGroup %s/%s: %w", consumer.Namespace, cgName, err) } - if cg.GetKonnectStatus() != nil && cg.GetKonnectStatus().GetKonnectID() == "" { + if cg.GetKonnectStatus() == nil || cg.GetKonnectStatus().GetKonnectID() == "" { invalidConsumerGroups = append(invalidConsumerGroups, invalidConsumerGroupRef{ Name: cgName, Reason: "NotCreatedInKonnect", @@ -293,6 +282,10 @@ func resolveConsumerGroupsKonnectIDs( } desiredConsumerGroupsIDs = append(desiredConsumerGroupsIDs, cg.GetKonnectStatus().GetKonnectID()) } + if len(invalidConsumerGroups) > 0 { + err := errors.New("some KongConsumerGroups couldn't be assigned to KongConsumer, see KongConsumer status for details") + return desiredConsumerGroupsIDs, invalidConsumerGroups, err + } return desiredConsumerGroupsIDs, invalidConsumerGroups, nil } diff --git a/test/envtest/konnect_entities_kongconsumer_test.go b/test/envtest/konnect_entities_kongconsumer_test.go index bba83c207..fe196543f 100644 --- a/test/envtest/konnect_entities_kongconsumer_test.go +++ b/test/envtest/konnect_entities_kongconsumer_test.go @@ -63,21 +63,31 @@ func TestKongConsumer(t *testing.T) { username = "user-1" ) t.Log("Setting up SDK expectations on KongConsumer creation") - sdk.ConsumersSDK.EXPECT().CreateConsumer(mock.Anything, cp.GetKonnectStatus().GetKonnectID(), - mock.MatchedBy(func(input sdkkonnectcomp.ConsumerInput) bool { - return input.Username != nil && *input.Username == username - }), - ).Return(&sdkkonnectops.CreateConsumerResponse{ + sdk.ConsumersSDK.EXPECT(). + CreateConsumer(mock.Anything, cp.GetKonnectStatus().GetKonnectID(), + mock.MatchedBy(func(input sdkkonnectcomp.ConsumerInput) bool { + return input.Username != nil && *input.Username == username + }), + ).Return(&sdkkonnectops.CreateConsumerResponse{ Consumer: &sdkkonnectcomp.Consumer{ ID: lo.ToPtr(consumerID), }, }, nil) + t.Log("Setting up SDK expectation on possibly updating KongConsumer ( due to asynchronous nature of updates between KongConsumer and KongConsumerGroup)") + sdk.ConsumersSDK.EXPECT(). + UpsertConsumer(mock.Anything, mock.MatchedBy(func(r sdkkonnectops.UpsertConsumerRequest) bool { + return r.ConsumerID == consumerID + })). + Return(&sdkkonnectops.UpsertConsumerResponse{}, nil). + Maybe() + t.Log("Setting up SDK expectation on KongConsumerGroups listing") - sdk.ConsumerGroupSDK.EXPECT().ListConsumerGroupsForConsumer(mock.Anything, sdkkonnectops.ListConsumerGroupsForConsumerRequest{ - ConsumerID: consumerID, - ControlPlaneID: cp.GetKonnectStatus().GetKonnectID(), - }).Return(&sdkkonnectops.ListConsumerGroupsForConsumerResponse{}, nil) + sdk.ConsumerGroupSDK.EXPECT(). + ListConsumerGroupsForConsumer(mock.Anything, sdkkonnectops.ListConsumerGroupsForConsumerRequest{ + ConsumerID: consumerID, + ControlPlaneID: cp.GetKonnectStatus().GetKonnectID(), + }).Return(&sdkkonnectops.ListConsumerGroupsForConsumerResponse{}, nil) t.Log("Creating KongConsumer") createdConsumer := deployKongConsumerAttachedToCP(t, ctx, clientNamespaced, username, cp) @@ -99,10 +109,12 @@ func TestKongConsumer(t *testing.T) { }, waitTime, tickTime) t.Log("Setting up SDK expectations on KongConsumer update") - sdk.ConsumersSDK.EXPECT().UpsertConsumer(mock.Anything, mock.MatchedBy(func(r sdkkonnectops.UpsertConsumerRequest) bool { - return r.ConsumerID == consumerID && - r.Consumer.Username != nil && *r.Consumer.Username == "user-1-updated" - })).Return(&sdkkonnectops.UpsertConsumerResponse{}, nil) + sdk.ConsumersSDK.EXPECT(). + UpsertConsumer(mock.Anything, mock.MatchedBy(func(r sdkkonnectops.UpsertConsumerRequest) bool { + return r.ConsumerID == consumerID && + r.Consumer.Username != nil && *r.Consumer.Username == "user-1-updated" + })). + Return(&sdkkonnectops.UpsertConsumerResponse{}, nil) t.Log("Patching KongConsumer") consumerToPatch := createdConsumer.DeepCopy() @@ -115,7 +127,8 @@ func TestKongConsumer(t *testing.T) { }, waitTime, tickTime) t.Log("Setting up SDK expectations on KongConsumer deletion") - sdk.ConsumersSDK.EXPECT().DeleteConsumer(mock.Anything, cp.GetKonnectStatus().GetKonnectID(), consumerID). + sdk.ConsumersSDK.EXPECT(). + DeleteConsumer(mock.Anything, cp.GetKonnectStatus().GetKonnectID(), consumerID). Return(&sdkkonnectops.DeleteConsumerResponse{}, nil) t.Log("Deleting KongConsumer") @@ -139,42 +152,54 @@ func TestKongConsumer(t *testing.T) { consumerGroupName = "consumer-group-1" ) t.Log("Setting up SDK expectations on KongConsumer creation with ConsumerGroup") - sdk.ConsumersSDK.EXPECT().CreateConsumer(mock.Anything, cp.GetKonnectStatus().GetKonnectID(), - mock.MatchedBy(func(input sdkkonnectcomp.ConsumerInput) bool { - return input.Username != nil && *input.Username == username - }), - ).Return(&sdkkonnectops.CreateConsumerResponse{ + sdk.ConsumersSDK.EXPECT(). + CreateConsumer(mock.Anything, cp.GetKonnectStatus().GetKonnectID(), + mock.MatchedBy(func(input sdkkonnectcomp.ConsumerInput) bool { + return input.Username != nil && *input.Username == username + }), + ).Return(&sdkkonnectops.CreateConsumerResponse{ Consumer: &sdkkonnectcomp.Consumer{ ID: lo.ToPtr(consumerID), }, }, nil) - sdk.ConsumerGroupSDK.EXPECT().CreateConsumerGroup(mock.Anything, cp.GetKonnectStatus().GetKonnectID(), - mock.MatchedBy(func(input sdkkonnectcomp.ConsumerGroupInput) bool { - return input.Name == consumerGroupName - }), - ).Return(&sdkkonnectops.CreateConsumerGroupResponse{ + t.Log("Setting up SDK expectation on possibly updating KongConsumer (due to asynchronous nature of updates between KongConsumer and KongConsumerGroup)") + sdk.ConsumersSDK.EXPECT(). + UpsertConsumer(mock.Anything, mock.MatchedBy(func(r sdkkonnectops.UpsertConsumerRequest) bool { + return r.ConsumerID == consumerID + })). + Return(&sdkkonnectops.UpsertConsumerResponse{}, nil). + Maybe() + + sdk.ConsumerGroupSDK.EXPECT(). + CreateConsumerGroup(mock.Anything, cp.GetKonnectStatus().GetKonnectID(), + mock.MatchedBy(func(input sdkkonnectcomp.ConsumerGroupInput) bool { + return input.Name == consumerGroupName + }), + ).Return(&sdkkonnectops.CreateConsumerGroupResponse{ ConsumerGroup: &sdkkonnectcomp.ConsumerGroup{ ID: lo.ToPtr(cgID), }, }, nil) t.Log("Setting up SDK expectation on KongConsumerGroups listing") - emptyListCall := sdk.ConsumerGroupSDK.EXPECT().ListConsumerGroupsForConsumer(mock.Anything, sdkkonnectops.ListConsumerGroupsForConsumerRequest{ - ConsumerID: consumerID, - ControlPlaneID: cp.GetKonnectStatus().GetKonnectID(), - }).Return(&sdkkonnectops.ListConsumerGroupsForConsumerResponse{ + emptyListCall := sdk.ConsumerGroupSDK.EXPECT(). + ListConsumerGroupsForConsumer(mock.Anything, sdkkonnectops.ListConsumerGroupsForConsumerRequest{ + ConsumerID: consumerID, + ControlPlaneID: cp.GetKonnectStatus().GetKonnectID(), + }).Return(&sdkkonnectops.ListConsumerGroupsForConsumerResponse{ // Returning no ConsumerGroups associated with the Consumer in Konnect to trigger addition. }, nil) t.Log("Setting up SDK expectation on adding Consumer to ConsumerGroup") - sdk.ConsumerGroupSDK.EXPECT().AddConsumerToGroup(mock.Anything, sdkkonnectops.AddConsumerToGroupRequest{ - ConsumerGroupID: cgID, - ControlPlaneID: cp.GetKonnectStatus().GetKonnectID(), - RequestBody: &sdkkonnectops.AddConsumerToGroupRequestBody{ - ConsumerID: lo.ToPtr(consumerID), - }, - }).Return(&sdkkonnectops.AddConsumerToGroupResponse{}, nil) + sdk.ConsumerGroupSDK.EXPECT(). + AddConsumerToGroup(mock.Anything, sdkkonnectops.AddConsumerToGroupRequest{ + ConsumerGroupID: cgID, + ControlPlaneID: cp.GetKonnectStatus().GetKonnectID(), + RequestBody: &sdkkonnectops.AddConsumerToGroupRequestBody{ + ConsumerID: lo.ToPtr(consumerID), + }, + }).Return(&sdkkonnectops.AddConsumerToGroupResponse{}, nil) t.Log("Creating KongConsumerGroup") createdConsumerGroup := deployKongConsumerGroupAttachedToCP(t, ctx, clientNamespaced, consumerGroupName, cp) @@ -213,16 +238,19 @@ func TestKongConsumer(t *testing.T) { }, waitTime, tickTime) t.Log("Setting up SDK expectations on KongConsumer update with ConsumerGroup") - sdk.ConsumersSDK.EXPECT().UpsertConsumer(mock.Anything, mock.MatchedBy(func(r sdkkonnectops.UpsertConsumerRequest) bool { - return r.ConsumerID == consumerID && - r.Consumer.Username != nil && *r.Consumer.Username == "user-2-updated" - })).Return(&sdkkonnectops.UpsertConsumerResponse{}, nil) + sdk.ConsumersSDK.EXPECT(). + UpsertConsumer(mock.Anything, mock.MatchedBy(func(r sdkkonnectops.UpsertConsumerRequest) bool { + return r.ConsumerID == consumerID && + r.Consumer.Username != nil && *r.Consumer.Username == "user-2-updated" + })). + Return(&sdkkonnectops.UpsertConsumerResponse{}, nil) emptyListCall.Unset() // Unset the previous expectation to allow the new one to be set. - sdk.ConsumerGroupSDK.EXPECT().ListConsumerGroupsForConsumer(mock.Anything, sdkkonnectops.ListConsumerGroupsForConsumerRequest{ - ConsumerID: consumerID, - ControlPlaneID: cp.GetKonnectStatus().GetKonnectID(), - }).Return(&sdkkonnectops.ListConsumerGroupsForConsumerResponse{ + sdk.ConsumerGroupSDK.EXPECT(). + ListConsumerGroupsForConsumer(mock.Anything, sdkkonnectops.ListConsumerGroupsForConsumerRequest{ + ConsumerID: consumerID, + ControlPlaneID: cp.GetKonnectStatus().GetKonnectID(), + }).Return(&sdkkonnectops.ListConsumerGroupsForConsumerResponse{ Object: &sdkkonnectops.ListConsumerGroupsForConsumerResponseBody{ Data: []sdkkonnectcomp.ConsumerGroup{ { @@ -239,11 +267,12 @@ func TestKongConsumer(t *testing.T) { }, }, nil) - sdk.ConsumerGroupSDK.EXPECT().RemoveConsumerFromGroup(mock.Anything, sdkkonnectops.RemoveConsumerFromGroupRequest{ - ConsumerGroupID: "not-defined-in-crd", - ControlPlaneID: cp.GetKonnectStatus().GetKonnectID(), - ConsumerID: consumerID, - }).Return(&sdkkonnectops.RemoveConsumerFromGroupResponse{}, nil) + sdk.ConsumerGroupSDK.EXPECT(). + RemoveConsumerFromGroup(mock.Anything, sdkkonnectops.RemoveConsumerFromGroupRequest{ + ConsumerGroupID: "not-defined-in-crd", + ControlPlaneID: cp.GetKonnectStatus().GetKonnectID(), + ConsumerID: consumerID, + }).Return(&sdkkonnectops.RemoveConsumerFromGroupResponse{}, nil) t.Log("Patching KongConsumer to trigger reconciliation") consumerToPatch := createdConsumer.DeepCopy()