Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use context.WithoutCancel in azbus #4

Merged
merged 3 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.task/
4 changes: 3 additions & 1 deletion Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ version: '3'
includes:
codeqa:
taskfile: ./taskfiles/Taskfile_codeqa.yml
mocks:
taskfile: ./taskfiles/Taskfile_mocks.yml

tasks:

Expand All @@ -37,4 +39,4 @@ tasks:
test:
desc: run the tests
cmds:
- task: codeqa:unit-tests
- task: codeqa:unit-tests
27 changes: 0 additions & 27 deletions azbus/aliases.go

This file was deleted.

8 changes: 0 additions & 8 deletions azbus/correlationid.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,3 @@ func ContextFromReceivedMessage(ctx context.Context, message *ReceivedMessage) c
}
return correlationid.ContextWithCorrelationID(ctx, cid.(string))
}

func AddCorrelationIDOption(ctx context.Context, opts ...OutMessageOption) []OutMessageOption {
correlationID := correlationid.FromContext(ctx)
if correlationID == "" {
return opts
}
return append(opts, WithProperty(correlationid.CorrelationIDKey, correlationID))
}
57 changes: 0 additions & 57 deletions azbus/docs.go

This file was deleted.

8 changes: 2 additions & 6 deletions azbus/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

// alias so we dont have to import azservicebus elsewhere
type ServicebusCode = azservicebus.Code

// Azure package expects the user to elucidate errors like so:
//
// var servicebusError *azservicebus.Error
Expand All @@ -30,13 +27,12 @@ var (
ErrTimeout = errors.New("timeout")
)

// CodeUnauthorizedAccess is in the latest code but is not in the current version used.
func NewAzbusError(err error) error {
var servicebusError *azservicebus.Error
if errors.As(err, &servicebusError) {
switch servicebusError.Code {
// case azservicebus.CodeUnauthorizedAccess:
// return ErrUnauthorizedAccess
case azservicebus.CodeUnauthorizedAccess:
return ErrUnauthorizedAccess
case azservicebus.CodeConnectionLost:
return ErrConnectionLost
case azservicebus.CodeLockLost:
Expand Down
7 changes: 7 additions & 0 deletions azbus/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package azbus

import (
"github.com/rkvst/go-rkvstcommon/logger"
)

type Logger = logger.Logger
24 changes: 16 additions & 8 deletions azbus/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package azbus

import (
"context"
"errors"
"time"
)

// Set a timeout for processing the message, this should be no later than
Expand All @@ -27,18 +29,24 @@ import (
// If it does timeout then it is too late anyway as the peeklock will already be released.
//
// for the time being we impose a timeout as it is safe.
func setTimeout(ctx context.Context, log Logger, msg *ReceivedMessage) (context.Context, context.CancelFunc) {
var (
ErrPeekLockTimeout = errors.New("peeklock deadline reached")
)

func setTimeout(ctx context.Context, log Logger, msg *ReceivedMessage) (context.Context, context.CancelFunc, time.Duration) {

var cancel context.CancelFunc

msgLockedUntil := msg.LockedUntil
if msgLockedUntil != nil {
ctx, cancel = context.WithDeadline(ctx, *msgLockedUntil)
log.Debugf("context deadline from message lock deadline: %v", ctx)
return ctx, cancel
log.Debugf("msg locked until %s", msg.LockedUntil)
if msg.LockedUntil != nil {
msgLockedUntil := *msg.LockedUntil
ctx, cancel = context.WithDeadlineCause(ctx, msgLockedUntil, ErrPeekLockTimeout)
maxDuration := msgLockedUntil.Sub(time.Now())
log.Debugf("msg must be processed in %s", maxDuration)
return ctx, cancel, maxDuration
}

ctx, cancel = context.WithTimeout(ctx, RenewalTime)
ctx, cancel = context.WithTimeoutCause(ctx, RenewalTime, ErrPeekLockTimeout)
log.Infof("could not get lock deadline from message, using fixed timeout %v", ctx)
return ctx, cancel
return ctx, cancel, RenewalTime
}
46 changes: 41 additions & 5 deletions azbus/mocks/Handler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading