Skip to content

Commit

Permalink
Use context.WithoutCancel in azbus (#4)
Browse files Browse the repository at this point in the history
* Use context.WithoutCancel in azbus

The handcrafted contextWithoutCancel is replaced with correct
standard context.WithoutCancel from Go 1.21.

If processing a message takes longer than the message deadline a
WARNING is emitted with suggested remedial action.

Type aliases in aliases.go have been moved closer to their point
of usage and aliases.go deleted.

Internal logger is indexed with the name of the sender/receiver.

Mock generation Taskfile is added.

AB#8276
  • Loading branch information
eccles authored Sep 8, 2023
1 parent c42180e commit 76db13c
Show file tree
Hide file tree
Showing 18 changed files with 686 additions and 221 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.task/
4 changes: 3 additions & 1 deletion Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ version: '3'
includes:
codeqa:
taskfile: ./taskfiles/Taskfile_codeqa.yml
mocks:
taskfile: ./taskfiles/Taskfile_mocks.yml

tasks:

Expand All @@ -37,4 +39,4 @@ tasks:
test:
desc: run the tests
cmds:
- task: codeqa:unit-tests
- task: codeqa:unit-tests
27 changes: 0 additions & 27 deletions azbus/aliases.go

This file was deleted.

8 changes: 0 additions & 8 deletions azbus/correlationid.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,3 @@ func ContextFromReceivedMessage(ctx context.Context, message *ReceivedMessage) c
}
return correlationid.ContextWithCorrelationID(ctx, cid.(string))
}

func AddCorrelationIDOption(ctx context.Context, opts ...OutMessageOption) []OutMessageOption {
correlationID := correlationid.FromContext(ctx)
if correlationID == "" {
return opts
}
return append(opts, WithProperty(correlationid.CorrelationIDKey, correlationID))
}
57 changes: 0 additions & 57 deletions azbus/docs.go

This file was deleted.

8 changes: 2 additions & 6 deletions azbus/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

// alias so we dont have to import azservicebus elsewhere
type ServicebusCode = azservicebus.Code

// Azure package expects the user to elucidate errors like so:
//
// var servicebusError *azservicebus.Error
Expand All @@ -30,13 +27,12 @@ var (
ErrTimeout = errors.New("timeout")
)

// CodeUnauthorizedAccess is in the latest code but is not in the current version used.
func NewAzbusError(err error) error {
var servicebusError *azservicebus.Error
if errors.As(err, &servicebusError) {
switch servicebusError.Code {
// case azservicebus.CodeUnauthorizedAccess:
// return ErrUnauthorizedAccess
case azservicebus.CodeUnauthorizedAccess:
return ErrUnauthorizedAccess
case azservicebus.CodeConnectionLost:
return ErrConnectionLost
case azservicebus.CodeLockLost:
Expand Down
7 changes: 7 additions & 0 deletions azbus/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package azbus

import (
"github.com/rkvst/go-rkvstcommon/logger"
)

type Logger = logger.Logger
24 changes: 16 additions & 8 deletions azbus/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package azbus

import (
"context"
"errors"
"time"
)

// Set a timeout for processing the message, this should be no later than
Expand All @@ -27,18 +29,24 @@ import (
// If it does timeout then it is too late anyway as the peeklock will already be released.
//
// for the time being we impose a timeout as it is safe.
func setTimeout(ctx context.Context, log Logger, msg *ReceivedMessage) (context.Context, context.CancelFunc) {
var (
ErrPeekLockTimeout = errors.New("peeklock deadline reached")
)

func setTimeout(ctx context.Context, log Logger, msg *ReceivedMessage) (context.Context, context.CancelFunc, time.Duration) {

var cancel context.CancelFunc

msgLockedUntil := msg.LockedUntil
if msgLockedUntil != nil {
ctx, cancel = context.WithDeadline(ctx, *msgLockedUntil)
log.Debugf("context deadline from message lock deadline: %v", ctx)
return ctx, cancel
log.Debugf("msg locked until %s", msg.LockedUntil)
if msg.LockedUntil != nil {
msgLockedUntil := *msg.LockedUntil
ctx, cancel = context.WithDeadlineCause(ctx, msgLockedUntil, ErrPeekLockTimeout)
maxDuration := msgLockedUntil.Sub(time.Now())
log.Debugf("msg must be processed in %s", maxDuration)
return ctx, cancel, maxDuration
}

ctx, cancel = context.WithTimeout(ctx, RenewalTime)
ctx, cancel = context.WithTimeoutCause(ctx, RenewalTime, ErrPeekLockTimeout)
log.Infof("could not get lock deadline from message, using fixed timeout %v", ctx)
return ctx, cancel
return ctx, cancel, RenewalTime
}
46 changes: 41 additions & 5 deletions azbus/mocks/Handler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 76db13c

Please sign in to comment.