Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCIP-1288 LogPoller - Ability to pass custom search queries #11242

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions core/chains/evm/logpoller/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,11 @@ func (d disabled) IndexedLogsCreatedAfter(eventSig common.Hash, address common.A
func (d disabled) LatestBlockByEventSigsAddrsWithConfs(fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs Confirmations, qopts ...pg.QOpt) (int64, error) {
return 0, ErrDisabled
}

func (d disabled) FetchAnyLogs(queryName string, query string, args QueryArgs, qopts ...pg.QOpt) ([]Log, error) {
return nil, ErrDisabled
}

func (d disabled) FetchAnyResults(queryName string, query string, args QueryArgs, dest interface{}, qopts ...pg.QOpt) error {
return ErrDisabled
}
57 changes: 25 additions & 32 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ type LogPoller interface {
IndexedLogsWithSigsExcluding(address common.Address, eventSigA, eventSigB common.Hash, topicIndex int, fromBlock, toBlock int64, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)
LogsDataWordRange(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin, wordValueMax common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)
LogsDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)

// FetchAnyLogs gets any logs matching the query and provided args. Data is decoded and returned as slice of Log.
// queryName is used for monitoring and debugging purposes. It's also pushed as a label to Prometheus, so use
// a fixed set of values for it (e.g. don't generate new value with every call).
// Otherwise, it will increase cardinality and affect Prometheus performance
FetchAnyLogs(queryName string, query string, args QueryArgs, qopts ...pg.QOpt) ([]Log, error)
// FetchAnyResults similar to FetchAnyLogs will get data from LogPoller but will unmarshall that according to the type passed as dest.
// This gives user more flexibility when using advanced queries that not necessary match Log struct. (e.g. pick only a single value from the log)
// Results will be returned in a dest interface{}, it's up to user to pass struct fitting the query result.
FetchAnyResults(queryName string, query string, args QueryArgs, dest interface{}, qopts ...pg.QOpt) error
}

type Confirmations int
Expand Down Expand Up @@ -679,9 +689,7 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {
}

lp.lggr.Debugw("Backfill found logs", "from", from, "to", to, "logs", len(gethLogs), "blocks", blocks)
err = lp.orm.Q().WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error {
return lp.orm.InsertLogs(convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), pg.WithQueryer(tx))
})
err = lp.orm.InsertLogs(convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), pg.WithParentCtx(ctx))
if err != nil {
lp.lggr.Warnw("Unable to insert logs, retrying", "err", err, "from", from, "to", to)
return err
Expand Down Expand Up @@ -750,21 +758,7 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren
// the canonical set per read. Typically, if an application took action on a log
// it would be saved elsewhere e.g. evm.txes, so it seems better to just support the fast reads.
// Its also nicely analogous to reading from the chain itself.
err2 = lp.orm.Q().WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error {
// These deletes are bounded by reorg depth, so they are
// fast and should not slow down the log readers.
err3 := lp.orm.DeleteBlocksAfter(blockAfterLCA.Number, pg.WithQueryer(tx))
if err3 != nil {
lp.lggr.Warnw("Unable to clear reorged blocks, retrying", "err", err3)
return err3
}
err3 = lp.orm.DeleteLogsAfter(blockAfterLCA.Number, pg.WithQueryer(tx))
if err3 != nil {
lp.lggr.Warnw("Unable to clear reorged logs, retrying", "err", err3)
return err3
}
return nil
})
err2 = lp.orm.DeleteLogsAndBlocksAfter(blockAfterLCA.Number, pg.WithParentCtx(ctx))
if err2 != nil {
// If we error on db commit, we can't know if the tx went through or not.
// We return an error here which will cause us to restart polling from lastBlockSaved + 1
Expand Down Expand Up @@ -849,20 +843,11 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int
return
}
lp.lggr.Debugw("Unfinalized log query", "logs", len(logs), "currentBlockNumber", currentBlockNumber, "blockHash", currentBlock.Hash, "timestamp", currentBlock.Timestamp.Unix())
err = lp.orm.Q().WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error {
if err2 := lp.orm.InsertBlock(h, currentBlockNumber, currentBlock.Timestamp, latestFinalizedBlockNumber, pg.WithQueryer(tx)); err2 != nil {
return err2
}
if len(logs) == 0 {
return nil
}
return lp.orm.InsertLogs(convertLogs(logs,
[]LogPollerBlock{{BlockNumber: currentBlockNumber,
BlockTimestamp: currentBlock.Timestamp}},
lp.lggr,
lp.ec.ConfiguredChainID(),
), pg.WithQueryer(tx))
})
block := NewLogPollerBlock(h, currentBlockNumber, currentBlock.Timestamp, latestFinalizedBlockNumber)
err = lp.orm.InsertLogsWithBlock(
convertLogs(logs, []LogPollerBlock{block}, lp.lggr, lp.ec.ConfiguredChainID()),
block,
)
if err != nil {
lp.lggr.Warnw("Unable to save logs resuming from last saved block + 1", "err", err, "block", currentBlockNumber)
return
Expand Down Expand Up @@ -1197,6 +1182,14 @@ func (lp *logPoller) IndexedLogsWithSigsExcluding(address common.Address, eventS
return lp.orm.SelectIndexedLogsWithSigsExcluding(eventSigA, eventSigB, topicIndex, address, fromBlock, toBlock, confs, qopts...)
}

func (lp *logPoller) FetchAnyLogs(queryName string, query string, args QueryArgs, qopts ...pg.QOpt) ([]Log, error) {
return lp.orm.SelectAnyLogs(queryName, query, args, qopts...)
}

func (lp *logPoller) FetchAnyResults(queryName string, query string, args QueryArgs, dest interface{}, qopts ...pg.QOpt) error {
return lp.orm.SelectAnyRows(queryName, query, args, dest, qopts...)
}

func EvmWord(i uint64) common.Hash {
var b = make([]byte, 8)
binary.BigEndian.PutUint64(b, i)
Expand Down
54 changes: 54 additions & 0 deletions core/chains/evm/logpoller/mocks/log_poller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions core/chains/evm/logpoller/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,12 @@ func (l *Log) ToGethLog() types.Log {
Index: uint(l.LogIndex),
}
}

func NewLogPollerBlock(blockHash common.Hash, blockNumber int64, timestamp time.Time, finalizedBlockNumber int64) LogPollerBlock {
return LogPollerBlock{
BlockHash: blockHash,
BlockNumber: blockNumber,
BlockTimestamp: timestamp,
FinalizedBlockNumber: finalizedBlockNumber,
}
}
48 changes: 32 additions & 16 deletions core/chains/evm/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,15 @@ func NewObservedORM(chainID *big.Int, db *sqlx.DB, lggr logger.Logger, cfg pg.QC
}
}

func (o *ObservedORM) Q() pg.Q {
return o.ORM.Q()
}

func (o *ObservedORM) InsertLogs(logs []Log, qopts ...pg.QOpt) error {
return withObservedExec(o, "InsertLogs", func() error {
return o.ORM.InsertLogs(logs, qopts...)
})
}

func (o *ObservedORM) InsertBlock(hash common.Hash, blockNumber int64, blockTimestamp time.Time, lastFinalizedBlock int64, qopts ...pg.QOpt) error {
return withObservedExec(o, "InsertBlock", func() error {
return o.ORM.InsertBlock(hash, blockNumber, blockTimestamp, lastFinalizedBlock, qopts...)
func (o *ObservedORM) InsertLogsWithBlock(logs []Log, block LogPollerBlock, qopts ...pg.QOpt) error {
return withObservedExec(o, "InsertLogsWithBlock", func() error {
return o.ORM.InsertLogsWithBlock(logs, block, qopts...)
})
}

Expand All @@ -102,21 +98,15 @@ func (o *ObservedORM) DeleteFilter(name string, qopts ...pg.QOpt) error {
})
}

func (o *ObservedORM) DeleteBlocksAfter(start int64, qopts ...pg.QOpt) error {
return withObservedExec(o, "DeleteBlocksAfter", func() error {
return o.ORM.DeleteBlocksAfter(start, qopts...)
})
}

func (o *ObservedORM) DeleteBlocksBefore(end int64, qopts ...pg.QOpt) error {
return withObservedExec(o, "DeleteBlocksBefore", func() error {
return o.ORM.DeleteBlocksBefore(end, qopts...)
})
}

func (o *ObservedORM) DeleteLogsAfter(start int64, qopts ...pg.QOpt) error {
return withObservedExec(o, "DeleteLogsAfter", func() error {
return o.ORM.DeleteLogsAfter(start, qopts...)
func (o *ObservedORM) DeleteLogsAndBlocksAfter(start int64, qopts ...pg.QOpt) error {
return withObservedExec(o, "DeleteLogsAndBlocksAfter", func() error {
return o.ORM.DeleteLogsAndBlocksAfter(start, qopts...)
})
}

Expand Down Expand Up @@ -234,6 +224,18 @@ func (o *ObservedORM) SelectIndexedLogsTopicRange(address common.Address, eventS
})
}

func (o *ObservedORM) SelectAnyLogs(queryName string, query string, args QueryArgs, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQueryAndResults(o, queryName, func() ([]Log, error) {
return o.ORM.SelectAnyLogs(queryName, query, args, qopts...)
})
}

func (o *ObservedORM) SelectAnyRows(queryName string, query string, args QueryArgs, dest interface{}, qopts ...pg.QOpt) error {
return withObservedQueryAndMaybeResults(o, queryName, func() (interface{}, error) {
return dest, o.ORM.SelectAnyRows(queryName, query, args, dest, qopts...)
})
}

func withObservedQueryAndResults[T any](o *ObservedORM, queryName string, query func() ([]T, error)) ([]T, error) {
results, err := withObservedQuery(o, queryName, query)
if err == nil {
Expand All @@ -244,6 +246,20 @@ func withObservedQueryAndResults[T any](o *ObservedORM, queryName string, query
return results, err
}

func withObservedQueryAndMaybeResults(o *ObservedORM, queryName string, query func() (interface{}, error)) error {
results, err := withObservedQuery(o, queryName, query)
if err != nil {
return err
}

if resultsSlice, ok := results.([]interface{}); ok {
o.datasetSize.
WithLabelValues(o.chainId, queryName).
Set(float64(len(resultsSlice)))
}
return nil
}

func withObservedQuery[T any](o *ObservedORM, queryName string, query func() (T, error)) (T, error) {
queryStarted := time.Now()
defer func() {
Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/observability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestMultipleMetricsArePublished(t *testing.T) {
_, _ = orm.SelectLatestLogEventSigsAddrsWithConfs(0, []common.Address{{}}, []common.Hash{{}}, 1, pg.WithParentCtx(ctx))
_, _ = orm.SelectIndexedLogsCreatedAfter(common.Address{}, common.Hash{}, 1, []common.Hash{}, time.Now(), 0, pg.WithParentCtx(ctx))
_ = orm.InsertLogs([]Log{}, pg.WithParentCtx(ctx))
_ = orm.InsertBlock(common.Hash{}, 1, time.Now(), 0, pg.WithParentCtx(ctx))
_ = orm.InsertLogsWithBlock([]Log{}, NewLogPollerBlock(common.Hash{}, 1, time.Now(), 0), pg.WithParentCtx(ctx))

require.Equal(t, 13, testutil.CollectAndCount(orm.queryDuration))
require.Equal(t, 10, testutil.CollectAndCount(orm.datasetSize))
Expand Down
Loading
Loading