Skip to content

Commit

Permalink
BCF - 3139 Chain Reader Log Poller filters in config and some code cl…
Browse files Browse the repository at this point in the history
…eanup (#13356)

* Inital commit for log poller filters in config

* Change Chain Reader Lp filters from per event binding to per contract

* Fix lint

* Add ChReader contract log poller filter init

* Add per event polling filter override for Chain Reader

* Fix event definitions and filters handling in CR

* Add changeset and lint

* Update variable naming in chain reader bindings.go

* Change write target test

* Fix filter handling

* Change contractBinding to not manage readBinding lp filters and lint

* Move contractBinding to a separate file

* Improve event and contract bind handling and lint

* Handle concurrency edge cases in reader bindings

* Minor lint

* Add event validation test and some comments to chain reader

* Add Chain Reader config json marshall test

* Use require instead of assert for Test_ChainReaderConfig

* Add comments to AddReadBinding

* Rename CR verifyEventInputsUsed to verifyEventIndexedInputsUsed

* Resolve merge issues in chain_reader_interface_tester.go

---------

Co-authored-by: Domino Valdano <[email protected]>
  • Loading branch information
ilija42 and reductionista authored Jun 14, 2024
1 parent 8e1b930 commit 0228243
Show file tree
Hide file tree
Showing 14 changed files with 644 additions and 154 deletions.
5 changes: 5 additions & 0 deletions .changeset/eleven-buckets-search.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#internal Add Log Poller support to Chain Reader through setting them in config. All filters should be part of the contract wide filter unless an event needs specific polling configuration, which can be set on a per event basis..
1 change: 1 addition & 0 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,7 @@ func (o *DSORM) SelectIndexedLogsWithSigsExcluding(ctx context.Context, sigA, si
return logs, nil
}

// TODO flaky BCF-3258
func (o *DSORM) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, _ string) ([]Log, error) {
qs, args, err := (&pgDSLParser{}).buildQuery(o.chainID, filter.Expressions, limitAndSort)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions core/internal/features/ocr2/features_ocr2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ fromBlock = %d
if test.chainReaderAndCodec {
chainReaderSpec = `
[relayConfig.chainReader.contracts.median]
contractPollingFilter.genericEventNames = ["LatestRoundRequested"]
contractABI = '''
[
{
Expand Down
4 changes: 2 additions & 2 deletions core/services/relay/evm/binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
)

type readBinding interface {
GetLatestValue(ctx context.Context, params, returnVal any) error
QueryKey(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, sequenceDataType any) ([]commontypes.Sequence, error)
Bind(ctx context.Context, binding commontypes.BoundContract) error
SetCodec(codec commontypes.RemoteCodec)
Register(ctx context.Context) error
Unregister(ctx context.Context) error
GetLatestValue(ctx context.Context, params, returnVal any) error
QueryKey(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, sequenceDataType any) ([]commontypes.Sequence, error)
}
64 changes: 36 additions & 28 deletions core/services/relay/evm/bindings.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,57 +5,65 @@ import (
"fmt"

commontypes "github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
)

// key is contract name
type contractBindings map[string]readBindings
// bindings manage all contract bindings, key is contract name.
type bindings map[string]*contractBinding

// key is read name
type readBindings map[string]readBinding

func (b contractBindings) GetReadBinding(contractName, readName string) (readBinding, error) {
rb, rbExists := b[contractName]
if !rbExists {
func (b bindings) GetReadBinding(contractName, readName string) (readBinding, error) {
// GetReadBindings should only be called after Chain Reader init.
cb, cbExists := b[contractName]
if !cbExists {
return nil, fmt.Errorf("%w: no contract named %s", commontypes.ErrInvalidType, contractName)
}

reader, readerExists := rb[readName]
if !readerExists {
rb, rbExists := cb.readBindings[readName]
if !rbExists {
return nil, fmt.Errorf("%w: no readName named %s in contract %s", commontypes.ErrInvalidType, readName, contractName)
}
return reader, nil
return rb, nil
}

func (b contractBindings) AddReadBinding(contractName, readName string, reader readBinding) {
rbs, rbsExists := b[contractName]
if !rbsExists {
rbs = readBindings{}
b[contractName] = rbs
// AddReadBinding adds read bindings. Calling this outside of Chain Reader init is not thread safe.
func (b bindings) AddReadBinding(contractName, readName string, rb readBinding) {
cb, cbExists := b[contractName]
if !cbExists {
cb = &contractBinding{
name: contractName,
readBindings: make(map[string]readBinding),
}
b[contractName] = cb
}
rbs[readName] = reader
cb.readBindings[readName] = rb
}

func (b contractBindings) Bind(ctx context.Context, boundContracts []commontypes.BoundContract) error {
// Bind binds contract addresses to contract bindings and read bindings.
// Bind also registers the common contract polling filter and eventBindings polling filters.
func (b bindings) Bind(ctx context.Context, lp logpoller.LogPoller, boundContracts []commontypes.BoundContract) error {
for _, bc := range boundContracts {
rbs, rbsExist := b[bc.Name]
if !rbsExist {
cb, cbExists := b[bc.Name]
if !cbExists {
return fmt.Errorf("%w: no contract named %s", commontypes.ErrInvalidConfig, bc.Name)
}
for _, r := range rbs {
if err := r.Bind(ctx, bc); err != nil {

if err := cb.Bind(ctx, lp, bc); err != nil {
return err
}

for _, rb := range cb.readBindings {
if err := rb.Bind(ctx, bc); err != nil {
return err
}
}
}
return nil
}

func (b contractBindings) ForEach(ctx context.Context, fn func(readBinding, context.Context) error) error {
for _, rbs := range b {
for _, rb := range rbs {
if err := fn(rb, ctx); err != nil {
return err
}
func (b bindings) ForEach(ctx context.Context, fn func(context.Context, *contractBinding) error) error {
for _, cb := range b {
if err := fn(ctx, cb); err != nil {
return err
}
}
return nil
Expand Down
Loading

0 comments on commit 0228243

Please sign in to comment.