Skip to content

Commit

Permalink
Receive Messages Serially
Browse files Browse the repository at this point in the history
New functional option to add a serialHandler that reads N messages at
once and process them one at a time - serially.

    receiver := NewReceiver(, WithSerialHandler(h, 200))

where h is an instance of Handler.

Defining a SerialHandler this way disables the normal parallel
processing defined by WithHandlers().

It is highly recommended to specify RenewMessageLock if the number of
received messages is high (say > 10). One has to be sure that processing
the number of messages within 60s is achievable.

AB#9378
  • Loading branch information
eccles committed Jun 18, 2024
1 parent dd2783b commit 8ba2a73
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 15 deletions.
95 changes: 84 additions & 11 deletions azbus/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,13 @@ type Receiver struct {

Cfg ReceiverConfig

log Logger
mtx sync.Mutex
receiver *azservicebus.Receiver
options *azservicebus.ReceiverOptions
handlers []Handler
log Logger
mtx sync.Mutex
receiver *azservicebus.Receiver
options *azservicebus.ReceiverOptions
handlers []Handler
serialHandler Handler
numberOfReceivedMessages int // for serial Handler only
}

type ReceiverOption func(*Receiver)
Expand All @@ -98,6 +100,14 @@ func WithHandlers(h ...Handler) ReceiverOption {
}
}

// WithSerialHandler
func WithSerialHandler(h Handler, n int) ReceiverOption {
return func(r *Receiver) {
r.serialHandler = h
r.numberOfReceivedMessages = n
}
}

// WithRenewalTime takes an optional time to renew the peek lock. This should be comfortably less
// than the peek lock timeout. For example: the default peek lock timeout is 60s and the default
// renewal time is 50s.
Expand Down Expand Up @@ -226,7 +236,7 @@ func (r *Receiver) renewMessageLock(ctx context.Context, count int, msg *Receive
}
}

func (r *Receiver) receiveMessages() error {
func (r *Receiver) receiveMessagesInParallel() error {

numberOfReceivedMessages := len(r.handlers)
r.log.Debugf(
Expand Down Expand Up @@ -294,6 +304,55 @@ func (r *Receiver) receiveMessages() error {
}
}

func (r *Receiver) receiveMessagesInSerial() error {

r.log.Debugf(
"NumberOfReceivedMessages %d, RenewMessageLock: %v",
r.numberOfReceivedMessages,
r.Cfg.RenewMessageLock,
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for {
var err error
var messages []*ReceivedMessage
messages, err = r.receiver.ReceiveMessages(ctx, r.numberOfReceivedMessages, nil)
if err != nil {
azerr := fmt.Errorf("%s: ReceiveMessage failure: %w", r, NewAzbusError(err))
r.log.Infof("%s", azerr)
return azerr
}
total := len(messages)
r.log.Debugf("total messages %d", total)
var renewCtx context.Context
var renewCancel context.CancelFunc
var maxDuration time.Duration
// XXX: if the number of Received Messages is large (>10) then RenewMessageLock is required.
if r.Cfg.RenewMessageLock {
func() {
renewCtx, renewCancel = context.WithCancel(ctx)
defer renewCancel()
for i, msg := range messages {
go r.renewMessageLock(renewCtx, i+1, msg)
}
for i, msg := range messages {
r.processMessage(renewCtx, i+1, maxDuration, msg, r.serialHandler)
}
}()
} else {
for i, msg := range messages {
func() {
// we need a timeout per message if RenewMessageLock is disabled
renewCtx, renewCancel, maxDuration = r.setTimeout(ctx, r.log, msg)
defer renewCancel()
r.processMessage(renewCtx, i+1, maxDuration, msg, r.serialHandler)
}()
}
}
}
}

// The following 2 methods satisfy the startup.Listener interface.
func (r *Receiver) Listen() error {
r.log.Debugf("listen")
Expand All @@ -303,7 +362,10 @@ func (r *Receiver) Listen() error {
r.log.Infof("%s", azerr)
return azerr
}
return r.receiveMessages()
if r.serialHandler != nil {
return r.receiveMessagesInSerial()
}
return r.receiveMessagesInParallel()
}

func (r *Receiver) Shutdown(ctx context.Context) error {
Expand Down Expand Up @@ -334,10 +396,17 @@ func (r *Receiver) open() error {
r.log.Infof("%s", azerr)
return azerr
}

r.receiver = receiver
for j := range len(r.handlers) {
err = r.handlers[j].Open()

if r.serialHandler == nil {
for j := range len(r.handlers) {
err = r.handlers[j].Open()
if err != nil {
return fmt.Errorf("failed to open handler: %w", err)
}
}
} else {
err = r.serialHandler.Open()
if err != nil {
return fmt.Errorf("failed to open handler: %w", err)
}
Expand All @@ -356,11 +425,15 @@ func (r *Receiver) close_() {
azerr := fmt.Errorf("%s: Error closing receiver: %w", r, NewAzbusError(err))
r.log.Infof("%s", azerr)
}
r.receiver = nil
for j := range len(r.handlers) {
r.handlers[j].Close()
}
r.handlers = []Handler{}
if r.serialHandler != nil {
r.serialHandler.Close()
r.serialHandler = nil
}
r.receiver = nil
}
}
}
9 changes: 5 additions & 4 deletions taskfiles/Taskfile_codeqa.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,18 @@ tasks:
desc: Quality assurance of code
summary: "format sources (go fmt)"
cmds:
- gofmt -l -s -w .
- |
go fix ./...
goimports -w .
gofmt -l -s -w .
lint:
desc: Quality assurance of code
cmds:
- |
golangci-lint --version
go vet ./...
goimports {{.VERBOSE}} -w .
golangci-lint --version
golangci-lint {{.VERBOSE}} run --timeout 10m ./...
gofmt -l -s -w .
unit-tests:
desc: "run unit tests"
Expand Down

0 comments on commit 8ba2a73

Please sign in to comment.