Skip to content

Commit

Permalink
Add monitor for event-based reconciliation
Browse files Browse the repository at this point in the history
  • Loading branch information
max-melentyev committed Apr 23, 2024
1 parent a55852f commit 3923e8b
Show file tree
Hide file tree
Showing 18 changed files with 1,084 additions and 111 deletions.
61 changes: 56 additions & 5 deletions cmd/provider/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ import (
"context"
"os"
"path/filepath"
"strings"
"time"

"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
xpcontroller "github.com/crossplane/crossplane-runtime/pkg/controller"
"github.com/crossplane/crossplane-runtime/pkg/feature"
Expand All @@ -41,9 +44,15 @@ import (
"github.com/crossplane-contrib/provider-aws/apis/v1alpha1"
"github.com/crossplane-contrib/provider-aws/pkg/controller"
"github.com/crossplane-contrib/provider-aws/pkg/features"
sqsmonitor "github.com/crossplane-contrib/provider-aws/pkg/monitor/sqs"
utilscontroller "github.com/crossplane-contrib/provider-aws/pkg/utils/controller"
"github.com/crossplane-contrib/provider-aws/pkg/utils/metrics"
)

// Env prefix for options to configure controllers.
// Example usage: `PROVIDER_AWS_ec2.instance.pollInterval=10m`.
const OPTION_ENV_PREFIX = "PROVIDER_AWS_"

func main() {
var (
app = kingpin.New(filepath.Base(os.Args[0]), "AWS support for Crossplane.").DefaultEnvars()
Expand All @@ -52,6 +61,7 @@ func main() {
pollInterval = app.Flag("poll", "Poll interval controls how often an individual resource should be checked for drift.").Default("1m").Duration()
leaderElection = app.Flag("leader-election", "Use leader election for the conroller manager.").Short('l').Default("false").OverrideDefaultFromEnvar("LEADER_ELECTION").Bool()
maxReconcileRate = app.Flag("max-reconcile-rate", "The global maximum rate per second at which resources may checked for drift from the desired state.").Default("10").Int()
eventsSqsUrl = app.Flag("events-sqs-url", "SQS queue with AWS events").Default("").String()

namespace = app.Flag("namespace", "Namespace used to set as default scope in default secret store config.").Default("crossplane-system").Envar("POD_NAMESPACE").String()
enableExternalSecretStores = app.Flag("enable-external-secret-stores", "Enable support for ExternalSecretStores.").Default("false").Envar("ENABLE_EXTERNAL_SECRET_STORES").Bool()
Expand Down Expand Up @@ -94,13 +104,13 @@ func main() {
kingpin.FatalIfError(err, "Cannot create controller manager")
kingpin.FatalIfError(apis.AddToScheme(mgr.GetScheme()), "Cannot add AWS APIs to scheme")

o := xpcontroller.Options{
o := utilscontroller.Options{Options: xpcontroller.Options{
Logger: log,
MaxConcurrentReconciles: *maxReconcileRate,
PollInterval: *pollInterval,
GlobalRateLimiter: ratelimiter.NewGlobal(*maxReconcileRate),
Features: &feature.Flags{},
}
}}

if *enableExternalSecretStores {
o.Features.Enable(features.EnableAlphaExternalSecretStores)
Expand All @@ -126,10 +136,23 @@ func main() {
log.Info("Alpha feature enabled", "flag", features.EnableAlphaManagementPolicies)
}

kingpin.FatalIfError(metrics.SetupMetrics(), "Cannot setup AWS metrics hook")
kingpin.FatalIfError(controller.Setup(mgr, o), "Cannot setup AWS controllers")
kingpin.FatalIfError(mgr.Start(ctrl.SetupSignalHandler()), "Cannot start controller manager")
if *eventsSqsUrl != "" {
o.Monitor = newSqsMonitor(*eventsSqsUrl, log)
}

optionsWithOverrides := utilscontroller.NewOptionsSet(o)
kingpin.FatalIfError(optionsWithOverrides.AddOverrides(optionsOverridesFromEnv()), "Cannot add overrides")

ctx := ctrl.SetupSignalHandler()

kingpin.FatalIfError(metrics.SetupMetrics(), "Cannot setup AWS metrics hook")
kingpin.FatalIfError(controller.Setup(mgr, optionsWithOverrides), "Cannot setup AWS controllers")
// Must be added after controllers so that received messages are processed by the controllers
if o.Monitor != nil {
kingpin.FatalIfError(mgr.Add(o.Monitor), "Cannot add monitor to manager")
kingpin.FatalIfError(o.Monitor.Prepare(ctx), "Monitor.Prepare() failed")
}
kingpin.FatalIfError(mgr.Start(ctx), "Cannot start controller manager")
}

// UseISO8601 sets the logger to use ISO8601 timestamp format
Expand All @@ -138,3 +161,31 @@ func UseISO8601() zap.Opts {
o.TimeEncoder = zapcore.ISO8601TimeEncoder
}
}

// Collects all env variables with the prefix OPTION_ENV_PREFIX and returns them as a map
// with the prefix removed.
func optionsOverridesFromEnv() map[string]string {
result := make(map[string]string)
for _, str := range os.Environ() {
if rest, ok := strings.CutPrefix(str, OPTION_ENV_PREFIX); ok {
parts := strings.SplitN(rest, "=", 2)
if len(parts) == 2 {
result[parts[0]] = parts[1]
}
}
}
return result
}

func newSqsMonitor(url string, logger logging.Logger) *sqsmonitor.Monitor {
awsCfg, err := config.LoadDefaultConfig(context.Background())
kingpin.FatalIfError(err, "Cannot load AWS config")
return sqsmonitor.NewMonitor(
awsCfg,
sqs.ReceiveMessageInput{
QueueUrl: &url,
WaitTimeSeconds: 20,
},
logger,
)
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ require (
github.com/onsi/gomega v1.27.10
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.16.0
github.com/stretchr/testify v1.8.2
go.uber.org/zap v1.26.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6
k8s.io/api v0.28.3
Expand Down Expand Up @@ -108,6 +109,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/operator-framework/api v0.6.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
Expand Down
121 changes: 60 additions & 61 deletions pkg/controller/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package controller

import (
"github.com/crossplane/crossplane-runtime/pkg/controller"
ctrl "sigs.k8s.io/controller-runtime"

"github.com/crossplane-contrib/provider-aws/pkg/controller/acm"
Expand Down Expand Up @@ -76,69 +75,69 @@ import (
"github.com/crossplane-contrib/provider-aws/pkg/controller/sns"
"github.com/crossplane-contrib/provider-aws/pkg/controller/sqs"
"github.com/crossplane-contrib/provider-aws/pkg/controller/transfer"
"github.com/crossplane-contrib/provider-aws/pkg/utils/controller"
"github.com/crossplane-contrib/provider-aws/pkg/utils/setup"
)

// Setup creates all AWS controllers with the supplied logger and adds them to
// the supplied manager.
func Setup(mgr ctrl.Manager, o controller.Options) error {
return setup.SetupControllers(
mgr, o,
acm.Setup,
acmpca.Setup,
apigateway.Setup,
apigatewayv2.Setup,
athena.Setup,
autoscaling.Setup,
batch.Setup,
cache.Setup,
cloudfront.Setup,
cloudsearch.Setup,
cloudwatchlogs.Setup,
cognitoidentity.Setup,
cognitoidentityprovider.Setup,
config.Setup,
database.Setup,
dax.Setup,
docdb.Setup,
dynamodb.Setup,
ec2.Setup,
ecr.Setup,
ecs.Setup,
efs.Setup,
eks.Setup,
elasticache.Setup,
elasticloadbalancing.Setup,
elbv2.Setup,
emrcontainers.Setup,
firehose.Setup,
glue.Setup,
globalaccelerator.Setup,
iam.Setup,
iot.Setup,
kafka.Setup,
kinesis.Setup,
kms.Setup,
lambda.Setup,
mq.Setup,
mwaa.Setup,
neptune.Setup,
opensearchservice.Setup,
prometheusservice.Setup,
ram.Setup,
rds.Setup,
redshift.Setup,
route53.Setup,
route53resolver.Setup,
s3.Setup,
s3control.Setup,
secretsmanager.Setup,
servicecatalog.Setup,
servicediscovery.Setup,
sesv2.Setup,
sfn.Setup,
sns.Setup,
sqs.Setup,
transfer.Setup,
)
func Setup(mgr ctrl.Manager, o controller.OptionsSet) error {
b := setup.NewBatch(mgr, o, "")
b.AddProxyXp(acm.Setup)
b.AddProxyXp(acmpca.Setup)
b.AddProxyXp(apigateway.Setup)
b.AddProxyXp(apigatewayv2.Setup)
b.AddProxyXp(athena.Setup)
b.AddProxyXp(autoscaling.Setup)
b.AddProxyXp(batch.Setup)
b.AddProxyXp(cache.Setup)
b.AddProxyXp(cloudfront.Setup)
b.AddProxyXp(cloudsearch.Setup)
b.AddProxyXp(cloudwatchlogs.Setup)
b.AddProxyXp(cognitoidentity.Setup)
b.AddProxyXp(cognitoidentityprovider.Setup)
b.AddProxyXp(config.Setup)
b.AddProxyXp(database.Setup)
b.AddProxyXp(dax.Setup)
b.AddProxyXp(docdb.Setup)
b.AddProxyXp(dynamodb.Setup)
b.AddProxy(ec2.Setup)
b.AddProxyXp(ecr.Setup)
b.AddProxyXp(ecs.Setup)
b.AddProxyXp(efs.Setup)
b.AddProxyXp(eks.Setup)
b.AddProxyXp(elasticache.Setup)
b.AddProxyXp(elasticloadbalancing.Setup)
b.AddProxyXp(elbv2.Setup)
b.AddProxyXp(emrcontainers.Setup)
b.AddProxyXp(firehose.Setup)
b.AddProxyXp(glue.Setup)
b.AddProxyXp(globalaccelerator.Setup)
b.AddProxyXp(iam.Setup)
b.AddProxyXp(iot.Setup)
b.AddProxyXp(kafka.Setup)
b.AddProxyXp(kinesis.Setup)
b.AddProxyXp(kms.Setup)
b.AddProxyXp(lambda.Setup)
b.AddProxyXp(mq.Setup)
b.AddProxyXp(mwaa.Setup)
b.AddProxyXp(neptune.Setup)
b.AddProxyXp(opensearchservice.Setup)
b.AddProxyXp(prometheusservice.Setup)
b.AddProxyXp(ram.Setup)
b.AddProxyXp(rds.Setup)
b.AddProxyXp(redshift.Setup)
b.AddProxy(route53.Setup)
b.AddProxyXp(route53resolver.Setup)
b.AddProxyXp(s3.Setup)
b.AddProxyXp(s3control.Setup)
b.AddProxyXp(secretsmanager.Setup)
b.AddProxyXp(servicecatalog.Setup)
b.AddProxyXp(servicediscovery.Setup)
b.AddProxyXp(sesv2.Setup)
b.AddProxyXp(sfn.Setup)
b.AddProxyXp(sns.Setup)
b.AddProxyXp(sqs.Setup)
b.AddProxyXp(transfer.Setup)
return b.Run()
}
18 changes: 14 additions & 4 deletions pkg/controller/ec2/instance/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
"github.com/crossplane/crossplane-runtime/pkg/connection"
"github.com/crossplane/crossplane-runtime/pkg/controller"
"github.com/crossplane/crossplane-runtime/pkg/event"
"github.com/crossplane/crossplane-runtime/pkg/meta"
"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
Expand All @@ -34,12 +33,14 @@ import (
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"

svcapitypes "github.com/crossplane-contrib/provider-aws/apis/ec2/manualv1alpha1"
"github.com/crossplane-contrib/provider-aws/apis/v1alpha1"
"github.com/crossplane-contrib/provider-aws/pkg/clients/ec2"
"github.com/crossplane-contrib/provider-aws/pkg/features"
connectaws "github.com/crossplane-contrib/provider-aws/pkg/utils/connect/aws"
"github.com/crossplane-contrib/provider-aws/pkg/utils/controller"
errorutils "github.com/crossplane-contrib/provider-aws/pkg/utils/errors"
"github.com/crossplane-contrib/provider-aws/pkg/utils/pointer"
custommanaged "github.com/crossplane-contrib/provider-aws/pkg/utils/reconciler/managed"
Expand Down Expand Up @@ -79,6 +80,10 @@ func SetupInstance(mgr ctrl.Manager, o controller.Options) error {
managed.WithConnectionPublishers(cps...),
}

if o.PollIntervalJitter != 0 {
reconcilerOpts = append(reconcilerOpts, managed.WithPollJitterHook(o.PollIntervalJitter))
}

if o.Features.Enabled(features.EnableAlphaManagementPolicies) {
reconcilerOpts = append(reconcilerOpts, managed.WithManagementPolicies())
}
Expand All @@ -87,12 +92,17 @@ func SetupInstance(mgr ctrl.Manager, o controller.Options) error {
resource.ManagedKind(svcapitypes.InstanceGroupVersionKind),
reconcilerOpts...)

return ctrl.NewControllerManagedBy(mgr).
bldr := ctrl.NewControllerManagedBy(mgr).
Named(name).
WithOptions(o.ForControllerRuntime()).
WithEventFilter(resource.DesiredStateChanged()).
For(&svcapitypes.Instance{}).
Complete(r)
For(&svcapitypes.Instance{})

if o.Monitor != nil {
bldr = bldr.WatchesRawSource(eventsFromMonitor(mgr, o.Monitor), &handler.EnqueueRequestForObject{})
}

return bldr.Complete(r)
}

type connector struct {
Expand Down
56 changes: 56 additions & 0 deletions pkg/controller/ec2/instance/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
Copyright 2024 The Crossplane Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package instance

import (
"context"
"fmt"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/source"

svcapitypes "github.com/crossplane-contrib/provider-aws/apis/ec2/manualv1alpha1"
"github.com/crossplane-contrib/provider-aws/pkg/monitor"
"github.com/crossplane-contrib/provider-aws/pkg/utils/cache"
)

func eventsFromMonitor(mgr ctrl.Manager, mnt monitor.Monitor) source.Source {
mnt.AddPrepareHook(func(ctx context.Context) error {
if err := cache.IndexByExternalName(ctx, mgr.GetCache(), &svcapitypes.Instance{}); err != nil {
return fmt.Errorf("setting up indexer: %w", err)
}
return nil
})

eventsChannel := make(chan event.GenericEvent)
mnt.AddSubscriber(func(ctx context.Context, evt monitor.Event) {
if evt.GVK != svcapitypes.InstanceGroupVersionKind {
return
}
var list svcapitypes.InstanceList
if err := cache.ListByExternalName(ctx, mgr.GetCache(), &list, evt.ExternalName); err != nil {
mgr.GetLogger().Error(err, "failed to list objects", "externalName", evt.ExternalName)
return
}
for _, object := range list.Items {
object := object
eventsChannel <- event.GenericEvent{Object: &object}
}
})
return &source.Channel{Source: eventsChannel}
}
Loading

0 comments on commit 3923e8b

Please sign in to comment.