From 0f67d0bff33b12e8d3d08c04c930386402ffb42b Mon Sep 17 00:00:00 2001 From: ilija42 <57732589+ilija42@users.noreply.github.com> Date: Tue, 26 Mar 2024 17:35:48 +0100 Subject: [PATCH] Fix in memory data source cache error logging and add err log when cache is over 24h old (#12586) * Fix in memory data source cache error logging * Add stale jfcp cache error log and lower other logs to warn * Update logger warnw to warnf function Co-authored-by: Jordan Krage --------- Co-authored-by: Jordan Krage --- .changeset/chilly-garlics-kneel.md | 5 ++++ .changeset/shaggy-pots-pretend.md | 5 ++++ core/services/ocrcommon/data_source.go | 30 ++++++++++++++------- core/services/ocrcommon/data_source_test.go | 12 ++++----- 4 files changed, 37 insertions(+), 15 deletions(-) create mode 100644 .changeset/chilly-garlics-kneel.md create mode 100644 .changeset/shaggy-pots-pretend.md diff --git a/.changeset/chilly-garlics-kneel.md b/.changeset/chilly-garlics-kneel.md new file mode 100644 index 00000000000..fc8b9425250 --- /dev/null +++ b/.changeset/chilly-garlics-kneel.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +Fix error log formatting for in memory data source cache for juels fee per coin diff --git a/.changeset/shaggy-pots-pretend.md b/.changeset/shaggy-pots-pretend.md new file mode 100644 index 00000000000..644986ddb56 --- /dev/null +++ b/.changeset/shaggy-pots-pretend.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +Add error log if juels fee per coin cache is over 24h old and lower other logs severity in cache to warn diff --git a/core/services/ocrcommon/data_source.go b/core/services/ocrcommon/data_source.go index 011b8d0644d..f810c8e044d 100644 --- a/core/services/ocrcommon/data_source.go +++ b/core/services/ocrcommon/data_source.go @@ -101,6 +101,7 @@ func NewInMemoryDataSource(pr pipeline.Runner, jb job.Job, spec pipeline.Spec, l } const defaultCacheFreshness = time.Minute * 5 +const defaultCacheFreshnessAlert = time.Hour * 24 const dataSourceCacheKey = "dscache" func NewInMemoryDataSourceCache(ds median.DataSource, kvStore job.KVStore, cacheFreshness time.Duration) (median.DataSource, error) { @@ -158,7 +159,7 @@ func (ds *inMemoryDataSource) currentAnswer() (*big.Int, *big.Int) { func (ds *inMemoryDataSource) executeRun(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) { md, err := bridges.MarshalBridgeMetaData(ds.currentAnswer()) if err != nil { - ds.lggr.Warnw("unable to attach metadata for run", "err", err) + ds.lggr.Warnf("unable to attach metadata for run, err: %v", err) } vars := pipeline.NewVarsFrom(map[string]interface{}{ @@ -236,12 +237,17 @@ func (ds *inMemoryDataSourceCache) updater() { for ; true; <-ticker.C { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) if err := ds.updateCache(ctx); err != nil { - ds.lggr.Warnf("failed to update cache", "err", err) + ds.lggr.Warnf("failed to update cache, err: %v", err) } cancel() } } +type ResultTimePair struct { + Result serializablebig.Big `json:"result"` + Time time.Time `json:"time"` +} + func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error { ds.mu.Lock() defer ds.mu.Unlock() @@ -257,7 +263,7 @@ func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error { ds.latestUpdateErr = latestUpdateErr // raise log severity if previousUpdateErr != nil { - ds.lggr.Errorf("consecutive cache updates errored: previous err: %w new err: %w", previousUpdateErr, ds.latestUpdateErr) + ds.lggr.Warnf("consecutive cache updates errored: previous err: %v new err: %v", previousUpdateErr, ds.latestUpdateErr) } return errors.Wrapf(ds.latestUpdateErr, "error executing run for spec ID %v", ds.spec.ID) } @@ -270,8 +276,8 @@ func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error { } // backup in case data source fails continuously and node gets rebooted - if err = ds.kvStore.Store(dataSourceCacheKey, serializablebig.New(value)); err != nil { - ds.lggr.Errorf("failed to persist latest task run value", err) + if err = ds.kvStore.Store(dataSourceCacheKey, &ResultTimePair{Result: *serializablebig.New(value), Time: time.Now()}); err != nil { + ds.lggr.Errorf("failed to persist latest task run value, err: %v", err) } return nil @@ -287,7 +293,7 @@ func (ds *inMemoryDataSourceCache) get(ctx context.Context) (pipeline.FinalResul ds.mu.RUnlock() if err := ds.updateCache(ctx); err != nil { - ds.lggr.Warnf("failed to update cache, returning stale result now", "err", err) + ds.lggr.Warnf("failed to update cache err: %v, returning stale result now, err: %v", err) } ds.mu.RLock() @@ -296,11 +302,17 @@ func (ds *inMemoryDataSourceCache) get(ctx context.Context) (pipeline.FinalResul } func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2types.ReportTimestamp) (*big.Int, error) { - var val serializablebig.Big + var resTime ResultTimePair latestResult, latestTrrs := ds.get(ctx) if latestTrrs == nil { - ds.lggr.Errorf("cache is empty, returning persisted value now") - return val.ToInt(), ds.kvStore.Get(dataSourceCacheKey, &val) + ds.lggr.Warnf("cache is empty, returning persisted value now") + if err := ds.kvStore.Get(dataSourceCacheKey, &resTime); err != nil { + return nil, err + } + if time.Since(resTime.Time) >= defaultCacheFreshnessAlert { + ds.lggr.Errorf("cache hasn't been updated for over %v, latestUpdateErr is: %v", defaultCacheFreshnessAlert, ds.latestUpdateErr) + } + return resTime.Result.ToInt(), nil } setEATelemetry(ds.inMemoryDataSource, latestResult, latestTrrs, ObservationTimestamp{ diff --git a/core/services/ocrcommon/data_source_test.go b/core/services/ocrcommon/data_source_test.go index a921bc060ff..2e1b4f63df7 100644 --- a/core/services/ocrcommon/data_source_test.go +++ b/core/services/ocrcommon/data_source_test.go @@ -75,7 +75,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) { ds := ocrcommon.NewInMemoryDataSource(runner, job.Job{}, pipeline.Spec{}, logger.TestLogger(t)) mockKVStore := mocks.KVStore{} mockKVStore.On("Store", mock.Anything, mock.Anything).Return(nil) - mockKVStore.On("Get", mock.Anything, mock.AnythingOfType("*big.Big")).Return(nil) + mockKVStore.On("Get", mock.Anything, mock.IsType(&ocrcommon.ResultTimePair{})).Return(nil) dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Second*2) require.NoError(t, err) @@ -98,14 +98,14 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) { }) t.Run("test total updater fail with persisted value recovery", func(t *testing.T) { - persistedVal := big.NewInt(1337) runner := pipelinemocks.NewRunner(t) ds := ocrcommon.NewInMemoryDataSource(runner, job.Job{}, pipeline.Spec{}, logger.TestLogger(t)) mockKVStore := mocks.KVStore{} - mockKVStore.On("Get", mock.Anything, mock.AnythingOfType("*big.Big")).Return(nil).Run(func(args mock.Arguments) { - arg := args.Get(1).(*serializablebig.Big) - arg.ToInt().Set(persistedVal) + persistedVal := serializablebig.NewI(1337) + mockKVStore.On("Get", mock.Anything, mock.IsType(&ocrcommon.ResultTimePair{})).Return(nil).Run(func(args mock.Arguments) { + arg := args.Get(1).(*ocrcommon.ResultTimePair) + arg.Result = *persistedVal }) // set updater to a long time so that it doesn't log errors after the test is done @@ -125,7 +125,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) { ds := ocrcommon.NewInMemoryDataSource(runner, job.Job{}, pipeline.Spec{}, logger.TestLogger(t)) mockKVStore := mocks.KVStore{} - mockKVStore.On("Get", mock.Anything, mock.AnythingOfType("*big.Big")).Return(nil).Return(assert.AnError) + mockKVStore.On("Get", mock.Anything, mock.IsType(&ocrcommon.ResultTimePair{})).Return(nil).Return(assert.AnError) // set updater to a long time so that it doesn't log errors after the test is done dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Hour*100)