Skip to content

Commit

Permalink
CCIP-1336 LogPoller - Bunch of minor improvements (#11348)
Browse files Browse the repository at this point in the history
* Tracking number of logs inserted by LP

* Adding read query to LP

* Index for fourth word in data

* Regenerating mocks

* Filter by address in SelectIndexedLogsByTxHash

* Fix

* Post review fixes

* Post merge fixes
  • Loading branch information
mateusz-sekara authored Nov 29, 2023
1 parent de6c45e commit 99071cd
Show file tree
Hide file tree
Showing 9 changed files with 373 additions and 52 deletions.
6 changes: 5 additions & 1 deletion core/chains/evm/logpoller/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (disabled) IndexedLogsByBlockRange(start, end int64, eventSig common.Hash,
return nil, ErrDisabled
}

func (d disabled) IndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) {
func (d disabled) IndexedLogsByTxHash(eventSig common.Hash, address common.Address, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) {
return nil, ErrDisabled
}

Expand Down Expand Up @@ -106,3 +106,7 @@ func (d disabled) IndexedLogsCreatedAfter(eventSig common.Hash, address common.A
func (d disabled) LatestBlockByEventSigsAddrsWithConfs(fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs Confirmations, qopts ...pg.QOpt) (int64, error) {
return 0, ErrDisabled
}

func (d disabled) LogsDataWordBetween(eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) {
return nil, ErrDisabled
}
20 changes: 17 additions & 3 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,13 @@ type LogPoller interface {
IndexedLogs(eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)
IndexedLogsByBlockRange(start, end int64, eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, qopts ...pg.QOpt) ([]Log, error)
IndexedLogsCreatedAfter(eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, after time.Time, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)
IndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error)
IndexedLogsByTxHash(eventSig common.Hash, address common.Address, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error)
IndexedLogsTopicGreaterThan(eventSig common.Hash, address common.Address, topicIndex int, topicValueMin common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)
IndexedLogsTopicRange(eventSig common.Hash, address common.Address, topicIndex int, topicValueMin common.Hash, topicValueMax common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)
IndexedLogsWithSigsExcluding(address common.Address, eventSigA, eventSigB common.Hash, topicIndex int, fromBlock, toBlock int64, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)
LogsDataWordRange(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin, wordValueMax common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)
LogsDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)
LogsDataWordBetween(eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)
}

type Confirmations int
Expand Down Expand Up @@ -969,8 +970,8 @@ func (lp *logPoller) IndexedLogsCreatedAfter(eventSig common.Hash, address commo
return lp.orm.SelectIndexedLogsCreatedAfter(address, eventSig, topicIndex, topicValues, after, confs, qopts...)
}

func (lp *logPoller) IndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) {
return lp.orm.SelectIndexedLogsByTxHash(eventSig, txHash, qopts...)
func (lp *logPoller) IndexedLogsByTxHash(eventSig common.Hash, address common.Address, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) {
return lp.orm.SelectIndexedLogsByTxHash(address, eventSig, txHash, qopts...)
}

// LogsDataWordGreaterThan note index is 0 based.
Expand Down Expand Up @@ -1021,6 +1022,19 @@ func (lp *logPoller) LatestBlockByEventSigsAddrsWithConfs(fromBlock int64, event
return lp.orm.SelectLatestBlockByEventSigsAddrsWithConfs(fromBlock, eventSigs, addresses, confs, qopts...)
}

// LogsDataWordBetween retrieves a slice of Log records that match specific criteria.
// Besides generic filters like eventSig, address and confs, it also verifies data content against wordValue
// data[wordIndexMin] <= wordValue <= data[wordIndexMax].
//
// Passing the same value for wordIndexMin and wordIndexMax will check the equality of the wordValue at that index.
// Leading to returning logs matching: data[wordIndexMin] == wordValue.
//
// This function is particularly useful for filtering logs by data word values and their positions within the event data.
// It returns an empty slice if no logs match the provided criteria.
func (lp *logPoller) LogsDataWordBetween(eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) {
return lp.orm.SelectLogsDataWordBetween(address, eventSig, wordIndexMin, wordIndexMax, wordValue, confs, qopts...)
}

// GetBlocksRange tries to get the specified block numbers from the log pollers
// blocks table. It falls back to the RPC for any unfulfilled requested blocks.
func (lp *logPoller) GetBlocksRange(ctx context.Context, numbers []uint64, qopts ...pg.QOpt) ([]LogPollerBlock, error) {
Expand Down
51 changes: 42 additions & 9 deletions core/chains/evm/logpoller/mocks/log_poller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

91 changes: 68 additions & 23 deletions core/chains/evm/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

type queryType string

const (
create queryType = "create"
read queryType = "read"
del queryType = "delete"
)

var (
sqlLatencyBuckets = []float64{
float64(1 * time.Millisecond),
Expand Down Expand Up @@ -41,47 +49,63 @@ var (
Name: "log_poller_query_duration",
Help: "Measures duration of Log Poller's queries fetching logs",
Buckets: sqlLatencyBuckets,
}, []string{"evmChainID", "query"})
}, []string{"evmChainID", "query", "type"})
lpQueryDataSets = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "log_poller_query_dataset_size",
Help: "Measures size of the datasets returned by Log Poller's queries",
}, []string{"evmChainID", "query"})
}, []string{"evmChainID", "query", "type"})
lpLogsInserted = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "log_poller_logs_inserted",
Help: "Counter to track number of logs inserted by Log Poller",
}, []string{"evmChainID"})
lpBlockInserted = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "log_poller_blocks_inserted",
Help: "Counter to track number of blocks inserted by Log Poller",
}, []string{"evmChainID"})
)

// ObservedORM is a decorator layer for ORM used by LogPoller, responsible for pushing Prometheus metrics reporting duration and size of result set for the queries.
// It doesn't change internal logic, because all calls are delegated to the origin ORM
type ObservedORM struct {
ORM
queryDuration *prometheus.HistogramVec
datasetSize *prometheus.GaugeVec
chainId string
queryDuration *prometheus.HistogramVec
datasetSize *prometheus.GaugeVec
logsInserted *prometheus.CounterVec
blocksInserted *prometheus.CounterVec
chainId string
}

// NewObservedORM creates an observed version of log poller's ORM created by NewORM
// Please see ObservedLogPoller for more details on how latencies are measured
func NewObservedORM(chainID *big.Int, db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) *ObservedORM {
return &ObservedORM{
ORM: NewORM(chainID, db, lggr, cfg),
queryDuration: lpQueryDuration,
datasetSize: lpQueryDataSets,
chainId: chainID.String(),
ORM: NewORM(chainID, db, lggr, cfg),
queryDuration: lpQueryDuration,
datasetSize: lpQueryDataSets,
logsInserted: lpLogsInserted,
blocksInserted: lpBlockInserted,
chainId: chainID.String(),
}
}

func (o *ObservedORM) InsertLogs(logs []Log, qopts ...pg.QOpt) error {
return withObservedExec(o, "InsertLogs", func() error {
err := withObservedExec(o, "InsertLogs", create, func() error {
return o.ORM.InsertLogs(logs, qopts...)
})
trackInsertedLogsAndBlock(o, logs, nil, err)
return err
}

func (o *ObservedORM) InsertLogsWithBlock(logs []Log, block LogPollerBlock, qopts ...pg.QOpt) error {
return withObservedExec(o, "InsertLogsWithBlock", func() error {
err := withObservedExec(o, "InsertLogsWithBlock", create, func() error {
return o.ORM.InsertLogsWithBlock(logs, block, qopts...)
})
trackInsertedLogsAndBlock(o, logs, &block, err)
return err
}

func (o *ObservedORM) InsertFilter(filter Filter, qopts ...pg.QOpt) error {
return withObservedExec(o, "InsertFilter", func() error {
return withObservedExec(o, "InsertFilter", create, func() error {
return o.ORM.InsertFilter(filter, qopts...)
})
}
Expand All @@ -93,25 +117,25 @@ func (o *ObservedORM) LoadFilters(qopts ...pg.QOpt) (map[string]Filter, error) {
}

func (o *ObservedORM) DeleteFilter(name string, qopts ...pg.QOpt) error {
return withObservedExec(o, "DeleteFilter", func() error {
return withObservedExec(o, "DeleteFilter", del, func() error {
return o.ORM.DeleteFilter(name, qopts...)
})
}

func (o *ObservedORM) DeleteBlocksBefore(end int64, qopts ...pg.QOpt) error {
return withObservedExec(o, "DeleteBlocksBefore", func() error {
return withObservedExec(o, "DeleteBlocksBefore", del, func() error {
return o.ORM.DeleteBlocksBefore(end, qopts...)
})
}

func (o *ObservedORM) DeleteLogsAndBlocksAfter(start int64, qopts ...pg.QOpt) error {
return withObservedExec(o, "DeleteLogsAndBlocksAfter", func() error {
return withObservedExec(o, "DeleteLogsAndBlocksAfter", del, func() error {
return o.ORM.DeleteLogsAndBlocksAfter(start, qopts...)
})
}

func (o *ObservedORM) DeleteExpiredLogs(qopts ...pg.QOpt) error {
return withObservedExec(o, "DeleteExpiredLogs", func() error {
return withObservedExec(o, "DeleteExpiredLogs", del, func() error {
return o.ORM.DeleteExpiredLogs(qopts...)
})
}
Expand Down Expand Up @@ -176,9 +200,9 @@ func (o *ObservedORM) SelectLogs(start, end int64, address common.Address, event
})
}

func (o *ObservedORM) IndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQueryAndResults(o, "IndexedLogsByTxHash", func() ([]Log, error) {
return o.ORM.SelectIndexedLogsByTxHash(eventSig, txHash, qopts...)
func (o *ObservedORM) SelectIndexedLogsByTxHash(address common.Address, eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQueryAndResults(o, "SelectIndexedLogsByTxHash", func() ([]Log, error) {
return o.ORM.SelectIndexedLogsByTxHash(address, eventSig, txHash, qopts...)
})
}

Expand Down Expand Up @@ -212,6 +236,12 @@ func (o *ObservedORM) SelectLogsDataWordGreaterThan(address common.Address, even
})
}

func (o *ObservedORM) SelectLogsDataWordBetween(address common.Address, eventSig common.Hash, wordIndexMin int, wordIndexMax int, wordValue common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQueryAndResults(o, "SelectLogsDataWordBetween", func() ([]Log, error) {
return o.ORM.SelectLogsDataWordBetween(address, eventSig, wordIndexMin, wordIndexMax, wordValue, confs, qopts...)
})
}

func (o *ObservedORM) SelectIndexedLogsTopicGreaterThan(address common.Address, eventSig common.Hash, topicIndex int, topicValueMin common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQueryAndResults(o, "SelectIndexedLogsTopicGreaterThan", func() ([]Log, error) {
return o.ORM.SelectIndexedLogsTopicGreaterThan(address, eventSig, topicIndex, topicValueMin, confs, qopts...)
Expand All @@ -228,7 +258,7 @@ func withObservedQueryAndResults[T any](o *ObservedORM, queryName string, query
results, err := withObservedQuery(o, queryName, query)
if err == nil {
o.datasetSize.
WithLabelValues(o.chainId, queryName).
WithLabelValues(o.chainId, queryName, string(read)).
Set(float64(len(results)))
}
return results, err
Expand All @@ -238,18 +268,33 @@ func withObservedQuery[T any](o *ObservedORM, queryName string, query func() (T,
queryStarted := time.Now()
defer func() {
o.queryDuration.
WithLabelValues(o.chainId, queryName).
WithLabelValues(o.chainId, queryName, string(read)).
Observe(float64(time.Since(queryStarted)))
}()
return query()
}

func withObservedExec(o *ObservedORM, query string, exec func() error) error {
func withObservedExec(o *ObservedORM, query string, queryType queryType, exec func() error) error {
queryStarted := time.Now()
defer func() {
o.queryDuration.
WithLabelValues(o.chainId, query).
WithLabelValues(o.chainId, query, string(queryType)).
Observe(float64(time.Since(queryStarted)))
}()
return exec()
}

func trackInsertedLogsAndBlock(o *ObservedORM, logs []Log, block *LogPollerBlock, err error) {
if err != nil {
return
}
o.logsInserted.
WithLabelValues(o.chainId).
Add(float64(len(logs)))

if block != nil {
o.blocksInserted.
WithLabelValues(o.chainId).
Inc()
}
}
Loading

0 comments on commit 99071cd

Please sign in to comment.