From 5eb060eb45129aa10ee7e17e99853f7130a0b04e Mon Sep 17 00:00:00 2001 From: Max Melentyev Date: Fri, 12 Apr 2024 12:01:02 +0100 Subject: [PATCH] Add monitor for event-based reconciliation Signed-off-by: Max Melentyev --- cmd/provider/main.go | 61 +++++- go.mod | 2 + pkg/controller/aws.go | 121 ++++++------ pkg/controller/ec2/instance/controller.go | 18 +- pkg/controller/ec2/instance/monitor.go | 56 ++++++ pkg/controller/ec2/setup.go | 53 +++--- .../route53/resourcerecordset/controller.go | 18 +- .../route53/resourcerecordset/monitor.go | 56 ++++++ pkg/controller/route53/setup.go | 13 +- pkg/monitor/monitor.go | 47 +++++ pkg/monitor/sqs/message.go | 152 +++++++++++++++ pkg/monitor/sqs/message_test.go | 178 ++++++++++++++++++ pkg/monitor/sqs/monitor.go | 116 ++++++++++++ pkg/utils/cache/cache.go | 39 ++++ pkg/utils/controller/options.go | 120 ++++++++++++ pkg/utils/controller/options_test.go | 68 +++++++ pkg/utils/setup/batch.go | 72 +++++++ pkg/utils/setup/setup_controller.go | 6 +- 18 files changed, 1085 insertions(+), 111 deletions(-) create mode 100644 pkg/controller/ec2/instance/monitor.go create mode 100644 pkg/controller/route53/resourcerecordset/monitor.go create mode 100644 pkg/monitor/monitor.go create mode 100644 pkg/monitor/sqs/message.go create mode 100644 pkg/monitor/sqs/message_test.go create mode 100644 pkg/monitor/sqs/monitor.go create mode 100644 pkg/utils/cache/cache.go create mode 100644 pkg/utils/controller/options.go create mode 100644 pkg/utils/controller/options_test.go create mode 100644 pkg/utils/setup/batch.go diff --git a/cmd/provider/main.go b/cmd/provider/main.go index 27e0a9d74f..1762eb36f3 100644 --- a/cmd/provider/main.go +++ b/cmd/provider/main.go @@ -20,8 +20,11 @@ import ( "context" "os" "path/filepath" + "strings" "time" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/sqs" xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1" xpcontroller "github.com/crossplane/crossplane-runtime/pkg/controller" "github.com/crossplane/crossplane-runtime/pkg/feature" @@ -41,9 +44,15 @@ import ( "github.com/crossplane-contrib/provider-aws/apis/v1alpha1" "github.com/crossplane-contrib/provider-aws/pkg/controller" "github.com/crossplane-contrib/provider-aws/pkg/features" + sqsmonitor "github.com/crossplane-contrib/provider-aws/pkg/monitor/sqs" + utilscontroller "github.com/crossplane-contrib/provider-aws/pkg/utils/controller" "github.com/crossplane-contrib/provider-aws/pkg/utils/metrics" ) +// Env prefix for options to configure controllers. +// Example usage: `PROVIDER_AWS_ec2.instance.pollInterval=10m`. +const OPTION_ENV_PREFIX = "PROVIDER_AWS_" + func main() { var ( app = kingpin.New(filepath.Base(os.Args[0]), "AWS support for Crossplane.").DefaultEnvars() @@ -52,6 +61,7 @@ func main() { pollInterval = app.Flag("poll", "Poll interval controls how often an individual resource should be checked for drift.").Default("1m").Duration() leaderElection = app.Flag("leader-election", "Use leader election for the conroller manager.").Short('l').Default("false").OverrideDefaultFromEnvar("LEADER_ELECTION").Bool() maxReconcileRate = app.Flag("max-reconcile-rate", "The global maximum rate per second at which resources may checked for drift from the desired state.").Default("10").Int() + eventsSqsUrl = app.Flag("events-sqs-url", "SQS queue with AWS events").Default("").String() namespace = app.Flag("namespace", "Namespace used to set as default scope in default secret store config.").Default("crossplane-system").Envar("POD_NAMESPACE").String() enableExternalSecretStores = app.Flag("enable-external-secret-stores", "Enable support for ExternalSecretStores.").Default("false").Envar("ENABLE_EXTERNAL_SECRET_STORES").Bool() @@ -94,13 +104,13 @@ func main() { kingpin.FatalIfError(err, "Cannot create controller manager") kingpin.FatalIfError(apis.AddToScheme(mgr.GetScheme()), "Cannot add AWS APIs to scheme") - o := xpcontroller.Options{ + o := utilscontroller.Options{Options: xpcontroller.Options{ Logger: log, MaxConcurrentReconciles: *maxReconcileRate, PollInterval: *pollInterval, GlobalRateLimiter: ratelimiter.NewGlobal(*maxReconcileRate), Features: &feature.Flags{}, - } + }} if *enableExternalSecretStores { o.Features.Enable(features.EnableAlphaExternalSecretStores) @@ -126,10 +136,23 @@ func main() { log.Info("Alpha feature enabled", "flag", features.EnableAlphaManagementPolicies) } - kingpin.FatalIfError(metrics.SetupMetrics(), "Cannot setup AWS metrics hook") - kingpin.FatalIfError(controller.Setup(mgr, o), "Cannot setup AWS controllers") - kingpin.FatalIfError(mgr.Start(ctrl.SetupSignalHandler()), "Cannot start controller manager") + if *eventsSqsUrl != "" { + o.Monitor = newSqsMonitor(*eventsSqsUrl, log) + } + optionsWithOverrides := utilscontroller.NewOptionsSet(o) + kingpin.FatalIfError(optionsWithOverrides.AddOverrides(optionsOverridesFromEnv()), "Cannot add overrides") + + ctx := ctrl.SetupSignalHandler() + + kingpin.FatalIfError(metrics.SetupMetrics(), "Cannot setup AWS metrics hook") + kingpin.FatalIfError(controller.Setup(mgr, optionsWithOverrides), "Cannot setup AWS controllers") + // Must be added after controllers so that received messages are processed by the controllers + if o.Monitor != nil { + kingpin.FatalIfError(mgr.Add(o.Monitor), "Cannot add monitor to manager") + kingpin.FatalIfError(o.Monitor.Prepare(ctx), "Monitor.Prepare() failed") + } + kingpin.FatalIfError(mgr.Start(ctx), "Cannot start controller manager") } // UseISO8601 sets the logger to use ISO8601 timestamp format @@ -138,3 +161,31 @@ func UseISO8601() zap.Opts { o.TimeEncoder = zapcore.ISO8601TimeEncoder } } + +// Collects all env variables with the prefix OPTION_ENV_PREFIX and returns them as a map +// with the prefix removed. +func optionsOverridesFromEnv() map[string]string { + result := make(map[string]string) + for _, str := range os.Environ() { + if rest, ok := strings.CutPrefix(str, OPTION_ENV_PREFIX); ok { + parts := strings.SplitN(rest, "=", 2) + if len(parts) == 2 { + result[parts[0]] = parts[1] + } + } + } + return result +} + +func newSqsMonitor(url string, logger logging.Logger) *sqsmonitor.Monitor { + awsCfg, err := config.LoadDefaultConfig(context.Background()) + kingpin.FatalIfError(err, "Cannot load AWS config") + return sqsmonitor.NewMonitor( + awsCfg, + sqs.ReceiveMessageInput{ + QueueUrl: &url, + WaitTimeSeconds: 20, + }, + logger, + ) +} diff --git a/go.mod b/go.mod index 5af2b9d7fb..314dcca5ca 100644 --- a/go.mod +++ b/go.mod @@ -41,6 +41,7 @@ require ( github.com/onsi/gomega v1.27.10 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.16.0 + github.com/stretchr/testify v1.8.2 go.uber.org/zap v1.26.0 gopkg.in/alecthomas/kingpin.v2 v2.2.6 k8s.io/api v0.28.3 @@ -108,6 +109,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/operator-framework/api v0.6.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect diff --git a/pkg/controller/aws.go b/pkg/controller/aws.go index 8451a56f23..6bc2354759 100644 --- a/pkg/controller/aws.go +++ b/pkg/controller/aws.go @@ -17,7 +17,6 @@ limitations under the License. package controller import ( - "github.com/crossplane/crossplane-runtime/pkg/controller" ctrl "sigs.k8s.io/controller-runtime" "github.com/crossplane-contrib/provider-aws/pkg/controller/acm" @@ -76,69 +75,69 @@ import ( "github.com/crossplane-contrib/provider-aws/pkg/controller/sns" "github.com/crossplane-contrib/provider-aws/pkg/controller/sqs" "github.com/crossplane-contrib/provider-aws/pkg/controller/transfer" + "github.com/crossplane-contrib/provider-aws/pkg/utils/controller" "github.com/crossplane-contrib/provider-aws/pkg/utils/setup" ) // Setup creates all AWS controllers with the supplied logger and adds them to // the supplied manager. -func Setup(mgr ctrl.Manager, o controller.Options) error { - return setup.SetupControllers( - mgr, o, - acm.Setup, - acmpca.Setup, - apigateway.Setup, - apigatewayv2.Setup, - athena.Setup, - autoscaling.Setup, - batch.Setup, - cache.Setup, - cloudfront.Setup, - cloudsearch.Setup, - cloudwatchlogs.Setup, - cognitoidentity.Setup, - cognitoidentityprovider.Setup, - config.Setup, - database.Setup, - dax.Setup, - docdb.Setup, - dynamodb.Setup, - ec2.Setup, - ecr.Setup, - ecs.Setup, - efs.Setup, - eks.Setup, - elasticache.Setup, - elasticloadbalancing.Setup, - elbv2.Setup, - emrcontainers.Setup, - firehose.Setup, - glue.Setup, - globalaccelerator.Setup, - iam.Setup, - iot.Setup, - kafka.Setup, - kinesis.Setup, - kms.Setup, - lambda.Setup, - mq.Setup, - mwaa.Setup, - neptune.Setup, - opensearchservice.Setup, - prometheusservice.Setup, - ram.Setup, - rds.Setup, - redshift.Setup, - route53.Setup, - route53resolver.Setup, - s3.Setup, - s3control.Setup, - secretsmanager.Setup, - servicecatalog.Setup, - servicediscovery.Setup, - sesv2.Setup, - sfn.Setup, - sns.Setup, - sqs.Setup, - transfer.Setup, - ) +func Setup(mgr ctrl.Manager, o controller.OptionsSet) error { + b := setup.NewBatch(mgr, o, "") + b.AddProxyXp(acm.Setup) + b.AddProxyXp(acmpca.Setup) + b.AddProxyXp(apigateway.Setup) + b.AddProxyXp(apigatewayv2.Setup) + b.AddProxyXp(athena.Setup) + b.AddProxyXp(autoscaling.Setup) + b.AddProxyXp(batch.Setup) + b.AddProxyXp(cache.Setup) + b.AddProxyXp(cloudfront.Setup) + b.AddProxyXp(cloudsearch.Setup) + b.AddProxyXp(cloudwatchlogs.Setup) + b.AddProxyXp(cognitoidentity.Setup) + b.AddProxyXp(cognitoidentityprovider.Setup) + b.AddProxyXp(config.Setup) + b.AddProxyXp(database.Setup) + b.AddProxyXp(dax.Setup) + b.AddProxyXp(docdb.Setup) + b.AddProxyXp(dynamodb.Setup) + b.AddProxy(ec2.Setup) + b.AddProxyXp(ecr.Setup) + b.AddProxyXp(ecs.Setup) + b.AddProxyXp(efs.Setup) + b.AddProxyXp(eks.Setup) + b.AddProxyXp(elasticache.Setup) + b.AddProxyXp(elasticloadbalancing.Setup) + b.AddProxyXp(elbv2.Setup) + b.AddProxyXp(emrcontainers.Setup) + b.AddProxyXp(firehose.Setup) + b.AddProxyXp(glue.Setup) + b.AddProxyXp(globalaccelerator.Setup) + b.AddProxyXp(iam.Setup) + b.AddProxyXp(iot.Setup) + b.AddProxyXp(kafka.Setup) + b.AddProxyXp(kinesis.Setup) + b.AddProxyXp(kms.Setup) + b.AddProxyXp(lambda.Setup) + b.AddProxyXp(mq.Setup) + b.AddProxyXp(mwaa.Setup) + b.AddProxyXp(neptune.Setup) + b.AddProxyXp(opensearchservice.Setup) + b.AddProxyXp(prometheusservice.Setup) + b.AddProxyXp(ram.Setup) + b.AddProxyXp(rds.Setup) + b.AddProxyXp(redshift.Setup) + b.AddProxy(route53.Setup) + b.AddProxyXp(route53resolver.Setup) + b.AddProxyXp(s3.Setup) + b.AddProxyXp(s3control.Setup) + b.AddProxyXp(secretsmanager.Setup) + b.AddProxyXp(servicecatalog.Setup) + b.AddProxyXp(servicediscovery.Setup) + b.AddProxyXp(sesv2.Setup) + b.AddProxyXp(sfn.Setup) + b.AddProxyXp(sns.Setup) + b.AddProxyXp(sqs.Setup) + b.AddProxyXp(transfer.Setup) + return b.Run() } diff --git a/pkg/controller/ec2/instance/controller.go b/pkg/controller/ec2/instance/controller.go index e5d5a78526..d0909f1aa7 100644 --- a/pkg/controller/ec2/instance/controller.go +++ b/pkg/controller/ec2/instance/controller.go @@ -24,7 +24,6 @@ import ( types "github.com/aws/aws-sdk-go-v2/service/ec2/types" xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1" "github.com/crossplane/crossplane-runtime/pkg/connection" - "github.com/crossplane/crossplane-runtime/pkg/controller" "github.com/crossplane/crossplane-runtime/pkg/event" "github.com/crossplane/crossplane-runtime/pkg/meta" "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" @@ -34,12 +33,14 @@ import ( "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" svcapitypes "github.com/crossplane-contrib/provider-aws/apis/ec2/manualv1alpha1" "github.com/crossplane-contrib/provider-aws/apis/v1alpha1" "github.com/crossplane-contrib/provider-aws/pkg/clients/ec2" "github.com/crossplane-contrib/provider-aws/pkg/features" connectaws "github.com/crossplane-contrib/provider-aws/pkg/utils/connect/aws" + "github.com/crossplane-contrib/provider-aws/pkg/utils/controller" errorutils "github.com/crossplane-contrib/provider-aws/pkg/utils/errors" "github.com/crossplane-contrib/provider-aws/pkg/utils/pointer" custommanaged "github.com/crossplane-contrib/provider-aws/pkg/utils/reconciler/managed" @@ -79,6 +80,10 @@ func SetupInstance(mgr ctrl.Manager, o controller.Options) error { managed.WithConnectionPublishers(cps...), } + if o.PollIntervalJitter != 0 { + reconcilerOpts = append(reconcilerOpts, managed.WithPollJitterHook(o.PollIntervalJitter)) + } + if o.Features.Enabled(features.EnableAlphaManagementPolicies) { reconcilerOpts = append(reconcilerOpts, managed.WithManagementPolicies()) } @@ -87,12 +92,17 @@ func SetupInstance(mgr ctrl.Manager, o controller.Options) error { resource.ManagedKind(svcapitypes.InstanceGroupVersionKind), reconcilerOpts...) - return ctrl.NewControllerManagedBy(mgr). + bldr := ctrl.NewControllerManagedBy(mgr). Named(name). WithOptions(o.ForControllerRuntime()). WithEventFilter(resource.DesiredStateChanged()). - For(&svcapitypes.Instance{}). - Complete(r) + For(&svcapitypes.Instance{}) + + if o.Monitor != nil { + bldr = bldr.WatchesRawSource(eventsFromMonitor(mgr, o.Monitor), &handler.EnqueueRequestForObject{}) + } + + return bldr.Complete(r) } type connector struct { diff --git a/pkg/controller/ec2/instance/monitor.go b/pkg/controller/ec2/instance/monitor.go new file mode 100644 index 0000000000..70d36e495f --- /dev/null +++ b/pkg/controller/ec2/instance/monitor.go @@ -0,0 +1,56 @@ +/* +Copyright 2024 The Crossplane Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package instance + +import ( + "context" + "fmt" + + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/source" + + svcapitypes "github.com/crossplane-contrib/provider-aws/apis/ec2/manualv1alpha1" + "github.com/crossplane-contrib/provider-aws/pkg/monitor" + "github.com/crossplane-contrib/provider-aws/pkg/utils/cache" +) + +func eventsFromMonitor(mgr ctrl.Manager, mnt monitor.Monitor) source.Source { + mnt.AddPrepareHook(func(ctx context.Context) error { + if err := cache.IndexByExternalName(ctx, mgr.GetCache(), &svcapitypes.Instance{}); err != nil { + return fmt.Errorf("setting up indexer: %w", err) + } + return nil + }) + + eventsChannel := make(chan event.GenericEvent) + mnt.AddSubscriber(func(ctx context.Context, evt monitor.Event) { + if evt.GVK != svcapitypes.InstanceGroupVersionKind { + return + } + var list svcapitypes.InstanceList + if err := cache.ListByExternalName(ctx, mgr.GetCache(), &list, evt.ExternalName); err != nil { + mgr.GetLogger().Error(err, "failed to list objects", "externalName", evt.ExternalName) + return + } + for _, object := range list.Items { + object := object + eventsChannel <- event.GenericEvent{Object: &object} + } + }) + return &source.Channel{Source: eventsChannel} +} diff --git a/pkg/controller/ec2/setup.go b/pkg/controller/ec2/setup.go index 2154e32b97..9714e08dd8 100644 --- a/pkg/controller/ec2/setup.go +++ b/pkg/controller/ec2/setup.go @@ -17,7 +17,6 @@ limitations under the License. package ec2 import ( - "github.com/crossplane/crossplane-runtime/pkg/controller" ctrl "sigs.k8s.io/controller-runtime" "github.com/crossplane-contrib/provider-aws/pkg/controller/ec2/address" @@ -42,34 +41,34 @@ import ( "github.com/crossplane-contrib/provider-aws/pkg/controller/ec2/vpcendpoint" "github.com/crossplane-contrib/provider-aws/pkg/controller/ec2/vpcendpointserviceconfiguration" "github.com/crossplane-contrib/provider-aws/pkg/controller/ec2/vpcpeeringconnection" + "github.com/crossplane-contrib/provider-aws/pkg/utils/controller" "github.com/crossplane-contrib/provider-aws/pkg/utils/setup" ) // Setup ec2 controllers. -func Setup(mgr ctrl.Manager, o controller.Options) error { - return setup.SetupControllers( - mgr, o, - address.SetupAddress, - flowlog.SetupFlowLog, - instance.SetupInstance, - internetgateway.SetupInternetGateway, - launchtemplate.SetupLaunchTemplate, - launchtemplateversion.SetupLaunchTemplateVersion, - natgateway.SetupNatGateway, - route.SetupRoute, - routetable.SetupRouteTable, - securitygroup.SetupSecurityGroup, - securitygrouprule.SetupSecurityGroupRule, - subnet.SetupSubnet, - transitgateway.SetupTransitGateway, - transitgatewayroute.SetupTransitGatewayRoute, - transitgatewayroutetable.SetupTransitGatewayRouteTable, - transitgatewayvpcattachment.SetupTransitGatewayVPCAttachment, - volume.SetupVolume, - vpc.SetupVPC, - vpccidrblock.SetupVPCCIDRBlock, - vpcendpoint.SetupVPCEndpoint, - vpcendpointserviceconfiguration.SetupVPCEndpointServiceConfiguration, - vpcpeeringconnection.SetupVPCPeeringConnection, - ) +func Setup(mgr ctrl.Manager, o controller.OptionsSet) error { + batch := setup.NewBatch(mgr, o, "ec2") + batch.AddXp("address", address.SetupAddress) + batch.AddXp("flowlog", flowlog.SetupFlowLog) + batch.Add("instance", instance.SetupInstance) + batch.AddXp("internetgateway", internetgateway.SetupInternetGateway) + batch.AddXp("launchtemplate", launchtemplate.SetupLaunchTemplate) + batch.AddXp("launchtemplateversion", launchtemplateversion.SetupLaunchTemplateVersion) + batch.AddXp("natgateway", natgateway.SetupNatGateway) + batch.AddXp("route", route.SetupRoute) + batch.AddXp("routetable", routetable.SetupRouteTable) + batch.AddXp("securitygroup", securitygroup.SetupSecurityGroup) + batch.AddXp("securitygrouprule", securitygrouprule.SetupSecurityGroupRule) + batch.AddXp("subnet", subnet.SetupSubnet) + batch.AddXp("transitgateway", transitgateway.SetupTransitGateway) + batch.AddXp("transitgatewayroute", transitgatewayroute.SetupTransitGatewayRoute) + batch.AddXp("transitgatewayroutetable", transitgatewayroutetable.SetupTransitGatewayRouteTable) + batch.AddXp("transitgatewayvpcattachment", transitgatewayvpcattachment.SetupTransitGatewayVPCAttachment) + batch.AddXp("volume", volume.SetupVolume) + batch.AddXp("vpc", vpc.SetupVPC) + batch.AddXp("vpccidrblock", vpccidrblock.SetupVPCCIDRBlock) + batch.AddXp("vpcendpoint", vpcendpoint.SetupVPCEndpoint) + batch.AddXp("vpcendpointserviceconfiguration", vpcendpointserviceconfiguration.SetupVPCEndpointServiceConfiguration) + batch.AddXp("vpcpeeringconnection", vpcpeeringconnection.SetupVPCPeeringConnection) + return batch.Run() } diff --git a/pkg/controller/route53/resourcerecordset/controller.go b/pkg/controller/route53/resourcerecordset/controller.go index 15f533e589..488667ccad 100644 --- a/pkg/controller/route53/resourcerecordset/controller.go +++ b/pkg/controller/route53/resourcerecordset/controller.go @@ -23,7 +23,6 @@ import ( route53types "github.com/aws/aws-sdk-go-v2/service/route53/types" xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1" "github.com/crossplane/crossplane-runtime/pkg/connection" - "github.com/crossplane/crossplane-runtime/pkg/controller" "github.com/crossplane/crossplane-runtime/pkg/event" "github.com/crossplane/crossplane-runtime/pkg/meta" "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" @@ -32,12 +31,14 @@ import ( "github.com/pkg/errors" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" route53v1alpha1 "github.com/crossplane-contrib/provider-aws/apis/route53/v1alpha1" "github.com/crossplane-contrib/provider-aws/apis/v1alpha1" "github.com/crossplane-contrib/provider-aws/pkg/clients/resourcerecordset" "github.com/crossplane-contrib/provider-aws/pkg/features" connectaws "github.com/crossplane-contrib/provider-aws/pkg/utils/connect/aws" + "github.com/crossplane-contrib/provider-aws/pkg/utils/controller" errorutils "github.com/crossplane-contrib/provider-aws/pkg/utils/errors" custommanaged "github.com/crossplane-contrib/provider-aws/pkg/utils/reconciler/managed" ) @@ -72,6 +73,10 @@ func SetupResourceRecordSet(mgr ctrl.Manager, o controller.Options) error { managed.WithConnectionPublishers(cps...), } + if o.PollIntervalJitter != 0 { + reconcilerOpts = append(reconcilerOpts, managed.WithPollJitterHook(o.PollIntervalJitter)) + } + if o.Features.Enabled(features.EnableAlphaManagementPolicies) { reconcilerOpts = append(reconcilerOpts, managed.WithManagementPolicies()) } @@ -80,12 +85,17 @@ func SetupResourceRecordSet(mgr ctrl.Manager, o controller.Options) error { resource.ManagedKind(route53v1alpha1.ResourceRecordSetGroupVersionKind), reconcilerOpts...) - return ctrl.NewControllerManagedBy(mgr). + bldr := ctrl.NewControllerManagedBy(mgr). Named(name). WithOptions(o.ForControllerRuntime()). WithEventFilter(resource.DesiredStateChanged()). - For(&route53v1alpha1.ResourceRecordSet{}). - Complete(r) + For(&route53v1alpha1.ResourceRecordSet{}) + + if o.Monitor != nil { + bldr = bldr.WatchesRawSource(eventsFromMonitor(mgr, o.Monitor), &handler.EnqueueRequestForObject{}) + } + + return bldr.Complete(r) } type connector struct { diff --git a/pkg/controller/route53/resourcerecordset/monitor.go b/pkg/controller/route53/resourcerecordset/monitor.go new file mode 100644 index 0000000000..31f2432601 --- /dev/null +++ b/pkg/controller/route53/resourcerecordset/monitor.go @@ -0,0 +1,56 @@ +/* +Copyright 2024 The Crossplane Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcerecordset + +import ( + "context" + "fmt" + + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/source" + + route53v1alpha1 "github.com/crossplane-contrib/provider-aws/apis/route53/v1alpha1" + "github.com/crossplane-contrib/provider-aws/pkg/monitor" + "github.com/crossplane-contrib/provider-aws/pkg/utils/cache" +) + +func eventsFromMonitor(mgr ctrl.Manager, mnt monitor.Monitor) source.Source { + mnt.AddPrepareHook(func(ctx context.Context) error { + if err := cache.IndexByExternalName(ctx, mgr.GetCache(), &route53v1alpha1.ResourceRecordSet{}); err != nil { + return fmt.Errorf("setting up indexer: %w", err) + } + return nil + }) + + eventsChannel := make(chan event.GenericEvent, 10) + mnt.AddSubscriber(func(ctx context.Context, evt monitor.Event) { + if evt.GVK != route53v1alpha1.ResourceRecordSetGroupVersionKind { + return + } + var list route53v1alpha1.ResourceRecordSetList + if err := cache.ListByExternalName(ctx, mgr.GetCache(), &list, evt.ExternalName); err != nil { + mgr.GetLogger().Error(err, "failed to list objects", "externalName", evt.ExternalName) + return + } + for _, object := range list.Items { + object := object + eventsChannel <- event.GenericEvent{Object: &object} + } + }) + return &source.Channel{Source: eventsChannel} +} diff --git a/pkg/controller/route53/setup.go b/pkg/controller/route53/setup.go index 34183b0b08..63318e5f5e 100644 --- a/pkg/controller/route53/setup.go +++ b/pkg/controller/route53/setup.go @@ -17,19 +17,18 @@ limitations under the License. package route53 import ( - "github.com/crossplane/crossplane-runtime/pkg/controller" ctrl "sigs.k8s.io/controller-runtime" "github.com/crossplane-contrib/provider-aws/pkg/controller/route53/hostedzone" "github.com/crossplane-contrib/provider-aws/pkg/controller/route53/resourcerecordset" + "github.com/crossplane-contrib/provider-aws/pkg/utils/controller" "github.com/crossplane-contrib/provider-aws/pkg/utils/setup" ) // Setup route53 controllers. -func Setup(mgr ctrl.Manager, o controller.Options) error { - return setup.SetupControllers( - mgr, o, - hostedzone.SetupHostedZone, - resourcerecordset.SetupResourceRecordSet, - ) +func Setup(mgr ctrl.Manager, o controller.OptionsSet) error { + batch := setup.NewBatch(mgr, o, "route53") + batch.AddXp("hostedzone", hostedzone.SetupHostedZone) + batch.Add("resourcerecordset", resourcerecordset.SetupResourceRecordSet) + return batch.Run() } diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go new file mode 100644 index 0000000000..20221cc8ff --- /dev/null +++ b/pkg/monitor/monitor.go @@ -0,0 +1,47 @@ +/* +Copyright 2024 The Crossplane Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package monitor + +import ( + "context" + + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +// Monitor monitors resources and notifies subscribers when resources are changed. +type Monitor interface { + manager.Runnable + // AddSubscriber adds a subscriber to the monitor. + AddSubscriber(Subscriber) + // Prepare prepares the monitor for running, and runs registered prepare hooks. + Prepare(context.Context) error + // AddPrepareHook adds a prepare hook to the monitor. + // Hooks can be used to setup field indexers with correct context, + // as they cannot be configured with existing controller-runtime APIs: + // controller cannot register a code to be run before event sources are initialized. + AddPrepareHook(PrepareHook) +} + +type Subscriber func(context.Context, Event) +type PrepareHook func(context.Context) error + +// Event represents an external resource event. +type Event struct { + GVK schema.GroupVersionKind + ExternalName string +} diff --git a/pkg/monitor/sqs/message.go b/pkg/monitor/sqs/message.go new file mode 100644 index 0000000000..866f2558c4 --- /dev/null +++ b/pkg/monitor/sqs/message.go @@ -0,0 +1,152 @@ +/* +Copyright 2024 The Crossplane Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sqs + +import ( + "encoding/json" + "strings" + + "github.com/aws/aws-sdk-go-v2/service/sqs/types" + + ec2manualv1alpha1 "github.com/crossplane-contrib/provider-aws/apis/ec2/manualv1alpha1" + route53v1alpha1 "github.com/crossplane-contrib/provider-aws/apis/route53/v1alpha1" + "github.com/crossplane-contrib/provider-aws/pkg/monitor" +) + +func parseEventsFromMessage(message types.Message) ([]monitor.Event, error) { + if message.Body == nil { + return nil, nil + } + msg := map[string]any{} + if err := json.Unmarshal([]byte(*message.Body), &msg); err != nil { + return nil, err + } + return eventsFromMessage(msg), nil +} + +func eventsFromMessage(msg map[string]any) []monitor.Event { //nolint:gocyclo + events := []monitor.Event{} + + if resources, ok := dig(msg, "resources").([]any); ok && len(resources) > 0 { + for _, resource := range resources { + if arn, ok := resource.(string); ok { + if event := eventFromARN(arn); (event != monitor.Event{}) { + events = append(events, event) + } + } + } + // If resources are present, we don't need to parse the rest of the message + return events + } + + // EC2 API call that supports different resource types, like tagging + if resourcesSet, ok := dig(msg, "detail", "requestParameters", "resourcesSet", "items").([]any); ok { + for _, item := range resourcesSet { + if resourceId, ok := dig(item, "resourceId").(string); ok { + if strings.HasPrefix(resourceId, "i-") { + events = append(events, monitor.Event{ + GVK: ec2manualv1alpha1.InstanceGroupVersionKind, + ExternalName: resourceId, + }) + } + } + } + } + + // EC2 API call for multiple instances + if instancesSet, ok := dig(msg, "detail", "requestParameters", "instancesSet", "items").([]any); ok { + for _, item := range instancesSet { + if instanceId, ok := dig(item, "instanceId").(string); ok { + events = append(events, monitor.Event{ + GVK: ec2manualv1alpha1.InstanceGroupVersionKind, + ExternalName: instanceId, + }) + } + } + } + + // EC2 API call for a single instance + if instanceId, ok := dig(msg, "detail", "requestParameters", "instanceId").(string); ok { + events = append(events, monitor.Event{ + GVK: ec2manualv1alpha1.InstanceGroupVersionKind, + ExternalName: instanceId, + }) + } + + // EC2 Instance State-change Notification + if instanceId, ok := dig(msg, "detail", "instance-id").(string); ok { + events = append(events, monitor.Event{ + GVK: ec2manualv1alpha1.InstanceGroupVersionKind, + ExternalName: instanceId, + }) + } + + // Route53 API call + if changes, ok := dig(msg, "detail", "requestParameters", "changeBatch", "changes").([]any); ok { + for _, item := range changes { + if name, ok := dig(item, "resourceRecordSet", "name").(string); ok { + events = append(events, monitor.Event{ + GVK: route53v1alpha1.ResourceRecordSetGroupVersionKind, + ExternalName: name, + }) + } + } + } + + return events +} + +// dig is a helper function to return a nested value from a map following the provided path. +func dig(obj any, parts ...string) any { + for _, part := range parts { + if strMap, ok := obj.(map[string]any); !ok { + return nil + } else { + obj = strMap[part] + } + } + return obj +} + +// Parses ARN and returns Event for the resource. +// ARN example: "arn:aws:ec2:us-east-1:123456789012:instance/i-abcd1111". +func eventFromARN(arn string) monitor.Event { + parts := strings.Split(arn, ":") + if len(parts) < 6 { + return monitor.Event{} + } + resourceParts := strings.Split(parts[5], "/") + if len(resourceParts) < 2 { + return monitor.Event{} + } + service := parts[2] + resourceType := resourceParts[0] + name := resourceParts[1] + + if service == "ec2" && resourceType == "instance" { + return monitor.Event{ + GVK: ec2manualv1alpha1.InstanceGroupVersionKind, + ExternalName: name, + } + } else if service == "route53" && resourceType == "resourceRecordSet" { + return monitor.Event{ + GVK: route53v1alpha1.ResourceRecordSetGroupVersionKind, + ExternalName: name, + } + } + return monitor.Event{} +} diff --git a/pkg/monitor/sqs/message_test.go b/pkg/monitor/sqs/message_test.go new file mode 100644 index 0000000000..42595fa2be --- /dev/null +++ b/pkg/monitor/sqs/message_test.go @@ -0,0 +1,178 @@ +package sqs + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + ec2manualv1alpha1 "github.com/crossplane-contrib/provider-aws/apis/ec2/manualv1alpha1" + route53v1alpha1 "github.com/crossplane-contrib/provider-aws/apis/route53/v1alpha1" + "github.com/crossplane-contrib/provider-aws/pkg/monitor" +) + +type j = map[string]any + +func Test_EventMessage_events(t *testing.T) { + jsonStr := `{ + "version": "0", + "detail": { + "resultCode":1, + "requestParameters": { + "instanceIds": ["i-1234"] + } + } + }` + msg := j{} + require.NoError(t, json.Unmarshal([]byte(jsonStr), &msg)) + assert.Equal(t, "0", dig(msg, "version").(string)) + assert.InEpsilon(t, 1.0, dig(msg, "detail", "resultCode").(float64), 0.001) + assert.Equal(t, []any{"i-1234"}, dig(msg, "detail", "requestParameters", "instanceIds").([]any)) +} + +func eventsFromJson(t *testing.T, str string) []monitor.Event { + msg := j{} + require.NoError(t, json.Unmarshal([]byte(str), &msg)) + return eventsFromMessage(msg) +} + +func Test_eventsFromMessage_instanceStateChange(t *testing.T) { + events := eventsFromJson(t, `{ + "detail-type": "EC2 Instance State-change Notification", + "detail": { + "instance-id": "i-1234" + } + }`) + assert.Equal(t, []monitor.Event{ + { + GVK: ec2manualv1alpha1.InstanceGroupVersionKind, + ExternalName: "i-1234", + }, + }, events) + + // Don't produce duplicates when both resources and detail.instance-id are present + events = eventsFromJson(t, `{ + "detail-type": "EC2 Instance State-change Notification", + "resources": [ + "arn:aws:ec2:us-east-1:0123456789ab:instance/i-1234" + ], + "detail": { + "instance-id": "i-1234" + } + }`) + assert.Equal(t, []monitor.Event{ + { + GVK: ec2manualv1alpha1.InstanceGroupVersionKind, + ExternalName: "i-1234", + }, + }, events) +} + +func Test_eventsFromMessage_instanceApiCall(t *testing.T) { + events := eventsFromJson(t, `{ + "detail-type": "AWS API Call via CloudTrail", + "detail": { + "requestParameters": { + "instanceId": "i-1234" + } + } + }`) + assert.Equal(t, []monitor.Event{ + { + GVK: ec2manualv1alpha1.InstanceGroupVersionKind, + ExternalName: "i-1234", + }, + }, events) +} + +func Test_eventsFromMessage_instancesApiCall(t *testing.T) { + events := eventsFromJson(t, `{ + "detail-type": "AWS API Call via CloudTrail", + "detail": { + "requestParameters": { + "instancesSet": { + "items": [ + {"instanceId": "i-1234"}, + {"instanceId": "i-5678"} + ] + } + } + } + }`) + assert.Equal(t, []monitor.Event{ + { + GVK: ec2manualv1alpha1.InstanceGroupVersionKind, + ExternalName: "i-1234", + }, + { + GVK: ec2manualv1alpha1.InstanceGroupVersionKind, + ExternalName: "i-5678", + }, + }, events) +} + +func Test_eventsFromMessage_ec2ApiCall(t *testing.T) { + events := eventsFromJson(t, `{ + "detail-type": "AWS API Call via CloudTrail", + "detail": { + "requestParameters": { + "resourcesSet": { + "items": [ + {"resourceId": "i-1234"}, + {"resourceId": "sg-5678"} + ] + } + } + } + }`) + assert.Equal(t, []monitor.Event{ + { + GVK: ec2manualv1alpha1.InstanceGroupVersionKind, + ExternalName: "i-1234", + }, + }, events) +} + +func Test_eventsFromMessage_ebsNotification(t *testing.T) { + events := eventsFromJson(t, `{ + "detail-type": "EBS Volume Notification", + "resources": [ + "arn:aws:ec2:us-east-1:0123456789ab:volume/vol-01234567", + "arn:aws:kms:us-east-1:0123456789ab:key/01234567-0123-0123-0123-0123456789ab", + "arn:aws:ec2:us-east-1:0123456789ab:instance/i-1234" + ] + }`) + assert.Equal(t, []monitor.Event{ + { + GVK: ec2manualv1alpha1.InstanceGroupVersionKind, + ExternalName: "i-1234", + }, + }, events) +} + +func Test_eventsFromMessage_route53ApiCall(t *testing.T) { + events := eventsFromJson(t, `{ + "detail-type": "AWS API Call via CloudTrail", + "detail": { + "requestParameters": { + "changeBatch": { + "changes": [ + {"resourceRecordSet": {"name": "test1.com"}}, + {"resourceRecordSet": {"name": "test2.com"}} + ] + } + } + } + }`) + assert.Equal(t, []monitor.Event{ + { + GVK: route53v1alpha1.ResourceRecordSetGroupVersionKind, + ExternalName: "test1.com", + }, + { + GVK: route53v1alpha1.ResourceRecordSetGroupVersionKind, + ExternalName: "test2.com", + }, + }, events) +} diff --git a/pkg/monitor/sqs/monitor.go b/pkg/monitor/sqs/monitor.go new file mode 100644 index 0000000000..0c121c89a8 --- /dev/null +++ b/pkg/monitor/sqs/monitor.go @@ -0,0 +1,116 @@ +/* +Copyright 2024 The Crossplane Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sqs + +import ( + "context" + "fmt" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" + smithytime "github.com/aws/smithy-go/time" + "github.com/crossplane/crossplane-runtime/pkg/logging" + + "github.com/crossplane-contrib/provider-aws/pkg/monitor" +) + +type Monitor struct { + client *sqs.Client + receiveOptions sqs.ReceiveMessageInput + logger logging.Logger + subscribers []monitor.Subscriber + prepareHooks []monitor.PrepareHook +} + +var _ monitor.Monitor = &Monitor{} + +func NewMonitor( + config aws.Config, + receiveOptions sqs.ReceiveMessageInput, + logger logging.Logger, +) *Monitor { + return &Monitor{ + client: sqs.NewFromConfig(config), + receiveOptions: receiveOptions, + logger: logger.WithValues("monitor", "SQSMonitor"), + } +} + +// AddSubscriber implements monitor.Monitor. +func (monitor *Monitor) AddSubscriber(subscriber monitor.Subscriber) { + monitor.subscribers = append(monitor.subscribers, subscriber) +} + +// AddPrepareHook implements monitor.Monitor. +func (monitor *Monitor) AddPrepareHook(hook monitor.PrepareHook) { + monitor.prepareHooks = append(monitor.prepareHooks, hook) +} + +// Prepare implements monitor.Monitor. +func (monitor *Monitor) Prepare(ctx context.Context) error { + for i, hook := range monitor.prepareHooks { + if err := hook(ctx); err != nil { + return fmt.Errorf("running hook %d: %w", i, err) + } + } + return nil +} + +// Start implements monitor.Monitor. +func (monitor *Monitor) Start(ctx context.Context) error { + for ctx.Err() == nil { + // SQS supports long polling and timeout is configured with receiveOptions.WaitTimeSeconds + response, err := monitor.client.ReceiveMessage(ctx, &monitor.receiveOptions) + if ctxErr := ctx.Err(); ctxErr != nil { + return ctxErr + } else if err != nil { + monitor.logger.Info("failed to receive message, pausing for 5s", "error", err) + _ = smithytime.SleepWithContext(ctx, 5*time.Second) + continue + } + // monitor.logger.Debug("received messages", "count", len(response.Messages)) + for _, message := range response.Messages { + if err := monitor.publish(ctx, message); err != nil { + monitor.logger.Info("failed to publish message", "error", err) + } + if _, err := monitor.client.DeleteMessage(ctx, &sqs.DeleteMessageInput{ + QueueUrl: monitor.receiveOptions.QueueUrl, + ReceiptHandle: message.ReceiptHandle, + }); err != nil { + monitor.logger.Info("failed to delete message", "error", err) + } + } + } + return nil +} + +func (monitor *Monitor) publish(ctx context.Context, message types.Message) error { + // monitor.logger.Debug("parsing events", "body", message.Body) + events, err := parseEventsFromMessage(message) + if err != nil { + return err + } + for _, event := range events { + // monitor.logger.Debug("publishing event", "event", event) + for _, subscriber := range monitor.subscribers { + subscriber(ctx, event) + } + } + return nil +} diff --git a/pkg/utils/cache/cache.go b/pkg/utils/cache/cache.go new file mode 100644 index 0000000000..6959f7dcf6 --- /dev/null +++ b/pkg/utils/cache/cache.go @@ -0,0 +1,39 @@ +/* +Copyright 2024 The Crossplane Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "context" + + "github.com/crossplane/crossplane-runtime/pkg/meta" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const EXTERNAL_NAME_FIELD = "externalName" + +// IndexByExternalName setups index by extrenal name in the cache for a given resource type. +func IndexByExternalName(ctx context.Context, cache cache.Cache, object client.Object) error { + return cache.IndexField(ctx, object, EXTERNAL_NAME_FIELD, func(o client.Object) []string { + return []string{meta.GetExternalName(o)} + }) +} + +// ListByExternalName lists objects in the cache by external name. +func ListByExternalName(ctx context.Context, cache cache.Cache, objects client.ObjectList, value string) error { + return cache.List(ctx, objects, client.MatchingFields{EXTERNAL_NAME_FIELD: value}) +} diff --git a/pkg/utils/controller/options.go b/pkg/utils/controller/options.go new file mode 100644 index 0000000000..15ffeea61c --- /dev/null +++ b/pkg/utils/controller/options.go @@ -0,0 +1,120 @@ +package controller + +import ( + "fmt" + "strconv" + "strings" + "time" + + "github.com/crossplane/crossplane-runtime/pkg/controller" + + "github.com/crossplane-contrib/provider-aws/pkg/monitor" +) + +// OptionsSet allows to override Options for specific controllers. +type OptionsSet struct { + defaultOptions Options + specific map[string]OptionsOverride +} + +type Options struct { + controller.Options + PollIntervalJitter time.Duration + Monitor monitor.Monitor +} + +// OptionsOverride allows to override specific Options properties. +type OptionsOverride struct { + PollInterval *time.Duration + PollIntervalJitter *time.Duration + MaxConcurrentReconciles *int +} + +func (override OptionsOverride) applyTo(options *Options) { + if override.PollInterval != nil { + options.PollInterval = *override.PollInterval + } + + if override.PollIntervalJitter != nil { + options.PollIntervalJitter = *override.PollIntervalJitter + } + + if override.MaxConcurrentReconciles != nil { + options.MaxConcurrentReconciles = *override.MaxConcurrentReconciles + } +} + +func NewOptionsSet(defaultOptions Options) OptionsSet { + return OptionsSet{ + defaultOptions: defaultOptions, + specific: map[string]OptionsOverride{}, + } +} + +// AddOverrides adds overrides for specific controllers from the provided map +// which is similar to ConfigMap data. +// Key format is ".". Properties without scope or with "default" scope +// owerride default values. +func (set *OptionsSet) AddOverrides(values map[string]string) error { + for key, value := range values { + if err := set.addOverride(key, value); err != nil { + return fmt.Errorf("failed to add override for %s: %w", key, err) + } + } + return nil +} + +func (set *OptionsSet) addOverride(key, value string) error { + propSeparatorIdx := strings.LastIndex(key, ".") + propName := key + scope := "default" + if propSeparatorIdx != -1 { + propName = key[propSeparatorIdx+1:] + scope = key[:propSeparatorIdx] + } + overrides := set.specific[scope] + + switch propName { + case "pollInterval": + if duration, err := time.ParseDuration(value); err != nil { + return fmt.Errorf("failed to parse pollInterval value %s: %w", value, err) + } else { + overrides.PollInterval = &duration + } + case "pollIntervalJitter": + if duration, err := time.ParseDuration(value); err != nil { + return fmt.Errorf("failed to parse pollIntervalJitter value %s: %w", value, err) + } else { + overrides.PollIntervalJitter = &duration + } + case "maxConcurrentReconciles": + if maxConcurrentReconciles, err := strconv.Atoi(value); err != nil { + return fmt.Errorf("failed to parse maxConcurrentReconciles value %s: %w", value, err) + } else { + overrides.MaxConcurrentReconciles = &maxConcurrentReconciles + } + default: + return fmt.Errorf("unknown override property %s", propName) + } + + if scope == "default" { + overrides.applyTo(&set.defaultOptions) + } else { + set.specific[scope] = overrides + } + return nil +} + +// Default returns default Options. +func (set OptionsSet) Default() Options { + return set.defaultOptions +} + +// Get returns Options for the specific controller. +func (set OptionsSet) Get(name string) Options { + result := set.defaultOptions + if override, ok := set.specific[name]; ok { + override.applyTo(&result) + } + return result +} diff --git a/pkg/utils/controller/options_test.go b/pkg/utils/controller/options_test.go new file mode 100644 index 0000000000..14975371c8 --- /dev/null +++ b/pkg/utils/controller/options_test.go @@ -0,0 +1,68 @@ +package controller + +import ( + "testing" + "time" + + "github.com/crossplane/crossplane-runtime/pkg/controller" + "github.com/google/go-cmp/cmp" +) + +func TestOptionsOverrides(t *testing.T) { + options := NewOptionsSet(Options{ + Options: controller.Options{ + PollInterval: 2 * time.Minute, + MaxConcurrentReconciles: 3, + }, + }) + options.AddOverrides(map[string]string{ + "pollInterval": "1m", + "ec2.instance.pollInterval": "30s", + "ec2.instance.pollIntervalJitter": "10s", + "route53.maxConcurrentReconciles": "5", + }) + + // defaults with overrides + if diff := cmp.Diff(1*time.Minute, options.Default().PollInterval); diff != "" { + t.Errorf("default.PollInterval: -want, +got:\n%s", diff) + } + if diff := cmp.Diff(0*time.Second, options.Default().PollIntervalJitter); diff != "" { + t.Errorf("default.PollIntervalJitter: -want, +got:\n%s", diff) + } + if diff := cmp.Diff(3, options.Default().MaxConcurrentReconciles); diff != "" { + t.Errorf("default.MaxConcurrentReconciles: -want, +got:\n%s", diff) + } + + // overrides with dot in the scope name + if diff := cmp.Diff(30*time.Second, options.Get("ec2.instance").PollInterval); diff != "" { + t.Errorf("ec2.instance.PollInterval: -want, +got:\n%s", diff) + } + if diff := cmp.Diff(10*time.Second, options.Get("ec2.instance").PollIntervalJitter); diff != "" { + t.Errorf("ec2.instance.PollIntervalJitter: -want, +got:\n%s", diff) + } + if diff := cmp.Diff(3, options.Get("ec2.instance").MaxConcurrentReconciles); diff != "" { + t.Errorf("ec2.instance.MaxConcurrentReconciles: -want, +got:\n%s", diff) + } + + // overrides without dot in the scope name + if diff := cmp.Diff(1*time.Minute, options.Get("route53").PollInterval); diff != "" { + t.Errorf("route53.PollInterval: -want, +got:\n%s", diff) + } + if diff := cmp.Diff(0*time.Second, options.Get("route53").PollIntervalJitter); diff != "" { + t.Errorf("route53.PollIntervalJitter: -want, +got:\n%s", diff) + } + if diff := cmp.Diff(5, options.Get("route53").MaxConcurrentReconciles); diff != "" { + t.Errorf("route53.MaxConcurrentReconciles: -want, +got:\n%s", diff) + } + + // No overrides + if diff := cmp.Diff(1*time.Minute, options.Get("sqs").PollInterval); diff != "" { + t.Errorf("sqs.PollInterval: -want, +got:\n%s", diff) + } + if diff := cmp.Diff(0*time.Second, options.Get("sqs").PollIntervalJitter); diff != "" { + t.Errorf("sqs.PollIntervalJitter: -want, +got:\n%s", diff) + } + if diff := cmp.Diff(3, options.Get("sqs").MaxConcurrentReconciles); diff != "" { + t.Errorf("sqs.MaxConcurrentReconciles: -want, +got:\n%s", diff) + } +} diff --git a/pkg/utils/setup/batch.go b/pkg/utils/setup/batch.go new file mode 100644 index 0000000000..921546e17f --- /dev/null +++ b/pkg/utils/setup/batch.go @@ -0,0 +1,72 @@ +package setup + +import ( + "strings" + + ctrl "sigs.k8s.io/controller-runtime" + + "github.com/crossplane-contrib/provider-aws/pkg/utils/controller" +) + +// SetupControllerFn is a delegate to initialize a controller with provider-aws Options. +type SetupControllerFn func(ctrl.Manager, controller.Options) error //nolint:golint + +// Batch is a helper for setting up multiple controllers. +type Batch struct { + manager ctrl.Manager + options controller.OptionsSet + prefix string + fns []func() error +} + +func NewBatch(manager ctrl.Manager, options controller.OptionsSet, prefix string) *Batch { + if prefix != "" && !strings.HasSuffix(prefix, ".") { + prefix += "." + } + return &Batch{ + manager: manager, + options: options, + prefix: prefix, + } +} + +// Add adds a controller setup function to the batch, scoping its options with a given name. +func (b *Batch) Add(name string, fn SetupControllerFn) { + b.fns = append(b.fns, func() error { + return fn(b.manager, b.options.Get(b.prefix+name)) + }) +} + +// Add adds a controller setup function to the batch, scoping its options with a given name. +// Setup function takes crossplane Options. +func (b *Batch) AddXp(name string, fn SetupControllerFnXp) { + b.fns = append(b.fns, func() error { + return fn(b.manager, b.options.Get(b.prefix+name).Options) + }) +} + +// AddProxy adds a controller setup function to the batch. +// Setup function takes a controller.OptionsSet. +func (b *Batch) AddProxy(fn func(ctrl.Manager, controller.OptionsSet) error) { + b.fns = append(b.fns, func() error { + return fn(b.manager, b.options) + }) +} + +// AddProxyXp adds a controller setup function to the batch using default crossplane options. +// Setup function takes crossplane Options. +func (b *Batch) AddProxyXp(fn SetupControllerFnXp) { + b.fns = append(b.fns, func() error { + return fn(b.manager, b.options.Default().Options) + }) +} + +// Run runs all controller setup functions in the batch. +func (b *Batch) Run() error { + for _, fn := range b.fns { + if err := fn(); err != nil { + return err + } + } + return nil +} diff --git a/pkg/utils/setup/setup_controller.go b/pkg/utils/setup/setup_controller.go index fecc7ffedd..12c35f0dc2 100644 --- a/pkg/utils/setup/setup_controller.go +++ b/pkg/utils/setup/setup_controller.go @@ -21,12 +21,12 @@ import ( ctrl "sigs.k8s.io/controller-runtime" ) -// SetupControllerFn is a delegate to initialize a controller. -type SetupControllerFn func(ctrl.Manager, controller.Options) error //nolint:golint +// SetupControllerFnXp is a delegate to initialize a controller with crossplane Options. +type SetupControllerFnXp func(ctrl.Manager, controller.Options) error //nolint:golint // SetupControllers is a shortcut to call a list of SetupControllerFns with mgr // and o. -func SetupControllers(mgr ctrl.Manager, o controller.Options, setups ...SetupControllerFn) error { //nolint:golint +func SetupControllers(mgr ctrl.Manager, o controller.Options, setups ...SetupControllerFnXp) error { //nolint:golint for _, setup := range setups { if err := setup(mgr, o); err != nil { return err