From 99071cd7d8a8b2f13c31afb933af7e3a5615775f Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Wed, 29 Nov 2023 15:10:39 +0100 Subject: [PATCH] CCIP-1336 LogPoller - Bunch of minor improvements (#11348) * Tracking number of logs inserted by LP * Adding read query to LP * Index for fourth word in data * Regenerating mocks * Filter by address in SelectIndexedLogsByTxHash * Fix * Post review fixes * Post merge fixes --- core/chains/evm/logpoller/disabled.go | 6 +- core/chains/evm/logpoller/log_poller.go | 20 ++- core/chains/evm/logpoller/mocks/log_poller.go | 51 +++++-- core/chains/evm/logpoller/observability.go | 91 +++++++++--- .../evm/logpoller/observability_test.go | 72 ++++++++-- core/chains/evm/logpoller/orm.go | 35 ++++- core/chains/evm/logpoller/orm_test.go | 132 +++++++++++++++++- core/chains/evm/logpoller/query.go | 12 ++ .../0211_log_poller_word_indexes.sql | 6 + 9 files changed, 373 insertions(+), 52 deletions(-) create mode 100644 core/store/migrate/migrations/0211_log_poller_word_indexes.sql diff --git a/core/chains/evm/logpoller/disabled.go b/core/chains/evm/logpoller/disabled.go index b54d4e6fc84..05d591042f4 100644 --- a/core/chains/evm/logpoller/disabled.go +++ b/core/chains/evm/logpoller/disabled.go @@ -71,7 +71,7 @@ func (disabled) IndexedLogsByBlockRange(start, end int64, eventSig common.Hash, return nil, ErrDisabled } -func (d disabled) IndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) { +func (d disabled) IndexedLogsByTxHash(eventSig common.Hash, address common.Address, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) { return nil, ErrDisabled } @@ -106,3 +106,7 @@ func (d disabled) IndexedLogsCreatedAfter(eventSig common.Hash, address common.A func (d disabled) LatestBlockByEventSigsAddrsWithConfs(fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs Confirmations, qopts ...pg.QOpt) (int64, error) { return 0, ErrDisabled } + +func (d disabled) LogsDataWordBetween(eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) { + return nil, ErrDisabled +} diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index 7c4ea66cec7..de1999da260 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -53,12 +53,13 @@ type LogPoller interface { IndexedLogs(eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) IndexedLogsByBlockRange(start, end int64, eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, qopts ...pg.QOpt) ([]Log, error) IndexedLogsCreatedAfter(eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, after time.Time, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) - IndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) + IndexedLogsByTxHash(eventSig common.Hash, address common.Address, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) IndexedLogsTopicGreaterThan(eventSig common.Hash, address common.Address, topicIndex int, topicValueMin common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) IndexedLogsTopicRange(eventSig common.Hash, address common.Address, topicIndex int, topicValueMin common.Hash, topicValueMax common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) IndexedLogsWithSigsExcluding(address common.Address, eventSigA, eventSigB common.Hash, topicIndex int, fromBlock, toBlock int64, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) LogsDataWordRange(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin, wordValueMax common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) LogsDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) + LogsDataWordBetween(eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) } type Confirmations int @@ -969,8 +970,8 @@ func (lp *logPoller) IndexedLogsCreatedAfter(eventSig common.Hash, address commo return lp.orm.SelectIndexedLogsCreatedAfter(address, eventSig, topicIndex, topicValues, after, confs, qopts...) } -func (lp *logPoller) IndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) { - return lp.orm.SelectIndexedLogsByTxHash(eventSig, txHash, qopts...) +func (lp *logPoller) IndexedLogsByTxHash(eventSig common.Hash, address common.Address, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) { + return lp.orm.SelectIndexedLogsByTxHash(address, eventSig, txHash, qopts...) } // LogsDataWordGreaterThan note index is 0 based. @@ -1021,6 +1022,19 @@ func (lp *logPoller) LatestBlockByEventSigsAddrsWithConfs(fromBlock int64, event return lp.orm.SelectLatestBlockByEventSigsAddrsWithConfs(fromBlock, eventSigs, addresses, confs, qopts...) } +// LogsDataWordBetween retrieves a slice of Log records that match specific criteria. +// Besides generic filters like eventSig, address and confs, it also verifies data content against wordValue +// data[wordIndexMin] <= wordValue <= data[wordIndexMax]. +// +// Passing the same value for wordIndexMin and wordIndexMax will check the equality of the wordValue at that index. +// Leading to returning logs matching: data[wordIndexMin] == wordValue. +// +// This function is particularly useful for filtering logs by data word values and their positions within the event data. +// It returns an empty slice if no logs match the provided criteria. +func (lp *logPoller) LogsDataWordBetween(eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) { + return lp.orm.SelectLogsDataWordBetween(address, eventSig, wordIndexMin, wordIndexMax, wordValue, confs, qopts...) +} + // GetBlocksRange tries to get the specified block numbers from the log pollers // blocks table. It falls back to the RPC for any unfulfilled requested blocks. func (lp *logPoller) GetBlocksRange(ctx context.Context, numbers []uint64, qopts ...pg.QOpt) ([]LogPollerBlock, error) { diff --git a/core/chains/evm/logpoller/mocks/log_poller.go b/core/chains/evm/logpoller/mocks/log_poller.go index 01be5f7ba55..fe4ccc965cc 100644 --- a/core/chains/evm/logpoller/mocks/log_poller.go +++ b/core/chains/evm/logpoller/mocks/log_poller.go @@ -164,32 +164,32 @@ func (_m *LogPoller) IndexedLogsByBlockRange(start int64, end int64, eventSig co return r0, r1 } -// IndexedLogsByTxHash provides a mock function with given fields: eventSig, txHash, qopts -func (_m *LogPoller) IndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]logpoller.Log, error) { +// IndexedLogsByTxHash provides a mock function with given fields: eventSig, address, txHash, qopts +func (_m *LogPoller) IndexedLogsByTxHash(eventSig common.Hash, address common.Address, txHash common.Hash, qopts ...pg.QOpt) ([]logpoller.Log, error) { _va := make([]interface{}, len(qopts)) for _i := range qopts { _va[_i] = qopts[_i] } var _ca []interface{} - _ca = append(_ca, eventSig, txHash) + _ca = append(_ca, eventSig, address, txHash) _ca = append(_ca, _va...) ret := _m.Called(_ca...) var r0 []logpoller.Log var r1 error - if rf, ok := ret.Get(0).(func(common.Hash, common.Hash, ...pg.QOpt) ([]logpoller.Log, error)); ok { - return rf(eventSig, txHash, qopts...) + if rf, ok := ret.Get(0).(func(common.Hash, common.Address, common.Hash, ...pg.QOpt) ([]logpoller.Log, error)); ok { + return rf(eventSig, address, txHash, qopts...) } - if rf, ok := ret.Get(0).(func(common.Hash, common.Hash, ...pg.QOpt) []logpoller.Log); ok { - r0 = rf(eventSig, txHash, qopts...) + if rf, ok := ret.Get(0).(func(common.Hash, common.Address, common.Hash, ...pg.QOpt) []logpoller.Log); ok { + r0 = rf(eventSig, address, txHash, qopts...) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]logpoller.Log) } } - if rf, ok := ret.Get(1).(func(common.Hash, common.Hash, ...pg.QOpt) error); ok { - r1 = rf(eventSig, txHash, qopts...) + if rf, ok := ret.Get(1).(func(common.Hash, common.Address, common.Hash, ...pg.QOpt) error); ok { + r1 = rf(eventSig, address, txHash, qopts...) } else { r1 = ret.Error(1) } @@ -522,6 +522,39 @@ func (_m *LogPoller) LogsCreatedAfter(eventSig common.Hash, address common.Addre return r0, r1 } +// LogsDataWordBetween provides a mock function with given fields: eventSig, address, wordIndexMin, wordIndexMax, wordValue, confs, qopts +func (_m *LogPoller) LogsDataWordBetween(eventSig common.Hash, address common.Address, wordIndexMin int, wordIndexMax int, wordValue common.Hash, confs logpoller.Confirmations, qopts ...pg.QOpt) ([]logpoller.Log, error) { + _va := make([]interface{}, len(qopts)) + for _i := range qopts { + _va[_i] = qopts[_i] + } + var _ca []interface{} + _ca = append(_ca, eventSig, address, wordIndexMin, wordIndexMax, wordValue, confs) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 []logpoller.Log + var r1 error + if rf, ok := ret.Get(0).(func(common.Hash, common.Address, int, int, common.Hash, logpoller.Confirmations, ...pg.QOpt) ([]logpoller.Log, error)); ok { + return rf(eventSig, address, wordIndexMin, wordIndexMax, wordValue, confs, qopts...) + } + if rf, ok := ret.Get(0).(func(common.Hash, common.Address, int, int, common.Hash, logpoller.Confirmations, ...pg.QOpt) []logpoller.Log); ok { + r0 = rf(eventSig, address, wordIndexMin, wordIndexMax, wordValue, confs, qopts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]logpoller.Log) + } + } + + if rf, ok := ret.Get(1).(func(common.Hash, common.Address, int, int, common.Hash, logpoller.Confirmations, ...pg.QOpt) error); ok { + r1 = rf(eventSig, address, wordIndexMin, wordIndexMax, wordValue, confs, qopts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // LogsDataWordGreaterThan provides a mock function with given fields: eventSig, address, wordIndex, wordValueMin, confs, qopts func (_m *LogPoller) LogsDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, confs logpoller.Confirmations, qopts ...pg.QOpt) ([]logpoller.Log, error) { _va := make([]interface{}, len(qopts)) diff --git a/core/chains/evm/logpoller/observability.go b/core/chains/evm/logpoller/observability.go index 9826d503b9f..a7a0d3c03d5 100644 --- a/core/chains/evm/logpoller/observability.go +++ b/core/chains/evm/logpoller/observability.go @@ -14,6 +14,14 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/pg" ) +type queryType string + +const ( + create queryType = "create" + read queryType = "read" + del queryType = "delete" +) + var ( sqlLatencyBuckets = []float64{ float64(1 * time.Millisecond), @@ -41,47 +49,63 @@ var ( Name: "log_poller_query_duration", Help: "Measures duration of Log Poller's queries fetching logs", Buckets: sqlLatencyBuckets, - }, []string{"evmChainID", "query"}) + }, []string{"evmChainID", "query", "type"}) lpQueryDataSets = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "log_poller_query_dataset_size", Help: "Measures size of the datasets returned by Log Poller's queries", - }, []string{"evmChainID", "query"}) + }, []string{"evmChainID", "query", "type"}) + lpLogsInserted = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "log_poller_logs_inserted", + Help: "Counter to track number of logs inserted by Log Poller", + }, []string{"evmChainID"}) + lpBlockInserted = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "log_poller_blocks_inserted", + Help: "Counter to track number of blocks inserted by Log Poller", + }, []string{"evmChainID"}) ) // ObservedORM is a decorator layer for ORM used by LogPoller, responsible for pushing Prometheus metrics reporting duration and size of result set for the queries. // It doesn't change internal logic, because all calls are delegated to the origin ORM type ObservedORM struct { ORM - queryDuration *prometheus.HistogramVec - datasetSize *prometheus.GaugeVec - chainId string + queryDuration *prometheus.HistogramVec + datasetSize *prometheus.GaugeVec + logsInserted *prometheus.CounterVec + blocksInserted *prometheus.CounterVec + chainId string } // NewObservedORM creates an observed version of log poller's ORM created by NewORM // Please see ObservedLogPoller for more details on how latencies are measured func NewObservedORM(chainID *big.Int, db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) *ObservedORM { return &ObservedORM{ - ORM: NewORM(chainID, db, lggr, cfg), - queryDuration: lpQueryDuration, - datasetSize: lpQueryDataSets, - chainId: chainID.String(), + ORM: NewORM(chainID, db, lggr, cfg), + queryDuration: lpQueryDuration, + datasetSize: lpQueryDataSets, + logsInserted: lpLogsInserted, + blocksInserted: lpBlockInserted, + chainId: chainID.String(), } } func (o *ObservedORM) InsertLogs(logs []Log, qopts ...pg.QOpt) error { - return withObservedExec(o, "InsertLogs", func() error { + err := withObservedExec(o, "InsertLogs", create, func() error { return o.ORM.InsertLogs(logs, qopts...) }) + trackInsertedLogsAndBlock(o, logs, nil, err) + return err } func (o *ObservedORM) InsertLogsWithBlock(logs []Log, block LogPollerBlock, qopts ...pg.QOpt) error { - return withObservedExec(o, "InsertLogsWithBlock", func() error { + err := withObservedExec(o, "InsertLogsWithBlock", create, func() error { return o.ORM.InsertLogsWithBlock(logs, block, qopts...) }) + trackInsertedLogsAndBlock(o, logs, &block, err) + return err } func (o *ObservedORM) InsertFilter(filter Filter, qopts ...pg.QOpt) error { - return withObservedExec(o, "InsertFilter", func() error { + return withObservedExec(o, "InsertFilter", create, func() error { return o.ORM.InsertFilter(filter, qopts...) }) } @@ -93,25 +117,25 @@ func (o *ObservedORM) LoadFilters(qopts ...pg.QOpt) (map[string]Filter, error) { } func (o *ObservedORM) DeleteFilter(name string, qopts ...pg.QOpt) error { - return withObservedExec(o, "DeleteFilter", func() error { + return withObservedExec(o, "DeleteFilter", del, func() error { return o.ORM.DeleteFilter(name, qopts...) }) } func (o *ObservedORM) DeleteBlocksBefore(end int64, qopts ...pg.QOpt) error { - return withObservedExec(o, "DeleteBlocksBefore", func() error { + return withObservedExec(o, "DeleteBlocksBefore", del, func() error { return o.ORM.DeleteBlocksBefore(end, qopts...) }) } func (o *ObservedORM) DeleteLogsAndBlocksAfter(start int64, qopts ...pg.QOpt) error { - return withObservedExec(o, "DeleteLogsAndBlocksAfter", func() error { + return withObservedExec(o, "DeleteLogsAndBlocksAfter", del, func() error { return o.ORM.DeleteLogsAndBlocksAfter(start, qopts...) }) } func (o *ObservedORM) DeleteExpiredLogs(qopts ...pg.QOpt) error { - return withObservedExec(o, "DeleteExpiredLogs", func() error { + return withObservedExec(o, "DeleteExpiredLogs", del, func() error { return o.ORM.DeleteExpiredLogs(qopts...) }) } @@ -176,9 +200,9 @@ func (o *ObservedORM) SelectLogs(start, end int64, address common.Address, event }) } -func (o *ObservedORM) IndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) { - return withObservedQueryAndResults(o, "IndexedLogsByTxHash", func() ([]Log, error) { - return o.ORM.SelectIndexedLogsByTxHash(eventSig, txHash, qopts...) +func (o *ObservedORM) SelectIndexedLogsByTxHash(address common.Address, eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) { + return withObservedQueryAndResults(o, "SelectIndexedLogsByTxHash", func() ([]Log, error) { + return o.ORM.SelectIndexedLogsByTxHash(address, eventSig, txHash, qopts...) }) } @@ -212,6 +236,12 @@ func (o *ObservedORM) SelectLogsDataWordGreaterThan(address common.Address, even }) } +func (o *ObservedORM) SelectLogsDataWordBetween(address common.Address, eventSig common.Hash, wordIndexMin int, wordIndexMax int, wordValue common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) { + return withObservedQueryAndResults(o, "SelectLogsDataWordBetween", func() ([]Log, error) { + return o.ORM.SelectLogsDataWordBetween(address, eventSig, wordIndexMin, wordIndexMax, wordValue, confs, qopts...) + }) +} + func (o *ObservedORM) SelectIndexedLogsTopicGreaterThan(address common.Address, eventSig common.Hash, topicIndex int, topicValueMin common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) { return withObservedQueryAndResults(o, "SelectIndexedLogsTopicGreaterThan", func() ([]Log, error) { return o.ORM.SelectIndexedLogsTopicGreaterThan(address, eventSig, topicIndex, topicValueMin, confs, qopts...) @@ -228,7 +258,7 @@ func withObservedQueryAndResults[T any](o *ObservedORM, queryName string, query results, err := withObservedQuery(o, queryName, query) if err == nil { o.datasetSize. - WithLabelValues(o.chainId, queryName). + WithLabelValues(o.chainId, queryName, string(read)). Set(float64(len(results))) } return results, err @@ -238,18 +268,33 @@ func withObservedQuery[T any](o *ObservedORM, queryName string, query func() (T, queryStarted := time.Now() defer func() { o.queryDuration. - WithLabelValues(o.chainId, queryName). + WithLabelValues(o.chainId, queryName, string(read)). Observe(float64(time.Since(queryStarted))) }() return query() } -func withObservedExec(o *ObservedORM, query string, exec func() error) error { +func withObservedExec(o *ObservedORM, query string, queryType queryType, exec func() error) error { queryStarted := time.Now() defer func() { o.queryDuration. - WithLabelValues(o.chainId, query). + WithLabelValues(o.chainId, query, string(queryType)). Observe(float64(time.Since(queryStarted))) }() return exec() } + +func trackInsertedLogsAndBlock(o *ObservedORM, logs []Log, block *LogPollerBlock, err error) { + if err != nil { + return + } + o.logsInserted. + WithLabelValues(o.chainId). + Add(float64(len(logs))) + + if block != nil { + o.blocksInserted. + WithLabelValues(o.chainId). + Inc() + } +} diff --git a/core/chains/evm/logpoller/observability_test.go b/core/chains/evm/logpoller/observability_test.go index 83bd60a5564..76575f46ca4 100644 --- a/core/chains/evm/logpoller/observability_test.go +++ b/core/chains/evm/logpoller/observability_test.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/prometheus/client_golang/prometheus" io_prometheus_client "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" @@ -18,6 +19,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/services/pg" + "github.com/smartcontractkit/chainlink/v2/core/utils" ) func TestMultipleMetricsArePublished(t *testing.T) { @@ -54,7 +56,7 @@ func TestShouldPublishDurationInCaseOfError(t *testing.T) { require.Error(t, err) require.Equal(t, 1, testutil.CollectAndCount(orm.queryDuration)) - require.Equal(t, 1, counterFromHistogramByLabels(t, orm.queryDuration, "200", "SelectLatestLogByEventSigWithConfs")) + require.Equal(t, 1, counterFromHistogramByLabels(t, orm.queryDuration, "200", "SelectLatestLogByEventSigWithConfs", "read")) } func TestMetricsAreProperlyPopulatedWithLabels(t *testing.T) { @@ -68,14 +70,14 @@ func TestMetricsAreProperlyPopulatedWithLabels(t *testing.T) { require.NoError(t, err) } - require.Equal(t, expectedCount, counterFromHistogramByLabels(t, orm.queryDuration, "420", "query")) - require.Equal(t, expectedSize, counterFromGaugeByLabels(orm.datasetSize, "420", "query")) + require.Equal(t, expectedCount, counterFromHistogramByLabels(t, orm.queryDuration, "420", "query", "read")) + require.Equal(t, expectedSize, counterFromGaugeByLabels(orm.datasetSize, "420", "query", "read")) - require.Equal(t, 0, counterFromHistogramByLabels(t, orm.queryDuration, "420", "other_query")) - require.Equal(t, 0, counterFromHistogramByLabels(t, orm.queryDuration, "5", "query")) + require.Equal(t, 0, counterFromHistogramByLabels(t, orm.queryDuration, "420", "other_query", "read")) + require.Equal(t, 0, counterFromHistogramByLabels(t, orm.queryDuration, "5", "query", "read")) - require.Equal(t, 0, counterFromGaugeByLabels(orm.datasetSize, "420", "other_query")) - require.Equal(t, 0, counterFromGaugeByLabels(orm.datasetSize, "5", "query")) + require.Equal(t, 0, counterFromGaugeByLabels(orm.datasetSize, "420", "other_query", "read")) + require.Equal(t, 0, counterFromGaugeByLabels(orm.datasetSize, "5", "query", "read")) } func TestNotPublishingDatasetSizeInCaseOfError(t *testing.T) { @@ -84,16 +86,60 @@ func TestNotPublishingDatasetSizeInCaseOfError(t *testing.T) { _, err := withObservedQueryAndResults(orm, "errorQuery", func() ([]string, error) { return nil, fmt.Errorf("error") }) require.Error(t, err) - require.Equal(t, 1, counterFromHistogramByLabels(t, orm.queryDuration, "420", "errorQuery")) - require.Equal(t, 0, counterFromGaugeByLabels(orm.datasetSize, "420", "errorQuery")) + require.Equal(t, 1, counterFromHistogramByLabels(t, orm.queryDuration, "420", "errorQuery", "read")) + require.Equal(t, 0, counterFromGaugeByLabels(orm.datasetSize, "420", "errorQuery", "read")) } func TestMetricsAreProperlyPopulatedForWrites(t *testing.T) { orm := createObservedORM(t, 420) - require.NoError(t, withObservedExec(orm, "execQuery", func() error { return nil })) - require.Error(t, withObservedExec(orm, "execQuery", func() error { return fmt.Errorf("error") })) + require.NoError(t, withObservedExec(orm, "execQuery", create, func() error { return nil })) + require.Error(t, withObservedExec(orm, "execQuery", create, func() error { return fmt.Errorf("error") })) - require.Equal(t, 2, counterFromHistogramByLabels(t, orm.queryDuration, "420", "execQuery")) + require.Equal(t, 2, counterFromHistogramByLabels(t, orm.queryDuration, "420", "execQuery", "create")) +} + +func TestCountersAreProperlyPopulatedForWrites(t *testing.T) { + orm := createObservedORM(t, 420) + logs := generateRandomLogs(420, 20) + + // First insert 10 logs + require.NoError(t, orm.InsertLogs(logs[:10])) + assert.Equal(t, float64(10), testutil.ToFloat64(orm.logsInserted.WithLabelValues("420"))) + + // Insert 5 more logs with block + require.NoError(t, orm.InsertLogsWithBlock(logs[10:15], NewLogPollerBlock(utils.RandomBytes32(), 10, time.Now(), 5))) + assert.Equal(t, float64(15), testutil.ToFloat64(orm.logsInserted.WithLabelValues("420"))) + assert.Equal(t, float64(1), testutil.ToFloat64(orm.blocksInserted.WithLabelValues("420"))) + + // Insert 5 more logs with block + require.NoError(t, orm.InsertLogsWithBlock(logs[15:], NewLogPollerBlock(utils.RandomBytes32(), 15, time.Now(), 5))) + assert.Equal(t, float64(20), testutil.ToFloat64(orm.logsInserted.WithLabelValues("420"))) + assert.Equal(t, float64(2), testutil.ToFloat64(orm.blocksInserted.WithLabelValues("420"))) + + // Don't update counters in case of an error + require.Error(t, orm.InsertLogsWithBlock(logs, NewLogPollerBlock(utils.RandomBytes32(), 0, time.Now(), 0))) + assert.Equal(t, float64(20), testutil.ToFloat64(orm.logsInserted.WithLabelValues("420"))) + assert.Equal(t, float64(2), testutil.ToFloat64(orm.blocksInserted.WithLabelValues("420"))) +} + +func generateRandomLogs(chainId, count int) []Log { + logs := make([]Log, count) + for i := range logs { + logs[i] = Log{ + EvmChainId: utils.NewBigI(int64(chainId)), + LogIndex: int64(i + 1), + BlockHash: utils.RandomBytes32(), + BlockNumber: int64(i + 1), + BlockTimestamp: time.Now(), + Topics: [][]byte{}, + EventSig: utils.RandomBytes32(), + Address: utils.RandomAddress(), + TxHash: utils.RandomBytes32(), + Data: []byte{}, + CreatedAt: time.Now(), + } + } + return logs } func createObservedORM(t *testing.T, chainId int64) *ObservedORM { @@ -107,6 +153,8 @@ func createObservedORM(t *testing.T, chainId int64) *ObservedORM { func resetMetrics(lp ObservedORM) { lp.queryDuration.Reset() lp.datasetSize.Reset() + lp.logsInserted.Reset() + lp.blocksInserted.Reset() } func counterFromGaugeByLabels(gaugeVec *prometheus.GaugeVec, labels ...string) int { diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index c6134ed4b69..c24d423b253 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -48,9 +48,10 @@ type ORM interface { SelectIndexedLogsTopicGreaterThan(address common.Address, eventSig common.Hash, topicIndex int, topicValueMin common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) SelectIndexedLogsTopicRange(address common.Address, eventSig common.Hash, topicIndex int, topicValueMin, topicValueMax common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) SelectIndexedLogsWithSigsExcluding(sigA, sigB common.Hash, topicIndex int, address common.Address, startBlock, endBlock int64, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) - SelectIndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) + SelectIndexedLogsByTxHash(address common.Address, eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) SelectLogsDataWordRange(address common.Address, eventSig common.Hash, wordIndex int, wordValueMin, wordValueMax common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) SelectLogsDataWordGreaterThan(address common.Address, eventSig common.Hash, wordIndex int, wordValueMin common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) + SelectLogsDataWordBetween(address common.Address, eventSig common.Hash, wordIndexMin int, wordIndexMax int, wordValue common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) } type DbORM struct { @@ -535,6 +536,32 @@ func (o *DbORM) SelectLogsDataWordGreaterThan(address common.Address, eventSig c return logs, nil } +func (o *DbORM) SelectLogsDataWordBetween(address common.Address, eventSig common.Hash, wordIndexMin int, wordIndexMax int, wordValue common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) { + args, err := newQueryArgsForEvent(o.chainID, address, eventSig). + withWordIndexMin(wordIndexMin). + withWordIndexMax(wordIndexMax). + withWordValue(wordValue). + withConfs(confs). + toArgs() + if err != nil { + return nil, err + } + query := fmt.Sprintf(` + SELECT * FROM evm.logs + WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND substring(data from 32*:word_index_min+1 for 32) <= :word_value + AND substring(data from 32*:word_index_max+1 for 32) >= :word_value + AND block_number <= %s + ORDER BY (block_number, log_index)`, nestedBlockNumberQuery(confs)) + var logs []Log + if err = o.q.WithOpts(qopts...).SelectNamed(&logs, query, args); err != nil { + return nil, err + } + return logs, nil +} + func (o *DbORM) SelectIndexedLogsTopicGreaterThan(address common.Address, eventSig common.Hash, topicIndex int, topicValueMin common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error) { args, err := newQueryArgsForEvent(o.chainID, address, eventSig). withTopicIndex(topicIndex). @@ -664,9 +691,10 @@ func (o *DbORM) SelectIndexedLogsCreatedAfter(address common.Address, eventSig c return logs, nil } -func (o *DbORM) SelectIndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) { +func (o *DbORM) SelectIndexedLogsByTxHash(address common.Address, eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) { args, err := newQueryArgs(o.chainID). withTxHash(txHash). + withAddress(address). withEventSig(eventSig). toArgs() if err != nil { @@ -676,8 +704,9 @@ func (o *DbORM) SelectIndexedLogsByTxHash(eventSig common.Hash, txHash common.Ha err = o.q.WithOpts(qopts...).SelectNamed(&logs, ` SELECT * FROM evm.logs WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig AND tx_hash = :tx_hash - AND event_sig = :event_sig ORDER BY (block_number, log_index)`, args) if err != nil { return nil, err diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index e55ebeccecf..2a6ebb2c04e 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest/heavyweight" @@ -49,6 +50,21 @@ func GenLogWithTimestamp(chainID *big.Int, logIndex int64, blockNum int64, block } } +func GenLogWithData(chainID *big.Int, address common.Address, eventSig common.Hash, logIndex int64, blockNum int64, data []byte) logpoller.Log { + return logpoller.Log{ + EvmChainId: utils.NewBig(chainID), + LogIndex: logIndex, + BlockHash: utils.RandomBytes32(), + BlockNumber: blockNum, + EventSig: eventSig, + Topics: [][]byte{}, + Address: address, + TxHash: utils.RandomBytes32(), + Data: data, + BlockTimestamp: time.Now(), + } +} + func TestLogPoller_Batching(t *testing.T) { t.Parallel() th := SetupTH(t, false, 2, 3, 2, 1000) @@ -568,7 +584,7 @@ func TestORM_SelectIndexedLogsByTxHash(t *testing.T) { } require.NoError(t, o1.InsertLogs(logs)) - retrievedLogs, err := o1.SelectIndexedLogsByTxHash(eventSig, txHash) + retrievedLogs, err := o1.SelectIndexedLogsByTxHash(addr, eventSig, txHash) require.NoError(t, err) require.Equal(t, 2, len(retrievedLogs)) @@ -1434,3 +1450,117 @@ func TestInsertLogsInTx(t *testing.T) { }) } } + +func TestSelectLogsDataWordBetween(t *testing.T) { + address := utils.RandomAddress() + eventSig := utils.RandomBytes32() + th := SetupTH(t, false, 2, 3, 2, 1000) + + firstLogData := make([]byte, 0, 64) + firstLogData = append(firstLogData, logpoller.EvmWord(1).Bytes()...) + firstLogData = append(firstLogData, logpoller.EvmWord(10).Bytes()...) + + secondLogData := make([]byte, 0, 64) + secondLogData = append(secondLogData, logpoller.EvmWord(5).Bytes()...) + secondLogData = append(secondLogData, logpoller.EvmWord(20).Bytes()...) + + err := th.ORM.InsertLogsWithBlock( + []logpoller.Log{ + GenLogWithData(th.ChainID, address, eventSig, 1, 1, firstLogData), + GenLogWithData(th.ChainID, address, eventSig, 2, 2, secondLogData), + }, + logpoller.NewLogPollerBlock(utils.RandomBytes32(), 10, time.Now(), 1), + ) + require.NoError(t, err) + + tests := []struct { + name string + wordValue uint64 + expectedLogs []int64 + }{ + { + name: "returns only first log", + wordValue: 2, + expectedLogs: []int64{1}, + }, + { + name: "returns only second log", + wordValue: 11, + expectedLogs: []int64{2}, + }, + { + name: "returns both logs if word value is between", + wordValue: 5, + expectedLogs: []int64{1, 2}, + }, + { + name: "returns no logs if word value is outside of the range", + wordValue: 21, + expectedLogs: []int64{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logs, err1 := th.ORM.SelectLogsDataWordBetween(address, eventSig, 0, 1, logpoller.EvmWord(tt.wordValue), logpoller.Unconfirmed) + assert.NoError(t, err1) + assert.Len(t, logs, len(tt.expectedLogs)) + + for index := range logs { + assert.Equal(t, tt.expectedLogs[index], logs[index].BlockNumber) + } + }) + } +} + +func Benchmark_LogsDataWordBetween(b *testing.B) { + chainId := big.NewInt(137) + _, db := heavyweight.FullTestDBV2(b, nil) + o := logpoller.NewORM(chainId, db, logger.Test(b), pgtest.NewQConfig(false)) + + numberOfReports := 100_000 + numberOfMessagesPerReport := 256 + + commitStoreAddress := utils.RandomAddress() + commitReportAccepted := utils.RandomBytes32() + + var dbLogs []logpoller.Log + for i := 0; i < numberOfReports; i++ { + data := make([]byte, 64) + // MinSeqNr + data = append(data, logpoller.EvmWord(uint64(numberOfMessagesPerReport*i+1)).Bytes()...) + // MaxSeqNr + data = append(data, logpoller.EvmWord(uint64(numberOfMessagesPerReport*(i+1))).Bytes()...) + + dbLogs = append(dbLogs, logpoller.Log{ + EvmChainId: utils.NewBig(chainId), + LogIndex: int64(i + 1), + BlockHash: utils.RandomBytes32(), + BlockNumber: int64(i + 1), + BlockTimestamp: time.Now(), + EventSig: commitReportAccepted, + Topics: [][]byte{}, + Address: commitStoreAddress, + TxHash: utils.RandomAddress().Hash(), + Data: data, + CreatedAt: time.Now(), + }) + } + require.NoError(b, o.InsertBlock(utils.RandomAddress().Hash(), int64(numberOfReports*numberOfMessagesPerReport), time.Now(), int64(numberOfReports*numberOfMessagesPerReport))) + require.NoError(b, o.InsertLogs(dbLogs)) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + logs, err := o.SelectLogsDataWordBetween( + commitStoreAddress, + commitReportAccepted, + 2, + 3, + logpoller.EvmWord(uint64(numberOfReports*numberOfMessagesPerReport/2)), // Pick the middle report + logpoller.Unconfirmed, + ) + assert.NoError(b, err) + assert.Len(b, logs, 1) + } +} diff --git a/core/chains/evm/logpoller/query.go b/core/chains/evm/logpoller/query.go index 7443a860a85..b7fbfade680 100644 --- a/core/chains/evm/logpoller/query.go +++ b/core/chains/evm/logpoller/query.go @@ -82,6 +82,18 @@ func (q *queryArgs) withWordValueMax(wordValueMax common.Hash) *queryArgs { return q.withCustomHashArg("word_value_max", wordValueMax) } +func (q *queryArgs) withWordIndexMin(wordIndex int) *queryArgs { + return q.withCustomArg("word_index_min", wordIndex) +} + +func (q *queryArgs) withWordIndexMax(wordIndex int) *queryArgs { + return q.withCustomArg("word_index_max", wordIndex) +} + +func (q *queryArgs) withWordValue(wordValue common.Hash) *queryArgs { + return q.withCustomHashArg("word_value", wordValue) +} + func (q *queryArgs) withConfs(confs Confirmations) *queryArgs { return q.withCustomArg("confs", confs) } diff --git a/core/store/migrate/migrations/0211_log_poller_word_indexes.sql b/core/store/migrate/migrations/0211_log_poller_word_indexes.sql new file mode 100644 index 00000000000..3d2e8bf8a4e --- /dev/null +++ b/core/store/migrate/migrations/0211_log_poller_word_indexes.sql @@ -0,0 +1,6 @@ +-- +goose Up +CREATE INDEX evm_logs_idx_data_word_four ON evm.logs (substring(data from 97 for 32)); + + +-- +goose Down +DROP INDEX IF EXISTS evm.evm_logs_idx_data_word_four;