Skip to content

Commit

Permalink
cleanup: deduplicate shared cache logic (#803)
Browse files Browse the repository at this point in the history
* cleanup: deduplicate shared cache logic

* use Read + pass name to cache state management
  • Loading branch information
aalu1418 authored Jul 31, 2024
1 parent f2ca801 commit b3035e3
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 248 deletions.
40 changes: 22 additions & 18 deletions pkg/solana/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,29 +167,33 @@ func TestCache(t *testing.T) {
}))

lggr := logger.Test(t)
stateCache := StateCache{
StateID: solana.MustPublicKeyFromBase58("11111111111111111111111111111111"),
cfg: config.NewDefault(),
reader: testSetupReader(t, mockServer.URL),
lggr: lggr,
}
stateCache := NewStateCache(
solana.MustPublicKeyFromBase58("11111111111111111111111111111111"),
"test-chain-id",
config.NewDefault(),
testSetupReader(t, mockServer.URL),
lggr,
)
require.NoError(t, stateCache.Start(ctx))
require.NoError(t, stateCache.Close())
require.NoError(t, stateCache.fetchState(ctx))
assert.Equal(t, "GADeYvXjPwZP7ds1yDY9VFp12bNjdxT1YyksMvFGK9xn", stateCache.state.Transmissions.String())
assert.True(t, !stateCache.stateTime.IsZero())

transmissionsCache := TransmissionsCache{
TransmissionsID: solana.MustPublicKeyFromBase58("11111111111111111111111111111112"),
cfg: config.NewDefault(),
reader: testSetupReader(t, mockServer.URL),
lggr: lggr,
}
require.NoError(t, stateCache.Fetch(ctx))
state, err := stateCache.Read()
require.NoError(t, err)
assert.Equal(t, "GADeYvXjPwZP7ds1yDY9VFp12bNjdxT1YyksMvFGK9xn", state.Transmissions.String())
assert.True(t, !stateCache.Timestamp().IsZero())

transmissionsCache := NewTransmissionsCache(
solana.MustPublicKeyFromBase58("11111111111111111111111111111112"),
"test-chain-id",
config.NewDefault(),
testSetupReader(t, mockServer.URL),
lggr,
)
require.NoError(t, transmissionsCache.Start(ctx))
require.NoError(t, transmissionsCache.Close())

require.NoError(t, transmissionsCache.fetchLatestTransmission(ctx))
answer, err := transmissionsCache.ReadAnswer()
require.NoError(t, transmissionsCache.Fetch(ctx))
answer, err := transmissionsCache.Read()
assert.NoError(t, err)
assert.Equal(t, expectedTime, answer.Timestamp)
assert.Equal(t, expectedAns, answer.Data.String())
Expand Down
143 changes: 143 additions & 0 deletions pkg/solana/client/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package client

import (
"context"
"errors"
"sync"
"time"

"github.com/gagliardetto/solana-go"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor"
)

type CacheGetter[R any] func(ctx context.Context) (res R, slot uint64, err error)

// Cache is a generic implementation for caching data from the chain
type Cache[R any] struct {
services.StateMachine

// identifier
metricName string
Account solana.PublicKey
ChainID string

// stored answer
resLock sync.RWMutex
res R
resTime time.Time

// dependencies
getter CacheGetter[R]
cfg config.Config
lggr logger.Logger

// polling
done chan struct{}
stopCh services.StopChan
}

func NewCache[R any](metricName string, account solana.PublicKey, chainID string, cfg config.Config, getFunc CacheGetter[R], lggr logger.Logger) *Cache[R] {
return &Cache[R]{
metricName: metricName,
Account: account,
ChainID: chainID,
getter: getFunc,
lggr: lggr,
cfg: cfg,
}
}

func (c *Cache[R]) Name() string {
return c.lggr.Name()
}

// Start polling
func (c *Cache[R]) Start(ctx context.Context) error {
return c.StartOnce("cache_"+c.metricName, func() error {
c.done = make(chan struct{})
c.stopCh = make(chan struct{})
// We synchronously update the config on start so that
// when OCR starts there is config available (if possible).
// Avoids confusing "contract has not been configured" OCR errors.
err := c.Fetch(ctx)
if err != nil {
c.lggr.Warnf("error in initial fetch %s", err)
}
go c.Poll()
return nil
})
}

// Close stops the polling
func (c *Cache[R]) Close() error {
return c.StopOnce("cache_"+c.metricName, func() error {
close(c.stopCh)
<-c.done
return nil
})
}

// Poll contains the polling implementation
func (c *Cache[R]) Poll() {
defer close(c.done)
ctx, cancel := c.stopCh.NewCtx()
defer cancel()
c.lggr.Debugf("Starting polling: %s", c.Account)
tick := time.After(0)
for {
select {
case <-ctx.Done():
c.lggr.Debugf("Stopping polling: %s", c.Account)
return
case <-tick:
start := time.Now()
err := c.Fetch(ctx)
if err != nil {
c.lggr.Errorf("error in Poll.fetch %s", err)
}
// Note negative duration will be immediately ready
tick = time.After(utils.WithJitter(c.cfg.OCR2CachePollPeriod()) - time.Since(start))
}
}
}

// Read reads the latest result from memory with mutex and errors if timeout is exceeded
func (c *Cache[R]) Read() (R, error) {
c.resLock.RLock()
defer c.resLock.RUnlock()

// check if stale timeout
var err error
if time.Since(c.resTime) > c.cfg.OCR2CacheTTL() {
err = errors.New("error in Read: stale data, polling is likely experiencing errors")
}
return c.res, err
}

func (c *Cache[R]) Timestamp() time.Time {
return c.resTime
}

func (c *Cache[R]) Fetch(ctx context.Context) error {
c.lggr.Debugf("fetch for account: %s", c.Account)
res, _, err := c.getter(ctx)
if err != nil {
return err
}
c.lggr.Debugf("latest fetched for account: %s, result: %v", c.Account, res)

timestamp := time.Now()
monitor.SetCacheTimestamp(timestamp, c.metricName, c.ChainID, c.Account.String())
// acquire lock and write to state
c.resLock.Lock()
defer c.resLock.Unlock()
c.res = res
c.resTime = timestamp
return nil
}
4 changes: 2 additions & 2 deletions pkg/solana/config_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (c *ConfigTracker) Notify() <-chan struct{} {
// LatestConfigDetails returns information about the latest configuration,
// but not the configuration itself.
func (c *ConfigTracker) LatestConfigDetails(ctx context.Context) (changedInBlock uint64, configDigest types.ConfigDigest, err error) {
state, err := c.stateCache.ReadState()
state, err := c.stateCache.Read()
return state.Config.LatestConfigBlockNumber, state.Config.LatestConfigDigest, err
}

Expand Down Expand Up @@ -66,7 +66,7 @@ func ConfigFromState(state State) (types.ContractConfig, error) {

// LatestConfig returns the latest configuration.
func (c *ConfigTracker) LatestConfig(ctx context.Context, changedInBlock uint64) (types.ContractConfig, error) {
state, err := c.stateCache.ReadState()
state, err := c.stateCache.Read()
if err != nil {
return types.ContractConfig{}, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/solana/median_contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ func (c *MedianContract) LatestTransmissionDetails(
latestTimestamp time.Time,
err error,
) {
state, err := c.stateCache.ReadState()
state, err := c.stateCache.Read()
if err != nil {
return configDigest, epoch, round, latestAnswer, latestTimestamp, err
}
answer, err := c.transmissionsCache.ReadAnswer()
answer, err := c.transmissionsCache.Read()
if err != nil {
return configDigest, epoch, round, latestAnswer, latestTimestamp, err
}
Expand Down Expand Up @@ -60,6 +60,6 @@ func (c *MedianContract) LatestRoundRequested(
round uint8,
err error,
) {
state, err := c.stateCache.ReadState()
state, err := c.stateCache.Read()
return state.Config.LatestConfigDigest, 0, 0, err
}
4 changes: 2 additions & 2 deletions pkg/solana/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func newConfigProvider(ctx context.Context, lggr logger.Logger, chain Chain, arg
}

func (c *configProvider) Name() string {
return c.stateCache.lggr.Name()
return c.stateCache.Name()
}

func (c *configProvider) Start(ctx context.Context) error {
Expand Down Expand Up @@ -260,7 +260,7 @@ type medianProvider struct {
}

func (p *medianProvider) Name() string {
return p.stateCache.lggr.Name()
return p.stateCache.Name()
}

// start both cache services
Expand Down
116 changes: 5 additions & 111 deletions pkg/solana/state_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,139 +2,33 @@ package solana

import (
"context"
"encoding/hex"
"errors"
"fmt"
"sync"
"time"

bin "github.com/gagliardetto/binary"
"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/client"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor"
)

var (
configVersion uint8 = 1
)

type StateCache struct {
services.StateMachine
// on-chain program + 2x state accounts (state + transmissions)
StateID solana.PublicKey
chainID string

stateLock sync.RWMutex
state State
stateTime time.Time

// dependencies
reader client.Reader
cfg config.Config
lggr logger.Logger

// polling
done chan struct{}
stopCh services.StopChan
*client.Cache[State]
}

func NewStateCache(stateID solana.PublicKey, chainID string, cfg config.Config, reader client.Reader, lggr logger.Logger) *StateCache {
return &StateCache{
StateID: stateID,
chainID: chainID,
reader: reader,
lggr: lggr,
cfg: cfg,
}
}

// Start polling
func (c *StateCache) Start(ctx context.Context) error {
return c.StartOnce("pollState", func() error {
c.done = make(chan struct{})
c.stopCh = make(chan struct{})
// We synchronously update the config on start so that
// when OCR starts there is config available (if possible).
// Avoids confusing "contract has not been configured" OCR errors.
err := c.fetchState(ctx)
if err != nil {
c.lggr.Warnf("error in initial PollState.fetchState %s", err)
}
go c.PollState()
return nil
})
}

// PollState contains the state and transmissions polling implementation
func (c *StateCache) PollState() {
defer close(c.done)
ctx, cancel := c.stopCh.NewCtx()
defer cancel()
c.lggr.Debugf("Starting state polling for state: %s", c.StateID)
tick := time.After(0)
for {
select {
case <-ctx.Done():
c.lggr.Debugf("Stopping state polling for state: %s", c.StateID)
return
case <-tick:
// async poll both ocr2 states
start := time.Now()
err := c.fetchState(ctx)
if err != nil {
c.lggr.Errorf("error in PollState.fetchState %s", err)
}
// Note negative duration will be immediately ready
tick = time.After(utils.WithJitter(c.cfg.OCR2CachePollPeriod()) - time.Since(start))
}
}
}

// Close stops the polling
func (c *StateCache) Close() error {
return c.StopOnce("pollState", func() error {
close(c.stopCh)
<-c.done
return nil
})
}

// ReadState reads the latest state from memory with mutex and errors if timeout is exceeded
func (c *StateCache) ReadState() (State, error) {
c.stateLock.RLock()
defer c.stateLock.RUnlock()

var err error
if time.Since(c.stateTime) > c.cfg.OCR2CacheTTL() {
err = errors.New("error in ReadState: stale state data, polling is likely experiencing errors")
name := "ocr2_median_state"
getter := func(ctx context.Context) (State, uint64, error) {
return GetState(ctx, reader, stateID, cfg.Commitment())
}
return c.state, err
}

func (c *StateCache) fetchState(ctx context.Context) error {
c.lggr.Debugf("fetch state for account: %s", c.StateID.String())
state, _, err := GetState(ctx, c.reader, c.StateID, c.cfg.Commitment())
if err != nil {
return err
}

c.lggr.Debugf("state fetched for account: %s, result (config digest): %v", c.StateID, hex.EncodeToString(state.Config.LatestConfigDigest[:]))

timestamp := time.Now()
monitor.SetCacheTimestamp(timestamp, "ocr2_median_state", c.chainID, c.StateID.String())
// acquire lock and write to state
c.stateLock.Lock()
defer c.stateLock.Unlock()
c.state = state
c.stateTime = timestamp
return nil
return &StateCache{client.NewCache(name, stateID, chainID, cfg, getter, logger.With(lggr, "cache", name))}
}

func GetState(ctx context.Context, reader client.AccountReader, account solana.PublicKey, commitment rpc.CommitmentType) (State, uint64, error) {
Expand Down
Loading

0 comments on commit b3035e3

Please sign in to comment.