Skip to content

Commit

Permalink
Merge pull request #12594 from smartcontractkit/cherrypick-jfpc-cache…
Browse files Browse the repository at this point in the history
…-logging-changes

BCF-3121 Cherrypick jfpc cache logging changes
  • Loading branch information
ilija42 authored Mar 26, 2024
2 parents 1eda459 + 0f67d0b commit f588207
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 15 deletions.
5 changes: 5 additions & 0 deletions .changeset/chilly-garlics-kneel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Fix error log formatting for in memory data source cache for juels fee per coin
5 changes: 5 additions & 0 deletions .changeset/shaggy-pots-pretend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Add error log if juels fee per coin cache is over 24h old and lower other logs severity in cache to warn
30 changes: 21 additions & 9 deletions core/services/ocrcommon/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func NewInMemoryDataSource(pr pipeline.Runner, jb job.Job, spec pipeline.Spec, l
}

const defaultCacheFreshness = time.Minute * 5
const defaultCacheFreshnessAlert = time.Hour * 24
const dataSourceCacheKey = "dscache"

func NewInMemoryDataSourceCache(ds median.DataSource, kvStore job.KVStore, cacheFreshness time.Duration) (median.DataSource, error) {
Expand Down Expand Up @@ -158,7 +159,7 @@ func (ds *inMemoryDataSource) currentAnswer() (*big.Int, *big.Int) {
func (ds *inMemoryDataSource) executeRun(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) {
md, err := bridges.MarshalBridgeMetaData(ds.currentAnswer())
if err != nil {
ds.lggr.Warnw("unable to attach metadata for run", "err", err)
ds.lggr.Warnf("unable to attach metadata for run, err: %v", err)
}

vars := pipeline.NewVarsFrom(map[string]interface{}{
Expand Down Expand Up @@ -236,12 +237,17 @@ func (ds *inMemoryDataSourceCache) updater() {
for ; true; <-ticker.C {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
if err := ds.updateCache(ctx); err != nil {
ds.lggr.Warnf("failed to update cache", "err", err)
ds.lggr.Warnf("failed to update cache, err: %v", err)
}
cancel()
}
}

type ResultTimePair struct {
Result serializablebig.Big `json:"result"`
Time time.Time `json:"time"`
}

func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error {
ds.mu.Lock()
defer ds.mu.Unlock()
Expand All @@ -257,7 +263,7 @@ func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error {
ds.latestUpdateErr = latestUpdateErr
// raise log severity
if previousUpdateErr != nil {
ds.lggr.Errorf("consecutive cache updates errored: previous err: %w new err: %w", previousUpdateErr, ds.latestUpdateErr)
ds.lggr.Warnf("consecutive cache updates errored: previous err: %v new err: %v", previousUpdateErr, ds.latestUpdateErr)
}
return errors.Wrapf(ds.latestUpdateErr, "error executing run for spec ID %v", ds.spec.ID)
}
Expand All @@ -270,8 +276,8 @@ func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error {
}

// backup in case data source fails continuously and node gets rebooted
if err = ds.kvStore.Store(dataSourceCacheKey, serializablebig.New(value)); err != nil {
ds.lggr.Errorf("failed to persist latest task run value", err)
if err = ds.kvStore.Store(dataSourceCacheKey, &ResultTimePair{Result: *serializablebig.New(value), Time: time.Now()}); err != nil {
ds.lggr.Errorf("failed to persist latest task run value, err: %v", err)
}

return nil
Expand All @@ -287,7 +293,7 @@ func (ds *inMemoryDataSourceCache) get(ctx context.Context) (pipeline.FinalResul
ds.mu.RUnlock()

if err := ds.updateCache(ctx); err != nil {
ds.lggr.Warnf("failed to update cache, returning stale result now", "err", err)
ds.lggr.Warnf("failed to update cache err: %v, returning stale result now, err: %v", err)
}

ds.mu.RLock()
Expand All @@ -296,11 +302,17 @@ func (ds *inMemoryDataSourceCache) get(ctx context.Context) (pipeline.FinalResul
}

func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2types.ReportTimestamp) (*big.Int, error) {
var val serializablebig.Big
var resTime ResultTimePair
latestResult, latestTrrs := ds.get(ctx)
if latestTrrs == nil {
ds.lggr.Errorf("cache is empty, returning persisted value now")
return val.ToInt(), ds.kvStore.Get(dataSourceCacheKey, &val)
ds.lggr.Warnf("cache is empty, returning persisted value now")
if err := ds.kvStore.Get(dataSourceCacheKey, &resTime); err != nil {
return nil, err
}
if time.Since(resTime.Time) >= defaultCacheFreshnessAlert {
ds.lggr.Errorf("cache hasn't been updated for over %v, latestUpdateErr is: %v", defaultCacheFreshnessAlert, ds.latestUpdateErr)
}
return resTime.Result.ToInt(), nil
}

setEATelemetry(ds.inMemoryDataSource, latestResult, latestTrrs, ObservationTimestamp{
Expand Down
12 changes: 6 additions & 6 deletions core/services/ocrcommon/data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
ds := ocrcommon.NewInMemoryDataSource(runner, job.Job{}, pipeline.Spec{}, logger.TestLogger(t))
mockKVStore := mocks.KVStore{}
mockKVStore.On("Store", mock.Anything, mock.Anything).Return(nil)
mockKVStore.On("Get", mock.Anything, mock.AnythingOfType("*big.Big")).Return(nil)
mockKVStore.On("Get", mock.Anything, mock.IsType(&ocrcommon.ResultTimePair{})).Return(nil)
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Second*2)
require.NoError(t, err)

Expand All @@ -98,14 +98,14 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
})

t.Run("test total updater fail with persisted value recovery", func(t *testing.T) {
persistedVal := big.NewInt(1337)
runner := pipelinemocks.NewRunner(t)
ds := ocrcommon.NewInMemoryDataSource(runner, job.Job{}, pipeline.Spec{}, logger.TestLogger(t))

mockKVStore := mocks.KVStore{}
mockKVStore.On("Get", mock.Anything, mock.AnythingOfType("*big.Big")).Return(nil).Run(func(args mock.Arguments) {
arg := args.Get(1).(*serializablebig.Big)
arg.ToInt().Set(persistedVal)
persistedVal := serializablebig.NewI(1337)
mockKVStore.On("Get", mock.Anything, mock.IsType(&ocrcommon.ResultTimePair{})).Return(nil).Run(func(args mock.Arguments) {
arg := args.Get(1).(*ocrcommon.ResultTimePair)
arg.Result = *persistedVal
})

// set updater to a long time so that it doesn't log errors after the test is done
Expand All @@ -125,7 +125,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
ds := ocrcommon.NewInMemoryDataSource(runner, job.Job{}, pipeline.Spec{}, logger.TestLogger(t))

mockKVStore := mocks.KVStore{}
mockKVStore.On("Get", mock.Anything, mock.AnythingOfType("*big.Big")).Return(nil).Return(assert.AnError)
mockKVStore.On("Get", mock.Anything, mock.IsType(&ocrcommon.ResultTimePair{})).Return(nil).Return(assert.AnError)

// set updater to a long time so that it doesn't log errors after the test is done
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Hour*100)
Expand Down

0 comments on commit f588207

Please sign in to comment.