diff --git a/azbus/msgreceiver.go b/azbus/msgreceiver.go index 49bdf6b..81c32a2 100644 --- a/azbus/msgreceiver.go +++ b/azbus/msgreceiver.go @@ -9,6 +9,11 @@ type MsgReceiver interface { Close(context.Context) ReceiveMessages(Handler) error String() string + + // Listener interface + Listen() error + Shutdown(context.Context) error + GetAZClient() AZClient Abandon(context.Context, error, *ReceivedMessage) error diff --git a/azbus/receiver.go b/azbus/receiver.go index 8174882..2001b64 100644 --- a/azbus/receiver.go +++ b/azbus/receiver.go @@ -12,6 +12,10 @@ import ( "github.com/opentracing/opentracing-go" ) +var ( + ErrNoHandler = errors.New("no handler defined") +) + // so we dont have to import the azure repo everywhere type ReceivedMessage = azservicebus.ReceivedMessage @@ -96,9 +100,18 @@ type Receiver struct { mtx sync.Mutex receiver *azservicebus.Receiver options *azservicebus.ReceiverOptions + handler Handler } -func NewReceiver(log Logger, cfg ReceiverConfig) *Receiver { +type ReceiverOption func(*Receiver) + +func WithHandler(h Handler) ReceiverOption { + return func(r *Receiver) { + r.handler = h + } +} + +func NewReceiver(log Logger, cfg ReceiverConfig, opts ...ReceiverOption) *Receiver { var options *azservicebus.ReceiverOptions if cfg.Deadletter { options = &azservicebus.ReceiverOptions{ @@ -107,13 +120,17 @@ func NewReceiver(log Logger, cfg ReceiverConfig) *Receiver { } } - r := &Receiver{ + r := Receiver{ Cfg: cfg, azClient: NewAZClient(cfg.ConnectionString), options: options, } r.log = log.WithIndex("receiver", r.String()) - return r + for _, opt := range opts { + opt(&r) + } + + return &r } func (r *Receiver) GetAZClient() AZClient { @@ -267,6 +284,19 @@ func (r *Receiver) ReceiveMessages(handler Handler) error { } +// The following 2 methods satisfy the startup.Listener interface. +func (r *Receiver) Listen() error { + if r.handler == nil { + return ErrNoHandler + } + return r.ReceiveMessages(r.handler) +} + +func (r *Receiver) Shutdown(ctx context.Context) error { + r.Close(ctx) + return nil +} + func (r *Receiver) Open() error { var err error diff --git a/go.mod b/go.mod index 67bb7e9..4ff3d94 100644 --- a/go.mod +++ b/go.mod @@ -35,6 +35,7 @@ require ( github.com/stretchr/testify v1.8.4 go.uber.org/automaxprocs v1.5.3 go.uber.org/zap v1.25.0 + golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d google.golang.org/grpc v1.57.0 ) diff --git a/go.sum b/go.sum index b0ad99d..eeeda9d 100644 --- a/go.sum +++ b/go.sum @@ -591,6 +591,7 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/grpcserver/correlationid.go b/grpcserver/correlationid.go new file mode 100644 index 0000000..017cbdb --- /dev/null +++ b/grpcserver/correlationid.go @@ -0,0 +1,32 @@ +package grpcserver + +import ( + "context" + "strings" + + "google.golang.org/grpc" + + "github.com/rkvst/go-rkvstcommon/correlationid" + "github.com/rkvst/go-rkvstcommon/logger" +) + +const ( + archivistPrefix = "/archivist" +) + +// CorrelationIDUnaryServerInterceptor returns a new unary server interceptor that inserts correlationID into context +func CorrelationIDUnaryServerInterceptor() grpc.UnaryServerInterceptor { + return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + + // only for archivist endpoint and not /health or /metrics. + // - without this some services refused to become ready (locations and all creators) + logger.Sugar.Debugf("info.FullMethod: %s", info.FullMethod) + if !strings.HasPrefix(info.FullMethod, archivistPrefix) { + return handler(ctx, req) + } + + ctx = correlationid.Context(ctx) + logger.Sugar.Debugf("correlationID: %v", correlationid.FromContext(ctx)) + return handler(ctx, req) + } +} diff --git a/grpcserver/grpcserver.go b/grpcserver/grpcserver.go new file mode 100644 index 0000000..36f01f8 --- /dev/null +++ b/grpcserver/grpcserver.go @@ -0,0 +1,120 @@ +package grpcserver + +import ( + "context" + "errors" + "fmt" + "net" + + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + //grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" + grpc_otrace "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" + grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/validator" + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" + + env "github.com/rkvst/go-rkvstcommon/environment" + "github.com/rkvst/go-rkvstcommon/grpchealth" + grpcHealth "google.golang.org/grpc/health/grpc_health_v1" +) + +// so we dont have to import grpc when using this package. +type grpcServer = grpc.Server +type grpcUnaryServerInterceptor = grpc.UnaryServerInterceptor + +type RegisterServer func(*grpcServer) + +func defaultRegisterServer(g *grpcServer) { +} + +type GRPCServer struct { + name string + log Logger + listenStr string + health *grpchealth.HealthCheckingService + interceptors []grpcUnaryServerInterceptor + register RegisterServer + server *grpcServer +} + +type GRPCServerOption func(*GRPCServer) + +func WithInterceptor(i grpcUnaryServerInterceptor) GRPCServerOption { + return func(g *GRPCServer) { + // Note that this is **prepending** + g.interceptors = append([]grpcUnaryServerInterceptor{i}, g.interceptors...) + } +} + +func WithRegisterServer(r RegisterServer) GRPCServerOption { + return func(g *GRPCServer) { + g.register = r + } +} + +// NewGRPCServer cretaes a new GRPCServer that is bound to a specific GRPC API. This object complies with +// the standard Listener service and can be managed by the startup.Listeners object. +func NewGRPCServer(log Logger, name string, opts ...GRPCServerOption) GRPCServer { + listenStr := fmt.Sprintf(":%s", env.GetOrFatal("PORT")) + + health := grpchealth.New() + + g := GRPCServer{ + name: name, + listenStr: listenStr, + health: &health, + register: defaultRegisterServer, + interceptors: []grpc.UnaryServerInterceptor{ + CorrelationIDUnaryServerInterceptor(), + grpc_otrace.UnaryServerInterceptor(), + grpc_validator.UnaryServerInterceptor(), + }, + } + for _, opt := range opts { + opt(&g) + } + server := grpc.NewServer( + grpc.UnaryInterceptor( + grpc_middleware.ChainUnaryServer(g.interceptors...), + ), + ) + + // RegisterAccessPoliciesServer(s grpc.ServiceRegistrar, srv AccessPoliciesServer) + //accessPolicyV1API.RegisterAccessPoliciesServer(server, s) + g.register(server) + grpcHealth.RegisterHealthServer(server, &health) + reflection.Register(server) + + g.server = server + g.log = log.WithIndex("grpcserver", g.String()) + return g +} + +func (g *GRPCServer) String() string { + // No logging in this method please. + return fmt.Sprintf("%s%s", g.name, g.listenStr) +} + +func (g *GRPCServer) Listen() error { + listen, err := net.Listen("tcp", g.listenStr) + if err != nil { + return fmt.Errorf("failed to listen %s: %w", g, err) + } + + g.health.Ready() // readiness + + g.log.Infof("Listening") + err = g.server.Serve(listen) + if err != nil && !errors.Is(err, context.Canceled) { + return fmt.Errorf("failed to serve %s: %w", g, err) + } + return nil +} + +func (g *GRPCServer) Shutdown(_ context.Context) error { + g.log.Infof("Shutdown") + g.health.NotReady() // readiness + g.health.Dead() // liveness + g.server.GracefulStop() + return nil +} diff --git a/grpcserver/logger.go b/grpcserver/logger.go new file mode 100644 index 0000000..30dc936 --- /dev/null +++ b/grpcserver/logger.go @@ -0,0 +1,7 @@ +package grpcserver + +import ( + "github.com/rkvst/go-rkvstcommon/logger" +) + +type Logger = logger.Logger diff --git a/httpserver/httpserver.go b/httpserver/httpserver.go new file mode 100644 index 0000000..7197620 --- /dev/null +++ b/httpserver/httpserver.go @@ -0,0 +1,52 @@ +package httpserver + +import ( + "context" + "fmt" + "net/http" + "time" +) + +// A http server that has an inbuilt logger, name and complies wuth the Listener interface in +// startup.Listeners. + +type HTTPServer struct { + http.Server + log Logger + name string +} + +func NewHTTPServer(log Logger, name string, port string, handler http.Handler) *HTTPServer { + m := HTTPServer{ + Server: http.Server{ + Addr: ":" + port, + Handler: handler, + }, + name: name, + } + m.log = log.WithIndex("httpserver", m.String()) + // It is preferable to return a copy rather than a reference. Unfortunately http.Server has an + // internal mutex and this cannot or should not be copied so we will return a reference instead. + return &m +} + +func (m *HTTPServer) String() string { + // No logging here please + return fmt.Sprintf("%s%s", m.name, m.Addr) +} + +func (m *HTTPServer) Listen() error { + m.log.Infof("httpserver starting") + err := m.ListenAndServe() + if err != nil { + return fmt.Errorf("%s server terminated: %v", m, err) + } + return nil +} + +func (m *HTTPServer) Shutdown(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + m.log.Infof("httpserver shutdown") + return m.Server.Shutdown(ctx) +} diff --git a/httpserver/logger.go b/httpserver/logger.go new file mode 100644 index 0000000..18bd6e9 --- /dev/null +++ b/httpserver/logger.go @@ -0,0 +1,7 @@ +package httpserver + +import ( + "github.com/rkvst/go-rkvstcommon/logger" +) + +type Logger = logger.Logger diff --git a/startup/listener.go b/startup/listener.go new file mode 100644 index 0000000..afd03c1 --- /dev/null +++ b/startup/listener.go @@ -0,0 +1,103 @@ +package startup + +import ( + "context" + "errors" + "fmt" + "os/signal" + "syscall" + "time" + + "golang.org/x/sync/errgroup" +) + +// based on gist found at https://gist.github.com/pteich/c0bb58b0b7c8af7cc6a689dd0d3d26ef?permalink_comment_id=4053701 + +// Listener is an interface that describes any kind of listener - HTTP Server, GRPC Server +// or servicebus receiver. +type Listener interface { + Listen() error + Shutdown(context.Context) error +} + +// Listeners contains all servers that comply with the service. +type Listeners struct { + name string + log Logger + listeners []Listener +} + +type ListenersOption func(*Listeners) + +func WithListener(h Listener) ListenersOption { + return func(l *Listeners) { + l.listeners = append(l.listeners, h) + } +} + +func NewListeners(log Logger, name string, opts ...ListenersOption) Listeners { + l := Listeners{log: log, name: name} + for _, opt := range opts { + opt(&l) + } + return l +} + +func (l *Listeners) String() string { + return l.name +} + +func (l *Listeners) AddListener(h Listener) { + l.listeners = append(l.listeners, h) +} + +func (l *Listeners) Listen() error { + + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + g, errCtx := errgroup.WithContext(ctx) + + for _, h := range l.listeners { + h := h + g.Go(func() error { + err := h.Listen() + if err != nil { + return err + } + return nil + }) + } + + g.Go(func() error { + <-errCtx.Done() + l.log.Infof("Cancel from signal") + return l.Shutdown() + }) + + err := g.Wait() + if err != nil && !errors.Is(err, context.Canceled) { + return err + } + + return nil +} + +func (l *Listeners) Shutdown() error { + var err error + for _, h := range l.listeners { + func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + e := h.Shutdown(ctx) + if e != nil { + if err != nil { + err = fmt.Errorf("Cannot shutdown %s: %w: %w", h, err, e) + } else { + err = fmt.Errorf("Cannot shutdown %s: %w", h, e) + } + } + }() + } + return err +} diff --git a/startup/interfaces.go b/startup/logger.go similarity index 100% rename from startup/interfaces.go rename to startup/logger.go diff --git a/startup/run.go b/startup/run.go index d797de0..05e0133 100644 --- a/startup/run.go +++ b/startup/run.go @@ -16,12 +16,13 @@ func Run(serviceName string, run Runner) { var exitCode int logger.New(environment.GetLogLevel()) log := logger.Sugar.WithServiceName(serviceName) - err := run(serviceName, log) + err := run(serviceName, log) if err != nil { log.Infof("Error terminating: %v", err) exitCode = 1 } + log.Infof("Shutting down gracefully") logger.OnExit() os.Exit(exitCode) diff --git a/tracing/tracing.go b/tracing/tracing.go index 5f0bcf7..cb7a81c 100644 --- a/tracing/tracing.go +++ b/tracing/tracing.go @@ -2,6 +2,7 @@ package tracing import ( + "fmt" "io" "log" "net/http" @@ -77,6 +78,11 @@ func HeaderMatcher(key string) (string, bool) { return "", false } +func NewTracer(serviceName string) io.Closer { + listenStr := fmt.Sprintf(":%s", environment.GetOrFatal("PORT")) + return NewFromEnv(serviceName, listenStr, "ZIPKIN_ENDPOINT", "DISABLE_ZIPKIN"); +} + // NewFromEnv initialises tracing and returns a closer if tracing is // configured. If the necessary configuration is not available it is Fatal // unless disableVar is set and is truthy (strconf.ParseBool -> true). If