Skip to content

Commit

Permalink
draft glue trigger #5
Browse files Browse the repository at this point in the history
  • Loading branch information
Kirill Sushkov (teeverr) committed Oct 8, 2024
1 parent ac94b5a commit 7a19730
Showing 1 changed file with 70 additions and 62 deletions.
132 changes: 70 additions & 62 deletions pkg/controller/glue/trigger/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package trigger
import (
"context"
"encoding/json"

svcsdk "github.com/aws/aws-sdk-go/service/glue"
svcsdkapi "github.com/aws/aws-sdk-go/service/glue/glueiface"
xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
Expand Down Expand Up @@ -57,93 +56,108 @@ func newCustomExternal(kube client.Client, client svcsdkapi.GlueAPI) *customExte
client: client,
preObserve: preObserve,
postObserve: postObserve,
lateInitialize: nopLateInitialize,
isUpToDate: isUpToDate,
preCreate: preCreate,
postCreate: nopPostCreate,
preDelete: preDelete,
postDelete: nopPostDelete,
preUpdate: preUpdate,
postUpdate: nopPostUpdate},
lateInitialize: nopLateInitialize,
postCreate: nopPostCreate,
postDelete: nopPostDelete,
postUpdate: nopPostUpdate,
},
}
}

func (e *customExternal) Observe(ctx context.Context, mg cpresource.Managed) (managed.ExternalObservation, error) {
return e.Observe(ctx, mg)
}

func (e *customExternal) Create(ctx context.Context, mg cpresource.Managed) (managed.ExternalCreation, error) {
return e.Create(ctx, mg)
}

func (e *customExternal) Delete(ctx context.Context, mg cpresource.Managed) error {
return e.Delete(ctx, mg)
}
//func (e *customExternal) Observe(ctx context.Context, mg cpresource.Managed) (managed.ExternalObservation, error) {
// return e.Observe(ctx, mg)
//}
//
//func (e *customExternal) Create(ctx context.Context, mg cpresource.Managed) (managed.ExternalCreation, error) {
// return e.Create(ctx, mg)
//}
//
//func (e *customExternal) Delete(ctx context.Context, mg cpresource.Managed) error {
// return e.Delete(ctx, mg)
//}

func (e *customExternal) Update(ctx context.Context, mg cpresource.Managed) (managed.ExternalUpdate, error) {
cr, ok := mg.(*svcapitypes.Trigger)
if !ok {
return managed.ExternalUpdate{}, errors.New(errUnexpectedObject)
}
input := &svcsdk.UpdateTriggerInput{}
triggerUpdate := &svcsdk.TriggerUpdate{}

predicate := svcsdk.Predicate{}
if cr.Spec.ForProvider.Predicate != nil {
if cr.Spec.ForProvider.Predicate.Conditions != nil {
for _, condition := range cr.Spec.ForProvider.Predicate.Conditions {
predicate.Conditions = append(
predicate.Conditions,
&svcsdk.Condition{
CrawlState: condition.CrawlState,
CrawlerName: condition.CrawlerName,
JobName: condition.JobName,
LogicalOperator: condition.LogicalOperator,
State: condition.State,
},
)
triggerUpdate.Predicate.Conditions = make([]*svcsdk.Condition, 0, len(cr.Spec.ForProvider.Predicate.Conditions))

for _, crCondition := range cr.Spec.ForProvider.Predicate.Conditions {
inputCondition := &svcsdk.Condition{}
if crCondition.CrawlState != nil {
inputCondition.CrawlState = crCondition.CrawlState
}
if crCondition.CrawlerName != nil {
inputCondition.CrawlerName = crCondition.CrawlerName
}
if crCondition.JobName != nil {
inputCondition.JobName = crCondition.JobName
}
if crCondition.LogicalOperator != nil {
inputCondition.LogicalOperator = crCondition.LogicalOperator
}
if crCondition.State != nil {
inputCondition.State = crCondition.State
}
triggerUpdate.Predicate.Conditions = append(triggerUpdate.Predicate.Conditions, inputCondition)
}
}
if cr.Spec.ForProvider.Predicate.Logical != nil {
predicate.Logical = cr.Spec.ForProvider.Predicate.Logical
triggerUpdate.Predicate.Logical = cr.Spec.ForProvider.Predicate.Logical
}
}
input.TriggerUpdate.Predicate = &predicate

if cr.Spec.ForProvider.EventBatchingCondition != nil {
if cr.Spec.ForProvider.EventBatchingCondition.BatchSize != nil {
input.TriggerUpdate.EventBatchingCondition.BatchSize = cr.Spec.ForProvider.EventBatchingCondition.BatchSize
triggerUpdate.EventBatchingCondition.BatchSize = cr.Spec.ForProvider.EventBatchingCondition.BatchSize
}
if cr.Spec.ForProvider.EventBatchingCondition.BatchWindow != nil {
input.TriggerUpdate.EventBatchingCondition.BatchWindow = cr.Spec.ForProvider.EventBatchingCondition.BatchWindow
triggerUpdate.EventBatchingCondition.BatchWindow = cr.Spec.ForProvider.EventBatchingCondition.BatchWindow
}
}

var actions []*svcsdk.Action
if cr.Spec.ForProvider.Actions != nil {
for _, action := range cr.Spec.ForProvider.Actions {
notificationProperty := &svcsdk.NotificationProperty{}
if action.NotificationProperty != nil && action.NotificationProperty.NotifyDelayAfter != nil {
notificationProperty.NotifyDelayAfter = action.NotificationProperty.NotifyDelayAfter
triggerUpdate.Actions = make([]*svcsdk.Action, 0, len(cr.Spec.ForProvider.Actions))
for _, crAction := range cr.Spec.ForProvider.Actions {
inputAction := &svcsdk.Action{}
if crAction.NotificationProperty != nil && crAction.NotificationProperty.NotifyDelayAfter != nil {
inputAction.NotificationProperty = &svcsdk.NotificationProperty{NotifyDelayAfter: crAction.NotificationProperty.NotifyDelayAfter}
}
if crAction.Arguments != nil {
inputAction.Arguments = crAction.Arguments
}
if crAction.CrawlerName != nil {
inputAction.CrawlerName = crAction.CrawlerName
}
if crAction.JobName != nil {
inputAction.JobName = crAction.JobName
}
if crAction.SecurityConfiguration != nil {
inputAction.SecurityConfiguration = crAction.SecurityConfiguration
}
actions = append(actions, &svcsdk.Action{
Arguments: action.Arguments,
CrawlerName: action.CrawlerName,
JobName: action.JobName,
NotificationProperty: notificationProperty,
SecurityConfiguration: action.SecurityConfiguration,
})
if crAction.Timeout != nil {
inputAction.Timeout = crAction.Timeout
}
triggerUpdate.Actions = append(triggerUpdate.Actions, inputAction)
}
}
input.TriggerUpdate.Actions = actions

if cr.Spec.ForProvider.Schedule != nil {
input.TriggerUpdate.Schedule = cr.Spec.ForProvider.Schedule
triggerUpdate.Schedule = cr.Spec.ForProvider.Schedule
}

if cr.Spec.ForProvider.Description != nil {
input.TriggerUpdate.Description = cr.Spec.ForProvider.Description
triggerUpdate.Description = cr.Spec.ForProvider.Description
}

input := &svcsdk.UpdateTriggerInput{TriggerUpdate: triggerUpdate}
if err := preUpdate(ctx, cr, input); err != nil {
return managed.ExternalUpdate{}, errors.Wrap(err, "pre-update failed")
}
Expand Down Expand Up @@ -204,13 +218,13 @@ func isUpToDate(_ context.Context, cr *svcapitypes.Trigger, resp *svcsdk.GetTrig
if err != nil {
return false, "", err
}
diff = cmp.Diff(&cr.Spec.ForProvider, patch, cmpopts.EquateEmpty(),
diff = cmp.Diff(&svcapitypes.TriggerParameters{}, patch, cmpopts.EquateEmpty(),
cmpopts.IgnoreFields(svcapitypes.TriggerParameters{}, "Region"),
cmpopts.IgnoreFields(svcapitypes.TriggerParameters{}, "Tags"),
cmpopts.IgnoreFields(svcapitypes.TriggerParameters{}, "StartOnCreation"),
)
if diff != "" {
return false, "Found observed difference in glue trigger\n" + diff, nil
return false, "Found observed difference in glue trigger " + diff, nil
}
return true, "", nil
}
Expand All @@ -237,7 +251,7 @@ func createPatch(currentParams *svcapitypes.TriggerParameters, resp *svcsdk.GetT
var actions []*svcapitypes.Action
for _, action := range resp.Trigger.Actions {
notificationProperty := &svcapitypes.NotificationProperty{}
if action.NotificationProperty != nil && action.NotificationProperty.NotifyDelayAfter != nil {
if action.NotificationProperty != nil {
notificationProperty.NotifyDelayAfter = action.NotificationProperty.NotifyDelayAfter
}
actions = append(actions, &svcapitypes.Action{
Expand All @@ -252,12 +266,8 @@ func createPatch(currentParams *svcapitypes.TriggerParameters, resp *svcsdk.GetT
externalConfig.Description = resp.Trigger.Description
eventBatchingCondition := &svcapitypes.EventBatchingCondition{}
if resp.Trigger.EventBatchingCondition != nil {
if resp.Trigger.EventBatchingCondition.BatchSize != nil {
eventBatchingCondition.BatchSize = resp.Trigger.EventBatchingCondition.BatchSize
}
if resp.Trigger.EventBatchingCondition.BatchWindow != nil {
eventBatchingCondition.BatchWindow = resp.Trigger.EventBatchingCondition.BatchWindow
}
eventBatchingCondition.BatchSize = resp.Trigger.EventBatchingCondition.BatchSize
eventBatchingCondition.BatchWindow = resp.Trigger.EventBatchingCondition.BatchWindow
}
externalConfig.EventBatchingCondition = eventBatchingCondition
predicate := &svcapitypes.Predicate{}
Expand All @@ -270,9 +280,7 @@ func createPatch(currentParams *svcapitypes.TriggerParameters, resp *svcsdk.GetT
})
}
}
if resp.Trigger.Predicate.Logical != nil {
predicate.Logical = resp.Trigger.Predicate.Logical
}
predicate.Logical = resp.Trigger.Predicate.Logical
}
externalConfig.Predicate = predicate
externalConfig.TriggerType = resp.Trigger.Type
Expand Down

0 comments on commit 7a19730

Please sign in to comment.