Skip to content

Commit

Permalink
option to query metric only on polling interval
Browse files Browse the repository at this point in the history
Signed-off-by: Zbynek Roubalik <[email protected]>
  • Loading branch information
zroubalik committed Dec 3, 2022
1 parent a489ca7 commit 48c351c
Show file tree
Hide file tree
Showing 88 changed files with 1,247 additions and 1,167 deletions.
42 changes: 29 additions & 13 deletions CREATE-NEW-SCALER.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,42 @@ If you want to deploy locally

The scalers in KEDA are implementations of a KEDA `Scaler` Go interface declared in `pkg/scalers/scaler.go`. This documentation describes how scalers work and is targeted towards contributors and maintainers.

### GetMetrics
### GetMetricsAndActivity

This is the key function of a scaler; it returns a value that represents a current state of an external metric (e.g. length of a queue). The return type is an `ExternalMetricValue` struct which has the following fields:
- `MetricName`: this is the name of the metric that we are returning. The name should be unique, to allow setting multiple (even the same type) Triggers in one ScaledObject, but each function call should return the same name.
- `Timestamp`: indicates the time at which the metrics were produced.
- `WindowSeconds`: //TODO
- `Value`: A numerical value that represents the state of the metric. It could be the length of a queue, or it can be the amount of lag in a stream, but it can also be a simple representation of the state.
This is the key function of a scaler; it returns:

Kubernetes HPA (Horizontal Pod Autoscaler) will poll `GetMetrics` regularly through KEDA's metric server (as long as there is at least one pod), and compare the returned value to a configured value in the ScaledObject configuration. Kubernetes will use the following formula to decide whether to scale the pods up and down:
1. A value that represents a current state of an external metric (e.g. length of a queue). The return type is an `ExternalMetricValue` struct which has the following fields:
- `MetricName`: this is the name of the metric that we are returning. The name should be unique, to allow setting multiple (even the same type) Triggers in one ScaledObject, but each function call should return the same name.
- `Timestamp`: indicates the time at which the metrics were produced.
- `WindowSeconds`: //TODO
- `Value`: A numerical value that represents the state of the metric. It could be the length of a queue, or it can be the amount of lag in a stream, but it can also be a simple representation of the state.
2. A value that represents an activity of the scaler. The return type is `bool`.

KEDA polls ScaledObject object according to the `pollingInterval` configured in the ScaledObject; it checks the last time it was polled, it checks if the number of replicas is greater than 0, and if the scaler itself is active. So if the scaler returns false for `IsActive`, and if current number of replicas is greater than 0, and there is no configured minimum pods, then KEDA scales down to 0.

Kubernetes HPA (Horizontal Pod Autoscaler) will poll `GetMetricsAndActivity` regularly through KEDA's metric server (as long as there is at least one pod), and compare the returned value to a configured value in the ScaledObject configuration. Kubernetes will use the following formula to decide whether to scale the pods up and down:

`desiredReplicas = ceil[currentReplicas * ( currentMetricValue / desiredMetricValue )]`.

For more details check [Kubernetes HPA documentation](https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/).

<br>Next lines are an example about how to use it:
>```golang
func (s *artemisScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
messages, err := s.getQueueMessageCount(ctx)
if err != nil {
s.logger.Error(err, "Unable to access the artemis management endpoint", "managementEndpoint", s.metadata.managementEndpoint)
return []external_metrics.ExternalMetricValue{}, false, err
}
metric := GenerateMetricInMili(metricName, float64(messages))
return []external_metrics.ExternalMetricValue{metric}, messages > s.metadata.activationQueueLength, nil
}
>```
### GetMetricSpecForScaling
KEDA works in conjunction with Kubernetes Horizontal Pod Autoscaler (HPA). When KEDA notices a new ScaledObject, it creates an HPA object that has basic information about the metric it needs to poll and scale the pods accordingly. To create this HPA object, KEDA invokes `GetMetricSpecForScaling`.
Expand Down Expand Up @@ -64,12 +86,6 @@ For example:
>```
### IsActive
For some reason, the scaler might need to declare itself as in-active, and the way it can do this is through implementing the function `IsActive`.
KEDA polls ScaledObject object according to the `pollingInterval` configured in the ScaledObject; it checks the last time it was polled, it checks if the number of replicas is greater than 0, and if the scaler itself is active. So if the scaler returns false for `IsActive`, and if current number of replicas is greater than 0, and there is no configured minimum pods, then KEDA scales down to 0.
### Close
After each poll on the scaler to retrieve the metrics, KEDA calls this function for each scaler to give the scaler the opportunity to close any resources, like http clients for example.
Expand Down
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ GO_LDFLAGS="-X=github.com/kedacore/keda/v2/version.GitCommit=$(GIT_COMMIT) -X=gi
COSIGN_FLAGS ?= -a GIT_HASH=${GIT_COMMIT} -a GIT_VERSION=${VERSION} -a BUILD_DATE=${DATE}

# ENVTEST_K8S_VERSION refers to the version of kubebuilder assets to be downloaded by envtest binary.
ENVTEST_K8S_VERSION = 1.24
ENVTEST_K8S_VERSION = 1.25

# Setting SHELL to bash allows bash commands to be executed by recipes.
# This is a requirement for 'setup-envtest.sh' in the test target.
Expand Down Expand Up @@ -144,10 +144,12 @@ proto-gen: protoc-gen ## Generate Liiklus, ExternalScaler and MetricsService pro
PATH="$(LOCALBIN):$(PATH)" protoc -I vendor --proto_path=pkg/metricsservice/api metrics.proto --go_out=pkg/metricsservice/api --go-grpc_out=pkg/metricsservice/api

.PHONY: mockgen-gen
mockgen-gen: mockgen pkg/mock/mock_scaling/mock_interface.go pkg/mock/mock_scaler/mock_scaler.go pkg/mock/mock_scale/mock_interfaces.go pkg/mock/mock_client/mock_interfaces.go pkg/scalers/liiklus/mocks/mock_liiklus.go
mockgen-gen: mockgen pkg/mock/mock_scaling/mock_interface.go pkg/mock/mock_scaling/mock_executor/mock_interface.go pkg/mock/mock_scaler/mock_scaler.go pkg/mock/mock_scale/mock_interfaces.go pkg/mock/mock_client/mock_interfaces.go pkg/scalers/liiklus/mocks/mock_liiklus.go

pkg/mock/mock_scaling/mock_interface.go: pkg/scaling/scale_handler.go
$(MOCKGEN) -destination=$@ -package=mock_scaling -source=$^
pkg/mock/mock_scaling/mock_executor/mock_interface.go: pkg/scaling/executor/scale_executor.go
$(MOCKGEN) -destination=$@ -package=mock_executor -source=$^
pkg/mock/mock_scaler/mock_scaler.go: pkg/scalers/scaler.go
$(MOCKGEN) -destination=$@ -package=mock_scalers -source=$^
pkg/mock/mock_scale/mock_interfaces.go: vendor/k8s.io/client-go/scale/interfaces.go
Expand Down
81 changes: 81 additions & 0 deletions apis/keda/v1alpha1/identifier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package v1alpha1

import (
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type testData struct {
name string
expectedIdentifier string
soName string
soNamespace string
soKind string
}

var tests = []testData{
{
name: "all lowercase",
expectedIdentifier: "scaledobject.namespace.name",
soName: "name",
soNamespace: "namespace",
soKind: "scaledobject",
},
{
name: "all uppercase",
expectedIdentifier: "scaledobject.namespace.name",
soName: "NAME",
soNamespace: "NAMESPACE",
soKind: "SCALEDOBJECT",
},
{
name: "camel case",
expectedIdentifier: "scaledobject.namespace.name",
soName: "name",
soNamespace: "namespace",
soKind: "scaledobject",
},
{
name: "missing namespace",
expectedIdentifier: "scaledobject..name",
soName: "name",
soKind: "scaledobject",
},
}

func TestGeneratedIdentifierForScaledObject(t *testing.T) {
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
expectedIdentifier := test.expectedIdentifier
genericIdentifier := GenerateIdentifier(test.soKind, test.soNamespace, test.soName)

scaledObject := &ScaledObject{
ObjectMeta: metav1.ObjectMeta{
Name: test.soName,
Namespace: test.soNamespace,
},
}
scaledObjectIdentifier := scaledObject.GenerateIdentifier()

withTriggers, err := AsDuckWithTriggers(scaledObject)
if err != nil {
t.Errorf("got error while converting to WithTriggers object: %s", err)
}
withTriggersIdentifier := withTriggers.GenerateIdentifier()

if expectedIdentifier != genericIdentifier {
t.Errorf("genericIdentifier=%q doesn't equal the expectedIdentifier=%q", genericIdentifier, expectedIdentifier)
}

if expectedIdentifier != scaledObjectIdentifier {
t.Errorf("scaledObjectIdentifier=%q doesn't equal the expectedIdentifier=%q", scaledObjectIdentifier, expectedIdentifier)
}

if expectedIdentifier != withTriggersIdentifier {
t.Errorf("withTriggersIdentifier=%q doesn't equal the expectedIdentifier=%q", withTriggersIdentifier, expectedIdentifier)
}
})
}
}
10 changes: 9 additions & 1 deletion apis/keda/v1alpha1/scaledobject_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,10 @@ type ScaleTarget struct {
type ScaleTriggers struct {
Type string `json:"type"`
// +optional
Name string `json:"name,omitempty"`
Name string `json:"name,omitempty"`

QueryMetricsOnPollingInterval bool `json:"queryMetricsOnPollingInterval,omitempty"`

Metadata map[string]string `json:"metadata"`
// +optional
AuthenticationRef *ScaledObjectAuthRef `json:"authenticationRef,omitempty"`
Expand Down Expand Up @@ -179,3 +182,8 @@ type ScaledObjectAuthRef struct {
func init() {
SchemeBuilder.Register(&ScaledObject{}, &ScaledObjectList{})
}

// GenerateIdentifier returns identifier for the object in for "kind.namespace.name"
func (s *ScaledObject) GenerateIdentifier() string {
return GenerateIdentifier("ScaledObject", s.Namespace, s.Name)
}
36 changes: 34 additions & 2 deletions apis/keda/v1alpha1/withtriggers_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package v1alpha1

import (
"fmt"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -37,7 +38,8 @@ type WithTriggers struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

Spec WithTriggersSpec `json:"spec"`
InternalKind string `json:"internalKind"`
Spec WithTriggersSpec `json:"spec"`
}

// WithTriggersSpec is the spec for a an object with triggers resource
Expand Down Expand Up @@ -89,5 +91,35 @@ func (t *WithTriggers) GetPollingInterval() time.Duration {

// GenerateIdentifier returns identifier for the object in for "kind.namespace.name"
func (t *WithTriggers) GenerateIdentifier() string {
return GenerateIdentifier(t.Kind, t.Namespace, t.Name)
return GenerateIdentifier(t.InternalKind, t.Namespace, t.Name)
}

// AsDuckWithTriggers tries to generates WithTriggers object for input object
// returns error if input object is unknown
func AsDuckWithTriggers(scalableObject interface{}) (*WithTriggers, error) {
switch obj := scalableObject.(type) {
case *ScaledObject:
return &WithTriggers{
TypeMeta: obj.TypeMeta,
ObjectMeta: obj.ObjectMeta,
InternalKind: "ScaledObject",
Spec: WithTriggersSpec{
PollingInterval: obj.Spec.PollingInterval,
Triggers: obj.Spec.Triggers,
},
}, nil
case *ScaledJob:
return &WithTriggers{
TypeMeta: obj.TypeMeta,
ObjectMeta: obj.ObjectMeta,
InternalKind: "ScaledJob",
Spec: WithTriggersSpec{
PollingInterval: obj.Spec.PollingInterval,
Triggers: obj.Spec.Triggers,
},
}, nil
default:
// here could be the conversion from unknown Duck type potentially in the future
return nil, fmt.Errorf("unknown scalable object type %v", scalableObject)
}
}
2 changes: 2 additions & 0 deletions config/crd/bases/keda.sh_scaledjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7872,6 +7872,8 @@ spec:
type: string
name:
type: string
queryMetricsOnPollingInterval:
type: boolean
type:
type: string
required:
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/keda.sh_scaledobjects.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ spec:
type: string
name:
type: string
queryMetricsOnPollingInterval:
type: boolean
type:
type: string
required:
Expand Down
1 change: 0 additions & 1 deletion controllers/keda/hpa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ func setupTest(health map[string]v1alpha1.HealthStatus, scaler *mock_scalers.Moc
return scaler, &scalers.ScalerConfig{}, nil
},
}},
Logger: logr.Discard(),
Recorder: nil,
}
metricSpec := v2.MetricSpec{
Expand Down
4 changes: 2 additions & 2 deletions controllers/keda/scaledobject_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ var _ = Describe("ScaledObjectController", func() {
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: "keda-hpa-clean-up-test", Namespace: "default"}, hpa)
Expect(err).ToNot(HaveOccurred())
return len(hpa.Spec.Metrics)
}).Should(Equal(1))
}, 5*time.Second).Should(Equal(1))
// And it should only be the first one left.
Expect(hpa.Spec.Metrics[0].External.Metric.Name).To(Equal("s0-cron-UTC-0xxxx-1xxxx"))
})
Expand Down Expand Up @@ -782,7 +782,7 @@ var _ = Describe("ScaledObjectController", func() {
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
Expect(err).ToNot(HaveOccurred())
return so.Status.Conditions.GetReadyCondition().Status
}, 5*time.Second).Should(Or(Equal(metav1.ConditionFalse), Equal(metav1.ConditionUnknown)))
}, 10*time.Second).Should(Or(Equal(metav1.ConditionFalse), Equal(metav1.ConditionUnknown)))

// mock kube-controller-manager request v1beta1.custom.metrics.k8s.io api GetMetrics
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: getHPAName(so), Namespace: "default"}, hpa)
Expand Down
Loading

0 comments on commit 48c351c

Please sign in to comment.