From a1a4281720083fd0b90c4acd04da20ed93166f93 Mon Sep 17 00:00:00 2001 From: Gustavo Gama Date: Fri, 20 Dec 2024 01:33:00 -0300 Subject: [PATCH] feat(job-distributor): add exp. backoff retry to feeds.SyncNodeInfo() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit There’s a behavior that we’ve observed for some time on the NOP side where they will add/update a chain configuration of the Job Distributor panel but the change is not reflected on the service itself. This leads to inefficiencies as NOPs are unaware of this and thus need to be notified so that they may "reapply" the configuration. After some investigation, we suspect that this is due to connectivity issues between the nodes and the job distributor instance, which causes the message with the update to be lost. This PR attempts to solve this by adding a "retry" wrapper on top of the existing `SyncNodeInfo` method. We rely on `avast/retry-go` to implement the bulk of the retry logic. It's configured with a minimal delay of 10 seconds, maximum delay of 30 minutes and retry a total of 56 times -- which adds up to a bit more than 24 hours. Ticket Number: DPA-1371 --- core/services/feeds/service.go | 62 ++++++++++++++++++++++++++-------- 1 file changed, 48 insertions(+), 14 deletions(-) diff --git a/core/services/feeds/service.go b/core/services/feeds/service.go index 04ab747c9ab..79bb8102177 100644 --- a/core/services/feeds/service.go +++ b/core/services/feeds/service.go @@ -5,7 +5,9 @@ import ( "database/sql" "encoding/hex" "fmt" + "time" + "github.com/avast/retry-go/v4" "github.com/ethereum/go-ethereum/common" "github.com/google/uuid" "github.com/lib/pq" @@ -141,6 +143,7 @@ type service struct { lggr logger.Logger version string loopRegistrarConfig plugins.RegistrarConfig + syncNodeInfoCancel context.CancelFunc } // NewService constructs a new feeds service @@ -183,6 +186,7 @@ func NewService( lggr: lggr, version: version, loopRegistrarConfig: rc, + syncNodeInfoCancel: func() {}, } return svc @@ -254,8 +258,45 @@ func (s *service) RegisterManager(ctx context.Context, params RegisterManagerPar return id, nil } -// SyncNodeInfo syncs the node's information with FMS +// syncNodeInfoWithRetry syncs the node's information with FMS using a goroutine. +// In case of failures, it retries with an exponential backoff for up to 24h. +func (s *service) syncNodeInfoWithRetry(id int64) { + // cancel the previous context -- and, by extension, the existing goroutine -- + // so that we can start anew + s.syncNodeInfoCancel() + + var ctx context.Context + ctx, s.syncNodeInfoCancel = context.WithCancel(context.Background()) + + retryOpts := []retry.Option{ + retry.Context(ctx), + retry.Delay(5 * time.Second), + retry.Delay(10 * time.Second), + retry.MaxDelay(30 * time.Minute), + retry.Attempts(48 + 8), // 30m * 48 =~ 24h; plus the initial 8 shorter retries + retry.LastErrorOnly(true), + retry.OnRetry(func(attempt uint, err error) { + s.lggr.Info("failed to sync node info", "attempt", attempt, "err", err) + }), + } + + go func() { + err := retry.Do(func() error { return s.SyncNodeInfo(ctx, id) }, retryOpts...) + if err != nil { + s.lggr.Errorw("failed to sync node info; aborting", "err", err) + } else { + s.lggr.Info("successfully synced node info") + } + + s.syncNodeInfoCancel() + s.syncNodeInfoCancel = func() {} + }() +} + func (s *service) SyncNodeInfo(ctx context.Context, id int64) error { + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + // Get the FMS RPC client fmsClient, err := s.connMgr.GetClient(id) if err != nil { @@ -401,9 +442,7 @@ func (s *service) CreateChainConfig(ctx context.Context, cfg ChainConfig) (int64 return 0, errors.Wrap(err, "CreateChainConfig: failed to fetch manager") } - if err := s.SyncNodeInfo(ctx, mgr.ID); err != nil { - s.lggr.Infof("FMS: Unable to sync node info: %v", err) - } + s.syncNodeInfoWithRetry(mgr.ID) return id, nil } @@ -425,9 +464,7 @@ func (s *service) DeleteChainConfig(ctx context.Context, id int64) (int64, error return 0, errors.Wrap(err, "DeleteChainConfig: failed to fetch manager") } - if err := s.SyncNodeInfo(ctx, mgr.ID); err != nil { - s.lggr.Infof("FMS: Unable to sync node info: %v", err) - } + s.syncNodeInfoWithRetry(mgr.ID) return id, nil } @@ -466,9 +503,7 @@ func (s *service) UpdateChainConfig(ctx context.Context, cfg ChainConfig) (int64 return 0, errors.Wrap(err, "UpdateChainConfig failed: could not get chain config") } - if err := s.SyncNodeInfo(ctx, ccfg.FeedsManagerID); err != nil { - s.lggr.Infof("FMS: Unable to sync node info: %v", err) - } + s.syncNodeInfoWithRetry(ccfg.FeedsManagerID) return id, nil } @@ -1145,6 +1180,8 @@ func (s *service) Close() error { // This blocks until it finishes s.connMgr.Close() + s.syncNodeInfoCancel() + return nil }) } @@ -1162,10 +1199,7 @@ func (s *service) connectFeedManager(ctx context.Context, mgr FeedsManager, priv }, OnConnect: func(pb.FeedsManagerClient) { // Sync the node's information with FMS once connected - err := s.SyncNodeInfo(ctx, mgr.ID) - if err != nil { - s.lggr.Infof("Error syncing node info: %v", err) - } + s.syncNodeInfoWithRetry(mgr.ID) }, }) }