Skip to content

Commit

Permalink
Fix panic on termination (#82)
Browse files Browse the repository at this point in the history
* Fix panic on termination

AB#9793
  • Loading branch information
eccles authored Aug 13, 2024
1 parent 7d74b38 commit 5892023
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 9 deletions.
23 changes: 15 additions & 8 deletions azbus/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type Receiver struct {
receiver *azservicebus.Receiver
options *azservicebus.ReceiverOptions
handlers []Handler
cancel context.CancelFunc
}

type ReceiverOption func(*Receiver)
Expand Down Expand Up @@ -224,7 +225,7 @@ func (r *Receiver) renewMessageLock(ctx context.Context, count int, msg *Receive
}
}

func (r *Receiver) receiveMessages() error {
func (r *Receiver) receiveMessages(ctx context.Context) error {

numberOfReceivedMessages := len(r.handlers)
r.log.Debugf(
Expand All @@ -235,8 +236,6 @@ func (r *Receiver) receiveMessages() error {

// Start all the workers
msgs := make(chan *ReceivedMessage, numberOfReceivedMessages)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup
for i := range numberOfReceivedMessages {
go func(rctx context.Context, ii int, rr *Receiver) {
Expand Down Expand Up @@ -294,17 +293,20 @@ func (r *Receiver) receiveMessages() error {

// The following 2 methods satisfy the startup.Listener interface.
func (r *Receiver) Listen() error {
ctx, cancel := context.WithCancel(context.Background())
r.cancel = cancel
r.log.Debugf("listen")
err := r.open()
if err != nil {
azerr := fmt.Errorf("%s: ReceiveMessage failure: %w", r, NewAzbusError(err))
r.log.Infof("%s", azerr)
return azerr
}
return r.receiveMessages()
return r.receiveMessages(ctx)
}

func (r *Receiver) Shutdown(ctx context.Context) error {
r.cancel()
r.close_()
return nil
}
Expand Down Expand Up @@ -345,20 +347,25 @@ func (r *Receiver) open() error {

func (r *Receiver) close_() {
if r != nil {
r.log.Debugf("Close")
if r.receiver != nil {
r.mtx.Lock()
defer r.mtx.Unlock()

for j := range len(r.handlers) {
r.log.Debugf("Close handler")
r.handlers[j].Close()
}

r.log.Debugf("Close receiver")
err := r.receiver.Close(context.Background())
if err != nil {
azerr := fmt.Errorf("%s: Error closing receiver: %w", r, NewAzbusError(err))
r.log.Infof("%s", azerr)
}
r.receiver = nil
for j := range len(r.handlers) {
r.handlers[j].Close()
}
r.handlers = []Handler{}
r.receiver = nil
r.cancel = nil
}
}
}
2 changes: 1 addition & 1 deletion grpcclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (g *Client) Open() error {
var conn *grpc.ClientConn

g.log.Debugf("Open %s client at %v", g.name, g.address)
conn, err = grpc.Dial(g.address, g.options...)
conn, err = grpc.Dial(g.address, g.options...) //nolint:staticcheck
if err != nil {
return err
}
Expand Down

0 comments on commit 5892023

Please sign in to comment.