feat(job-distributor): periodically sync node info with job distributors
For some time we have observed a behavior on the NOP side: they add or
update a chain configuration in the Job Distributor panel, but the change
is not reflected in the service itself. This leads to inefficiencies, as
NOPs are unaware of the failure and need to be notified so that they can
"reapply" the configuration.

After some investigation, we suspect that this is due to connectivity
issues between the nodes and the job distributor instance, which causes
the message with the update to be lost.

This PR attempts to solve this by adding a "retry" wrapper on top of the
existing `SyncNodeInfo` method. We rely on `avast/retry-go` to implement
the bulk of the retry logic. It is configured with a minimum delay of
10 seconds, a maximum delay of 30 minutes, and a total of 56 attempts,
which adds up to a bit more than 24 hours.

Ticket Number: DPA-1371
gustavogama-cll committed Dec 20, 2024
1 parent 1ce7214 commit 57b55cc
Showing 26 changed files with 149 additions and 23 deletions.
5 changes: 5 additions & 0 deletions .changeset/neat-penguins-report.md
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#added periodically sync node info with job distributors
1 change: 1 addition & 0 deletions core/config/app_config.go
@@ -18,6 +18,7 @@ type AppConfig interface {
AppID() uuid.UUID
RootDir() string
ShutdownGracePeriod() time.Duration
FeedsManagerSyncInterval() time.Duration
InsecureFastScrypt() bool
EVMEnabled() bool
EVMRPCEnabled() bool
2 changes: 2 additions & 0 deletions core/config/docs/core.toml
@@ -5,6 +5,8 @@ InsecureFastScrypt = false # Default
RootDir = '~/.chainlink' # Default
# ShutdownGracePeriod is the maximum time allowed to shut down gracefully. If exceeded, the node will terminate immediately to avoid being SIGKILLed.
ShutdownGracePeriod = '5s' # Default
# FeedsManagerSyncInterval is the interval between calls to the feeds manager instance to synchronize the chain config
FeedsManagerSyncInterval = '1h' # Default

[Feature]
# FeedsManager enables the feeds manager service.
18 changes: 12 additions & 6 deletions core/config/toml/types.go
@@ -34,10 +34,11 @@ var ErrUnsupported = errors.New("unsupported with config v2")
// Core holds the core configuration. See chainlink.Config for more information.
type Core struct {
// General/misc
- AppID uuid.UUID `toml:"-"` // random or test
- InsecureFastScrypt *bool
- RootDir *string
- ShutdownGracePeriod *commonconfig.Duration
+ AppID uuid.UUID `toml:"-"` // random or test
+ InsecureFastScrypt *bool
+ RootDir *string
+ ShutdownGracePeriod *commonconfig.Duration
+ FeedsManagerSyncInterval *commonconfig.Duration

Feature Feature `toml:",omitempty"`
Database Database `toml:",omitempty"`
@@ -72,6 +73,9 @@ func (c *Core) SetFrom(f *Core) {
if v := f.ShutdownGracePeriod; v != nil {
c.ShutdownGracePeriod = v
}
if v := f.FeedsManagerSyncInterval; v != nil {
c.FeedsManagerSyncInterval = v
}

c.Feature.setFrom(&f.Feature)
c.Database.setFrom(&f.Database)
@@ -410,8 +414,10 @@ func (l *DatabaseLock) Mode() string {

func (l *DatabaseLock) ValidateConfig() (err error) {
if l.LeaseRefreshInterval.Duration() > l.LeaseDuration.Duration()/2 {
- err = multierr.Append(err, configutils.ErrInvalid{Name: "LeaseRefreshInterval", Value: l.LeaseRefreshInterval.String(),
- Msg: fmt.Sprintf("must be less than or equal to half of LeaseDuration (%s)", l.LeaseDuration.String())})
+ err = multierr.Append(err, configutils.ErrInvalid{
+ Name: "LeaseRefreshInterval", Value: l.LeaseRefreshInterval.String(),
+ Msg: fmt.Sprintf("must be less than or equal to half of LeaseDuration (%s)", l.LeaseDuration.String()),
+ })
}
return
}
4 changes: 4 additions & 0 deletions core/services/chainlink/config_general.go
@@ -422,6 +422,10 @@ func (g *generalConfig) ShutdownGracePeriod() time.Duration {
return g.c.ShutdownGracePeriod.Duration()
}

func (g *generalConfig) FeedsManagerSyncInterval() time.Duration {
return g.c.FeedsManagerSyncInterval.Duration()
}

func (g *generalConfig) FluxMonitor() config.FluxMonitor {
return &fluxMonitorConfig{c: g.c.FluxMonitor}
}
8 changes: 5 additions & 3 deletions core/services/chainlink/config_test.go
@@ -262,9 +262,10 @@ func TestConfig_Marshal(t *testing.T) {

global := Config{
Core: toml.Core{
- InsecureFastScrypt: ptr(true),
- RootDir: ptr("test/root/dir"),
- ShutdownGracePeriod: commoncfg.MustNewDuration(10 * time.Second),
+ InsecureFastScrypt: ptr(true),
+ RootDir: ptr("test/root/dir"),
+ ShutdownGracePeriod: commoncfg.MustNewDuration(10 * time.Second),
+ FeedsManagerSyncInterval: commoncfg.MustNewDuration(15 * time.Minute),
Insecure: toml.Insecure{
DevWebServer: ptr(false),
OCRDevelopmentMode: ptr(false),
@@ -859,6 +860,7 @@ func TestConfig_Marshal(t *testing.T) {
{"global", global, `InsecureFastScrypt = true
RootDir = 'test/root/dir'
ShutdownGracePeriod = '10s'
FeedsManagerSyncInterval = '15m0s'
[Insecure]
DevWebServer = false
45 changes: 45 additions & 0 deletions core/services/chainlink/mocks/general_config.go

Generated file; diff not rendered.
@@ -1,6 +1,7 @@
InsecureFastScrypt = false
RootDir = '~/.chainlink'
ShutdownGracePeriod = '5s'
FeedsManagerSyncInterval = '1h0m0s'

[Feature]
FeedsManager = true
1 change: 1 addition & 0 deletions core/services/chainlink/testdata/config-full.toml
@@ -1,6 +1,7 @@
InsecureFastScrypt = true
RootDir = 'test/root/dir'
ShutdownGracePeriod = '10s'
FeedsManagerSyncInterval = '15m0s'

[Feature]
FeedsManager = true
@@ -1,6 +1,7 @@
InsecureFastScrypt = false
RootDir = 'my/root/dir'
ShutdownGracePeriod = '5s'
FeedsManagerSyncInterval = '1h0m0s'

[Feature]
FeedsManager = true
1 change: 1 addition & 0 deletions core/services/feeds/config.go
@@ -10,6 +10,7 @@ import (
type GeneralConfig interface {
OCR() coreconfig.OCR
Insecure() coreconfig.Insecure
FeedsManagerSyncInterval() time.Duration
}

type FeatureConfig interface {
65 changes: 51 additions & 14 deletions core/services/feeds/service.go
@@ -5,7 +5,9 @@ import (
"database/sql"
"encoding/hex"
"fmt"
"time"

"github.com/avast/retry-go/v4"
"github.com/ethereum/go-ethereum/common"
"github.com/google/uuid"
"github.com/lib/pq"
@@ -141,6 +143,7 @@ type service struct {
lggr logger.Logger
version string
loopRegistrarConfig plugins.RegistrarConfig
syncNodeInfoCancel context.CancelFunc
}

// NewService constructs a new feeds service
@@ -183,6 +186,7 @@ func NewService(
lggr: lggr,
version: version,
loopRegistrarConfig: rc,
syncNodeInfoCancel: func() {},
}

return svc
@@ -254,8 +258,48 @@ func (s *service) RegisterManager(ctx context.Context, params RegisterManagerPar
return id, nil
}

// SyncNodeInfo syncs the node's information with FMS
// syncNodeInfoWithRetry syncs the node's information with FMS using a goroutine.
// In case of failures, it retries with an exponential backoff for up to 24h.
func (s *service) syncNodeInfoWithRetry(id int64) {
s.lggr.Info("syncNodeInfoWithRetry | start")

// cancel the previous context -- and, by extension, the existing goroutine --
// so that we can start over
s.lggr.Info("new sync node info requested; cancelling current retry")
s.syncNodeInfoCancel()

var ctx context.Context
ctx, s.syncNodeInfoCancel = context.WithCancel(context.Background())

retryOpts := []retry.Option{
retry.Context(ctx),
retry.Delay(10 * time.Second),
retry.MaxDelay(30 * time.Minute),
retry.Attempts(48 + 8), // 30m * 48 =~ 24h; plus the initial 8 shorter retries
retry.LastErrorOnly(true),
retry.OnRetry(func(attempt uint, err error) {
s.lggr.Info("failed to sync node info", "attempt", attempt, "err", err)
}),
}

go func() {
err := retry.Do(func() error { return s.SyncNodeInfo(ctx, id) }, retryOpts...)
if err != nil {
s.lggr.Errorw("failed to sync node info; aborting", "err", err)
} else {
s.lggr.Info("successfully synced node info")
}

s.syncNodeInfoCancel()
s.syncNodeInfoCancel = func() {}
}()
}

func (s *service) SyncNodeInfo(ctx context.Context, id int64) error {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

// Get the FMS RPC client
fmsClient, err := s.connMgr.GetClient(id)
if err != nil {
@@ -401,9 +445,7 @@ func (s *service) CreateChainConfig(ctx context.Context, cfg ChainConfig) (int64
return 0, errors.Wrap(err, "CreateChainConfig: failed to fetch manager")
}

- if err := s.SyncNodeInfo(ctx, mgr.ID); err != nil {
- s.lggr.Infof("FMS: Unable to sync node info: %v", err)
- }
+ s.syncNodeInfoWithRetry(mgr.ID)

return id, nil
}
@@ -425,9 +467,7 @@ func (s *service) DeleteChainConfig(ctx context.Context, id int64) (int64, error
return 0, errors.Wrap(err, "DeleteChainConfig: failed to fetch manager")
}

- if err := s.SyncNodeInfo(ctx, mgr.ID); err != nil {
- s.lggr.Infof("FMS: Unable to sync node info: %v", err)
- }
+ s.syncNodeInfoWithRetry(mgr.ID)

return id, nil
}
@@ -466,9 +506,7 @@ func (s *service) UpdateChainConfig(ctx context.Context, cfg ChainConfig) (int64
return 0, errors.Wrap(err, "UpdateChainConfig failed: could not get chain config")
}

- if err := s.SyncNodeInfo(ctx, ccfg.FeedsManagerID); err != nil {
- s.lggr.Infof("FMS: Unable to sync node info: %v", err)
- }
+ s.syncNodeInfoWithRetry(ccfg.FeedsManagerID)

return id, nil
}
@@ -1145,6 +1183,8 @@ func (s *service) Close() error {
// This blocks until it finishes
s.connMgr.Close()

s.syncNodeInfoCancel()

return nil
})
}
@@ -1162,10 +1202,7 @@ func (s *service) connectFeedManager(ctx context.Context, mgr FeedsManager, priv
},
OnConnect: func(pb.FeedsManagerClient) {
// Sync the node's information with FMS once connected
- err := s.SyncNodeInfo(ctx, mgr.ID)
- if err != nil {
- s.lggr.Infof("Error syncing node info: %v", err)
- }
+ s.syncNodeInfoWithRetry(mgr.ID)
},
})
}
1 change: 1 addition & 0 deletions core/web/resolver/testdata/config-empty-effective.toml
@@ -1,6 +1,7 @@
InsecureFastScrypt = false
RootDir = '~/.chainlink'
ShutdownGracePeriod = '5s'
FeedsManagerSyncInterval = '1h0m0s'

[Feature]
FeedsManager = true
1 change: 1 addition & 0 deletions core/web/resolver/testdata/config-full.toml
@@ -1,6 +1,7 @@
InsecureFastScrypt = true
RootDir = 'test/root/dir'
ShutdownGracePeriod = '10s'
FeedsManagerSyncInterval = '15m0s'

[Feature]
FeedsManager = true
@@ -1,6 +1,7 @@
InsecureFastScrypt = false
RootDir = 'my/root/dir'
ShutdownGracePeriod = '5s'
FeedsManagerSyncInterval = '1h0m0s'

[Feature]
FeedsManager = true
7 changes: 7 additions & 0 deletions docs/CONFIG.md
@@ -23,6 +23,7 @@ HTTPURL = 'https://foo.bar' # Required
InsecureFastScrypt = false # Default
RootDir = '~/.chainlink' # Default
ShutdownGracePeriod = '5s' # Default
FeedsManagerSyncInterval = '1h' # Default
```


@@ -45,6 +46,12 @@ ShutdownGracePeriod = '5s' # Default
```
ShutdownGracePeriod is the maximum time allowed to shut down gracefully. If exceeded, the node will terminate immediately to avoid being SIGKILLed.

### FeedsManagerSyncInterval
```toml
FeedsManagerSyncInterval = '1h' # Default
```
FeedsManagerSyncInterval is the interval between calls to the feeds manager instance to synchronize the chain config

## Feature
```toml
[Feature]
1 change: 1 addition & 0 deletions testdata/scripts/config/merge_raw_configs.txtar
@@ -148,6 +148,7 @@ Publickey = 'abcdef'
InsecureFastScrypt = false
RootDir = '~/.chainlink'
ShutdownGracePeriod = '5s'
FeedsManagerSyncInterval = '1h0m0s'

[Feature]
FeedsManager = true
1 change: 1 addition & 0 deletions testdata/scripts/node/validate/default.txtar
@@ -13,6 +13,7 @@ AllowSimplePasswords = false
InsecureFastScrypt = false
RootDir = '~/.chainlink'
ShutdownGracePeriod = '5s'
FeedsManagerSyncInterval = '1h0m0s'

[Feature]
FeedsManager = true
1 change: 1 addition & 0 deletions testdata/scripts/node/validate/defaults-override.txtar
@@ -74,6 +74,7 @@ HTTPURL = 'https://foo.bar'
InsecureFastScrypt = false
RootDir = '~/.chainlink'
ShutdownGracePeriod = '5s'
FeedsManagerSyncInterval = '1h0m0s'

[Feature]
FeedsManager = true
@@ -57,6 +57,7 @@ HTTPURL = 'https://foo.bar'
InsecureFastScrypt = false
RootDir = '~/.chainlink'
ShutdownGracePeriod = '5s'
FeedsManagerSyncInterval = '1h0m0s'

[Feature]
FeedsManager = true
@@ -57,6 +57,7 @@ HTTPURL = 'https://foo.bar'
InsecureFastScrypt = false
RootDir = '~/.chainlink'
ShutdownGracePeriod = '5s'
FeedsManagerSyncInterval = '1h0m0s'

[Feature]
FeedsManager = true