Skip to content

Commit

Permalink
option to query metric only on polling interval
Browse files Browse the repository at this point in the history
Signed-off-by: Zbynek Roubalik <[email protected]>
  • Loading branch information
zroubalik committed Dec 6, 2022
1 parent 9dde386 commit 0960d3e
Show file tree
Hide file tree
Showing 92 changed files with 1,334 additions and 1,155 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

### New

- **General**: **EXPERIMENTAL** Adding an option to cache metric values for a scaler during the polling interval ([#2282](https://github.com/kedacore/keda/issues/2282))
- **General**: Consolidate all exposed Prometheus Metrics in KEDA Operator ([#3919](https://github.com/kedacore/keda/issues/3919))
- **General**: Disable response compression for k8s restAPI in client-go ([#3863](https://github.com/kedacore/keda/issues/3863)). Kubernetes issue for reference (https://github.com/kubernetes/kubernetes/issues/112296)
- **General**: Expand Prometheus metric with label "ScalerName" to distinguish different triggers. The scaleName is defined per Trigger.Name ([#3588](https://github.com/kedacore/keda/issues/3588))
Expand Down
42 changes: 29 additions & 13 deletions CREATE-NEW-SCALER.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,42 @@ If you want to deploy locally

The scalers in KEDA are implementations of a KEDA `Scaler` Go interface declared in `pkg/scalers/scaler.go`. This documentation describes how scalers work and is targeted towards contributors and maintainers.

### GetMetrics
### GetMetricsAndActivity

This is the key function of a scaler; it returns a value that represents a current state of an external metric (e.g. length of a queue). The return type is an `ExternalMetricValue` struct which has the following fields:
- `MetricName`: this is the name of the metric that we are returning. The name should be unique, to allow setting multiple (even the same type) Triggers in one ScaledObject, but each function call should return the same name.
- `Timestamp`: indicates the time at which the metrics were produced.
- `WindowSeconds`: //TODO
- `Value`: A numerical value that represents the state of the metric. It could be the length of a queue, or it can be the amount of lag in a stream, but it can also be a simple representation of the state.
This is the key function of a scaler; it returns:

Kubernetes HPA (Horizontal Pod Autoscaler) will poll `GetMetrics` regularly through KEDA's metric server (as long as there is at least one pod), and compare the returned value to a configured value in the ScaledObject configuration. Kubernetes will use the following formula to decide whether to scale the pods up and down:
1. A value that represents a current state of an external metric (e.g. length of a queue). The return type is an `ExternalMetricValue` struct which has the following fields:
- `MetricName`: this is the name of the metric that we are returning. The name should be unique, to allow setting multiple (even the same type) Triggers in one ScaledObject, but each function call should return the same name.
- `Timestamp`: indicates the time at which the metrics were produced.
- `WindowSeconds`: //TODO
- `Value`: A numerical value that represents the state of the metric. It could be the length of a queue, or it can be the amount of lag in a stream, but it can also be a simple representation of the state.
2. A value that represents an activity of the scaler. The return type is `bool`.

KEDA polls ScaledObject object according to the `pollingInterval` configured in the ScaledObject; it checks the last time it was polled, it checks if the number of replicas is greater than 0, and if the scaler itself is active. So if the scaler returns false for `IsActive`, and if current number of replicas is greater than 0, and there is no configured minimum pods, then KEDA scales down to 0.

Kubernetes HPA (Horizontal Pod Autoscaler) will poll `GetMetricsAndActivity` regularly through KEDA's metric server (as long as there is at least one pod), and compare the returned value to a configured value in the ScaledObject configuration. Kubernetes will use the following formula to decide whether to scale the pods up and down:

`desiredReplicas = ceil[currentReplicas * ( currentMetricValue / desiredMetricValue )]`.

For more details check [Kubernetes HPA documentation](https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/).

<br>Next lines are an example about how to use it:
>```golang
func (s *artemisScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
messages, err := s.getQueueMessageCount(ctx)
if err != nil {
s.logger.Error(err, "Unable to access the artemis management endpoint", "managementEndpoint", s.metadata.managementEndpoint)
return []external_metrics.ExternalMetricValue{}, false, err
}
metric := GenerateMetricInMili(metricName, float64(messages))
return []external_metrics.ExternalMetricValue{metric}, messages > s.metadata.activationQueueLength, nil
}
>```
### GetMetricSpecForScaling
KEDA works in conjunction with Kubernetes Horizontal Pod Autoscaler (HPA). When KEDA notices a new ScaledObject, it creates an HPA object that has basic information about the metric it needs to poll and scale the pods accordingly. To create this HPA object, KEDA invokes `GetMetricSpecForScaling`.
Expand Down Expand Up @@ -64,12 +86,6 @@ For example:
>```
### IsActive
For some reason, the scaler might need to declare itself as in-active, and the way it can do this is through implementing the function `IsActive`.
KEDA polls ScaledObject object according to the `pollingInterval` configured in the ScaledObject; it checks the last time it was polled, it checks if the number of replicas is greater than 0, and if the scaler itself is active. So if the scaler returns false for `IsActive`, and if current number of replicas is greater than 0, and there is no configured minimum pods, then KEDA scales down to 0.
### Close
After each poll on the scaler to retrieve the metrics, KEDA calls this function for each scaler to give the scaler the opportunity to close any resources, like http clients for example.
Expand Down
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ GO_LDFLAGS="-X=github.com/kedacore/keda/v2/version.GitCommit=$(GIT_COMMIT) -X=gi
COSIGN_FLAGS ?= -a GIT_HASH=${GIT_COMMIT} -a GIT_VERSION=${VERSION} -a BUILD_DATE=${DATE}

# ENVTEST_K8S_VERSION refers to the version of kubebuilder assets to be downloaded by envtest binary.
ENVTEST_K8S_VERSION = 1.24
ENVTEST_K8S_VERSION = 1.25

# Setting SHELL to bash allows bash commands to be executed by recipes.
# This is a requirement for 'setup-envtest.sh' in the test target.
Expand Down Expand Up @@ -147,10 +147,12 @@ proto-gen: protoc-gen ## Generate Liiklus, ExternalScaler and MetricsService pro
PATH="$(LOCALBIN):$(PATH)" protoc -I vendor --proto_path=pkg/metricsservice/api metrics.proto --go_out=pkg/metricsservice/api --go-grpc_out=pkg/metricsservice/api

.PHONY: mockgen-gen
mockgen-gen: mockgen pkg/mock/mock_scaling/mock_interface.go pkg/mock/mock_scaler/mock_scaler.go pkg/mock/mock_scale/mock_interfaces.go pkg/mock/mock_client/mock_interfaces.go pkg/scalers/liiklus/mocks/mock_liiklus.go
mockgen-gen: mockgen pkg/mock/mock_scaling/mock_interface.go pkg/mock/mock_scaling/mock_executor/mock_interface.go pkg/mock/mock_scaler/mock_scaler.go pkg/mock/mock_scale/mock_interfaces.go pkg/mock/mock_client/mock_interfaces.go pkg/scalers/liiklus/mocks/mock_liiklus.go

pkg/mock/mock_scaling/mock_interface.go: pkg/scaling/scale_handler.go
$(MOCKGEN) -destination=$@ -package=mock_scaling -source=$^
pkg/mock/mock_scaling/mock_executor/mock_interface.go: pkg/scaling/executor/scale_executor.go
$(MOCKGEN) -destination=$@ -package=mock_executor -source=$^
pkg/mock/mock_scaler/mock_scaler.go: pkg/scalers/scaler.go
$(MOCKGEN) -destination=$@ -package=mock_scalers -source=$^
pkg/mock/mock_scale/mock_interfaces.go: vendor/k8s.io/client-go/scale/interfaces.go
Expand Down
81 changes: 81 additions & 0 deletions apis/keda/v1alpha1/identifier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package v1alpha1

import (
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type testData struct {
name string
expectedIdentifier string
soName string
soNamespace string
soKind string
}

var tests = []testData{
{
name: "all lowercase",
expectedIdentifier: "scaledobject.namespace.name",
soName: "name",
soNamespace: "namespace",
soKind: "scaledobject",
},
{
name: "all uppercase",
expectedIdentifier: "scaledobject.namespace.name",
soName: "NAME",
soNamespace: "NAMESPACE",
soKind: "SCALEDOBJECT",
},
{
name: "camel case",
expectedIdentifier: "scaledobject.namespace.name",
soName: "name",
soNamespace: "namespace",
soKind: "scaledobject",
},
{
name: "missing namespace",
expectedIdentifier: "scaledobject..name",
soName: "name",
soKind: "scaledobject",
},
}

func TestGeneratedIdentifierForScaledObject(t *testing.T) {
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
expectedIdentifier := test.expectedIdentifier
genericIdentifier := GenerateIdentifier(test.soKind, test.soNamespace, test.soName)

scaledObject := &ScaledObject{
ObjectMeta: metav1.ObjectMeta{
Name: test.soName,
Namespace: test.soNamespace,
},
}
scaledObjectIdentifier := scaledObject.GenerateIdentifier()

withTriggers, err := AsDuckWithTriggers(scaledObject)
if err != nil {
t.Errorf("got error while converting to WithTriggers object: %s", err)
}
withTriggersIdentifier := withTriggers.GenerateIdentifier()

if expectedIdentifier != genericIdentifier {
t.Errorf("genericIdentifier=%q doesn't equal the expectedIdentifier=%q", genericIdentifier, expectedIdentifier)
}

if expectedIdentifier != scaledObjectIdentifier {
t.Errorf("scaledObjectIdentifier=%q doesn't equal the expectedIdentifier=%q", scaledObjectIdentifier, expectedIdentifier)
}

if expectedIdentifier != withTriggersIdentifier {
t.Errorf("withTriggersIdentifier=%q doesn't equal the expectedIdentifier=%q", withTriggersIdentifier, expectedIdentifier)
}
})
}
}
10 changes: 9 additions & 1 deletion apis/keda/v1alpha1/scaledobject_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,10 @@ type ScaleTarget struct {
type ScaleTriggers struct {
Type string `json:"type"`
// +optional
Name string `json:"name,omitempty"`
Name string `json:"name,omitempty"`

UseCachedMetrics bool `json:"useCachedMetrics,omitempty"`

Metadata map[string]string `json:"metadata"`
// +optional
AuthenticationRef *ScaledObjectAuthRef `json:"authenticationRef,omitempty"`
Expand Down Expand Up @@ -179,3 +182,8 @@ type ScaledObjectAuthRef struct {
func init() {
SchemeBuilder.Register(&ScaledObject{}, &ScaledObjectList{})
}

// GenerateIdentifier returns identifier for the object in for "kind.namespace.name"
func (s *ScaledObject) GenerateIdentifier() string {
return GenerateIdentifier("ScaledObject", s.Namespace, s.Name)
}
36 changes: 34 additions & 2 deletions apis/keda/v1alpha1/withtriggers_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package v1alpha1

import (
"fmt"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -37,7 +38,8 @@ type WithTriggers struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

Spec WithTriggersSpec `json:"spec"`
InternalKind string `json:"internalKind"`
Spec WithTriggersSpec `json:"spec"`
}

// WithTriggersSpec is the spec for a an object with triggers resource
Expand Down Expand Up @@ -89,5 +91,35 @@ func (t *WithTriggers) GetPollingInterval() time.Duration {

// GenerateIdentifier returns identifier for the object in for "kind.namespace.name"
func (t *WithTriggers) GenerateIdentifier() string {
return GenerateIdentifier(t.Kind, t.Namespace, t.Name)
return GenerateIdentifier(t.InternalKind, t.Namespace, t.Name)
}

// AsDuckWithTriggers tries to generates WithTriggers object for input object
// returns error if input object is unknown
func AsDuckWithTriggers(scalableObject interface{}) (*WithTriggers, error) {
switch obj := scalableObject.(type) {
case *ScaledObject:
return &WithTriggers{
TypeMeta: obj.TypeMeta,
ObjectMeta: obj.ObjectMeta,
InternalKind: "ScaledObject",
Spec: WithTriggersSpec{
PollingInterval: obj.Spec.PollingInterval,
Triggers: obj.Spec.Triggers,
},
}, nil
case *ScaledJob:
return &WithTriggers{
TypeMeta: obj.TypeMeta,
ObjectMeta: obj.ObjectMeta,
InternalKind: "ScaledJob",
Spec: WithTriggersSpec{
PollingInterval: obj.Spec.PollingInterval,
Triggers: obj.Spec.Triggers,
},
}, nil
default:
// here could be the conversion from unknown Duck type potentially in the future
return nil, fmt.Errorf("unknown scalable object type %v", scalableObject)
}
}
2 changes: 2 additions & 0 deletions config/crd/bases/keda.sh_scaledjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7874,6 +7874,8 @@ spec:
type: string
type:
type: string
useCachedMetrics:
type: boolean
required:
- metadata
- type
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/keda.sh_scaledobjects.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ spec:
type: string
type:
type: string
useCachedMetrics:
type: boolean
required:
- metadata
- type
Expand Down
1 change: 0 additions & 1 deletion controllers/keda/hpa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ func setupTest(health map[string]v1alpha1.HealthStatus, scaler *mock_scalers.Moc
return scaler, &scalers.ScalerConfig{}, nil
},
}},
Logger: logr.Discard(),
Recorder: nil,
}
metricSpec := v2.MetricSpec{
Expand Down
3 changes: 3 additions & 0 deletions controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ func (r *ScaledJobReconciler) reconcileScaledJob(ctx context.Context, logger log
}

for _, trigger := range scaledJob.Spec.Triggers {
if trigger.UseCachedMetrics {
logger.Info("Warning: property useCachedMetrics is not supported for ScaledJobs.")
}
if trigger.MetricType != "" {
err := fmt.Errorf("metricType is set in one of the ScaledJob scaler")
logger.Error(err, "metricType cannot be set in ScaledJob triggers")
Expand Down
18 changes: 14 additions & 4 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg
return "ScaledObject doesn't have correct Idle/Min/Max Replica Counts specification", err
}

err = r.checkTriggerNamesAreUnique(scaledObject)
err = r.checkTriggers(scaledObject)
if err != nil {
return "ScaledObject doesn't have correct triggers specification", err
}
Expand Down Expand Up @@ -338,14 +338,24 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Conte
return gvkr, nil
}

// checkTriggerNamesAreUnique checks that all triggerNames in ScaledObject are unique
func (r *ScaledObjectReconciler) checkTriggerNamesAreUnique(scaledObject *kedav1alpha1.ScaledObject) error {
// checkTriggers checks that general trigger metadata are valid, it checks:
// - triggerNames in ScaledObject are unique
// - useCachedMetrics is defined only for a supported triggers
func (r *ScaledObjectReconciler) checkTriggers(scaledObject *kedav1alpha1.ScaledObject) error {
triggersCount := len(scaledObject.Spec.Triggers)

if triggersCount > 1 {
triggerNames := make(map[string]bool, triggersCount)
for i := 0; i < triggersCount; i++ {
name := scaledObject.Spec.Triggers[i].Name
trigger := scaledObject.Spec.Triggers[i]

if trigger.UseCachedMetrics {
if trigger.Type == "cpu" || trigger.Type == "memory" || trigger.Type == "cron" {
return fmt.Errorf("property \"useCachedMetrics\" is not supported for %q scaler", trigger.Type)
}
}

name := trigger.Name
if name != "" {
if _, found := triggerNames[name]; found {
// found duplicate name
Expand Down
Loading

0 comments on commit 0960d3e

Please sign in to comment.