Skip to content

Commit

Permalink
FEAT: LBC metric collector (kubernetes-sigs#3941)
Browse files Browse the repository at this point in the history
  • Loading branch information
zac-nixon authored Nov 21, 2024
1 parent c701a42 commit 13538cb
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 77 deletions.
13 changes: 11 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
"sigs.k8s.io/aws-load-balancer-controller/pkg/config"
"sigs.k8s.io/aws-load-balancer-controller/pkg/inject"
"sigs.k8s.io/aws-load-balancer-controller/pkg/k8s"
awsmetrics "sigs.k8s.io/aws-load-balancer-controller/pkg/metrics/aws"
lbcmetrics "sigs.k8s.io/aws-load-balancer-controller/pkg/metrics/lbc"
"sigs.k8s.io/aws-load-balancer-controller/pkg/networking"
"sigs.k8s.io/aws-load-balancer-controller/pkg/runtime"
"sigs.k8s.io/aws-load-balancer-controller/pkg/targetgroupbinding"
Expand Down Expand Up @@ -81,7 +83,14 @@ func main() {
ctrl.SetLogger(appLogger)
klog.SetLoggerWithOptions(appLogger, klog.ContextualLogger(true))

cloud, err := aws.NewCloud(controllerCFG.AWSConfig, metrics.Registry, ctrl.Log, nil)
var awsMetricsCollector *awsmetrics.Collector
lbcMetricsCollector := lbcmetrics.NewCollector(metrics.Registry)

if metrics.Registry != nil {
awsMetricsCollector = awsmetrics.NewCollector(metrics.Registry)
}

cloud, err := aws.NewCloud(controllerCFG.AWSConfig, awsMetricsCollector, ctrl.Log, nil)
if err != nil {
setupLog.Error(err, "unable to initialize AWS cloud")
os.Exit(1)
Expand Down Expand Up @@ -113,7 +122,7 @@ func main() {
subnetResolver := networking.NewDefaultSubnetsResolver(azInfoProvider, cloud.EC2(), cloud.VpcID(), controllerCFG.ClusterName, ctrl.Log.WithName("subnets-resolver"))
multiClusterManager := targetgroupbinding.NewMultiClusterManager(mgr.GetClient(), mgr.GetAPIReader(), ctrl.Log)
tgbResManager := targetgroupbinding.NewDefaultResourceManager(mgr.GetClient(), cloud.ELBV2(), cloud.EC2(),
podInfoRepo, sgManager, sgReconciler, vpcInfoProvider, multiClusterManager,
podInfoRepo, sgManager, sgReconciler, vpcInfoProvider, multiClusterManager, lbcMetricsCollector,
cloud.VpcID(), controllerCFG.ClusterName, controllerCFG.FeatureGates.Enabled(config.EndpointsFailOpen), controllerCFG.EnableEndpointSlices, controllerCFG.DisableRestrictedSGRules,
controllerCFG.ServiceTargetENISGTags, mgr.GetEventRecorderFor("targetGroupBinding"), ctrl.Log)
backendSGProvider := networking.NewBackendSGProvider(controllerCFG.ClusterName, controllerCFG.BackendSecurityGroup,
Expand Down
13 changes: 4 additions & 9 deletions pkg/aws/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
smithymiddleware "github.com/aws/smithy-go/middleware"
"net"
"os"
"sigs.k8s.io/aws-load-balancer-controller/pkg/aws/metrics"
"sigs.k8s.io/aws-load-balancer-controller/pkg/aws/throttle"
"sigs.k8s.io/aws-load-balancer-controller/pkg/version"
"strings"
Expand All @@ -21,11 +20,11 @@ import (
"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/go-logr/logr"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
amerrors "k8s.io/apimachinery/pkg/util/errors"
epresolver "sigs.k8s.io/aws-load-balancer-controller/pkg/aws/endpoints"
"sigs.k8s.io/aws-load-balancer-controller/pkg/aws/provider"
"sigs.k8s.io/aws-load-balancer-controller/pkg/aws/services"
aws_metrics "sigs.k8s.io/aws-load-balancer-controller/pkg/metrics/aws"
)

const userAgent = "elbv2.k8s.aws"
Expand Down Expand Up @@ -60,7 +59,7 @@ type Cloud interface {
}

// NewCloud constructs new Cloud implementation.
func NewCloud(cfg CloudConfig, metricsRegisterer prometheus.Registerer, logger logr.Logger, awsClientsProvider provider.AWSClientsProvider) (Cloud, error) {
func NewCloud(cfg CloudConfig, metricsCollector *aws_metrics.Collector, logger logr.Logger, awsClientsProvider provider.AWSClientsProvider) (Cloud, error) {
hasIPv4 := true
addrs, err := net.InterfaceAddrs()
if err == nil {
Expand Down Expand Up @@ -122,12 +121,8 @@ func NewCloud(cfg CloudConfig, metricsRegisterer prometheus.Registerer, logger l
})
}

if metricsRegisterer != nil {
metricsCollector, err := metrics.NewCollector(metricsRegisterer)
if err != nil {
return nil, errors.Wrapf(err, "failed to initialize sdk metrics collector")
}
awsConfig.APIOptions = metrics.WithSDKMetricCollector(metricsCollector, awsConfig.APIOptions)
if metricsCollector != nil {
awsConfig.APIOptions = aws_metrics.WithSDKMetricCollector(metricsCollector, awsConfig.APIOptions)
}

if awsClientsProvider == nil {
Expand Down
21 changes: 9 additions & 12 deletions pkg/aws/metrics/collector.go → pkg/metrics/aws/collector.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package metrics
package aws

import (
"context"
Expand All @@ -18,24 +18,21 @@ const (
sdkMiddlewareCollectAPIRequestMetric = "collectAPIRequestMetric"
)

type collector struct {
type Collector struct {
instruments *instruments
}

func NewCollector(registerer prometheus.Registerer) (*collector, error) {
instruments, err := newInstruments(registerer)
if err != nil {
return nil, err
}
return &collector{
func NewCollector(registerer prometheus.Registerer) *Collector {
instruments := newInstruments(registerer)
return &Collector{
instruments: instruments,
}, nil
}
}

/*
WithSDKMetricCollector is a function that collects prometheus metrics for the AWS SDK Go v2 API calls ad requests
*/
func WithSDKMetricCollector(c *collector, apiOptions []func(*smithymiddleware.Stack) error) []func(*smithymiddleware.Stack) error {
func WithSDKMetricCollector(c *Collector, apiOptions []func(*smithymiddleware.Stack) error) []func(*smithymiddleware.Stack) error {
apiOptions = append(apiOptions, func(stack *smithymiddleware.Stack) error {
return WithSDKCallMetricCollector(c)(stack)
}, func(stack *smithymiddleware.Stack) error {
Expand All @@ -48,7 +45,7 @@ func WithSDKMetricCollector(c *collector, apiOptions []func(*smithymiddleware.St
WithSDKCallMetricCollector is a middleware for the AWS SDK Go v2 that collects and reports metrics on API calls.
The call metrics are collected after the call is completed
*/
func WithSDKCallMetricCollector(c *collector) func(stack *smithymiddleware.Stack) error {
func WithSDKCallMetricCollector(c *Collector) func(stack *smithymiddleware.Stack) error {
return func(stack *smithymiddleware.Stack) error {
return stack.Initialize.Add(smithymiddleware.InitializeMiddlewareFunc(sdkMiddlewareCollectAPICallMetric, func(
ctx context.Context, input smithymiddleware.InitializeInput, next smithymiddleware.InitializeHandler,
Expand Down Expand Up @@ -91,7 +88,7 @@ func WithSDKCallMetricCollector(c *collector) func(stack *smithymiddleware.Stack
WithSDKRequestMetricCollector is a middleware for the AWS SDK Go v2 that collects and reports metrics on API requests.
The request metrics are collected after each retry attempts
*/
func WithSDKRequestMetricCollector(c *collector) func(stack *smithymiddleware.Stack) error {
func WithSDKRequestMetricCollector(c *Collector) func(stack *smithymiddleware.Stack) error {
return func(stack *smithymiddleware.Stack) error {
return stack.Finalize.Add(smithymiddleware.FinalizeMiddlewareFunc(sdkMiddlewareCollectAPIRequestMetric, func(
ctx context.Context, input smithymiddleware.FinalizeInput, next smithymiddleware.FinalizeHandler,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package metrics
package aws

import (
"errors"
Expand Down
34 changes: 10 additions & 24 deletions pkg/aws/metrics/instruments.go → pkg/metrics/aws/instruments.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package metrics
package aws

import (
"github.com/prometheus/client_golang/prometheus"
)

const (
metricSubsystemAWS = "aws"
metricSubSystem = "aws"

metricAPICallsTotal = "api_calls_total"
metricAPICallDurationSeconds = "api_call_duration_seconds"
Expand All @@ -31,55 +31,41 @@ type instruments struct {
}

// newInstruments allocates and register new metrics to registerer
func newInstruments(registerer prometheus.Registerer) (*instruments, error) {
func newInstruments(registerer prometheus.Registerer) *instruments {
apiCallsTotal := prometheus.NewCounterVec(prometheus.CounterOpts{
Subsystem: metricSubsystemAWS,
Subsystem: metricSubSystem,
Name: metricAPICallsTotal,
Help: "Total number of SDK API calls from the customer's code to AWS services",
}, []string{labelService, labelOperation, labelStatusCode, labelErrorCode})
apiCallDurationSeconds := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: metricSubsystemAWS,
Subsystem: metricSubSystem,
Name: metricAPICallDurationSeconds,
Help: "Perceived latency from when your code makes an SDK call, includes retries",
}, []string{labelService, labelOperation})
apiCallRetries := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: metricSubsystemAWS,
Subsystem: metricSubSystem,
Name: metricAPICallRetries,
Help: "Number of times the SDK retried requests to AWS services for SDK API calls",
Buckets: []float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
}, []string{labelService, labelOperation})

apiRequestsTotal := prometheus.NewCounterVec(prometheus.CounterOpts{
Subsystem: metricSubsystemAWS,
Subsystem: metricSubSystem,
Name: metricAPIRequestsTotal,
Help: "Total number of HTTP requests that the SDK made",
}, []string{labelService, labelOperation, labelStatusCode, labelErrorCode})
apiRequestDurationSecond := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: metricSubsystemAWS,
Subsystem: metricSubSystem,
Name: metricAPIRequestDurationSeconds,
Help: "Latency of an individual HTTP request to the service endpoint",
}, []string{labelService, labelOperation})

if err := registerer.Register(apiCallsTotal); err != nil {
return nil, err
}
if err := registerer.Register(apiCallDurationSeconds); err != nil {
return nil, err
}
if err := registerer.Register(apiCallRetries); err != nil {
return nil, err
}
if err := registerer.Register(apiRequestsTotal); err != nil {
return nil, err
}
if err := registerer.Register(apiRequestDurationSecond); err != nil {
return nil, err
}
registerer.MustRegister(apiCallsTotal, apiCallDurationSeconds, apiCallRetries, apiRequestsTotal, apiRequestDurationSecond)
return &instruments{
apiCallsTotal: apiCallsTotal,
apiCallDurationSeconds: apiCallDurationSeconds,
apiCallRetries: apiCallRetries,
apiRequestsTotal: apiRequestsTotal,
apiRequestDurationSecond: apiRequestDurationSecond,
}, nil
}
}
39 changes: 39 additions & 0 deletions pkg/metrics/lbc/collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package lbc

import (
"github.com/prometheus/client_golang/prometheus"
"time"
)

type MetricCollector interface {
// ObservePodReadinessGateReady this metric is useful to determine how fast pods are becoming ready in the load balancer.
// Due to some architectural constraints, we can only emit this metric for pods that are using readiness gates.
ObservePodReadinessGateReady(namespace string, tgbName string, duration time.Duration)
}

type collector struct {
instruments *instruments
}

type noOpCollector struct{}

func (n *noOpCollector) ObservePodReadinessGateReady(_ string, _ string, _ time.Duration) {
}

func NewCollector(registerer prometheus.Registerer) MetricCollector {
if registerer == nil {
return &noOpCollector{}
}

instruments := newInstruments(registerer)
return &collector{
instruments: instruments,
}
}

func (c *collector) ObservePodReadinessGateReady(namespace string, tgbName string, duration time.Duration) {
c.instruments.podReadinessFlipSeconds.With(prometheus.Labels{
labelNamespace: namespace,
labelName: tgbName,
}).Observe(duration.Seconds())
}
38 changes: 38 additions & 0 deletions pkg/metrics/lbc/instruments.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package lbc

import (
"github.com/prometheus/client_golang/prometheus"
)

const (
metricSubsystem = "awslbc"
)

// These metrics are exported to be used in unit test validation.
const (
// MetricPodReadinessGateReady tracks the time to flip a readiness gate to true
MetricPodReadinessGateReady = "readiness_gate_ready_seconds"
)

const (
labelNamespace = "namespace"
labelName = "name"
)

type instruments struct {
podReadinessFlipSeconds *prometheus.HistogramVec
}

// newInstruments allocates and register new metrics to registerer
func newInstruments(registerer prometheus.Registerer) *instruments {
podReadinessFlipSeconds := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: metricSubsystem,
Name: MetricPodReadinessGateReady,
Help: "Latency from pod getting added to the load balancer until the readiness gate is flipped to healthy.",
}, []string{labelNamespace, labelName})

registerer.MustRegister(podReadinessFlipSeconds)
return &instruments{
podReadinessFlipSeconds: podReadinessFlipSeconds,
}
}
37 changes: 37 additions & 0 deletions pkg/metrics/lbc/mockcollector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package lbc

import (
"time"
)

type MockCollector struct {
Invocations map[string][]interface{}
}

type MockHistogramMetric struct {
namespace string
name string
duration time.Duration
}

func (m *MockCollector) ObservePodReadinessGateReady(namespace string, tgbName string, d time.Duration) {
m.recordHistogram(MetricPodReadinessGateReady, namespace, tgbName, d)
}

func (m *MockCollector) recordHistogram(metricName string, namespace string, name string, d time.Duration) {
m.Invocations[metricName] = append(m.Invocations[MetricPodReadinessGateReady], MockHistogramMetric{
namespace: namespace,
name: name,
duration: d,
})
}

func NewMockCollector() MetricCollector {

mockInvocations := make(map[string][]interface{})
mockInvocations[MetricPodReadinessGateReady] = make([]interface{}, 0)

return &MockCollector{
Invocations: mockInvocations,
}
}
Loading

0 comments on commit 13538cb

Please sign in to comment.