Skip to content

Commit

Permalink
Metrics Adapter: use gRPC connection to get metrics from Operator
Browse files Browse the repository at this point in the history
Signed-off-by: Zbynek Roubalik <[email protected]>
  • Loading branch information
zroubalik committed Nov 16, 2022
1 parent dab87b5 commit 1dbd37c
Show file tree
Hide file tree
Showing 91 changed files with 978 additions and 332 deletions.
1 change: 0 additions & 1 deletion CREATE-NEW-SCALER.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ KEDA works in conjunction with Kubernetes Horizontal Pod Autoscaler (HPA). When

The return type of this function is `MetricSpec`, but in KEDA's case we will mostly write External metrics. So the property that should be filled is `ExternalMetricSource`, where the:
- `MetricName`: the name of our metric we are returning in this scaler. The name should be unique, to allow setting multiple (even the same type) Triggers in one ScaledObject, but each function call should return the same name.
- `MetricSelector`: //TODO
- `TargetValue`: is the value of the metric we want to reach at all times at all costs. As long as the current metric doesn't match TargetValue, HPA will increase the number of the pods until it reaches the maximum number of pods allowed to scale to.
- `TargetAverageValue`: the value of the metric for which we require one pod to handle. e.g. if we are have a scaler based on the length of a message queue, and we specificy 10 for `TargetAverageValue`, we are saying that each pod will handle 10 messages. So if the length of the queue becomes 30, we expect that we have 3 pods in our cluster. (`TargetAverage` and `TargetValue` are mutually exclusive)

Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ clientset-generate: ## Generate client-go clientset, listers and informers.
proto-gen: protoc-gen ## Generate Liiklus, ExternalScaler and MetricsService proto
PATH=$(LOCALBIN):$(PATH) protoc -I vendor --proto_path=hack LiiklusService.proto --go_out=pkg/scalers/liiklus --go-grpc_out=pkg/scalers/liiklus
PATH=$(LOCALBIN):$(PATH) protoc -I vendor --proto_path=pkg/scalers/externalscaler externalscaler.proto --go_out=pkg/scalers/externalscaler --go-grpc_out=pkg/scalers/externalscaler
PATH=$(LOCALBIN):$(PATH) protoc -I vendor --proto_path=pkg/metricsservice/api metrics.proto --go_out=pkg/metricsservice/api --go-grpc_out=pkg/metricsservice/api

.PHONY: mockgen-gen
mockgen-gen: mockgen pkg/mock/mock_scaling/mock_interface.go pkg/mock/mock_scaler/mock_scaler.go pkg/mock/mock_scale/mock_interfaces.go pkg/mock/mock_client/mock_interfaces.go pkg/scalers/liiklus/mocks/mock_liiklus.go
Expand Down
15 changes: 12 additions & 3 deletions adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
generatedopenapi "github.com/kedacore/keda/v2/adapter/generated/openapi"
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedacontrollers "github.com/kedacore/keda/v2/controllers/keda"
"github.com/kedacore/keda/v2/pkg/metricsservice"
"github.com/kedacore/keda/v2/pkg/prommetrics"
kedaprovider "github.com/kedacore/keda/v2/pkg/provider"
"github.com/kedacore/keda/v2/pkg/scaling"
Expand Down Expand Up @@ -138,13 +139,21 @@ func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Durat

prometheusServer := &prommetrics.PrometheusMetricServer{}
go func() { prometheusServer.NewServer(fmt.Sprintf(":%v", prometheusMetricsPort), prometheusMetricsPath) }()
stopCh := make(chan struct{})

if err := runScaledObjectController(ctx, mgr, handler, logger, externalMetricsInfo, externalMetricsInfoLock, maxConcurrentReconciles, stopCh); err != nil {
// TODO make the address configurable
grpcServerAddr := "keda-operator.keda.svc.cluster.local:9666"
logger.Info("Connecting Metrics Service gRPC client to the server", "address", grpcServerAddr)
grpcClient, err := metricsservice.NewGrpcClient(grpcServerAddr)
if err != nil {
logger.Error(err, "error connecting Metrics Service gRPC client to the server", "address", grpcServerAddr)
return nil, nil, err
}

return kedaprovider.NewProvider(ctx, logger, handler, mgr.GetClient(), namespace, externalMetricsInfo, externalMetricsInfoLock), stopCh, nil
stopCh := make(chan struct{})
if err := runScaledObjectController(ctx, mgr, handler, logger, externalMetricsInfo, externalMetricsInfoLock, maxConcurrentReconciles, stopCh); err != nil {
return nil, nil, err
}
return kedaprovider.NewProvider(ctx, logger, handler, mgr.GetClient(), *grpcClient, namespace, externalMetricsInfo, externalMetricsInfoLock), stopCh, nil
}

func runScaledObjectController(ctx context.Context, mgr manager.Manager, scaleHandler scaling.ScaleHandler, logger logr.Logger, externalMetricsInfo *[]provider.ExternalMetricInfo, externalMetricsInfoLock *sync.RWMutex, maxConcurrentReconciles int, stopCh chan<- struct{}) error {
Expand Down
27 changes: 27 additions & 0 deletions apis/keda/v1alpha1/indentifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
Copyright 2022 The KEDA Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

import (
"fmt"
"strings"
)

// GenerateIdenitifier returns identifier for the object in form "kind.namespace.name"
func GenerateIdenitifier(kind, name, namespace string) string {
return strings.ToLower(fmt.Sprintf("%s.%s.%s", kind, namespace, name))
}
4 changes: 1 addition & 3 deletions apis/keda/v1alpha1/withtriggers_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package v1alpha1

import (
"fmt"
"strings"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -91,5 +89,5 @@ func (t *WithTriggers) GetPollingInterval() time.Duration {

// GenerateIdenitifier returns identifier for the object in for "kind.namespace.name"
func (t *WithTriggers) GenerateIdenitifier() string {
return strings.ToLower(fmt.Sprintf("%s.%s.%s", t.Kind, t.Namespace, t.Name))
return GenerateIdenitifier(t.Kind, t.Namespace, t.Name)
}
1 change: 1 addition & 0 deletions config/manager/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
resources:
- manager.yaml
- service.yaml

apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
Expand Down
17 changes: 17 additions & 0 deletions config/manager/service.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
---
apiVersion: v1
kind: Service
metadata:
labels:
app.kubernetes.io/name: keda-operator
app.kubernetes.io/version: latest
app.kubernetes.io/part-of: keda-operator
name: keda-operator
namespace: keda
spec:
ports:
- name: metricsservice
port: 9666
targetPort: 9666
selector:
app: keda-operator
2 changes: 1 addition & 1 deletion controllers/keda/hpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(ctx context.Context,
var externalMetricNames []string
var resourceMetricNames []string

cache, err := r.scaleHandler.GetScalersCache(ctx, scaledObject)
cache, err := r.ScaleHandler.GetScalersCache(ctx, scaledObject)
if err != nil {
logger.Error(err, "Error getting scalers")
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion controllers/keda/hpa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ var _ = Describe("hpa", func() {
logger = logr.Discard()
reconciler = ScaledObjectReconciler{
Client: client,
scaleHandler: scaleHandler,
ScaleHandler: scaleHandler,
}
})

Expand Down
70 changes: 23 additions & 47 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/go-logr/logr"
autoscalingv1 "k8s.io/api/autoscaling/v1"
Expand All @@ -32,8 +31,6 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/scale"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
Expand All @@ -42,7 +39,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
Expand All @@ -65,16 +61,14 @@ import (

// ScaledObjectReconciler reconciles a ScaledObject object
type ScaledObjectReconciler struct {
Client client.Client
Scheme *runtime.Scheme
GlobalHTTPTimeout time.Duration
Recorder record.EventRecorder
Client client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder
ScaleClient scale.ScalesGetter
ScaleHandler scaling.ScaleHandler

scaleClient scale.ScalesGetter
restMapper meta.RESTMapper
scaledObjectsGenerations *sync.Map
scaleHandler scaling.ScaleHandler
kubeVersion kedautil.K8sVersion
}

type scaledObjectMetricsData struct {
Expand Down Expand Up @@ -102,33 +96,24 @@ func init() {

// SetupWithManager initializes the ScaledObjectReconciler instance and starts a new controller managed by the passed Manager instance.
func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
setupLog := log.Log.WithName("setup")
r.restMapper = mgr.GetRESTMapper()
r.scaledObjectsGenerations = &sync.Map{}

// create Discovery clientset
// TODO If we need to increase the QPS of scaling API calls, copy and tweak this RESTConfig.
clientset, err := discovery.NewDiscoveryClientForConfig(mgr.GetConfig())
if err != nil {
setupLog.Error(err, "Not able to create Discovery clientset")
return err
if r.ScaleHandler == nil {
return fmt.Errorf("ScaledObjectReconciler.ScaleHandler is not initialized")
}

// Find out Kubernetes version
version, err := clientset.ServerVersion()
if err == nil {
r.kubeVersion = kedautil.NewK8sVersion(version)
setupLog.Info("Running on Kubernetes "+r.kubeVersion.PrettyVersion, "version", version)
} else {
setupLog.Error(err, "Not able to get Kubernetes version")
if r.Client == nil {
return fmt.Errorf("ScaledObjectReconciler.Client is not initialized")
}
if r.ScaleClient == nil {
return fmt.Errorf("ScaledObjectReconciler.ScaleClient is not initialized")
}
if r.Scheme == nil {
return fmt.Errorf("ScaledObjectReconciler.Scheme is not initialized")
}
if r.Recorder == nil {
return fmt.Errorf("ScaledObjectReconciler.Recorder is not initialized")
}

// Create Scale Client
scaleClient := initScaleClient(mgr, clientset)
r.scaleClient = scaleClient

// Init the rest of ScaledObjectReconciler
r.restMapper = mgr.GetRESTMapper()
r.scaledObjectsGenerations = &sync.Map{}
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), r.scaleClient, mgr.GetScheme(), r.GlobalHTTPTimeout, r.Recorder)

// Start controller
return ctrl.NewControllerManagedBy(mgr).
Expand All @@ -147,15 +132,6 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager, options cont
Complete(r)
}

func initScaleClient(mgr manager.Manager, clientset *discovery.DiscoveryClient) scale.ScalesGetter {
scaleKindResolver := scale.NewDiscoveryScaleKindResolver(clientset)
return scale.New(
clientset.RESTClient(), mgr.GetRESTMapper(),
dynamic.LegacyAPIPathResolverFunc,
scaleKindResolver,
)
}

// Reconcile performs reconciliation on the identified ScaledObject resource based on the request information passed, returns the result and an error (if any).
func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
reqLogger := log.FromContext(ctx)
Expand Down Expand Up @@ -319,7 +295,7 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Conte
// not cached, let's try to detect /scale subresource
// also rechecks when we need to update the status.
var errScale error
scale, errScale = (r.scaleClient).Scales(scaledObject.Namespace).Get(ctx, gr, scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
scale, errScale = (r.ScaleClient).Scales(scaledObject.Namespace).Get(ctx, gr, scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
if errScale != nil {
// not able to get /scale subresource -> let's check if the resource even exist in the cluster
unstruct := &unstructured.Unstructured{}
Expand Down Expand Up @@ -467,7 +443,7 @@ func (r *ScaledObjectReconciler) requestScaleLoop(ctx context.Context, logger lo
return err
}

if err = r.scaleHandler.HandleScalableObject(ctx, scaledObject); err != nil {
if err = r.ScaleHandler.HandleScalableObject(ctx, scaledObject); err != nil {
return err
}

Expand All @@ -485,7 +461,7 @@ func (r *ScaledObjectReconciler) stopScaleLoop(ctx context.Context, logger logr.
return err
}

if err := r.scaleHandler.DeleteScalableObject(ctx, scaledObject); err != nil {
if err := r.ScaleHandler.DeleteScalableObject(ctx, scaledObject); err != nil {
return err
}
// delete ScaledObject's current Generation
Expand Down
2 changes: 1 addition & 1 deletion controllers/keda/scaledobject_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ var _ = Describe("ScaledObjectController", func() {
mockStatusWriter = mock_client.NewMockStatusWriter(ctrl)

metricNameTestReconciler = ScaledObjectReconciler{
scaleHandler: mockScaleHandler,
ScaleHandler: mockScaleHandler,
Client: mockClient,
}
})
Expand Down
4 changes: 2 additions & 2 deletions controllers/keda/scaledobject_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(ctx context.Context, logge
logger.V(1).Info("Failed to restore scaleTarget's replica count back to the original, the scaling haven't been probably initialized yet.")
} else {
// We have enough information about the scaleTarget, let's proceed.
scale, err := r.scaleClient.Scales(scaledObject.Namespace).Get(ctx, scaledObject.Status.ScaleTargetGVKR.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
scale, err := r.ScaleClient.Scales(scaledObject.Namespace).Get(ctx, scaledObject.Status.ScaleTargetGVKR.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
logger.V(1).Info("Failed to get scaleTarget's scale status, because it was probably deleted", "error", err)
Expand All @@ -62,7 +62,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(ctx context.Context, logge
}
} else {
scale.Spec.Replicas = *scaledObject.Status.OriginalReplicaCount
_, err = r.scaleClient.Scales(scaledObject.Namespace).Update(ctx, scaledObject.Status.ScaleTargetGVKR.GroupResource(), scale, metav1.UpdateOptions{})
_, err = r.ScaleClient.Scales(scaledObject.Namespace).Update(ctx, scaledObject.Status.ScaleTargetGVKR.GroupResource(), scale, metav1.UpdateOptions{})
if err != nil {
logger.Error(err, "Failed to restore scaleTarget's replica count back to the original", "finalizer", scaledObjectFinalizer)
}
Expand Down
14 changes: 11 additions & 3 deletions controllers/keda/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"path/filepath"
"testing"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand All @@ -34,6 +35,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log/zap"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/k8s"
"github.com/kedacore/keda/v2/pkg/scaling"
// +kubebuilder:scaffold:imports
)

Expand Down Expand Up @@ -81,10 +84,15 @@ var _ = BeforeSuite(func(done Done) {
})
Expect(err).ToNot(HaveOccurred())

scaleClient, _, err := k8s.InitScaleClient(k8sManager)
Expect(err).ToNot(HaveOccurred())

err = (&ScaledObjectReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
Recorder: k8sManager.GetEventRecorderFor("keda-operator"),
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
Recorder: k8sManager.GetEventRecorderFor("keda-operator"),
ScaleHandler: scaling.NewScaleHandler(k8sManager.GetClient(), scaleClient, k8sManager.GetScheme(), time.Duration(10), k8sManager.GetEventRecorderFor("keda-operator")),
ScaleClient: scaleClient,
}).SetupWithManager(k8sManager, controller.Options{})
Expect(err).ToNot(HaveOccurred())

Expand Down
Loading

0 comments on commit 1dbd37c

Please sign in to comment.