Skip to content

Commit

Permalink
fix(konnect): fix KongConsumer update when assigned group does not ha…
Browse files Browse the repository at this point in the history
…ve a Konnect status or ID set (#638)

* fix(konnect): fix KongConsumer update when assigned group does not have a Konnect status or ID set

* Update controller/konnect/ops/ops_kongconsumer.go

Co-authored-by: Jakub Warczarek <[email protected]>

* Update test/envtest/konnect_entities_kongconsumer_test.go

Co-authored-by: Jakub Warczarek <[email protected]>

---------

Co-authored-by: Jakub Warczarek <[email protected]>
Co-authored-by: Tao Yi <[email protected]>
  • Loading branch information
3 people authored Sep 24, 2024
1 parent 6b321b3 commit e165d39
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 81 deletions.
38 changes: 29 additions & 9 deletions controller/konnect/ops/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,20 @@ func Delete[
}
}

// Update updates a Konnect entity.
// It returns an error if the entity does not have a Konnect ID or if the operation fails.
func Update[
func shouldUpdate[
T constraints.SupportedKonnectEntityType,
TEnt constraints.EntityType[T],
](ctx context.Context, sdk SDKWrapper, syncPeriod time.Duration, cl client.Client, e *T) (ctrl.Result, error) {
](
ctx context.Context,
ent TEnt,
syncPeriod time.Duration,
now time.Time,
) (bool, ctrl.Result) {
var (
ent = TEnt(e)
condProgrammed, ok = k8sutils.GetCondition(conditions.KonnectEntityProgrammedConditionType, ent)
now = time.Now()
timeFromLastUpdate = time.Since(condProgrammed.LastTransitionTime.Time)
)

// If the entity is already programmed and the last update was less than
// the configured sync period, requeue after the remaining time.
if ok &&
Expand All @@ -148,15 +150,33 @@ func Update[
timeFromLastUpdate <= syncPeriod {
requeueAfter := syncPeriod - timeFromLastUpdate
log.Debug(ctrllog.FromContext(ctx),
"no need for update, requeueing after configured sync period", e,
"no need for update, requeueing after configured sync period", ent,
"last_update", condProgrammed.LastTransitionTime.Time,
"time_from_last_update", timeFromLastUpdate,
"requeue_after", requeueAfter,
"requeue_at", now.Add(requeueAfter),
)
return ctrl.Result{
return false, ctrl.Result{
RequeueAfter: requeueAfter,
}, nil
}
}

return true, ctrl.Result{}
}

// Update updates a Konnect entity.
// It returns an error if the entity does not have a Konnect ID or if the operation fails.
func Update[
T constraints.SupportedKonnectEntityType,
TEnt constraints.EntityType[T],
](ctx context.Context, sdk SDKWrapper, syncPeriod time.Duration, cl client.Client, e *T) (ctrl.Result, error) {
var (
ent = TEnt(e)
now = time.Now()
)

if ok, res := shouldUpdate(ctx, ent, syncPeriod, now); !ok {
return res, nil
}

if ent.GetKonnectStatus().GetKonnectID() == "" {
Expand Down
41 changes: 17 additions & 24 deletions controller/konnect/ops/ops_kongconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,37 +121,20 @@ func handleConsumerGroupAssignments(
) error {
// Resolve the Konnect IDs of the ConsumerGroups referenced by the KongConsumer.
desiredConsumerGroupsIDs, invalidConsumerGroups, err := resolveConsumerGroupsKonnectIDs(ctx, consumer, cl)
if err != nil {
k8sutils.SetCondition(
k8sutils.NewConditionWithGeneration(
conditions.KonnectEntityProgrammedConditionType,
metav1.ConditionFalse,
conditions.KonnectEntityProgrammedReasonFailedToResolveConsumerGroupRefs,
err.Error(),
consumer.GetGeneration(),
),
consumer,
)
return err
}

// Even if we have invalid ConsumerGroup references, we carry on with the ones that are valid. Invalid ones will be
// reported in the condition.
populateConsumerGroupRefsValidCondition(invalidConsumerGroups, consumer)

if err != nil {
SetKonnectEntityProgrammedConditionFalse(consumer, conditions.KonnectEntityProgrammedReasonFailedToResolveConsumerGroupRefs, err.Error())
return err
}

// Reconcile the ConsumerGroups assigned to the KongConsumer in Konnect (list the actual ConsumerGroups, calculate the
// difference, and add/remove the Consumer from the ConsumerGroups accordingly).
if err := reconcileConsumerGroupsWithKonnect(ctx, desiredConsumerGroupsIDs, cgSDK, cpID, consumer); err != nil {
k8sutils.SetCondition(
k8sutils.NewConditionWithGeneration(
conditions.KonnectEntityProgrammedConditionType,
metav1.ConditionFalse,
conditions.KonnectEntityProgrammedReasonFailedToReconcileConsumerGroupsWithKonnect,
err.Error(),
consumer.GetGeneration(),
),
consumer,
)
SetKonnectEntityProgrammedConditionFalse(consumer, conditions.KonnectEntityProgrammedReasonFailedToReconcileConsumerGroupsWithKonnect, err.Error())
return err
}
return nil
Expand Down Expand Up @@ -198,6 +181,9 @@ func reconcileConsumerGroupsWithKonnect(

// Adding consumer to consumer groups that it is not assigned to yet.
for _, cgID := range consumerGroupsToBeAddedTo {
log.Debug(ctrllog.FromContext(ctx), "adding KongConsumer to group", consumer,
"group", cgID,
)
_, err := cgSDK.AddConsumerToGroup(ctx, sdkkonnectops.AddConsumerToGroupRequest{
ControlPlaneID: cpID,
ConsumerGroupID: cgID,
Expand All @@ -212,6 +198,9 @@ func reconcileConsumerGroupsWithKonnect(

// Removing consumer from consumer groups that it is not assigned to anymore.
for _, cgID := range consumerGroupsToBeRemovedFrom {
log.Debug(ctrllog.FromContext(ctx), "removing KongConsumer from group", consumer,
"group", cgID,
)
_, err := cgSDK.RemoveConsumerFromGroup(ctx, sdkkonnectops.RemoveConsumerFromGroupRequest{
ControlPlaneID: cpID,
ConsumerGroupID: cgID,
Expand Down Expand Up @@ -284,7 +273,7 @@ func resolveConsumerGroupsKonnectIDs(
}
return nil, nil, fmt.Errorf("failed to get KongConsumerGroup %s/%s: %w", consumer.Namespace, cgName, err)
}
if cg.GetKonnectStatus() != nil && cg.GetKonnectStatus().GetKonnectID() == "" {
if cg.GetKonnectStatus() == nil || cg.GetKonnectStatus().GetKonnectID() == "" {
invalidConsumerGroups = append(invalidConsumerGroups, invalidConsumerGroupRef{
Name: cgName,
Reason: "NotCreatedInKonnect",
Expand All @@ -293,6 +282,10 @@ func resolveConsumerGroupsKonnectIDs(
}
desiredConsumerGroupsIDs = append(desiredConsumerGroupsIDs, cg.GetKonnectStatus().GetKonnectID())
}
if len(invalidConsumerGroups) > 0 {
err := errors.New("some KongConsumerGroups couldn't be assigned to KongConsumer, see KongConsumer status for details")
return desiredConsumerGroupsIDs, invalidConsumerGroups, err
}
return desiredConsumerGroupsIDs, invalidConsumerGroups, nil
}

Expand Down
125 changes: 77 additions & 48 deletions test/envtest/konnect_entities_kongconsumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,31 @@ func TestKongConsumer(t *testing.T) {
username = "user-1"
)
t.Log("Setting up SDK expectations on KongConsumer creation")
sdk.ConsumersSDK.EXPECT().CreateConsumer(mock.Anything, cp.GetKonnectStatus().GetKonnectID(),
mock.MatchedBy(func(input sdkkonnectcomp.ConsumerInput) bool {
return input.Username != nil && *input.Username == username
}),
).Return(&sdkkonnectops.CreateConsumerResponse{
sdk.ConsumersSDK.EXPECT().
CreateConsumer(mock.Anything, cp.GetKonnectStatus().GetKonnectID(),
mock.MatchedBy(func(input sdkkonnectcomp.ConsumerInput) bool {
return input.Username != nil && *input.Username == username
}),
).Return(&sdkkonnectops.CreateConsumerResponse{
Consumer: &sdkkonnectcomp.Consumer{
ID: lo.ToPtr(consumerID),
},
}, nil)

t.Log("Setting up SDK expectation on possibly updating KongConsumer ( due to asynchronous nature of updates between KongConsumer and KongConsumerGroup)")
sdk.ConsumersSDK.EXPECT().
UpsertConsumer(mock.Anything, mock.MatchedBy(func(r sdkkonnectops.UpsertConsumerRequest) bool {
return r.ConsumerID == consumerID
})).
Return(&sdkkonnectops.UpsertConsumerResponse{}, nil).
Maybe()

t.Log("Setting up SDK expectation on KongConsumerGroups listing")
sdk.ConsumerGroupSDK.EXPECT().ListConsumerGroupsForConsumer(mock.Anything, sdkkonnectops.ListConsumerGroupsForConsumerRequest{
ConsumerID: consumerID,
ControlPlaneID: cp.GetKonnectStatus().GetKonnectID(),
}).Return(&sdkkonnectops.ListConsumerGroupsForConsumerResponse{}, nil)
sdk.ConsumerGroupSDK.EXPECT().
ListConsumerGroupsForConsumer(mock.Anything, sdkkonnectops.ListConsumerGroupsForConsumerRequest{
ConsumerID: consumerID,
ControlPlaneID: cp.GetKonnectStatus().GetKonnectID(),
}).Return(&sdkkonnectops.ListConsumerGroupsForConsumerResponse{}, nil)

t.Log("Creating KongConsumer")
createdConsumer := deployKongConsumerAttachedToCP(t, ctx, clientNamespaced, username, cp)
Expand All @@ -99,10 +109,12 @@ func TestKongConsumer(t *testing.T) {
}, waitTime, tickTime)

t.Log("Setting up SDK expectations on KongConsumer update")
sdk.ConsumersSDK.EXPECT().UpsertConsumer(mock.Anything, mock.MatchedBy(func(r sdkkonnectops.UpsertConsumerRequest) bool {
return r.ConsumerID == consumerID &&
r.Consumer.Username != nil && *r.Consumer.Username == "user-1-updated"
})).Return(&sdkkonnectops.UpsertConsumerResponse{}, nil)
sdk.ConsumersSDK.EXPECT().
UpsertConsumer(mock.Anything, mock.MatchedBy(func(r sdkkonnectops.UpsertConsumerRequest) bool {
return r.ConsumerID == consumerID &&
r.Consumer.Username != nil && *r.Consumer.Username == "user-1-updated"
})).
Return(&sdkkonnectops.UpsertConsumerResponse{}, nil)

t.Log("Patching KongConsumer")
consumerToPatch := createdConsumer.DeepCopy()
Expand All @@ -115,7 +127,8 @@ func TestKongConsumer(t *testing.T) {
}, waitTime, tickTime)

t.Log("Setting up SDK expectations on KongConsumer deletion")
sdk.ConsumersSDK.EXPECT().DeleteConsumer(mock.Anything, cp.GetKonnectStatus().GetKonnectID(), consumerID).
sdk.ConsumersSDK.EXPECT().
DeleteConsumer(mock.Anything, cp.GetKonnectStatus().GetKonnectID(), consumerID).
Return(&sdkkonnectops.DeleteConsumerResponse{}, nil)

t.Log("Deleting KongConsumer")
Expand All @@ -139,42 +152,54 @@ func TestKongConsumer(t *testing.T) {
consumerGroupName = "consumer-group-1"
)
t.Log("Setting up SDK expectations on KongConsumer creation with ConsumerGroup")
sdk.ConsumersSDK.EXPECT().CreateConsumer(mock.Anything, cp.GetKonnectStatus().GetKonnectID(),
mock.MatchedBy(func(input sdkkonnectcomp.ConsumerInput) bool {
return input.Username != nil && *input.Username == username
}),
).Return(&sdkkonnectops.CreateConsumerResponse{
sdk.ConsumersSDK.EXPECT().
CreateConsumer(mock.Anything, cp.GetKonnectStatus().GetKonnectID(),
mock.MatchedBy(func(input sdkkonnectcomp.ConsumerInput) bool {
return input.Username != nil && *input.Username == username
}),
).Return(&sdkkonnectops.CreateConsumerResponse{
Consumer: &sdkkonnectcomp.Consumer{
ID: lo.ToPtr(consumerID),
},
}, nil)

sdk.ConsumerGroupSDK.EXPECT().CreateConsumerGroup(mock.Anything, cp.GetKonnectStatus().GetKonnectID(),
mock.MatchedBy(func(input sdkkonnectcomp.ConsumerGroupInput) bool {
return input.Name == consumerGroupName
}),
).Return(&sdkkonnectops.CreateConsumerGroupResponse{
t.Log("Setting up SDK expectation on possibly updating KongConsumer (due to asynchronous nature of updates between KongConsumer and KongConsumerGroup)")
sdk.ConsumersSDK.EXPECT().
UpsertConsumer(mock.Anything, mock.MatchedBy(func(r sdkkonnectops.UpsertConsumerRequest) bool {
return r.ConsumerID == consumerID
})).
Return(&sdkkonnectops.UpsertConsumerResponse{}, nil).
Maybe()

sdk.ConsumerGroupSDK.EXPECT().
CreateConsumerGroup(mock.Anything, cp.GetKonnectStatus().GetKonnectID(),
mock.MatchedBy(func(input sdkkonnectcomp.ConsumerGroupInput) bool {
return input.Name == consumerGroupName
}),
).Return(&sdkkonnectops.CreateConsumerGroupResponse{
ConsumerGroup: &sdkkonnectcomp.ConsumerGroup{
ID: lo.ToPtr(cgID),
},
}, nil)

t.Log("Setting up SDK expectation on KongConsumerGroups listing")
emptyListCall := sdk.ConsumerGroupSDK.EXPECT().ListConsumerGroupsForConsumer(mock.Anything, sdkkonnectops.ListConsumerGroupsForConsumerRequest{
ConsumerID: consumerID,
ControlPlaneID: cp.GetKonnectStatus().GetKonnectID(),
}).Return(&sdkkonnectops.ListConsumerGroupsForConsumerResponse{
emptyListCall := sdk.ConsumerGroupSDK.EXPECT().
ListConsumerGroupsForConsumer(mock.Anything, sdkkonnectops.ListConsumerGroupsForConsumerRequest{
ConsumerID: consumerID,
ControlPlaneID: cp.GetKonnectStatus().GetKonnectID(),
}).Return(&sdkkonnectops.ListConsumerGroupsForConsumerResponse{
// Returning no ConsumerGroups associated with the Consumer in Konnect to trigger addition.
}, nil)

t.Log("Setting up SDK expectation on adding Consumer to ConsumerGroup")
sdk.ConsumerGroupSDK.EXPECT().AddConsumerToGroup(mock.Anything, sdkkonnectops.AddConsumerToGroupRequest{
ConsumerGroupID: cgID,
ControlPlaneID: cp.GetKonnectStatus().GetKonnectID(),
RequestBody: &sdkkonnectops.AddConsumerToGroupRequestBody{
ConsumerID: lo.ToPtr(consumerID),
},
}).Return(&sdkkonnectops.AddConsumerToGroupResponse{}, nil)
sdk.ConsumerGroupSDK.EXPECT().
AddConsumerToGroup(mock.Anything, sdkkonnectops.AddConsumerToGroupRequest{
ConsumerGroupID: cgID,
ControlPlaneID: cp.GetKonnectStatus().GetKonnectID(),
RequestBody: &sdkkonnectops.AddConsumerToGroupRequestBody{
ConsumerID: lo.ToPtr(consumerID),
},
}).Return(&sdkkonnectops.AddConsumerToGroupResponse{}, nil)

t.Log("Creating KongConsumerGroup")
createdConsumerGroup := deployKongConsumerGroupAttachedToCP(t, ctx, clientNamespaced, consumerGroupName, cp)
Expand Down Expand Up @@ -213,16 +238,19 @@ func TestKongConsumer(t *testing.T) {
}, waitTime, tickTime)

t.Log("Setting up SDK expectations on KongConsumer update with ConsumerGroup")
sdk.ConsumersSDK.EXPECT().UpsertConsumer(mock.Anything, mock.MatchedBy(func(r sdkkonnectops.UpsertConsumerRequest) bool {
return r.ConsumerID == consumerID &&
r.Consumer.Username != nil && *r.Consumer.Username == "user-2-updated"
})).Return(&sdkkonnectops.UpsertConsumerResponse{}, nil)
sdk.ConsumersSDK.EXPECT().
UpsertConsumer(mock.Anything, mock.MatchedBy(func(r sdkkonnectops.UpsertConsumerRequest) bool {
return r.ConsumerID == consumerID &&
r.Consumer.Username != nil && *r.Consumer.Username == "user-2-updated"
})).
Return(&sdkkonnectops.UpsertConsumerResponse{}, nil)

emptyListCall.Unset() // Unset the previous expectation to allow the new one to be set.
sdk.ConsumerGroupSDK.EXPECT().ListConsumerGroupsForConsumer(mock.Anything, sdkkonnectops.ListConsumerGroupsForConsumerRequest{
ConsumerID: consumerID,
ControlPlaneID: cp.GetKonnectStatus().GetKonnectID(),
}).Return(&sdkkonnectops.ListConsumerGroupsForConsumerResponse{
sdk.ConsumerGroupSDK.EXPECT().
ListConsumerGroupsForConsumer(mock.Anything, sdkkonnectops.ListConsumerGroupsForConsumerRequest{
ConsumerID: consumerID,
ControlPlaneID: cp.GetKonnectStatus().GetKonnectID(),
}).Return(&sdkkonnectops.ListConsumerGroupsForConsumerResponse{
Object: &sdkkonnectops.ListConsumerGroupsForConsumerResponseBody{
Data: []sdkkonnectcomp.ConsumerGroup{
{
Expand All @@ -239,11 +267,12 @@ func TestKongConsumer(t *testing.T) {
},
}, nil)

sdk.ConsumerGroupSDK.EXPECT().RemoveConsumerFromGroup(mock.Anything, sdkkonnectops.RemoveConsumerFromGroupRequest{
ConsumerGroupID: "not-defined-in-crd",
ControlPlaneID: cp.GetKonnectStatus().GetKonnectID(),
ConsumerID: consumerID,
}).Return(&sdkkonnectops.RemoveConsumerFromGroupResponse{}, nil)
sdk.ConsumerGroupSDK.EXPECT().
RemoveConsumerFromGroup(mock.Anything, sdkkonnectops.RemoveConsumerFromGroupRequest{
ConsumerGroupID: "not-defined-in-crd",
ControlPlaneID: cp.GetKonnectStatus().GetKonnectID(),
ConsumerID: consumerID,
}).Return(&sdkkonnectops.RemoveConsumerFromGroupResponse{}, nil)

t.Log("Patching KongConsumer to trigger reconciliation")
consumerToPatch := createdConsumer.DeepCopy()
Expand Down

0 comments on commit e165d39

Please sign in to comment.