Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix InMemoryDataSourceCache cleanup (#12647) #12652

Merged
merged 1 commit into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fresh-oranges-brake.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

fix jfpc cache cleanup
7 changes: 5 additions & 2 deletions core/services/ocr2/plugins/median/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,12 @@ func NewMedianServices(ctx context.Context,

if !pluginConfig.JuelsPerFeeCoinCacheDisabled {
lggr.Infof("juelsPerFeeCoin data source caching is enabled")
if juelsPerFeeCoinSource, err = ocrcommon.NewInMemoryDataSourceCache(juelsPerFeeCoinSource, kvStore, pluginConfig.JuelsPerFeeCoinCacheDuration.Duration()); err != nil {
return nil, err
juelsPerFeeCoinSourceCache, err2 := ocrcommon.NewInMemoryDataSourceCache(juelsPerFeeCoinSource, kvStore, pluginConfig.JuelsPerFeeCoinCacheDuration.Duration())
if err2 != nil {
return nil, err2
}
juelsPerFeeCoinSource = juelsPerFeeCoinSourceCache
srvs = append(srvs, juelsPerFeeCoinSourceCache)
}

if cmdName := env.MedianPlugin.Cmd.Get(); cmdName != "" {
Expand Down
42 changes: 37 additions & 5 deletions core/services/ocrcommon/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median"
ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/bridges"
serializablebig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/logger"
Expand Down Expand Up @@ -104,7 +105,13 @@ const defaultCacheFreshness = time.Minute * 5
const defaultCacheFreshnessAlert = time.Hour * 24
const dataSourceCacheKey = "dscache"

func NewInMemoryDataSourceCache(ds median.DataSource, kvStore job.KVStore, cacheFreshness time.Duration) (median.DataSource, error) {
type DataSourceCacheService interface {
Start(context.Context) error
Close() error
median.DataSource
}

func NewInMemoryDataSourceCache(ds median.DataSource, kvStore job.KVStore, cacheFreshness time.Duration) (DataSourceCacheService, error) {
inMemoryDS, ok := ds.(*inMemoryDataSource)
if !ok {
return nil, errors.Errorf("unsupported data source type: %T, only inMemoryDataSource supported", ds)
Expand All @@ -118,8 +125,9 @@ func NewInMemoryDataSourceCache(ds median.DataSource, kvStore job.KVStore, cache
kvStore: kvStore,
cacheFreshness: cacheFreshness,
inMemoryDataSource: inMemoryDS,
chStop: make(chan struct{}),
chDone: make(chan struct{}),
}
go func() { dsCache.updater() }()
return dsCache, nil
}

Expand Down Expand Up @@ -225,21 +233,45 @@ type inMemoryDataSourceCache struct {
// Even if updates fail, previous values are returned.
cacheFreshness time.Duration
mu sync.RWMutex
chStop services.StopChan
chDone chan struct{}
latestUpdateErr error
latestTrrs pipeline.TaskRunResults
latestResult pipeline.FinalResult
kvStore job.KVStore
}

func (ds *inMemoryDataSourceCache) Start(context.Context) error {
go func() { ds.updater() }()
return nil
}

func (ds *inMemoryDataSourceCache) Close() error {
close(ds.chStop)
<-ds.chDone
return nil
}

// updater periodically updates data source cache.
func (ds *inMemoryDataSourceCache) updater() {
ticker := time.NewTicker(ds.cacheFreshness)
for ; true; <-ticker.C {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
updateCache := func() {
ctx, cancel := ds.chStop.CtxCancel(context.WithTimeout(context.Background(), time.Second*10))
defer cancel()
if err := ds.updateCache(ctx); err != nil {
ds.lggr.Warnf("failed to update cache, err: %v", err)
}
cancel()
}

updateCache()
for {
select {
case <-ticker.C:
updateCache()
case <-ds.chStop:
close(ds.chDone)
return
}
}
}

Expand Down
5 changes: 4 additions & 1 deletion core/services/ocrcommon/data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
serializablebig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
Expand Down Expand Up @@ -78,6 +79,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
mockKVStore.On("Get", mock.Anything, mock.IsType(&ocrcommon.ResultTimePair{})).Return(nil)
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Second*2)
require.NoError(t, err)
servicetest.Run(t, dsCache)

mockVal := int64(1)
// Test if Observe notices that cache updater failed and can refresh the cache on its own
Expand Down Expand Up @@ -112,12 +114,12 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Hour*100)
require.NoError(t, err)
changeResultValue(runner, "-1", true, false)
servicetest.Run(t, dsCache)

time.Sleep(time.Millisecond * 100)
val, err := dsCache.Observe(testutils.Context(t), types.ReportTimestamp{})
require.NoError(t, err)
assert.Equal(t, persistedVal.String(), val.String())

})

t.Run("test total updater fail with no persisted value ", func(t *testing.T) {
Expand All @@ -131,6 +133,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Hour*100)
require.NoError(t, err)
changeResultValue(runner, "-1", true, false)
servicetest.Run(t, dsCache)

time.Sleep(time.Millisecond * 100)
_, err = dsCache.Observe(testutils.Context(t), types.ReportTimestamp{})
Expand Down
Loading