Skip to content

Commit

Permalink
fixup! fixup! fixup! fixup! fixup! Generic listener management
Browse files Browse the repository at this point in the history
  • Loading branch information
Paul Hewlett committed Sep 25, 2023
1 parent a59b13a commit 17f89e3
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 0 deletions.
32 changes: 32 additions & 0 deletions grpcserver/correlationid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package grpcserver

import (
"context"
"strings"

"google.golang.org/grpc"

"github.com/rkvst/go-rkvstcommon/correlationid"
"github.com/rkvst/go-rkvstcommon/logger"
)

const (
archivistPrefix = "/archivist"
)

// CorrelationIDUnaryServerInterceptor returns a new unary server interceptor that inserts correlationID into context
func CorrelationIDUnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {

// only for archivist endpoint and not /health or /metrics.
// - without this some services refused to become ready (locations and all creators)
logger.Sugar.Debugf("info.FullMethod: %s", info.FullMethod)
if !strings.HasPrefix(info.FullMethod, archivistPrefix) {
return handler(ctx, req)
}

ctx = correlationid.Context(ctx)
logger.Sugar.Debugf("correlationID: %v", correlationid.FromContext(ctx))
return handler(ctx, req)
}
}
120 changes: 120 additions & 0 deletions grpcserver/grpcserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package grpcserver

import (
"context"
"errors"
"fmt"
"net"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
//grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_otrace "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/validator"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"

env "github.com/rkvst/go-rkvstcommon/environment"
"github.com/rkvst/go-rkvstcommon/grpchealth"
grpcHealth "google.golang.org/grpc/health/grpc_health_v1"
)

// so we dont have to import grpc when using this package.
type grpcServer = grpc.Server
type grpcUnaryServerInterceptor = grpc.UnaryServerInterceptor

type RegisterServer func(*grpcServer)

func defaultRegisterServer(g *grpcServer) {
}

type GRPCServer struct {
name string
log Logger
listenStr string
health *grpchealth.HealthCheckingService
interceptors []grpcUnaryServerInterceptor
register RegisterServer
server *grpcServer
}

type GRPCServerOption func(*GRPCServer)

func WithInterceptor(i grpcUnaryServerInterceptor) GRPCServerOption {
return func(g *GRPCServer) {
// Note that this is **prepending**
g.interceptors = append([]grpcUnaryServerInterceptor{i}, g.interceptors...)
}
}

func WithRegisterServer(r RegisterServer) GRPCServerOption {
return func(g *GRPCServer) {
g.register = r
}
}

// NewGRPCServer cretaes a new GRPCServer that is bound to a specific GRPC API. This object complies with
// the standard Listener service and can be managed by the startup.Listeners object.
func NewGRPCServer(log Logger, name string, opts ...GRPCServerOption) GRPCServer {
listenStr := fmt.Sprintf(":%s", env.GetOrFatal("PORT"))

health := grpchealth.New()

g := GRPCServer{
name: name,
listenStr: listenStr,
health: &health,
register: defaultRegisterServer,
interceptors: []grpc.UnaryServerInterceptor{
CorrelationIDUnaryServerInterceptor(),
grpc_otrace.UnaryServerInterceptor(),
grpc_validator.UnaryServerInterceptor(),
},
}
for _, opt := range opts {
opt(&g)
}
server := grpc.NewServer(
grpc.UnaryInterceptor(
grpc_middleware.ChainUnaryServer(g.interceptors...),
),
)

// RegisterAccessPoliciesServer(s grpc.ServiceRegistrar, srv AccessPoliciesServer)
//accessPolicyV1API.RegisterAccessPoliciesServer(server, s)
g.register(server)
grpcHealth.RegisterHealthServer(server, &health)
reflection.Register(server)

g.server = server
g.log = log.WithIndex("grpcserver", g.String())
return g
}

func (g *GRPCServer) String() string {
// No logging in this method please.
return fmt.Sprintf("%s%s", g.name, g.listenStr)
}

func (g *GRPCServer) Listen() error {
listen, err := net.Listen("tcp", g.listenStr)
if err != nil {
return fmt.Errorf("failed to listen %s: %w", g, err)
}

g.health.Ready() // readiness

g.log.Infof("Listening")
err = g.server.Serve(listen)
if err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("failed to serve %s: %w", g, err)
}
return nil
}

func (g *GRPCServer) Shutdown(_ context.Context) error {
g.log.Infof("Shutdown")
g.health.NotReady() // readiness
g.health.Dead() // liveness
g.server.GracefulStop()
return nil
}
7 changes: 7 additions & 0 deletions grpcserver/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package grpcserver

import (
"github.com/rkvst/go-rkvstcommon/logger"
)

type Logger = logger.Logger

0 comments on commit 17f89e3

Please sign in to comment.