Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(job-distributor): add exp. backoff retry to feeds.SyncNodeInfo() #15752

Draft
wants to merge 2 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/neat-penguins-report.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#added add exponential backoff retry to feeds.SyncNodeInfo()
102 changes: 83 additions & 19 deletions core/services/feeds/service.go
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unit tests were skipped in this draft PR as I want to get some feedback about the approach before finishing the PR

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import (
"database/sql"
"encoding/hex"
"fmt"
"sync"
"time"

"github.com/avast/retry-go/v4"
"github.com/ethereum/go-ethereum/common"
"github.com/google/uuid"
"github.com/lib/pq"
Expand Down Expand Up @@ -141,6 +144,7 @@ type service struct {
lggr logger.Logger
version string
loopRegistrarConfig plugins.RegistrarConfig
syncNodeInfoCancel AtomicCancelFunc
}

// NewService constructs a new feeds service
Expand Down Expand Up @@ -183,6 +187,7 @@ func NewService(
lggr: lggr,
version: version,
loopRegistrarConfig: rc,
syncNodeInfoCancel: AtomicCancelFunc{fn: func() {}},
}

return svc
Expand Down Expand Up @@ -254,8 +259,42 @@ func (s *service) RegisterManager(ctx context.Context, params RegisterManagerPar
return id, nil
}

// SyncNodeInfo syncs the node's information with FMS
// syncNodeInfoWithRetry syncs the node's information with FMS using a goroutine.
// In case of failures, it retries with an exponential backoff for up to 24h.
func (s *service) syncNodeInfoWithRetry(id int64) {
ctx, cancel := context.WithCancel(context.Background())

// cancel the previous context -- and, by extension, the existing goroutine --
// so that we can start anew
s.syncNodeInfoCancel.CallAndSwap(cancel)

retryOpts := []retry.Option{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we didn use retry.BackOffDelay ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understood from the docs that it's the default. But I can make it explicit if you think it's worth it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all good, i was just curious, happy either way.

retry.Context(ctx),
retry.DelayType(retry.BackOffDelay),
retry.Delay(10 * time.Second),
retry.MaxDelay(30 * time.Minute),
retry.Attempts(48 + 8), // 30m * 48 =~ 24h; plus the initial 8 shorter retries
graham-chainlink marked this conversation as resolved.
Show resolved Hide resolved
retry.OnRetry(func(attempt uint, err error) {
s.lggr.Info("failed to sync node info", "attempt", attempt, "err", err)
}),
}

go func() {
err := retry.Do(func() error { return s.SyncNodeInfo(ctx, id) }, retryOpts...)
if err != nil {
s.lggr.Errorw("failed to sync node info; aborting", "err", err)
} else {
s.lggr.Info("successfully synced node info")
}

s.syncNodeInfoCancel.CallAndSwap(func(){})
}()
}

func (s *service) SyncNodeInfo(ctx context.Context, id int64) error {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

// Get the FMS RPC client
fmsClient, err := s.connMgr.GetClient(id)
if err != nil {
Expand Down Expand Up @@ -401,9 +440,7 @@ func (s *service) CreateChainConfig(ctx context.Context, cfg ChainConfig) (int64
return 0, errors.Wrap(err, "CreateChainConfig: failed to fetch manager")
}

if err := s.SyncNodeInfo(ctx, mgr.ID); err != nil {
s.lggr.Infof("FMS: Unable to sync node info: %v", err)
}
s.syncNodeInfoWithRetry(mgr.ID)

return id, nil
}
Expand All @@ -425,9 +462,7 @@ func (s *service) DeleteChainConfig(ctx context.Context, id int64) (int64, error
return 0, errors.Wrap(err, "DeleteChainConfig: failed to fetch manager")
}

if err := s.SyncNodeInfo(ctx, mgr.ID); err != nil {
s.lggr.Infof("FMS: Unable to sync node info: %v", err)
}
s.syncNodeInfoWithRetry(mgr.ID)

return id, nil
}
Expand Down Expand Up @@ -466,9 +501,7 @@ func (s *service) UpdateChainConfig(ctx context.Context, cfg ChainConfig) (int64
return 0, errors.Wrap(err, "UpdateChainConfig failed: could not get chain config")
}

if err := s.SyncNodeInfo(ctx, ccfg.FeedsManagerID); err != nil {
s.lggr.Infof("FMS: Unable to sync node info: %v", err)
}
s.syncNodeInfoWithRetry(ccfg.FeedsManagerID)

return id, nil
}
Expand Down Expand Up @@ -1020,9 +1053,7 @@ func (s *service) CancelSpec(ctx context.Context, id int64) error {
)

err = s.transact(ctx, func(tx datasources) error {
var (
txerr error
)
var txerr error

if txerr = tx.orm.CancelSpec(ctx, id); txerr != nil {
return txerr
Expand Down Expand Up @@ -1145,6 +1176,8 @@ func (s *service) Close() error {
// This blocks until it finishes
s.connMgr.Close()

s.syncNodeInfoCancel.CallAndSwap(func(){})

return nil
})
}
Expand All @@ -1162,10 +1195,7 @@ func (s *service) connectFeedManager(ctx context.Context, mgr FeedsManager, priv
},
OnConnect: func(pb.FeedsManagerClient) {
// Sync the node's information with FMS once connected
err := s.SyncNodeInfo(ctx, mgr.ID)
if err != nil {
s.lggr.Infof("Error syncing node info: %v", err)
}
s.syncNodeInfoWithRetry(mgr.ID)
},
})
}
Expand Down Expand Up @@ -1209,8 +1239,10 @@ func (s *service) observeJobProposalCounts(ctx context.Context) error {
metrics := counts.toMetrics()

// Set the prometheus gauge metrics.
for _, status := range []JobProposalStatus{JobProposalStatusPending, JobProposalStatusApproved,
JobProposalStatusCancelled, JobProposalStatusRejected, JobProposalStatusDeleted, JobProposalStatusRevoked} {
for _, status := range []JobProposalStatus{
JobProposalStatusPending, JobProposalStatusApproved,
JobProposalStatusCancelled, JobProposalStatusRejected, JobProposalStatusDeleted, JobProposalStatusRevoked,
} {
status := status

promJobProposalCounts.With(prometheus.Labels{"status": string(status)}).Set(metrics[status])
Expand Down Expand Up @@ -1550,6 +1582,18 @@ func (s *service) isRevokable(propStatus JobProposalStatus, specStatus SpecStatu
return propStatus != JobProposalStatusDeleted && (specStatus == SpecStatusPending || specStatus == SpecStatusCancelled)
}

type AtomicCancelFunc struct {
fn context.CancelFunc
mutex sync.Mutex
}

func (f *AtomicCancelFunc) CallAndSwap(other func()) {
f.mutex.Lock()
defer f.mutex.Unlock()
f.fn()
f.fn = other
}

var _ Service = &NullService{}

// NullService defines an implementation of the Feeds Service that is used
Expand All @@ -1562,75 +1606,95 @@ func (ns NullService) Close() error { return nil }
func (ns NullService) ApproveSpec(ctx context.Context, id int64, force bool) error {
return ErrFeedsManagerDisabled
}

func (ns NullService) CountJobProposalsByStatus(ctx context.Context) (*JobProposalCounts, error) {
return nil, ErrFeedsManagerDisabled
}

func (ns NullService) CancelSpec(ctx context.Context, id int64) error {
return ErrFeedsManagerDisabled
}

func (ns NullService) GetJobProposal(ctx context.Context, id int64) (*JobProposal, error) {
return nil, ErrFeedsManagerDisabled
}

func (ns NullService) ListSpecsByJobProposalIDs(ctx context.Context, ids []int64) ([]JobProposalSpec, error) {
return nil, ErrFeedsManagerDisabled
}

func (ns NullService) GetManager(ctx context.Context, id int64) (*FeedsManager, error) {
return nil, ErrFeedsManagerDisabled
}

func (ns NullService) ListManagersByIDs(ctx context.Context, ids []int64) ([]FeedsManager, error) {
return nil, ErrFeedsManagerDisabled
}

func (ns NullService) GetSpec(ctx context.Context, id int64) (*JobProposalSpec, error) {
return nil, ErrFeedsManagerDisabled
}
func (ns NullService) ListManagers(ctx context.Context) ([]FeedsManager, error) { return nil, nil }
func (ns NullService) CreateChainConfig(ctx context.Context, cfg ChainConfig) (int64, error) {
return 0, ErrFeedsManagerDisabled
}

func (ns NullService) GetChainConfig(ctx context.Context, id int64) (*ChainConfig, error) {
return nil, ErrFeedsManagerDisabled
}

func (ns NullService) DeleteChainConfig(ctx context.Context, id int64) (int64, error) {
return 0, ErrFeedsManagerDisabled
}

func (ns NullService) ListChainConfigsByManagerIDs(ctx context.Context, mgrIDs []int64) ([]ChainConfig, error) {
return nil, ErrFeedsManagerDisabled
}

func (ns NullService) UpdateChainConfig(ctx context.Context, cfg ChainConfig) (int64, error) {
return 0, ErrFeedsManagerDisabled
}
func (ns NullService) ListJobProposals(ctx context.Context) ([]JobProposal, error) { return nil, nil }
func (ns NullService) ListJobProposalsByManagersIDs(ctx context.Context, ids []int64) ([]JobProposal, error) {
return nil, ErrFeedsManagerDisabled
}

func (ns NullService) ProposeJob(ctx context.Context, args *ProposeJobArgs) (int64, error) {
return 0, ErrFeedsManagerDisabled
}

func (ns NullService) DeleteJob(ctx context.Context, args *DeleteJobArgs) (int64, error) {
return 0, ErrFeedsManagerDisabled
}

func (ns NullService) RevokeJob(ctx context.Context, args *RevokeJobArgs) (int64, error) {
return 0, ErrFeedsManagerDisabled
}

func (ns NullService) RegisterManager(ctx context.Context, params RegisterManagerParams) (int64, error) {
return 0, ErrFeedsManagerDisabled
}

func (ns NullService) RejectSpec(ctx context.Context, id int64) error {
return ErrFeedsManagerDisabled
}
func (ns NullService) SyncNodeInfo(ctx context.Context, id int64) error { return nil }
func (ns NullService) UpdateManager(ctx context.Context, mgr FeedsManager) error {
return ErrFeedsManagerDisabled
}

func (ns NullService) EnableManager(ctx context.Context, id int64) (*FeedsManager, error) {
return nil, ErrFeedsManagerDisabled
}

func (ns NullService) DisableManager(ctx context.Context, id int64) (*FeedsManager, error) {
return nil, ErrFeedsManagerDisabled
}

func (ns NullService) IsJobManaged(ctx context.Context, jobID int64) (bool, error) {
return false, nil
}

func (ns NullService) UpdateSpecDefinition(ctx context.Context, id int64, spec string) error {
return ErrFeedsManagerDisabled
}
Expand Down
Loading