Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generic listener management #8

Merged
merged 7 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions azbus/msgreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ type MsgReceiver interface {
Close(context.Context)
ReceiveMessages(Handler) error
String() string

// Listener interface
Listen() error
Shutdown(context.Context) error

GetAZClient() AZClient

Abandon(context.Context, error, *ReceivedMessage) error
Expand Down
36 changes: 33 additions & 3 deletions azbus/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import (
"github.com/opentracing/opentracing-go"
)

var (
ErrNoHandler = errors.New("no handler defined")
)

// so we dont have to import the azure repo everywhere
type ReceivedMessage = azservicebus.ReceivedMessage

Expand Down Expand Up @@ -96,9 +100,18 @@ type Receiver struct {
mtx sync.Mutex
receiver *azservicebus.Receiver
options *azservicebus.ReceiverOptions
handler Handler
}

func NewReceiver(log Logger, cfg ReceiverConfig) *Receiver {
type ReceiverOption func(*Receiver)

func WithHandler(h Handler) ReceiverOption {
return func(r *Receiver) {
r.handler = h
}
}

func NewReceiver(log Logger, cfg ReceiverConfig, opts ...ReceiverOption) *Receiver {
var options *azservicebus.ReceiverOptions
if cfg.Deadletter {
options = &azservicebus.ReceiverOptions{
Expand All @@ -107,13 +120,17 @@ func NewReceiver(log Logger, cfg ReceiverConfig) *Receiver {
}
}

r := &Receiver{
r := Receiver{
Cfg: cfg,
azClient: NewAZClient(cfg.ConnectionString),
options: options,
}
r.log = log.WithIndex("receiver", r.String())
return r
for _, opt := range opts {
opt(&r)
}

return &r
}

func (r *Receiver) GetAZClient() AZClient {
Expand Down Expand Up @@ -267,6 +284,19 @@ func (r *Receiver) ReceiveMessages(handler Handler) error {

}

// The following 2 methods satisfy the startup.Listener interface.
func (r *Receiver) Listen() error {
if r.handler == nil {
return ErrNoHandler
}
return r.ReceiveMessages(r.handler)
}

func (r *Receiver) Shutdown(ctx context.Context) error {
r.Close(ctx)
return nil
}

func (r *Receiver) Open() error {
var err error

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ require (
github.com/stretchr/testify v1.8.4
go.uber.org/automaxprocs v1.5.3
go.uber.org/zap v1.25.0
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d
google.golang.org/grpc v1.57.0
)
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,7 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
32 changes: 32 additions & 0 deletions grpcserver/correlationid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package grpcserver

import (
"context"
"strings"

"google.golang.org/grpc"

"github.com/rkvst/go-rkvstcommon/correlationid"
"github.com/rkvst/go-rkvstcommon/logger"
)

const (
archivistPrefix = "/archivist"
)

// CorrelationIDUnaryServerInterceptor returns a new unary server interceptor that inserts correlationID into context
func CorrelationIDUnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {

// only for archivist endpoint and not /health or /metrics.
// - without this some services refused to become ready (locations and all creators)
logger.Sugar.Debugf("info.FullMethod: %s", info.FullMethod)
if !strings.HasPrefix(info.FullMethod, archivistPrefix) {
return handler(ctx, req)
}

ctx = correlationid.Context(ctx)
logger.Sugar.Debugf("correlationID: %v", correlationid.FromContext(ctx))
return handler(ctx, req)
}
}
120 changes: 120 additions & 0 deletions grpcserver/grpcserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package grpcserver

import (
"context"
"errors"
"fmt"
"net"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
//grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_otrace "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/validator"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"

env "github.com/rkvst/go-rkvstcommon/environment"
"github.com/rkvst/go-rkvstcommon/grpchealth"
grpcHealth "google.golang.org/grpc/health/grpc_health_v1"
)

// so we dont have to import grpc when using this package.
type grpcServer = grpc.Server
type grpcUnaryServerInterceptor = grpc.UnaryServerInterceptor

type RegisterServer func(*grpcServer)

func defaultRegisterServer(g *grpcServer) {
}

type GRPCServer struct {
name string
log Logger
listenStr string
health *grpchealth.HealthCheckingService
interceptors []grpcUnaryServerInterceptor
register RegisterServer
server *grpcServer
}

type GRPCServerOption func(*GRPCServer)

func WithInterceptor(i grpcUnaryServerInterceptor) GRPCServerOption {
return func(g *GRPCServer) {
// Note that this is **prepending**
g.interceptors = append([]grpcUnaryServerInterceptor{i}, g.interceptors...)
}
}

func WithRegisterServer(r RegisterServer) GRPCServerOption {
return func(g *GRPCServer) {
g.register = r
}
}

// NewGRPCServer cretaes a new GRPCServer that is bound to a specific GRPC API. This object complies with
// the standard Listener service and can be managed by the startup.Listeners object.
func NewGRPCServer(log Logger, name string, opts ...GRPCServerOption) GRPCServer {
listenStr := fmt.Sprintf(":%s", env.GetOrFatal("PORT"))

health := grpchealth.New()

g := GRPCServer{
name: name,
listenStr: listenStr,
health: &health,
register: defaultRegisterServer,
interceptors: []grpc.UnaryServerInterceptor{
CorrelationIDUnaryServerInterceptor(),
grpc_otrace.UnaryServerInterceptor(),
grpc_validator.UnaryServerInterceptor(),
},
}
for _, opt := range opts {
opt(&g)
}
server := grpc.NewServer(
grpc.UnaryInterceptor(
grpc_middleware.ChainUnaryServer(g.interceptors...),
),
)

// RegisterAccessPoliciesServer(s grpc.ServiceRegistrar, srv AccessPoliciesServer)
//accessPolicyV1API.RegisterAccessPoliciesServer(server, s)
g.register(server)
grpcHealth.RegisterHealthServer(server, &health)
reflection.Register(server)

g.server = server
g.log = log.WithIndex("grpcserver", g.String())
return g
}

func (g *GRPCServer) String() string {
// No logging in this method please.
return fmt.Sprintf("%s%s", g.name, g.listenStr)
}

func (g *GRPCServer) Listen() error {
listen, err := net.Listen("tcp", g.listenStr)
if err != nil {
return fmt.Errorf("failed to listen %s: %w", g, err)
}

g.health.Ready() // readiness

g.log.Infof("Listening")
err = g.server.Serve(listen)
if err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("failed to serve %s: %w", g, err)
}
return nil
}

func (g *GRPCServer) Shutdown(_ context.Context) error {
g.log.Infof("Shutdown")
g.health.NotReady() // readiness
g.health.Dead() // liveness
g.server.GracefulStop()
return nil
}
7 changes: 7 additions & 0 deletions grpcserver/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package grpcserver

import (
"github.com/rkvst/go-rkvstcommon/logger"
)

type Logger = logger.Logger
52 changes: 52 additions & 0 deletions httpserver/httpserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package httpserver

import (
"context"
"fmt"
"net/http"
"time"
)

// A http server that has an inbuilt logger, name and complies wuth the Listener interface in
// startup.Listeners.

type HTTPServer struct {
http.Server
log Logger
name string
}

func NewHTTPServer(log Logger, name string, port string, handler http.Handler) *HTTPServer {
m := HTTPServer{
Server: http.Server{
Addr: ":" + port,
Handler: handler,
},
name: name,
}
m.log = log.WithIndex("httpserver", m.String())
// It is preferable to return a copy rather than a reference. Unfortunately http.Server has an
// internal mutex and this cannot or should not be copied so we will return a reference instead.
return &m
}

func (m *HTTPServer) String() string {
// No logging here please
return fmt.Sprintf("%s%s", m.name, m.Addr)
}

func (m *HTTPServer) Listen() error {
m.log.Infof("httpserver starting")
err := m.ListenAndServe()
if err != nil {
return fmt.Errorf("%s server terminated: %v", m, err)
}
return nil
}

func (m *HTTPServer) Shutdown(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
m.log.Infof("httpserver shutdown")
return m.Server.Shutdown(ctx)
}
7 changes: 7 additions & 0 deletions httpserver/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package httpserver

import (
"github.com/rkvst/go-rkvstcommon/logger"
)

type Logger = logger.Logger
Loading
Loading