diff --git a/azbus/receiver.go b/azbus/receiver.go index f05070a..efd3224 100644 --- a/azbus/receiver.go +++ b/azbus/receiver.go @@ -87,6 +87,7 @@ type Receiver struct { receiver *azservicebus.Receiver options *azservicebus.ReceiverOptions handlers []Handler + cancel context.CancelFunc } type ReceiverOption func(*Receiver) @@ -224,7 +225,7 @@ func (r *Receiver) renewMessageLock(ctx context.Context, count int, msg *Receive } } -func (r *Receiver) receiveMessages() error { +func (r *Receiver) receiveMessages(ctx context.Context) error { numberOfReceivedMessages := len(r.handlers) r.log.Debugf( @@ -235,8 +236,6 @@ func (r *Receiver) receiveMessages() error { // Start all the workers msgs := make(chan *ReceivedMessage, numberOfReceivedMessages) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() var wg sync.WaitGroup for i := range numberOfReceivedMessages { go func(rctx context.Context, ii int, rr *Receiver) { @@ -294,6 +293,8 @@ func (r *Receiver) receiveMessages() error { // The following 2 methods satisfy the startup.Listener interface. func (r *Receiver) Listen() error { + ctx, cancel := context.WithCancel(context.Background()) + r.cancel = cancel r.log.Debugf("listen") err := r.open() if err != nil { @@ -301,10 +302,11 @@ func (r *Receiver) Listen() error { r.log.Infof("%s", azerr) return azerr } - return r.receiveMessages() + return r.receiveMessages(ctx) } func (r *Receiver) Shutdown(ctx context.Context) error { + r.cancel() r.close_() return nil } @@ -345,20 +347,25 @@ func (r *Receiver) open() error { func (r *Receiver) close_() { if r != nil { + r.log.Debugf("Close") if r.receiver != nil { r.mtx.Lock() defer r.mtx.Unlock() + for j := range len(r.handlers) { + r.log.Debugf("Close handler") + r.handlers[j].Close() + } + + r.log.Debugf("Close receiver") err := r.receiver.Close(context.Background()) if err != nil { azerr := fmt.Errorf("%s: Error closing receiver: %w", r, NewAzbusError(err)) r.log.Infof("%s", azerr) } - r.receiver = nil - for j := range len(r.handlers) { - r.handlers[j].Close() - } r.handlers = []Handler{} + r.receiver = nil + r.cancel = nil } } } diff --git a/grpcclient/client.go b/grpcclient/client.go index d5a9de4..d875a8d 100644 --- a/grpcclient/client.go +++ b/grpcclient/client.go @@ -21,7 +21,7 @@ func (g *Client) Open() error { var conn *grpc.ClientConn g.log.Debugf("Open %s client at %v", g.name, g.address) - conn, err = grpc.Dial(g.address, g.options...) + conn, err = grpc.Dial(g.address, g.options...) //nolint:staticcheck if err != nil { return err }