From b6c221cc2c269fa801031519bb71c4acd8af26ad Mon Sep 17 00:00:00 2001 From: Paul Hewlett Date: Fri, 16 Feb 2024 14:51:10 +0000 Subject: [PATCH 1/2] azbus handlers azbus Receiver opens and closes handlers when it is opened or closed. Convenience methods to add or get properties from a ReceivedMeaages or an OutMessage are now provided. AB#9031 --- .gitignore | 2 + azbus/mocks/Handler.go | 96 ++++++++++++++++++++++++++---- azbus/mocks/MsgReceiver.go | 74 ----------------------- azbus/mocks/MsgSender.go | 100 ++++--------------------------- azbus/mocks/OutMessageOption.go | 68 --------------------- azbus/msgreceiver.go | 5 +- azbus/msgsender.go | 5 +- azbus/outmessage.go | 33 +++++++++++ azbus/receivedmessage.go | 19 ++++++ azbus/receiver.go | 101 ++++++++++++++++---------------- azbus/sender.go | 44 ++++---------- azbus/tracing.go | 17 ++++-- metrics/metrics.go | 3 - 13 files changed, 225 insertions(+), 342 deletions(-) delete mode 100644 azbus/mocks/OutMessageOption.go create mode 100644 azbus/outmessage.go create mode 100644 azbus/receivedmessage.go diff --git a/.gitignore b/.gitignore index f5f294c..46d179e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ .task/ +.bash_history +.viminfo .local/ .vscode/ diff --git a/azbus/mocks/Handler.go b/azbus/mocks/Handler.go index 32821ee..4391998 100644 --- a/azbus/mocks/Handler.go +++ b/azbus/mocks/Handler.go @@ -3,11 +3,10 @@ package mocks import ( - azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" - azbus "github.com/datatrails/go-datatrails-common/azbus" - context "context" + azbus "github.com/datatrails/go-datatrails-common/azbus" + mock "github.com/stretchr/testify/mock" ) @@ -24,23 +23,55 @@ func (_m *Handler) EXPECT() *Handler_Expecter { return &Handler_Expecter{mock: &_m.Mock} } +// Close provides a mock function with given fields: +func (_m *Handler) Close() { + _m.Called() +} + +// Handler_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type Handler_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *Handler_Expecter) Close() *Handler_Close_Call { + return &Handler_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *Handler_Close_Call) Run(run func()) *Handler_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *Handler_Close_Call) Return() *Handler_Close_Call { + _c.Call.Return() + return _c +} + +func (_c *Handler_Close_Call) RunAndReturn(run func()) *Handler_Close_Call { + _c.Call.Return(run) + return _c +} + // Handle provides a mock function with given fields: _a0, _a1 -func (_m *Handler) Handle(_a0 context.Context, _a1 *azservicebus.ReceivedMessage) (azbus.Disposition, context.Context, error) { +func (_m *Handler) Handle(_a0 context.Context, _a1 *azbus.ReceivedMessage) (azbus.Disposition, context.Context, error) { ret := _m.Called(_a0, _a1) var r0 azbus.Disposition var r1 context.Context var r2 error - if rf, ok := ret.Get(0).(func(context.Context, *azservicebus.ReceivedMessage) (azbus.Disposition, context.Context, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context, *azbus.ReceivedMessage) (azbus.Disposition, context.Context, error)); ok { return rf(_a0, _a1) } - if rf, ok := ret.Get(0).(func(context.Context, *azservicebus.ReceivedMessage) azbus.Disposition); ok { + if rf, ok := ret.Get(0).(func(context.Context, *azbus.ReceivedMessage) azbus.Disposition); ok { r0 = rf(_a0, _a1) } else { r0 = ret.Get(0).(azbus.Disposition) } - if rf, ok := ret.Get(1).(func(context.Context, *azservicebus.ReceivedMessage) context.Context); ok { + if rf, ok := ret.Get(1).(func(context.Context, *azbus.ReceivedMessage) context.Context); ok { r1 = rf(_a0, _a1) } else { if ret.Get(1) != nil { @@ -48,7 +79,7 @@ func (_m *Handler) Handle(_a0 context.Context, _a1 *azservicebus.ReceivedMessage } } - if rf, ok := ret.Get(2).(func(context.Context, *azservicebus.ReceivedMessage) error); ok { + if rf, ok := ret.Get(2).(func(context.Context, *azbus.ReceivedMessage) error); ok { r2 = rf(_a0, _a1) } else { r2 = ret.Error(2) @@ -64,14 +95,14 @@ type Handler_Handle_Call struct { // Handle is a helper method to define mock.On call // - _a0 context.Context -// - _a1 *azservicebus.ReceivedMessage +// - _a1 *azbus.ReceivedMessage func (_e *Handler_Expecter) Handle(_a0 interface{}, _a1 interface{}) *Handler_Handle_Call { return &Handler_Handle_Call{Call: _e.mock.On("Handle", _a0, _a1)} } -func (_c *Handler_Handle_Call) Run(run func(_a0 context.Context, _a1 *azservicebus.ReceivedMessage)) *Handler_Handle_Call { +func (_c *Handler_Handle_Call) Run(run func(_a0 context.Context, _a1 *azbus.ReceivedMessage)) *Handler_Handle_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*azservicebus.ReceivedMessage)) + run(args[0].(context.Context), args[1].(*azbus.ReceivedMessage)) }) return _c } @@ -81,7 +112,48 @@ func (_c *Handler_Handle_Call) Return(_a0 azbus.Disposition, _a1 context.Context return _c } -func (_c *Handler_Handle_Call) RunAndReturn(run func(context.Context, *azservicebus.ReceivedMessage) (azbus.Disposition, context.Context, error)) *Handler_Handle_Call { +func (_c *Handler_Handle_Call) RunAndReturn(run func(context.Context, *azbus.ReceivedMessage) (azbus.Disposition, context.Context, error)) *Handler_Handle_Call { + _c.Call.Return(run) + return _c +} + +// Open provides a mock function with given fields: +func (_m *Handler) Open() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Handler_Open_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Open' +type Handler_Open_Call struct { + *mock.Call +} + +// Open is a helper method to define mock.On call +func (_e *Handler_Expecter) Open() *Handler_Open_Call { + return &Handler_Open_Call{Call: _e.mock.On("Open")} +} + +func (_c *Handler_Open_Call) Run(run func()) *Handler_Open_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *Handler_Open_Call) Return(_a0 error) *Handler_Open_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Handler_Open_Call) RunAndReturn(run func() error) *Handler_Open_Call { _c.Call.Return(run) return _c } diff --git a/azbus/mocks/MsgReceiver.go b/azbus/mocks/MsgReceiver.go index c25d574..05f7523 100644 --- a/azbus/mocks/MsgReceiver.go +++ b/azbus/mocks/MsgReceiver.go @@ -23,39 +23,6 @@ func (_m *MsgReceiver) EXPECT() *MsgReceiver_Expecter { return &MsgReceiver_Expecter{mock: &_m.Mock} } -// Close provides a mock function with given fields: _a0 -func (_m *MsgReceiver) Close(_a0 context.Context) { - _m.Called(_a0) -} - -// MsgReceiver_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' -type MsgReceiver_Close_Call struct { - *mock.Call -} - -// Close is a helper method to define mock.On call -// - _a0 context.Context -func (_e *MsgReceiver_Expecter) Close(_a0 interface{}) *MsgReceiver_Close_Call { - return &MsgReceiver_Close_Call{Call: _e.mock.On("Close", _a0)} -} - -func (_c *MsgReceiver_Close_Call) Run(run func(_a0 context.Context)) *MsgReceiver_Close_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context)) - }) - return _c -} - -func (_c *MsgReceiver_Close_Call) Return() *MsgReceiver_Close_Call { - _c.Call.Return() - return _c -} - -func (_c *MsgReceiver_Close_Call) RunAndReturn(run func(context.Context)) *MsgReceiver_Close_Call { - _c.Call.Return(run) - return _c -} - // GetAZClient provides a mock function with given fields: func (_m *MsgReceiver) GetAZClient() azbus.AZClient { ret := _m.Called() @@ -138,47 +105,6 @@ func (_c *MsgReceiver_Listen_Call) RunAndReturn(run func() error) *MsgReceiver_L return _c } -// Open provides a mock function with given fields: -func (_m *MsgReceiver) Open() error { - ret := _m.Called() - - var r0 error - if rf, ok := ret.Get(0).(func() error); ok { - r0 = rf() - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// MsgReceiver_Open_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Open' -type MsgReceiver_Open_Call struct { - *mock.Call -} - -// Open is a helper method to define mock.On call -func (_e *MsgReceiver_Expecter) Open() *MsgReceiver_Open_Call { - return &MsgReceiver_Open_Call{Call: _e.mock.On("Open")} -} - -func (_c *MsgReceiver_Open_Call) Run(run func()) *MsgReceiver_Open_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *MsgReceiver_Open_Call) Return(_a0 error) *MsgReceiver_Open_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MsgReceiver_Open_Call) RunAndReturn(run func() error) *MsgReceiver_Open_Call { - _c.Call.Return(run) - return _c -} - // Shutdown provides a mock function with given fields: _a0 func (_m *MsgReceiver) Shutdown(_a0 context.Context) error { ret := _m.Called(_a0) diff --git a/azbus/mocks/MsgSender.go b/azbus/mocks/MsgSender.go index 38b0b0a..fb2fe1e 100644 --- a/azbus/mocks/MsgSender.go +++ b/azbus/mocks/MsgSender.go @@ -3,11 +3,10 @@ package mocks import ( - azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" - azbus "github.com/datatrails/go-datatrails-common/azbus" - context "context" + azbus "github.com/datatrails/go-datatrails-common/azbus" + mock "github.com/stretchr/testify/mock" ) @@ -139,20 +138,13 @@ func (_c *MsgSender_Open_Call) RunAndReturn(run func() error) *MsgSender_Open_Ca return _c } -// Send provides a mock function with given fields: _a0, _a1, _a2 -func (_m *MsgSender) Send(_a0 context.Context, _a1 []byte, _a2 ...azbus.OutMessageOption) error { - _va := make([]interface{}, len(_a2)) - for _i := range _a2 { - _va[_i] = _a2[_i] - } - var _ca []interface{} - _ca = append(_ca, _a0, _a1) - _ca = append(_ca, _va...) - ret := _m.Called(_ca...) +// Send provides a mock function with given fields: _a0, _a1 +func (_m *MsgSender) Send(_a0 context.Context, _a1 *azbus.OutMessage) error { + ret := _m.Called(_a0, _a1) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, []byte, ...azbus.OutMessageOption) error); ok { - r0 = rf(_a0, _a1, _a2...) + if rf, ok := ret.Get(0).(func(context.Context, *azbus.OutMessage) error); ok { + r0 = rf(_a0, _a1) } else { r0 = ret.Error(0) } @@ -167,22 +159,14 @@ type MsgSender_Send_Call struct { // Send is a helper method to define mock.On call // - _a0 context.Context -// - _a1 []byte -// - _a2 ...azbus.OutMessageOption -func (_e *MsgSender_Expecter) Send(_a0 interface{}, _a1 interface{}, _a2 ...interface{}) *MsgSender_Send_Call { - return &MsgSender_Send_Call{Call: _e.mock.On("Send", - append([]interface{}{_a0, _a1}, _a2...)...)} +// - _a1 *azbus.OutMessage +func (_e *MsgSender_Expecter) Send(_a0 interface{}, _a1 interface{}) *MsgSender_Send_Call { + return &MsgSender_Send_Call{Call: _e.mock.On("Send", _a0, _a1)} } -func (_c *MsgSender_Send_Call) Run(run func(_a0 context.Context, _a1 []byte, _a2 ...azbus.OutMessageOption)) *MsgSender_Send_Call { +func (_c *MsgSender_Send_Call) Run(run func(_a0 context.Context, _a1 *azbus.OutMessage)) *MsgSender_Send_Call { _c.Call.Run(func(args mock.Arguments) { - variadicArgs := make([]azbus.OutMessageOption, len(args)-2) - for i, a := range args[2:] { - if a != nil { - variadicArgs[i] = a.(azbus.OutMessageOption) - } - } - run(args[0].(context.Context), args[1].([]byte), variadicArgs...) + run(args[0].(context.Context), args[1].(*azbus.OutMessage)) }) return _c } @@ -192,65 +176,7 @@ func (_c *MsgSender_Send_Call) Return(_a0 error) *MsgSender_Send_Call { return _c } -func (_c *MsgSender_Send_Call) RunAndReturn(run func(context.Context, []byte, ...azbus.OutMessageOption) error) *MsgSender_Send_Call { - _c.Call.Return(run) - return _c -} - -// SendMsg provides a mock function with given fields: _a0, _a1, _a2 -func (_m *MsgSender) SendMsg(_a0 context.Context, _a1 azservicebus.Message, _a2 ...azbus.OutMessageOption) error { - _va := make([]interface{}, len(_a2)) - for _i := range _a2 { - _va[_i] = _a2[_i] - } - var _ca []interface{} - _ca = append(_ca, _a0, _a1) - _ca = append(_ca, _va...) - ret := _m.Called(_ca...) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, azservicebus.Message, ...azbus.OutMessageOption) error); ok { - r0 = rf(_a0, _a1, _a2...) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// MsgSender_SendMsg_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SendMsg' -type MsgSender_SendMsg_Call struct { - *mock.Call -} - -// SendMsg is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 azservicebus.Message -// - _a2 ...azbus.OutMessageOption -func (_e *MsgSender_Expecter) SendMsg(_a0 interface{}, _a1 interface{}, _a2 ...interface{}) *MsgSender_SendMsg_Call { - return &MsgSender_SendMsg_Call{Call: _e.mock.On("SendMsg", - append([]interface{}{_a0, _a1}, _a2...)...)} -} - -func (_c *MsgSender_SendMsg_Call) Run(run func(_a0 context.Context, _a1 azservicebus.Message, _a2 ...azbus.OutMessageOption)) *MsgSender_SendMsg_Call { - _c.Call.Run(func(args mock.Arguments) { - variadicArgs := make([]azbus.OutMessageOption, len(args)-2) - for i, a := range args[2:] { - if a != nil { - variadicArgs[i] = a.(azbus.OutMessageOption) - } - } - run(args[0].(context.Context), args[1].(azservicebus.Message), variadicArgs...) - }) - return _c -} - -func (_c *MsgSender_SendMsg_Call) Return(_a0 error) *MsgSender_SendMsg_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MsgSender_SendMsg_Call) RunAndReturn(run func(context.Context, azservicebus.Message, ...azbus.OutMessageOption) error) *MsgSender_SendMsg_Call { +func (_c *MsgSender_Send_Call) RunAndReturn(run func(context.Context, *azbus.OutMessage) error) *MsgSender_Send_Call { _c.Call.Return(run) return _c } diff --git a/azbus/mocks/OutMessageOption.go b/azbus/mocks/OutMessageOption.go deleted file mode 100644 index 2255844..0000000 --- a/azbus/mocks/OutMessageOption.go +++ /dev/null @@ -1,68 +0,0 @@ -// Code generated by mockery. DO NOT EDIT. - -package mocks - -import ( - azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" - mock "github.com/stretchr/testify/mock" -) - -// OutMessageOption is an autogenerated mock type for the OutMessageOption type -type OutMessageOption struct { - mock.Mock -} - -type OutMessageOption_Expecter struct { - mock *mock.Mock -} - -func (_m *OutMessageOption) EXPECT() *OutMessageOption_Expecter { - return &OutMessageOption_Expecter{mock: &_m.Mock} -} - -// Execute provides a mock function with given fields: _a0 -func (_m *OutMessageOption) Execute(_a0 *azservicebus.Message) { - _m.Called(_a0) -} - -// OutMessageOption_Execute_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Execute' -type OutMessageOption_Execute_Call struct { - *mock.Call -} - -// Execute is a helper method to define mock.On call -// - _a0 *azservicebus.Message -func (_e *OutMessageOption_Expecter) Execute(_a0 interface{}) *OutMessageOption_Execute_Call { - return &OutMessageOption_Execute_Call{Call: _e.mock.On("Execute", _a0)} -} - -func (_c *OutMessageOption_Execute_Call) Run(run func(_a0 *azservicebus.Message)) *OutMessageOption_Execute_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*azservicebus.Message)) - }) - return _c -} - -func (_c *OutMessageOption_Execute_Call) Return() *OutMessageOption_Execute_Call { - _c.Call.Return() - return _c -} - -func (_c *OutMessageOption_Execute_Call) RunAndReturn(run func(*azservicebus.Message)) *OutMessageOption_Execute_Call { - _c.Call.Return(run) - return _c -} - -// NewOutMessageOption creates a new instance of OutMessageOption. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewOutMessageOption(t interface { - mock.TestingT - Cleanup(func()) -}) *OutMessageOption { - mock := &OutMessageOption{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/azbus/msgreceiver.go b/azbus/msgreceiver.go index ef4093b..1f464e3 100644 --- a/azbus/msgreceiver.go +++ b/azbus/msgreceiver.go @@ -5,13 +5,10 @@ import ( ) type MsgReceiver interface { - Open() error - Close(context.Context) - String() string - // Listener interface Listen() error Shutdown(context.Context) error GetAZClient() AZClient + String() string } diff --git a/azbus/msgsender.go b/azbus/msgsender.go index 580ecc5..7b54ac1 100644 --- a/azbus/msgsender.go +++ b/azbus/msgsender.go @@ -6,9 +6,10 @@ import ( type MsgSender interface { Open() error - Send(context.Context, []byte, ...OutMessageOption) error - SendMsg(context.Context, OutMessage, ...OutMessageOption) error Close(context.Context) + + Send(context.Context, *OutMessage) error String() string + GetAZClient() AZClient } diff --git a/azbus/outmessage.go b/azbus/outmessage.go new file mode 100644 index 0000000..c65b3f5 --- /dev/null +++ b/azbus/outmessage.go @@ -0,0 +1,33 @@ +package azbus + +import ( + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" +) + +// OutMessage abstracts the output message interface. +type OutMessage = azservicebus.Message + +// We dont use With style options as this is executed in the hotpath. +func NewOutMessage(data []byte) *OutMessage { + var o OutMessage + return newOutMessage(&o, data) +} + +// function outlining +func newOutMessage(o *OutMessage, data []byte) *OutMessage { + o.Body = data + o.ApplicationProperties = make(map[string]any) + return o +} + +// SetProperty adds key-value pair to OutMessage and can be chained. +func OutMessageSetProperty(o *OutMessage, k string, v any) { + o.ApplicationProperties[k] = v +} + +func OutMessageProperties(o *OutMessage) map[string]any { + if o.ApplicationProperties != nil { + return o.ApplicationProperties + } + return make(map[string]any) +} diff --git a/azbus/receivedmessage.go b/azbus/receivedmessage.go new file mode 100644 index 0000000..f1f7fad --- /dev/null +++ b/azbus/receivedmessage.go @@ -0,0 +1,19 @@ +package azbus + +import ( + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" +) + +type ReceivedMessage = azservicebus.ReceivedMessage + +func ReceivedProperties(r *ReceivedMessage) map[string]any { + if r.ApplicationProperties != nil { + return r.ApplicationProperties + } + return make(map[string]any) +} + +// SetProperty adds key-value pair to Message and can be chained. +func ReceivedSetProperty(r *ReceivedMessage, k string, v any) { + r.ApplicationProperties[k] = v +} diff --git a/azbus/receiver.go b/azbus/receiver.go index da54f79..76868db 100644 --- a/azbus/receiver.go +++ b/azbus/receiver.go @@ -14,12 +14,11 @@ var ( ErrNoHandler = errors.New("no handler defined") ) -// so we dont have to import the azure repo everywhere -type ReceivedMessage = azservicebus.ReceivedMessage - // Handler processes a ReceivedMessage. type Handler interface { Handle(context.Context, *ReceivedMessage) (Disposition, context.Context, error) + Open() error + Close() } const ( @@ -45,18 +44,6 @@ const ( // Settings for Receivers: // -// NumberOfReceivedMessages int -// -// The number of messages fetched simultaneously from azure servicebus. -// Currently this figure cannot be high (suggest <6) as the peeklock -// timeout for all msgs starts as soon as fetched. This means that -// if the processing of a previous msg fetched takes a long time -// because of send retries then following messages start with some of -// their peeklock time used up. Currently we have 60s to process all the -// messages. This can be fixed by setting RenewMessageLock to true. -// Some services start up multiple handlers and this setting should be 1 -// in this case. -// // RenewMessageLock bool // // If true the peeklocktimeout is restarted after 50s. This is currently @@ -82,9 +69,8 @@ type ReceiverConfig struct { SubscriptionName string // See azbus/receiver.go - NumberOfReceivedMessages int - RenewMessageLock bool - RenewMessageTime time.Duration + RenewMessageLock bool + RenewMessageTime time.Duration // If a deadletter receiver then this is true Deadletter bool @@ -125,7 +111,15 @@ func WithRenewalTime(t int) ReceiverOption { } } +// NewReciver creates a new Receiver that will process a number of messages simultaneously. +// Each handler executes in its own goroutine. func NewReceiver(log Logger, cfg ReceiverConfig, opts ...ReceiverOption) *Receiver { + var r Receiver + return newReceiver(&r, log, cfg, opts...) +} + +// function outlining (look it up). +func newReceiver(r *Receiver, log Logger, cfg ReceiverConfig, opts ...ReceiverOption) *Receiver { var options *azservicebus.ReceiverOptions if cfg.Deadletter { options = &azservicebus.ReceiverOptions{ @@ -134,15 +128,13 @@ func NewReceiver(log Logger, cfg ReceiverConfig, opts ...ReceiverOption) *Receiv } } - r := Receiver{ - Cfg: cfg, - azClient: NewAZClient(cfg.ConnectionString), - options: options, - handlers: []Handler{}, - } + r.Cfg = cfg + r.azClient = NewAZClient(cfg.ConnectionString) + r.options = options + r.handlers = []Handler{} r.log = log.WithIndex("receiver", r.String()) for _, opt := range opts { - opt(&r) + opt(r) } // Set this to a default that corresponds to the az servicebus default peek-lock timeout @@ -150,7 +142,7 @@ func NewReceiver(log Logger, cfg ReceiverConfig, opts ...ReceiverOption) *Receiv r.Cfg.RenewMessageTime = RenewalTime } - return &r + return r } func (r *Receiver) GetAZClient() AZClient { @@ -232,28 +224,19 @@ func (r *Receiver) renewMessageLock(ctx context.Context, count int, msg *Receive func (r *Receiver) receiveMessages() error { - if len(r.handlers) != r.Cfg.NumberOfReceivedMessages { - return fmt.Errorf("%s: Number of Handlers %d is not equal to %d", r, len(r.handlers), r.Cfg.NumberOfReceivedMessages) - } - - err := r.Open() - if err != nil { - azerr := fmt.Errorf("%s: ReceiveMessage failure: %w", r, NewAzbusError(err)) - r.log.Infof("%s", azerr) - return azerr - } + numberOfReceivedMessages := len(r.handlers) r.log.Debugf( "NumberOfReceivedMessages %d, RenewMessageLock: %v", - r.Cfg.NumberOfReceivedMessages, + numberOfReceivedMessages, r.Cfg.RenewMessageLock, ) // Start all the workers - msgs := make(chan *ReceivedMessage, r.Cfg.NumberOfReceivedMessages) + msgs := make(chan *ReceivedMessage, numberOfReceivedMessages) ctx, cancel := context.WithCancel(context.Background()) defer cancel() var wg sync.WaitGroup - for i := 0; i < r.Cfg.NumberOfReceivedMessages; i++ { + for i := range numberOfReceivedMessages { go func(rctx context.Context, ii int, rr *Receiver) { rr.log.Debugf("Start worker %d", ii) for { @@ -289,7 +272,7 @@ func (r *Receiver) receiveMessages() error { for { var err error var messages []*ReceivedMessage - messages, err = r.receiver.ReceiveMessages(ctx, r.Cfg.NumberOfReceivedMessages, nil) + messages, err = r.receiver.ReceiveMessages(ctx, numberOfReceivedMessages, nil) if err != nil { azerr := fmt.Errorf("%s: ReceiveMessage failure: %w", r, NewAzbusError(err)) r.log.Infof("%s", azerr) @@ -298,7 +281,7 @@ func (r *Receiver) receiveMessages() error { total := len(messages) r.log.Debugf("total messages %d", total) - for i := 0; i < total; i++ { + for i := range total { wg.Add(1) msgs <- messages[i] } @@ -309,24 +292,28 @@ func (r *Receiver) receiveMessages() error { // The following 2 methods satisfy the startup.Listener interface. func (r *Receiver) Listen() error { - if r.handlers != nil && len(r.handlers) > 0 { - return r.receiveMessages() + r.log.Debugf("listen") + err := r.open() + if err != nil { + azerr := fmt.Errorf("%s: ReceiveMessage failure: %w", r, NewAzbusError(err)) + r.log.Infof("%s", azerr) + return azerr } - return ErrNoHandler + return r.receiveMessages() } func (r *Receiver) Shutdown(ctx context.Context) error { - r.Close(ctx) + r.close_() return nil } -func (r *Receiver) Open() error { +func (r *Receiver) open() error { var err error if r.receiver != nil { return nil } - r.log.Debugf("Open") + client, err := r.azClient.azClient() if err != nil { return err @@ -345,21 +332,31 @@ func (r *Receiver) Open() error { } r.receiver = receiver + for j := range len(r.handlers) { + err = r.handlers[j].Open() + if err != nil { + return fmt.Errorf("failed to open handler: %w", err) + } + } return nil } -func (r *Receiver) Close(ctx context.Context) { +func (r *Receiver) close_() { if r != nil { - r.mtx.Lock() - defer r.mtx.Unlock() - if r.receiver != nil { - err := r.receiver.Close(ctx) + r.mtx.Lock() + defer r.mtx.Unlock() + + err := r.receiver.Close(context.Background()) if err != nil { azerr := fmt.Errorf("%s: Error closing receiver: %w", r, NewAzbusError(err)) r.log.Infof("%s", azerr) } r.receiver = nil + for j := range len(r.handlers) { + r.handlers[j].Close() + } + r.handlers = []Handler{} } } } diff --git a/azbus/sender.go b/azbus/sender.go index 831345b..95d512d 100644 --- a/azbus/sender.go +++ b/azbus/sender.go @@ -12,15 +12,6 @@ import ( "github.com/datatrails/go-datatrails-common/tracing" ) -// so we dont have to import the azure repo everywhere -type OutMessage = azservicebus.Message - -func NewOutMessage(data []byte) OutMessage { - return azservicebus.Message{ - Body: data, - } -} - // SenderConfig configuration for an azure servicebus namespace and queue type SenderConfig struct { ConnectionString string @@ -109,20 +100,8 @@ func (s *Sender) Open() error { return nil } -type OutMessageOption func(*OutMessage) - -func WithProperty(key string, value any) OutMessageOption { - return func(o *OutMessage) { - o.ApplicationProperties[key] = value - } -} - -func (s *Sender) Send(ctx context.Context, msg []byte, opts ...OutMessageOption) error { - return s.SendMsg(ctx, NewOutMessage(msg), opts...) -} - // Send submits a message to the queue. Ignores cancellation. -func (s *Sender) SendMsg(ctx context.Context, message OutMessage, opts ...OutMessageOption) error { +func (s *Sender) Send(ctx context.Context, message *OutMessage) error { // Without this fix eventsourcepoller and similar services repeatedly context cancel and repeatedly // restart. @@ -141,11 +120,13 @@ func (s *Sender) SendMsg(ctx context.Context, message OutMessage, opts ...OutMes log := s.log.FromContext(ctx) defer log.Close() - err = s.Open() - if err != nil { - return err + // boots & braces + if s.sender == nil { + err = s.Open() + if err != nil { + return err + } } - size := int64(len(message.Body)) log.Debugf("%s: Msg Sized %d limit %d", s, size, s.maxMessageSizeInBytes) if size > s.maxMessageSizeInBytes { @@ -153,15 +134,10 @@ func (s *Sender) SendMsg(ctx context.Context, message OutMessage, opts ...OutMes return fmt.Errorf("%s: Msg Sized %d > limit %d :%w", s, size, s.maxMessageSizeInBytes, ErrMessageOversized) } now := time.Now() - if message.ApplicationProperties == nil { - message.ApplicationProperties = make(map[string]any) - } - for _, opt := range opts { - opt(&message) - } - s.UpdateSendingMesssageForSpan(ctx, &message, span) - err = s.sender.SendMessage(ctx, &message, nil) + s.updateSendingMesssageForSpan(ctx, message, span) + + err = s.sender.SendMessage(ctx, message, nil) if err != nil { azerr := fmt.Errorf("Send failed in %s: %w", time.Since(now), NewAzbusError(err)) log.Infof("%s", azerr) diff --git a/azbus/tracing.go b/azbus/tracing.go index f3e59cf..0a95316 100644 --- a/azbus/tracing.go +++ b/azbus/tracing.go @@ -8,39 +8,44 @@ import ( func (r *Receiver) handleReceivedMessageWithTracingContext(ctx context.Context, message *ReceivedMessage, handler Handler) (Disposition, context.Context, error) { log := r.log.FromContext(ctx) + defer log.Close() log.Debugf("ContextFromReceivedMessage(): ApplicationProperties %v", message.ApplicationProperties) var opts = []opentracing.StartSpanOption{} carrier := opentracing.TextMapCarrier{} for k, v := range message.ApplicationProperties { + // XXX: why only string values? value, ok := v.(string) if ok { carrier.Set(k, value) + } else { + log.Debugf("Non-string value is not copied %s:%v", k, v) } } spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, carrier) if err != nil { - log.Infof("HandleReceivedMessageWithTracingContext(): Unable to extract span context: %v", err) + log.Infof("handleReceivedMessageWithTracingContext(): Unable to extract span context: %v", err) } else { opts = append(opts, opentracing.ChildOf(spanCtx)) } - span := opentracing.StartSpan("Handle message", opts...) + span := opentracing.StartSpan("handle message", opts...) defer span.Finish() ctx = opentracing.ContextWithSpan(ctx, span) return handler.Handle(ctx, message) } -func (s *Sender) UpdateSendingMesssageForSpan(ctx context.Context, message *OutMessage, span opentracing.Span) { +func (s *Sender) updateSendingMesssageForSpan(ctx context.Context, message *OutMessage, span opentracing.Span) { log := s.log.FromContext(ctx) + defer log.Close() carrier := opentracing.TextMapCarrier{} err := opentracing.GlobalTracer().Inject(span.Context(), opentracing.TextMap, carrier) if err != nil { - log.Infof("UpdateSendingMesssageForSpan(): Unable to inject span context: %v", err) + log.Infof("updateSendingMesssageForSpan(): Unable to inject span context: %v", err) return } for k, v := range carrier { - message.ApplicationProperties[k] = v + OutMessageSetProperty(message, k, v) } - log.Debugf("UpdateSendingMesssageForSpan(): ApplicationProperties %v", message.ApplicationProperties) + log.Debugf("updateSendingMesssageForSpan(): ApplicationProperties %v", OutMessageProperties(message)) } diff --git a/metrics/metrics.go b/metrics/metrics.go index db45d63..68e3403 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -128,9 +128,6 @@ func (m *Metrics) Port() string { return "" } -// The following code is only for restproxy endpoints and allows the propagation of -// the tenant Id from the underlying GRPC service. - // NewPromHandler - this handler is used on the endpoint that serves metrics endpoint // which is provided on a different port to the service. // The default InstrumentMetricHandler is suppressed. From 397d20d540c5412531c90a53707ebef2601d7763 Mon Sep 17 00:00:00 2001 From: Paul Hewlett Date: Fri, 22 Mar 2024 10:41:19 +0000 Subject: [PATCH 2/2] fixup! azbus handlers --- azbus/tracing.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/azbus/tracing.go b/azbus/tracing.go index 0a95316..5869671 100644 --- a/azbus/tracing.go +++ b/azbus/tracing.go @@ -13,13 +13,15 @@ func (r *Receiver) handleReceivedMessageWithTracingContext(ctx context.Context, log.Debugf("ContextFromReceivedMessage(): ApplicationProperties %v", message.ApplicationProperties) var opts = []opentracing.StartSpanOption{} carrier := opentracing.TextMapCarrier{} + // This just gets all the message Application Properties into a string map. That map is then passed into the + // open tracing constructor which extracts any bits it is interested in to use to setup the spans etc. + // It will ignore anything it doesn't care about. So the filtering of the map is done for us and + // we don't need to pre-filter it. for k, v := range message.ApplicationProperties { // XXX: why only string values? value, ok := v.(string) if ok { carrier.Set(k, value) - } else { - log.Debugf("Non-string value is not copied %s:%v", k, v) } } spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, carrier)