Skip to content

Commit

Permalink
feat: always remove stale unix sockets (#2)
Browse files Browse the repository at this point in the history
Signed-off-by: Leonardo Cecchi <[email protected]>
  • Loading branch information
leonardoce authored Jan 29, 2024
1 parent 5caf526 commit 059ca32
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 49 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/evanphx/json-patch/v5 v5.8.1
github.com/go-logr/logr v1.3.0
github.com/go-logr/zapr v1.2.4
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1
github.com/spf13/cobra v1.8.0
github.com/spf13/viper v1.14.0
go.uber.org/zap v1.26.0
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 h1:HcUWd006luQPljE73d5sk+/VgYPGUReEVz2y1/qylwY=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1/go.mod h1:w9Y7gY31krpLmrVU5ZPG9H7l9fZuRu5/3R3S3FMtVQ4=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
Expand Down Expand Up @@ -272,8 +274,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/subosito/gotenv v1.4.1 h1:jyEFiXpy21Wm81FBN71l9VoMMV8H8jG+qIK3GCpY6Qs=
github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
176 changes: 129 additions & 47 deletions pkg/pluginhelper/server.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package pluginhelper

import (
"context"
"errors"
"net"
"os"
"os/signal"
"path"
"syscall"

"github.com/cloudnative-pg/cnpg-i/pkg/identity"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"google.golang.org/grpc"
Expand All @@ -31,53 +37,7 @@ func CreateMainCmd(identityImpl identity.IdentityServer, enrichers ...ServerEnri
},
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, _ []string) error {
logger := logging.FromContext(cmd.Context())

identityResponse, err := identityImpl.GetPluginMetadata(
cmd.Context(),
&identity.GetPluginMetadataRequest{})
if err != nil {
logger.Error(err, "Error while querying the identity service")
return err
}

pluginPath := viper.GetString("plugin-path")
pluginName := identityResponse.Name
pluginDisplayName := identityResponse.DisplayName
pluginVersion := identityResponse.Version
socketName := path.Join(pluginPath, identityResponse.Name)

grpcServer := grpc.NewServer()
identity.RegisterIdentityServer(
grpcServer,
identityImpl)
for _, enrich := range enrichers {
enrich(grpcServer)
}

listener, err := net.Listen(
unixNetwork,
socketName,
)
if err != nil {
logger.Error(err, "While starting server")
return err
}

logger.Info(
"Starting plugin",
"path", pluginPath,
"name", pluginName,
"displayName", pluginDisplayName,
"version", pluginVersion,
"socketName", socketName,
)
err = grpcServer.Serve(listener)
if err != nil {
logger.Error(err, "While terminatind server")
}

return err
return run(cmd.Context(), identityImpl, enrichers...)
},
}

Expand All @@ -97,3 +57,125 @@ func CreateMainCmd(identityImpl identity.IdentityServer, enrichers ...ServerEnri

return cmd
}

// run starts listining for GRPC requests
func run(ctx context.Context, identityImpl identity.IdentityServer, enrichers ...ServerEnricher) error {
logger := logging.FromContext(ctx)

identityResponse, err := identityImpl.GetPluginMetadata(
ctx,
&identity.GetPluginMetadataRequest{})
if err != nil {
logger.Error(err, "Error while querying the identity service")
return err
}

pluginPath := viper.GetString("plugin-path")
pluginName := identityResponse.Name
pluginDisplayName := identityResponse.DisplayName
pluginVersion := identityResponse.Version
socketName := path.Join(pluginPath, identityResponse.Name)

// Remove stale unix socket it still existent
if err := removeStaleSocket(ctx, socketName); err != nil {
logger.Error(err, "While removing old unix socket")
return err
}

// Start accepting connections on the socket
listener, err := net.Listen(
unixNetwork,
socketName,
)
if err != nil {
logger.Error(err, "While starting server")
return err
}

// Handle quit-like signal
handleSignals(ctx, listener)

// Create GRPC server
grpcServer := grpc.NewServer(
grpc.ChainUnaryInterceptor(
recovery.UnaryServerInterceptor(recovery.WithRecoveryHandlerContext(panicRecoveryHandler(listener))),
),
grpc.ChainStreamInterceptor(
recovery.StreamServerInterceptor(recovery.WithRecoveryHandlerContext(panicRecoveryHandler(listener))),
),
)
identity.RegisterIdentityServer(
grpcServer,
identityImpl)
for _, enrich := range enrichers {
enrich(grpcServer)
}

logger.Info(
"Starting plugin",
"path", pluginPath,
"name", pluginName,
"displayName", pluginDisplayName,
"version", pluginVersion,
"socketName", socketName,
)

if err = grpcServer.Serve(listener); !errors.Is(err, net.ErrClosed) {
logger.Error(err, "While terminating server")
}

return nil
}

// removeStaleSocket removes a stale unix domain socket
func removeStaleSocket(ctx context.Context, pluginPath string) error {
logger := logging.FromContext(ctx)
_, err := os.Stat(pluginPath)

switch {
case err == nil:
logger.Info("Removing stale socket", "pluginPath", pluginPath)
return os.Remove(pluginPath)

case errors.Is(err, os.ErrNotExist):
return nil

default:
return err
}
}

// handleSignals makes sure that we close the listening socket
// when we receive a quit-like signal
func handleSignals(ctx context.Context, listener net.Listener) {
logger := logging.FromContext(ctx)

sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGTERM, syscall.SIGABRT, syscall.SIGINT)
go func(c chan os.Signal) {
sig := <-c
logger.Info(
"Caught signal, shutting down.",
"signal", sig.String())

if err := listener.Close(); err != nil {
logger.Error(err, "While stopping server")
}

os.Exit(1)
}(sigc)
}

func panicRecoveryHandler(listener net.Listener) recovery.RecoveryHandlerFuncContext {
return func(ctx context.Context, err any) error {
logger := logging.FromContext(ctx)
logger.Info("Panic occurred", "error", err)

if closeError := listener.Close(); closeError != nil {
logger.Error(closeError, "While stopping server")
}

os.Exit(1)
return nil
}
}

0 comments on commit 059ca32

Please sign in to comment.