diff --git a/main.go b/main.go index f4cb26de348..88937a4a26e 100644 --- a/main.go +++ b/main.go @@ -207,6 +207,12 @@ func main() { os.Exit(1) } + grpcServer := metricsservice.NewGrpcServer(&scaledHandler, metricsServiceAddr) + if err := mgr.Add(&grpcServer); err != nil { + setupLog.Error(err, "unable to set up Metrics Service gRPC server") + os.Exit(1) + } + setupLog.Info("Starting manager") setupLog.Info(fmt.Sprintf("KEDA Version: %s", version.Version)) setupLog.Info(fmt.Sprintf("Git Commit: %s", version.GitCommit)) @@ -214,14 +220,6 @@ func main() { setupLog.Info(fmt.Sprintf("Go OS/Arch: %s/%s", runtime.GOOS, runtime.GOARCH)) setupLog.Info(fmt.Sprintf("Running on Kubernetes %s", kubeVersion.PrettyVersion), "version", kubeVersion.Version) - go func() { - setupLog.Info("Starting Metrics Service gRPC Server", "address", metricsServiceAddr) - if err := metricsservice.StartServer(&scaledHandler, metricsServiceAddr); err != nil { - setupLog.Error(err, "unable to start Metrics Service gRPC server") - os.Exit(1) - } - }() - if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { setupLog.Error(err, "problem running manager") os.Exit(1) diff --git a/pkg/metricsservice/server.go b/pkg/metricsservice/server.go index ae4a5c2d85d..5599dc34eb8 100644 --- a/pkg/metricsservice/server.go +++ b/pkg/metricsservice/server.go @@ -31,12 +31,15 @@ import ( var log = ctrl.Log.WithName("grpc_server") -type grpcServer struct { +type GrpcServer struct { + server *grpc.Server + address string scalerHandler *scaling.ScaleHandler api.UnimplementedMetricsServiceServer } -func (s *grpcServer) GetMetrics(ctx context.Context, in *api.ScaledObjectRef) (*v1beta1.ExternalMetricValueList, error) { +// GetMetrics returns metrics values in form of ExternalMetricValueList for specified ScaledObject reference +func (s *GrpcServer) GetMetrics(ctx context.Context, in *api.ScaledObjectRef) (*v1beta1.ExternalMetricValueList, error) { // TODO hit the metrics cache here first cache, err := (*s.scalerHandler).GetScalersCacheForScaledObject(ctx, in.Name, in.Namespace) @@ -62,25 +65,56 @@ func (s *grpcServer) GetMetrics(ctx context.Context, in *api.ScaledObjectRef) (* return v1beta1ExtMetrics, nil } -func newGrpcServer(scaleHandler *scaling.ScaleHandler) *grpc.Server { +// NewGrpcServer creates a new instance of GrpcServer +func NewGrpcServer(scaleHandler *scaling.ScaleHandler, address string) GrpcServer { gsrv := grpc.NewServer() - srv := grpcServer{ + srv := GrpcServer{ + server: gsrv, + address: address, scalerHandler: scaleHandler, } api.RegisterMetricsServiceServer(gsrv, &srv) - return gsrv + return srv } -func StartServer(scaleHandler *scaling.ScaleHandler, address string) error { - lis, err := net.Listen("tcp", address) +func (s *GrpcServer) startServer() error { + lis, err := net.Listen("tcp", s.address) if err != nil { return fmt.Errorf("failed to listen: %v", err) } - if err := newGrpcServer(scaleHandler).Serve(lis); err != nil { + if err := s.server.Serve(lis); err != nil { return fmt.Errorf("failed to serve: %v", err) } return nil } + +// Start starts a new gRPC Metrics Service, this implements Runnable interface +// of controller-runtime Manager, so we can use mgr.Add() to start this component. +func (s *GrpcServer) Start(ctx context.Context) error { + errChan := make(chan error) + + go func() { + log.Info("Starting Metrics Service gRPC Server", "address", s.address) + if err := s.startServer(); err != nil { + log.Error(err, "unable to start Metrics Service gRPC server", "address", s.address) + errChan <- err + } + }() + + select { + case err := <-errChan: + return err + case <-ctx.Done(): + return nil + } +} + +// NeedLeaderElection is needed to implement LeaderElectionRunnable interface +// of controller-runtime. This assures that the component is started/stoped +// when this particular instance is selected/deselected as a leader. +func (s *GrpcServer) NeedLeaderElection() bool { + return true +}