From f6223aa60e4759437294a6a9271f20f79eb14783 Mon Sep 17 00:00:00 2001 From: rickbrouwer Date: Thu, 24 Oct 2024 14:40:04 +0200 Subject: [PATCH] Refactor azure queue scaler Signed-off-by: rickbrouwer --- pkg/scalers/aws_sqs_queue_scaler.go | 1 + pkg/scalers/azure_queue_scaler.go | 146 ++++++++++--------------- pkg/scalers/azure_queue_scaler_test.go | 113 ++++++++++++++----- 3 files changed, 142 insertions(+), 118 deletions(-) diff --git a/pkg/scalers/aws_sqs_queue_scaler.go b/pkg/scalers/aws_sqs_queue_scaler.go index 1c5976685fb..a99851a87ea 100644 --- a/pkg/scalers/aws_sqs_queue_scaler.go +++ b/pkg/scalers/aws_sqs_queue_scaler.go @@ -20,6 +20,7 @@ import ( ) const ( + defaultTargetQueueLength = 5 targetQueueLengthDefault = 5 activationTargetQueueLengthDefault = 0 defaultScaleOnInFlight = true diff --git a/pkg/scalers/azure_queue_scaler.go b/pkg/scalers/azure_queue_scaler.go index 6f642ec04bf..8e22c9a191e 100644 --- a/pkg/scalers/azure_queue_scaler.go +++ b/pkg/scalers/azure_queue_scaler.go @@ -19,7 +19,6 @@ package scalers import ( "context" "fmt" - "strconv" "strings" "github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue" @@ -34,37 +33,47 @@ import ( ) const ( - queueLengthMetricName = "queueLength" - activationQueueLengthMetricName = "activationQueueLength" - defaultTargetQueueLength = 5 - externalMetricType = "External" - QueueLengthStrategyAll string = "all" - QueueLengthStrategyVisibleOnly string = "visibleonly" + externalMetricType = "External" + queueLengthStrategyAll = "all" + queueLengthStrategyVisibleOnly = "visibleonly" ) -var ( - maxPeekMessages int32 = 32 -) +var maxPeekMessages int32 = 32 type azureQueueScaler struct { metricType v2.MetricTargetType - metadata *azureQueueMetadata + metadata azureQueueMetadata queueClient *azqueue.QueueClient logger logr.Logger } type azureQueueMetadata struct { - targetQueueLength int64 - activationTargetQueueLength int64 - queueName string - connection string - accountName string - endpointSuffix string - queueLengthStrategy string - triggerIndex int + ActivationQueueLength int64 `keda:"name=activationQueueLength,order=triggerMetadata,default=0"` + QueueName string `keda:"name=queueName,order=triggerMetadata"` + QueueLength int64 `keda:"name=queueLength,order=triggerMetadata,default=5"` + Connection string `keda:"name=connection,order=authParams;triggerMetadata;resolvedEnv,optional"` + AccountName string `keda:"name=accountName,order=triggerMetadata,optional"` + EndpointSuffix string `keda:"name=endpointSuffix,order=triggerMetadata,optional"` + QueueLengthStrategy string `keda:"name=queueLengthStrategy,default=all"` + TriggerIndex int +} + +func (m *azureQueueMetadata) Validate() error { + if m.QueueName == "" { + return fmt.Errorf("no queueName given") + } + + if m.QueueLengthStrategy != "" { + strategy := strings.ToLower(m.QueueLengthStrategy) + if strategy != queueLengthStrategyAll && strategy != queueLengthStrategyVisibleOnly { + return fmt.Errorf("invalid queueLengthStrategy %s given", m.QueueLengthStrategy) + } + m.QueueLengthStrategy = strategy + } + + return nil } -// NewAzureQueueScaler creates a new scaler for queue func NewAzureQueueScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { metricType, err := GetMetricTargetType(config) if err != nil { @@ -73,14 +82,14 @@ func NewAzureQueueScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { logger := InitializeLogger(config, "azure_queue_scaler") - meta, podIdentity, err := parseAzureQueueMetadata(config, logger) + meta, podIdentity, err := parseAzureQueueMetadata(config) if err != nil { return nil, fmt.Errorf("error parsing azure queue metadata: %w", err) } - queueClient, err := azure.GetStorageQueueClient(logger, podIdentity, meta.connection, meta.accountName, meta.endpointSuffix, meta.queueName, config.GlobalHTTPTimeout) + queueClient, err := azure.GetStorageQueueClient(logger, podIdentity, meta.Connection, meta.AccountName, meta.EndpointSuffix, meta.QueueName, config.GlobalHTTPTimeout) if err != nil { - return nil, fmt.Errorf("error creating azure blob client: %w", err) + return nil, fmt.Errorf("error creating azure queue client: %w", err) } return &azureQueueScaler{ @@ -91,105 +100,63 @@ func NewAzureQueueScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { }, nil } -func parseAzureQueueMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (*azureQueueMetadata, kedav1alpha1.AuthPodIdentity, error) { +func parseAzureQueueMetadata(config *scalersconfig.ScalerConfig) (azureQueueMetadata, kedav1alpha1.AuthPodIdentity, error) { meta := azureQueueMetadata{} - meta.targetQueueLength = defaultTargetQueueLength - - if val, ok := config.TriggerMetadata[queueLengthMetricName]; ok { - queueLength, err := strconv.ParseInt(val, 10, 64) - if err != nil { - logger.Error(err, "Error parsing azure queue metadata", "queueLengthMetricName", queueLengthMetricName) - return nil, kedav1alpha1.AuthPodIdentity{}, - fmt.Errorf("error parsing azure queue metadata %s: %w", queueLengthMetricName, err) - } - - meta.targetQueueLength = queueLength + err := config.TypedConfig(&meta) + if err != nil { + return meta, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("error parsing azure queue metadata: %w", err) } - meta.activationTargetQueueLength = 0 - if val, ok := config.TriggerMetadata[activationQueueLengthMetricName]; ok { - activationQueueLength, err := strconv.ParseInt(val, 10, 64) - if err != nil { - logger.Error(err, "Error parsing azure queue metadata", activationQueueLengthMetricName, activationQueueLengthMetricName) - return nil, kedav1alpha1.AuthPodIdentity{}, - fmt.Errorf("error parsing azure queue metadata %s: %w", activationQueueLengthMetricName, err) - } - - meta.activationTargetQueueLength = activationQueueLength + err = meta.Validate() + if err != nil { + return meta, kedav1alpha1.AuthPodIdentity{}, err } endpointSuffix, err := azure.ParseAzureStorageEndpointSuffix(config.TriggerMetadata, azure.QueueEndpoint) if err != nil { - return nil, kedav1alpha1.AuthPodIdentity{}, err - } - - meta.endpointSuffix = endpointSuffix - - if val, ok := config.TriggerMetadata["queueName"]; ok && val != "" { - meta.queueName = val - } else { - return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no queueName given") - } - - if val, ok := config.TriggerMetadata["queueLengthStrategy"]; ok && val != "" { - strategy := strings.ToLower(val) - if strategy == QueueLengthStrategyAll || strategy == QueueLengthStrategyVisibleOnly { - meta.queueLengthStrategy = strategy - } else { - return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("invalid queueLengthStrategy %s given", val) - } - } else { - meta.queueLengthStrategy = QueueLengthStrategyAll + return meta, kedav1alpha1.AuthPodIdentity{}, err } + meta.EndpointSuffix = endpointSuffix // If the Use AAD Pod Identity is not present, or set to "none" // then check for connection string switch config.PodIdentity.Provider { case "", kedav1alpha1.PodIdentityProviderNone: // Azure Queue Scaler expects a "connection" parameter in the metadata - // of the scaler or in a TriggerAuthentication object - if config.AuthParams["connection"] != "" { - // Found the connection in a parameter from TriggerAuthentication - meta.connection = config.AuthParams["connection"] - } else if config.TriggerMetadata["connectionFromEnv"] != "" { - meta.connection = config.ResolvedEnv[config.TriggerMetadata["connectionFromEnv"]] - } - - if len(meta.connection) == 0 { - return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no connection setting given") + // of the scaler or in a TriggerAuthentication object + if meta.Connection == "" { + return meta, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no connection setting given") } case kedav1alpha1.PodIdentityProviderAzureWorkload: - // If the Use AAD Pod Identity is present then check account name - if val, ok := config.TriggerMetadata["accountName"]; ok && val != "" { - meta.accountName = val - } else { - return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no accountName given") + // If the Use AAD Pod Identity is present then check account name + if meta.AccountName == "" { + return meta, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no accountName given") } default: - return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("pod identity %s not supported for azure storage queues", config.PodIdentity.Provider) + return meta, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("pod identity %s not supported for azure storage queues", config.PodIdentity.Provider) } - meta.triggerIndex = config.TriggerIndex - - return &meta, config.PodIdentity, nil + meta.TriggerIndex = config.TriggerIndex + return meta, config.PodIdentity, nil } func (s *azureQueueScaler) Close(context.Context) error { return nil } +// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric func (s *azureQueueScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { + metricName := kedautil.NormalizeString(fmt.Sprintf("azure-queue-%s", s.metadata.QueueName)) externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ - Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("azure-queue-%s", s.metadata.queueName))), + Name: GenerateMetricNameWithIndex(s.metadata.TriggerIndex, metricName), }, - Target: GetMetricTarget(s.metricType, s.metadata.targetQueueLength), + Target: GetMetricTarget(s.metricType, s.metadata.QueueLength), } metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType} return []v2.MetricSpec{metricSpec} } -// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric func (s *azureQueueScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { queuelen, err := s.getMessageCount(ctx) if err != nil { @@ -198,12 +165,11 @@ func (s *azureQueueScaler) GetMetricsAndActivity(ctx context.Context, metricName } metric := GenerateMetricInMili(metricName, float64(queuelen)) - return []external_metrics.ExternalMetricValue{metric}, queuelen > s.metadata.activationTargetQueueLength, nil + return []external_metrics.ExternalMetricValue{metric}, queuelen > s.metadata.ActivationQueueLength, nil } func (s *azureQueueScaler) getMessageCount(ctx context.Context) (int64, error) { - strategy := strings.ToLower(s.metadata.queueLengthStrategy) - if strategy == QueueLengthStrategyVisibleOnly { + if strings.ToLower(s.metadata.QueueLengthStrategy) == queueLengthStrategyVisibleOnly { queue, err := s.queueClient.PeekMessages(ctx, &azqueue.PeekMessagesOptions{NumberOfMessages: &maxPeekMessages}) if err != nil { return 0, err diff --git a/pkg/scalers/azure_queue_scaler_test.go b/pkg/scalers/azure_queue_scaler_test.go index a36da33123c..9eece331ef1 100644 --- a/pkg/scalers/azure_queue_scaler_test.go +++ b/pkg/scalers/azure_queue_scaler_test.go @@ -18,9 +18,12 @@ package scalers import ( "context" + "fmt" "testing" "github.com/go-logr/logr" + "github.com/stretchr/testify/assert" + v2 "k8s.io/api/autoscaling/v2" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" @@ -81,40 +84,94 @@ var azQueueMetricIdentifiers = []azQueueMetricIdentifier{ } func TestAzQueueParseMetadata(t *testing.T) { - for _, testData := range testAzQueueMetadata { - _, podIdentity, err := parseAzureQueueMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, - ResolvedEnv: testData.resolvedEnv, AuthParams: testData.authParams, - PodIdentity: kedav1alpha1.AuthPodIdentity{Provider: testData.podIdentity}}, - logr.Discard()) - if err != nil && !testData.isError { - t.Error("Expected success but got error", err) - } - if testData.isError && err == nil { - t.Errorf("Expected error but got success. testData: %v", testData) - } - if testData.podIdentity != "" && testData.podIdentity != podIdentity.Provider && err == nil { - t.Error("Expected success but got error: podIdentity value is not returned as expected") + for i, testData := range testAzQueueMetadata { + testName := fmt.Sprintf("test case %d", i) + switch i { + case 0: + testName = "nothing passed" + case 1: + testName = "properly formed" + case 2: + testName = "empty queueName" + case 3: + testName = "improperly formed queueLength" + case 4: + testName = "improperly formed activationQueueLength" + case 5: + testName = "podIdentity azure-workload with account name" + case 6: + testName = "podIdentity azure-workload without account name" + case 7: + testName = "podIdentity azure-workload without queue name" + case 8: + testName = "podIdentity azure-workload with cloud" + case 9: + testName = "podIdentity azure-workload with invalid cloud" + case 10: + testName = "podIdentity azure-workload with private cloud and endpoint suffix" + case 11: + testName = "podIdentity azure-workload with private cloud and no endpoint suffix" + case 12: + testName = "podIdentity azure-workload with endpoint suffix and no cloud" + case 13: + testName = "connection from authParams" } + + t.Run(testName, func(t *testing.T) { + config := &scalersconfig.ScalerConfig{ + TriggerMetadata: testData.metadata, + ResolvedEnv: testData.resolvedEnv, + AuthParams: testData.authParams, + PodIdentity: kedav1alpha1.AuthPodIdentity{Provider: testData.podIdentity}, + } + + _, podIdentity, err := parseAzureQueueMetadata(config) + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + } + if testData.isError && err == nil { + t.Errorf("Expected error but got success. testData: %v", testData) + } + if testData.podIdentity != "" && testData.podIdentity != podIdentity.Provider && err == nil { + t.Error("Expected success but got error: podIdentity value is not returned as expected") + } + }) } } func TestAzQueueGetMetricSpecForScaling(t *testing.T) { - for _, testData := range azQueueMetricIdentifiers { - meta, _, err := parseAzureQueueMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, - ResolvedEnv: testData.metadataTestData.resolvedEnv, AuthParams: testData.metadataTestData.authParams, - PodIdentity: kedav1alpha1.AuthPodIdentity{Provider: testData.metadataTestData.podIdentity}, TriggerIndex: testData.triggerIndex}, - logr.Discard()) - if err != nil { - t.Fatal("Could not parse metadata:", err) - } - mockAzQueueScaler := azureQueueScaler{ - metadata: meta, + for i, testData := range azQueueMetricIdentifiers { + testName := fmt.Sprintf("test case %d", i) + switch i { + case 0: + testName = "properly formed queue metric" + case 1: + testName = "azure-workload queue metric" } - metricSpec := mockAzQueueScaler.GetMetricSpecForScaling(context.Background()) - metricName := metricSpec[0].External.Metric.Name - if metricName != testData.name { - t.Error("Wrong External metric source name:", metricName) - } + t.Run(testName, func(t *testing.T) { + config := &scalersconfig.ScalerConfig{ + TriggerMetadata: testData.metadataTestData.metadata, + ResolvedEnv: testData.metadataTestData.resolvedEnv, + AuthParams: testData.metadataTestData.authParams, + PodIdentity: kedav1alpha1.AuthPodIdentity{Provider: testData.metadataTestData.podIdentity}, + TriggerIndex: testData.triggerIndex, + } + + meta, _, err := parseAzureQueueMetadata(config) + if err != nil { + t.Fatal("Could not parse metadata:", err) + } + + mockAzQueueScaler := azureQueueScaler{ + metadata: meta, + logger: logr.Discard(), + metricType: v2.AverageValueMetricType, + } + + metricSpec := mockAzQueueScaler.GetMetricSpecForScaling(context.Background()) + metricName := metricSpec[0].External.Metric.Name + assert.Equal(t, testData.name, metricName) + }) } }