Skip to content

Commit

Permalink
add reconcilation metrics, draft
Browse files Browse the repository at this point in the history
  • Loading branch information
Kirill Sushkov (teeverr) committed Oct 9, 2023
1 parent 58b5685 commit a8d28fd
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 85 deletions.
2 changes: 1 addition & 1 deletion cmd/provider/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func main() {
log.Info("Alpha feature enabled", "flag", features.EnableAlphaManagementPolicies)
}

kingpin.FatalIfError(metrics.SetupMetrics(), "Cannot setup AWS metrics hook")
metrics.SetupMetrics()
kingpin.FatalIfError(controller.Setup(mgr, o), "Cannot setup AWS controllers")
kingpin.FatalIfError(mgr.Start(ctrl.SetupSignalHandler()), "Cannot start controller manager")

Expand Down
204 changes: 123 additions & 81 deletions pkg/controller/servicecatalog/provisionedproduct/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ import (
"fmt"
"strings"

awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/crossplane-contrib/provider-aws/pkg/utils/metrics"

cfsdkv2 "github.com/aws/aws-sdk-go-v2/service/cloudformation"
cfsdkv2types "github.com/aws/aws-sdk-go-v2/service/cloudformation/types"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
requestv1 "github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
svcsdk "github.com/aws/aws-sdk-go/service/servicecatalog"
xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
"github.com/crossplane/crossplane-runtime/pkg/connection"
Expand All @@ -34,6 +37,7 @@ import (
"github.com/crossplane/crossplane-runtime/pkg/meta"
"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
"github.com/crossplane/crossplane-runtime/pkg/resource"
cpresource "github.com/crossplane/crossplane-runtime/pkg/resource"
"github.com/google/uuid"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -65,23 +69,75 @@ const (
errAwsAPICodeInvalidParametersException = "Last Successful Provisioning Record doesn't exist."
)

type custom struct {
*external
kube client.Client
client clientset.Client
session *session.Session
cache cache
}

type cache struct {
getProvisionedProductOutputs []*svcsdk.RecordOutput
}

func (c *custom) Connect(ctx context.Context, mg cpresource.Managed) (managed.ExternalClient, error) {
cr, ok := mg.(*svcapitypes.ProvisionedProduct)
if !ok {
return nil, errors.New(errUnexpectedObject)
}
sess, err := awsclient.GetConfigV1(ctx, c.kube, mg, cr.Spec.ForProvider.Region)
c.session = sess
if err != nil {
return nil, errors.Wrap(err, errCreateSession)
}

awsCfg, err := awsclient.GetConfig(ctx, c.kube, mg, cr.Spec.ForProvider.Region)
if err != nil {
return nil, errors.Wrap(err, errCreateSession)
}
cfClient := cfsdkv2.NewFromConfig(*awsCfg)
svcClient := svcsdk.New(sess)
c.client = &clientset.CustomServiceCatalogClient{CfClient: cfClient, Client: svcClient}
// We do not re-implement all the ExternalClient interface, so we want
// to reuse the generated one as much as we can (mostly for the Observe,
// Create, Update, Delete methods which call all of our custom hooks)
c.external = &external{
kube: c.kube,
client: svcClient,

// All of our overrides must go here
isUpToDate: c.isUpToDate,
lateInitialize: c.lateInitialize,
preObserve: c.preObserve,
postObserve: c.postObserve,
preUpdate: c.preUpdate,
preCreate: c.preCreate,
preDelete: c.preDelete,

// If we do not implement a method, we must specify the no-op function
postCreate: nopPostCreate,
postDelete: nopPostDelete,
postUpdate: nopPostUpdate,
}
metrics.MetricAWSAPIRecCalls.WithLabelValues(cr.GetObjectKind().GroupVersionKind().Kind, cr.GetObjectKind().GroupVersionKind().Group, cr.Name, "create").Set(0)
metrics.MetricAWSAPIRecCalls.WithLabelValues(cr.GetObjectKind().GroupVersionKind().Kind, cr.GetObjectKind().GroupVersionKind().Group, cr.Name, "observe").Set(0)
metrics.MetricAWSAPIRecCalls.WithLabelValues(cr.GetObjectKind().GroupVersionKind().Kind, cr.GetObjectKind().GroupVersionKind().Group, cr.Name, "update").Set(0)
metrics.MetricAWSAPIRecCalls.WithLabelValues(cr.GetObjectKind().GroupVersionKind().Kind, cr.GetObjectKind().GroupVersionKind().Group, cr.Name, "delete").Set(0)

return c.external, nil
}

// SetupProvisionedProduct adds a controller that reconciles a ProvisionedProduct
func SetupProvisionedProduct(mgr ctrl.Manager, o controller.Options) error {
name := managed.ControllerName(svcapitypes.ProvisionedProductKind)
awsCfg, err := awsconfig.LoadDefaultConfig(context.TODO())
if err != nil {
return err
}
cfClient := cfsdkv2.NewFromConfig(awsCfg)
kube := mgr.GetClient()
opts := []option{prepareSetupExternal(cfClient, kube)}
cps := []managed.ConnectionPublisher{managed.NewAPISecretPublisher(mgr.GetClient(), mgr.GetScheme())}
if o.Features.Enabled(features.EnableAlphaExternalSecretStores) {
cps = append(cps, connection.NewDetailsManager(mgr.GetClient(), v1alpha1.StoreConfigGroupVersionKind))
}

reconcilerOpts := []managed.ReconcilerOption{
managed.WithExternalConnecter(&connector{kube: kube, opts: opts}),
managed.WithExternalConnecter(&custom{kube: mgr.GetClient()}),
managed.WithPollInterval(o.PollInterval),
managed.WithLogger(o.Logger.WithValues("controller", name)),
managed.WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name))),
Expand All @@ -102,51 +158,40 @@ func SetupProvisionedProduct(mgr ctrl.Manager, o controller.Options) error {
reconcilerOpts...))
}

func prepareSetupExternal(cfClient *cfsdkv2.Client, kube client.Client) func(*external) {
return func(e *external) {
c := &custom{client: &clientset.CustomServiceCatalogClient{CfClient: cfClient, Client: e.client}, kube: kube}
e.preCreate = preCreate
e.preUpdate = c.preUpdate
e.lateInitialize = c.lateInitialize
e.isUpToDate = c.isUpToDate
e.preObserve = c.preObserve
e.postObserve = c.postObserve
e.preDelete = preDelete
}
}

type custom struct {
kube client.Client
client clientset.Client
cache cache
}

type cache struct {
getProvisionedProductOutputs []*svcsdk.RecordOutput
}

func (c *custom) lateInitialize(spec *svcapitypes.ProvisionedProductParameters, _ *svcsdk.DescribeProvisionedProductOutput) error {
acceptLanguageEnglish := acceptLanguageEnglish
spec.AcceptLanguage = awsclient.LateInitializeStringPtr(spec.AcceptLanguage, &acceptLanguageEnglish)
return nil
}

func preCreate(_ context.Context, cr *svcapitypes.ProvisionedProduct, input *svcsdk.ProvisionProductInput) error {
func (c *custom) preCreate(_ context.Context, ds *svcapitypes.ProvisionedProduct, input *svcsdk.ProvisionProductInput) error {
metrics.MetricAWSAPIRecCalls.WithLabelValues(ds.GetObjectKind().GroupVersionKind().Kind, ds.GetObjectKind().GroupVersionKind().Group, ds.Name, "create").Inc()
input.ProvisionToken = aws.String(genIdempotencyToken())
if cr.GetName() != meta.GetExternalName(cr) {
if ds.GetName() != meta.GetExternalName(ds) {
return errors.New(errCreatExternalNameIsNotValid)
}
input.ProvisionedProductName = aws.String(meta.GetExternalName(cr))
input.ProvisionedProductName = aws.String(meta.GetExternalName(ds))
return nil
}

func (c *custom) preUpdate(_ context.Context, cr *svcapitypes.ProvisionedProduct, input *svcsdk.UpdateProvisionedProductInput) error {
func (c *custom) preUpdate(_ context.Context, ds *svcapitypes.ProvisionedProduct, input *svcsdk.UpdateProvisionedProductInput) error {
metrics.MetricAWSAPIRecCalls.WithLabelValues(ds.GetObjectKind().GroupVersionKind().Kind, ds.GetObjectKind().GroupVersionKind().Group, ds.Name, "update").Inc()
input.UpdateToken = aws.String(genIdempotencyToken())
if cr.GetName() == meta.GetExternalName(cr) {
input.ProvisionedProductName = aws.String(meta.GetExternalName(cr))
if ds.GetName() == meta.GetExternalName(ds) {
input.ProvisionedProductName = aws.String(meta.GetExternalName(ds))
} else {
input.ProvisionedProductId = aws.String(meta.GetExternalName(ds))
}
return nil
}

func (c *custom) preObserve(_ context.Context, ds *svcapitypes.ProvisionedProduct, input *svcsdk.DescribeProvisionedProductInput) error {
if ds.GetName() == meta.GetExternalName(ds) {
input.Name = aws.String(meta.GetExternalName(ds))
} else {
input.ProvisionedProductId = aws.String(meta.GetExternalName(cr))
input.Id = aws.String(meta.GetExternalName(ds))
}
metrics.MetricAWSAPIRecCalls.WithLabelValues(ds.GetObjectKind().GroupVersionKind().Kind, ds.GetObjectKind().GroupVersionKind().Group, ds.Name, "observe").Inc()
return nil
}

Expand All @@ -161,6 +206,9 @@ func (c *custom) isUpToDate(ctx context.Context, ds *svcapitypes.ProvisionedProd

getPPOutputInput := &svcsdk.GetProvisionedProductOutputsInput{ProvisionedProductId: resp.ProvisionedProductDetail.Id}
getPPOutput, err := c.client.GetProvisionedProductOutputs(getPPOutputInput)
c.session.Handlers.Send.PushFront(func(r *requestv1.Request) {
metrics.MetricAWSAPIRecCalls.WithLabelValues(ds.GetObjectKind().GroupVersionKind().Kind, ds.GetObjectKind().GroupVersionKind().Group, ds.Name, "observe").Inc()
})
if err != nil {
// We want to specifically handle this exception, since it will occur when something
// is wrong with the provisioned product (error on creation, tainted, etc)
Expand All @@ -173,11 +221,12 @@ func (c *custom) isUpToDate(ctx context.Context, ds *svcapitypes.ProvisionedProd
}
c.cache.getProvisionedProductOutputs = getPPOutput.Outputs
cfStackParameters, err := c.client.GetCloudformationStackParameters(getPPOutput.Outputs)
metrics.MetricAWSAPIRecCalls.WithLabelValues(ds.GetObjectKind().GroupVersionKind().Kind, ds.GetObjectKind().GroupVersionKind().Group, ds.Name, "observe").Inc()
if err != nil {
return false, "", errors.Wrap(err, errCouldNotGetCFParameters)
}

productOrArtifactIsChanged, err := c.productOrArtifactIsChanged(&ds.Spec.ForProvider, resp.ProvisionedProductDetail)
productOrArtifactIsChanged, err := c.productOrArtifactIsChanged(ds, resp.ProvisionedProductDetail)
if err != nil {
return false, "", errors.Wrap(err, "could not discover if product or artifact ids have changed")
}
Expand All @@ -192,28 +241,19 @@ func (c *custom) isUpToDate(ctx context.Context, ds *svcapitypes.ProvisionedProd
return true, "", nil
}

func (c *custom) preObserve(_ context.Context, cr *svcapitypes.ProvisionedProduct, input *svcsdk.DescribeProvisionedProductInput) error {
if cr.GetName() == meta.GetExternalName(cr) {
input.Name = aws.String(meta.GetExternalName(cr))
} else {
input.Id = aws.String(meta.GetExternalName(cr))
}
return nil

}

func (c *custom) postObserve(_ context.Context, cr *svcapitypes.ProvisionedProduct, resp *svcsdk.DescribeProvisionedProductOutput, obs managed.ExternalObservation, err error) (managed.ExternalObservation, error) {
func (c *custom) postObserve(_ context.Context, ds *svcapitypes.ProvisionedProduct, resp *svcsdk.DescribeProvisionedProductOutput, obs managed.ExternalObservation, err error) (managed.ExternalObservation, error) {
if err != nil {
return managed.ExternalObservation{}, err
}

describeRecordInput := svcsdk.DescribeRecordInput{Id: resp.ProvisionedProductDetail.LastRecordId}
describeRecordOutput, err := c.client.DescribeRecord(&describeRecordInput)
metrics.MetricAWSAPIRecCalls.WithLabelValues(ds.GetObjectKind().GroupVersionKind().Kind, ds.GetObjectKind().GroupVersionKind().Group, ds.Name, "observe").Inc()
if err != nil {
return managed.ExternalObservation{}, errors.Wrap(err, errCouldNotDescribeRecord)
}

setConditions(describeRecordOutput, resp, cr)
setConditions(describeRecordOutput, resp, ds)

var outputs = make(map[string]*svcapitypes.RecordOutput)
for _, v := range c.cache.getProvisionedProductOutputs {
Expand All @@ -222,33 +262,34 @@ func (c *custom) postObserve(_ context.Context, cr *svcapitypes.ProvisionedProdu
OutputValue: v.OutputValue}
}

cr.Status.AtProvider.Outputs = outputs
cr.Status.AtProvider.ARN = resp.ProvisionedProductDetail.Arn
cr.Status.AtProvider.CreatedTime = &metav1.Time{Time: *resp.ProvisionedProductDetail.CreatedTime}
cr.Status.AtProvider.LastProvisioningRecordID = resp.ProvisionedProductDetail.LastProvisioningRecordId
cr.Status.AtProvider.LaunchRoleARN = resp.ProvisionedProductDetail.LaunchRoleArn
cr.Status.AtProvider.Status = resp.ProvisionedProductDetail.Status
cr.Status.AtProvider.StatusMessage = resp.ProvisionedProductDetail.StatusMessage
cr.Status.AtProvider.ProvisionedProductType = resp.ProvisionedProductDetail.Type
cr.Status.AtProvider.RecordType = describeRecordOutput.RecordDetail.RecordType
cr.Status.AtProvider.LastPathID = describeRecordOutput.RecordDetail.PathId
cr.Status.AtProvider.LastProductID = describeRecordOutput.RecordDetail.ProductId
cr.Status.AtProvider.LastProvisioningArtifactID = describeRecordOutput.RecordDetail.ProvisioningArtifactId
cr.Status.AtProvider.LastProvisioningParameters = cr.Spec.ForProvider.ProvisioningParameters
ds.Status.AtProvider.Outputs = outputs
ds.Status.AtProvider.ARN = resp.ProvisionedProductDetail.Arn
ds.Status.AtProvider.CreatedTime = &metav1.Time{Time: *resp.ProvisionedProductDetail.CreatedTime}
ds.Status.AtProvider.LastProvisioningRecordID = resp.ProvisionedProductDetail.LastProvisioningRecordId
ds.Status.AtProvider.LaunchRoleARN = resp.ProvisionedProductDetail.LaunchRoleArn
ds.Status.AtProvider.Status = resp.ProvisionedProductDetail.Status
ds.Status.AtProvider.StatusMessage = resp.ProvisionedProductDetail.StatusMessage
ds.Status.AtProvider.ProvisionedProductType = resp.ProvisionedProductDetail.Type
ds.Status.AtProvider.RecordType = describeRecordOutput.RecordDetail.RecordType
ds.Status.AtProvider.LastPathID = describeRecordOutput.RecordDetail.PathId
ds.Status.AtProvider.LastProductID = describeRecordOutput.RecordDetail.ProductId
ds.Status.AtProvider.LastProvisioningArtifactID = describeRecordOutput.RecordDetail.ProvisioningArtifactId
ds.Status.AtProvider.LastProvisioningParameters = ds.Spec.ForProvider.ProvisioningParameters

return obs, nil
}

func preDelete(_ context.Context, cr *svcapitypes.ProvisionedProduct, input *svcsdk.TerminateProvisionedProductInput) (bool, error) {
if pointer.StringDeref(cr.Status.AtProvider.Status, "") == string(svcapitypes.ProvisionedProductStatus_SDK_UNDER_CHANGE) {
func (c *custom) preDelete(_ context.Context, ds *svcapitypes.ProvisionedProduct, input *svcsdk.TerminateProvisionedProductInput) (bool, error) {
if pointer.StringDeref(ds.Status.AtProvider.Status, "") == string(svcapitypes.ProvisionedProductStatus_SDK_UNDER_CHANGE) {
return true, nil
}
input.TerminateToken = aws.String(genIdempotencyToken())
if cr.GetName() == meta.GetExternalName(cr) {
input.ProvisionedProductName = aws.String(meta.GetExternalName(cr))
if ds.GetName() == meta.GetExternalName(ds) {
input.ProvisionedProductName = aws.String(meta.GetExternalName(ds))
} else {
input.ProvisionedProductId = aws.String(meta.GetExternalName(cr))
input.ProvisionedProductId = aws.String(meta.GetExternalName(ds))
}
metrics.MetricAWSAPIRecCalls.WithLabelValues(ds.GetObjectKind().GroupVersionKind().Kind, ds.GetObjectKind().GroupVersionKind().Group, ds.Name, "delete").Inc()
return false, nil
}

Expand Down Expand Up @@ -312,18 +353,18 @@ func (c *custom) provisioningParamsAreChanged(ctx context.Context, cfStackParams
return false, nil
}

func (c *custom) productOrArtifactIsChanged(ds *svcapitypes.ProvisionedProductParameters, resp *svcsdk.ProvisionedProductDetail) (bool, error) {
func (c *custom) productOrArtifactIsChanged(ds *svcapitypes.ProvisionedProduct, resp *svcsdk.ProvisionedProductDetail) (bool, error) {
// ProvisioningArtifactID and ProvisioningArtifactName are mutual exclusive params, the same about ProductID and ProductName
// But if describe a provisioned product aws api will return only IDs, so it's impossible to compare names with ids
// Conditional statement below works only if desired state includes ProvisioningArtifactID and ProductID
if ds.ProvisioningArtifactID != nil && ds.ProductID != nil &&
(*ds.ProvisioningArtifactID != *resp.ProvisioningArtifactId ||
*ds.ProductID != *resp.ProductId) {
if ds.Spec.ForProvider.ProvisioningArtifactID != nil && ds.Spec.ForProvider.ProductID != nil &&
(*ds.Spec.ForProvider.ProvisioningArtifactID != *resp.ProvisioningArtifactId ||
*ds.Spec.ForProvider.ProductID != *resp.ProductId) {
return true, nil
// In case if desired state includes not only IDs provider runs func `getArtifactID`, which produces
// additional request to aws api and retrieves an artifact id(even if it is already defined in the desired state)
// based on ProductId/ProductName for further comparison with artifact id in the current state
} else if ds.ProvisioningArtifactName != nil || ds.ProductName != nil {
} else if ds.Spec.ForProvider.ProvisioningArtifactName != nil || ds.Spec.ForProvider.ProductName != nil {
desiredArtifactID, err := c.getArtifactID(ds)
if err != nil {
return false, err
Expand All @@ -335,24 +376,25 @@ func (c *custom) productOrArtifactIsChanged(ds *svcapitypes.ProvisionedProductPa
return false, nil
}

func (c *custom) getArtifactID(ds *svcapitypes.ProvisionedProductParameters) (string, error) {
if ds.ProvisioningArtifactName != nil && ds.ProvisioningArtifactID != nil {
func (c *custom) getArtifactID(ds *svcapitypes.ProvisionedProduct) (string, error) {
if ds.Spec.ForProvider.ProvisioningArtifactName != nil && ds.Spec.ForProvider.ProvisioningArtifactID != nil {
return "", errors.Wrap(errors.New("artifact id and name are mutually exclusive"), errCouldNotLookupProduct)
}

input := svcsdk.DescribeProductInput{
Id: ds.ProductID,
Name: ds.ProductName,
Id: ds.Spec.ForProvider.ProductID,
Name: ds.Spec.ForProvider.ProductName,
}
// DescribeProvisioningArtifact method fits much better, but it has a bug
output, err := c.client.DescribeProduct(&input)
metrics.MetricAWSAPIRecCalls.WithLabelValues(ds.GetObjectKind().GroupVersionKind().Kind, ds.GetObjectKind().GroupVersionKind().Group, ds.Name, "observe").Inc()
if err != nil {
return "", errors.Wrap(err, errCouldNotLookupProduct)
}

for _, artifact := range output.ProvisioningArtifacts {
if pointer.StringDeref(ds.ProvisioningArtifactName, "") == *artifact.Name ||
pointer.StringDeref(ds.ProvisioningArtifactID, "") == *artifact.Id {
if pointer.StringDeref(ds.Spec.ForProvider.ProvisioningArtifactName, "") == *artifact.Name ||
pointer.StringDeref(ds.Spec.ForProvider.ProvisioningArtifactID, "") == *artifact.Id {
return *artifact.Id, nil
}
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/utils/metrics/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package metrics

import (
"github.com/prometheus/client_golang/prometheus"

k8smetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
)

Expand All @@ -11,11 +10,17 @@ var (
Name: "aws_api_calls_total",
Help: "Number of API calls to the AWS API",
}, []string{"service", "operation", "api_version"})
MetricAWSAPIRecCalls = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "aws_api_reconciliation_calls",
Help: "Number of calls of the AWS API produced by controller to provide reconciliation per managed resource and operation type",
}, []string{"service", "resource_group", "resource_name", "controller_operation_type"})
)

// SetupMetrics will register the known Prometheus metrics with controller-runtime's metrics registry
func SetupMetrics() error {
return k8smetrics.Registry.Register(metricAWSAPICalls)
func SetupMetrics() {
k8smetrics.Registry.MustRegister(
metricAWSAPICalls,
MetricAWSAPIRecCalls)
}

// IncAWSAPICall will increment the aws_api_calls_total metric for the specified service, operation, and apiVersion tuple
Expand Down

0 comments on commit a8d28fd

Please sign in to comment.