Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

git add managed resource metrics with feature flag(implemented for Provis… #12

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions cmd/provider/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ func main() {
leaderElection = app.Flag("leader-election", "Use leader election for the conroller manager.").Short('l').Default("false").OverrideDefaultFromEnvar("LEADER_ELECTION").Bool()
maxReconcileRate = app.Flag("max-reconcile-rate", "The global maximum rate per second at which resources may checked for drift from the desired state.").Default("10").Int()

namespace = app.Flag("namespace", "Namespace used to set as default scope in default secret store config.").Default("crossplane-system").Envar("POD_NAMESPACE").String()
enableExternalSecretStores = app.Flag("enable-external-secret-stores", "Enable support for ExternalSecretStores.").Default("false").Envar("ENABLE_EXTERNAL_SECRET_STORES").Bool()
enableManagementPolicies = app.Flag("enable-management-policies", "Enable support for Management Policies.").Default("false").Envar("ENABLE_MANAGEMENT_POLICIES").Bool()
namespace = app.Flag("namespace", "Namespace used to set as default scope in default secret store config.").Default("crossplane-system").Envar("POD_NAMESPACE").String()
enableExternalSecretStores = app.Flag("enable-external-secret-stores", "Enable support for ExternalSecretStores.").Default("false").Envar("ENABLE_EXTERNAL_SECRET_STORES").Bool()
enableManagementPolicies = app.Flag("enable-management-policies", "Enable support for Management Policies.").Default("false").Envar("ENABLE_MANAGEMENT_POLICIES").Bool()
enableManagedResourceMetrics = app.Flag("enable-managed-resource-metrics", "Enable prometheus metrics for every managed resource").Default("false").Envar("ENABLE_MANAGED_RESOURCE_METRICS").Bool()
)
kingpin.MustParse(app.Parse(os.Args[1:]))

Expand Down Expand Up @@ -130,7 +131,12 @@ func main() {
log.Info("Alpha feature enabled", "flag", features.EnableAlphaManagementPolicies)
}

kingpin.FatalIfError(metrics.SetupMetrics(), "Cannot setup AWS metrics hook")
if *enableManagedResourceMetrics {
o.Features.Enable(features.EnableManagedResourceMetrics)
log.Info("Managed resource prometheus metrics are enabled", "flag", features.EnableManagedResourceMetrics)
}

kingpin.FatalIfError(metrics.SetupMetrics(o.Features), "Cannot setup AWS metrics")
kingpin.FatalIfError(controller.Setup(mgr, o), "Cannot setup AWS controllers")
kingpin.FatalIfError(mgr.Start(ctrl.SetupSignalHandler()), "Cannot start controller manager")

Expand Down
144 changes: 115 additions & 29 deletions pkg/controller/servicecatalog/provisionedproduct/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"strings"

awsconfig "github.com/aws/aws-sdk-go-v2/config"
cfsdkv2 "github.com/aws/aws-sdk-go-v2/service/cloudformation"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
Expand All @@ -33,18 +32,23 @@ import (
"github.com/crossplane/crossplane-runtime/pkg/meta"
"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
"github.com/crossplane/crossplane-runtime/pkg/resource"
cpresource "github.com/crossplane/crossplane-runtime/pkg/resource"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

svcapitypes "github.com/crossplane-contrib/provider-aws/apis/servicecatalog/v1alpha1"
"github.com/crossplane-contrib/provider-aws/apis/v1alpha1"
clientset "github.com/crossplane-contrib/provider-aws/pkg/clients/servicecatalog"
"github.com/crossplane-contrib/provider-aws/pkg/features"
awsclient "github.com/crossplane-contrib/provider-aws/pkg/utils/connect/aws"
"github.com/crossplane-contrib/provider-aws/pkg/utils/metrics"
"github.com/crossplane-contrib/provider-aws/pkg/utils/pointer"
custommanaged "github.com/crossplane-contrib/provider-aws/pkg/utils/reconciler/managed"
)
Expand All @@ -64,34 +68,97 @@ const (
errAwsAPICodeInvalidParametersException = "Last Successful Provisioning Record doesn't exist."
)

type customConnector struct {
kube client.Client
options controller.Options
}

type custom struct {
*external
client clientset.Client
cache cache
client clientset.Client
cache cache
metrics metricsRec
}

type cache struct {
describeProductOutput *svcsdk.DescribeProductOutput
getProvisionedProductOutputs []*svcsdk.RecordOutput
lastProvisioningParameters []*svcapitypes.ProvisioningParameter
}

type metricsRec struct {
enabled bool
create prometheus.Counter
observe prometheus.Counter
update prometheus.Counter
delete prometheus.Counter
}

func (c *customConnector) Connect(ctx context.Context, mg cpresource.Managed) (managed.ExternalClient, error) {
cr, ok := mg.(*svcapitypes.ProvisionedProduct)
if !ok {
return nil, errors.New(errUnexpectedObject)
}
sess, err := awsclient.GetConfigV1(ctx, c.kube, mg, cr.Spec.ForProvider.Region)
if err != nil {
return nil, errors.Wrap(err, errCreateSession)
}

awsCfg, err := awsclient.GetConfig(ctx, c.kube, mg, cr.Spec.ForProvider.Region)
if err != nil {
return nil, errors.Wrap(err, errCreateSession)
}
cfClient := cfsdkv2.NewFromConfig(*awsCfg)
svcClient := svcsdk.New(sess)
cust := &custom{
client: &clientset.CustomServiceCatalogClient{
CfClient: cfClient,
Client: svcClient},
}
// We do not re-implement all the ExternalClient interface, so we want
// to reuse the generated one as much as we can (mostly for the Observe,
// Create, Update, Delete methods which call all of our custom hooks)
cust.external = &external{
kube: c.kube,
client: svcClient,

isUpToDate: cust.isUpToDate,
lateInitialize: cust.lateInitialize,
preObserve: cust.preObserve,
postObserve: cust.postObserve,
preUpdate: cust.preUpdate,
preCreate: cust.preCreate,
preDelete: cust.preDelete,

postCreate: nopPostCreate,
postDelete: nopPostDelete,
postUpdate: nopPostUpdate,
}

if c.options.Features.Enabled(features.EnableManagedResourceMetrics) {
cust.metrics.enabled = c.options.Features.Enabled(features.EnableManagedResourceMetrics)
customResourceAPIVersion := fmt.Sprintf("%s/%s", cr.GetObjectKind().GroupVersionKind().Group, cr.GetObjectKind().GroupVersionKind().Version)
metrics.MetricManagedResRec.WithLabelValues(customResourceAPIVersion, cr.GetObjectKind().GroupVersionKind().Kind, cr.Name).Inc()
cust.metrics.create = metrics.MetricAWSAPICallsRec.WithLabelValues(customResourceAPIVersion, cr.GetObjectKind().GroupVersionKind().Kind, cr.Name, "create")
cust.metrics.observe = metrics.MetricAWSAPICallsRec.WithLabelValues(customResourceAPIVersion, cr.GetObjectKind().GroupVersionKind().Kind, cr.Name, "observe")
cust.metrics.update = metrics.MetricAWSAPICallsRec.WithLabelValues(customResourceAPIVersion, cr.GetObjectKind().GroupVersionKind().Kind, cr.Name, "update")
cust.metrics.delete = metrics.MetricAWSAPICallsRec.WithLabelValues(customResourceAPIVersion, cr.GetObjectKind().GroupVersionKind().Kind, cr.Name, "delete")
}

return cust.external, nil
}

// SetupProvisionedProduct adds a controller that reconciles a ProvisionedProduct
func SetupProvisionedProduct(mgr ctrl.Manager, o controller.Options) error {
name := managed.ControllerName(svcapitypes.ProvisionedProductKind)
awsCfg, err := awsconfig.LoadDefaultConfig(context.TODO())
if err != nil {
return err
}
cfClient := cfsdkv2.NewFromConfig(awsCfg)
opts := []option{prepareSetupExternal(cfClient)}
cps := []managed.ConnectionPublisher{managed.NewAPISecretPublisher(mgr.GetClient(), mgr.GetScheme())}
if o.Features.Enabled(features.EnableAlphaExternalSecretStores) {
cps = append(cps, connection.NewDetailsManager(mgr.GetClient(), v1alpha1.StoreConfigGroupVersionKind))
}

reconcilerOpts := []managed.ReconcilerOption{
managed.WithCriticalAnnotationUpdater(custommanaged.NewRetryingCriticalAnnotationUpdater(mgr.GetClient())),
managed.WithExternalConnecter(&connector{kube: mgr.GetClient(), opts: opts}),
managed.WithExternalConnecter(&customConnector{kube: mgr.GetClient(), options: o}),
managed.WithInitializers(managed.NewNameAsExternalName(mgr.GetClient())),
managed.WithPollInterval(o.PollInterval),
managed.WithLogger(o.Logger.WithValues("controller", name)),
Expand All @@ -113,40 +180,36 @@ func SetupProvisionedProduct(mgr ctrl.Manager, o controller.Options) error {
reconcilerOpts...))
}

func prepareSetupExternal(cfClient *cfsdkv2.Client) func(*external) {
return func(e *external) {
c := &custom{client: &clientset.CustomServiceCatalogClient{CfClient: cfClient, Client: e.client}}
e.preCreate = preCreate
e.preUpdate = c.preUpdate
e.lateInitialize = c.lateInitialize
e.isUpToDate = c.isUpToDate
e.preObserve = c.preObserve
e.postObserve = c.postObserve
e.preDelete = preDelete
}
}

func (c *custom) lateInitialize(spec *svcapitypes.ProvisionedProductParameters, _ *svcsdk.DescribeProvisionedProductOutput) error {
acceptLanguageEnglish := acceptLanguageEnglish
spec.AcceptLanguage = pointer.LateInitialize(spec.AcceptLanguage, &acceptLanguageEnglish)
return nil
}

func preCreate(_ context.Context, ds *svcapitypes.ProvisionedProduct, input *svcsdk.ProvisionProductInput) error {
func (c *custom) preCreate(_ context.Context, ds *svcapitypes.ProvisionedProduct, input *svcsdk.ProvisionProductInput) error {
input.ProvisionToken = aws.String(genIdempotencyToken())
input.ProvisionedProductName = aws.String(meta.GetExternalName(ds))
if c.metrics.enabled {
c.metrics.create.Inc()
}
return nil
}

func (c *custom) preUpdate(_ context.Context, ds *svcapitypes.ProvisionedProduct, input *svcsdk.UpdateProvisionedProductInput) error {
input.UpdateToken = aws.String(genIdempotencyToken())
input.ProvisionedProductName = aws.String(meta.GetExternalName(ds))
if c.metrics.enabled {
c.metrics.update.Inc()
}
return nil
}

func (c *custom) preObserve(_ context.Context, ds *svcapitypes.ProvisionedProduct, input *svcsdk.DescribeProvisionedProductInput) error {
input.Name = aws.String(meta.GetExternalName(ds))
c.cache.lastProvisioningParameters = ds.Status.AtProvider.DeepCopy().LastProvisioningParameters
if c.metrics.enabled {
c.metrics.observe.Inc()
}
return nil
}

Expand All @@ -162,6 +225,9 @@ func (c *custom) isUpToDate(_ context.Context, ds *svcapitypes.ProvisionedProduc

getPPOutputInput := &svcsdk.GetProvisionedProductOutputsInput{ProvisionedProductId: resp.ProvisionedProductDetail.Id}
getPPOutput, err := c.client.GetProvisionedProductOutputs(getPPOutputInput)
if c.metrics.enabled {
c.metrics.observe.Inc()
}
if err != nil {
// We want to specifically handle this exception, since it will occur when something
// is wrong with the provisioned product (error on creation, tainted, etc.)
Expand Down Expand Up @@ -196,6 +262,9 @@ func (c *custom) postObserve(_ context.Context, ds *svcapitypes.ProvisionedProdu

describeRecordInput := svcsdk.DescribeRecordInput{Id: resp.ProvisionedProductDetail.LastRecordId}
describeRecordOutput, err := c.client.DescribeRecord(&describeRecordInput)
if c.metrics.enabled {
c.metrics.observe.Inc()
}
if err != nil {
return managed.ExternalObservation{}, errors.Wrap(err, errCouldNotDescribeRecord)
}
Expand Down Expand Up @@ -225,12 +294,15 @@ func (c *custom) postObserve(_ context.Context, ds *svcapitypes.ProvisionedProdu
return obs, nil
}

func preDelete(_ context.Context, ds *svcapitypes.ProvisionedProduct, input *svcsdk.TerminateProvisionedProductInput) (bool, error) {
func (c custom) preDelete(_ context.Context, ds *svcapitypes.ProvisionedProduct, input *svcsdk.TerminateProvisionedProductInput) (bool, error) {
if ptr.Deref(ds.Status.AtProvider.Status, "") == string(svcapitypes.ProvisionedProductStatus_SDK_UNDER_CHANGE) {
return true, nil
}
input.TerminateToken = aws.String(genIdempotencyToken())
input.ProvisionedProductName = aws.String(meta.GetExternalName(ds))
if c.metrics.enabled {
c.metrics.delete.Inc()
}
return false, nil
}

Expand Down Expand Up @@ -275,6 +347,9 @@ func (c *custom) provisioningParamsAreChanged(ds *svcapitypes.ProvisionedProduct
}

cfStackParams, err := c.client.GetCloudformationStackParameters(c.cache.getProvisionedProductOutputs)
if c.metrics.enabled {
c.metrics.observe.Inc()
}
if err != nil {
return false, "", errors.Wrap(err, errCouldNotGetCFParameters)
}
Expand Down Expand Up @@ -358,9 +433,13 @@ func (c *custom) getArtifactID(ds *svcapitypes.ProvisionedProduct) (string, erro
}
// DescribeProvisioningArtifact method fits much better, but it has a bug - it returns nothing if a product is a part of imported portfolio
output, err := c.client.DescribeProduct(&input)
if c.metrics.enabled {
c.metrics.observe.Inc()
}
if err != nil {
return "", errors.Wrap(err, errCouldNotLookupProduct)
}
c.cache.describeProductOutput = output
for _, artifact := range output.ProvisioningArtifacts {
if ptr.Deref(ds.Spec.ForProvider.ProvisioningArtifactName, "") == *artifact.Name ||
ptr.Deref(ds.Spec.ForProvider.ProvisioningArtifactID, "") == *artifact.Id {
Expand All @@ -372,10 +451,17 @@ func (c *custom) getArtifactID(ds *svcapitypes.ProvisionedProduct) (string, erro

func (c *custom) getProductID(productName *string) (string, error) {
input := svcsdk.DescribeProductInput{Name: productName}
// DescribeProvisioningArtifact method fits much better, but it has a bug - it returns nothing if a product is a part of imported portfolio
output, err := c.client.DescribeProduct(&input)
if err != nil {
return "", errors.Wrap(err, errCouldNotLookupProduct)
output := c.cache.describeProductOutput
if output == nil {
// DescribeProvisioningArtifact method fits much better, but it has a bug - it returns nothing if a product is a part of imported portfolio
o, err := c.client.DescribeProduct(&input)
if c.metrics.enabled {
c.metrics.observe.Inc()
}
output = o
if err != nil {
return "", errors.Wrap(err, errCouldNotLookupProduct)
}
}
return ptr.Deref(output.ProductViewSummary.ProductId, ""), nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ func describeProvisionedProduct(m ...describeProvisionedProductOutputModifier) *
func setupFakeExternal(fakeClient clientset.Client, cache cache) func(*external) {
return func(e *external) {
c := &custom{client: fakeClient, cache: cache}
e.preCreate = preCreate
e.preCreate = c.preCreate
e.preUpdate = c.preUpdate
e.lateInitialize = c.lateInitialize
e.isUpToDate = c.isUpToDate
e.preObserve = c.preObserve
e.postObserve = c.postObserve
e.preDelete = preDelete
e.preDelete = c.preDelete
}
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/features/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,7 @@ const (
// Management Policies. See the below design for more details.
// https://github.com/crossplane/crossplane/pull/3531
EnableAlphaManagementPolicies feature.Flag = "EnableAlphaManagementPolicies"

// EnableManagedResourceMetrics enables additional metrics for every managed resource, if implemented
EnableManagedResourceMetrics feature.Flag = "EnableManagedResourceMetrics"
)
41 changes: 38 additions & 3 deletions pkg/utils/metrics/setup.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,55 @@
package metrics

import (
"github.com/crossplane/crossplane-runtime/pkg/feature"
"github.com/prometheus/client_golang/prometheus"
k8smetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/metrics"

"github.com/crossplane-contrib/provider-aws/pkg/features"
)

var (
metricAWSAPICalls = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "aws_api_calls_total",
Help: "Number of API calls to the AWS API",
}, []string{"service", "operation", "api_version"})

MetricAWSAPICallsRec = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "aws_api_calls_reconcile_managed_resource_total",
Help: "Amount of calls to the AWS API produced by controller per reconciliation for every managed resource and controller operation type",
}, []string{"api_version", "kind", "resource_name", "controller_operation_type"})

MetricManagedResRec = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "managed_resource_reconciles_total",
Help: "Total amount of reconciliation loops per managed resource",
}, []string{"api_version", "kind", "resource_name"})
)

type metric interface {
Describe(chan<- *prometheus.Desc)
Collect(chan<- prometheus.Metric)
}

// SetupMetrics will register the known Prometheus metrics with controller-runtime's metrics registry
func SetupMetrics() error {
return k8smetrics.Registry.Register(metricAWSAPICalls)
func SetupMetrics(flags *feature.Flags) error {
metricsList := []metric{
metricAWSAPICalls,
}
managedResourceMetricsList := []metric{
MetricAWSAPICallsRec,
MetricManagedResRec,
}

if flags.Enabled(features.EnableManagedResourceMetrics) {
metricsList = append(metricsList, managedResourceMetricsList...)
}
for _, m := range metricsList {
err := metrics.Registry.Register(m)
if err != nil {
return err
}
}
return nil
}

// IncAWSAPICall will increment the aws_api_calls_total metric for the specified service, operation, and apiVersion tuple
Expand Down