diff --git a/.changeset/brown-penguins-grin.md b/.changeset/brown-penguins-grin.md new file mode 100644 index 00000000000..24a06a030fc --- /dev/null +++ b/.changeset/brown-penguins-grin.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Fix in memory data source cache changes/bug that only allowed pipeline results where none of the data sources failed. #bugfix diff --git a/.changeset/poor-masks-fold.md b/.changeset/poor-masks-fold.md new file mode 100644 index 00000000000..1564aa0791f --- /dev/null +++ b/.changeset/poor-masks-fold.md @@ -0,0 +1,6 @@ +--- +"chainlink": minor +--- + +Move JuelsPerFeeCoinCacheDuration under JuelsPerFeeCoinCache struct in config. Rename JuelsPerFeeCoinCacheDuration to updateInterval. Add stalenessAlertThreshold to JuelsPerFeeCoinCache config. +StalenessAlertThreshold cfg option has a default of 24 hours which means that it doesn't have to be set unless we want to override the duration after which a stale cache should start throwing errors. diff --git a/core/internal/features/ocr2/features_ocr2_test.go b/core/internal/features/ocr2/features_ocr2_test.go index 216ca272b1b..1273608a203 100644 --- a/core/internal/features/ocr2/features_ocr2_test.go +++ b/core/internal/features/ocr2/features_ocr2_test.go @@ -487,7 +487,8 @@ juelsPerFeeCoinSource = """ answer1 [type=median index=0]; """ -juelsPerFeeCoinCacheDuration = "1m" +[pluginConfig.juelsPerFeeCoinCache] +updateInterval = "1m" `, ocrContractAddress, kbs[i].ID(), transmitters[i], fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i, blockBeforeConfig.Number().Int64(), chainReaderSpec, fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i), nil) require.NoError(t, err) err = apps[i].AddJobV2(testutils.Context(t), &ocrJob) @@ -840,7 +841,8 @@ juelsPerFeeCoinSource = """ answer1 [type=median index=0]; """ -juelsPerFeeCoinCacheDuration = "1m" +[pluginConfig.juelsPerFeeCoinCache] +updateInterval = "1m" `, ocrContractAddress, kbs[i].ID(), transmitters[i], fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i, fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i), nil) require.NoError(t, err) err = apps[i].AddJobV2(testutils.Context(t), &ocrJob) diff --git a/core/services/feeds/service_test.go b/core/services/feeds/service_test.go index 5283b03affe..dcb57e5fd3c 100644 --- a/core/services/feeds/service_test.go +++ b/core/services/feeds/service_test.go @@ -124,7 +124,8 @@ ds1_multiply [type=multiply times=1.23]; ds1 -> ds1_parse -> ds1_multiply -> answer1; answer1 [type=median index=0]; """ -juelsPerFeeCoinCacheDuration = "1m" +[pluginConfig.juelsPerFeeCoinCache] +updateInterval = "1m" ` const BootstrapTestSpecTemplate = ` type = "bootstrap" @@ -2189,7 +2190,8 @@ ds1_multiply [type=multiply times=1.23]; ds1 -> ds1_parse -> ds1_multiply -> answer1; answer1 [type=median index=0]; """ -juelsPerFeeCoinCacheDuration = "30s" +[pluginConfig.juelsPerFeeCoinCache] +updateInterval = "30s" ` defn2 = ` name = 'LINK / ETH | version 3 | contract 0x0000000000000000000000000000000000000000' @@ -2219,7 +2221,8 @@ ds1_multiply [type=multiply times=1.23]; ds1 -> ds1_parse -> ds1_multiply -> answer1; answer1 [type=median index=0]; """ -juelsPerFeeCoinCacheDuration = "20m" +[pluginConfig.juelsPerFeeCoinCache] +updateInterval = "20m" ` jp = &feeds.JobProposal{ diff --git a/core/services/ocr2/plugins/median/config/config.go b/core/services/ocr2/plugins/median/config/config.go index 218f7491094..9cfda641552 100644 --- a/core/services/ocr2/plugins/median/config/config.go +++ b/core/services/ocr2/plugins/median/config/config.go @@ -14,9 +14,14 @@ import ( // The PluginConfig struct contains the custom arguments needed for the Median plugin. type PluginConfig struct { - JuelsPerFeeCoinPipeline string `json:"juelsPerFeeCoinSource"` - JuelsPerFeeCoinCacheDuration models.Interval `json:"juelsPerFeeCoinCacheDuration"` - JuelsPerFeeCoinCacheDisabled bool `json:"juelsPerFeeCoinCacheDisabled"` + JuelsPerFeeCoinPipeline string `json:"juelsPerFeeCoinSource"` + // JuelsPerFeeCoinCache is disabled when nil + JuelsPerFeeCoinCache *JuelsPerFeeCoinCache `json:"juelsPerFeeCoinCache"` +} + +type JuelsPerFeeCoinCache struct { + UpdateInterval models.Interval `json:"updateInterval"` + StalenessAlertThreshold models.Interval `json:"stalenessAlertThreshold"` } // ValidatePluginConfig validates the arguments for the Median plugin. @@ -25,12 +30,13 @@ func ValidatePluginConfig(config PluginConfig) error { return errors.Wrap(err, "invalid juelsPerFeeCoinSource pipeline") } - // unset duration defaults later - if config.JuelsPerFeeCoinCacheDuration != 0 { - if config.JuelsPerFeeCoinCacheDuration.Duration() < time.Second*30 { - return errors.Errorf("juelsPerFeeCoinSource cache duration: %s is below 30 second minimum", config.JuelsPerFeeCoinCacheDuration.Duration().String()) - } else if config.JuelsPerFeeCoinCacheDuration.Duration() > time.Minute*20 { - return errors.Errorf("juelsPerFeeCoinSource cache duration: %s is above 20 minute maximum", config.JuelsPerFeeCoinCacheDuration.Duration().String()) + // unset durations have a default set late + if config.JuelsPerFeeCoinCache != nil { + updateInterval := config.JuelsPerFeeCoinCache.UpdateInterval.Duration() + if updateInterval != 0 && updateInterval < time.Second*30 { + return errors.Errorf("juelsPerFeeCoinSourceCache update interval: %s is below 30 second minimum", updateInterval.String()) + } else if updateInterval > time.Minute*20 { + return errors.Errorf("juelsPerFeeCoinSourceCache update interval: %s is above 20 minute maximum", updateInterval.String()) } } diff --git a/core/services/ocr2/plugins/median/config/config_test.go b/core/services/ocr2/plugins/median/config/config_test.go index e0dcd4f9203..e54b59215bd 100644 --- a/core/services/ocr2/plugins/median/config/config_test.go +++ b/core/services/ocr2/plugins/median/config/config_test.go @@ -32,11 +32,11 @@ func TestValidatePluginConfig(t *testing.T) { t.Run("cache duration validation", func(t *testing.T) { for _, tc := range []testCase{ - {"cache duration below minimum", `ds1 [type=bridge name=voter_turnout];`, models.Interval(time.Second * 29), fmt.Errorf("juelsPerFeeCoinSource cache duration: 29s is below 30 second minimum")}, - {"cache duration above maximum", `ds1 [type=bridge name=voter_turnout];`, models.Interval(time.Minute*20 + time.Second), fmt.Errorf("juelsPerFeeCoinSource cache duration: 20m1s is above 20 minute maximum")}, + {"cache duration below minimum", `ds1 [type=bridge name=voter_turnout];`, models.Interval(time.Second * 29), fmt.Errorf("juelsPerFeeCoinSourceCache update interval: 29s is below 30 second minimum")}, + {"cache duration above maximum", `ds1 [type=bridge name=voter_turnout];`, models.Interval(time.Minute*20 + time.Second), fmt.Errorf("juelsPerFeeCoinSourceCache update interval: 20m1s is above 20 minute maximum")}, } { t.Run(tc.name, func(t *testing.T) { - assert.EqualError(t, ValidatePluginConfig(PluginConfig{JuelsPerFeeCoinPipeline: tc.pipeline, JuelsPerFeeCoinCacheDuration: tc.cacheDuration}), tc.expectedError.Error()) + assert.EqualError(t, ValidatePluginConfig(PluginConfig{JuelsPerFeeCoinPipeline: tc.pipeline, JuelsPerFeeCoinCache: &JuelsPerFeeCoinCache{UpdateInterval: tc.cacheDuration}}), tc.expectedError.Error()) }) } }) diff --git a/core/services/ocr2/plugins/median/services.go b/core/services/ocr2/plugins/median/services.go index 4615f934511..779ea4f346c 100644 --- a/core/services/ocr2/plugins/median/services.go +++ b/core/services/ocr2/plugins/median/services.go @@ -127,9 +127,9 @@ func NewMedianServices(ctx context.Context, CreatedAt: time.Now(), }, lggr) - if !pluginConfig.JuelsPerFeeCoinCacheDisabled { + if pluginConfig.JuelsPerFeeCoinCache != nil { lggr.Infof("juelsPerFeeCoin data source caching is enabled") - juelsPerFeeCoinSourceCache, err2 := ocrcommon.NewInMemoryDataSourceCache(juelsPerFeeCoinSource, kvStore, pluginConfig.JuelsPerFeeCoinCacheDuration.Duration()) + juelsPerFeeCoinSourceCache, err2 := ocrcommon.NewInMemoryDataSourceCache(juelsPerFeeCoinSource, kvStore, *pluginConfig.JuelsPerFeeCoinCache) if err2 != nil { return nil, err2 } diff --git a/core/services/ocrcommon/data_source.go b/core/services/ocrcommon/data_source.go index e90382a06af..9ca111dea68 100644 --- a/core/services/ocrcommon/data_source.go +++ b/core/services/ocrcommon/data_source.go @@ -3,7 +3,6 @@ package ocrcommon import ( "context" "encoding/json" - errjoin "errors" "fmt" "math/big" "sync" @@ -19,6 +18,7 @@ import ( serializablebig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/median/config" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" "github.com/smartcontractkit/chainlink/v2/core/utils" @@ -103,8 +103,8 @@ func NewInMemoryDataSource(pr pipeline.Runner, jb job.Job, spec pipeline.Spec, l } } -const defaultCacheFreshness = time.Minute * 5 -const defaultCacheFreshnessAlert = time.Hour * 24 +const defaultUpdateInterval = time.Minute * 5 +const defaultStalenessAlertThreshold = time.Hour * 24 const dataSourceCacheKey = "dscache" type DataSourceCacheService interface { @@ -113,22 +113,27 @@ type DataSourceCacheService interface { median.DataSource } -func NewInMemoryDataSourceCache(ds median.DataSource, kvStore job.KVStore, cacheFreshness time.Duration) (DataSourceCacheService, error) { +func NewInMemoryDataSourceCache(ds median.DataSource, kvStore job.KVStore, cacheCfg config.JuelsPerFeeCoinCache) (DataSourceCacheService, error) { inMemoryDS, ok := ds.(*inMemoryDataSource) if !ok { return nil, errors.Errorf("unsupported data source type: %T, only inMemoryDataSource supported", ds) } - if cacheFreshness == 0 { - cacheFreshness = defaultCacheFreshness + updateInterval, stalenessAlertThreshold := cacheCfg.UpdateInterval.Duration(), cacheCfg.StalenessAlertThreshold.Duration() + if updateInterval == 0 { + updateInterval = defaultUpdateInterval + } + if stalenessAlertThreshold == 0 { + stalenessAlertThreshold = defaultStalenessAlertThreshold } dsCache := &inMemoryDataSourceCache{ - kvStore: kvStore, - cacheFreshness: cacheFreshness, - inMemoryDataSource: inMemoryDS, - chStop: make(chan struct{}), - chDone: make(chan struct{}), + inMemoryDataSource: inMemoryDS, + kvStore: kvStore, + updateInterval: updateInterval, + stalenessAlertThreshold: stalenessAlertThreshold, + chStop: make(chan struct{}), + chDone: make(chan struct{}), } return dsCache, nil } @@ -231,16 +236,18 @@ func (ds *inMemoryDataSource) Observe(ctx context.Context, timestamp ocr2types.R // If cache update is overdue Observe defaults to standard inMemoryDataSource behaviour. type inMemoryDataSourceCache struct { *inMemoryDataSource - // cacheFreshness indicates duration between cache updates. - // Even if updates fail, previous values are returned. - cacheFreshness time.Duration - mu sync.RWMutex - chStop services.StopChan - chDone chan struct{} - latestUpdateErr error - latestTrrs pipeline.TaskRunResults - latestResult pipeline.FinalResult - kvStore job.KVStore + // updateInterval indicates duration between cache updates. + // Even if update fail, previous values are returned. + updateInterval time.Duration + // stalenessAlertThreshold indicates duration before logs raise severity level because of stale cache. + stalenessAlertThreshold time.Duration + mu sync.RWMutex + chStop services.StopChan + chDone chan struct{} + latestUpdateErr error + latestTrrs pipeline.TaskRunResults + latestResult pipeline.FinalResult + kvStore job.KVStore } func (ds *inMemoryDataSourceCache) Start(context.Context) error { @@ -256,7 +263,7 @@ func (ds *inMemoryDataSourceCache) Close() error { // updater periodically updates data source cache. func (ds *inMemoryDataSourceCache) updater() { - ticker := time.NewTicker(ds.cacheFreshness) + ticker := time.NewTicker(ds.updateInterval) updateCache := func() { ctx, cancel := ds.chStop.CtxCancel(context.WithTimeout(context.Background(), time.Second*10)) defer cancel() @@ -286,31 +293,30 @@ func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error { ds.mu.Lock() defer ds.mu.Unlock() - // check for any errors - _, latestTrrs, latestUpdateErr := ds.executeRun(ctx) - if latestTrrs.FinalResult(ds.lggr).HasErrors() { - latestUpdateErr = errjoin.Join(append(latestTrrs.FinalResult(ds.lggr).AllErrors, latestUpdateErr)...) - } - - if latestUpdateErr != nil { + _, latestTrrs, err := ds.executeRun(ctx) + if err != nil { previousUpdateErr := ds.latestUpdateErr - ds.latestUpdateErr = latestUpdateErr - // raise log severity + ds.latestUpdateErr = err + // warn log if previous cache update also errored if previousUpdateErr != nil { ds.lggr.Warnf("consecutive cache updates errored: previous err: %v new err: %v", previousUpdateErr, ds.latestUpdateErr) } - return errors.Wrapf(ds.latestUpdateErr, "error executing run for spec ID %v", ds.spec.ID) + + return errors.Wrapf(ds.latestUpdateErr, "error updating in memory data source cache for spec ID %v", ds.spec.ID) } - ds.latestTrrs = latestTrrs - ds.latestResult = ds.latestTrrs.FinalResult(ds.lggr) - value, err := ds.inMemoryDataSource.parse(ds.latestResult) + value, err := ds.inMemoryDataSource.parse(latestTrrs.FinalResult(ds.lggr)) if err != nil { - return errors.Wrapf(err, "invalid result") + ds.latestUpdateErr = errors.Wrapf(err, "invalid result") + return ds.latestUpdateErr } - // backup in case data source fails continuously and node gets rebooted + // update cache values + ds.latestTrrs = latestTrrs + ds.latestResult = ds.latestTrrs.FinalResult(ds.lggr) + ds.latestUpdateErr = nil + // backup in case data source fails continuously and node gets rebooted timePairBytes, err := json.Marshal(&ResultTimePair{Result: *serializablebig.New(value), Time: time.Now()}) if err != nil { return fmt.Errorf("failed to marshal result time pair, err: %w", err) @@ -333,7 +339,7 @@ func (ds *inMemoryDataSourceCache) get(ctx context.Context) (pipeline.FinalResul ds.mu.RUnlock() if err := ds.updateCache(ctx); err != nil { - ds.lggr.Warnf("failed to update cache err: %v, returning stale result now, err: %v", err) + ds.lggr.Warnf("failed to update cache, returning stale result now, err: %v", err) } ds.mu.RLock() @@ -349,15 +355,15 @@ func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2ty timePairBytes, err := ds.kvStore.Get(ctx, dataSourceCacheKey) if err != nil { - return nil, fmt.Errorf("failed to get result time pair bytes, err: %w", err) + return nil, fmt.Errorf("in memory data source cache is empty and failed to get backup persisted value, err: %w", err) } - if err := json.Unmarshal(timePairBytes, &resTime); err != nil { - return nil, fmt.Errorf("failed to unmarshal result time pair bytes, err: %w", err) + if err = json.Unmarshal(timePairBytes, &resTime); err != nil { + return nil, fmt.Errorf("in memory data source cache is empty and failed to unmarshal backup persisted value, err: %w", err) } - if time.Since(resTime.Time) >= defaultCacheFreshnessAlert { - ds.lggr.Errorf("cache hasn't been updated for over %v, latestUpdateErr is: %v", defaultCacheFreshnessAlert, ds.latestUpdateErr) + if time.Since(resTime.Time) >= ds.stalenessAlertThreshold { + ds.lggr.Errorf("in memory data source cache is empty and the persisted value hasn't been updated for over %v, latestUpdateErr is: %v", ds.stalenessAlertThreshold, ds.latestUpdateErr) } return resTime.Result.ToInt(), nil } @@ -368,6 +374,13 @@ func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2ty ConfigDigest: timestamp.ConfigDigest.Hex(), }) + // if last update was unsuccessful, check how much time passed since a successful update + if ds.latestUpdateErr != nil { + if time.Since(ds.latestTrrs.GetTaskRunResultsFinishedAt()) >= ds.stalenessAlertThreshold { + ds.lggr.Errorf("in memory cache is old and hasn't been updated for over %v, latestUpdateErr is: %v", ds.stalenessAlertThreshold, ds.latestUpdateErr) + } + + } return ds.parse(latestResult) } diff --git a/core/services/ocrcommon/data_source_test.go b/core/services/ocrcommon/data_source_test.go index b9b19d30bf2..05ba0f4aa42 100644 --- a/core/services/ocrcommon/data_source_test.go +++ b/core/services/ocrcommon/data_source_test.go @@ -21,9 +21,11 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/job/mocks" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/median/config" "github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" pipelinemocks "github.com/smartcontractkit/chainlink/v2/core/services/pipeline/mocks" + "github.com/smartcontractkit/chainlink/v2/core/store/models" ) var ( @@ -78,7 +80,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) { mockKVStore := mocks.KVStore{} mockKVStore.On("Store", mock.Anything, mock.Anything, mock.Anything).Return(nil) mockKVStore.On("Get", mock.Anything, mock.Anything).Return(nil, nil) - dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Second*2) + dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, config.JuelsPerFeeCoinCache{UpdateInterval: models.Interval(time.Second * 2)}) require.NoError(t, err) servicetest.Run(t, dsCache) @@ -112,7 +114,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) { mockKVStore.On("Get", mock.Anything, mock.Anything).Return(result, nil) // set updater to a long time so that it doesn't log errors after the test is done - dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Hour*100) + dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, config.JuelsPerFeeCoinCache{UpdateInterval: models.Interval(time.Hour * 100)}) require.NoError(t, err) changeResultValue(runner, "-1", true, false) servicetest.Run(t, dsCache) @@ -131,7 +133,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) { mockKVStore.On("Get", mock.Anything, mock.Anything).Return(nil, assert.AnError) // set updater to a long time so that it doesn't log errors after the test is done - dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Hour*100) + dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, config.JuelsPerFeeCoinCache{UpdateInterval: models.Interval(time.Hour * 100)}) require.NoError(t, err) changeResultValue(runner, "-1", true, false) servicetest.Run(t, dsCache) diff --git a/core/services/pipeline/common.go b/core/services/pipeline/common.go index a88b2165a2e..a0fc28c6862 100644 --- a/core/services/pipeline/common.go +++ b/core/services/pipeline/common.go @@ -217,6 +217,17 @@ func (result *TaskRunResult) IsTerminal() bool { // TaskRunResults represents a collection of results for all task runs for one pipeline run type TaskRunResults []TaskRunResult +// GetTaskRunResultsFinishedAt returns latest finishedAt time from TaskRunResults. +func (trrs TaskRunResults) GetTaskRunResultsFinishedAt() time.Time { + var finishedTime time.Time + for _, trr := range trrs { + if trr.FinishedAt.Valid && trr.FinishedAt.Time.After(finishedTime) { + finishedTime = trr.FinishedAt.Time + } + } + return finishedTime +} + // FinalResult pulls the FinalResult for the pipeline_run from the task runs // It needs to respect the output index of each task func (trrs TaskRunResults) FinalResult(l logger.Logger) FinalResult { diff --git a/tools/ci/install_solana b/tools/ci/install_solana index 912acd1bf48..4d84ef4ad0f 100755 --- a/tools/ci/install_solana +++ b/tools/ci/install_solana @@ -1,8 +1,8 @@ #!/usr/bin/env bash set -euo pipefail -VERSION=v1.13.3 -SHASUM=3a063fe58e6f8bc9e9de84a8d1b96da87e9184cb357d462522f7ec8a2c23bec2 +VERSION=v1.17.28 +SHASUM=97faa4d14becfccd3bc539dbc0aaf28c84cfe9d80d299ec70092fb5844403724 echo "Installing solana@${VERSION}" curl -sSfL https://release.solana.com/$VERSION/install --output install_solana.sh \