From e8f5803c8181a208755c4ef371fa61ef5ed24833 Mon Sep 17 00:00:00 2001 From: Paul Hewlett Date: Thu, 21 Sep 2023 09:50:32 +0100 Subject: [PATCH] Generic listener management Add startup.Listeners struct to manage the start and stop of a generic Listener that complies with the Listener interface. Add startup.Service that handles a generic service. Add generic httpserver that complies with Listener interface. Add NewTracer to tracing package. AB#8170 --- go.mod | 1 + go.sum | 1 + httpserver/httpserver.go | 52 +++++++++++++++ httpserver/logger.go | 7 ++ startup/listener.go | 99 ++++++++++++++++++++++++++++ startup/{interfaces.go => logger.go} | 0 startup/run.go | 3 +- startup/service.go | 40 +++++++++++ tracing/tracing.go | 6 ++ 9 files changed, 208 insertions(+), 1 deletion(-) create mode 100644 httpserver/httpserver.go create mode 100644 httpserver/logger.go create mode 100644 startup/listener.go rename startup/{interfaces.go => logger.go} (100%) create mode 100644 startup/service.go diff --git a/go.mod b/go.mod index 67bb7e9..4ff3d94 100644 --- a/go.mod +++ b/go.mod @@ -35,6 +35,7 @@ require ( github.com/stretchr/testify v1.8.4 go.uber.org/automaxprocs v1.5.3 go.uber.org/zap v1.25.0 + golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d google.golang.org/grpc v1.57.0 ) diff --git a/go.sum b/go.sum index b0ad99d..eeeda9d 100644 --- a/go.sum +++ b/go.sum @@ -591,6 +591,7 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/httpserver/httpserver.go b/httpserver/httpserver.go new file mode 100644 index 0000000..7197620 --- /dev/null +++ b/httpserver/httpserver.go @@ -0,0 +1,52 @@ +package httpserver + +import ( + "context" + "fmt" + "net/http" + "time" +) + +// A http server that has an inbuilt logger, name and complies wuth the Listener interface in +// startup.Listeners. + +type HTTPServer struct { + http.Server + log Logger + name string +} + +func NewHTTPServer(log Logger, name string, port string, handler http.Handler) *HTTPServer { + m := HTTPServer{ + Server: http.Server{ + Addr: ":" + port, + Handler: handler, + }, + name: name, + } + m.log = log.WithIndex("httpserver", m.String()) + // It is preferable to return a copy rather than a reference. Unfortunately http.Server has an + // internal mutex and this cannot or should not be copied so we will return a reference instead. + return &m +} + +func (m *HTTPServer) String() string { + // No logging here please + return fmt.Sprintf("%s%s", m.name, m.Addr) +} + +func (m *HTTPServer) Listen() error { + m.log.Infof("httpserver starting") + err := m.ListenAndServe() + if err != nil { + return fmt.Errorf("%s server terminated: %v", m, err) + } + return nil +} + +func (m *HTTPServer) Shutdown(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + m.log.Infof("httpserver shutdown") + return m.Server.Shutdown(ctx) +} diff --git a/httpserver/logger.go b/httpserver/logger.go new file mode 100644 index 0000000..18bd6e9 --- /dev/null +++ b/httpserver/logger.go @@ -0,0 +1,7 @@ +package httpserver + +import ( + "github.com/rkvst/go-rkvstcommon/logger" +) + +type Logger = logger.Logger diff --git a/startup/listener.go b/startup/listener.go new file mode 100644 index 0000000..2507edc --- /dev/null +++ b/startup/listener.go @@ -0,0 +1,99 @@ +package startup + +import ( + "context" + "errors" + "fmt" + "os/signal" + "syscall" + "time" + + "golang.org/x/sync/errgroup" +) + +// based on gist found at https://gist.github.com/pteich/c0bb58b0b7c8af7cc6a689dd0d3d26ef?permalink_comment_id=4053701 + +// Listener is an interface that describes any kind of listener - HTTP Server, GRPC Server +// or servicebus receiver. +type Listener interface { + Listen() error + Shutdown(context.Context) error +} + +// Listeners contains all servers that comply with the service. +type Listeners struct { + name string + log Logger + listeners []Listener +} + +type ListenersOption func(*Listeners) + +func WithListener(h Listener) ListenersOption { + return func(l *Listeners) { + l.listeners = append(l.listeners, h) + } +} + +func NewListeners(log Logger, name string, opts ...ListenersOption) Listeners { + l := Listeners{log: log, name: name} + for _, opt := range opts { + opt(&l) + } + return l +} + +func (l *Listeners) String() string { + return l.name +} + +func (l *Listeners) Listen() error { + + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + g, errCtx := errgroup.WithContext(ctx) + + for _, h := range l.listeners { + h := h + g.Go(func() error { + err := h.Listen() + if err != nil { + return err + } + return nil + }) + } + + g.Go(func() error { + <-errCtx.Done() + l.log.Infof("Cancel from signal") + return l.Shutdown() + }) + + err := g.Wait() + if err != nil && !errors.Is(err, context.Canceled) { + return err + } + + return nil +} + +func (l *Listeners) Shutdown() error { + var err error + for _, h := range l.listeners { + func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + e := h.Shutdown(ctx) + if e != nil { + if err != nil { + err = fmt.Errorf("Cannot shutdown %s: %w: %w", h, err, e) + } else { + err = fmt.Errorf("Cannot shutdown %s: %w", h, e) + } + } + }() + } + return err +} diff --git a/startup/interfaces.go b/startup/logger.go similarity index 100% rename from startup/interfaces.go rename to startup/logger.go diff --git a/startup/run.go b/startup/run.go index d797de0..05e0133 100644 --- a/startup/run.go +++ b/startup/run.go @@ -16,12 +16,13 @@ func Run(serviceName string, run Runner) { var exitCode int logger.New(environment.GetLogLevel()) log := logger.Sugar.WithServiceName(serviceName) - err := run(serviceName, log) + err := run(serviceName, log) if err != nil { log.Infof("Error terminating: %v", err) exitCode = 1 } + log.Infof("Shutting down gracefully") logger.OnExit() os.Exit(exitCode) diff --git a/startup/service.go b/startup/service.go new file mode 100644 index 0000000..796f410 --- /dev/null +++ b/startup/service.go @@ -0,0 +1,40 @@ +package startup + +type ServiceProvider interface { + Open() error + Close() +} + +// Service consists of a named ServiceProviders and a bunch of listeners. +type Service struct { + name string + log Logger + service ServiceProvider + listeners Listeners +} + +func NewService(log Logger, name string, service ServiceProvider, listeners Listeners) Service { + s := Service{ + name: name, + service: service, + listeners: listeners, + } + s.log = log.WithIndex("service", s.String()) + return s +} + +func (s *Service) String() string { + return s.name +} + +func (s *Service) Open() error { + return s.service.Open() +} + +func (s *Service) Close() { + s.service.Close() +} + +func (s *Service) Listen() error { + return s.listeners.Listen() +} diff --git a/tracing/tracing.go b/tracing/tracing.go index 5f0bcf7..cb7a81c 100644 --- a/tracing/tracing.go +++ b/tracing/tracing.go @@ -2,6 +2,7 @@ package tracing import ( + "fmt" "io" "log" "net/http" @@ -77,6 +78,11 @@ func HeaderMatcher(key string) (string, bool) { return "", false } +func NewTracer(serviceName string) io.Closer { + listenStr := fmt.Sprintf(":%s", environment.GetOrFatal("PORT")) + return NewFromEnv(serviceName, listenStr, "ZIPKIN_ENDPOINT", "DISABLE_ZIPKIN"); +} + // NewFromEnv initialises tracing and returns a closer if tracing is // configured. If the necessary configuration is not available it is Fatal // unless disableVar is set and is truthy (strconf.ParseBool -> true). If