Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor predictkube scaler config #6282

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 55 additions & 108 deletions pkg/scalers/predictkube_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,51 @@ type PredictKubeScaler struct {
}

type predictKubeMetadata struct {
predictHorizon time.Duration
historyTimeWindow time.Duration
stepDuration time.Duration
apiKey string
prometheusAddress string
prometheusAuth *authentication.AuthMeta
query string
threshold float64
activationThreshold float64
triggerIndex int
PrometheusAddress string `keda:"name=prometheusAddress, order=triggerMetadata"`
PrometheusAuth *authentication.Config `keda:"optional"`
Query string `keda:"name=query, order=triggerMetadata"`
PredictHorizon string `keda:"name=predictHorizon, order=triggerMetadata"`
QueryStep string `keda:"name=queryStep, order=triggerMetadata"`
HistoryTimeWindow string `keda:"name=historyTimeWindow, order=triggerMetadata"`
APIKey string `keda:"name=apiKey, order=authParams"`
Threshold float64 `keda:"name=threshold, order=triggerMetadata, optional"`
ActivationThreshold float64 `keda:"name=activationThreshold, order=triggerMetadata, optional"`

predictHorizon time.Duration
historyTimeWindow time.Duration
stepDuration time.Duration
triggerIndex int
}

func (p *predictKubeMetadata) Validate() error {
validate := validator.New()
err := validate.Var(p.PrometheusAddress, "url")
if err != nil {
return fmt.Errorf("invalid prometheusAddress")
}

p.predictHorizon, err = str2duration.ParseDuration(p.PredictHorizon)
if err != nil {
return fmt.Errorf("predictHorizon parsing error %w", err)
}

p.stepDuration, err = str2duration.ParseDuration(p.QueryStep)
if err != nil {
return fmt.Errorf("queryStep parsing error %w", err)
}

p.historyTimeWindow, err = str2duration.ParseDuration(p.HistoryTimeWindow)
if err != nil {
return fmt.Errorf("historyTimeWindow parsing error %w", err)
}

err = validate.Var(p.APIKey, "jwt")
if err != nil {
return fmt.Errorf("invalid apiKey")
}

return nil
}
func (s *PredictKubeScaler) setupClientConn() error {
clientOpt, err := pc.SetGrpcClientOptions(grpcConf,
&libs.Base{
Expand All @@ -108,7 +141,7 @@ func (s *PredictKubeScaler) setupClientConn() error {
Enabled: false,
},
},
pc.InjectPublicClientMetadataInterceptor(s.metadata.apiKey),
pc.InjectPublicClientMetadataInterceptor(s.metadata.APIKey),
)

if !grpcConf.Conn.Insecure {
Expand Down Expand Up @@ -186,7 +219,7 @@ func (s *PredictKubeScaler) GetMetricSpecForScaling(context.Context) []v2.Metric
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName),
},
Target: GetMetricTargetMili(s.metricType, s.metadata.threshold),
Target: GetMetricTargetMili(s.metricType, s.metadata.Threshold),
}

metricSpec := v2.MetricSpec{
Expand All @@ -211,7 +244,7 @@ func (s *PredictKubeScaler) GetMetricsAndActivity(ctx context.Context, metricNam

metric := GenerateMetricInMili(metricName, value)

return []external_metrics.ExternalMetricValue{metric}, activationValue > s.metadata.activationThreshold, nil
return []external_metrics.ExternalMetricValue{metric}, activationValue > s.metadata.ActivationThreshold, nil
}

func (s *PredictKubeScaler) doPredictRequest(ctx context.Context) (float64, float64, error) {
Expand Down Expand Up @@ -257,7 +290,7 @@ func (s *PredictKubeScaler) doQuery(ctx context.Context) ([]*commonproto.Item, e
Step: s.metadata.stepDuration,
}

val, warns, err := s.api.QueryRange(ctx, s.metadata.query, r)
val, warns, err := s.api.QueryRange(ctx, s.metadata.Query, r)

if len(warns) > 0 {
s.logger.V(1).Info("warnings", warns)
Expand Down Expand Up @@ -345,103 +378,17 @@ func (s *PredictKubeScaler) parsePrometheusResult(result model.Value) (out []*co
}

func parsePredictKubeMetadata(config *scalersconfig.ScalerConfig) (result *predictKubeMetadata, err error) {
validate := validator.New()
meta := predictKubeMetadata{}

if val, ok := config.TriggerMetadata["query"]; ok {
if len(val) == 0 {
return nil, fmt.Errorf("no query given")
}

meta.query = val
} else {
return nil, fmt.Errorf("no query given")
}

if val, ok := config.TriggerMetadata["prometheusAddress"]; ok {
err = validate.Var(val, "url")
if err != nil {
return nil, fmt.Errorf("invalid prometheusAddress")
}

meta.prometheusAddress = val
} else {
return nil, fmt.Errorf("no prometheusAddress given")
}

if val, ok := config.TriggerMetadata["predictHorizon"]; ok {
predictHorizon, err := str2duration.ParseDuration(val)
if err != nil {
return nil, fmt.Errorf("predictHorizon parsing error %w", err)
}
meta.predictHorizon = predictHorizon
} else {
return nil, fmt.Errorf("no predictHorizon given")
}

if val, ok := config.TriggerMetadata["queryStep"]; ok {
stepDuration, err := str2duration.ParseDuration(val)
if err != nil {
return nil, fmt.Errorf("queryStep parsing error %w", err)
}
meta.stepDuration = stepDuration
} else {
return nil, fmt.Errorf("no queryStep given")
}

if val, ok := config.TriggerMetadata["historyTimeWindow"]; ok {
historyTimeWindow, err := str2duration.ParseDuration(val)
if err != nil {
return nil, fmt.Errorf("historyTimeWindow parsing error %w", err)
}
meta.historyTimeWindow = historyTimeWindow
} else {
return nil, fmt.Errorf("no historyTimeWindow given")
}

if val, ok := config.TriggerMetadata["threshold"]; ok {
threshold, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("threshold parsing error %w", err)
}
meta.threshold = threshold
} else {
if config.AsMetricSource {
meta.threshold = 0
} else {
return nil, fmt.Errorf("no threshold given")
}
meta := &predictKubeMetadata{}
if err := config.TypedConfig(meta); err != nil {
return nil, fmt.Errorf("error parsing arango metadata: %w", err)
}

meta.activationThreshold = 0
if val, ok := config.TriggerMetadata["activationThreshold"]; ok {
activationThreshold, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("activationThreshold parsing error %w", err)
}
meta.activationThreshold = activationThreshold
if !config.AsMetricSource && meta.Threshold == 0 {
return nil, fmt.Errorf("no threshold given")
}

meta.triggerIndex = config.TriggerIndex

if val, ok := config.AuthParams["apiKey"]; ok {
err = validate.Var(val, "jwt")
if err != nil {
return nil, fmt.Errorf("invalid apiKey")
}

meta.apiKey = val
} else {
return nil, fmt.Errorf("no api key given")
}

// parse auth configs from ScalerConfig
auth, err := authentication.GetAuthConfigs(config.TriggerMetadata, config.AuthParams)
if err != nil {
return nil, err
}
meta.prometheusAuth = auth
return &meta, nil
return meta, nil
}

func (s *PredictKubeScaler) ping(ctx context.Context) (err error) {
Expand All @@ -454,14 +401,14 @@ func (s *PredictKubeScaler) initPredictKubePrometheusConn(ctx context.Context) (
// create http.RoundTripper with auth settings from ScalerConfig
roundTripper, err := authentication.CreateHTTPRoundTripper(
authentication.FastHTTP,
s.metadata.prometheusAuth,
s.metadata.PrometheusAuth.ToAuthMeta(),
)
if err != nil {
s.logger.V(1).Error(err, "init Prometheus client http transport")
return err
}
client, err := api.NewClient(api.Config{
Address: s.metadata.prometheusAddress,
Address: s.metadata.PrometheusAddress,
RoundTripper: roundTripper,
})
if err != nil {
Expand Down
Loading