diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f1c37ad --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.task/ diff --git a/Taskfile.yml b/Taskfile.yml index 3adb29f..03bf037 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -18,6 +18,8 @@ version: '3' includes: codeqa: taskfile: ./taskfiles/Taskfile_codeqa.yml + mocks: + taskfile: ./taskfiles/Taskfile_mocks.yml tasks: @@ -37,4 +39,4 @@ tasks: test: desc: run the tests cmds: - - task: codeqa:unit-tests \ No newline at end of file + - task: codeqa:unit-tests diff --git a/azbus/aliases.go b/azbus/aliases.go deleted file mode 100644 index be28610..0000000 --- a/azbus/aliases.go +++ /dev/null @@ -1,27 +0,0 @@ -package azbus - -import ( - "context" - - "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" - - "github.com/rkvst/go-rkvstcommon/logger" -) - -type Logger = logger.Logger - -// so we dont have to import the azure repo everywhere -type OutMessage = azservicebus.Message - -type ReceivedMessage = azservicebus.ReceivedMessage - -func NewOutMessage(data []byte) OutMessage { - return azservicebus.Message{ - Body: data, - } -} - -// not an alias but its convenient here -type Handler interface { - Handle(context.Context, *ReceivedMessage) error -} diff --git a/azbus/correlationid.go b/azbus/correlationid.go index c8bec6c..523702e 100644 --- a/azbus/correlationid.go +++ b/azbus/correlationid.go @@ -16,11 +16,3 @@ func ContextFromReceivedMessage(ctx context.Context, message *ReceivedMessage) c } return correlationid.ContextWithCorrelationID(ctx, cid.(string)) } - -func AddCorrelationIDOption(ctx context.Context, opts ...OutMessageOption) []OutMessageOption { - correlationID := correlationid.FromContext(ctx) - if correlationID == "" { - return opts - } - return append(opts, WithProperty(correlationid.CorrelationIDKey, correlationID)) -} diff --git a/azbus/docs.go b/azbus/docs.go deleted file mode 100644 index c1e00d8..0000000 --- a/azbus/docs.go +++ /dev/null @@ -1,57 +0,0 @@ -package azbus - -// TODO: this needs some attention as it is incorrect - -// Implement pubsubinterface -// Azure service bus pubsub interface -// -// Abstracts away all the Azure plumbing and replaces with simpler interface. -// -// First create the interface: -// -// -// Usage: Sending message(s): -// client := azsb.Receiver{ -// Cfg: azsb.ReceiverConfig{ -// ConnectionString: "blah-blah-blah...", -// TopicName: "userinterface", -// }, -// } -// -// ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) -// defer cancel() -// client.Open(ctx) -// defer client.Close(ctx) -// client.Publish(ctx, []byte("Hello World")) -// ...... more send messages if required. -// -// Receiving messages: -// -// client := azsb.Receiver{ -// Cfg: azsb.ReceiverConfig{ -// ConnectionString: "blah-blah-blah...", -// TopicName: "userinterface", -// SubscriptionName: "xxxx", -// }, -// } - -// -// handler := func(ctx context.Context, msg *azbus.ReceivedMessage) error { -// logger.Sugar.Infow( -// "Received", -// "message", msg, -// ) -// ctx, cancel := contexttWithTimeout(ctx, 60*time.Second) -// defer cancel() -// // do stuff -// msg.Complete(ctx) -// return nil -// } -// err := client.Subscribe(ctx, handler) -// var azerr *azbus.AzbusError -// if errors.As(err, &azerr) { -// // TopicName, SubscriptionName may be wrong so die... -// // else service bus not yet available so die... -// } else { -// // log transient error - maybe sleep and retry.. -// } diff --git a/azbus/error.go b/azbus/error.go index 2c9f0d4..f9eb1ad 100644 --- a/azbus/error.go +++ b/azbus/error.go @@ -6,9 +6,6 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" ) -// alias so we dont have to import azservicebus elsewhere -type ServicebusCode = azservicebus.Code - // Azure package expects the user to elucidate errors like so: // // var servicebusError *azservicebus.Error @@ -30,13 +27,12 @@ var ( ErrTimeout = errors.New("timeout") ) -// CodeUnauthorizedAccess is in the latest code but is not in the current version used. func NewAzbusError(err error) error { var servicebusError *azservicebus.Error if errors.As(err, &servicebusError) { switch servicebusError.Code { - // case azservicebus.CodeUnauthorizedAccess: - // return ErrUnauthorizedAccess + case azservicebus.CodeUnauthorizedAccess: + return ErrUnauthorizedAccess case azservicebus.CodeConnectionLost: return ErrConnectionLost case azservicebus.CodeLockLost: diff --git a/azbus/logger.go b/azbus/logger.go new file mode 100644 index 0000000..0363cf9 --- /dev/null +++ b/azbus/logger.go @@ -0,0 +1,7 @@ +package azbus + +import ( + "github.com/rkvst/go-rkvstcommon/logger" +) + +type Logger = logger.Logger diff --git a/azbus/message.go b/azbus/message.go index a4d3d47..05182b2 100644 --- a/azbus/message.go +++ b/azbus/message.go @@ -2,6 +2,8 @@ package azbus import ( "context" + "errors" + "time" ) // Set a timeout for processing the message, this should be no later than @@ -27,18 +29,24 @@ import ( // If it does timeout then it is too late anyway as the peeklock will already be released. // // for the time being we impose a timeout as it is safe. -func setTimeout(ctx context.Context, log Logger, msg *ReceivedMessage) (context.Context, context.CancelFunc) { +var ( + ErrPeekLockTimeout = errors.New("peeklock deadline reached") +) + +func setTimeout(ctx context.Context, log Logger, msg *ReceivedMessage) (context.Context, context.CancelFunc, time.Duration) { var cancel context.CancelFunc - msgLockedUntil := msg.LockedUntil - if msgLockedUntil != nil { - ctx, cancel = context.WithDeadline(ctx, *msgLockedUntil) - log.Debugf("context deadline from message lock deadline: %v", ctx) - return ctx, cancel + log.Debugf("msg locked until %s", msg.LockedUntil) + if msg.LockedUntil != nil { + msgLockedUntil := *msg.LockedUntil + ctx, cancel = context.WithDeadlineCause(ctx, msgLockedUntil, ErrPeekLockTimeout) + maxDuration := msgLockedUntil.Sub(time.Now()) + log.Debugf("msg must be processed in %s", maxDuration) + return ctx, cancel, maxDuration } - ctx, cancel = context.WithTimeout(ctx, RenewalTime) + ctx, cancel = context.WithTimeoutCause(ctx, RenewalTime, ErrPeekLockTimeout) log.Infof("could not get lock deadline from message, using fixed timeout %v", ctx) - return ctx, cancel + return ctx, cancel, RenewalTime } diff --git a/azbus/mocks/Handler.go b/azbus/mocks/Handler.go index f39b178..0cba5f8 100644 --- a/azbus/mocks/Handler.go +++ b/azbus/mocks/Handler.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.27.1. DO NOT EDIT. +// Code generated by mockery. DO NOT EDIT. package mocks @@ -15,6 +15,14 @@ type Handler struct { mock.Mock } +type Handler_Expecter struct { + mock *mock.Mock +} + +func (_m *Handler) EXPECT() *Handler_Expecter { + return &Handler_Expecter{mock: &_m.Mock} +} + // Handle provides a mock function with given fields: _a0, _a1 func (_m *Handler) Handle(_a0 context.Context, _a1 *azservicebus.ReceivedMessage) error { ret := _m.Called(_a0, _a1) @@ -29,13 +37,41 @@ func (_m *Handler) Handle(_a0 context.Context, _a1 *azservicebus.ReceivedMessage return r0 } -type mockConstructorTestingTNewHandler interface { - mock.TestingT - Cleanup(func()) +// Handler_Handle_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Handle' +type Handler_Handle_Call struct { + *mock.Call +} + +// Handle is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *azservicebus.ReceivedMessage +func (_e *Handler_Expecter) Handle(_a0 interface{}, _a1 interface{}) *Handler_Handle_Call { + return &Handler_Handle_Call{Call: _e.mock.On("Handle", _a0, _a1)} +} + +func (_c *Handler_Handle_Call) Run(run func(_a0 context.Context, _a1 *azservicebus.ReceivedMessage)) *Handler_Handle_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*azservicebus.ReceivedMessage)) + }) + return _c +} + +func (_c *Handler_Handle_Call) Return(_a0 error) *Handler_Handle_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Handler_Handle_Call) RunAndReturn(run func(context.Context, *azservicebus.ReceivedMessage) error) *Handler_Handle_Call { + _c.Call.Return(run) + return _c } // NewHandler creates a new instance of Handler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewHandler(t mockConstructorTestingTNewHandler) *Handler { +// The first argument is typically a *testing.T value. +func NewHandler(t interface { + mock.TestingT + Cleanup(func()) +}) *Handler { mock := &Handler{} mock.Mock.Test(t) diff --git a/azbus/mocks/MsgReceiver.go b/azbus/mocks/MsgReceiver.go index 80b3022..a62c3e6 100644 --- a/azbus/mocks/MsgReceiver.go +++ b/azbus/mocks/MsgReceiver.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.27.1. DO NOT EDIT. +// Code generated by mockery. DO NOT EDIT. package mocks @@ -16,6 +16,14 @@ type MsgReceiver struct { mock.Mock } +type MsgReceiver_Expecter struct { + mock *mock.Mock +} + +func (_m *MsgReceiver) EXPECT() *MsgReceiver_Expecter { + return &MsgReceiver_Expecter{mock: &_m.Mock} +} + // Abandon provides a mock function with given fields: _a0, _a1, _a2 func (_m *MsgReceiver) Abandon(_a0 context.Context, _a1 error, _a2 *azservicebus.ReceivedMessage) error { ret := _m.Called(_a0, _a1, _a2) @@ -30,11 +38,69 @@ func (_m *MsgReceiver) Abandon(_a0 context.Context, _a1 error, _a2 *azservicebus return r0 } +// MsgReceiver_Abandon_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Abandon' +type MsgReceiver_Abandon_Call struct { + *mock.Call +} + +// Abandon is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 error +// - _a2 *azservicebus.ReceivedMessage +func (_e *MsgReceiver_Expecter) Abandon(_a0 interface{}, _a1 interface{}, _a2 interface{}) *MsgReceiver_Abandon_Call { + return &MsgReceiver_Abandon_Call{Call: _e.mock.On("Abandon", _a0, _a1, _a2)} +} + +func (_c *MsgReceiver_Abandon_Call) Run(run func(_a0 context.Context, _a1 error, _a2 *azservicebus.ReceivedMessage)) *MsgReceiver_Abandon_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(error), args[2].(*azservicebus.ReceivedMessage)) + }) + return _c +} + +func (_c *MsgReceiver_Abandon_Call) Return(_a0 error) *MsgReceiver_Abandon_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MsgReceiver_Abandon_Call) RunAndReturn(run func(context.Context, error, *azservicebus.ReceivedMessage) error) *MsgReceiver_Abandon_Call { + _c.Call.Return(run) + return _c +} + // Close provides a mock function with given fields: _a0 func (_m *MsgReceiver) Close(_a0 context.Context) { _m.Called(_a0) } +// MsgReceiver_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MsgReceiver_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +// - _a0 context.Context +func (_e *MsgReceiver_Expecter) Close(_a0 interface{}) *MsgReceiver_Close_Call { + return &MsgReceiver_Close_Call{Call: _e.mock.On("Close", _a0)} +} + +func (_c *MsgReceiver_Close_Call) Run(run func(_a0 context.Context)) *MsgReceiver_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *MsgReceiver_Close_Call) Return() *MsgReceiver_Close_Call { + _c.Call.Return() + return _c +} + +func (_c *MsgReceiver_Close_Call) RunAndReturn(run func(context.Context)) *MsgReceiver_Close_Call { + _c.Call.Return(run) + return _c +} + // Complete provides a mock function with given fields: _a0, _a1 func (_m *MsgReceiver) Complete(_a0 context.Context, _a1 *azservicebus.ReceivedMessage) error { ret := _m.Called(_a0, _a1) @@ -49,6 +115,35 @@ func (_m *MsgReceiver) Complete(_a0 context.Context, _a1 *azservicebus.ReceivedM return r0 } +// MsgReceiver_Complete_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Complete' +type MsgReceiver_Complete_Call struct { + *mock.Call +} + +// Complete is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *azservicebus.ReceivedMessage +func (_e *MsgReceiver_Expecter) Complete(_a0 interface{}, _a1 interface{}) *MsgReceiver_Complete_Call { + return &MsgReceiver_Complete_Call{Call: _e.mock.On("Complete", _a0, _a1)} +} + +func (_c *MsgReceiver_Complete_Call) Run(run func(_a0 context.Context, _a1 *azservicebus.ReceivedMessage)) *MsgReceiver_Complete_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*azservicebus.ReceivedMessage)) + }) + return _c +} + +func (_c *MsgReceiver_Complete_Call) Return(_a0 error) *MsgReceiver_Complete_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MsgReceiver_Complete_Call) RunAndReturn(run func(context.Context, *azservicebus.ReceivedMessage) error) *MsgReceiver_Complete_Call { + _c.Call.Return(run) + return _c +} + // DeadLetter provides a mock function with given fields: _a0, _a1, _a2 func (_m *MsgReceiver) DeadLetter(_a0 context.Context, _a1 error, _a2 *azservicebus.ReceivedMessage) error { ret := _m.Called(_a0, _a1, _a2) @@ -63,6 +158,36 @@ func (_m *MsgReceiver) DeadLetter(_a0 context.Context, _a1 error, _a2 *azservice return r0 } +// MsgReceiver_DeadLetter_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeadLetter' +type MsgReceiver_DeadLetter_Call struct { + *mock.Call +} + +// DeadLetter is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 error +// - _a2 *azservicebus.ReceivedMessage +func (_e *MsgReceiver_Expecter) DeadLetter(_a0 interface{}, _a1 interface{}, _a2 interface{}) *MsgReceiver_DeadLetter_Call { + return &MsgReceiver_DeadLetter_Call{Call: _e.mock.On("DeadLetter", _a0, _a1, _a2)} +} + +func (_c *MsgReceiver_DeadLetter_Call) Run(run func(_a0 context.Context, _a1 error, _a2 *azservicebus.ReceivedMessage)) *MsgReceiver_DeadLetter_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(error), args[2].(*azservicebus.ReceivedMessage)) + }) + return _c +} + +func (_c *MsgReceiver_DeadLetter_Call) Return(_a0 error) *MsgReceiver_DeadLetter_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MsgReceiver_DeadLetter_Call) RunAndReturn(run func(context.Context, error, *azservicebus.ReceivedMessage) error) *MsgReceiver_DeadLetter_Call { + _c.Call.Return(run) + return _c +} + // GetAZClient provides a mock function with given fields: func (_m *MsgReceiver) GetAZClient() azbus.AZClient { ret := _m.Called() @@ -77,6 +202,33 @@ func (_m *MsgReceiver) GetAZClient() azbus.AZClient { return r0 } +// MsgReceiver_GetAZClient_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetAZClient' +type MsgReceiver_GetAZClient_Call struct { + *mock.Call +} + +// GetAZClient is a helper method to define mock.On call +func (_e *MsgReceiver_Expecter) GetAZClient() *MsgReceiver_GetAZClient_Call { + return &MsgReceiver_GetAZClient_Call{Call: _e.mock.On("GetAZClient")} +} + +func (_c *MsgReceiver_GetAZClient_Call) Run(run func()) *MsgReceiver_GetAZClient_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MsgReceiver_GetAZClient_Call) Return(_a0 azbus.AZClient) *MsgReceiver_GetAZClient_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MsgReceiver_GetAZClient_Call) RunAndReturn(run func() azbus.AZClient) *MsgReceiver_GetAZClient_Call { + _c.Call.Return(run) + return _c +} + // Open provides a mock function with given fields: func (_m *MsgReceiver) Open() error { ret := _m.Called() @@ -91,6 +243,33 @@ func (_m *MsgReceiver) Open() error { return r0 } +// MsgReceiver_Open_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Open' +type MsgReceiver_Open_Call struct { + *mock.Call +} + +// Open is a helper method to define mock.On call +func (_e *MsgReceiver_Expecter) Open() *MsgReceiver_Open_Call { + return &MsgReceiver_Open_Call{Call: _e.mock.On("Open")} +} + +func (_c *MsgReceiver_Open_Call) Run(run func()) *MsgReceiver_Open_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MsgReceiver_Open_Call) Return(_a0 error) *MsgReceiver_Open_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MsgReceiver_Open_Call) RunAndReturn(run func() error) *MsgReceiver_Open_Call { + _c.Call.Return(run) + return _c +} + // ReceiveMessages provides a mock function with given fields: _a0 func (_m *MsgReceiver) ReceiveMessages(_a0 azbus.Handler) error { ret := _m.Called(_a0) @@ -105,6 +284,34 @@ func (_m *MsgReceiver) ReceiveMessages(_a0 azbus.Handler) error { return r0 } +// MsgReceiver_ReceiveMessages_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReceiveMessages' +type MsgReceiver_ReceiveMessages_Call struct { + *mock.Call +} + +// ReceiveMessages is a helper method to define mock.On call +// - _a0 azbus.Handler +func (_e *MsgReceiver_Expecter) ReceiveMessages(_a0 interface{}) *MsgReceiver_ReceiveMessages_Call { + return &MsgReceiver_ReceiveMessages_Call{Call: _e.mock.On("ReceiveMessages", _a0)} +} + +func (_c *MsgReceiver_ReceiveMessages_Call) Run(run func(_a0 azbus.Handler)) *MsgReceiver_ReceiveMessages_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(azbus.Handler)) + }) + return _c +} + +func (_c *MsgReceiver_ReceiveMessages_Call) Return(_a0 error) *MsgReceiver_ReceiveMessages_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MsgReceiver_ReceiveMessages_Call) RunAndReturn(run func(azbus.Handler) error) *MsgReceiver_ReceiveMessages_Call { + _c.Call.Return(run) + return _c +} + // Reschedule provides a mock function with given fields: _a0, _a1, _a2 func (_m *MsgReceiver) Reschedule(_a0 context.Context, _a1 error, _a2 *azservicebus.ReceivedMessage) error { ret := _m.Called(_a0, _a1, _a2) @@ -119,6 +326,36 @@ func (_m *MsgReceiver) Reschedule(_a0 context.Context, _a1 error, _a2 *azservice return r0 } +// MsgReceiver_Reschedule_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Reschedule' +type MsgReceiver_Reschedule_Call struct { + *mock.Call +} + +// Reschedule is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 error +// - _a2 *azservicebus.ReceivedMessage +func (_e *MsgReceiver_Expecter) Reschedule(_a0 interface{}, _a1 interface{}, _a2 interface{}) *MsgReceiver_Reschedule_Call { + return &MsgReceiver_Reschedule_Call{Call: _e.mock.On("Reschedule", _a0, _a1, _a2)} +} + +func (_c *MsgReceiver_Reschedule_Call) Run(run func(_a0 context.Context, _a1 error, _a2 *azservicebus.ReceivedMessage)) *MsgReceiver_Reschedule_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(error), args[2].(*azservicebus.ReceivedMessage)) + }) + return _c +} + +func (_c *MsgReceiver_Reschedule_Call) Return(_a0 error) *MsgReceiver_Reschedule_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MsgReceiver_Reschedule_Call) RunAndReturn(run func(context.Context, error, *azservicebus.ReceivedMessage) error) *MsgReceiver_Reschedule_Call { + _c.Call.Return(run) + return _c +} + // String provides a mock function with given fields: func (_m *MsgReceiver) String() string { ret := _m.Called() @@ -133,13 +370,39 @@ func (_m *MsgReceiver) String() string { return r0 } -type mockConstructorTestingTNewMsgReceiver interface { - mock.TestingT - Cleanup(func()) +// MsgReceiver_String_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'String' +type MsgReceiver_String_Call struct { + *mock.Call +} + +// String is a helper method to define mock.On call +func (_e *MsgReceiver_Expecter) String() *MsgReceiver_String_Call { + return &MsgReceiver_String_Call{Call: _e.mock.On("String")} +} + +func (_c *MsgReceiver_String_Call) Run(run func()) *MsgReceiver_String_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MsgReceiver_String_Call) Return(_a0 string) *MsgReceiver_String_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MsgReceiver_String_Call) RunAndReturn(run func() string) *MsgReceiver_String_Call { + _c.Call.Return(run) + return _c } // NewMsgReceiver creates a new instance of MsgReceiver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewMsgReceiver(t mockConstructorTestingTNewMsgReceiver) *MsgReceiver { +// The first argument is typically a *testing.T value. +func NewMsgReceiver(t interface { + mock.TestingT + Cleanup(func()) +}) *MsgReceiver { mock := &MsgReceiver{} mock.Mock.Test(t) diff --git a/azbus/mocks/MsgSender.go b/azbus/mocks/MsgSender.go index 0fd30b4..2440afe 100644 --- a/azbus/mocks/MsgSender.go +++ b/azbus/mocks/MsgSender.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.27.1. DO NOT EDIT. +// Code generated by mockery. DO NOT EDIT. package mocks @@ -16,11 +16,47 @@ type MsgSender struct { mock.Mock } +type MsgSender_Expecter struct { + mock *mock.Mock +} + +func (_m *MsgSender) EXPECT() *MsgSender_Expecter { + return &MsgSender_Expecter{mock: &_m.Mock} +} + // Close provides a mock function with given fields: _a0 func (_m *MsgSender) Close(_a0 context.Context) { _m.Called(_a0) } +// MsgSender_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MsgSender_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +// - _a0 context.Context +func (_e *MsgSender_Expecter) Close(_a0 interface{}) *MsgSender_Close_Call { + return &MsgSender_Close_Call{Call: _e.mock.On("Close", _a0)} +} + +func (_c *MsgSender_Close_Call) Run(run func(_a0 context.Context)) *MsgSender_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *MsgSender_Close_Call) Return() *MsgSender_Close_Call { + _c.Call.Return() + return _c +} + +func (_c *MsgSender_Close_Call) RunAndReturn(run func(context.Context)) *MsgSender_Close_Call { + _c.Call.Return(run) + return _c +} + // GetAZClient provides a mock function with given fields: func (_m *MsgSender) GetAZClient() azbus.AZClient { ret := _m.Called() @@ -35,6 +71,33 @@ func (_m *MsgSender) GetAZClient() azbus.AZClient { return r0 } +// MsgSender_GetAZClient_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetAZClient' +type MsgSender_GetAZClient_Call struct { + *mock.Call +} + +// GetAZClient is a helper method to define mock.On call +func (_e *MsgSender_Expecter) GetAZClient() *MsgSender_GetAZClient_Call { + return &MsgSender_GetAZClient_Call{Call: _e.mock.On("GetAZClient")} +} + +func (_c *MsgSender_GetAZClient_Call) Run(run func()) *MsgSender_GetAZClient_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MsgSender_GetAZClient_Call) Return(_a0 azbus.AZClient) *MsgSender_GetAZClient_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MsgSender_GetAZClient_Call) RunAndReturn(run func() azbus.AZClient) *MsgSender_GetAZClient_Call { + _c.Call.Return(run) + return _c +} + // Open provides a mock function with given fields: func (_m *MsgSender) Open() error { ret := _m.Called() @@ -49,6 +112,33 @@ func (_m *MsgSender) Open() error { return r0 } +// MsgSender_Open_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Open' +type MsgSender_Open_Call struct { + *mock.Call +} + +// Open is a helper method to define mock.On call +func (_e *MsgSender_Expecter) Open() *MsgSender_Open_Call { + return &MsgSender_Open_Call{Call: _e.mock.On("Open")} +} + +func (_c *MsgSender_Open_Call) Run(run func()) *MsgSender_Open_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MsgSender_Open_Call) Return(_a0 error) *MsgSender_Open_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MsgSender_Open_Call) RunAndReturn(run func() error) *MsgSender_Open_Call { + _c.Call.Return(run) + return _c +} + // Send provides a mock function with given fields: _a0, _a1, _a2 func (_m *MsgSender) Send(_a0 context.Context, _a1 []byte, _a2 ...azbus.OutMessageOption) error { _va := make([]interface{}, len(_a2)) @@ -70,6 +160,43 @@ func (_m *MsgSender) Send(_a0 context.Context, _a1 []byte, _a2 ...azbus.OutMessa return r0 } +// MsgSender_Send_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Send' +type MsgSender_Send_Call struct { + *mock.Call +} + +// Send is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 []byte +// - _a2 ...azbus.OutMessageOption +func (_e *MsgSender_Expecter) Send(_a0 interface{}, _a1 interface{}, _a2 ...interface{}) *MsgSender_Send_Call { + return &MsgSender_Send_Call{Call: _e.mock.On("Send", + append([]interface{}{_a0, _a1}, _a2...)...)} +} + +func (_c *MsgSender_Send_Call) Run(run func(_a0 context.Context, _a1 []byte, _a2 ...azbus.OutMessageOption)) *MsgSender_Send_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]azbus.OutMessageOption, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(azbus.OutMessageOption) + } + } + run(args[0].(context.Context), args[1].([]byte), variadicArgs...) + }) + return _c +} + +func (_c *MsgSender_Send_Call) Return(_a0 error) *MsgSender_Send_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MsgSender_Send_Call) RunAndReturn(run func(context.Context, []byte, ...azbus.OutMessageOption) error) *MsgSender_Send_Call { + _c.Call.Return(run) + return _c +} + // SendMsg provides a mock function with given fields: _a0, _a1, _a2 func (_m *MsgSender) SendMsg(_a0 context.Context, _a1 azservicebus.Message, _a2 ...azbus.OutMessageOption) error { _va := make([]interface{}, len(_a2)) @@ -91,6 +218,43 @@ func (_m *MsgSender) SendMsg(_a0 context.Context, _a1 azservicebus.Message, _a2 return r0 } +// MsgSender_SendMsg_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SendMsg' +type MsgSender_SendMsg_Call struct { + *mock.Call +} + +// SendMsg is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 azservicebus.Message +// - _a2 ...azbus.OutMessageOption +func (_e *MsgSender_Expecter) SendMsg(_a0 interface{}, _a1 interface{}, _a2 ...interface{}) *MsgSender_SendMsg_Call { + return &MsgSender_SendMsg_Call{Call: _e.mock.On("SendMsg", + append([]interface{}{_a0, _a1}, _a2...)...)} +} + +func (_c *MsgSender_SendMsg_Call) Run(run func(_a0 context.Context, _a1 azservicebus.Message, _a2 ...azbus.OutMessageOption)) *MsgSender_SendMsg_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]azbus.OutMessageOption, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(azbus.OutMessageOption) + } + } + run(args[0].(context.Context), args[1].(azservicebus.Message), variadicArgs...) + }) + return _c +} + +func (_c *MsgSender_SendMsg_Call) Return(_a0 error) *MsgSender_SendMsg_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MsgSender_SendMsg_Call) RunAndReturn(run func(context.Context, azservicebus.Message, ...azbus.OutMessageOption) error) *MsgSender_SendMsg_Call { + _c.Call.Return(run) + return _c +} + // String provides a mock function with given fields: func (_m *MsgSender) String() string { ret := _m.Called() @@ -105,13 +269,39 @@ func (_m *MsgSender) String() string { return r0 } -type mockConstructorTestingTNewMsgSender interface { - mock.TestingT - Cleanup(func()) +// MsgSender_String_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'String' +type MsgSender_String_Call struct { + *mock.Call +} + +// String is a helper method to define mock.On call +func (_e *MsgSender_Expecter) String() *MsgSender_String_Call { + return &MsgSender_String_Call{Call: _e.mock.On("String")} +} + +func (_c *MsgSender_String_Call) Run(run func()) *MsgSender_String_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MsgSender_String_Call) Return(_a0 string) *MsgSender_String_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MsgSender_String_Call) RunAndReturn(run func() string) *MsgSender_String_Call { + _c.Call.Return(run) + return _c } // NewMsgSender creates a new instance of MsgSender. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewMsgSender(t mockConstructorTestingTNewMsgSender) *MsgSender { +// The first argument is typically a *testing.T value. +func NewMsgSender(t interface { + mock.TestingT + Cleanup(func()) +}) *MsgSender { mock := &MsgSender{} mock.Mock.Test(t) diff --git a/azbus/mocks/OutMessageOption.go b/azbus/mocks/OutMessageOption.go index 6794cba..2255844 100644 --- a/azbus/mocks/OutMessageOption.go +++ b/azbus/mocks/OutMessageOption.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.27.1. DO NOT EDIT. +// Code generated by mockery. DO NOT EDIT. package mocks @@ -12,18 +12,53 @@ type OutMessageOption struct { mock.Mock } +type OutMessageOption_Expecter struct { + mock *mock.Mock +} + +func (_m *OutMessageOption) EXPECT() *OutMessageOption_Expecter { + return &OutMessageOption_Expecter{mock: &_m.Mock} +} + // Execute provides a mock function with given fields: _a0 func (_m *OutMessageOption) Execute(_a0 *azservicebus.Message) { _m.Called(_a0) } -type mockConstructorTestingTNewOutMessageOption interface { - mock.TestingT - Cleanup(func()) +// OutMessageOption_Execute_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Execute' +type OutMessageOption_Execute_Call struct { + *mock.Call +} + +// Execute is a helper method to define mock.On call +// - _a0 *azservicebus.Message +func (_e *OutMessageOption_Expecter) Execute(_a0 interface{}) *OutMessageOption_Execute_Call { + return &OutMessageOption_Execute_Call{Call: _e.mock.On("Execute", _a0)} +} + +func (_c *OutMessageOption_Execute_Call) Run(run func(_a0 *azservicebus.Message)) *OutMessageOption_Execute_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*azservicebus.Message)) + }) + return _c +} + +func (_c *OutMessageOption_Execute_Call) Return() *OutMessageOption_Execute_Call { + _c.Call.Return() + return _c +} + +func (_c *OutMessageOption_Execute_Call) RunAndReturn(run func(*azservicebus.Message)) *OutMessageOption_Execute_Call { + _c.Call.Return(run) + return _c } // NewOutMessageOption creates a new instance of OutMessageOption. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewOutMessageOption(t mockConstructorTestingTNewOutMessageOption) *OutMessageOption { +// The first argument is typically a *testing.T value. +func NewOutMessageOption(t interface { + mock.TestingT + Cleanup(func()) +}) *OutMessageOption { mock := &OutMessageOption{} mock.Mock.Test(t) diff --git a/azbus/receiver.go b/azbus/receiver.go index afe59ac..8174882 100644 --- a/azbus/receiver.go +++ b/azbus/receiver.go @@ -2,6 +2,7 @@ package azbus import ( "context" + "errors" "fmt" "sync" "time" @@ -11,6 +12,13 @@ import ( "github.com/opentracing/opentracing-go" ) +// so we dont have to import the azure repo everywhere +type ReceivedMessage = azservicebus.ReceivedMessage + +type Handler interface { + Handle(context.Context, *ReceivedMessage) error +} + const ( // RenewalTime is the how often we want to renew the message PEEK lock // @@ -99,21 +107,22 @@ func NewReceiver(log Logger, cfg ReceiverConfig) *Receiver { } } - return &Receiver{ + r := &Receiver{ Cfg: cfg, - log: log, azClient: NewAZClient(cfg.ConnectionString), options: options, } + r.log = log.WithIndex("receiver", r.String()) + return r } func (r *Receiver) GetAZClient() AZClient { return r.azClient } -// String - returns string represena=tation of receiver. -// No log function calls in this methgod please.“ +// String - returns string representation of receiver. func (r *Receiver) String() string { + // No log function calls in this method please. if r.Cfg.SubscriptionName != "" { if r.Cfg.Deadletter { return fmt.Sprintf("%s.%s.deadletter", r.Cfg.TopicOrQueueName, r.Cfg.SubscriptionName) @@ -128,12 +137,29 @@ func (r *Receiver) String() string { // elapsed emits 2 log messages detailing how long processing took. // TODO: emit the processing time as a prometheus metric. -func (r *Receiver) elapsed(ctx context.Context, count int, total int, msg *ReceivedMessage, handler Handler) error { - r.log.Debugf("%s: Processing message %d of %d", r, count, total) +func (r *Receiver) elapsed(ctx context.Context, count int, total int, maxDuration time.Duration, msg *ReceivedMessage, handler Handler) error { now := time.Now() ctx = ContextFromReceivedMessage(ctx, msg) + log := r.log.FromContext(ctx) + defer log.Close() + + log.Debugf("Processing message %d of %d", count, total) err := handler.Handle(ctx, msg) - r.log.Debugf("%s: Processing message took %s", r, time.Since(now)) + duration := time.Since(now) + log.Debugf("Processing message %d took %s", count, duration) + // This is safe because maxDuration is only defined if RenewMessageLock is false. + if !r.Cfg.RenewMessageLock && duration >= maxDuration { + log.Infof("WARNING: processing msg %d duration %v took more than %v seconds", count, duration, maxDuration) + log.Infof("WARNING: please either enable SERVICEBUS_RENEW_LOCK or reduce SERVICEBUS_INCOMING_MESSAGES") + log.Infof("WARNING: both can be found in the helm chart for each service.") + } + if err != nil { + if errors.Is(err, ErrPeekLockTimeout) { + log.Infof("WARNING: processing msg %d duration %s returned error: %v", count, duration, err) + log.Infof("WARNING: please either enable SERVICEBUS_RENEW_LOCK or reduce SERVICEBUS_INCOMING_MESSAGES") + log.Infof("WARNING: both can be found in the helm chart for each service.") + } + } return err } @@ -181,8 +207,7 @@ func (r *Receiver) ReceiveMessages(handler Handler) error { return azerr } r.log.Debugf( - "Receive %s: NumberOfReceivedMessages %d, RenewMessageLock: %v", - r.String(), + "NumberOfReceivedMessages %d, RenewMessageLock: %v", r.Cfg.NumberOfReceivedMessages, r.Cfg.RenewMessageLock, ) @@ -200,11 +225,11 @@ func (r *Receiver) ReceiveMessages(handler Handler) error { r.log.Debugf("total messages %d", total) err = func(fctx context.Context) error { - var ectx context.Context // we need a cancellation if RenewMEssageLock is enabled + var ectx context.Context // we need a cancellation if RenewMessageLock is enabled var ecancel context.CancelFunc if r.Cfg.RenewMessageLock { // start up RenewMessageLock goroutines before processing any - // messages. Inherit values from input context. + // messages. ectx, ecancel = context.WithCancel(fctx) defer ecancel() for i := 0; i < total; i++ { @@ -216,16 +241,20 @@ func (r *Receiver) ReceiveMessages(handler Handler) error { // ignored by the elapsed function. var rctx context.Context // we need a timeout if RenewMessageLock is disabled var rcancel context.CancelFunc + var maxDuration time.Duration for i := 0; i < total; i++ { msg := messages[i] if r.Cfg.RenewMessageLock { rctx = fctx } else { - rctx, rcancel = setTimeout(fctx, r.log, msg) + rctx, rcancel, maxDuration = setTimeout(fctx, r.log, msg) defer rcancel() } - elapsedErr := r.elapsed(rctx, i+1, total, msg, handler) + elapsedErr := r.elapsed(rctx, i+1, total, maxDuration, msg, handler) if elapsedErr != nil { + // return here so that no further messages are processed + // XXXX: check for ErrPeekLockTimeout and only terminate + // then? return elapsedErr } } @@ -244,7 +273,7 @@ func (r *Receiver) Open() error { if r.receiver != nil { return nil } - r.log.Debugf("Open Receiver %s", r) + r.log.Debugf("Open") client, err := r.azClient.azClient() if err != nil { return err @@ -286,16 +315,16 @@ func (r *Receiver) Close(ctx context.Context) { // Abandon abandons message. This function is not used but is present for consistency. func (r *Receiver) Abandon(ctx context.Context, err error, msg *ReceivedMessage) error { - ctx = contextWithoutCancel(ctx) + ctx = context.WithoutCancel(ctx) log := r.log.FromContext(ctx) defer log.Close() span, ctx := opentracing.StartSpanFromContext(ctx, "Message.Abandon") defer span.Finish() - log.Infof("%s, Abandon Message on DeliveryCount %d: %v", r, msg.DeliveryCount, err) + log.Infof("Abandon Message on DeliveryCount %d: %v", msg.DeliveryCount, err) err1 := r.receiver.AbandonMessage(ctx, msg, nil) if err1 != nil { - azerr := fmt.Errorf("%s: Abandon Message failure: %w", r, NewAzbusError(err1)) + azerr := fmt.Errorf("Abandon Message failure: %w", NewAzbusError(err1)) log.Infof("%s", azerr) } return nil @@ -307,51 +336,51 @@ func (r *Receiver) Abandon(ctx context.Context, err error, msg *ReceivedMessage) // unused arguments for consistency and in case we need to implement more sophisticated // algorithms in future. func (r *Receiver) Reschedule(ctx context.Context, err error, msg *ReceivedMessage) error { - ctx = contextWithoutCancel(ctx) + ctx = context.WithoutCancel(ctx) log := r.log.FromContext(ctx) defer log.Close() span, _ := opentracing.StartSpanFromContext(ctx, "Message.Reschedule") defer span.Finish() - log.Infof("%s, Reschedule Message on DeliveryCount %d: %v", r, msg.DeliveryCount, err) + log.Infof("Reschedule Message on DeliveryCount %d: %v", msg.DeliveryCount, err) return nil } // DeadLetter explicitly deadletters a message. func (r *Receiver) DeadLetter(ctx context.Context, err error, msg *ReceivedMessage) error { - ctx = contextWithoutCancel(ctx) + ctx = context.WithoutCancel(ctx) log := r.log.FromContext(ctx) defer log.Close() span, ctx := opentracing.StartSpanFromContext(ctx, "Message.DeadLetter") defer span.Finish() - log.Infof("%s: DeadLetter Message: %v", r, err) + log.Infof("DeadLetter Message: %v", err) options := azservicebus.DeadLetterOptions{ Reason: to.Ptr(err.Error()), } err1 := r.receiver.DeadLetterMessage(ctx, msg, &options) if err1 != nil { - azerr := fmt.Errorf("%s: DeadLetter Message failure: %w", r, NewAzbusError(err1)) + azerr := fmt.Errorf("DeadLetter Message failure: %w", NewAzbusError(err1)) log.Infof("%s", azerr) } return nil } func (r *Receiver) Complete(ctx context.Context, msg *ReceivedMessage) error { - ctx = contextWithoutCancel(ctx) + ctx = context.WithoutCancel(ctx) log := r.log.FromContext(ctx) defer log.Close() span, _ := opentracing.StartSpanFromContext(ctx, "Message.Complete") defer span.Finish() - log.Infof("%s: Complete Message", r) + log.Infof("Complete Message") err := r.receiver.CompleteMessage(ctx, msg, nil) if err != nil { // If the completion fails then the message will get rescheduled, but it's effect will // have been made, so we could get duplication issues. - azerr := fmt.Errorf("%s: Complete: failed to settle message: %w", r, NewAzbusError(err)) + azerr := fmt.Errorf("Complete: failed to settle message: %w", NewAzbusError(err)) log.Infof("%s", azerr) } return nil diff --git a/azbus/sender.go b/azbus/sender.go index 72362eb..09fa2df 100644 --- a/azbus/sender.go +++ b/azbus/sender.go @@ -9,8 +9,19 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" otrace "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" + + "github.com/rkvst/go-rkvstcommon/correlationid" ) +// so we dont have to import the azure repo everywhere +type OutMessage = azservicebus.Message + +func NewOutMessage(data []byte) OutMessage { + return azservicebus.Message{ + Body: data, + } +} + // SenderConfig configuration for an azure servicebus namespace and queue type SenderConfig struct { ConnectionString string @@ -36,9 +47,9 @@ func NewSender(log Logger, cfg SenderConfig) *Sender { s := &Sender{ Cfg: cfg, - log: log, azClient: NewAZClient(cfg.ConnectionString), } + s.log = log.WithIndex("sender", s.String()) return s } @@ -54,7 +65,7 @@ func (s *Sender) Close(ctx context.Context) { var err error if s != nil && s.sender != nil { - s.log.Debugf("Close %s", s) + s.log.Debugf("Close") s.mtx.Lock() defer s.mtx.Unlock() err = s.sender.Close(ctx) @@ -85,7 +96,7 @@ func (s *Sender) Open() error { s.log.Infof("%s", azerr) return azerr } - s.log.Debugf("%s: Maximum message size is %d bytes", s, s.maxMessageSizeInBytes) + s.log.Debugf("Maximum message size is %d bytes", s.maxMessageSizeInBytes) sender, err := client.NewSender(s.Cfg.TopicOrQueueName, nil) if err != nil { @@ -94,7 +105,7 @@ func (s *Sender) Open() error { return azerr } - s.log.Debugf("Open Sender %s", s) + s.log.Debugf("Open") s.sender = sender return nil } @@ -116,7 +127,7 @@ func (s *Sender) SendMsg(ctx context.Context, message OutMessage, opts ...OutMes // Without this fix eventsourcepoller and similar services repeatedly context cancel and repeatedly // restart. - ctx = contextWithoutCancel(ctx) + ctx = context.WithoutCancel(ctx) var err error @@ -137,24 +148,27 @@ func (s *Sender) SendMsg(ctx context.Context, message OutMessage, opts ...OutMes size := int64(len(message.Body)) log.Debugf("%s: Msg Sized %d limit %d", s, size, s.maxMessageSizeInBytes) if size > s.maxMessageSizeInBytes { - log.Debugf("%s: Msg Sized %d > limit %d :%v", s, size, s.maxMessageSizeInBytes, ErrMessageOversized) + log.Debugf("Msg Sized %d > limit %d :%v", size, s.maxMessageSizeInBytes, ErrMessageOversized) return fmt.Errorf("%s: Msg Sized %d > limit %d :%w", s, size, s.maxMessageSizeInBytes, ErrMessageOversized) } now := time.Now() if message.ApplicationProperties == nil { message.ApplicationProperties = make(map[string]any) } - opts = AddCorrelationIDOption(ctx, opts...) for _, opt := range opts { opt(&message) } + correlationID := correlationid.FromContext(ctx) + if correlationID != "" { + message.ApplicationProperties[correlationid.CorrelationIDKey] = correlationID + } log.Debugf("ApplicationProperties %v", message.ApplicationProperties) err = s.sender.SendMessage(ctx, &message, nil) if err != nil { - azerr := fmt.Errorf("%s: Send failed in %s: %w", s, time.Since(now), NewAzbusError(err)) + azerr := fmt.Errorf("Send failed in %s: %w", time.Since(now), NewAzbusError(err)) log.Infof("%s", azerr) return azerr } - log.Debugf("%s: Sending message took %s", s, time.Since(now)) + log.Debugf("Sending message took %s", time.Since(now)) return nil } diff --git a/azbus/withoutcancel.go b/azbus/withoutcancel.go deleted file mode 100644 index e09b39e..0000000 --- a/azbus/withoutcancel.go +++ /dev/null @@ -1,35 +0,0 @@ -package azbus - -import ( - "context" - "time" -) - -// The azservicebus obeys a context.WithDeadline if present. However we have -// learned that retries are better so all disposition functions and the sender -// suppress any deadlines in the context. This has the added benefit of not -// reusing a context that has already cancelled - which means that otherwise -// disposition code will exit immediately. -// -// This problem of context having 2 responsibilities (breaking the single -// responsibility principle) is known - see this proposal: -// -// https://github.com/golang/go/issues/40221 -// -// context.WithoutCancel() was committed 2023-03-29 https://go-review.googlesource.com/c/go/+/479918 -// So we can assume it will be available in 1.21 in August 2023 -// -// meanwhile this is the workaround until Go 1.21 when this file will be deleted -type withoutCancelCtx struct { - context.Context -} - -func contextWithoutCancel(ctx context.Context) context.Context { - return withoutCancelCtx{ctx} -} -func (withoutCancelCtx) Deadline() (deadline time.Time, ok bool) { return } -func (withoutCancelCtx) Done() <-chan struct{} { return nil } -func (withoutCancelCtx) Err() error { return nil } - -// func (c withoutCancelCtx) Value(key any) any { return value(c, key) } -func (c withoutCancelCtx) String() string { return "WithoutCancel" } diff --git a/go.mod b/go.mod index bffb03a..67bb7e9 100644 --- a/go.mod +++ b/go.mod @@ -16,11 +16,12 @@ replace ( require ( github.com/Azure/azure-sdk-for-go v68.0.0+incompatible github.com/Azure/azure-sdk-for-go/sdk/azcore v1.7.1 - github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.2.1 + github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.4.0 github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.4.1 github.com/Azure/go-autorest/autorest v0.11.29 github.com/Azure/go-autorest/autorest/azure/auth v0.5.12 github.com/KimMachineGun/automemlimit v0.2.6 + github.com/ethereum/go-ethereum v0.0.0-20221208112643-d318a5aa973a github.com/go-redis/redis/v8 v8.11.5 github.com/google/uuid v1.3.1 github.com/gorilla/securecookie v1.1.1 @@ -39,9 +40,8 @@ require ( ) require ( + github.com/Azure/go-amqp v1.0.0 // indirect github.com/btcsuite/btcd v0.20.1-beta // indirect - github.com/ethereum/go-ethereum v0.0.0-20221208112643-d318a5aa973a // indirect - github.com/go-stack/stack v1.8.1 // indirect ) require ( @@ -54,13 +54,11 @@ require ( github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect github.com/Azure/go-autorest/logger v0.2.1 // indirect github.com/Azure/go-autorest/tracing v0.6.0 // indirect - github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cilium/ebpf v0.9.1 // indirect github.com/containerd/cgroups/v3 v3.0.1 // indirect github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dimchansky/utfbom v1.1.1 // indirect github.com/docker/go-units v0.4.0 // indirect @@ -68,7 +66,6 @@ require ( github.com/godbus/dbus/v5 v5.0.4 // indirect github.com/golang-jwt/jwt/v4 v4.5.0 // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/holiman/uint256 v1.2.3 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/opencontainers/runtime-spec v1.0.2 // indirect github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492 // indirect diff --git a/go.sum b/go.sum index 15a0100..b0ad99d 100644 --- a/go.sum +++ b/go.sum @@ -27,11 +27,13 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 h1:Yoicul8bnVdQrhDMTHxdE github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0/go.mod h1:+6sju8gk8FRmSajX3Oz4G5Gm7P+mbqE9FVaXXFYTkCM= github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 h1:sXr+ck84g/ZlZUOZiNELInmMgOsuGwdjjVkEIde0OtY= github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0/go.mod h1:okt5dMMTOFjX/aovMlrjvvXoPMBVSPzk9185BT0+eZM= -github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.2.1 h1:ryVRjO3SrGrSM8PNlLuMbMYFz9vexPzvenNUEBfsgCo= -github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.2.1/go.mod h1:R6+0udeRV8iYSTVuT5RT7If4sc46K5Bz3ZKrmvZQF7U= +github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.4.0 h1:MxbPJrYY81a8xnMml4qICSq1z2WusPw3jSfdIMupnYM= +github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.4.0/go.mod h1:pXDkeh10bAqElvd+S5Ppncj+DCKvJGXNa8rRT2R7rIw= github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.4.1 h1:QSdcrd/UFJv6Bp/CfoVf2SrENpFn9P6Yh8yb+xNhYMM= github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.4.1/go.mod h1:eZ4g6GUvXiGulfIbbhh1Xr4XwUYaYaWMqzGD/284wCA= github.com/Azure/azure-storage-blob-go v0.7.0/go.mod h1:f9YQKtsG1nMisotuTPpO0tjNuEjKRYAcJU8/ydDI++4= +github.com/Azure/go-amqp v1.0.0 h1:QfCugi1M+4F2JDTRgVnRw7PYXLXZ9hmqk3+9+oJh3OA= +github.com/Azure/go-amqp v1.0.0/go.mod h1:+bg0x3ce5+Q3ahCEXnCsGG3ETpDQe3MEVnOuT2ywPwc= github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs= github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= github.com/Azure/go-autorest/autorest v0.11.24/go.mod h1:G6kyRlFnTuSbEYkQGawPfsCswgme4iYf6rfSKUDzbCc= @@ -98,10 +100,6 @@ github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40/go.mod h1:8rLXio+Wji github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= github.com/btcsuite/btcd v0.20.1-beta h1:Ik4hyJqN8Jfyv3S4AGBOmyouMsYE3EdYODkMbQjwPGw= github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ= -github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k= -github.com/btcsuite/btcd/btcec/v2 v2.2.0/go.mod h1:U7MHm051Al6XmscBQ0BoNydpOTsFAn707034b5nY8zU= -github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U= -github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg= @@ -146,10 +144,6 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/deckarep/golang-set v0.0.0-20180603214616-504e848d77ea/go.mod h1:93vsz/8Wt4joVM7c2AVqh+YRMiUSc14yDtF28KmMOgQ= -github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0= -github.com/decred/dcrd/crypto/blake256 v1.0.0/go.mod h1:sQl2p6Y26YV+ZOcSTP6thNdn47hh8kt6rqSlvmrXFAc= -github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 h1:YLtO71vCjJRCBcrPMtQ9nqBsqpA1m5sE92cU+pd5Mcc= -github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8/go.mod h1:VMaSuZ+SZcx/wljOQKvp5srsbCiKDEb6K2wC4+PiBmQ= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= @@ -175,17 +169,16 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/ethereum/go-ethereum v1.12.2 h1:eGHJ4ij7oyVqUQn48LBz3B7pvQ8sV0wGJiIE6gDq/6Y= -github.com/ethereum/go-ethereum v1.12.2/go.mod h1:1cRAEV+rp/xX0zraSCBnu9Py3HQ+geRMj3HdR+k0wfI= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5/go.mod h1:VvhXpOYNQvB+uIk2RvXzuaQtkQJzzIx6lSBe1xv7hi0= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/frankban/quicktest v1.14.0 h1:+cqqvzZV87b4adx/5ayVOaYZ2CrvM4ejQvUdBzPPUss= github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= -github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= -github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU= github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA= github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff/go.mod h1:x7DCsMOv1taUwEWCzT4cmDeAkigA5/QCwUodaVOe8Ww= @@ -205,7 +198,6 @@ github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= -github.com/go-stack/stack v1.8.1 h1:ntEHSVwIt7PNXNpgPmVfMrNhLtgjlmnZha2kOpuRiDw= github.com/go-stack/stack v1.8.1/go.mod h1:dcoOX6HbPZSZptuspn9bctJ+N/CnF5gGygcUP3XYfe4= github.com/godbus/dbus/v5 v5.0.4 h1:9349emZab16e7zQvpmsbtjc18ykshndd8y2PG3sgJbA= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= @@ -299,8 +291,6 @@ github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d/go.mod h1:i github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM= github.com/holiman/bloomfilter/v2 v2.0.3/go.mod h1:zpoh+gs7qcpqrHr3dB55AMiJwo0iURXE7ZOP9L9hSkA= github.com/holiman/uint256 v1.2.1/go.mod h1:y4ga/t+u+Xwd7CpDgZESaRcWy0I7XMlTMA25ApIH5Jw= -github.com/holiman/uint256 v1.2.3 h1:K8UWO1HUJpRMXBxbmaY1Y8IAMZC/RsKB+ArEnnK4l5o= -github.com/holiman/uint256 v1.2.3/go.mod h1:SC8Ryt4n+UBbPbIBKaG9zbbDlp4jOru9xFZmPzLUTxw= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/huin/goupnp v1.0.1-0.20210310174557-0ca763054c88/go.mod h1:nNs7wvRfN1eKaMknBydLNQU6146XQim8t4h+q90biWo= github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150/go.mod h1:PpLOETDnJ0o3iZrZfqZzyLl6l7F3c6L1oWn7OICBi6o= @@ -811,7 +801,7 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las= -nhooyr.io/websocket v1.8.6 h1:s+C3xAMLwGmlI31Nyn/eAehUlZPwfYZu2JXM621Q5/k= -nhooyr.io/websocket v1.8.6/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= +nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g= +nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/taskfiles/Taskfile_mocks.yml b/taskfiles/Taskfile_mocks.yml new file mode 100644 index 0000000..b20795e --- /dev/null +++ b/taskfiles/Taskfile_mocks.yml @@ -0,0 +1,24 @@ +--- + +version: '3' + +tasks: + + generate: + desc: Generate all mocks + deps: + - task: azbus + + generate-single-mock: + summary: generate a specific mock + cmds: + - rm -f {{.PATH}}/mocks/* + - mockery --all --exported --disable-version-string --with-expecter --dir {{.PATH}} --output {{.PATH}}/mocks + + azbus: + summary: Generate the common package azbus mocks + sources: + - ./azbus/**/*.go + cmds: + - task: generate-single-mock + vars: {PATH: "azbus" }