From 059ca32994bc2e1946c15ed572e9524f979dcb10 Mon Sep 17 00:00:00 2001 From: Leonardo Cecchi Date: Mon, 29 Jan 2024 16:14:26 +0100 Subject: [PATCH] feat: always remove stale unix sockets (#2) Signed-off-by: Leonardo Cecchi --- go.mod | 1 + go.sum | 6 +- pkg/pluginhelper/server.go | 176 +++++++++++++++++++++++++++---------- 3 files changed, 134 insertions(+), 49 deletions(-) diff --git a/go.mod b/go.mod index 37b0214..d264341 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/evanphx/json-patch/v5 v5.8.1 github.com/go-logr/logr v1.3.0 github.com/go-logr/zapr v1.2.4 + github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 github.com/spf13/cobra v1.8.0 github.com/spf13/viper v1.14.0 go.uber.org/zap v1.26.0 diff --git a/go.sum b/go.sum index 397df23..c85cfd6 100644 --- a/go.sum +++ b/go.sum @@ -170,6 +170,8 @@ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 h1:HcUWd006luQPljE73d5sk+/VgYPGUReEVz2y1/qylwY= +github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1/go.mod h1:w9Y7gY31krpLmrVU5ZPG9H7l9fZuRu5/3R3S3FMtVQ4= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= @@ -272,8 +274,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= -github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/subosito/gotenv v1.4.1 h1:jyEFiXpy21Wm81FBN71l9VoMMV8H8jG+qIK3GCpY6Qs= github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/pkg/pluginhelper/server.go b/pkg/pluginhelper/server.go index 848c3ca..2148938 100644 --- a/pkg/pluginhelper/server.go +++ b/pkg/pluginhelper/server.go @@ -1,10 +1,16 @@ package pluginhelper import ( + "context" + "errors" "net" + "os" + "os/signal" "path" + "syscall" "github.com/cloudnative-pg/cnpg-i/pkg/identity" + "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery" "github.com/spf13/cobra" "github.com/spf13/viper" "google.golang.org/grpc" @@ -31,53 +37,7 @@ func CreateMainCmd(identityImpl identity.IdentityServer, enrichers ...ServerEnri }, Args: cobra.NoArgs, RunE: func(cmd *cobra.Command, _ []string) error { - logger := logging.FromContext(cmd.Context()) - - identityResponse, err := identityImpl.GetPluginMetadata( - cmd.Context(), - &identity.GetPluginMetadataRequest{}) - if err != nil { - logger.Error(err, "Error while querying the identity service") - return err - } - - pluginPath := viper.GetString("plugin-path") - pluginName := identityResponse.Name - pluginDisplayName := identityResponse.DisplayName - pluginVersion := identityResponse.Version - socketName := path.Join(pluginPath, identityResponse.Name) - - grpcServer := grpc.NewServer() - identity.RegisterIdentityServer( - grpcServer, - identityImpl) - for _, enrich := range enrichers { - enrich(grpcServer) - } - - listener, err := net.Listen( - unixNetwork, - socketName, - ) - if err != nil { - logger.Error(err, "While starting server") - return err - } - - logger.Info( - "Starting plugin", - "path", pluginPath, - "name", pluginName, - "displayName", pluginDisplayName, - "version", pluginVersion, - "socketName", socketName, - ) - err = grpcServer.Serve(listener) - if err != nil { - logger.Error(err, "While terminatind server") - } - - return err + return run(cmd.Context(), identityImpl, enrichers...) }, } @@ -97,3 +57,125 @@ func CreateMainCmd(identityImpl identity.IdentityServer, enrichers ...ServerEnri return cmd } + +// run starts listining for GRPC requests +func run(ctx context.Context, identityImpl identity.IdentityServer, enrichers ...ServerEnricher) error { + logger := logging.FromContext(ctx) + + identityResponse, err := identityImpl.GetPluginMetadata( + ctx, + &identity.GetPluginMetadataRequest{}) + if err != nil { + logger.Error(err, "Error while querying the identity service") + return err + } + + pluginPath := viper.GetString("plugin-path") + pluginName := identityResponse.Name + pluginDisplayName := identityResponse.DisplayName + pluginVersion := identityResponse.Version + socketName := path.Join(pluginPath, identityResponse.Name) + + // Remove stale unix socket it still existent + if err := removeStaleSocket(ctx, socketName); err != nil { + logger.Error(err, "While removing old unix socket") + return err + } + + // Start accepting connections on the socket + listener, err := net.Listen( + unixNetwork, + socketName, + ) + if err != nil { + logger.Error(err, "While starting server") + return err + } + + // Handle quit-like signal + handleSignals(ctx, listener) + + // Create GRPC server + grpcServer := grpc.NewServer( + grpc.ChainUnaryInterceptor( + recovery.UnaryServerInterceptor(recovery.WithRecoveryHandlerContext(panicRecoveryHandler(listener))), + ), + grpc.ChainStreamInterceptor( + recovery.StreamServerInterceptor(recovery.WithRecoveryHandlerContext(panicRecoveryHandler(listener))), + ), + ) + identity.RegisterIdentityServer( + grpcServer, + identityImpl) + for _, enrich := range enrichers { + enrich(grpcServer) + } + + logger.Info( + "Starting plugin", + "path", pluginPath, + "name", pluginName, + "displayName", pluginDisplayName, + "version", pluginVersion, + "socketName", socketName, + ) + + if err = grpcServer.Serve(listener); !errors.Is(err, net.ErrClosed) { + logger.Error(err, "While terminating server") + } + + return nil +} + +// removeStaleSocket removes a stale unix domain socket +func removeStaleSocket(ctx context.Context, pluginPath string) error { + logger := logging.FromContext(ctx) + _, err := os.Stat(pluginPath) + + switch { + case err == nil: + logger.Info("Removing stale socket", "pluginPath", pluginPath) + return os.Remove(pluginPath) + + case errors.Is(err, os.ErrNotExist): + return nil + + default: + return err + } +} + +// handleSignals makes sure that we close the listening socket +// when we receive a quit-like signal +func handleSignals(ctx context.Context, listener net.Listener) { + logger := logging.FromContext(ctx) + + sigc := make(chan os.Signal, 1) + signal.Notify(sigc, syscall.SIGTERM, syscall.SIGABRT, syscall.SIGINT) + go func(c chan os.Signal) { + sig := <-c + logger.Info( + "Caught signal, shutting down.", + "signal", sig.String()) + + if err := listener.Close(); err != nil { + logger.Error(err, "While stopping server") + } + + os.Exit(1) + }(sigc) +} + +func panicRecoveryHandler(listener net.Listener) recovery.RecoveryHandlerFuncContext { + return func(ctx context.Context, err any) error { + logger := logging.FromContext(ctx) + logger.Info("Panic occurred", "error", err) + + if closeError := listener.Close(); closeError != nil { + logger.Error(closeError, "While stopping server") + } + + os.Exit(1) + return nil + } +}