From aaa3e84381b9ed40179e325c28c581b1c30f6cbb Mon Sep 17 00:00:00 2001 From: Paul Hewlett Date: Wed, 20 Nov 2024 17:42:35 +0000 Subject: [PATCH] Fix panic When shutting down a receiver a panic ensues because the waitGroup index becomes less than zero Some miscellaneous spelling errors fixed. Found during development of anothet feature. Fix was tested during feature development. AB#9875 --- azbus/receiver.go | 14 +++++++++----- errhandling/errb2c.go | 6 +++--- restproxyserver/restproxyserver.go | 2 +- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/azbus/receiver.go b/azbus/receiver.go index be2d027..0e7602d 100644 --- a/azbus/receiver.go +++ b/azbus/receiver.go @@ -163,7 +163,7 @@ func (r *Receiver) String() string { func (r *Receiver) processMessage(ctx context.Context, count int, maxDuration time.Duration, msg *ReceivedMessage, handler Handler) { now := time.Now() - // the context wont have a trace span on it yet, so stick with the reciever logger instance + // the context wont have a trace span on it yet, so stick with the receiver logger instance r.log.Debugf("Processing message %d id %s", count, msg.MessageID) disp, ctx, err := r.handleReceivedMessageWithTracingContext(ctx, msg, handler) @@ -230,7 +230,11 @@ func (r *Receiver) receiveMessages(ctx context.Context) error { r.Cfg.RenewMessageLock, ) - // Start all the workers + // Start all the workers. Each worker runs forever waiting on a channel for received + // messages. The waitgroup semantics is used to indicate whether the current message has + // been processed. The worker goroutines will terminate on a context.cancel between processing + // any messages. If there are any unprocessed messages then these will eventually timeout and + // azure servicebus will re-schedule them for processing. msgs := make(chan *ReceivedMessage, numberOfReceivedMessages) var wg sync.WaitGroup for i := range numberOfReceivedMessages { @@ -240,7 +244,6 @@ func (r *Receiver) receiveMessages(ctx context.Context) error { select { case <-rctx.Done(): rr.log.Debugf("Stop worker %d", ii) - wg.Done() return case msg := <-msgs: func(rrctx context.Context) { @@ -266,8 +269,8 @@ func (r *Receiver) receiveMessages(ctx context.Context) error { // Extensively tested by loading messages and checking that the waitGroup logic always reset to zero so messages // continue to be processed. The sync.Waitgroup will panic if the internal counter ever goes to less than zero - this is - // what we want as then the service will restart. Extensive testing has never encountered this. - // The load tests wree conducted with over 1000 simplhash anchor messages present and with NumberOfReceivedMessage=8. + // what we want as then the service will restart. + // The load tests were conducted with over 1000 simplhash anchor messages present and with NumberOfReceivedMessage=8. // The code mosly read 8 messages at a time - sometimes only 3 or 4 were read - either way the code processed the // messages successfully and only finished once the receiver was empty. for { @@ -282,6 +285,7 @@ func (r *Receiver) receiveMessages(ctx context.Context) error { total := len(messages) r.log.Debugf("total messages %d", total) + // Use the waitgroup to indicate when all messages have been processed. for i := range total { wg.Add(1) msgs <- messages[i] diff --git a/errhandling/errb2c.go b/errhandling/errb2c.go index b8dc905..f0dee81 100644 --- a/errhandling/errb2c.go +++ b/errhandling/errb2c.go @@ -47,7 +47,7 @@ func (e *ErrB2C) WithErrorString(log Logger, errString string) error { if _, scanErr := fmt.Sscanf(errString, b2cFmtString, &e.APIVersion, &e.Status, &e.UserMessage); scanErr != nil { - log.Infof("scan error: %v", scanErr) + log.Debugf("scan error: %v", scanErr) return scanErr } @@ -72,7 +72,7 @@ func GetErrB2c(ctx context.Context, err error) (*ErrB2C, error) { // chance it is a status.Error (can't access status.Error as its part of an internal package) if !strings.HasPrefix(err.Error(), grpcErrPrefix) { - log.Infof("error is not b2c error: %v", err) + log.Debugf("error is not b2c error: %v", err) return nil, err } @@ -80,7 +80,7 @@ func GetErrB2c(ctx context.Context, err error) (*ErrB2C, error) { b2cErrString := strings.TrimPrefix(err.Error(), grpcErrPrefix) if convErr := errB2C.WithErrorString(log, b2cErrString); convErr != nil { - log.Infof("unable to add error string: %v", convErr) + log.Debugf("unable to add error string: %v", convErr) return nil, convErr } diff --git a/restproxyserver/restproxyserver.go b/restproxyserver/restproxyserver.go index 673e4ba..67f1593 100644 --- a/restproxyserver/restproxyserver.go +++ b/restproxyserver/restproxyserver.go @@ -74,7 +74,7 @@ func SetQueryParameterParser(p QueryParameterParser) RESTProxyServerOption { } } -// WithIncomingHeaderMatcher adds an intercepror that matches header values. +// WithIncomingHeaderMatcher adds an interceptor that matches header values. func WithIncomingHeaderMatcher(o HeaderMatcherFunc) RESTProxyServerOption { return func(g *RESTProxyServer) { if o != nil && !reflect.ValueOf(o).IsNil() {