Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix panic #91

Merged
merged 1 commit into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions azbus/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (r *Receiver) String() string {
func (r *Receiver) processMessage(ctx context.Context, count int, maxDuration time.Duration, msg *ReceivedMessage, handler Handler) {
now := time.Now()

// the context wont have a trace span on it yet, so stick with the reciever logger instance
// the context wont have a trace span on it yet, so stick with the receiver logger instance

r.log.Debugf("Processing message %d id %s", count, msg.MessageID)
disp, ctx, err := r.handleReceivedMessageWithTracingContext(ctx, msg, handler)
Expand Down Expand Up @@ -230,7 +230,11 @@ func (r *Receiver) receiveMessages(ctx context.Context) error {
r.Cfg.RenewMessageLock,
)

// Start all the workers
// Start all the workers. Each worker runs forever waiting on a channel for received
// messages. The waitgroup semantics is used to indicate whether the current message has
// been processed. The worker goroutines will terminate on a context.cancel between processing
// any messages. If there are any unprocessed messages then these will eventually timeout and
// azure servicebus will re-schedule them for processing.
msgs := make(chan *ReceivedMessage, numberOfReceivedMessages)
var wg sync.WaitGroup
for i := range numberOfReceivedMessages {
Expand All @@ -240,7 +244,6 @@ func (r *Receiver) receiveMessages(ctx context.Context) error {
select {
case <-rctx.Done():
rr.log.Debugf("Stop worker %d", ii)
wg.Done()
return
eccles marked this conversation as resolved.
Show resolved Hide resolved
case msg := <-msgs:
func(rrctx context.Context) {
Expand All @@ -266,8 +269,8 @@ func (r *Receiver) receiveMessages(ctx context.Context) error {

// Extensively tested by loading messages and checking that the waitGroup logic always reset to zero so messages
// continue to be processed. The sync.Waitgroup will panic if the internal counter ever goes to less than zero - this is
// what we want as then the service will restart. Extensive testing has never encountered this.
// The load tests wree conducted with over 1000 simplhash anchor messages present and with NumberOfReceivedMessage=8.
// what we want as then the service will restart.
// The load tests were conducted with over 1000 simplhash anchor messages present and with NumberOfReceivedMessage=8.
// The code mosly read 8 messages at a time - sometimes only 3 or 4 were read - either way the code processed the
// messages successfully and only finished once the receiver was empty.
for {
Expand All @@ -282,6 +285,7 @@ func (r *Receiver) receiveMessages(ctx context.Context) error {
total := len(messages)
r.log.Debugf("total messages %d", total)

// Use the waitgroup to indicate when all messages have been processed.
for i := range total {
wg.Add(1)
msgs <- messages[i]
Expand Down
6 changes: 3 additions & 3 deletions errhandling/errb2c.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (e *ErrB2C) WithErrorString(log Logger, errString string) error {
if _, scanErr := fmt.Sscanf(errString, b2cFmtString,
&e.APIVersion, &e.Status, &e.UserMessage); scanErr != nil {

log.Infof("scan error: %v", scanErr)
log.Debugf("scan error: %v", scanErr)
return scanErr
}

Expand All @@ -72,15 +72,15 @@ func GetErrB2c(ctx context.Context, err error) (*ErrB2C, error) {

// chance it is a status.Error (can't access status.Error as its part of an internal package)
if !strings.HasPrefix(err.Error(), grpcErrPrefix) {
log.Infof("error is not b2c error: %v", err)
log.Debugf("error is not b2c error: %v", err)
return nil, err
}

// can't use errors.Unwrap because status.Error doesn't implement Unwrap method
b2cErrString := strings.TrimPrefix(err.Error(), grpcErrPrefix)

if convErr := errB2C.WithErrorString(log, b2cErrString); convErr != nil {
log.Infof("unable to add error string: %v", convErr)
log.Debugf("unable to add error string: %v", convErr)
return nil, convErr
}

Expand Down
2 changes: 1 addition & 1 deletion restproxyserver/restproxyserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func SetQueryParameterParser(p QueryParameterParser) RESTProxyServerOption {
}
}

// WithIncomingHeaderMatcher adds an intercepror that matches header values.
// WithIncomingHeaderMatcher adds an interceptor that matches header values.
func WithIncomingHeaderMatcher(o HeaderMatcherFunc) RESTProxyServerOption {
return func(g *RESTProxyServer) {
if o != nil && !reflect.ValueOf(o).IsNil() {
Expand Down
Loading