From d55f4a007b38d7f749dc5e08d0f806379577da0b Mon Sep 17 00:00:00 2001 From: wangrushen Date: Tue, 29 Oct 2024 23:06:19 +0800 Subject: [PATCH] Refactor predictkube scaler config Signed-off-by: wangrushen --- pkg/scalers/predictkube_scaler.go | 163 ++++++++++-------------------- 1 file changed, 55 insertions(+), 108 deletions(-) diff --git a/pkg/scalers/predictkube_scaler.go b/pkg/scalers/predictkube_scaler.go index 78e2e5b446c..fe80fda1fca 100644 --- a/pkg/scalers/predictkube_scaler.go +++ b/pkg/scalers/predictkube_scaler.go @@ -83,18 +83,51 @@ type PredictKubeScaler struct { } type predictKubeMetadata struct { - predictHorizon time.Duration - historyTimeWindow time.Duration - stepDuration time.Duration - apiKey string - prometheusAddress string - prometheusAuth *authentication.AuthMeta - query string - threshold float64 - activationThreshold float64 - triggerIndex int + PrometheusAddress string `keda:"name=prometheusAddress, order=triggerMetadata"` + PrometheusAuth *authentication.Config `keda:"optional"` + Query string `keda:"name=query, order=triggerMetadata"` + PredictHorizon string `keda:"name=predictHorizon, order=triggerMetadata"` + QueryStep string `keda:"name=queryStep, order=triggerMetadata"` + HistoryTimeWindow string `keda:"name=historyTimeWindow, order=triggerMetadata"` + APIKey string `keda:"name=apiKey, order=authParams"` + Threshold float64 `keda:"name=threshold, order=triggerMetadata, optional"` + ActivationThreshold float64 `keda:"name=activationThreshold, order=triggerMetadata, optional"` + + predictHorizon time.Duration + historyTimeWindow time.Duration + stepDuration time.Duration + triggerIndex int } +func (p *predictKubeMetadata) Validate() error { + validate := validator.New() + err := validate.Var(p.PrometheusAddress, "url") + if err != nil { + return fmt.Errorf("invalid prometheusAddress") + } + + p.predictHorizon, err = str2duration.ParseDuration(p.PredictHorizon) + if err != nil { + return fmt.Errorf("predictHorizon parsing error %w", err) + } + + p.stepDuration, err = str2duration.ParseDuration(p.QueryStep) + if err != nil { + return fmt.Errorf("queryStep parsing error %w", err) + } + + p.historyTimeWindow, err = str2duration.ParseDuration(p.HistoryTimeWindow) + if err != nil { + return fmt.Errorf("historyTimeWindow parsing error %w", err) + } + + err = validate.Var(p.APIKey, "jwt") + if err != nil { + return fmt.Errorf("invalid apiKey") + } + + return nil +} func (s *PredictKubeScaler) setupClientConn() error { clientOpt, err := pc.SetGrpcClientOptions(grpcConf, &libs.Base{ @@ -108,7 +141,7 @@ func (s *PredictKubeScaler) setupClientConn() error { Enabled: false, }, }, - pc.InjectPublicClientMetadataInterceptor(s.metadata.apiKey), + pc.InjectPublicClientMetadataInterceptor(s.metadata.APIKey), ) if !grpcConf.Conn.Insecure { @@ -186,7 +219,7 @@ func (s *PredictKubeScaler) GetMetricSpecForScaling(context.Context) []v2.Metric Metric: v2.MetricIdentifier{ Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName), }, - Target: GetMetricTargetMili(s.metricType, s.metadata.threshold), + Target: GetMetricTargetMili(s.metricType, s.metadata.Threshold), } metricSpec := v2.MetricSpec{ @@ -211,7 +244,7 @@ func (s *PredictKubeScaler) GetMetricsAndActivity(ctx context.Context, metricNam metric := GenerateMetricInMili(metricName, value) - return []external_metrics.ExternalMetricValue{metric}, activationValue > s.metadata.activationThreshold, nil + return []external_metrics.ExternalMetricValue{metric}, activationValue > s.metadata.ActivationThreshold, nil } func (s *PredictKubeScaler) doPredictRequest(ctx context.Context) (float64, float64, error) { @@ -257,7 +290,7 @@ func (s *PredictKubeScaler) doQuery(ctx context.Context) ([]*commonproto.Item, e Step: s.metadata.stepDuration, } - val, warns, err := s.api.QueryRange(ctx, s.metadata.query, r) + val, warns, err := s.api.QueryRange(ctx, s.metadata.Query, r) if len(warns) > 0 { s.logger.V(1).Info("warnings", warns) @@ -345,103 +378,17 @@ func (s *PredictKubeScaler) parsePrometheusResult(result model.Value) (out []*co } func parsePredictKubeMetadata(config *scalersconfig.ScalerConfig) (result *predictKubeMetadata, err error) { - validate := validator.New() - meta := predictKubeMetadata{} - - if val, ok := config.TriggerMetadata["query"]; ok { - if len(val) == 0 { - return nil, fmt.Errorf("no query given") - } - - meta.query = val - } else { - return nil, fmt.Errorf("no query given") - } - - if val, ok := config.TriggerMetadata["prometheusAddress"]; ok { - err = validate.Var(val, "url") - if err != nil { - return nil, fmt.Errorf("invalid prometheusAddress") - } - - meta.prometheusAddress = val - } else { - return nil, fmt.Errorf("no prometheusAddress given") - } - - if val, ok := config.TriggerMetadata["predictHorizon"]; ok { - predictHorizon, err := str2duration.ParseDuration(val) - if err != nil { - return nil, fmt.Errorf("predictHorizon parsing error %w", err) - } - meta.predictHorizon = predictHorizon - } else { - return nil, fmt.Errorf("no predictHorizon given") - } - - if val, ok := config.TriggerMetadata["queryStep"]; ok { - stepDuration, err := str2duration.ParseDuration(val) - if err != nil { - return nil, fmt.Errorf("queryStep parsing error %w", err) - } - meta.stepDuration = stepDuration - } else { - return nil, fmt.Errorf("no queryStep given") - } - - if val, ok := config.TriggerMetadata["historyTimeWindow"]; ok { - historyTimeWindow, err := str2duration.ParseDuration(val) - if err != nil { - return nil, fmt.Errorf("historyTimeWindow parsing error %w", err) - } - meta.historyTimeWindow = historyTimeWindow - } else { - return nil, fmt.Errorf("no historyTimeWindow given") - } - - if val, ok := config.TriggerMetadata["threshold"]; ok { - threshold, err := strconv.ParseFloat(val, 64) - if err != nil { - return nil, fmt.Errorf("threshold parsing error %w", err) - } - meta.threshold = threshold - } else { - if config.AsMetricSource { - meta.threshold = 0 - } else { - return nil, fmt.Errorf("no threshold given") - } + meta := &predictKubeMetadata{} + if err := config.TypedConfig(meta); err != nil { + return nil, fmt.Errorf("error parsing arango metadata: %w", err) } - meta.activationThreshold = 0 - if val, ok := config.TriggerMetadata["activationThreshold"]; ok { - activationThreshold, err := strconv.ParseFloat(val, 64) - if err != nil { - return nil, fmt.Errorf("activationThreshold parsing error %w", err) - } - meta.activationThreshold = activationThreshold + if !config.AsMetricSource && meta.Threshold == 0 { + return nil, fmt.Errorf("no threshold given") } meta.triggerIndex = config.TriggerIndex - - if val, ok := config.AuthParams["apiKey"]; ok { - err = validate.Var(val, "jwt") - if err != nil { - return nil, fmt.Errorf("invalid apiKey") - } - - meta.apiKey = val - } else { - return nil, fmt.Errorf("no api key given") - } - - // parse auth configs from ScalerConfig - auth, err := authentication.GetAuthConfigs(config.TriggerMetadata, config.AuthParams) - if err != nil { - return nil, err - } - meta.prometheusAuth = auth - return &meta, nil + return meta, nil } func (s *PredictKubeScaler) ping(ctx context.Context) (err error) { @@ -454,14 +401,14 @@ func (s *PredictKubeScaler) initPredictKubePrometheusConn(ctx context.Context) ( // create http.RoundTripper with auth settings from ScalerConfig roundTripper, err := authentication.CreateHTTPRoundTripper( authentication.FastHTTP, - s.metadata.prometheusAuth, + s.metadata.PrometheusAuth.ToAuthMeta(), ) if err != nil { s.logger.V(1).Error(err, "init Prometheus client http transport") return err } client, err := api.NewClient(api.Config{ - Address: s.metadata.prometheusAddress, + Address: s.metadata.PrometheusAddress, RoundTripper: roundTripper, }) if err != nil {