diff --git a/.changeset/neat-penguins-report.md b/.changeset/neat-penguins-report.md new file mode 100644 index 00000000000..053faa00178 --- /dev/null +++ b/.changeset/neat-penguins-report.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#added add exponential backoff retry to feeds.SyncNodeInfo() diff --git a/core/services/feeds/service.go b/core/services/feeds/service.go index 04ab747c9ab..5edd974a310 100644 --- a/core/services/feeds/service.go +++ b/core/services/feeds/service.go @@ -5,7 +5,10 @@ import ( "database/sql" "encoding/hex" "fmt" + "sync" + "time" + "github.com/avast/retry-go/v4" "github.com/ethereum/go-ethereum/common" "github.com/google/uuid" "github.com/lib/pq" @@ -141,6 +144,7 @@ type service struct { lggr logger.Logger version string loopRegistrarConfig plugins.RegistrarConfig + syncNodeInfoCancel AtomicCancelFunc } // NewService constructs a new feeds service @@ -183,6 +187,7 @@ func NewService( lggr: lggr, version: version, loopRegistrarConfig: rc, + syncNodeInfoCancel: AtomicCancelFunc{fn: func() {}}, } return svc @@ -254,8 +259,42 @@ func (s *service) RegisterManager(ctx context.Context, params RegisterManagerPar return id, nil } -// SyncNodeInfo syncs the node's information with FMS +// syncNodeInfoWithRetry syncs the node's information with FMS using a goroutine. +// In case of failures, it retries with an exponential backoff for up to 24h. +func (s *service) syncNodeInfoWithRetry(id int64) { + ctx, cancel := context.WithCancel(context.Background()) + + // cancel the previous context -- and, by extension, the existing goroutine -- + // so that we can start anew + s.syncNodeInfoCancel.CallAndSwap(cancel) + + retryOpts := []retry.Option{ + retry.Context(ctx), + retry.DelayType(retry.BackOffDelay), + retry.Delay(10 * time.Second), + retry.MaxDelay(30 * time.Minute), + retry.Attempts(48 + 8), // 30m * 48 =~ 24h; plus the initial 8 shorter retries + retry.OnRetry(func(attempt uint, err error) { + s.lggr.Info("failed to sync node info", "attempt", attempt, "err", err) + }), + } + + go func() { + err := retry.Do(func() error { return s.SyncNodeInfo(ctx, id) }, retryOpts...) + if err != nil { + s.lggr.Errorw("failed to sync node info; aborting", "err", err) + } else { + s.lggr.Info("successfully synced node info") + } + + s.syncNodeInfoCancel.CallAndSwap(func(){}) + }() +} + func (s *service) SyncNodeInfo(ctx context.Context, id int64) error { + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + // Get the FMS RPC client fmsClient, err := s.connMgr.GetClient(id) if err != nil { @@ -401,9 +440,7 @@ func (s *service) CreateChainConfig(ctx context.Context, cfg ChainConfig) (int64 return 0, errors.Wrap(err, "CreateChainConfig: failed to fetch manager") } - if err := s.SyncNodeInfo(ctx, mgr.ID); err != nil { - s.lggr.Infof("FMS: Unable to sync node info: %v", err) - } + s.syncNodeInfoWithRetry(mgr.ID) return id, nil } @@ -425,9 +462,7 @@ func (s *service) DeleteChainConfig(ctx context.Context, id int64) (int64, error return 0, errors.Wrap(err, "DeleteChainConfig: failed to fetch manager") } - if err := s.SyncNodeInfo(ctx, mgr.ID); err != nil { - s.lggr.Infof("FMS: Unable to sync node info: %v", err) - } + s.syncNodeInfoWithRetry(mgr.ID) return id, nil } @@ -466,9 +501,7 @@ func (s *service) UpdateChainConfig(ctx context.Context, cfg ChainConfig) (int64 return 0, errors.Wrap(err, "UpdateChainConfig failed: could not get chain config") } - if err := s.SyncNodeInfo(ctx, ccfg.FeedsManagerID); err != nil { - s.lggr.Infof("FMS: Unable to sync node info: %v", err) - } + s.syncNodeInfoWithRetry(ccfg.FeedsManagerID) return id, nil } @@ -1020,9 +1053,7 @@ func (s *service) CancelSpec(ctx context.Context, id int64) error { ) err = s.transact(ctx, func(tx datasources) error { - var ( - txerr error - ) + var txerr error if txerr = tx.orm.CancelSpec(ctx, id); txerr != nil { return txerr @@ -1145,6 +1176,8 @@ func (s *service) Close() error { // This blocks until it finishes s.connMgr.Close() + s.syncNodeInfoCancel.CallAndSwap(func(){}) + return nil }) } @@ -1162,10 +1195,7 @@ func (s *service) connectFeedManager(ctx context.Context, mgr FeedsManager, priv }, OnConnect: func(pb.FeedsManagerClient) { // Sync the node's information with FMS once connected - err := s.SyncNodeInfo(ctx, mgr.ID) - if err != nil { - s.lggr.Infof("Error syncing node info: %v", err) - } + s.syncNodeInfoWithRetry(mgr.ID) }, }) } @@ -1209,8 +1239,10 @@ func (s *service) observeJobProposalCounts(ctx context.Context) error { metrics := counts.toMetrics() // Set the prometheus gauge metrics. - for _, status := range []JobProposalStatus{JobProposalStatusPending, JobProposalStatusApproved, - JobProposalStatusCancelled, JobProposalStatusRejected, JobProposalStatusDeleted, JobProposalStatusRevoked} { + for _, status := range []JobProposalStatus{ + JobProposalStatusPending, JobProposalStatusApproved, + JobProposalStatusCancelled, JobProposalStatusRejected, JobProposalStatusDeleted, JobProposalStatusRevoked, + } { status := status promJobProposalCounts.With(prometheus.Labels{"status": string(status)}).Set(metrics[status]) @@ -1550,6 +1582,18 @@ func (s *service) isRevokable(propStatus JobProposalStatus, specStatus SpecStatu return propStatus != JobProposalStatusDeleted && (specStatus == SpecStatusPending || specStatus == SpecStatusCancelled) } +type AtomicCancelFunc struct { + fn context.CancelFunc + mutex sync.Mutex +} + +func (f *AtomicCancelFunc) CallAndSwap(other func()) { + f.mutex.Lock() + defer f.mutex.Unlock() + f.fn() + f.fn = other +} + var _ Service = &NullService{} // NullService defines an implementation of the Feeds Service that is used @@ -1562,24 +1606,31 @@ func (ns NullService) Close() error { return nil } func (ns NullService) ApproveSpec(ctx context.Context, id int64, force bool) error { return ErrFeedsManagerDisabled } + func (ns NullService) CountJobProposalsByStatus(ctx context.Context) (*JobProposalCounts, error) { return nil, ErrFeedsManagerDisabled } + func (ns NullService) CancelSpec(ctx context.Context, id int64) error { return ErrFeedsManagerDisabled } + func (ns NullService) GetJobProposal(ctx context.Context, id int64) (*JobProposal, error) { return nil, ErrFeedsManagerDisabled } + func (ns NullService) ListSpecsByJobProposalIDs(ctx context.Context, ids []int64) ([]JobProposalSpec, error) { return nil, ErrFeedsManagerDisabled } + func (ns NullService) GetManager(ctx context.Context, id int64) (*FeedsManager, error) { return nil, ErrFeedsManagerDisabled } + func (ns NullService) ListManagersByIDs(ctx context.Context, ids []int64) ([]FeedsManager, error) { return nil, ErrFeedsManagerDisabled } + func (ns NullService) GetSpec(ctx context.Context, id int64) (*JobProposalSpec, error) { return nil, ErrFeedsManagerDisabled } @@ -1587,15 +1638,19 @@ func (ns NullService) ListManagers(ctx context.Context) ([]FeedsManager, error) func (ns NullService) CreateChainConfig(ctx context.Context, cfg ChainConfig) (int64, error) { return 0, ErrFeedsManagerDisabled } + func (ns NullService) GetChainConfig(ctx context.Context, id int64) (*ChainConfig, error) { return nil, ErrFeedsManagerDisabled } + func (ns NullService) DeleteChainConfig(ctx context.Context, id int64) (int64, error) { return 0, ErrFeedsManagerDisabled } + func (ns NullService) ListChainConfigsByManagerIDs(ctx context.Context, mgrIDs []int64) ([]ChainConfig, error) { return nil, ErrFeedsManagerDisabled } + func (ns NullService) UpdateChainConfig(ctx context.Context, cfg ChainConfig) (int64, error) { return 0, ErrFeedsManagerDisabled } @@ -1603,18 +1658,23 @@ func (ns NullService) ListJobProposals(ctx context.Context) ([]JobProposal, erro func (ns NullService) ListJobProposalsByManagersIDs(ctx context.Context, ids []int64) ([]JobProposal, error) { return nil, ErrFeedsManagerDisabled } + func (ns NullService) ProposeJob(ctx context.Context, args *ProposeJobArgs) (int64, error) { return 0, ErrFeedsManagerDisabled } + func (ns NullService) DeleteJob(ctx context.Context, args *DeleteJobArgs) (int64, error) { return 0, ErrFeedsManagerDisabled } + func (ns NullService) RevokeJob(ctx context.Context, args *RevokeJobArgs) (int64, error) { return 0, ErrFeedsManagerDisabled } + func (ns NullService) RegisterManager(ctx context.Context, params RegisterManagerParams) (int64, error) { return 0, ErrFeedsManagerDisabled } + func (ns NullService) RejectSpec(ctx context.Context, id int64) error { return ErrFeedsManagerDisabled } @@ -1622,15 +1682,19 @@ func (ns NullService) SyncNodeInfo(ctx context.Context, id int64) error { return func (ns NullService) UpdateManager(ctx context.Context, mgr FeedsManager) error { return ErrFeedsManagerDisabled } + func (ns NullService) EnableManager(ctx context.Context, id int64) (*FeedsManager, error) { return nil, ErrFeedsManagerDisabled } + func (ns NullService) DisableManager(ctx context.Context, id int64) (*FeedsManager, error) { return nil, ErrFeedsManagerDisabled } + func (ns NullService) IsJobManaged(ctx context.Context, jobID int64) (bool, error) { return false, nil } + func (ns NullService) UpdateSpecDefinition(ctx context.Context, id int64, spec string) error { return ErrFeedsManagerDisabled }