diff --git a/azbus/receiver.go b/azbus/receiver.go index 1c23649..3d7d607 100644 --- a/azbus/receiver.go +++ b/azbus/receiver.go @@ -82,11 +82,13 @@ type Receiver struct { Cfg ReceiverConfig - log Logger - mtx sync.Mutex - receiver *azservicebus.Receiver - options *azservicebus.ReceiverOptions - handlers []Handler + log Logger + mtx sync.Mutex + receiver *azservicebus.Receiver + options *azservicebus.ReceiverOptions + handlers []Handler + serialHandler Handler + numberOfReceivedMessages int // for serial Handler only } type ReceiverOption func(*Receiver) @@ -98,6 +100,14 @@ func WithHandlers(h ...Handler) ReceiverOption { } } +// WithSerialHandler +func WithSerialHandler(h Handler, n int) ReceiverOption { + return func(r *Receiver) { + r.serialHandler = h + r.numberOfReceivedMessages = n + } +} + // WithRenewalTime takes an optional time to renew the peek lock. This should be comfortably less // than the peek lock timeout. For example: the default peek lock timeout is 60s and the default // renewal time is 50s. @@ -226,7 +236,7 @@ func (r *Receiver) renewMessageLock(ctx context.Context, count int, msg *Receive } } -func (r *Receiver) receiveMessages() error { +func (r *Receiver) receiveMessagesInParallel() error { numberOfReceivedMessages := len(r.handlers) r.log.Debugf( @@ -294,6 +304,55 @@ func (r *Receiver) receiveMessages() error { } } +func (r *Receiver) receiveMessagesInSerial() error { + + r.log.Debugf( + "NumberOfReceivedMessages %d, RenewMessageLock: %v", + r.numberOfReceivedMessages, + r.Cfg.RenewMessageLock, + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for { + var err error + var messages []*ReceivedMessage + messages, err = r.receiver.ReceiveMessages(ctx, r.numberOfReceivedMessages, nil) + if err != nil { + azerr := fmt.Errorf("%s: ReceiveMessage failure: %w", r, NewAzbusError(err)) + r.log.Infof("%s", azerr) + return azerr + } + total := len(messages) + r.log.Debugf("total messages %d", total) + var renewCtx context.Context + var renewCancel context.CancelFunc + var maxDuration time.Duration + // XXX: if the number of Received Messages is large (>10) then RenewMessageLock is required. + if r.Cfg.RenewMessageLock { + func() { + renewCtx, renewCancel = context.WithCancel(ctx) + defer renewCancel() + for i, msg := range messages { + go r.renewMessageLock(renewCtx, i+1, msg) + } + for i, msg := range messages { + r.processMessage(renewCtx, i+1, maxDuration, msg, r.serialHandler) + } + }() + } else { + for i, msg := range messages { + func() { + // we need a timeout per message if RenewMessageLock is disabled + renewCtx, renewCancel, maxDuration = r.setTimeout(ctx, r.log, msg) + defer renewCancel() + r.processMessage(renewCtx, i+1, maxDuration, msg, r.serialHandler) + }() + } + } + } +} + // The following 2 methods satisfy the startup.Listener interface. func (r *Receiver) Listen() error { r.log.Debugf("listen") @@ -303,7 +362,10 @@ func (r *Receiver) Listen() error { r.log.Infof("%s", azerr) return azerr } - return r.receiveMessages() + if r.serialHandler != nil { + return r.receiveMessagesInSerial() + } + return r.receiveMessagesInParallel() } func (r *Receiver) Shutdown(ctx context.Context) error { @@ -334,10 +396,17 @@ func (r *Receiver) open() error { r.log.Infof("%s", azerr) return azerr } - r.receiver = receiver - for j := range len(r.handlers) { - err = r.handlers[j].Open() + + if r.serialHandler == nil { + for j := range len(r.handlers) { + err = r.handlers[j].Open() + if err != nil { + return fmt.Errorf("failed to open handler: %w", err) + } + } + } else { + err = r.serialHandler.Open() if err != nil { return fmt.Errorf("failed to open handler: %w", err) } @@ -356,11 +425,15 @@ func (r *Receiver) close_() { azerr := fmt.Errorf("%s: Error closing receiver: %w", r, NewAzbusError(err)) r.log.Infof("%s", azerr) } - r.receiver = nil for j := range len(r.handlers) { r.handlers[j].Close() } r.handlers = []Handler{} + if r.serialHandler != nil { + r.serialHandler.Close() + r.serialHandler = nil + } + r.receiver = nil } } } diff --git a/taskfiles/Taskfile_codeqa.yml b/taskfiles/Taskfile_codeqa.yml index 89eb4b2..d4ed14a 100644 --- a/taskfiles/Taskfile_codeqa.yml +++ b/taskfiles/Taskfile_codeqa.yml @@ -27,17 +27,18 @@ tasks: desc: Quality assurance of code summary: "format sources (go fmt)" cmds: - - gofmt -l -s -w . + - | + go fix ./... + goimports -w . + gofmt -l -s -w . lint: desc: Quality assurance of code cmds: - | - golangci-lint --version go vet ./... - goimports {{.VERBOSE}} -w . + golangci-lint --version golangci-lint {{.VERBOSE}} run --timeout 10m ./... - gofmt -l -s -w . unit-tests: desc: "run unit tests"