diff --git a/core/services/feeds/service.go b/core/services/feeds/service.go index 04ab747c9ab..79bb8102177 100644 --- a/core/services/feeds/service.go +++ b/core/services/feeds/service.go @@ -5,7 +5,9 @@ import ( "database/sql" "encoding/hex" "fmt" + "time" + "github.com/avast/retry-go/v4" "github.com/ethereum/go-ethereum/common" "github.com/google/uuid" "github.com/lib/pq" @@ -141,6 +143,7 @@ type service struct { lggr logger.Logger version string loopRegistrarConfig plugins.RegistrarConfig + syncNodeInfoCancel context.CancelFunc } // NewService constructs a new feeds service @@ -183,6 +186,7 @@ func NewService( lggr: lggr, version: version, loopRegistrarConfig: rc, + syncNodeInfoCancel: func() {}, } return svc @@ -254,8 +258,45 @@ func (s *service) RegisterManager(ctx context.Context, params RegisterManagerPar return id, nil } -// SyncNodeInfo syncs the node's information with FMS +// syncNodeInfoWithRetry syncs the node's information with FMS using a goroutine. +// In case of failures, it retries with an exponential backoff for up to 24h. +func (s *service) syncNodeInfoWithRetry(id int64) { + // cancel the previous context -- and, by extension, the existing goroutine -- + // so that we can start anew + s.syncNodeInfoCancel() + + var ctx context.Context + ctx, s.syncNodeInfoCancel = context.WithCancel(context.Background()) + + retryOpts := []retry.Option{ + retry.Context(ctx), + retry.Delay(5 * time.Second), + retry.Delay(10 * time.Second), + retry.MaxDelay(30 * time.Minute), + retry.Attempts(48 + 8), // 30m * 48 =~ 24h; plus the initial 8 shorter retries + retry.LastErrorOnly(true), + retry.OnRetry(func(attempt uint, err error) { + s.lggr.Info("failed to sync node info", "attempt", attempt, "err", err) + }), + } + + go func() { + err := retry.Do(func() error { return s.SyncNodeInfo(ctx, id) }, retryOpts...) + if err != nil { + s.lggr.Errorw("failed to sync node info; aborting", "err", err) + } else { + s.lggr.Info("successfully synced node info") + } + + s.syncNodeInfoCancel() + s.syncNodeInfoCancel = func() {} + }() +} + func (s *service) SyncNodeInfo(ctx context.Context, id int64) error { + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + // Get the FMS RPC client fmsClient, err := s.connMgr.GetClient(id) if err != nil { @@ -401,9 +442,7 @@ func (s *service) CreateChainConfig(ctx context.Context, cfg ChainConfig) (int64 return 0, errors.Wrap(err, "CreateChainConfig: failed to fetch manager") } - if err := s.SyncNodeInfo(ctx, mgr.ID); err != nil { - s.lggr.Infof("FMS: Unable to sync node info: %v", err) - } + s.syncNodeInfoWithRetry(mgr.ID) return id, nil } @@ -425,9 +464,7 @@ func (s *service) DeleteChainConfig(ctx context.Context, id int64) (int64, error return 0, errors.Wrap(err, "DeleteChainConfig: failed to fetch manager") } - if err := s.SyncNodeInfo(ctx, mgr.ID); err != nil { - s.lggr.Infof("FMS: Unable to sync node info: %v", err) - } + s.syncNodeInfoWithRetry(mgr.ID) return id, nil } @@ -466,9 +503,7 @@ func (s *service) UpdateChainConfig(ctx context.Context, cfg ChainConfig) (int64 return 0, errors.Wrap(err, "UpdateChainConfig failed: could not get chain config") } - if err := s.SyncNodeInfo(ctx, ccfg.FeedsManagerID); err != nil { - s.lggr.Infof("FMS: Unable to sync node info: %v", err) - } + s.syncNodeInfoWithRetry(ccfg.FeedsManagerID) return id, nil } @@ -1145,6 +1180,8 @@ func (s *service) Close() error { // This blocks until it finishes s.connMgr.Close() + s.syncNodeInfoCancel() + return nil }) } @@ -1162,10 +1199,7 @@ func (s *service) connectFeedManager(ctx context.Context, mgr FeedsManager, priv }, OnConnect: func(pb.FeedsManagerClient) { // Sync the node's information with FMS once connected - err := s.SyncNodeInfo(ctx, mgr.ID) - if err != nil { - s.lggr.Infof("Error syncing node info: %v", err) - } + s.syncNodeInfoWithRetry(mgr.ID) }, }) }