Skip to content

Commit

Permalink
option to query metric only on polling interval
Browse files Browse the repository at this point in the history
Signed-off-by: Zbynek Roubalik <[email protected]>
  • Loading branch information
zroubalik committed Dec 2, 2022
1 parent a489ca7 commit 829a541
Show file tree
Hide file tree
Showing 85 changed files with 679 additions and 1,135 deletions.
42 changes: 29 additions & 13 deletions CREATE-NEW-SCALER.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,42 @@ If you want to deploy locally

The scalers in KEDA are implementations of a KEDA `Scaler` Go interface declared in `pkg/scalers/scaler.go`. This documentation describes how scalers work and is targeted towards contributors and maintainers.

### GetMetrics
### GetMetricsAndActivity

This is the key function of a scaler; it returns a value that represents a current state of an external metric (e.g. length of a queue). The return type is an `ExternalMetricValue` struct which has the following fields:
- `MetricName`: this is the name of the metric that we are returning. The name should be unique, to allow setting multiple (even the same type) Triggers in one ScaledObject, but each function call should return the same name.
- `Timestamp`: indicates the time at which the metrics were produced.
- `WindowSeconds`: //TODO
- `Value`: A numerical value that represents the state of the metric. It could be the length of a queue, or it can be the amount of lag in a stream, but it can also be a simple representation of the state.
This is the key function of a scaler; it returns:

Kubernetes HPA (Horizontal Pod Autoscaler) will poll `GetMetrics` regularly through KEDA's metric server (as long as there is at least one pod), and compare the returned value to a configured value in the ScaledObject configuration. Kubernetes will use the following formula to decide whether to scale the pods up and down:
1. A value that represents a current state of an external metric (e.g. length of a queue). The return type is an `ExternalMetricValue` struct which has the following fields:
- `MetricName`: this is the name of the metric that we are returning. The name should be unique, to allow setting multiple (even the same type) Triggers in one ScaledObject, but each function call should return the same name.
- `Timestamp`: indicates the time at which the metrics were produced.
- `WindowSeconds`: //TODO
- `Value`: A numerical value that represents the state of the metric. It could be the length of a queue, or it can be the amount of lag in a stream, but it can also be a simple representation of the state.
2. A value that represents an activity of the scaler. The return type is `bool`.

KEDA polls ScaledObject object according to the `pollingInterval` configured in the ScaledObject; it checks the last time it was polled, it checks if the number of replicas is greater than 0, and if the scaler itself is active. So if the scaler returns false for `IsActive`, and if current number of replicas is greater than 0, and there is no configured minimum pods, then KEDA scales down to 0.

Kubernetes HPA (Horizontal Pod Autoscaler) will poll `GetMetricsAndActivity` regularly through KEDA's metric server (as long as there is at least one pod), and compare the returned value to a configured value in the ScaledObject configuration. Kubernetes will use the following formula to decide whether to scale the pods up and down:

`desiredReplicas = ceil[currentReplicas * ( currentMetricValue / desiredMetricValue )]`.

For more details check [Kubernetes HPA documentation](https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/).

<br>Next lines are an example about how to use it:
>```golang
func (s *artemisScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
messages, err := s.getQueueMessageCount(ctx)
if err != nil {
s.logger.Error(err, "Unable to access the artemis management endpoint", "managementEndpoint", s.metadata.managementEndpoint)
return []external_metrics.ExternalMetricValue{}, false, err
}
metric := GenerateMetricInMili(metricName, float64(messages))
return []external_metrics.ExternalMetricValue{metric}, messages > s.metadata.activationQueueLength, nil
}
>```
### GetMetricSpecForScaling
KEDA works in conjunction with Kubernetes Horizontal Pod Autoscaler (HPA). When KEDA notices a new ScaledObject, it creates an HPA object that has basic information about the metric it needs to poll and scale the pods accordingly. To create this HPA object, KEDA invokes `GetMetricSpecForScaling`.
Expand Down Expand Up @@ -64,12 +86,6 @@ For example:
>```
### IsActive
For some reason, the scaler might need to declare itself as in-active, and the way it can do this is through implementing the function `IsActive`.
KEDA polls ScaledObject object according to the `pollingInterval` configured in the ScaledObject; it checks the last time it was polled, it checks if the number of replicas is greater than 0, and if the scaler itself is active. So if the scaler returns false for `IsActive`, and if current number of replicas is greater than 0, and there is no configured minimum pods, then KEDA scales down to 0.
### Close
After each poll on the scaler to retrieve the metrics, KEDA calls this function for each scaler to give the scaler the opportunity to close any resources, like http clients for example.
Expand Down
10 changes: 9 additions & 1 deletion apis/keda/v1alpha1/scaledobject_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,10 @@ type ScaleTarget struct {
type ScaleTriggers struct {
Type string `json:"type"`
// +optional
Name string `json:"name,omitempty"`
Name string `json:"name,omitempty"`

QueryMetricsOnPollingInterval bool `json:"queryMetricsOnPollingInterval,omitempty"`

Metadata map[string]string `json:"metadata"`
// +optional
AuthenticationRef *ScaledObjectAuthRef `json:"authenticationRef,omitempty"`
Expand Down Expand Up @@ -179,3 +182,8 @@ type ScaledObjectAuthRef struct {
func init() {
SchemeBuilder.Register(&ScaledObject{}, &ScaledObjectList{})
}

// GenerateIdentifier returns identifier for the object in for "kind.namespace.name"
func (s *ScaledObject) GenerateIdentifier() string {
return GenerateIdentifier(s.Kind, s.Namespace, s.Name)
}
5 changes: 5 additions & 0 deletions config/crd/bases/keda.sh_scaledjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7861,6 +7861,11 @@ spec:
required:
- name
type: object
cacheDuration:
format: int32
type: integer
enableCache:
type: boolean
metadata:
additionalProperties:
type: string
Expand Down
5 changes: 5 additions & 0 deletions config/crd/bases/keda.sh_scaledobjects.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,11 @@ spec:
required:
- name
type: object
cacheDuration:
format: int32
type: integer
enableCache:
type: boolean
metadata:
additionalProperties:
type: string
Expand Down
2 changes: 1 addition & 1 deletion config/manager/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
images:
- name: ghcr.io/kedacore/keda
newName: ghcr.io/kedacore/keda
newName: quay.io/zroubalik/keda
newTag: main
2 changes: 1 addition & 1 deletion config/metrics-server/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
images:
- name: ghcr.io/kedacore/keda-metrics-apiserver
newName: ghcr.io/kedacore/keda-metrics-apiserver
newName: quay.io/zroubalik/keda-metrics-apiserver
newTag: main
1 change: 0 additions & 1 deletion controllers/keda/hpa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ func setupTest(health map[string]v1alpha1.HealthStatus, scaler *mock_scalers.Moc
return scaler, &scalers.ScalerConfig{}, nil
},
}},
Logger: logr.Discard(),
Recorder: nil,
}
metricSpec := v2.MetricSpec{
Expand Down
4 changes: 2 additions & 2 deletions controllers/keda/scaledobject_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ var _ = Describe("ScaledObjectController", func() {
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: "keda-hpa-clean-up-test", Namespace: "default"}, hpa)
Expect(err).ToNot(HaveOccurred())
return len(hpa.Spec.Metrics)
}).Should(Equal(1))
}, 5*time.Second).Should(Equal(1))
// And it should only be the first one left.
Expect(hpa.Spec.Metrics[0].External.Metric.Name).To(Equal("s0-cron-UTC-0xxxx-1xxxx"))
})
Expand Down Expand Up @@ -782,7 +782,7 @@ var _ = Describe("ScaledObjectController", func() {
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
Expect(err).ToNot(HaveOccurred())
return so.Status.Conditions.GetReadyCondition().Status
}, 5*time.Second).Should(Or(Equal(metav1.ConditionFalse), Equal(metav1.ConditionUnknown)))
}, 10*time.Second).Should(Or(Equal(metav1.ConditionFalse), Equal(metav1.ConditionUnknown)))

// mock kube-controller-manager request v1beta1.custom.metrics.k8s.io api GetMetrics
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: getHPAName(so), Namespace: "default"}, hpa)
Expand Down
34 changes: 17 additions & 17 deletions pkg/fallback/fallback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ var _ = Describe("fallback", func() {
metricSpec := createMetricSpec(3)
expectStatusPatch(ctrl, client)

metrics, err := scaler.GetMetrics(context.Background(), metricName)
metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
metrics, err = GetMetricsWithFallback(context.Background(), client, logger, metrics, err, metricName, so, metricSpec)

Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -107,7 +107,7 @@ var _ = Describe("fallback", func() {
metricSpec := createMetricSpec(3)
expectStatusPatch(ctrl, client)

metrics, err := scaler.GetMetrics(context.Background(), metricName)
metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
metrics, err = GetMetricsWithFallback(context.Background(), client, logger, metrics, err, metricName, so, metricSpec)

Expect(err).ToNot(HaveOccurred())
Expand All @@ -117,21 +117,21 @@ var _ = Describe("fallback", func() {
})

It("should propagate the error when fallback is disabled", func() {
scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName)).Return(nil, errors.New("Some error"))
scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return(nil, false, errors.New("Some error"))

so := buildScaledObject(nil, nil)
metricSpec := createMetricSpec(3)
expectStatusPatch(ctrl, client)

metrics, err := scaler.GetMetrics(context.Background(), metricName)
metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
_, err = GetMetricsWithFallback(context.Background(), client, logger, metrics, err, metricName, so, metricSpec)

Expect(err).ShouldNot(BeNil())
Expect(err.Error()).Should(Equal("Some error"))
})

It("should bump the number of failures when metrics call fails", func() {
scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName)).Return(nil, errors.New("Some error"))
scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return(nil, false, errors.New("Some error"))
startingNumberOfFailures := int32(0)

so := buildScaledObject(
Expand All @@ -152,7 +152,7 @@ var _ = Describe("fallback", func() {
metricSpec := createMetricSpec(10)
expectStatusPatch(ctrl, client)

metrics, err := scaler.GetMetrics(context.Background(), metricName)
metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
_, err = GetMetricsWithFallback(context.Background(), client, logger, metrics, err, metricName, so, metricSpec)

Expect(err).ShouldNot(BeNil())
Expand All @@ -161,7 +161,7 @@ var _ = Describe("fallback", func() {
})

It("should return a normalised metric when number of failures are beyond threshold", func() {
scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName)).Return(nil, errors.New("Some error"))
scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return(nil, false, errors.New("Some error"))
startingNumberOfFailures := int32(3)
expectedMetricValue := int64(100)

Expand All @@ -182,7 +182,7 @@ var _ = Describe("fallback", func() {
metricSpec := createMetricSpec(10)
expectStatusPatch(ctrl, client)

metrics, err := scaler.GetMetrics(context.Background(), metricName)
metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
metrics, err = GetMetricsWithFallback(context.Background(), client, logger, metrics, err, metricName, so, metricSpec)

Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -214,7 +214,7 @@ var _ = Describe("fallback", func() {
})

It("should ignore error if we fail to update kubernetes status", func() {
scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName)).Return(nil, errors.New("Some error"))
scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return(nil, false, errors.New("Some error"))
startingNumberOfFailures := int32(3)
expectedMetricValue := int64(100)

Expand All @@ -238,7 +238,7 @@ var _ = Describe("fallback", func() {
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("Some error"))
client.EXPECT().Status().Return(statusWriter)

metrics, err := scaler.GetMetrics(context.Background(), metricName)
metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
metrics, err = GetMetricsWithFallback(context.Background(), client, logger, metrics, err, metricName, so, metricSpec)

Expect(err).ToNot(HaveOccurred())
Expand All @@ -248,7 +248,7 @@ var _ = Describe("fallback", func() {
})

It("should return error when fallback is enabled but scaledobject has invalid parameter", func() {
scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName)).Return(nil, errors.New("Some error"))
scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return(nil, false, errors.New("Some error"))
startingNumberOfFailures := int32(3)

so := buildScaledObject(
Expand All @@ -268,15 +268,15 @@ var _ = Describe("fallback", func() {
metricSpec := createMetricSpec(10)
expectStatusPatch(ctrl, client)

metrics, err := scaler.GetMetrics(context.Background(), metricName)
metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
_, err = GetMetricsWithFallback(context.Background(), client, logger, metrics, err, metricName, so, metricSpec)

Expect(err).ShouldNot(BeNil())
Expect(err.Error()).Should(Equal("Some error"))
})

It("should set the fallback condition when a fallback exists in the scaled object", func() {
scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName)).Return(nil, errors.New("Some error"))
scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return(nil, false, errors.New("Some error"))
startingNumberOfFailures := int32(3)
failingNumberOfFailures := int32(6)
anotherMetricName := "another metric name"
Expand All @@ -302,15 +302,15 @@ var _ = Describe("fallback", func() {
metricSpec := createMetricSpec(10)
expectStatusPatch(ctrl, client)

metrics, err := scaler.GetMetrics(context.Background(), metricName)
metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
_, err = GetMetricsWithFallback(context.Background(), client, logger, metrics, err, metricName, so, metricSpec)
Expect(err).ToNot(HaveOccurred())
condition := so.Status.Conditions.GetFallbackCondition()
Expect(condition.IsTrue()).Should(BeTrue())
})

It("should set the fallback condition to false if the config is invalid", func() {
scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName)).Return(nil, errors.New("Some error"))
scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return(nil, false, errors.New("Some error"))
startingNumberOfFailures := int32(3)
failingNumberOfFailures := int32(6)
anotherMetricName := "another metric name"
Expand All @@ -336,7 +336,7 @@ var _ = Describe("fallback", func() {
metricSpec := createMetricSpec(10)
expectStatusPatch(ctrl, client)

metrics, err := scaler.GetMetrics(context.Background(), metricName)
metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
_, err = GetMetricsWithFallback(context.Background(), client, logger, metrics, err, metricName, so, metricSpec)
Expect(err).ShouldNot(BeNil())
Expect(err.Error()).Should(Equal("Some error"))
Expand Down Expand Up @@ -425,7 +425,7 @@ func primeGetMetrics(scaler *mock_scalers.MockScaler, value int64) {
Timestamp: metav1.Now(),
}

scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName)).Return([]external_metrics.ExternalMetricValue{expectedMetric}, nil)
scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return([]external_metrics.ExternalMetricValue{expectedMetric}, true, nil)
}

func createMetricSpec(averageValue int) v2.MetricSpec {
Expand Down
6 changes: 3 additions & 3 deletions pkg/metricsservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ import (

"google.golang.org/grpc"
"k8s.io/metrics/pkg/apis/external_metrics/v1beta1"
ctrl "sigs.k8s.io/controller-runtime"
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/kedacore/keda/v2/pkg/metricsservice/api"
"github.com/kedacore/keda/v2/pkg/scaling"
)

var log = ctrl.Log.WithName("grpc_server")
var log = logf.Log.WithName("grpc_server")

type GrpcServer struct {
server *grpc.Server
Expand All @@ -41,7 +41,7 @@ type GrpcServer struct {
// GetMetrics returns metrics values in form of ExternalMetricValueList for specified ScaledObject reference
func (s *GrpcServer) GetMetrics(ctx context.Context, in *api.ScaledObjectRef) (*api.Response, error) {
v1beta1ExtMetrics := &v1beta1.ExternalMetricValueList{}
extMetrics, exportedMetrics, err := (*s.scalerHandler).GetExternalMetrics(ctx, in.Name, in.Namespace, in.MetricName)
extMetrics, exportedMetrics, err := (*s.scalerHandler).GetScaledObjectMetrics(ctx, in.Name, in.Namespace, in.MetricName)
if err != nil {
return nil, fmt.Errorf("error when getting metric values %s", err)
}
Expand Down
Loading

0 comments on commit 829a541

Please sign in to comment.