Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FUN-973] s4 snapshot caching #12275

Merged
merged 10 commits into from
Mar 11, 2024
2 changes: 1 addition & 1 deletion core/services/ocr2/plugins/functions/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ const (
// Create all OCR2 plugin Oracles and all extra services needed to run a Functions job.
func NewFunctionsServices(ctx context.Context, functionsOracleArgs, thresholdOracleArgs, s4OracleArgs *libocr2.OCR2OracleArgs, conf *FunctionsServicesConfig) ([]job.ServiceCtx, error) {
pluginORM := functions.NewORM(conf.DB, conf.Logger, conf.QConfig, common.HexToAddress(conf.ContractID))
s4ORM := s4.NewPostgresORM(conf.DB, conf.Logger, conf.QConfig, s4.SharedTableName, FunctionsS4Namespace)
s4ORM := s4.NewCachedORMWrapper(s4.NewPostgresORM(conf.DB, conf.Logger, conf.QConfig, s4.SharedTableName, FunctionsS4Namespace), conf.Logger)

var pluginConfig config.PluginConfig
if err := json.Unmarshal(conf.Job.OCR2OracleSpec.PluginConfig.Bytes(), &pluginConfig); err != nil {
Expand Down
119 changes: 119 additions & 0 deletions core/services/s4/cached_orm_wrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package s4

import (
"fmt"
"math/big"
"strings"
"time"

"github.com/patrickmn/go-cache"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This dependency must already exist I suppose since I don't see any changes to the go.mod file

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes! the evmregistry is already using it


ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

const (
// defaultExpiration decides how long info will be valid for.
defaultExpiration = 10 * time.Minute
// cleanupInterval decides when the expired items in cache will be deleted.
cleanupInterval = 5 * time.Minute

getSnapshotCachePrefix = "GetSnapshot"
)

// CachedORM is a cached orm wrapper that implements the ORM interface.
// It adds a cache layer in order to remove unnecessary pressure to the underlaying implementation
type CachedORM struct {
underlayingORM ORM
cache *cache.Cache
lggr logger.Logger
}

var _ ORM = (*CachedORM)(nil)

func NewCachedORMWrapper(orm ORM, lggr logger.Logger) *CachedORM {
return &CachedORM{
underlayingORM: orm,
cache: cache.New(defaultExpiration, cleanupInterval),
lggr: lggr,
}
}

func (c CachedORM) Get(address *ubig.Big, slotId uint, qopts ...pg.QOpt) (*Row, error) {
return c.underlayingORM.Get(address, slotId, qopts...)
}

func (c CachedORM) Update(row *Row, qopts ...pg.QOpt) error {
c.deleteRowFromSnapshotCache(row)

return c.underlayingORM.Update(row, qopts...)
}

func (c CachedORM) DeleteExpired(limit uint, utcNow time.Time, qopts ...pg.QOpt) (int64, error) {
deletedRows, err := c.underlayingORM.DeleteExpired(limit, utcNow, qopts...)
if err != nil {
return 0, err
}

if deletedRows > 0 {
c.cache.Flush()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to flush the entire cache every time a single row is expired?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

more ctx here but basically its because we know some rows where deleted, but we don't know which ones, so the best effort on keeping the cache sync is to flush it

}

return deletedRows, nil
}

func (c CachedORM) GetSnapshot(addressRange *AddressRange, qopts ...pg.QOpt) ([]*SnapshotRow, error) {
key := fmt.Sprintf("%s_%s_%s", getSnapshotCachePrefix, addressRange.MinAddress.String(), addressRange.MaxAddress.String())

cached, found := c.cache.Get(key)
if found {
return cached.([]*SnapshotRow), nil
}

c.lggr.Debug("Snapshot not found in cache, fetching it from underlaying implementation")
data, err := c.underlayingORM.GetSnapshot(addressRange, qopts...)
if err != nil {
return nil, err
}
c.cache.Set(key, data, defaultExpiration)

return data, nil
}

func (c CachedORM) GetUnconfirmedRows(limit uint, qopts ...pg.QOpt) ([]*Row, error) {
return c.underlayingORM.GetUnconfirmedRows(limit, qopts...)
}

// deleteRowFromSnapshotCache will clean the cache for every snapshot that would involve a given row
// in case of an error parsing a key it will also delete the key from the cache
func (c CachedORM) deleteRowFromSnapshotCache(row *Row) {
for key := range c.cache.Items() {
keyParts := strings.Split(key, "_")
if len(keyParts) != 3 {
continue
}

if keyParts[0] != getSnapshotCachePrefix {
continue
}

minAddress, ok := new(big.Int).SetString(keyParts[1], 10)
if !ok {
c.lggr.Errorf("error while converting minAddress string: %s to big.Int, deleting key %q", keyParts[1], key)
c.cache.Delete(key)
continue
}

maxAddress, ok := new(big.Int).SetString(keyParts[2], 10)
if !ok {
c.lggr.Errorf("error while converting minAddress string: %s to big.Int, deleting key %q ", keyParts[2], key)
c.cache.Delete(key)
continue
}

if row.Address.ToInt().Cmp(minAddress) >= 0 && row.Address.ToInt().Cmp(maxAddress) <= 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am trying to understand a case where this would not be true...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

S4 supports snapshot sharding.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha. I re-read the CLIP and this now makes sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we want to only delete the cached snapshots that its addressRange contain a specific row. Just to avoid deleting snapshots that are not affected

c.cache.Delete(key)
}
}
}
Loading
Loading