Skip to content

Commit

Permalink
cache metrics in polling interval call
Browse files Browse the repository at this point in the history
Signed-off-by: Zbynek Roubalik <[email protected]>
  • Loading branch information
zroubalik committed Dec 2, 2022
1 parent a489ca7 commit dc764d4
Show file tree
Hide file tree
Showing 82 changed files with 631 additions and 1,088 deletions.
12 changes: 11 additions & 1 deletion apis/keda/v1alpha1/scaledobject_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,12 @@ type ScaleTarget struct {
type ScaleTriggers struct {
Type string `json:"type"`
// +optional
Name string `json:"name,omitempty"`
Name string `json:"name,omitempty"`
// +optional
CacheDuration *int32 `json:"cacheDuration,omitempty"`

EnableCache bool `json:"enableCache,omitempty"`

Metadata map[string]string `json:"metadata"`
// +optional
AuthenticationRef *ScaledObjectAuthRef `json:"authenticationRef,omitempty"`
Expand Down Expand Up @@ -179,3 +184,8 @@ type ScaledObjectAuthRef struct {
func init() {
SchemeBuilder.Register(&ScaledObject{}, &ScaledObjectList{})
}

// GenerateIdentifier returns identifier for the object in for "kind.namespace.name"
func (s *ScaledObject) GenerateIdentifier() string {
return GenerateIdentifier(s.Kind, s.Namespace, s.Name)
}
5 changes: 5 additions & 0 deletions apis/keda/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions config/crd/bases/keda.sh_scaledjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7861,6 +7861,11 @@ spec:
required:
- name
type: object
cacheDuration:
format: int32
type: integer
enableCache:
type: boolean
metadata:
additionalProperties:
type: string
Expand Down
5 changes: 5 additions & 0 deletions config/crd/bases/keda.sh_scaledobjects.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,11 @@ spec:
required:
- name
type: object
cacheDuration:
format: int32
type: integer
enableCache:
type: boolean
metadata:
additionalProperties:
type: string
Expand Down
1 change: 0 additions & 1 deletion controllers/keda/hpa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ func setupTest(health map[string]v1alpha1.HealthStatus, scaler *mock_scalers.Moc
return scaler, &scalers.ScalerConfig{}, nil
},
}},
Logger: logr.Discard(),
Recorder: nil,
}
metricSpec := v2.MetricSpec{
Expand Down
34 changes: 17 additions & 17 deletions pkg/fallback/fallback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ var _ = Describe("fallback", func() {
metricSpec := createMetricSpec(3)
expectStatusPatch(ctrl, client)

metrics, err := scaler.GetMetrics(context.Background(), metricName)
metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
metrics, err = GetMetricsWithFallback(context.Background(), client, logger, metrics, err, metricName, so, metricSpec)

Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -107,7 +107,7 @@ var _ = Describe("fallback", func() {
metricSpec := createMetricSpec(3)
expectStatusPatch(ctrl, client)

metrics, err := scaler.GetMetrics(context.Background(), metricName)
metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
metrics, err = GetMetricsWithFallback(context.Background(), client, logger, metrics, err, metricName, so, metricSpec)

Expect(err).ToNot(HaveOccurred())
Expand All @@ -117,21 +117,21 @@ var _ = Describe("fallback", func() {
})

It("should propagate the error when fallback is disabled", func() {
scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName)).Return(nil, errors.New("Some error"))
scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return(nil, false, errors.New("Some error"))

so := buildScaledObject(nil, nil)
metricSpec := createMetricSpec(3)
expectStatusPatch(ctrl, client)

metrics, err := scaler.GetMetrics(context.Background(), metricName)
metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
_, err = GetMetricsWithFallback(context.Background(), client, logger, metrics, err, metricName, so, metricSpec)

Expect(err).ShouldNot(BeNil())
Expect(err.Error()).Should(Equal("Some error"))
})

It("should bump the number of failures when metrics call fails", func() {
scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName)).Return(nil, errors.New("Some error"))
scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return(nil, false, errors.New("Some error"))
startingNumberOfFailures := int32(0)

so := buildScaledObject(
Expand All @@ -152,7 +152,7 @@ var _ = Describe("fallback", func() {
metricSpec := createMetricSpec(10)
expectStatusPatch(ctrl, client)

metrics, err := scaler.GetMetrics(context.Background(), metricName)
metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
_, err = GetMetricsWithFallback(context.Background(), client, logger, metrics, err, metricName, so, metricSpec)

Expect(err).ShouldNot(BeNil())
Expand All @@ -161,7 +161,7 @@ var _ = Describe("fallback", func() {
})

It("should return a normalised metric when number of failures are beyond threshold", func() {
scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName)).Return(nil, errors.New("Some error"))
scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return(nil, false, errors.New("Some error"))
startingNumberOfFailures := int32(3)
expectedMetricValue := int64(100)

Expand All @@ -182,7 +182,7 @@ var _ = Describe("fallback", func() {
metricSpec := createMetricSpec(10)
expectStatusPatch(ctrl, client)

metrics, err := scaler.GetMetrics(context.Background(), metricName)
metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
metrics, err = GetMetricsWithFallback(context.Background(), client, logger, metrics, err, metricName, so, metricSpec)

Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -214,7 +214,7 @@ var _ = Describe("fallback", func() {
})

It("should ignore error if we fail to update kubernetes status", func() {
scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName)).Return(nil, errors.New("Some error"))
scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return(nil, false, errors.New("Some error"))
startingNumberOfFailures := int32(3)
expectedMetricValue := int64(100)

Expand All @@ -238,7 +238,7 @@ var _ = Describe("fallback", func() {
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("Some error"))
client.EXPECT().Status().Return(statusWriter)

metrics, err := scaler.GetMetrics(context.Background(), metricName)
metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
metrics, err = GetMetricsWithFallback(context.Background(), client, logger, metrics, err, metricName, so, metricSpec)

Expect(err).ToNot(HaveOccurred())
Expand All @@ -248,7 +248,7 @@ var _ = Describe("fallback", func() {
})

It("should return error when fallback is enabled but scaledobject has invalid parameter", func() {
scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName)).Return(nil, errors.New("Some error"))
scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return(nil, false, errors.New("Some error"))
startingNumberOfFailures := int32(3)

so := buildScaledObject(
Expand All @@ -268,15 +268,15 @@ var _ = Describe("fallback", func() {
metricSpec := createMetricSpec(10)
expectStatusPatch(ctrl, client)

metrics, err := scaler.GetMetrics(context.Background(), metricName)
metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
_, err = GetMetricsWithFallback(context.Background(), client, logger, metrics, err, metricName, so, metricSpec)

Expect(err).ShouldNot(BeNil())
Expect(err.Error()).Should(Equal("Some error"))
})

It("should set the fallback condition when a fallback exists in the scaled object", func() {
scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName)).Return(nil, errors.New("Some error"))
scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return(nil, false, errors.New("Some error"))
startingNumberOfFailures := int32(3)
failingNumberOfFailures := int32(6)
anotherMetricName := "another metric name"
Expand All @@ -302,15 +302,15 @@ var _ = Describe("fallback", func() {
metricSpec := createMetricSpec(10)
expectStatusPatch(ctrl, client)

metrics, err := scaler.GetMetrics(context.Background(), metricName)
metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
_, err = GetMetricsWithFallback(context.Background(), client, logger, metrics, err, metricName, so, metricSpec)
Expect(err).ToNot(HaveOccurred())
condition := so.Status.Conditions.GetFallbackCondition()
Expect(condition.IsTrue()).Should(BeTrue())
})

It("should set the fallback condition to false if the config is invalid", func() {
scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName)).Return(nil, errors.New("Some error"))
scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return(nil, false, errors.New("Some error"))
startingNumberOfFailures := int32(3)
failingNumberOfFailures := int32(6)
anotherMetricName := "another metric name"
Expand All @@ -336,7 +336,7 @@ var _ = Describe("fallback", func() {
metricSpec := createMetricSpec(10)
expectStatusPatch(ctrl, client)

metrics, err := scaler.GetMetrics(context.Background(), metricName)
metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
_, err = GetMetricsWithFallback(context.Background(), client, logger, metrics, err, metricName, so, metricSpec)
Expect(err).ShouldNot(BeNil())
Expect(err.Error()).Should(Equal("Some error"))
Expand Down Expand Up @@ -425,7 +425,7 @@ func primeGetMetrics(scaler *mock_scalers.MockScaler, value int64) {
Timestamp: metav1.Now(),
}

scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName)).Return([]external_metrics.ExternalMetricValue{expectedMetric}, nil)
scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return([]external_metrics.ExternalMetricValue{expectedMetric}, true, nil)
}

func createMetricSpec(averageValue int) v2.MetricSpec {
Expand Down
6 changes: 3 additions & 3 deletions pkg/metricsservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ import (

"google.golang.org/grpc"
"k8s.io/metrics/pkg/apis/external_metrics/v1beta1"
ctrl "sigs.k8s.io/controller-runtime"
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/kedacore/keda/v2/pkg/metricsservice/api"
"github.com/kedacore/keda/v2/pkg/scaling"
)

var log = ctrl.Log.WithName("grpc_server")
var log = logf.Log.WithName("grpc_server")

type GrpcServer struct {
server *grpc.Server
Expand All @@ -41,7 +41,7 @@ type GrpcServer struct {
// GetMetrics returns metrics values in form of ExternalMetricValueList for specified ScaledObject reference
func (s *GrpcServer) GetMetrics(ctx context.Context, in *api.ScaledObjectRef) (*api.Response, error) {
v1beta1ExtMetrics := &v1beta1.ExternalMetricValueList{}
extMetrics, exportedMetrics, err := (*s.scalerHandler).GetExternalMetrics(ctx, in.Name, in.Namespace, in.MetricName)
extMetrics, exportedMetrics, err := (*s.scalerHandler).GetScaledObjectMetrics(ctx, in.Name, in.Namespace, in.MetricName)
if err != nil {
return nil, fmt.Errorf("error when getting metric values %s", err)
}
Expand Down
64 changes: 18 additions & 46 deletions pkg/mock/mock_scaler/mock_scaler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions pkg/mock/mock_scaling/mock_interface.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/prommetrics/prommetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ import (
"strconv"

"github.com/prometheus/client_golang/prometheus"
ctrl "sigs.k8s.io/controller-runtime"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)

var log = ctrl.Log.WithName("prometheus_server")
var log = logf.Log.WithName("prometheus_server")

const (
ClusterTriggerAuthenticationResource = "cluster_trigger_authentication"
Expand Down
Loading

0 comments on commit dc764d4

Please sign in to comment.