Skip to content

Commit

Permalink
add managed resource metrics with feature flag(implemented for Provis…
Browse files Browse the repository at this point in the history
…ionedProduct only)

Signed-off-by: Kirill Sushkov <[email protected]>
  • Loading branch information
Kirill Sushkov (teeverr) committed Nov 5, 2024
1 parent 11cd0c4 commit cf157ca
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 38 deletions.
14 changes: 10 additions & 4 deletions cmd/provider/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ func main() {
leaderElection = app.Flag("leader-election", "Use leader election for the conroller manager.").Short('l').Default("false").OverrideDefaultFromEnvar("LEADER_ELECTION").Bool()
maxReconcileRate = app.Flag("max-reconcile-rate", "The global maximum rate per second at which resources may checked for drift from the desired state.").Default("10").Int()

namespace = app.Flag("namespace", "Namespace used to set as default scope in default secret store config.").Default("crossplane-system").Envar("POD_NAMESPACE").String()
enableExternalSecretStores = app.Flag("enable-external-secret-stores", "Enable support for ExternalSecretStores.").Default("false").Envar("ENABLE_EXTERNAL_SECRET_STORES").Bool()
enableManagementPolicies = app.Flag("enable-management-policies", "Enable support for Management Policies.").Default("false").Envar("ENABLE_MANAGEMENT_POLICIES").Bool()
namespace = app.Flag("namespace", "Namespace used to set as default scope in default secret store config.").Default("crossplane-system").Envar("POD_NAMESPACE").String()
enableExternalSecretStores = app.Flag("enable-external-secret-stores", "Enable support for ExternalSecretStores.").Default("false").Envar("ENABLE_EXTERNAL_SECRET_STORES").Bool()
enableManagementPolicies = app.Flag("enable-management-policies", "Enable support for Management Policies.").Default("false").Envar("ENABLE_MANAGEMENT_POLICIES").Bool()
enableManagedResourceMetrics = app.Flag("enable-managed-resource-metrics", "Enable prometheus metrics for every managed resource").Default("false").Envar("ENABLE_MANAGED_RESOURCE_METRICS").Bool()
)
kingpin.MustParse(app.Parse(os.Args[1:]))

Expand Down Expand Up @@ -130,7 +131,12 @@ func main() {
log.Info("Alpha feature enabled", "flag", features.EnableAlphaManagementPolicies)
}

kingpin.FatalIfError(metrics.SetupMetrics(), "Cannot setup AWS metrics hook")
if *enableManagedResourceMetrics {
o.Features.Enable(features.EnableManagedResourceMetrics)
log.Info("Managed resource prometheus metrics are enabled", "flag", features.EnableManagedResourceMetrics)
}

kingpin.FatalIfError(metrics.SetupMetrics(o.Features), "Cannot setup AWS metrics")
kingpin.FatalIfError(controller.Setup(mgr, o), "Cannot setup AWS controllers")
kingpin.FatalIfError(mgr.Start(ctrl.SetupSignalHandler()), "Cannot start controller manager")

Expand Down
144 changes: 115 additions & 29 deletions pkg/controller/servicecatalog/provisionedproduct/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"strings"

awsconfig "github.com/aws/aws-sdk-go-v2/config"
cfsdkv2 "github.com/aws/aws-sdk-go-v2/service/cloudformation"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
Expand All @@ -33,18 +32,23 @@ import (
"github.com/crossplane/crossplane-runtime/pkg/meta"
"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
"github.com/crossplane/crossplane-runtime/pkg/resource"
cpresource "github.com/crossplane/crossplane-runtime/pkg/resource"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

svcapitypes "github.com/crossplane-contrib/provider-aws/apis/servicecatalog/v1alpha1"
"github.com/crossplane-contrib/provider-aws/apis/v1alpha1"
clientset "github.com/crossplane-contrib/provider-aws/pkg/clients/servicecatalog"
"github.com/crossplane-contrib/provider-aws/pkg/features"
awsclient "github.com/crossplane-contrib/provider-aws/pkg/utils/connect/aws"
"github.com/crossplane-contrib/provider-aws/pkg/utils/metrics"
"github.com/crossplane-contrib/provider-aws/pkg/utils/pointer"
custommanaged "github.com/crossplane-contrib/provider-aws/pkg/utils/reconciler/managed"
)
Expand All @@ -64,34 +68,97 @@ const (
errAwsAPICodeInvalidParametersException = "Last Successful Provisioning Record doesn't exist."
)

type customConnector struct {
kube client.Client
options controller.Options
}

type custom struct {
*external
client clientset.Client
cache cache
client clientset.Client
cache cache
metrics metricsRec
}

type cache struct {
describeProductOutput *svcsdk.DescribeProductOutput
getProvisionedProductOutputs []*svcsdk.RecordOutput
lastProvisioningParameters []*svcapitypes.ProvisioningParameter
}

type metricsRec struct {
enabled bool
create prometheus.Counter
observe prometheus.Counter
update prometheus.Counter
delete prometheus.Counter
}

func (c *customConnector) Connect(ctx context.Context, mg cpresource.Managed) (managed.ExternalClient, error) {
cr, ok := mg.(*svcapitypes.ProvisionedProduct)
if !ok {
return nil, errors.New(errUnexpectedObject)
}
sess, err := awsclient.GetConfigV1(ctx, c.kube, mg, cr.Spec.ForProvider.Region)
if err != nil {
return nil, errors.Wrap(err, errCreateSession)
}

awsCfg, err := awsclient.GetConfig(ctx, c.kube, mg, cr.Spec.ForProvider.Region)
if err != nil {
return nil, errors.Wrap(err, errCreateSession)
}
cfClient := cfsdkv2.NewFromConfig(*awsCfg)
svcClient := svcsdk.New(sess)
cust := &custom{
client: &clientset.CustomServiceCatalogClient{
CfClient: cfClient,
Client: svcClient},
}
// We do not re-implement all the ExternalClient interface, so we want
// to reuse the generated one as much as we can (mostly for the Observe,
// Create, Update, Delete methods which call all of our custom hooks)
cust.external = &external{
kube: c.kube,
client: svcClient,

isUpToDate: cust.isUpToDate,
lateInitialize: cust.lateInitialize,
preObserve: cust.preObserve,
postObserve: cust.postObserve,
preUpdate: cust.preUpdate,
preCreate: cust.preCreate,
preDelete: cust.preDelete,

postCreate: nopPostCreate,
postDelete: nopPostDelete,
postUpdate: nopPostUpdate,
}

if c.options.Features.Enabled(features.EnableManagedResourceMetrics) {
cust.metrics.enabled = c.options.Features.Enabled(features.EnableManagedResourceMetrics)
customResourceAPIVersion := fmt.Sprintf("%s/%s", cr.GetObjectKind().GroupVersionKind().Group, cr.GetObjectKind().GroupVersionKind().Version)
metrics.MetricManagedResRec.WithLabelValues(customResourceAPIVersion, cr.GetObjectKind().GroupVersionKind().Kind, cr.Name).Inc()
cust.metrics.create = metrics.MetricAWSAPICallsRec.WithLabelValues(customResourceAPIVersion, cr.GetObjectKind().GroupVersionKind().Kind, cr.Name, "create")
cust.metrics.observe = metrics.MetricAWSAPICallsRec.WithLabelValues(customResourceAPIVersion, cr.GetObjectKind().GroupVersionKind().Kind, cr.Name, "observe")
cust.metrics.update = metrics.MetricAWSAPICallsRec.WithLabelValues(customResourceAPIVersion, cr.GetObjectKind().GroupVersionKind().Kind, cr.Name, "update")
cust.metrics.delete = metrics.MetricAWSAPICallsRec.WithLabelValues(customResourceAPIVersion, cr.GetObjectKind().GroupVersionKind().Kind, cr.Name, "delete")
}

return cust.external, nil
}

// SetupProvisionedProduct adds a controller that reconciles a ProvisionedProduct
func SetupProvisionedProduct(mgr ctrl.Manager, o controller.Options) error {
name := managed.ControllerName(svcapitypes.ProvisionedProductKind)
awsCfg, err := awsconfig.LoadDefaultConfig(context.TODO())
if err != nil {
return err
}
cfClient := cfsdkv2.NewFromConfig(awsCfg)
opts := []option{prepareSetupExternal(cfClient)}
cps := []managed.ConnectionPublisher{managed.NewAPISecretPublisher(mgr.GetClient(), mgr.GetScheme())}
if o.Features.Enabled(features.EnableAlphaExternalSecretStores) {
cps = append(cps, connection.NewDetailsManager(mgr.GetClient(), v1alpha1.StoreConfigGroupVersionKind))
}

reconcilerOpts := []managed.ReconcilerOption{
managed.WithCriticalAnnotationUpdater(custommanaged.NewRetryingCriticalAnnotationUpdater(mgr.GetClient())),
managed.WithExternalConnecter(&connector{kube: mgr.GetClient(), opts: opts}),
managed.WithExternalConnecter(&customConnector{kube: mgr.GetClient(), options: o}),
managed.WithInitializers(managed.NewNameAsExternalName(mgr.GetClient())),
managed.WithPollInterval(o.PollInterval),
managed.WithLogger(o.Logger.WithValues("controller", name)),
Expand All @@ -113,40 +180,36 @@ func SetupProvisionedProduct(mgr ctrl.Manager, o controller.Options) error {
reconcilerOpts...))
}

func prepareSetupExternal(cfClient *cfsdkv2.Client) func(*external) {
return func(e *external) {
c := &custom{client: &clientset.CustomServiceCatalogClient{CfClient: cfClient, Client: e.client}}
e.preCreate = preCreate
e.preUpdate = c.preUpdate
e.lateInitialize = c.lateInitialize
e.isUpToDate = c.isUpToDate
e.preObserve = c.preObserve
e.postObserve = c.postObserve
e.preDelete = preDelete
}
}

func (c *custom) lateInitialize(spec *svcapitypes.ProvisionedProductParameters, _ *svcsdk.DescribeProvisionedProductOutput) error {
acceptLanguageEnglish := acceptLanguageEnglish
spec.AcceptLanguage = pointer.LateInitialize(spec.AcceptLanguage, &acceptLanguageEnglish)
return nil
}

func preCreate(_ context.Context, ds *svcapitypes.ProvisionedProduct, input *svcsdk.ProvisionProductInput) error {
func (c *custom) preCreate(_ context.Context, ds *svcapitypes.ProvisionedProduct, input *svcsdk.ProvisionProductInput) error {
input.ProvisionToken = aws.String(genIdempotencyToken())
input.ProvisionedProductName = aws.String(meta.GetExternalName(ds))
if c.metrics.enabled {
c.metrics.create.Inc()
}
return nil
}

func (c *custom) preUpdate(_ context.Context, ds *svcapitypes.ProvisionedProduct, input *svcsdk.UpdateProvisionedProductInput) error {
input.UpdateToken = aws.String(genIdempotencyToken())
input.ProvisionedProductName = aws.String(meta.GetExternalName(ds))
if c.metrics.enabled {
c.metrics.update.Inc()
}
return nil
}

func (c *custom) preObserve(_ context.Context, ds *svcapitypes.ProvisionedProduct, input *svcsdk.DescribeProvisionedProductInput) error {
input.Name = aws.String(meta.GetExternalName(ds))
c.cache.lastProvisioningParameters = ds.Status.AtProvider.DeepCopy().LastProvisioningParameters
if c.metrics.enabled {
c.metrics.observe.Inc()
}
return nil
}

Expand All @@ -162,6 +225,9 @@ func (c *custom) isUpToDate(_ context.Context, ds *svcapitypes.ProvisionedProduc

getPPOutputInput := &svcsdk.GetProvisionedProductOutputsInput{ProvisionedProductId: resp.ProvisionedProductDetail.Id}
getPPOutput, err := c.client.GetProvisionedProductOutputs(getPPOutputInput)
if c.metrics.enabled {
c.metrics.observe.Inc()
}
if err != nil {
// We want to specifically handle this exception, since it will occur when something
// is wrong with the provisioned product (error on creation, tainted, etc.)
Expand Down Expand Up @@ -196,6 +262,9 @@ func (c *custom) postObserve(_ context.Context, ds *svcapitypes.ProvisionedProdu

describeRecordInput := svcsdk.DescribeRecordInput{Id: resp.ProvisionedProductDetail.LastRecordId}
describeRecordOutput, err := c.client.DescribeRecord(&describeRecordInput)
if c.metrics.enabled {
c.metrics.observe.Inc()
}
if err != nil {
return managed.ExternalObservation{}, errors.Wrap(err, errCouldNotDescribeRecord)
}
Expand Down Expand Up @@ -225,12 +294,15 @@ func (c *custom) postObserve(_ context.Context, ds *svcapitypes.ProvisionedProdu
return obs, nil
}

func preDelete(_ context.Context, ds *svcapitypes.ProvisionedProduct, input *svcsdk.TerminateProvisionedProductInput) (bool, error) {
func (c custom) preDelete(_ context.Context, ds *svcapitypes.ProvisionedProduct, input *svcsdk.TerminateProvisionedProductInput) (bool, error) {
if ptr.Deref(ds.Status.AtProvider.Status, "") == string(svcapitypes.ProvisionedProductStatus_SDK_UNDER_CHANGE) {
return true, nil
}
input.TerminateToken = aws.String(genIdempotencyToken())
input.ProvisionedProductName = aws.String(meta.GetExternalName(ds))
if c.metrics.enabled {
c.metrics.delete.Inc()
}
return false, nil
}

Expand Down Expand Up @@ -275,6 +347,9 @@ func (c *custom) provisioningParamsAreChanged(ds *svcapitypes.ProvisionedProduct
}

cfStackParams, err := c.client.GetCloudformationStackParameters(c.cache.getProvisionedProductOutputs)
if c.metrics.enabled {
c.metrics.observe.Inc()
}
if err != nil {
return false, "", errors.Wrap(err, errCouldNotGetCFParameters)
}
Expand Down Expand Up @@ -358,9 +433,13 @@ func (c *custom) getArtifactID(ds *svcapitypes.ProvisionedProduct) (string, erro
}
// DescribeProvisioningArtifact method fits much better, but it has a bug - it returns nothing if a product is a part of imported portfolio
output, err := c.client.DescribeProduct(&input)
if c.metrics.enabled {
c.metrics.observe.Inc()
}
if err != nil {
return "", errors.Wrap(err, errCouldNotLookupProduct)
}
c.cache.describeProductOutput = output
for _, artifact := range output.ProvisioningArtifacts {
if ptr.Deref(ds.Spec.ForProvider.ProvisioningArtifactName, "") == *artifact.Name ||
ptr.Deref(ds.Spec.ForProvider.ProvisioningArtifactID, "") == *artifact.Id {
Expand All @@ -372,10 +451,17 @@ func (c *custom) getArtifactID(ds *svcapitypes.ProvisionedProduct) (string, erro

func (c *custom) getProductID(productName *string) (string, error) {
input := svcsdk.DescribeProductInput{Name: productName}
// DescribeProvisioningArtifact method fits much better, but it has a bug - it returns nothing if a product is a part of imported portfolio
output, err := c.client.DescribeProduct(&input)
if err != nil {
return "", errors.Wrap(err, errCouldNotLookupProduct)
output := c.cache.describeProductOutput
if output == nil {
// DescribeProvisioningArtifact method fits much better, but it has a bug - it returns nothing if a product is a part of imported portfolio
o, err := c.client.DescribeProduct(&input)
if c.metrics.enabled {
c.metrics.observe.Inc()
}
output = o
if err != nil {
return "", errors.Wrap(err, errCouldNotLookupProduct)
}
}
return ptr.Deref(output.ProductViewSummary.ProductId, ""), nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ func describeProvisionedProduct(m ...describeProvisionedProductOutputModifier) *
func setupFakeExternal(fakeClient clientset.Client, cache cache) func(*external) {
return func(e *external) {
c := &custom{client: fakeClient, cache: cache}
e.preCreate = preCreate
e.preCreate = c.preCreate
e.preUpdate = c.preUpdate
e.lateInitialize = c.lateInitialize
e.isUpToDate = c.isUpToDate
e.preObserve = c.preObserve
e.postObserve = c.postObserve
e.preDelete = preDelete
e.preDelete = c.preDelete
}
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/features/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,7 @@ const (
// Management Policies. See the below design for more details.
// https://github.com/crossplane/crossplane/pull/3531
EnableAlphaManagementPolicies feature.Flag = "EnableAlphaManagementPolicies"

// EnableManagedResourceMetrics enables additional metrics for every managed resource, if implemented
EnableManagedResourceMetrics feature.Flag = "EnableManagedResourceMetrics"
)
41 changes: 38 additions & 3 deletions pkg/utils/metrics/setup.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,55 @@
package metrics

import (
"github.com/crossplane/crossplane-runtime/pkg/feature"
"github.com/prometheus/client_golang/prometheus"
k8smetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/metrics"

"github.com/crossplane-contrib/provider-aws/pkg/features"
)

var (
metricAWSAPICalls = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "aws_api_calls_total",
Help: "Number of API calls to the AWS API",
}, []string{"service", "operation", "api_version"})

MetricAWSAPICallsRec = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "aws_api_calls_reconcile_managed_resource_total",
Help: "Amount of calls to the AWS API produced by controller per reconciliation for every managed resource and controller operation type",
}, []string{"api_version", "kind", "resource_name", "controller_operation_type"})

MetricManagedResRec = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "managed_resource_reconciles_total",
Help: "Total amount of reconciliation loops per managed resource",
}, []string{"api_version", "kind", "resource_name"})
)

type metric interface {
Describe(chan<- *prometheus.Desc)
Collect(chan<- prometheus.Metric)
}

// SetupMetrics will register the known Prometheus metrics with controller-runtime's metrics registry
func SetupMetrics() error {
return k8smetrics.Registry.Register(metricAWSAPICalls)
func SetupMetrics(flags *feature.Flags) error {
metricsList := []metric{
metricAWSAPICalls,
}
managedResourceMetricsList := []metric{
MetricAWSAPICallsRec,
MetricManagedResRec,
}

if flags.Enabled(features.EnableManagedResourceMetrics) {
metricsList = append(metricsList, managedResourceMetricsList...)
}
for _, m := range metricsList {
err := metrics.Registry.Register(m)
if err != nil {
return err
}
}
return nil
}

// IncAWSAPICall will increment the aws_api_calls_total metric for the specified service, operation, and apiVersion tuple
Expand Down

0 comments on commit cf157ca

Please sign in to comment.