diff --git a/pkg/jobcreator/controller.go b/pkg/jobcreator/controller.go index a8fc8ed3..6ea21dd2 100644 --- a/pkg/jobcreator/controller.go +++ b/pkg/jobcreator/controller.go @@ -182,7 +182,7 @@ func (controller *JobCreatorController) Start(ctx context.Context, cm *system.Cl errorChan <- err return errorChan } - err = controller.web3Events.Start(controller.web3SDK, ctx, cm) + err = controller.web3Events.Start(ctx, cm, controller.web3SDK) if err != nil { errorChan <- err return errorChan diff --git a/pkg/jobcreator/onchain_jobcreator.go b/pkg/jobcreator/onchain_jobcreator.go index bcfd957f..476cea67 100644 --- a/pkg/jobcreator/onchain_jobcreator.go +++ b/pkg/jobcreator/onchain_jobcreator.go @@ -57,7 +57,7 @@ func (jobCreator *OnChainJobCreator) Start(ctx context.Context, cm *system.Clean return errorChan } - err = jobCreator.web3Events.Start(jobCreator.web3SDK, ctx, cm) + err = jobCreator.web3Events.Start(ctx, cm, jobCreator.web3SDK) if err != nil { errorChan <- err return errorChan diff --git a/pkg/mediator/controller.go b/pkg/mediator/controller.go index 7c201bb1..670ec5f7 100644 --- a/pkg/mediator/controller.go +++ b/pkg/mediator/controller.go @@ -138,7 +138,7 @@ func (controller *MediatorController) Start(ctx context.Context, cm *system.Clea return errorChan } // activate the web3 event listeners - err = controller.web3Events.Start(controller.web3SDK, ctx, cm) + err = controller.web3Events.Start(ctx, cm, controller.web3SDK) if err != nil { errorChan <- err return errorChan diff --git a/pkg/resourceprovider/controller.go b/pkg/resourceprovider/controller.go index 2bfe8b59..b964f9d1 100644 --- a/pkg/resourceprovider/controller.go +++ b/pkg/resourceprovider/controller.go @@ -152,7 +152,7 @@ func (controller *ResourceProviderController) Start(ctx context.Context, cm *sys errorChan <- err return errorChan } - err = controller.web3Events.Start(controller.web3SDK, ctx, cm) + err = controller.web3Events.Start(ctx, cm, controller.web3SDK) if err != nil { errorChan <- err return errorChan diff --git a/pkg/solver/controller.go b/pkg/solver/controller.go index 55376ac5..ee0bb84e 100644 --- a/pkg/solver/controller.go +++ b/pkg/solver/controller.go @@ -93,7 +93,7 @@ func (controller *SolverController) Start(ctx context.Context, cm *system.Cleanu // activate the web3 event listeners log.Debug().Msgf("controller.web3Events.Start") - err = controller.web3Events.Start(controller.web3SDK, ctx, cm) + err = controller.web3Events.Start(ctx, cm, controller.web3SDK) if err != nil { errorChan <- err return errorChan diff --git a/pkg/web3/events.go b/pkg/web3/events.go index 6a5025ff..e8e8ff69 100644 --- a/pkg/web3/events.go +++ b/pkg/web3/events.go @@ -45,15 +45,15 @@ func NewEventChannels() *EventChannels { } func (eventChannels *EventChannels) Start( - sdk *Web3SDK, ctx context.Context, cm *system.CleanupManager, + sdk *Web3SDK, ) error { for _, collection := range eventChannels.collections { c := collection go func() { for { - err := c.Start(sdk, ctx, cm) + err := c.Start(ctx, cm, sdk) if err != nil { log.Error().Msgf("error starting listeners: %s reconnect in 2 seconds", err.Error()) } diff --git a/pkg/web3/events_jobcreator.go b/pkg/web3/events_jobcreator.go index d6bfa214..1b2b220e 100644 --- a/pkg/web3/events_jobcreator.go +++ b/pkg/web3/events_jobcreator.go @@ -24,9 +24,9 @@ func NewJobCreatorEventChannels() *JobCreatorEventChannels { } func (s *JobCreatorEventChannels) Start( - sdk *Web3SDK, ctx context.Context, cm *system.CleanupManager, + sdk *Web3SDK, ) error { blockNumber, err := sdk.getBlockNumber() if err != nil { @@ -54,7 +54,7 @@ func (s *JobCreatorEventChannels) Start( for { select { case <-ctx.Done(): - return fmt.Errorf("cancel by context") + return nil case event := <-s.jobAddedChan: log.Debug(). Str("storage->event", "JobAdded"). @@ -63,7 +63,10 @@ func (s *JobCreatorEventChannels) Start( go handler(*event) } case err := <-jobAddedSub.Err(): - return fmt.Errorf("cancel by job JobAdded event subscribe error %w", err) + if err != nil { + return fmt.Errorf("cancel by job JobAdded event subscribe error %w", err) + } + return nil } } } diff --git a/pkg/web3/events_mediation.go b/pkg/web3/events_mediation.go index 43b511af..915b16bf 100644 --- a/pkg/web3/events_mediation.go +++ b/pkg/web3/events_mediation.go @@ -24,9 +24,9 @@ func NewMediationEventChannels() *MediationEventChannels { } func (m *MediationEventChannels) Start( - sdk *Web3SDK, ctx context.Context, cm *system.CleanupManager, + sdk *Web3SDK, ) error { blockNumber, err := sdk.getBlockNumber() if err != nil { @@ -54,7 +54,7 @@ func (m *MediationEventChannels) Start( for { select { case <-ctx.Done(): - return fmt.Errorf("cancel by context") + return nil case event := <-m.mediationRequestedChan: log.Debug(). Str("mediation->event", "MediationRequested"). @@ -63,7 +63,10 @@ func (m *MediationEventChannels) Start( go handler(*event) } case err := <-mediationRequestedSub.Err(): - return fmt.Errorf("cancel by mediation MediationRequested event subscribe error %w", err) + if err != nil { + return fmt.Errorf("cancel by mediation MediationRequested event subscribe error %w", err) + } + return nil } } } diff --git a/pkg/web3/events_payments.go b/pkg/web3/events_payments.go index 61b5a6f2..3ee15371 100644 --- a/pkg/web3/events_payments.go +++ b/pkg/web3/events_payments.go @@ -24,9 +24,9 @@ func NewPaymentEventChannels() *PaymentEventChannels { } func (p *PaymentEventChannels) Start( - sdk *Web3SDK, ctx context.Context, cm *system.CleanupManager, + sdk *Web3SDK, ) error { blockNumber, err := sdk.getBlockNumber() if err != nil { @@ -54,7 +54,7 @@ func (p *PaymentEventChannels) Start( for { select { case <-ctx.Done(): - return fmt.Errorf("cancel by context") + return nil case event := <-p.paymentChan: log.Debug(). Str("payments->event", "Payment"). @@ -63,7 +63,10 @@ func (p *PaymentEventChannels) Start( go handler(*event) } case err := <-paymentSub.Err(): - return fmt.Errorf("cancel by mediation MediationRequested event subscribe error %w", err) + if err != nil { + return fmt.Errorf("cancel by mediation MediationRequested event subscribe error %w", err) + } + return nil } } } diff --git a/pkg/web3/events_pow.go b/pkg/web3/events_pow.go index 49e2d6f4..ffe15245 100644 --- a/pkg/web3/events_pow.go +++ b/pkg/web3/events_pow.go @@ -24,9 +24,9 @@ func NewPowEventChannels() *PowEventChannels { } func (s *PowEventChannels) Start( - sdk *Web3SDK, ctx context.Context, cm *system.CleanupManager, + sdk *Web3SDK, ) error { blockNumber, err := sdk.getBlockNumber() if err != nil { @@ -55,7 +55,7 @@ func (s *PowEventChannels) Start( for { select { case <-ctx.Done(): - return fmt.Errorf("cancel by context") + return nil case event := <-s.newPowRoundChan: log.Debug(). Str("pow->event", "PowNewPowRound"). @@ -65,7 +65,10 @@ func (s *PowEventChannels) Start( go handler(*event) } case err := <-newPowRoundSub.Err(): - return fmt.Errorf("cancel by pow newPowRound event subscribe error %w", err) + if err != nil { + return fmt.Errorf("cancel by pow newPowRound event subscribe error %w", err) + } + return nil } } } diff --git a/pkg/web3/events_storage.go b/pkg/web3/events_storage.go index 3525bcaf..7d76212f 100644 --- a/pkg/web3/events_storage.go +++ b/pkg/web3/events_storage.go @@ -24,9 +24,9 @@ func NewStorageEventChannels() *StorageEventChannels { } func (s *StorageEventChannels) Start( - sdk *Web3SDK, ctx context.Context, cm *system.CleanupManager, + sdk *Web3SDK, ) error { blockNumber, err := sdk.getBlockNumber() if err != nil { @@ -54,7 +54,7 @@ func (s *StorageEventChannels) Start( for { select { case <-ctx.Done(): - return fmt.Errorf("cancel by context") + return nil case event := <-s.dealStateChangeChan: log.Debug(). Str("storage->event", "DealStateChange"). @@ -63,7 +63,10 @@ func (s *StorageEventChannels) Start( go handler(*event) } case err := <-dealStateChangeSub.Err(): - return fmt.Errorf("cancel by storage DealStateChange event subscribe error %w", err) + if err != nil { + return fmt.Errorf("cancel by storage DealStateChange event subscribe error %w", err) + } + return nil } } } diff --git a/pkg/web3/events_token.go b/pkg/web3/events_token.go index 1250f59e..947ab599 100644 --- a/pkg/web3/events_token.go +++ b/pkg/web3/events_token.go @@ -25,9 +25,9 @@ func NewTokenEventChannels() *TokenEventChannels { } func (t *TokenEventChannels) Start( - sdk *Web3SDK, ctx context.Context, cm *system.CleanupManager, + sdk *Web3SDK, ) error { blockNumber, err := sdk.getBlockNumber() if err != nil { @@ -57,7 +57,7 @@ func (t *TokenEventChannels) Start( for { select { case <-ctx.Done(): - return fmt.Errorf("cancel by context") + return nil case event := <-t.transferChan: log.Debug(). Str("token->event", "Transfer"). @@ -66,7 +66,10 @@ func (t *TokenEventChannels) Start( go handler(*event) } case err := <-transferSub.Err(): - return fmt.Errorf("cancel by token Transfer event subscribe error %w", err) + if err != nil { + return fmt.Errorf("cancel by token Transfer event subscribe error %w", err) + } + return nil } } } diff --git a/pkg/web3/types.go b/pkg/web3/types.go index f5af6437..f37cdfb2 100644 --- a/pkg/web3/types.go +++ b/pkg/web3/types.go @@ -29,8 +29,8 @@ type Web3Options struct { type EventChannelCollection interface { Start( - sdk *Web3SDK, ctx context.Context, cm *system.CleanupManager, + sdk *Web3SDK, ) error }