From 8bde4c8e3a08dd29c0b309d65185c351fb6ec2c8 Mon Sep 17 00:00:00 2001 From: Zbynek Roubalik Date: Wed, 26 Apr 2023 20:09:58 +0200 Subject: [PATCH] Metrics Adapter: remove deprecated Prom Metrics and cleanup Signed-off-by: Zbynek Roubalik --- cmd/adapter/main.go | 8 - config/metrics-server/deployment.yaml | 2 - config/metrics-server/service.yaml | 3 - config/prometheus/kustomization.yaml | 2 - config/prometheus/monitor.yaml | 16 - pkg/metricsservice/api/metrics.pb.go | 395 +----------------- pkg/metricsservice/api/metrics.proto | 29 +- pkg/metricsservice/api/metrics_grpc.pb.go | 11 +- pkg/metricsservice/client.go | 17 +- pkg/metricsservice/server.go | 13 +- pkg/mock/mock_scaling/mock_interface.go | 8 +- .../adapter/adapter_prommetrics.go | 140 ------- pkg/provider/provider.go | 133 +----- .../externalscaler/externalscaler.pb.go | 2 +- pkg/scalers/liiklus/LiiklusService.pb.go | 6 +- pkg/scaling/scale_handler.go | 40 +- pkg/scaling/scale_handler_test.go | 6 +- .../prometheus_metrics_test.go | 25 -- 18 files changed, 69 insertions(+), 787 deletions(-) delete mode 100644 config/prometheus/kustomization.yaml delete mode 100644 config/prometheus/monitor.yaml delete mode 100644 pkg/prommetrics/adapter/adapter_prommetrics.go diff --git a/cmd/adapter/main.go b/cmd/adapter/main.go index e650bd266cb..dd689e0c13c 100644 --- a/cmd/adapter/main.go +++ b/cmd/adapter/main.go @@ -43,7 +43,6 @@ import ( kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" kedacontrollers "github.com/kedacore/keda/v2/controllers/keda" "github.com/kedacore/keda/v2/pkg/metricsservice" - prommetrics "github.com/kedacore/keda/v2/pkg/prommetrics/adapter" kedaprovider "github.com/kedacore/keda/v2/pkg/provider" "github.com/kedacore/keda/v2/pkg/scaling" kedautil "github.com/kedacore/keda/v2/pkg/util" @@ -60,8 +59,6 @@ type Adapter struct { var logger = klogr.New().WithName("keda_metrics_adapter") var ( - prometheusMetricsPort int - prometheusMetricsPath string adapterClientRequestQPS float32 adapterClientRequestBurst int metricsAPIServerPort int @@ -153,9 +150,6 @@ func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Durat externalMetricsInfo := &[]provider.ExternalMetricInfo{} externalMetricsInfoLock := &sync.RWMutex{} - prometheusServer := &prommetrics.PrometheusMetricServer{} - go func() { prometheusServer.NewServer(fmt.Sprintf(":%v", prometheusMetricsPort), prometheusMetricsPath) }() - stopCh := make(chan struct{}) if err := runScaledObjectController(ctx, mgr, handler, logger, externalMetricsInfo, externalMetricsInfoLock, maxConcurrentReconciles, stopCh, secretInformer.Informer().HasSynced); err != nil { return nil, nil, err @@ -246,8 +240,6 @@ func main() { cmd.Flags().StringVar(&cmd.Message, "msg", "starting adapter...", "startup message") cmd.Flags().AddGoFlagSet(flag.CommandLine) // make sure we get the klog flags cmd.Flags().IntVar(&metricsAPIServerPort, "port", 8080, "Set the port for the metrics API server") - cmd.Flags().IntVar(&prometheusMetricsPort, "metrics-port", 9022, "Set the port to expose prometheus metrics") - cmd.Flags().StringVar(&prometheusMetricsPath, "metrics-path", "/metrics", "Set the path for the prometheus metrics endpoint") cmd.Flags().StringVar(&metricsServiceAddr, "metrics-service-address", generateDefaultMetricsServiceAddr(), "The address of the gRPRC Metrics Service Server.") cmd.Flags().Float32Var(&adapterClientRequestQPS, "kube-api-qps", 20.0, "Set the QPS rate for throttling requests sent to the apiserver") cmd.Flags().IntVar(&adapterClientRequestBurst, "kube-api-burst", 30, "Set the burst for throttling requests sent to the apiserver") diff --git a/config/metrics-server/deployment.yaml b/config/metrics-server/deployment.yaml index 833ea678f14..a6f7f88625f 100644 --- a/config/metrics-server/deployment.yaml +++ b/config/metrics-server/deployment.yaml @@ -68,8 +68,6 @@ spec: name: https - containerPort: 8080 name: http - - containerPort: 9022 - name: metrics volumeMounts: - mountPath: /tmp name: temp-vol diff --git a/config/metrics-server/service.yaml b/config/metrics-server/service.yaml index e3880adc927..e1281b12bae 100644 --- a/config/metrics-server/service.yaml +++ b/config/metrics-server/service.yaml @@ -16,8 +16,5 @@ spec: - name: http port: 80 targetPort: 8080 - - name: metrics - port: 9022 - targetPort: 9022 selector: app: keda-metrics-apiserver diff --git a/config/prometheus/kustomization.yaml b/config/prometheus/kustomization.yaml deleted file mode 100644 index ed137168a1d..00000000000 --- a/config/prometheus/kustomization.yaml +++ /dev/null @@ -1,2 +0,0 @@ -resources: -- monitor.yaml diff --git a/config/prometheus/monitor.yaml b/config/prometheus/monitor.yaml deleted file mode 100644 index 9b8047b760f..00000000000 --- a/config/prometheus/monitor.yaml +++ /dev/null @@ -1,16 +0,0 @@ - -# Prometheus Monitor Service (Metrics) -apiVersion: monitoring.coreos.com/v1 -kind: ServiceMonitor -metadata: - labels: - control-plane: controller-manager - name: controller-manager-metrics-monitor - namespace: system -spec: - endpoints: - - path: /metrics - port: https - selector: - matchLabels: - control-plane: controller-manager diff --git a/pkg/metricsservice/api/metrics.pb.go b/pkg/metricsservice/api/metrics.pb.go index bd9c7a86e7a..5126d6b843f 100644 --- a/pkg/metricsservice/api/metrics.pb.go +++ b/pkg/metricsservice/api/metrics.pb.go @@ -15,7 +15,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 +// protoc-gen-go v1.30.0 // protoc v3.21.12 // source: metrics.proto @@ -99,267 +99,6 @@ func (x *ScaledObjectRef) GetMetricName() string { return "" } -type Response struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Metrics *v1beta1.ExternalMetricValueList `protobuf:"bytes,1,opt,name=metrics,proto3" json:"metrics,omitempty"` - PromMetrics *PromMetricsMsg `protobuf:"bytes,2,opt,name=promMetrics,proto3" json:"promMetrics,omitempty"` -} - -func (x *Response) Reset() { - *x = Response{} - if protoimpl.UnsafeEnabled { - mi := &file_metrics_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Response) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Response) ProtoMessage() {} - -func (x *Response) ProtoReflect() protoreflect.Message { - mi := &file_metrics_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Response.ProtoReflect.Descriptor instead. -func (*Response) Descriptor() ([]byte, []int) { - return file_metrics_proto_rawDescGZIP(), []int{1} -} - -func (x *Response) GetMetrics() *v1beta1.ExternalMetricValueList { - if x != nil { - return x.Metrics - } - return nil -} - -func (x *Response) GetPromMetrics() *PromMetricsMsg { - if x != nil { - return x.PromMetrics - } - return nil -} - -// [DEPRECATED] PromMetricsMsg provides metrics for deprecated Prometheus Metrics in Metrics Server -type PromMetricsMsg struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - ScaledObjectErr bool `protobuf:"varint,1,opt,name=scaledObjectErr,proto3" json:"scaledObjectErr,omitempty"` - ScalerMetric []*ScalerMetricMsg `protobuf:"bytes,2,rep,name=scalerMetric,proto3" json:"scalerMetric,omitempty"` - ScalerError []*ScalerErrorMsg `protobuf:"bytes,3,rep,name=scalerError,proto3" json:"scalerError,omitempty"` -} - -func (x *PromMetricsMsg) Reset() { - *x = PromMetricsMsg{} - if protoimpl.UnsafeEnabled { - mi := &file_metrics_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *PromMetricsMsg) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*PromMetricsMsg) ProtoMessage() {} - -func (x *PromMetricsMsg) ProtoReflect() protoreflect.Message { - mi := &file_metrics_proto_msgTypes[2] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use PromMetricsMsg.ProtoReflect.Descriptor instead. -func (*PromMetricsMsg) Descriptor() ([]byte, []int) { - return file_metrics_proto_rawDescGZIP(), []int{2} -} - -func (x *PromMetricsMsg) GetScaledObjectErr() bool { - if x != nil { - return x.ScaledObjectErr - } - return false -} - -func (x *PromMetricsMsg) GetScalerMetric() []*ScalerMetricMsg { - if x != nil { - return x.ScalerMetric - } - return nil -} - -func (x *PromMetricsMsg) GetScalerError() []*ScalerErrorMsg { - if x != nil { - return x.ScalerError - } - return nil -} - -type ScalerMetricMsg struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - ScalerName string `protobuf:"bytes,1,opt,name=scalerName,proto3" json:"scalerName,omitempty"` - ScalerIndex int32 `protobuf:"varint,2,opt,name=scalerIndex,proto3" json:"scalerIndex,omitempty"` - MetricName string `protobuf:"bytes,3,opt,name=metricName,proto3" json:"metricName,omitempty"` - MetricValue float32 `protobuf:"fixed32,4,opt,name=metricValue,proto3" json:"metricValue,omitempty"` -} - -func (x *ScalerMetricMsg) Reset() { - *x = ScalerMetricMsg{} - if protoimpl.UnsafeEnabled { - mi := &file_metrics_proto_msgTypes[3] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *ScalerMetricMsg) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ScalerMetricMsg) ProtoMessage() {} - -func (x *ScalerMetricMsg) ProtoReflect() protoreflect.Message { - mi := &file_metrics_proto_msgTypes[3] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ScalerMetricMsg.ProtoReflect.Descriptor instead. -func (*ScalerMetricMsg) Descriptor() ([]byte, []int) { - return file_metrics_proto_rawDescGZIP(), []int{3} -} - -func (x *ScalerMetricMsg) GetScalerName() string { - if x != nil { - return x.ScalerName - } - return "" -} - -func (x *ScalerMetricMsg) GetScalerIndex() int32 { - if x != nil { - return x.ScalerIndex - } - return 0 -} - -func (x *ScalerMetricMsg) GetMetricName() string { - if x != nil { - return x.MetricName - } - return "" -} - -func (x *ScalerMetricMsg) GetMetricValue() float32 { - if x != nil { - return x.MetricValue - } - return 0 -} - -type ScalerErrorMsg struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - ScalerName string `protobuf:"bytes,1,opt,name=scalerName,proto3" json:"scalerName,omitempty"` - ScalerIndex int32 `protobuf:"varint,2,opt,name=scalerIndex,proto3" json:"scalerIndex,omitempty"` - MetricName string `protobuf:"bytes,3,opt,name=metricName,proto3" json:"metricName,omitempty"` - Error bool `protobuf:"varint,4,opt,name=error,proto3" json:"error,omitempty"` -} - -func (x *ScalerErrorMsg) Reset() { - *x = ScalerErrorMsg{} - if protoimpl.UnsafeEnabled { - mi := &file_metrics_proto_msgTypes[4] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *ScalerErrorMsg) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ScalerErrorMsg) ProtoMessage() {} - -func (x *ScalerErrorMsg) ProtoReflect() protoreflect.Message { - mi := &file_metrics_proto_msgTypes[4] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ScalerErrorMsg.ProtoReflect.Descriptor instead. -func (*ScalerErrorMsg) Descriptor() ([]byte, []int) { - return file_metrics_proto_rawDescGZIP(), []int{4} -} - -func (x *ScalerErrorMsg) GetScalerName() string { - if x != nil { - return x.ScalerName - } - return "" -} - -func (x *ScalerErrorMsg) GetScalerIndex() int32 { - if x != nil { - return x.ScalerIndex - } - return 0 -} - -func (x *ScalerErrorMsg) GetMetricName() string { - if x != nil { - return x.MetricName - } - return "" -} - -func (x *ScalerErrorMsg) GetError() bool { - if x != nil { - return x.Error - } - return false -} - var File_metrics_proto protoreflect.FileDescriptor var file_metrics_proto_rawDesc = []byte{ @@ -374,52 +113,16 @@ var file_metrics_proto_rawDesc = []byte{ 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x1e, 0x0a, 0x0a, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x0a, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0xa6, 0x01, 0x0a, 0x08, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x63, 0x0a, 0x07, 0x6d, 0x65, 0x74, 0x72, - 0x69, 0x63, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x49, 0x2e, 0x6b, 0x38, 0x73, 0x2e, - 0x69, 0x6f, 0x2e, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x61, - 0x70, 0x69, 0x73, 0x2e, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x5f, 0x6d, 0x65, 0x74, - 0x72, 0x69, 0x63, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x31, 0x2e, 0x45, 0x78, 0x74, - 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x56, 0x61, 0x6c, 0x75, 0x65, - 0x4c, 0x69, 0x73, 0x74, 0x52, 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x35, 0x0a, - 0x0b, 0x70, 0x72, 0x6f, 0x6d, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x50, 0x72, 0x6f, 0x6d, 0x4d, 0x65, 0x74, - 0x72, 0x69, 0x63, 0x73, 0x4d, 0x73, 0x67, 0x52, 0x0b, 0x70, 0x72, 0x6f, 0x6d, 0x4d, 0x65, 0x74, - 0x72, 0x69, 0x63, 0x73, 0x22, 0xab, 0x01, 0x0a, 0x0e, 0x50, 0x72, 0x6f, 0x6d, 0x4d, 0x65, 0x74, - 0x72, 0x69, 0x63, 0x73, 0x4d, 0x73, 0x67, 0x12, 0x28, 0x0a, 0x0f, 0x73, 0x63, 0x61, 0x6c, 0x65, - 0x64, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x45, 0x72, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, - 0x52, 0x0f, 0x73, 0x63, 0x61, 0x6c, 0x65, 0x64, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x45, 0x72, - 0x72, 0x12, 0x38, 0x0a, 0x0c, 0x73, 0x63, 0x61, 0x6c, 0x65, 0x72, 0x4d, 0x65, 0x74, 0x72, 0x69, - 0x63, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x53, 0x63, - 0x61, 0x6c, 0x65, 0x72, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x4d, 0x73, 0x67, 0x52, 0x0c, 0x73, - 0x63, 0x61, 0x6c, 0x65, 0x72, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x12, 0x35, 0x0a, 0x0b, 0x73, - 0x63, 0x61, 0x6c, 0x65, 0x72, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, - 0x32, 0x13, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x53, 0x63, 0x61, 0x6c, 0x65, 0x72, 0x45, 0x72, 0x72, - 0x6f, 0x72, 0x4d, 0x73, 0x67, 0x52, 0x0b, 0x73, 0x63, 0x61, 0x6c, 0x65, 0x72, 0x45, 0x72, 0x72, - 0x6f, 0x72, 0x22, 0x95, 0x01, 0x0a, 0x0f, 0x53, 0x63, 0x61, 0x6c, 0x65, 0x72, 0x4d, 0x65, 0x74, - 0x72, 0x69, 0x63, 0x4d, 0x73, 0x67, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x63, 0x61, 0x6c, 0x65, 0x72, - 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x63, 0x61, 0x6c, - 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x73, 0x63, 0x61, 0x6c, 0x65, 0x72, - 0x49, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x73, 0x63, 0x61, - 0x6c, 0x65, 0x72, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x1e, 0x0a, 0x0a, 0x6d, 0x65, 0x74, 0x72, - 0x69, 0x63, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x6d, 0x65, - 0x74, 0x72, 0x69, 0x63, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x6d, 0x65, 0x74, 0x72, - 0x69, 0x63, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x02, 0x52, 0x0b, 0x6d, - 0x65, 0x74, 0x72, 0x69, 0x63, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x88, 0x01, 0x0a, 0x0e, 0x53, - 0x63, 0x61, 0x6c, 0x65, 0x72, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x73, 0x67, 0x12, 0x1e, 0x0a, - 0x0a, 0x73, 0x63, 0x61, 0x6c, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x0a, 0x73, 0x63, 0x61, 0x6c, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x20, 0x0a, - 0x0b, 0x73, 0x63, 0x61, 0x6c, 0x65, 0x72, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x05, 0x52, 0x0b, 0x73, 0x63, 0x61, 0x6c, 0x65, 0x72, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x12, - 0x1e, 0x0a, 0x0a, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x0a, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x4e, 0x61, 0x6d, 0x65, 0x12, - 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, - 0x65, 0x72, 0x72, 0x6f, 0x72, 0x32, 0x45, 0x0a, 0x0e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, - 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x33, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x4d, 0x65, - 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x14, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x53, 0x63, 0x61, 0x6c, - 0x65, 0x64, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x52, 0x65, 0x66, 0x1a, 0x0d, 0x2e, 0x61, 0x70, - 0x69, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x07, 0x5a, 0x05, - 0x2e, 0x3b, 0x61, 0x70, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x0a, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x4e, 0x61, 0x6d, 0x65, 0x32, 0x81, 0x01, 0x0a, 0x0e, + 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x6f, + 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x14, 0x2e, 0x61, + 0x70, 0x69, 0x2e, 0x53, 0x63, 0x61, 0x6c, 0x65, 0x64, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x52, + 0x65, 0x66, 0x1a, 0x49, 0x2e, 0x6b, 0x38, 0x73, 0x2e, 0x69, 0x6f, 0x2e, 0x6d, 0x65, 0x74, 0x72, + 0x69, 0x63, 0x73, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x61, 0x70, 0x69, 0x73, 0x2e, 0x65, 0x78, 0x74, + 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x5f, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x2e, 0x76, 0x31, + 0x62, 0x65, 0x74, 0x61, 0x31, 0x2e, 0x45, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x4d, 0x65, + 0x74, 0x72, 0x69, 0x63, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x4c, 0x69, 0x73, 0x74, 0x22, 0x00, 0x42, + 0x07, 0x5a, 0x05, 0x2e, 0x3b, 0x61, 0x70, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -434,27 +137,19 @@ func file_metrics_proto_rawDescGZIP() []byte { return file_metrics_proto_rawDescData } -var file_metrics_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_metrics_proto_msgTypes = make([]protoimpl.MessageInfo, 1) var file_metrics_proto_goTypes = []interface{}{ (*ScaledObjectRef)(nil), // 0: api.ScaledObjectRef - (*Response)(nil), // 1: api.Response - (*PromMetricsMsg)(nil), // 2: api.PromMetricsMsg - (*ScalerMetricMsg)(nil), // 3: api.ScalerMetricMsg - (*ScalerErrorMsg)(nil), // 4: api.ScalerErrorMsg - (*v1beta1.ExternalMetricValueList)(nil), // 5: k8s.io.metrics.pkg.apis.external_metrics.v1beta1.ExternalMetricValueList + (*v1beta1.ExternalMetricValueList)(nil), // 1: k8s.io.metrics.pkg.apis.external_metrics.v1beta1.ExternalMetricValueList } var file_metrics_proto_depIdxs = []int32{ - 5, // 0: api.Response.metrics:type_name -> k8s.io.metrics.pkg.apis.external_metrics.v1beta1.ExternalMetricValueList - 2, // 1: api.Response.promMetrics:type_name -> api.PromMetricsMsg - 3, // 2: api.PromMetricsMsg.scalerMetric:type_name -> api.ScalerMetricMsg - 4, // 3: api.PromMetricsMsg.scalerError:type_name -> api.ScalerErrorMsg - 0, // 4: api.MetricsService.GetMetrics:input_type -> api.ScaledObjectRef - 1, // 5: api.MetricsService.GetMetrics:output_type -> api.Response - 5, // [5:6] is the sub-list for method output_type - 4, // [4:5] is the sub-list for method input_type - 4, // [4:4] is the sub-list for extension type_name - 4, // [4:4] is the sub-list for extension extendee - 0, // [0:4] is the sub-list for field type_name + 0, // 0: api.MetricsService.GetMetrics:input_type -> api.ScaledObjectRef + 1, // 1: api.MetricsService.GetMetrics:output_type -> k8s.io.metrics.pkg.apis.external_metrics.v1beta1.ExternalMetricValueList + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name } func init() { file_metrics_proto_init() } @@ -475,54 +170,6 @@ func file_metrics_proto_init() { return nil } } - file_metrics_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Response); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_metrics_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PromMetricsMsg); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_metrics_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ScalerMetricMsg); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_metrics_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ScalerErrorMsg); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } } type x struct{} out := protoimpl.TypeBuilder{ @@ -530,7 +177,7 @@ func file_metrics_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_metrics_proto_rawDesc, NumEnums: 0, - NumMessages: 5, + NumMessages: 1, NumExtensions: 0, NumServices: 1, }, diff --git a/pkg/metricsservice/api/metrics.proto b/pkg/metricsservice/api/metrics.proto index 59226eeeed1..883b2ed7ca2 100644 --- a/pkg/metricsservice/api/metrics.proto +++ b/pkg/metricsservice/api/metrics.proto @@ -22,7 +22,7 @@ option go_package = ".;api"; import "k8s.io/metrics/pkg/apis/external_metrics/v1beta1/generated.proto"; service MetricsService { - rpc GetMetrics (ScaledObjectRef) returns (Response) {}; + rpc GetMetrics (ScaledObjectRef) returns (k8s.io.metrics.pkg.apis.external_metrics.v1beta1.ExternalMetricValueList) {}; } message ScaledObjectRef { @@ -30,30 +30,3 @@ message ScaledObjectRef { string namespace = 2; string metricName = 3; } - - -message Response { - k8s.io.metrics.pkg.apis.external_metrics.v1beta1.ExternalMetricValueList metrics = 1; - PromMetricsMsg promMetrics = 2; -} - -// [DEPRECATED] PromMetricsMsg provides metrics for deprecated Prometheus Metrics in Metrics Server -message PromMetricsMsg { - bool scaledObjectErr = 1; - repeated ScalerMetricMsg scalerMetric = 2; - repeated ScalerErrorMsg scalerError = 3; -} - -message ScalerMetricMsg { - string scalerName = 1; - int32 scalerIndex = 2; - string metricName = 3; - float metricValue = 4; -} - -message ScalerErrorMsg { - string scalerName = 1; - int32 scalerIndex = 2; - string metricName = 3; - bool error = 4; -} diff --git a/pkg/metricsservice/api/metrics_grpc.pb.go b/pkg/metricsservice/api/metrics_grpc.pb.go index d310e610985..8fa556834d1 100644 --- a/pkg/metricsservice/api/metrics_grpc.pb.go +++ b/pkg/metricsservice/api/metrics_grpc.pb.go @@ -26,6 +26,7 @@ import ( grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" + v1beta1 "k8s.io/metrics/pkg/apis/external_metrics/v1beta1" ) // This is a compile-time assertion to ensure that this generated file @@ -41,7 +42,7 @@ const ( // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type MetricsServiceClient interface { - GetMetrics(ctx context.Context, in *ScaledObjectRef, opts ...grpc.CallOption) (*Response, error) + GetMetrics(ctx context.Context, in *ScaledObjectRef, opts ...grpc.CallOption) (*v1beta1.ExternalMetricValueList, error) } type metricsServiceClient struct { @@ -52,8 +53,8 @@ func NewMetricsServiceClient(cc grpc.ClientConnInterface) MetricsServiceClient { return &metricsServiceClient{cc} } -func (c *metricsServiceClient) GetMetrics(ctx context.Context, in *ScaledObjectRef, opts ...grpc.CallOption) (*Response, error) { - out := new(Response) +func (c *metricsServiceClient) GetMetrics(ctx context.Context, in *ScaledObjectRef, opts ...grpc.CallOption) (*v1beta1.ExternalMetricValueList, error) { + out := new(v1beta1.ExternalMetricValueList) err := c.cc.Invoke(ctx, MetricsService_GetMetrics_FullMethodName, in, out, opts...) if err != nil { return nil, err @@ -65,7 +66,7 @@ func (c *metricsServiceClient) GetMetrics(ctx context.Context, in *ScaledObjectR // All implementations must embed UnimplementedMetricsServiceServer // for forward compatibility type MetricsServiceServer interface { - GetMetrics(context.Context, *ScaledObjectRef) (*Response, error) + GetMetrics(context.Context, *ScaledObjectRef) (*v1beta1.ExternalMetricValueList, error) mustEmbedUnimplementedMetricsServiceServer() } @@ -73,7 +74,7 @@ type MetricsServiceServer interface { type UnimplementedMetricsServiceServer struct { } -func (UnimplementedMetricsServiceServer) GetMetrics(context.Context, *ScaledObjectRef) (*Response, error) { +func (UnimplementedMetricsServiceServer) GetMetrics(context.Context, *ScaledObjectRef) (*v1beta1.ExternalMetricValueList, error) { return nil, status.Errorf(codes.Unimplemented, "method GetMetrics not implemented") } func (UnimplementedMetricsServiceServer) mustEmbedUnimplementedMetricsServiceServer() {} diff --git a/pkg/metricsservice/client.go b/pkg/metricsservice/client.go index ba6692b4f25..537b172d200 100644 --- a/pkg/metricsservice/client.go +++ b/pkg/metricsservice/client.go @@ -65,24 +65,19 @@ func NewGrpcClient(url, certDir string) (*GrpcClient, error) { return &GrpcClient{client: api.NewMetricsServiceClient(conn), connection: conn}, nil } -func (c *GrpcClient) GetMetrics(ctx context.Context, scaledObjectName, scaledObjectNamespace, metricName string) (*external_metrics.ExternalMetricValueList, *api.PromMetricsMsg, error) { - // nosemgrep: trailofbits.go.invalid-usage-of-modified-variable.invalid-usage-of-modified-variable - response, err := c.client.GetMetrics(ctx, &api.ScaledObjectRef{Name: scaledObjectName, Namespace: scaledObjectNamespace, MetricName: metricName}) +func (c *GrpcClient) GetMetrics(ctx context.Context, scaledObjectName, scaledObjectNamespace, metricName string) (*external_metrics.ExternalMetricValueList, error) { + v1beta1ExtMetrics, err := c.client.GetMetrics(ctx, &api.ScaledObjectRef{Name: scaledObjectName, Namespace: scaledObjectNamespace, MetricName: metricName}) if err != nil { - // in certain cases we would like to get Prometheus metrics even if there's an error - // so we can expose information about the error in the client - return nil, response.GetPromMetrics(), err + return nil, err } extMetrics := &external_metrics.ExternalMetricValueList{} - err = v1beta1.Convert_v1beta1_ExternalMetricValueList_To_external_metrics_ExternalMetricValueList(response.GetMetrics(), extMetrics, nil) + err = v1beta1.Convert_v1beta1_ExternalMetricValueList_To_external_metrics_ExternalMetricValueList(v1beta1ExtMetrics, extMetrics, nil) if err != nil { - // in certain cases we would like to get Prometheus metrics even if there's an error - // so we can expose information about the error in the client - return nil, response.GetPromMetrics(), fmt.Errorf("error when converting metric values %w", err) + return nil, fmt.Errorf("error when converting metric values %w", err) } - return extMetrics, response.GetPromMetrics(), nil + return extMetrics, nil } // WaitForConnectionReady waits for gRPC connection to be ready diff --git a/pkg/metricsservice/server.go b/pkg/metricsservice/server.go index efde50a8b00..9f5f856473f 100644 --- a/pkg/metricsservice/server.go +++ b/pkg/metricsservice/server.go @@ -42,24 +42,21 @@ type GrpcServer struct { } // GetMetrics returns metrics values in form of ExternalMetricValueList for specified ScaledObject reference -func (s *GrpcServer) GetMetrics(ctx context.Context, in *api.ScaledObjectRef) (*api.Response, error) { - response := api.Response{} +func (s *GrpcServer) GetMetrics(ctx context.Context, in *api.ScaledObjectRef) (*v1beta1.ExternalMetricValueList, error) { v1beta1ExtMetrics := &v1beta1.ExternalMetricValueList{} - extMetrics, exportedMetrics, err := (*s.scalerHandler).GetScaledObjectMetrics(ctx, in.Name, in.Namespace, in.MetricName) - response.PromMetrics = exportedMetrics + extMetrics, err := (*s.scalerHandler).GetScaledObjectMetrics(ctx, in.Name, in.Namespace, in.MetricName) if err != nil { - return &response, fmt.Errorf("error when getting metric values %w", err) + return v1beta1ExtMetrics, fmt.Errorf("error when getting metric values %w", err) } err = v1beta1.Convert_external_metrics_ExternalMetricValueList_To_v1beta1_ExternalMetricValueList(extMetrics, v1beta1ExtMetrics, nil) if err != nil { - return &response, fmt.Errorf("error when converting metric values %w", err) + return v1beta1ExtMetrics, fmt.Errorf("error when converting metric values %w", err) } log.V(1).WithValues("scaledObjectName", in.Name, "scaledObjectNamespace", in.Namespace, "metrics", v1beta1ExtMetrics).Info("Providing metrics") - response.Metrics = v1beta1ExtMetrics - return &response, nil + return v1beta1ExtMetrics, nil } // NewGrpcServer creates a new instance of GrpcServer diff --git a/pkg/mock/mock_scaling/mock_interface.go b/pkg/mock/mock_scaling/mock_interface.go index d0579a12bac..94b0adf6d76 100644 --- a/pkg/mock/mock_scaling/mock_interface.go +++ b/pkg/mock/mock_scaling/mock_interface.go @@ -9,7 +9,6 @@ import ( reflect "reflect" gomock "github.com/golang/mock/gomock" - api "github.com/kedacore/keda/v2/pkg/metricsservice/api" cache "github.com/kedacore/keda/v2/pkg/scaling/cache" external_metrics "k8s.io/metrics/pkg/apis/external_metrics" ) @@ -66,13 +65,12 @@ func (mr *MockScaleHandlerMockRecorder) DeleteScalableObject(ctx, scalableObject } // GetScaledObjectMetrics mocks base method. -func (m *MockScaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectName, scaledObjectNamespace, metricName string) (*external_metrics.ExternalMetricValueList, *api.PromMetricsMsg, error) { +func (m *MockScaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectName, scaledObjectNamespace, metricName string) (*external_metrics.ExternalMetricValueList, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetScaledObjectMetrics", ctx, scaledObjectName, scaledObjectNamespace, metricName) ret0, _ := ret[0].(*external_metrics.ExternalMetricValueList) - ret1, _ := ret[1].(*api.PromMetricsMsg) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 + ret1, _ := ret[1].(error) + return ret0, ret1 } // GetScaledObjectMetrics indicates an expected call of GetScaledObjectMetrics. diff --git a/pkg/prommetrics/adapter/adapter_prommetrics.go b/pkg/prommetrics/adapter/adapter_prommetrics.go deleted file mode 100644 index c088e4b97d9..00000000000 --- a/pkg/prommetrics/adapter/adapter_prommetrics.go +++ /dev/null @@ -1,140 +0,0 @@ -/* -Copyright 2021 The KEDA Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package adapter - -import ( - "log" - "net/http" - "strconv" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" -) - -var ( - metricLabels = []string{"namespace", "metric", "scaledObject", "scaler", "scalerIndex"} - scalerErrorsTotal = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "keda_metrics_adapter", - Subsystem: "scaler", - Name: "errors_total", - Help: "Total number of errors for all scalers", - }, - []string{}, - ) - scalerMetricsValue = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "keda_metrics_adapter", - Subsystem: "scaler", - Name: "metrics_value", - Help: "Metric Value used for HPA", - }, - metricLabels, - ) - scalerErrors = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "keda_metrics_adapter", - Subsystem: "scaler", - Name: "errors", - Help: "Number of scaler errors", - }, - metricLabels, - ) - scaledObjectErrors = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "keda_metrics_adapter", - Subsystem: "scaled_object", - Name: "errors", - Help: "Number of scaled object errors", - }, - []string{"namespace", "scaledObject"}, - ) -) - -// PrometheusMetricServer the type of MetricsServer -type PrometheusMetricServer struct{} - -var registry *prometheus.Registry - -func init() { - registry = prometheus.NewRegistry() - registry.MustRegister(scalerErrorsTotal) - registry.MustRegister(scalerMetricsValue) - registry.MustRegister(scalerErrors) - registry.MustRegister(scaledObjectErrors) -} - -// NewServer creates a new http serving instance of prometheus metrics -func (metricsServer PrometheusMetricServer) NewServer(address string, pattern string) { - http.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) { - w.WriteHeader(http.StatusOK) - _, err := w.Write([]byte("OK")) - if err != nil { - log.Fatalf("Unable to write to serve custom metrics: %v", err) - } - }) - log.Printf("Starting metrics server at %v", address) - http.Handle(pattern, promhttp.HandlerFor(registry, promhttp.HandlerOpts{})) - - // initialize the total error metric - _, errscaler := scalerErrorsTotal.GetMetricWith(prometheus.Labels{}) - if errscaler != nil { - log.Fatalf("Unable to initialize total error metrics as : %v", errscaler) - } - - // nosemgrep: go.lang.security.audit.net.use-tls.use-tls - log.Fatal(http.ListenAndServe(address, nil)) -} - -// RecordHPAScalerMetric create a measurement of the external metric used by the HPA -func (metricsServer PrometheusMetricServer) RecordHPAScalerMetric(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, value float64) { - scalerMetricsValue.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(value) -} - -// RecordHPAScalerError counts the number of errors occurred in trying get an external metric used by the HPA -func (metricsServer PrometheusMetricServer) RecordHPAScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, err error) { - if err != nil { - scalerErrors.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Inc() - metricsServer.RecordScaledObjectError(namespace, scaledObject, err) - scalerErrorsTotal.With(prometheus.Labels{}).Inc() - return - } - // initialize metric with 0 if not already set - _, errscaler := scalerErrors.GetMetricWith(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)) - if errscaler != nil { - log.Fatalf("Unable to write to serve custom metrics: %v", errscaler) - } -} - -// RecordScalerObjectError counts the number of errors with the scaled object -func (metricsServer PrometheusMetricServer) RecordScaledObjectError(namespace string, scaledObject string, err error) { - labels := prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject} - if err != nil { - scaledObjectErrors.With(labels).Inc() - return - } - // initialize metric with 0 if not already set - _, errscaledobject := scaledObjectErrors.GetMetricWith(labels) - if errscaledobject != nil { - log.Fatalf("Unable to write to serve custom metrics: %v", errscaledobject) - return - } -} - -func getLabels(namespace string, scaledObject string, scaler string, scalerIndex int, metric string) prometheus.Labels { - return prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject, "scaler": scaler, "scalerIndex": strconv.Itoa(scalerIndex), "metric": metric} -} diff --git a/pkg/provider/provider.go b/pkg/provider/provider.go index c6a5752705a..fd1bdc6bbf0 100644 --- a/pkg/provider/provider.go +++ b/pkg/provider/provider.go @@ -19,7 +19,6 @@ package provider import ( "context" "fmt" - "strings" "sync" "github.com/go-logr/logr" @@ -32,14 +31,10 @@ import ( "sigs.k8s.io/custom-metrics-apiserver/pkg/provider" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" - "github.com/kedacore/keda/v2/pkg/fallback" "github.com/kedacore/keda/v2/pkg/metricsservice" - prommetrics "github.com/kedacore/keda/v2/pkg/prommetrics/adapter" "github.com/kedacore/keda/v2/pkg/scaling" ) -// prommetrics "github.com/kedacore/keda/v2/pkg/prommetrics/adapter" - // KedaProvider implements External Metrics Provider type KedaProvider struct { client client.Client @@ -55,7 +50,6 @@ type KedaProvider struct { var ( logger logr.Logger - promMetricsServer prommetrics.PrometheusMetricServer grpcClientConnected bool ) @@ -104,126 +98,29 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string, } // Get Metrics from Metrics Service gRPC Server - if p.useMetricsServiceGrpc { - if !p.grpcClient.WaitForConnectionReady(ctx, logger) { - grpcClientConnected = false - logger.Error(fmt.Errorf("timeout while waiting to establish gRPC connection to KEDA Metrics Service server"), "timeout", "server", p.grpcClient.GetServerURL()) - return nil, err - } - if !grpcClientConnected { - grpcClientConnected = true - logger.Info("Connection to KEDA Metrics Service gRPC server has been successfully established", "server", p.grpcClient.GetServerURL()) - } - - // selector is in form: `scaledobject.keda.sh/name: scaledobject-name` - scaledObjectName := selector.Get(kedav1alpha1.ScaledObjectOwnerAnnotation) - if scaledObjectName == "" { - err := fmt.Errorf("scaledObject name is not specified") - logger.Error(err, fmt.Sprintf("please specify scaledObject name, it needs to be set as value of label selector %q on the query", kedav1alpha1.ScaledObjectOwnerAnnotation)) - - return &external_metrics.ExternalMetricValueList{}, err - } - - metrics, promMetrics, err := p.grpcClient.GetMetrics(ctx, scaledObjectName, namespace, info.Metric) - logger.V(1).WithValues("scaledObjectName", scaledObjectName, "scaledObjectNamespace", namespace, "metrics", metrics).Info("Receiving metrics") - - // [DEPRECATED] handle exporting Prometheus metrics from Operator to Metrics Server - if promMetrics != nil { - var scaledObjectErr error - if promMetrics.ScaledObjectErr { - scaledObjectErr = fmt.Errorf("scaledObject error") - } - promMetricsServer.RecordScaledObjectError(namespace, scaledObjectName, scaledObjectErr) - for _, scalerMetric := range promMetrics.ScalerMetric { - promMetricsServer.RecordHPAScalerMetric(namespace, scaledObjectName, scalerMetric.ScalerName, int(scalerMetric.ScalerIndex), scalerMetric.MetricName, float64(scalerMetric.MetricValue)) - } - for _, scalerError := range promMetrics.ScalerError { - var scalerErr error - if scalerError.Error { - scalerErr = fmt.Errorf("scaler error") - } - promMetricsServer.RecordHPAScalerError(namespace, scaledObjectName, scalerError.ScalerName, int(scalerError.ScalerIndex), scalerError.MetricName, scalerErr) - } - } - - return metrics, err - } - - // ------ Deprecated way of getting metric directly from MS ------ // - // --------------------------------------------------------------- // - // Get Metrics by querying directly the external service - scaledObjects := &kedav1alpha1.ScaledObjectList{} - opts := []client.ListOption{ - client.InNamespace(namespace), - client.MatchingLabels(selector), - } - err = p.client.List(ctx, scaledObjects, opts...) - if err != nil { + if !p.grpcClient.WaitForConnectionReady(ctx, logger) { + grpcClientConnected = false + logger.Error(fmt.Errorf("timeout while waiting to establish gRPC connection to KEDA Metrics Service server"), "timeout", "server", p.grpcClient.GetServerURL()) return nil, err - } else if len(scaledObjects.Items) != 1 { - return nil, fmt.Errorf("exactly one ScaledObject should match label %s", metricSelector.String()) } - - scaledObject := &scaledObjects.Items[0] - var matchingMetrics []external_metrics.ExternalMetricValue - - cache, err := p.scaleHandler.GetScalersCache(ctx, scaledObject) - promMetricsServer.RecordScaledObjectError(scaledObject.Namespace, scaledObject.Name, err) - if err != nil { - return nil, fmt.Errorf("error when getting scalers %w", err) + if !grpcClientConnected { + grpcClientConnected = true + logger.Info("Connection to KEDA Metrics Service gRPC server has been successfully established", "server", p.grpcClient.GetServerURL()) } - // let's check metrics for all scalers in a ScaledObject - scalerError := false - scalers, scalerConfigs := cache.GetScalers() - for scalerIndex := 0; scalerIndex < len(scalers); scalerIndex++ { - metricSpecs := scalers[scalerIndex].GetMetricSpecForScaling(ctx) - scalerName := strings.Replace(fmt.Sprintf("%T", scalers[scalerIndex]), "*scalers.", "", 1) - if scalerConfigs[scalerIndex].TriggerName != "" { - scalerName = scalerConfigs[scalerIndex].TriggerName - } - - for _, metricSpec := range metricSpecs { - // skip cpu/memory resource scaler - if metricSpec.External == nil { - continue - } - // Filter only the desired metric - if strings.EqualFold(metricSpec.External.Metric.Name, info.Metric) { - metrics, _, _, err := cache.GetMetricsAndActivityForScaler(ctx, scalerIndex, info.Metric) - metrics, err = fallback.GetMetricsWithFallback(ctx, p.client, metrics, err, info.Metric, scaledObject, metricSpec) - if err != nil { - scalerError = true - logger.Error(err, "error getting metric for scaler", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name, "scaler", scalerName) - } else { - for _, metric := range metrics { - metricValue := metric.Value.AsApproximateFloat64() - promMetricsServer.RecordHPAScalerMetric(namespace, scaledObject.Name, scalerName, scalerIndex, metric.MetricName, metricValue) - } - matchingMetrics = append(matchingMetrics, metrics...) - } - promMetricsServer.RecordHPAScalerError(namespace, scaledObject.Name, scalerName, scalerIndex, info.Metric, err) - } - } - } + // selector is in form: `scaledobject.keda.sh/name: scaledobject-name` + scaledObjectName := selector.Get(kedav1alpha1.ScaledObjectOwnerAnnotation) + if scaledObjectName == "" { + err := fmt.Errorf("scaledObject name is not specified") + logger.Error(err, fmt.Sprintf("please specify scaledObject name, it needs to be set as value of label selector %q on the query", kedav1alpha1.ScaledObjectOwnerAnnotation)) - // invalidate the cache for the ScaledObject, if we hit an error in any scaler - // in this case we try to build all scalers (and resolve all secrets/creds) again in the next call - if scalerError { - err := p.scaleHandler.ClearScalersCache(ctx, scaledObject) - if err != nil { - logger.Error(err, "error clearing scalers cache") - } - logger.V(1).Info("scaler error encountered, clearing scaler cache") + return &external_metrics.ExternalMetricValueList{}, err } - if len(matchingMetrics) == 0 { - return nil, fmt.Errorf("no matching metrics found for " + info.Metric) - } + metrics, err := p.grpcClient.GetMetrics(ctx, scaledObjectName, namespace, info.Metric) + logger.V(1).WithValues("scaledObjectName", scaledObjectName, "scaledObjectNamespace", namespace, "metrics", metrics).Info("Receiving metrics") - return &external_metrics.ExternalMetricValueList{ - Items: matchingMetrics, - }, nil + return metrics, err } // ListAllExternalMetrics returns the supported external metrics for this provider diff --git a/pkg/scalers/externalscaler/externalscaler.pb.go b/pkg/scalers/externalscaler/externalscaler.pb.go index 3805e0b5eb4..92785942093 100644 --- a/pkg/scalers/externalscaler/externalscaler.pb.go +++ b/pkg/scalers/externalscaler/externalscaler.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 +// protoc-gen-go v1.30.0 // protoc v3.21.12 // source: externalscaler.proto diff --git a/pkg/scalers/liiklus/LiiklusService.pb.go b/pkg/scalers/liiklus/LiiklusService.pb.go index 8d34aea93d9..f55339283a8 100644 --- a/pkg/scalers/liiklus/LiiklusService.pb.go +++ b/pkg/scalers/liiklus/LiiklusService.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 +// protoc-gen-go v1.30.0 // protoc v3.21.12 // source: LiiklusService.proto @@ -392,7 +392,7 @@ type AckRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - // Deprecated: Do not use. + // Deprecated: Marked as deprecated in LiiklusService.proto. Assignment *Assignment `protobuf:"bytes,1,opt,name=assignment,proto3" json:"assignment,omitempty"` Topic string `protobuf:"bytes,3,opt,name=topic,proto3" json:"topic,omitempty"` Group string `protobuf:"bytes,4,opt,name=group,proto3" json:"group,omitempty"` @@ -433,7 +433,7 @@ func (*AckRequest) Descriptor() ([]byte, []int) { return file_LiiklusService_proto_rawDescGZIP(), []int{5} } -// Deprecated: Do not use. +// Deprecated: Marked as deprecated in LiiklusService.proto. func (x *AckRequest) GetAssignment() *Assignment { if x != nil { return x.Assignment diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 913d6beb020..031155b1384 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -36,7 +36,6 @@ import ( kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" "github.com/kedacore/keda/v2/pkg/eventreason" "github.com/kedacore/keda/v2/pkg/fallback" - metricsserviceapi "github.com/kedacore/keda/v2/pkg/metricsservice/api" "github.com/kedacore/keda/v2/pkg/prommetrics" "github.com/kedacore/keda/v2/pkg/scalers" "github.com/kedacore/keda/v2/pkg/scaling/cache" @@ -55,7 +54,7 @@ type ScaleHandler interface { GetScalersCache(ctx context.Context, scalableObject interface{}) (*cache.ScalersCache, error) ClearScalersCache(ctx context.Context, scalableObject interface{}) error - GetScaledObjectMetrics(ctx context.Context, scaledObjectName, scaledObjectNamespace, metricName string) (*external_metrics.ExternalMetricValueList, *metricsserviceapi.PromMetricsMsg, error) + GetScaledObjectMetrics(ctx context.Context, scaledObjectName, scaledObjectNamespace, metricName string) (*external_metrics.ExternalMetricValueList, error) } type scaleHandler struct { @@ -399,25 +398,16 @@ func (h *scaleHandler) ClearScalersCache(ctx context.Context, scalableObject int // GetScaledObjectMetrics returns metrics for specified metric name for a ScaledObject identified by it's name and namespace. // The second return value are Prometheus metrics that needed to be exposed (used by DEPRECATED Prometheus Server on KEDA Metrics Server) // It could either query the metric value directly from the scaler or from a cache, that's being stored for the scaler. -func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectName, scaledObjectNamespace, metricName string) (*external_metrics.ExternalMetricValueList, *metricsserviceapi.PromMetricsMsg, error) { +func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectName, scaledObjectNamespace, metricName string) (*external_metrics.ExternalMetricValueList, error) { logger := log.WithValues("scaledObject.Namespace", scaledObjectNamespace, "scaledObject.Name", scaledObjectName) var matchingMetrics []external_metrics.ExternalMetricValue - exportedPromMetrics := metricsserviceapi.PromMetricsMsg{ - ScaledObjectErr: false, - ScalerMetric: []*metricsserviceapi.ScalerMetricMsg{}, - ScalerError: []*metricsserviceapi.ScalerErrorMsg{}, - } - cache, err := h.getScalersCacheForScaledObject(ctx, scaledObjectName, scaledObjectNamespace) prommetrics.RecordScaledObjectError(scaledObjectNamespace, scaledObjectName, err) - // [DEPRECATED] handle exporting Prometheus metrics from Operator to Metrics Server - exportedPromMetrics.ScaledObjectErr = (err != nil) - if err != nil { - return nil, &exportedPromMetrics, fmt.Errorf("error getting scalers %w", err) + return nil, fmt.Errorf("error getting scalers %w", err) } var scaledObject *kedav1alpha1.ScaledObject @@ -426,7 +416,7 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN } else { err := fmt.Errorf("scaledObject not found in the cache") logger.Error(err, "scaledObject not found in the cache") - return nil, &exportedPromMetrics, err + return nil, err } isScalerError := false @@ -487,28 +477,10 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN for _, metric := range metrics { metricValue := metric.Value.AsApproximateFloat64() prommetrics.RecordScalerMetric(scaledObjectNamespace, scaledObjectName, scalerName, scalerIndex, metric.MetricName, metricValue) - - // [DEPRECATED] handle exporting Prometheus metrics from Operator to Metrics Server - scalerMetricMsg := metricsserviceapi.ScalerMetricMsg{ - ScalerName: scalerName, - ScalerIndex: int32(scalerIndex), - MetricName: metricName, - MetricValue: float32(metricValue), - } - exportedPromMetrics.ScalerMetric = append(exportedPromMetrics.ScalerMetric, &scalerMetricMsg) } matchingMetrics = append(matchingMetrics, metrics...) } prommetrics.RecordScalerError(scaledObjectNamespace, scaledObjectName, scalerName, scalerIndex, metricName, err) - - // [DEPRECATED] handle exporting Prometheus metrics from Operator to Metrics Server - scalerErrMsg := metricsserviceapi.ScalerErrorMsg{ - ScalerName: scalerName, - ScalerIndex: int32(scalerIndex), - MetricName: metricName, - Error: (err != nil), - } - exportedPromMetrics.ScalerError = append(exportedPromMetrics.ScalerError, &scalerErrMsg) } } } @@ -524,12 +496,12 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN } if len(matchingMetrics) == 0 { - return nil, &exportedPromMetrics, fmt.Errorf("no matching metrics found for " + metricName) + return nil, fmt.Errorf("no matching metrics found for " + metricName) } return &external_metrics.ExternalMetricValueList{ Items: matchingMetrics, - }, &exportedPromMetrics, nil + }, nil } // getScaledObjectState returns whether the input ScaledObject: diff --git a/pkg/scaling/scale_handler_test.go b/pkg/scaling/scale_handler_test.go index 79b2ac155ee..dc9b40fdf5d 100644 --- a/pkg/scaling/scale_handler_test.go +++ b/pkg/scaling/scale_handler_test.go @@ -123,9 +123,8 @@ func TestGetScaledObjectMetrics_DirectCall(t *testing.T) { scaler.EXPECT().GetMetricSpecForScaling(gomock.Any()).Return(metricsSpecs) // hitting directly GetMetricsAndActivity() scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Any()).Return([]external_metrics.ExternalMetricValue{metricValue}, true, nil) - metrics, promMsg, err := sh.GetScaledObjectMetrics(context.TODO(), scaledObjectName, scaledObjectNamespace, metricName) + metrics, err := sh.GetScaledObjectMetrics(context.TODO(), scaledObjectName, scaledObjectNamespace, metricName) assert.NotNil(t, metrics) - assert.NotNil(t, promMsg) assert.Nil(t, err) scaler.EXPECT().Close(gomock.Any()) @@ -214,9 +213,8 @@ func TestGetScaledObjectMetrics_FromCache(t *testing.T) { mockStatusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) scaler.EXPECT().GetMetricSpecForScaling(gomock.Any()).Return(metricsSpecs) // hitting cache here instead of calling GetMetricsAndActivity() - metrics, promMsg, err := sh.GetScaledObjectMetrics(context.TODO(), scaledObjectName, scaledObjectNamespace, metricName) + metrics, err := sh.GetScaledObjectMetrics(context.TODO(), scaledObjectName, scaledObjectNamespace, metricName) assert.NotNil(t, metrics) - assert.NotNil(t, promMsg) assert.Nil(t, err) scaler.EXPECT().Close(gomock.Any()) diff --git a/tests/sequential/prometheus_metrics/prometheus_metrics_test.go b/tests/sequential/prometheus_metrics/prometheus_metrics_test.go index 85d2d8773d2..4e823009d73 100644 --- a/tests/sequential/prometheus_metrics/prometheus_metrics_test.go +++ b/tests/sequential/prometheus_metrics/prometheus_metrics_test.go @@ -259,7 +259,6 @@ func TestPrometheusMetrics(t *testing.T) { testScaledObjectErrors(t, data) testScalerErrors(t, data) testScalerErrorsTotal(t, data) - testMetricsServerScalerMetricValue(t) testOperatorMetrics(t, kc, data) testWebhookMetrics(t, data) @@ -486,30 +485,6 @@ func testScalerActiveMetric(t *testing.T) { } } -// [DEPRECATED] handle exporting Prometheus metrics from Operator to Metrics Server -func testMetricsServerScalerMetricValue(t *testing.T) { - t.Log("--- testing scaler metric value in metrics server ---") - - family := fetchAndParsePrometheusMetrics(t, "curl --insecure http://keda-metrics-apiserver.keda:9022/metrics") - - if val, ok := family["keda_metrics_adapter_scaler_metrics_value"]; ok { - var found bool - metrics := val.GetMetric() - for _, metric := range metrics { - labels := metric.GetLabel() - for _, label := range labels { - if *label.Name == labelScaledObject && *label.Value == scaledObjectName { - assert.Equal(t, float64(4), *metric.Gauge.Value) - found = true - } - } - } - assert.Equal(t, true, found) - } else { - t.Errorf("metric not available") - } -} - func testOperatorMetrics(t *testing.T, kc *kubernetes.Clientset, data templateData) { t.Log("--- testing operator metrics ---") testOperatorMetricValues(t, kc)