From fb99f1663b14cebe67f84ba6a419ab1b76eb1314 Mon Sep 17 00:00:00 2001 From: Gustavo Gama Date: Tue, 17 Dec 2024 22:56:16 -0300 Subject: [PATCH] feat(job-distributor): periodically sync node info with job distributors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit There’s a behavior that we’ve observed for some time on the NOP side where they add/update a chain configuration in the Job Distributor panel but the change is not reflected on the service itself. This leads to inefficiencies, as NOPs are unaware of the failure and need to be notified so that they can "reapply" the configuration. After some investigation, we suspect that this is due to connectivity issues between the nodes and the job distributor instance, which cause the message carrying the update to be lost. As a fix, Brendon suggested periodically resending the latest chain updates to all connected job distributors. This PR implements that solution by starting a goroutine when the job distributor (feeds) service starts; the goroutine sets up a `time.Ticker` and, on every tick, calls the `SyncNodeInfo` method for each registered manager. The interval is configurable through the new `FeedsManagerSyncInterval` setting and defaults to 1 hour, as suggested by the operations team. 
Ticket Number: DPA-1371 --- core/config/app_config.go | 1 + core/config/docs/core.toml | 2 ++ core/config/toml/types.go | 18 +++++++++----- core/services/chainlink/config_general.go | 4 ++++ core/services/feeds/config.go | 1 + core/services/feeds/service.go | 29 +++++++++++++++++++++++ 6 files changed, 49 insertions(+), 6 deletions(-) diff --git a/core/config/app_config.go b/core/config/app_config.go index 3f2a5472b24..0a14e898723 100644 --- a/core/config/app_config.go +++ b/core/config/app_config.go @@ -18,6 +18,7 @@ type AppConfig interface { AppID() uuid.UUID RootDir() string ShutdownGracePeriod() time.Duration + FeedsManagerSyncInterval() time.Duration InsecureFastScrypt() bool EVMEnabled() bool EVMRPCEnabled() bool diff --git a/core/config/docs/core.toml b/core/config/docs/core.toml index 20c519e81a1..dfee55ebac7 100644 --- a/core/config/docs/core.toml +++ b/core/config/docs/core.toml @@ -5,6 +5,8 @@ InsecureFastScrypt = false # Default RootDir = '~/.chainlink' # Default # ShutdownGracePeriod is the maximum time allowed to shut down gracefully. If exceeded, the node will terminate immediately to avoid being SIGKILLed. ShutdownGracePeriod = '5s' # Default +# FeedsManagerSyncInterval is the interval between calls to the feeds manager instance to synchronize the chain config +FeedsManagerSyncInterval = '1h' # Default [Feature] # FeedsManager enables the feeds manager service. diff --git a/core/config/toml/types.go b/core/config/toml/types.go index 620f7d96eee..f1d82f7cff6 100644 --- a/core/config/toml/types.go +++ b/core/config/toml/types.go @@ -34,10 +34,11 @@ var ErrUnsupported = errors.New("unsupported with config v2") // Core holds the core configuration. See chainlink.Config for more information. 
type Core struct { // General/misc - AppID uuid.UUID `toml:"-"` // random or test - InsecureFastScrypt *bool - RootDir *string - ShutdownGracePeriod *commonconfig.Duration + AppID uuid.UUID `toml:"-"` // random or test + InsecureFastScrypt *bool + RootDir *string + ShutdownGracePeriod *commonconfig.Duration + FeedsManagerSyncInterval *commonconfig.Duration Feature Feature `toml:",omitempty"` Database Database `toml:",omitempty"` @@ -72,6 +73,9 @@ func (c *Core) SetFrom(f *Core) { if v := f.ShutdownGracePeriod; v != nil { c.ShutdownGracePeriod = v } + if v := f.FeedsManagerSyncInterval; v != nil { + c.FeedsManagerSyncInterval = v + } c.Feature.setFrom(&f.Feature) c.Database.setFrom(&f.Database) @@ -410,8 +414,10 @@ func (l *DatabaseLock) Mode() string { func (l *DatabaseLock) ValidateConfig() (err error) { if l.LeaseRefreshInterval.Duration() > l.LeaseDuration.Duration()/2 { - err = multierr.Append(err, configutils.ErrInvalid{Name: "LeaseRefreshInterval", Value: l.LeaseRefreshInterval.String(), - Msg: fmt.Sprintf("must be less than or equal to half of LeaseDuration (%s)", l.LeaseDuration.String())}) + err = multierr.Append(err, configutils.ErrInvalid{ + Name: "LeaseRefreshInterval", Value: l.LeaseRefreshInterval.String(), + Msg: fmt.Sprintf("must be less than or equal to half of LeaseDuration (%s)", l.LeaseDuration.String()), + }) } return } diff --git a/core/services/chainlink/config_general.go b/core/services/chainlink/config_general.go index dd0dc87b59a..4d3670154f8 100644 --- a/core/services/chainlink/config_general.go +++ b/core/services/chainlink/config_general.go @@ -422,6 +422,10 @@ func (g *generalConfig) ShutdownGracePeriod() time.Duration { return g.c.ShutdownGracePeriod.Duration() } +func (g *generalConfig) FeedsManagerSyncInterval() time.Duration { + return g.c.FeedsManagerSyncInterval.Duration() +} + func (g *generalConfig) FluxMonitor() config.FluxMonitor { return &fluxMonitorConfig{c: g.c.FluxMonitor} } diff --git a/core/services/feeds/config.go 
b/core/services/feeds/config.go index 626dc862e94..a874f6f5658 100644 --- a/core/services/feeds/config.go +++ b/core/services/feeds/config.go @@ -10,6 +10,7 @@ import ( type GeneralConfig interface { OCR() coreconfig.OCR Insecure() coreconfig.Insecure + FeedsManagerSyncInterval() time.Duration } type FeatureConfig interface { diff --git a/core/services/feeds/service.go b/core/services/feeds/service.go index 04ab747c9ab..b638bb30f61 100644 --- a/core/services/feeds/service.go +++ b/core/services/feeds/service.go @@ -5,6 +5,7 @@ import ( "database/sql" "encoding/hex" "fmt" + "time" "github.com/ethereum/go-ethereum/common" "github.com/google/uuid" @@ -1135,6 +1136,8 @@ func (s *service) Start(ctx context.Context) error { s.lggr.Error("failed to observe job proposal count when starting service", err) } + go s.periodicallySyncNodeInfo(ctx) + return nil }) } @@ -1550,6 +1553,32 @@ func (s *service) isRevokable(propStatus JobProposalStatus, specStatus SpecStatu return propStatus != JobProposalStatusDeleted && (specStatus == SpecStatusPending || specStatus == SpecStatusCancelled) } +func (s *service) periodicallySyncNodeInfo(ctx context.Context) { + s.lggr.Info("starting periodic sync node info goroutine") + + timer := time.NewTicker(s.gCfg.FeedsManagerSyncInterval()) + for { + select { + case <-timer.C: + managers, err := s.ListManagers(ctx) + if err != nil { + s.lggr.Errorw("failed to list managers", "err", err) + } + + for _, manager := range managers { + s.lggr.Infow("synchronizing node info", "managerID", manager.ID) + err := s.SyncNodeInfo(ctx, manager.ID) + if err != nil { + s.lggr.Errorw("failed to sync node info", "id", manager.ID, "err", err) + } + } + case <-ctx.Done(): + s.lggr.Debugw("context done; exiting periodic sync node info goroutine") + return + } + } +} + var _ Service = &NullService{} // NullService defines an implementation of the Feeds Service that is used