From 76db13c06d55296e59eecf30b223ef4b814e2066 Mon Sep 17 00:00:00 2001 From: Paul Hewlett <1104895+eccles@users.noreply.github.com> Date: Fri, 8 Sep 2023 13:39:54 +0100 Subject: [PATCH] Use context.WithoutCancel in azbus (#4) * Use context.WithoutCancel in azbus The handcrafted contextWithoutCancel is replaced with correct standard context.WithoutCancel from Go 1.21. If processing a message takes longer than the message deadline a WARNING is emitted with suggested remedial action. Type aliases in aliases.go have been moved closer to their point of usage and aliases.go deleted. Internal logger is indexed with the name of the sender/receiver. Mock generation Taskfile is added. AB#8276 --- .gitignore | 1 + Taskfile.yml | 4 +- azbus/aliases.go | 27 ---- azbus/correlationid.go | 8 - azbus/docs.go | 57 ------- azbus/error.go | 8 +- azbus/logger.go | 7 + azbus/message.go | 24 ++- azbus/mocks/Handler.go | 46 +++++- azbus/mocks/MsgReceiver.go | 273 +++++++++++++++++++++++++++++++- azbus/mocks/MsgSender.go | 200 ++++++++++++++++++++++- azbus/mocks/OutMessageOption.go | 45 +++++- azbus/receiver.go | 79 ++++++--- azbus/sender.go | 32 ++-- azbus/withoutcancel.go | 35 ---- go.mod | 9 +- go.sum | 28 ++-- taskfiles/Taskfile_mocks.yml | 24 +++ 18 files changed, 686 insertions(+), 221 deletions(-) create mode 100644 .gitignore delete mode 100644 azbus/aliases.go delete mode 100644 azbus/docs.go create mode 100644 azbus/logger.go delete mode 100644 azbus/withoutcancel.go create mode 100644 taskfiles/Taskfile_mocks.yml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f1c37ad --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.task/ diff --git a/Taskfile.yml b/Taskfile.yml index 3adb29f..03bf037 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -18,6 +18,8 @@ version: '3' includes: codeqa: taskfile: ./taskfiles/Taskfile_codeqa.yml + mocks: + taskfile: ./taskfiles/Taskfile_mocks.yml tasks: @@ -37,4 +39,4 @@ tasks: test: desc: run the tests cmds: - - task: codeqa:unit-tests \ No newline at end of file + - task: codeqa:unit-tests diff --git a/azbus/aliases.go b/azbus/aliases.go deleted file mode 100644 index be28610..0000000 --- a/azbus/aliases.go +++ /dev/null @@ -1,27 +0,0 @@ -package azbus - -import ( - "context" - - "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" - - "github.com/rkvst/go-rkvstcommon/logger" -) - -type Logger = logger.Logger - -// so we dont have to import the azure repo everywhere -type OutMessage = azservicebus.Message - -type ReceivedMessage = azservicebus.ReceivedMessage - -func NewOutMessage(data []byte) OutMessage { - return azservicebus.Message{ - Body: data, - } -} - -// not an alias but its convenient here -type Handler interface { - Handle(context.Context, *ReceivedMessage) error -} diff --git a/azbus/correlationid.go b/azbus/correlationid.go index c8bec6c..523702e 100644 --- a/azbus/correlationid.go +++ b/azbus/correlationid.go @@ -16,11 +16,3 @@ func ContextFromReceivedMessage(ctx context.Context, message *ReceivedMessage) c } return correlationid.ContextWithCorrelationID(ctx, cid.(string)) } - -func AddCorrelationIDOption(ctx context.Context, opts ...OutMessageOption) []OutMessageOption { - correlationID := correlationid.FromContext(ctx) - if correlationID == "" { - return opts - } - return append(opts, WithProperty(correlationid.CorrelationIDKey, correlationID)) -} diff --git a/azbus/docs.go b/azbus/docs.go deleted file mode 100644 index c1e00d8..0000000 --- a/azbus/docs.go +++ /dev/null @@ -1,57 +0,0 @@ -package azbus - -// TODO: this needs some attention as it is incorrect - -// Implement pubsubinterface -// Azure service bus pubsub interface -// -// Abstracts away all the Azure plumbing and replaces with simpler interface. -// -// First create the interface: -// -// -// Usage: Sending message(s): -// client := azsb.Receiver{ -// Cfg: azsb.ReceiverConfig{ -// ConnectionString: "blah-blah-blah...", -// TopicName: "userinterface", -// }, -// } -// -// ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) -// defer cancel() -// client.Open(ctx) -// defer client.Close(ctx) -// client.Publish(ctx, []byte("Hello World")) -// ...... more send messages if required. -// -// Receiving messages: -// -// client := azsb.Receiver{ -// Cfg: azsb.ReceiverConfig{ -// ConnectionString: "blah-blah-blah...", -// TopicName: "userinterface", -// SubscriptionName: "xxxx", -// }, -// } - -// -// handler := func(ctx context.Context, msg *azbus.ReceivedMessage) error { -// logger.Sugar.Infow( -// "Received", -// "message", msg, -// ) -// ctx, cancel := contexttWithTimeout(ctx, 60*time.Second) -// defer cancel() -// // do stuff -// msg.Complete(ctx) -// return nil -// } -// err := client.Subscribe(ctx, handler) -// var azerr *azbus.AzbusError -// if errors.As(err, &azerr) { -// // TopicName, SubscriptionName may be wrong so die... -// // else service bus not yet available so die... -// } else { -// // log transient error - maybe sleep and retry.. -// } diff --git a/azbus/error.go b/azbus/error.go index 2c9f0d4..f9eb1ad 100644 --- a/azbus/error.go +++ b/azbus/error.go @@ -6,9 +6,6 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" ) -// alias so we dont have to import azservicebus elsewhere -type ServicebusCode = azservicebus.Code - // Azure package expects the user to elucidate errors like so: // // var servicebusError *azservicebus.Error @@ -30,13 +27,12 @@ var ( ErrTimeout = errors.New("timeout") ) -// CodeUnauthorizedAccess is in the latest code but is not in the current version used. func NewAzbusError(err error) error { var servicebusError *azservicebus.Error if errors.As(err, &servicebusError) { switch servicebusError.Code { - // case azservicebus.CodeUnauthorizedAccess: - // return ErrUnauthorizedAccess + case azservicebus.CodeUnauthorizedAccess: + return ErrUnauthorizedAccess case azservicebus.CodeConnectionLost: return ErrConnectionLost case azservicebus.CodeLockLost: diff --git a/azbus/logger.go b/azbus/logger.go new file mode 100644 index 0000000..0363cf9 --- /dev/null +++ b/azbus/logger.go @@ -0,0 +1,7 @@ +package azbus + +import ( + "github.com/rkvst/go-rkvstcommon/logger" +) + +type Logger = logger.Logger diff --git a/azbus/message.go b/azbus/message.go index a4d3d47..05182b2 100644 --- a/azbus/message.go +++ b/azbus/message.go @@ -2,6 +2,8 @@ package azbus import ( "context" + "errors" + "time" ) // Set a timeout for processing the message, this should be no later than @@ -27,18 +29,24 @@ import ( // If it does timeout then it is too late anyway as the peeklock will already be released. // // for the time being we impose a timeout as it is safe. -func setTimeout(ctx context.Context, log Logger, msg *ReceivedMessage) (context.Context, context.CancelFunc) { +var ( + ErrPeekLockTimeout = errors.New("peeklock deadline reached") +) + +func setTimeout(ctx context.Context, log Logger, msg *ReceivedMessage) (context.Context, context.CancelFunc, time.Duration) { var cancel context.CancelFunc - msgLockedUntil := msg.LockedUntil - if msgLockedUntil != nil { - ctx, cancel = context.WithDeadline(ctx, *msgLockedUntil) - log.Debugf("context deadline from message lock deadline: %v", ctx) - return ctx, cancel + log.Debugf("msg locked until %s", msg.LockedUntil) + if msg.LockedUntil != nil { + msgLockedUntil := *msg.LockedUntil + ctx, cancel = context.WithDeadlineCause(ctx, msgLockedUntil, ErrPeekLockTimeout) + maxDuration := msgLockedUntil.Sub(time.Now()) + log.Debugf("msg must be processed in %s", maxDuration) + return ctx, cancel, maxDuration } - ctx, cancel = context.WithTimeout(ctx, RenewalTime) + ctx, cancel = context.WithTimeoutCause(ctx, RenewalTime, ErrPeekLockTimeout) log.Infof("could not get lock deadline from message, using fixed timeout %v", ctx) - return ctx, cancel + return ctx, cancel, RenewalTime } diff --git a/azbus/mocks/Handler.go b/azbus/mocks/Handler.go index f39b178..0cba5f8 100644 --- a/azbus/mocks/Handler.go +++ b/azbus/mocks/Handler.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.27.1. DO NOT EDIT. +// Code generated by mockery. DO NOT EDIT. package mocks @@ -15,6 +15,14 @@ type Handler struct { mock.Mock } +type Handler_Expecter struct { + mock *mock.Mock +} + +func (_m *Handler) EXPECT() *Handler_Expecter { + return &Handler_Expecter{mock: &_m.Mock} +} + // Handle provides a mock function with given fields: _a0, _a1 func (_m *Handler) Handle(_a0 context.Context, _a1 *azservicebus.ReceivedMessage) error { ret := _m.Called(_a0, _a1) @@ -29,13 +37,41 @@ func (_m *Handler) Handle(_a0 context.Context, _a1 *azservicebus.ReceivedMessage return r0 } -type mockConstructorTestingTNewHandler interface { - mock.TestingT - Cleanup(func()) +// Handler_Handle_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Handle' +type Handler_Handle_Call struct { + *mock.Call +} + +// Handle is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *azservicebus.ReceivedMessage +func (_e *Handler_Expecter) Handle(_a0 interface{}, _a1 interface{}) *Handler_Handle_Call { + return &Handler_Handle_Call{Call: _e.mock.On("Handle", _a0, _a1)} +} + +func (_c *Handler_Handle_Call) Run(run func(_a0 context.Context, _a1 *azservicebus.ReceivedMessage)) *Handler_Handle_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*azservicebus.ReceivedMessage)) + }) + return _c +} + +func (_c *Handler_Handle_Call) Return(_a0 error) *Handler_Handle_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Handler_Handle_Call) RunAndReturn(run func(context.Context, *azservicebus.ReceivedMessage) error) *Handler_Handle_Call { + _c.Call.Return(run) + return _c } // NewHandler creates a new instance of Handler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewHandler(t mockConstructorTestingTNewHandler) *Handler { +// The first argument is typically a *testing.T value. +func NewHandler(t interface { + mock.TestingT + Cleanup(func()) +}) *Handler { mock := &Handler{} mock.Mock.Test(t) diff --git a/azbus/mocks/MsgReceiver.go b/azbus/mocks/MsgReceiver.go index 80b3022..a62c3e6 100644 --- a/azbus/mocks/MsgReceiver.go +++ b/azbus/mocks/MsgReceiver.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.27.1. DO NOT EDIT. +// Code generated by mockery. DO NOT EDIT. package mocks @@ -16,6 +16,14 @@ type MsgReceiver struct { mock.Mock } +type MsgReceiver_Expecter struct { + mock *mock.Mock +} + +func (_m *MsgReceiver) EXPECT() *MsgReceiver_Expecter { + return &MsgReceiver_Expecter{mock: &_m.Mock} +} + // Abandon provides a mock function with given fields: _a0, _a1, _a2 func (_m *MsgReceiver) Abandon(_a0 context.Context, _a1 error, _a2 *azservicebus.ReceivedMessage) error { ret := _m.Called(_a0, _a1, _a2) @@ -30,11 +38,69 @@ func (_m *MsgReceiver) Abandon(_a0 context.Context, _a1 error, _a2 *azservicebus return r0 } +// MsgReceiver_Abandon_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Abandon' +type MsgReceiver_Abandon_Call struct { + *mock.Call +} + +// Abandon is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 error +// - _a2 *azservicebus.ReceivedMessage +func (_e *MsgReceiver_Expecter) Abandon(_a0 interface{}, _a1 interface{}, _a2 interface{}) *MsgReceiver_Abandon_Call { + return &MsgReceiver_Abandon_Call{Call: _e.mock.On("Abandon", _a0, _a1, _a2)} +} + +func (_c *MsgReceiver_Abandon_Call) Run(run func(_a0 context.Context, _a1 error, _a2 *azservicebus.ReceivedMessage)) *MsgReceiver_Abandon_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(error), args[2].(*azservicebus.ReceivedMessage)) + }) + return _c +} + +func (_c *MsgReceiver_Abandon_Call) Return(_a0 error) *MsgReceiver_Abandon_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MsgReceiver_Abandon_Call) RunAndReturn(run func(context.Context, error, *azservicebus.ReceivedMessage) error) *MsgReceiver_Abandon_Call { + _c.Call.Return(run) + return _c +} + // Close provides a mock function with given fields: _a0 func (_m *MsgReceiver) Close(_a0 context.Context) { _m.Called(_a0) } +// MsgReceiver_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MsgReceiver_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +// - _a0 context.Context +func (_e *MsgReceiver_Expecter) Close(_a0 interface{}) *MsgReceiver_Close_Call { + return &MsgReceiver_Close_Call{Call: _e.mock.On("Close", _a0)} +} + +func (_c *MsgReceiver_Close_Call) Run(run func(_a0 context.Context)) *MsgReceiver_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *MsgReceiver_Close_Call) Return() *MsgReceiver_Close_Call { + _c.Call.Return() + return _c +} + +func (_c *MsgReceiver_Close_Call) RunAndReturn(run func(context.Context)) *MsgReceiver_Close_Call { + _c.Call.Return(run) + return _c +} + // Complete provides a mock function with given fields: _a0, _a1 func (_m *MsgReceiver) Complete(_a0 context.Context, _a1 *azservicebus.ReceivedMessage) error { ret := _m.Called(_a0, _a1) @@ -49,6 +115,35 @@ func (_m *MsgReceiver) Complete(_a0 context.Context, _a1 *azservicebus.ReceivedM return r0 } +// MsgReceiver_Complete_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Complete' +type MsgReceiver_Complete_Call struct { + *mock.Call +} + +// Complete is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *azservicebus.ReceivedMessage +func (_e *MsgReceiver_Expecter) Complete(_a0 interface{}, _a1 interface{}) *MsgReceiver_Complete_Call { + return &MsgReceiver_Complete_Call{Call: _e.mock.On("Complete", _a0, _a1)} +} + +func (_c *MsgReceiver_Complete_Call) Run(run func(_a0 context.Context, _a1 *azservicebus.ReceivedMessage)) *MsgReceiver_Complete_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*azservicebus.ReceivedMessage)) + }) + return _c +} + +func (_c *MsgReceiver_Complete_Call) Return(_a0 error) *MsgReceiver_Complete_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MsgReceiver_Complete_Call) RunAndReturn(run func(context.Context, *azservicebus.ReceivedMessage) error) *MsgReceiver_Complete_Call { + _c.Call.Return(run) + return _c +} + // DeadLetter provides a mock function with given fields: _a0, _a1, _a2 func (_m *MsgReceiver) DeadLetter(_a0 context.Context, _a1 error, _a2 *azservicebus.ReceivedMessage) error { ret := _m.Called(_a0, _a1, _a2) @@ -63,6 +158,36 @@ func (_m *MsgReceiver) DeadLetter(_a0 context.Context, _a1 error, _a2 *azservice return r0 } +// MsgReceiver_DeadLetter_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeadLetter' +type MsgReceiver_DeadLetter_Call struct { + *mock.Call +} + +// DeadLetter is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 error +// - _a2 *azservicebus.ReceivedMessage +func (_e *MsgReceiver_Expecter) DeadLetter(_a0 interface{}, _a1 interface{}, _a2 interface{}) *MsgReceiver_DeadLetter_Call { + return &MsgReceiver_DeadLetter_Call{Call: _e.mock.On("DeadLetter", _a0, _a1, _a2)} +} + +func (_c *MsgReceiver_DeadLetter_Call) Run(run func(_a0 context.Context, _a1 error, _a2 *azservicebus.ReceivedMessage)) *MsgReceiver_DeadLetter_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(error), args[2].(*azservicebus.ReceivedMessage)) + }) + return _c +} + +func (_c *MsgReceiver_DeadLetter_Call) Return(_a0 error) *MsgReceiver_DeadLetter_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MsgReceiver_DeadLetter_Call) RunAndReturn(run func(context.Context, error, *azservicebus.ReceivedMessage) error) *MsgReceiver_DeadLetter_Call { + _c.Call.Return(run) + return _c +} + // GetAZClient provides a mock function with given fields: func (_m *MsgReceiver) GetAZClient() azbus.AZClient { ret := _m.Called() @@ -77,6 +202,33 @@ func (_m *MsgReceiver) GetAZClient() azbus.AZClient { return r0 } +// MsgReceiver_GetAZClient_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetAZClient' +type MsgReceiver_GetAZClient_Call struct { + *mock.Call +} + +// GetAZClient is a helper method to define mock.On call +func (_e *MsgReceiver_Expecter) GetAZClient() *MsgReceiver_GetAZClient_Call { + return &MsgReceiver_GetAZClient_Call{Call: _e.mock.On("GetAZClient")} +} + +func (_c *MsgReceiver_GetAZClient_Call) Run(run func()) *MsgReceiver_GetAZClient_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MsgReceiver_GetAZClient_Call) Return(_a0 azbus.AZClient) *MsgReceiver_GetAZClient_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MsgReceiver_GetAZClient_Call) RunAndReturn(run func() azbus.AZClient) *MsgReceiver_GetAZClient_Call { + _c.Call.Return(run) + return _c +} + // Open provides a mock function with given fields: func (_m *MsgReceiver) Open() error { ret := _m.Called() @@ -91,6 +243,33 @@ func (_m *MsgReceiver) Open() error { return r0 } +// MsgReceiver_Open_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Open' +type MsgReceiver_Open_Call struct { + *mock.Call +} + +// Open is a helper method to define mock.On call +func (_e *MsgReceiver_Expecter) Open() *MsgReceiver_Open_Call { + return &MsgReceiver_Open_Call{Call: _e.mock.On("Open")} +} + +func (_c *MsgReceiver_Open_Call) Run(run func()) *MsgReceiver_Open_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MsgReceiver_Open_Call) Return(_a0 error) *MsgReceiver_Open_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MsgReceiver_Open_Call) RunAndReturn(run func() error) *MsgReceiver_Open_Call { + _c.Call.Return(run) + return _c +} + // ReceiveMessages provides a mock function with given fields: _a0 func (_m *MsgReceiver) ReceiveMessages(_a0 azbus.Handler) error { ret := _m.Called(_a0) @@ -105,6 +284,34 @@ func (_m *MsgReceiver) ReceiveMessages(_a0 azbus.Handler) error { return r0 } +// MsgReceiver_ReceiveMessages_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReceiveMessages' +type MsgReceiver_ReceiveMessages_Call struct { + *mock.Call +} + +// ReceiveMessages is a helper method to define mock.On call +// - _a0 azbus.Handler +func (_e *MsgReceiver_Expecter) ReceiveMessages(_a0 interface{}) *MsgReceiver_ReceiveMessages_Call { + return &MsgReceiver_ReceiveMessages_Call{Call: _e.mock.On("ReceiveMessages", _a0)} +} + +func (_c *MsgReceiver_ReceiveMessages_Call) Run(run func(_a0 azbus.Handler)) *MsgReceiver_ReceiveMessages_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(azbus.Handler)) + }) + return _c +} + +func (_c *MsgReceiver_ReceiveMessages_Call) Return(_a0 error) *MsgReceiver_ReceiveMessages_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MsgReceiver_ReceiveMessages_Call) RunAndReturn(run func(azbus.Handler) error) *MsgReceiver_ReceiveMessages_Call { + _c.Call.Return(run) + return _c +} + // Reschedule provides a mock function with given fields: _a0, _a1, _a2 func (_m *MsgReceiver) Reschedule(_a0 context.Context, _a1 error, _a2 *azservicebus.ReceivedMessage) error { ret := _m.Called(_a0, _a1, _a2) @@ -119,6 +326,36 @@ func (_m *MsgReceiver) Reschedule(_a0 context.Context, _a1 error, _a2 *azservice return r0 } +// MsgReceiver_Reschedule_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Reschedule' +type MsgReceiver_Reschedule_Call struct { + *mock.Call +} + +// Reschedule is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 error +// - _a2 *azservicebus.ReceivedMessage +func (_e *MsgReceiver_Expecter) Reschedule(_a0 interface{}, _a1 interface{}, _a2 interface{}) *MsgReceiver_Reschedule_Call { + return &MsgReceiver_Reschedule_Call{Call: _e.mock.On("Reschedule", _a0, _a1, _a2)} +} + +func (_c *MsgReceiver_Reschedule_Call) Run(run func(_a0 context.Context, _a1 error, _a2 *azservicebus.ReceivedMessage)) *MsgReceiver_Reschedule_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(error), args[2].(*azservicebus.ReceivedMessage)) + }) + return _c +} + +func (_c *MsgReceiver_Reschedule_Call) Return(_a0 error) *MsgReceiver_Reschedule_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MsgReceiver_Reschedule_Call) RunAndReturn(run func(context.Context, error, *azservicebus.ReceivedMessage) error) *MsgReceiver_Reschedule_Call { + _c.Call.Return(run) + return _c +} + // String provides a mock function with given fields: func (_m *MsgReceiver) String() string { ret := _m.Called() @@ -133,13 +370,39 @@ func (_m *MsgReceiver) String() string { return r0 } -type mockConstructorTestingTNewMsgReceiver interface { - mock.TestingT - Cleanup(func()) +// MsgReceiver_String_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'String' +type MsgReceiver_String_Call struct { + *mock.Call +} + +// String is a helper method to define mock.On call +func (_e *MsgReceiver_Expecter) String() *MsgReceiver_String_Call { + return &MsgReceiver_String_Call{Call: _e.mock.On("String")} +} + +func (_c *MsgReceiver_String_Call) Run(run func()) *MsgReceiver_String_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MsgReceiver_String_Call) Return(_a0 string) *MsgReceiver_String_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MsgReceiver_String_Call) RunAndReturn(run func() string) *MsgReceiver_String_Call { + _c.Call.Return(run) + return _c } // NewMsgReceiver creates a new instance of MsgReceiver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewMsgReceiver(t mockConstructorTestingTNewMsgReceiver) *MsgReceiver { +// The first argument is typically a *testing.T value. +func NewMsgReceiver(t interface { + mock.TestingT + Cleanup(func()) +}) *MsgReceiver { mock := &MsgReceiver{} mock.Mock.Test(t) diff --git a/azbus/mocks/MsgSender.go b/azbus/mocks/MsgSender.go index 0fd30b4..2440afe 100644 --- a/azbus/mocks/MsgSender.go +++ b/azbus/mocks/MsgSender.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.27.1. DO NOT EDIT. +// Code generated by mockery. DO NOT EDIT. package mocks @@ -16,11 +16,47 @@ type MsgSender struct { mock.Mock } +type MsgSender_Expecter struct { + mock *mock.Mock +} + +func (_m *MsgSender) EXPECT() *MsgSender_Expecter { + return &MsgSender_Expecter{mock: &_m.Mock} +} + // Close provides a mock function with given fields: _a0 func (_m *MsgSender) Close(_a0 context.Context) { _m.Called(_a0) } +// MsgSender_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MsgSender_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +// - _a0 context.Context +func (_e *MsgSender_Expecter) Close(_a0 interface{}) *MsgSender_Close_Call { + return &MsgSender_Close_Call{Call: _e.mock.On("Close", _a0)} +} + +func (_c *MsgSender_Close_Call) Run(run func(_a0 context.Context)) *MsgSender_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *MsgSender_Close_Call) Return() *MsgSender_Close_Call { + _c.Call.Return() + return _c +} + +func (_c *MsgSender_Close_Call) RunAndReturn(run func(context.Context)) *MsgSender_Close_Call { + _c.Call.Return(run) + return _c +} + // GetAZClient provides a mock function with given fields: func (_m *MsgSender) GetAZClient() azbus.AZClient { ret := _m.Called() @@ -35,6 +71,33 @@ func (_m *MsgSender) GetAZClient() azbus.AZClient { return r0 } +// MsgSender_GetAZClient_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetAZClient' +type MsgSender_GetAZClient_Call struct { + *mock.Call +} + +// GetAZClient is a helper method to define mock.On call +func (_e *MsgSender_Expecter) GetAZClient() *MsgSender_GetAZClient_Call { + return &MsgSender_GetAZClient_Call{Call: _e.mock.On("GetAZClient")} +} + +func (_c *MsgSender_GetAZClient_Call) Run(run func()) *MsgSender_GetAZClient_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MsgSender_GetAZClient_Call) Return(_a0 azbus.AZClient) *MsgSender_GetAZClient_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MsgSender_GetAZClient_Call) RunAndReturn(run func() azbus.AZClient) *MsgSender_GetAZClient_Call { + _c.Call.Return(run) + return _c +} + // Open provides a mock function with given fields: func (_m *MsgSender) Open() error { ret := _m.Called() @@ -49,6 +112,33 @@ func (_m *MsgSender) Open() error { return r0 } +// MsgSender_Open_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Open' +type MsgSender_Open_Call struct { + *mock.Call +} + +// Open is a helper method to define mock.On call +func (_e *MsgSender_Expecter) Open() *MsgSender_Open_Call { + return &MsgSender_Open_Call{Call: _e.mock.On("Open")} +} + +func (_c *MsgSender_Open_Call) Run(run func()) *MsgSender_Open_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MsgSender_Open_Call) Return(_a0 error) *MsgSender_Open_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MsgSender_Open_Call) RunAndReturn(run func() error) *MsgSender_Open_Call { + _c.Call.Return(run) + return _c +} + // Send provides a mock function with given fields: _a0, _a1, _a2 func (_m *MsgSender) Send(_a0 context.Context, _a1 []byte, _a2 ...azbus.OutMessageOption) error { _va := make([]interface{}, len(_a2)) @@ -70,6 +160,43 @@ func (_m *MsgSender) Send(_a0 context.Context, _a1 []byte, _a2 ...azbus.OutMessa return r0 } +// MsgSender_Send_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Send' +type MsgSender_Send_Call struct { + *mock.Call +} + +// Send is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 []byte +// - _a2 ...azbus.OutMessageOption +func (_e *MsgSender_Expecter) Send(_a0 interface{}, _a1 interface{}, _a2 ...interface{}) *MsgSender_Send_Call { + return &MsgSender_Send_Call{Call: _e.mock.On("Send", + append([]interface{}{_a0, _a1}, _a2...)...)} +} + +func (_c *MsgSender_Send_Call) Run(run func(_a0 context.Context, _a1 []byte, _a2 ...azbus.OutMessageOption)) *MsgSender_Send_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]azbus.OutMessageOption, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(azbus.OutMessageOption) + } + } + run(args[0].(context.Context), args[1].([]byte), variadicArgs...) + }) + return _c +} + +func (_c *MsgSender_Send_Call) Return(_a0 error) *MsgSender_Send_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MsgSender_Send_Call) RunAndReturn(run func(context.Context, []byte, ...azbus.OutMessageOption) error) *MsgSender_Send_Call { + _c.Call.Return(run) + return _c +} + // SendMsg provides a mock function with given fields: _a0, _a1, _a2 func (_m *MsgSender) SendMsg(_a0 context.Context, _a1 azservicebus.Message, _a2 ...azbus.OutMessageOption) error { _va := make([]interface{}, len(_a2)) @@ -91,6 +218,43 @@ func (_m *MsgSender) SendMsg(_a0 context.Context, _a1 azservicebus.Message, _a2 return r0 } +// MsgSender_SendMsg_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SendMsg' +type MsgSender_SendMsg_Call struct { + *mock.Call +} + +// SendMsg is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 azservicebus.Message +// - _a2 ...azbus.OutMessageOption +func (_e *MsgSender_Expecter) SendMsg(_a0 interface{}, _a1 interface{}, _a2 ...interface{}) *MsgSender_SendMsg_Call { + return &MsgSender_SendMsg_Call{Call: _e.mock.On("SendMsg", + append([]interface{}{_a0, _a1}, _a2...)...)} +} + +func (_c *MsgSender_SendMsg_Call) Run(run func(_a0 context.Context, _a1 azservicebus.Message, _a2 ...azbus.OutMessageOption)) *MsgSender_SendMsg_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]azbus.OutMessageOption, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(azbus.OutMessageOption) + } + } + run(args[0].(context.Context), args[1].(azservicebus.Message), variadicArgs...) + }) + return _c +} + +func (_c *MsgSender_SendMsg_Call) Return(_a0 error) *MsgSender_SendMsg_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MsgSender_SendMsg_Call) RunAndReturn(run func(context.Context, azservicebus.Message, ...azbus.OutMessageOption) error) *MsgSender_SendMsg_Call { + _c.Call.Return(run) + return _c +} + // String provides a mock function with given fields: func (_m *MsgSender) String() string { ret := _m.Called() @@ -105,13 +269,39 @@ func (_m *MsgSender) String() string { return r0 } -type mockConstructorTestingTNewMsgSender interface { - mock.TestingT - Cleanup(func()) +// MsgSender_String_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'String' +type MsgSender_String_Call struct { + *mock.Call +} + +// String is a helper method to define mock.On call +func (_e *MsgSender_Expecter) String() *MsgSender_String_Call { + return &MsgSender_String_Call{Call: _e.mock.On("String")} +} + +func (_c *MsgSender_String_Call) Run(run func()) *MsgSender_String_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MsgSender_String_Call) Return(_a0 string) *MsgSender_String_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MsgSender_String_Call) RunAndReturn(run func() string) *MsgSender_String_Call { + _c.Call.Return(run) + return _c } // NewMsgSender creates a new instance of MsgSender. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewMsgSender(t mockConstructorTestingTNewMsgSender) *MsgSender { +// The first argument is typically a *testing.T value. +func NewMsgSender(t interface { + mock.TestingT + Cleanup(func()) +}) *MsgSender { mock := &MsgSender{} mock.Mock.Test(t) diff --git a/azbus/mocks/OutMessageOption.go b/azbus/mocks/OutMessageOption.go index 6794cba..2255844 100644 --- a/azbus/mocks/OutMessageOption.go +++ b/azbus/mocks/OutMessageOption.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.27.1. DO NOT EDIT. +// Code generated by mockery. DO NOT EDIT. package mocks @@ -12,18 +12,53 @@ type OutMessageOption struct { mock.Mock } +type OutMessageOption_Expecter struct { + mock *mock.Mock +} + +func (_m *OutMessageOption) EXPECT() *OutMessageOption_Expecter { + return &OutMessageOption_Expecter{mock: &_m.Mock} +} + // Execute provides a mock function with given fields: _a0 func (_m *OutMessageOption) Execute(_a0 *azservicebus.Message) { _m.Called(_a0) } -type mockConstructorTestingTNewOutMessageOption interface { - mock.TestingT - Cleanup(func()) +// OutMessageOption_Execute_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Execute' +type OutMessageOption_Execute_Call struct { + *mock.Call +} + +// Execute is a helper method to define mock.On call +// - _a0 *azservicebus.Message +func (_e *OutMessageOption_Expecter) Execute(_a0 interface{}) *OutMessageOption_Execute_Call { + return &OutMessageOption_Execute_Call{Call: _e.mock.On("Execute", _a0)} +} + +func (_c *OutMessageOption_Execute_Call) Run(run func(_a0 *azservicebus.Message)) *OutMessageOption_Execute_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*azservicebus.Message)) + }) + return _c +} + +func (_c *OutMessageOption_Execute_Call) Return() *OutMessageOption_Execute_Call { + _c.Call.Return() + return _c +} + +func (_c *OutMessageOption_Execute_Call) RunAndReturn(run func(*azservicebus.Message)) *OutMessageOption_Execute_Call { + _c.Call.Return(run) + return _c } // NewOutMessageOption creates a new instance of OutMessageOption. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewOutMessageOption(t mockConstructorTestingTNewOutMessageOption) *OutMessageOption { +// The first argument is typically a *testing.T value. +func NewOutMessageOption(t interface { + mock.TestingT + Cleanup(func()) +}) *OutMessageOption { mock := &OutMessageOption{} mock.Mock.Test(t) diff --git a/azbus/receiver.go b/azbus/receiver.go index afe59ac..8174882 100644 --- a/azbus/receiver.go +++ b/azbus/receiver.go @@ -2,6 +2,7 @@ package azbus import ( "context" + "errors" "fmt" "sync" "time" @@ -11,6 +12,13 @@ import ( "github.com/opentracing/opentracing-go" ) +// so we dont have to import the azure repo everywhere +type ReceivedMessage = azservicebus.ReceivedMessage + +type Handler interface { + Handle(context.Context, *ReceivedMessage) error +} + const ( // RenewalTime is the how often we want to renew the message PEEK lock // @@ -99,21 +107,22 @@ func NewReceiver(log Logger, cfg ReceiverConfig) *Receiver { } } - return &Receiver{ + r := &Receiver{ Cfg: cfg, - log: log, azClient: NewAZClient(cfg.ConnectionString), options: options, } + r.log = log.WithIndex("receiver", r.String()) + return r } func (r *Receiver) GetAZClient() AZClient { return r.azClient } -// String - returns string represena=tation of receiver. -// No log function calls in this methgod please.“ +// String - returns string representation of receiver. func (r *Receiver) String() string { + // No log function calls in this method please. if r.Cfg.SubscriptionName != "" { if r.Cfg.Deadletter { return fmt.Sprintf("%s.%s.deadletter", r.Cfg.TopicOrQueueName, r.Cfg.SubscriptionName) @@ -128,12 +137,29 @@ func (r *Receiver) String() string { // elapsed emits 2 log messages detailing how long processing took. // TODO: emit the processing time as a prometheus metric. -func (r *Receiver) elapsed(ctx context.Context, count int, total int, msg *ReceivedMessage, handler Handler) error { - r.log.Debugf("%s: Processing message %d of %d", r, count, total) +func (r *Receiver) elapsed(ctx context.Context, count int, total int, maxDuration time.Duration, msg *ReceivedMessage, handler Handler) error { now := time.Now() ctx = ContextFromReceivedMessage(ctx, msg) + log := r.log.FromContext(ctx) + defer log.Close() + + log.Debugf("Processing message %d of %d", count, total) err := handler.Handle(ctx, msg) - r.log.Debugf("%s: Processing message took %s", r, time.Since(now)) + duration := time.Since(now) + log.Debugf("Processing message %d took %s", count, duration) + // This is safe because maxDuration is only defined if RenewMessageLock is false. + if !r.Cfg.RenewMessageLock && duration >= maxDuration { + log.Infof("WARNING: processing msg %d duration %v took more than %v seconds", count, duration, maxDuration) + log.Infof("WARNING: please either enable SERVICEBUS_RENEW_LOCK or reduce SERVICEBUS_INCOMING_MESSAGES") + log.Infof("WARNING: both can be found in the helm chart for each service.") + } + if err != nil { + if errors.Is(err, ErrPeekLockTimeout) { + log.Infof("WARNING: processing msg %d duration %s returned error: %v", count, duration, err) + log.Infof("WARNING: please either enable SERVICEBUS_RENEW_LOCK or reduce SERVICEBUS_INCOMING_MESSAGES") + log.Infof("WARNING: both can be found in the helm chart for each service.") + } + } return err } @@ -181,8 +207,7 @@ func (r *Receiver) ReceiveMessages(handler Handler) error { return azerr } r.log.Debugf( - "Receive %s: NumberOfReceivedMessages %d, RenewMessageLock: %v", - r.String(), + "NumberOfReceivedMessages %d, RenewMessageLock: %v", r.Cfg.NumberOfReceivedMessages, r.Cfg.RenewMessageLock, ) @@ -200,11 +225,11 @@ func (r *Receiver) ReceiveMessages(handler Handler) error { r.log.Debugf("total messages %d", total) err = func(fctx context.Context) error { - var ectx context.Context // we need a cancellation if RenewMEssageLock is enabled + var ectx context.Context // we need a cancellation if RenewMessageLock is enabled var ecancel context.CancelFunc if r.Cfg.RenewMessageLock { // start up RenewMessageLock goroutines before processing any - // messages. Inherit values from input context. + // messages. ectx, ecancel = context.WithCancel(fctx) defer ecancel() for i := 0; i < total; i++ { @@ -216,16 +241,20 @@ func (r *Receiver) ReceiveMessages(handler Handler) error { // ignored by the elapsed function. var rctx context.Context // we need a timeout if RenewMessageLock is disabled var rcancel context.CancelFunc + var maxDuration time.Duration for i := 0; i < total; i++ { msg := messages[i] if r.Cfg.RenewMessageLock { rctx = fctx } else { - rctx, rcancel = setTimeout(fctx, r.log, msg) + rctx, rcancel, maxDuration = setTimeout(fctx, r.log, msg) defer rcancel() } - elapsedErr := r.elapsed(rctx, i+1, total, msg, handler) + elapsedErr := r.elapsed(rctx, i+1, total, maxDuration, msg, handler) if elapsedErr != nil { + // return here so that no further messages are processed + // XXXX: check for ErrPeekLockTimeout and only terminate + // then? return elapsedErr } } @@ -244,7 +273,7 @@ func (r *Receiver) Open() error { if r.receiver != nil { return nil } - r.log.Debugf("Open Receiver %s", r) + r.log.Debugf("Open") client, err := r.azClient.azClient() if err != nil { return err @@ -286,16 +315,16 @@ func (r *Receiver) Close(ctx context.Context) { // Abandon abandons message. This function is not used but is present for consistency. func (r *Receiver) Abandon(ctx context.Context, err error, msg *ReceivedMessage) error { - ctx = contextWithoutCancel(ctx) + ctx = context.WithoutCancel(ctx) log := r.log.FromContext(ctx) defer log.Close() span, ctx := opentracing.StartSpanFromContext(ctx, "Message.Abandon") defer span.Finish() - log.Infof("%s, Abandon Message on DeliveryCount %d: %v", r, msg.DeliveryCount, err) + log.Infof("Abandon Message on DeliveryCount %d: %v", msg.DeliveryCount, err) err1 := r.receiver.AbandonMessage(ctx, msg, nil) if err1 != nil { - azerr := fmt.Errorf("%s: Abandon Message failure: %w", r, NewAzbusError(err1)) + azerr := fmt.Errorf("Abandon Message failure: %w", NewAzbusError(err1)) log.Infof("%s", azerr) } return nil @@ -307,51 +336,51 @@ func (r *Receiver) Abandon(ctx context.Context, err error, msg *ReceivedMessage) // unused arguments for consistency and in case we need to implement more sophisticated // algorithms in future. func (r *Receiver) Reschedule(ctx context.Context, err error, msg *ReceivedMessage) error { - ctx = contextWithoutCancel(ctx) + ctx = context.WithoutCancel(ctx) log := r.log.FromContext(ctx) defer log.Close() span, _ := opentracing.StartSpanFromContext(ctx, "Message.Reschedule") defer span.Finish() - log.Infof("%s, Reschedule Message on DeliveryCount %d: %v", r, msg.DeliveryCount, err) + log.Infof("Reschedule Message on DeliveryCount %d: %v", msg.DeliveryCount, err) return nil } // DeadLetter explicitly deadletters a message. func (r *Receiver) DeadLetter(ctx context.Context, err error, msg *ReceivedMessage) error { - ctx = contextWithoutCancel(ctx) + ctx = context.WithoutCancel(ctx) log := r.log.FromContext(ctx) defer log.Close() span, ctx := opentracing.StartSpanFromContext(ctx, "Message.DeadLetter") defer span.Finish() - log.Infof("%s: DeadLetter Message: %v", r, err) + log.Infof("DeadLetter Message: %v", err) options := azservicebus.DeadLetterOptions{ Reason: to.Ptr(err.Error()), } err1 := r.receiver.DeadLetterMessage(ctx, msg, &options) if err1 != nil { - azerr := fmt.Errorf("%s: DeadLetter Message failure: %w", r, NewAzbusError(err1)) + azerr := fmt.Errorf("DeadLetter Message failure: %w", NewAzbusError(err1)) log.Infof("%s", azerr) } return nil } func (r *Receiver) Complete(ctx context.Context, msg *ReceivedMessage) error { - ctx = contextWithoutCancel(ctx) + ctx = context.WithoutCancel(ctx) log := r.log.FromContext(ctx) defer log.Close() span, _ := opentracing.StartSpanFromContext(ctx, "Message.Complete") defer span.Finish() - log.Infof("%s: Complete Message", r) + log.Infof("Complete Message") err := r.receiver.CompleteMessage(ctx, msg, nil) if err != nil { // If the completion fails then the message will get rescheduled, but it's effect will // have been made, so we could get duplication issues. - azerr := fmt.Errorf("%s: Complete: failed to settle message: %w", r, NewAzbusError(err)) + azerr := fmt.Errorf("Complete: failed to settle message: %w", NewAzbusError(err)) log.Infof("%s", azerr) } return nil diff --git a/azbus/sender.go b/azbus/sender.go index 72362eb..09fa2df 100644 --- a/azbus/sender.go +++ b/azbus/sender.go @@ -9,8 +9,19 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" otrace "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" + + "github.com/rkvst/go-rkvstcommon/correlationid" ) +// so we dont have to import the azure repo everywhere +type OutMessage = azservicebus.Message + +func NewOutMessage(data []byte) OutMessage { + return azservicebus.Message{ + Body: data, + } +} + // SenderConfig configuration for an azure servicebus namespace and queue type SenderConfig struct { ConnectionString string @@ -36,9 +47,9 @@ func NewSender(log Logger, cfg SenderConfig) *Sender { s := &Sender{ Cfg: cfg, - log: log, azClient: NewAZClient(cfg.ConnectionString), } + s.log = log.WithIndex("sender", s.String()) return s } @@ -54,7 +65,7 @@ func (s *Sender) Close(ctx context.Context) { var err error if s != nil && s.sender != nil { - s.log.Debugf("Close %s", s) + s.log.Debugf("Close") s.mtx.Lock() defer s.mtx.Unlock() err = s.sender.Close(ctx) @@ -85,7 +96,7 @@ func (s *Sender) Open() error { s.log.Infof("%s", azerr) return azerr } - s.log.Debugf("%s: Maximum message size is %d bytes", s, s.maxMessageSizeInBytes) + s.log.Debugf("Maximum message size is %d bytes", s.maxMessageSizeInBytes) sender, err := client.NewSender(s.Cfg.TopicOrQueueName, nil) if err != nil { @@ -94,7 +105,7 @@ func (s *Sender) Open() error { return azerr } - s.log.Debugf("Open Sender %s", s) + s.log.Debugf("Open") s.sender = sender return nil } @@ -116,7 +127,7 @@ func (s *Sender) SendMsg(ctx context.Context, message OutMessage, opts ...OutMes // Without this fix eventsourcepoller and similar services repeatedly context cancel and repeatedly // restart. - ctx = contextWithoutCancel(ctx) + ctx = context.WithoutCancel(ctx) var err error @@ -137,24 +148,27 @@ func (s *Sender) SendMsg(ctx context.Context, message OutMessage, opts ...OutMes size := int64(len(message.Body)) log.Debugf("%s: Msg Sized %d limit %d", s, size, s.maxMessageSizeInBytes) if size > s.maxMessageSizeInBytes { - log.Debugf("%s: Msg Sized %d > limit %d :%v", s, size, s.maxMessageSizeInBytes, ErrMessageOversized) + log.Debugf("Msg Sized %d > limit %d :%v", size, s.maxMessageSizeInBytes, ErrMessageOversized) return fmt.Errorf("%s: Msg Sized %d > limit %d :%w", s, size, s.maxMessageSizeInBytes, ErrMessageOversized) } now := time.Now() if message.ApplicationProperties == nil { message.ApplicationProperties = make(map[string]any) } - opts = AddCorrelationIDOption(ctx, opts...) for _, opt := range opts { opt(&message) } + correlationID := correlationid.FromContext(ctx) + if correlationID != "" { + message.ApplicationProperties[correlationid.CorrelationIDKey] = correlationID + } log.Debugf("ApplicationProperties %v", message.ApplicationProperties) err = s.sender.SendMessage(ctx, &message, nil) if err != nil { - azerr := fmt.Errorf("%s: Send failed in %s: %w", s, time.Since(now), NewAzbusError(err)) + azerr := fmt.Errorf("Send failed in %s: %w", time.Since(now), NewAzbusError(err)) log.Infof("%s", azerr) return azerr } - log.Debugf("%s: Sending message took %s", s, time.Since(now)) + log.Debugf("Sending message took %s", time.Since(now)) return nil } diff --git a/azbus/withoutcancel.go b/azbus/withoutcancel.go deleted file mode 100644 index e09b39e..0000000 --- a/azbus/withoutcancel.go +++ /dev/null @@ -1,35 +0,0 @@ -package azbus - -import ( - "context" - "time" -) - -// The azservicebus obeys a context.WithDeadline if present. However we have -// learned that retries are better so all disposition functions and the sender -// suppress any deadlines in the context. This has the added benefit of not -// reusing a context that has already cancelled - which means that otherwise -// disposition code will exit immediately. -// -// This problem of context having 2 responsibilities (breaking the single -// responsibility principle) is known - see this proposal: -// -// https://github.com/golang/go/issues/40221 -// -// context.WithoutCancel() was committed 2023-03-29 https://go-review.googlesource.com/c/go/+/479918 -// So we can assume it will be available in 1.21 in August 2023 -// -// meanwhile this is the workaround until Go 1.21 when this file will be deleted -type withoutCancelCtx struct { - context.Context -} - -func contextWithoutCancel(ctx context.Context) context.Context { - return withoutCancelCtx{ctx} -} -func (withoutCancelCtx) Deadline() (deadline time.Time, ok bool) { return } -func (withoutCancelCtx) Done() <-chan struct{} { return nil } -func (withoutCancelCtx) Err() error { return nil } - -// func (c withoutCancelCtx) Value(key any) any { return value(c, key) } -func (c withoutCancelCtx) String() string { return "WithoutCancel" } diff --git a/go.mod b/go.mod index bffb03a..67bb7e9 100644 --- a/go.mod +++ b/go.mod @@ -16,11 +16,12 @@ replace ( require ( github.com/Azure/azure-sdk-for-go v68.0.0+incompatible github.com/Azure/azure-sdk-for-go/sdk/azcore v1.7.1 - github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.2.1 + github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.4.0 github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.4.1 github.com/Azure/go-autorest/autorest v0.11.29 github.com/Azure/go-autorest/autorest/azure/auth v0.5.12 github.com/KimMachineGun/automemlimit v0.2.6 + github.com/ethereum/go-ethereum v0.0.0-20221208112643-d318a5aa973a github.com/go-redis/redis/v8 v8.11.5 github.com/google/uuid v1.3.1 github.com/gorilla/securecookie v1.1.1 @@ -39,9 +40,8 @@ require ( ) require ( + github.com/Azure/go-amqp v1.0.0 // indirect github.com/btcsuite/btcd v0.20.1-beta // indirect - github.com/ethereum/go-ethereum v0.0.0-20221208112643-d318a5aa973a // indirect - github.com/go-stack/stack v1.8.1 // indirect ) require ( @@ -54,13 +54,11 @@ require ( github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect github.com/Azure/go-autorest/logger v0.2.1 // indirect github.com/Azure/go-autorest/tracing v0.6.0 // indirect - github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cilium/ebpf v0.9.1 // indirect github.com/containerd/cgroups/v3 v3.0.1 // indirect github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dimchansky/utfbom v1.1.1 // indirect github.com/docker/go-units v0.4.0 // indirect @@ -68,7 +66,6 @@ require ( github.com/godbus/dbus/v5 v5.0.4 // indirect github.com/golang-jwt/jwt/v4 v4.5.0 // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/holiman/uint256 v1.2.3 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/opencontainers/runtime-spec v1.0.2 // indirect github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492 // indirect diff --git a/go.sum b/go.sum index 15a0100..b0ad99d 100644 --- a/go.sum +++ b/go.sum @@ -27,11 +27,13 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 h1:Yoicul8bnVdQrhDMTHxdE github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0/go.mod h1:+6sju8gk8FRmSajX3Oz4G5Gm7P+mbqE9FVaXXFYTkCM= github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 h1:sXr+ck84g/ZlZUOZiNELInmMgOsuGwdjjVkEIde0OtY= github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0/go.mod h1:okt5dMMTOFjX/aovMlrjvvXoPMBVSPzk9185BT0+eZM= -github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.2.1 h1:ryVRjO3SrGrSM8PNlLuMbMYFz9vexPzvenNUEBfsgCo= -github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.2.1/go.mod h1:R6+0udeRV8iYSTVuT5RT7If4sc46K5Bz3ZKrmvZQF7U= +github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.4.0 h1:MxbPJrYY81a8xnMml4qICSq1z2WusPw3jSfdIMupnYM= +github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.4.0/go.mod h1:pXDkeh10bAqElvd+S5Ppncj+DCKvJGXNa8rRT2R7rIw= github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.4.1 h1:QSdcrd/UFJv6Bp/CfoVf2SrENpFn9P6Yh8yb+xNhYMM= github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.4.1/go.mod h1:eZ4g6GUvXiGulfIbbhh1Xr4XwUYaYaWMqzGD/284wCA= github.com/Azure/azure-storage-blob-go v0.7.0/go.mod h1:f9YQKtsG1nMisotuTPpO0tjNuEjKRYAcJU8/ydDI++4= +github.com/Azure/go-amqp v1.0.0 h1:QfCugi1M+4F2JDTRgVnRw7PYXLXZ9hmqk3+9+oJh3OA= +github.com/Azure/go-amqp v1.0.0/go.mod h1:+bg0x3ce5+Q3ahCEXnCsGG3ETpDQe3MEVnOuT2ywPwc= github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs= github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= github.com/Azure/go-autorest/autorest v0.11.24/go.mod h1:G6kyRlFnTuSbEYkQGawPfsCswgme4iYf6rfSKUDzbCc= @@ -98,10 +100,6 @@ github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40/go.mod h1:8rLXio+Wji github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= github.com/btcsuite/btcd v0.20.1-beta h1:Ik4hyJqN8Jfyv3S4AGBOmyouMsYE3EdYODkMbQjwPGw= github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ= -github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k= -github.com/btcsuite/btcd/btcec/v2 v2.2.0/go.mod h1:U7MHm051Al6XmscBQ0BoNydpOTsFAn707034b5nY8zU= -github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U= -github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg= @@ -146,10 +144,6 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/deckarep/golang-set v0.0.0-20180603214616-504e848d77ea/go.mod h1:93vsz/8Wt4joVM7c2AVqh+YRMiUSc14yDtF28KmMOgQ= -github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0= -github.com/decred/dcrd/crypto/blake256 v1.0.0/go.mod h1:sQl2p6Y26YV+ZOcSTP6thNdn47hh8kt6rqSlvmrXFAc= -github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 h1:YLtO71vCjJRCBcrPMtQ9nqBsqpA1m5sE92cU+pd5Mcc= -github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8/go.mod h1:VMaSuZ+SZcx/wljOQKvp5srsbCiKDEb6K2wC4+PiBmQ= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= @@ -175,17 +169,16 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/ethereum/go-ethereum v1.12.2 h1:eGHJ4ij7oyVqUQn48LBz3B7pvQ8sV0wGJiIE6gDq/6Y= -github.com/ethereum/go-ethereum v1.12.2/go.mod h1:1cRAEV+rp/xX0zraSCBnu9Py3HQ+geRMj3HdR+k0wfI= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5/go.mod h1:VvhXpOYNQvB+uIk2RvXzuaQtkQJzzIx6lSBe1xv7hi0= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/frankban/quicktest v1.14.0 h1:+cqqvzZV87b4adx/5ayVOaYZ2CrvM4ejQvUdBzPPUss= github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= -github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= -github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU= github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA= github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff/go.mod h1:x7DCsMOv1taUwEWCzT4cmDeAkigA5/QCwUodaVOe8Ww= @@ -205,7 +198,6 @@ github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= -github.com/go-stack/stack v1.8.1 h1:ntEHSVwIt7PNXNpgPmVfMrNhLtgjlmnZha2kOpuRiDw= github.com/go-stack/stack v1.8.1/go.mod h1:dcoOX6HbPZSZptuspn9bctJ+N/CnF5gGygcUP3XYfe4= github.com/godbus/dbus/v5 v5.0.4 h1:9349emZab16e7zQvpmsbtjc18ykshndd8y2PG3sgJbA= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= @@ -299,8 +291,6 @@ github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d/go.mod h1:i github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM= github.com/holiman/bloomfilter/v2 v2.0.3/go.mod h1:zpoh+gs7qcpqrHr3dB55AMiJwo0iURXE7ZOP9L9hSkA= github.com/holiman/uint256 v1.2.1/go.mod h1:y4ga/t+u+Xwd7CpDgZESaRcWy0I7XMlTMA25ApIH5Jw= -github.com/holiman/uint256 v1.2.3 h1:K8UWO1HUJpRMXBxbmaY1Y8IAMZC/RsKB+ArEnnK4l5o= -github.com/holiman/uint256 v1.2.3/go.mod h1:SC8Ryt4n+UBbPbIBKaG9zbbDlp4jOru9xFZmPzLUTxw= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/huin/goupnp v1.0.1-0.20210310174557-0ca763054c88/go.mod h1:nNs7wvRfN1eKaMknBydLNQU6146XQim8t4h+q90biWo= github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150/go.mod h1:PpLOETDnJ0o3iZrZfqZzyLl6l7F3c6L1oWn7OICBi6o= @@ -811,7 +801,7 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las= -nhooyr.io/websocket v1.8.6 h1:s+C3xAMLwGmlI31Nyn/eAehUlZPwfYZu2JXM621Q5/k= -nhooyr.io/websocket v1.8.6/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= +nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g= +nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/taskfiles/Taskfile_mocks.yml b/taskfiles/Taskfile_mocks.yml new file mode 100644 index 0000000..b20795e --- /dev/null +++ b/taskfiles/Taskfile_mocks.yml @@ -0,0 +1,24 @@ +--- + +version: '3' + +tasks: + + generate: + desc: Generate all mocks + deps: + - task: azbus + + generate-single-mock: + summary: generate a specific mock + cmds: + - rm -f {{.PATH}}/mocks/* + - mockery --all --exported --disable-version-string --with-expecter --dir {{.PATH}} --output {{.PATH}}/mocks + + azbus: + summary: Generate the common package azbus mocks + sources: + - ./azbus/**/*.go + cmds: + - task: generate-single-mock + vars: {PATH: "azbus" }