Skip to content

Commit

Permalink
Exposing ability to pass any query to the LogPoller
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusz-sekara committed Nov 13, 2023
1 parent 75b12e7 commit 71ddc06
Show file tree
Hide file tree
Showing 9 changed files with 341 additions and 124 deletions.
8 changes: 8 additions & 0 deletions core/chains/evm/logpoller/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,11 @@ func (d disabled) IndexedLogsCreatedAfter(eventSig common.Hash, address common.A
func (d disabled) LatestBlockByEventSigsAddrsWithConfs(fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs Confirmations, qopts ...pg.QOpt) (int64, error) {
return 0, ErrDisabled
}

func (d disabled) FetchAnyLogs(queryName string, query string, args QueryArgs, qopts ...pg.QOpt) ([]Log, error) {
return nil, ErrDisabled
}

func (d disabled) FetchAnyResults(queryName string, query string, args QueryArgs, dest interface{}, qopts ...pg.QOpt) error {
return ErrDisabled
}
18 changes: 18 additions & 0 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ type LogPoller interface {
IndexedLogsWithSigsExcluding(address common.Address, eventSigA, eventSigB common.Hash, topicIndex int, fromBlock, toBlock int64, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)
LogsDataWordRange(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin, wordValueMax common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)
LogsDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, confs Confirmations, qopts ...pg.QOpt) ([]Log, error)

// FetchAnyLogs gets any logs matching the query and provided args. Data is decoded and returned as slice of Log.
// queryName is used for monitoring and debugging purposes. It's also pushed as a label to Prometheus, so use
// a fixed set of values for it (e.g. don't generate new value with every call).
// Otherwise, it will increase cardinality and affect Prometheus performance
FetchAnyLogs(queryName string, query string, args QueryArgs, qopts ...pg.QOpt) ([]Log, error)
// FetchAnyResults similar to FetchAnyLogs will get data from LogPoller but will unmarshall that according to the type passed as dest.
// This gives user more flexibility when using advanced queries that not necessary match Log struct. (e.g. pick only a single value from the log)
// Results will be returned in a dest interface{}, it's up to user to pass struct fitting the query result.
FetchAnyResults(queryName string, query string, args QueryArgs, dest interface{}, qopts ...pg.QOpt) error
}

type Confirmations int
Expand Down Expand Up @@ -1172,6 +1182,14 @@ func (lp *logPoller) IndexedLogsWithSigsExcluding(address common.Address, eventS
return lp.orm.SelectIndexedLogsWithSigsExcluding(eventSigA, eventSigB, topicIndex, address, fromBlock, toBlock, confs, qopts...)
}

func (lp *logPoller) FetchAnyLogs(queryName string, query string, args QueryArgs, qopts ...pg.QOpt) ([]Log, error) {
return lp.orm.SelectAnyLogs(queryName, query, args, qopts...)
}

func (lp *logPoller) FetchAnyResults(queryName string, query string, args QueryArgs, dest interface{}, qopts ...pg.QOpt) error {
return lp.orm.SelectAnyRows(queryName, query, args, dest, qopts...)
}

func EvmWord(i uint64) common.Hash {
var b = make([]byte, 8)
binary.BigEndian.PutUint64(b, i)
Expand Down
54 changes: 54 additions & 0 deletions core/chains/evm/logpoller/mocks/log_poller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions core/chains/evm/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,18 @@ func (o *ObservedORM) SelectIndexedLogsTopicRange(address common.Address, eventS
})
}

func (o *ObservedORM) SelectAnyLogs(queryName string, query string, args QueryArgs, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQueryAndResults(o, queryName, func() ([]Log, error) {
return o.ORM.SelectAnyLogs(queryName, query, args, qopts...)
})
}

func (o *ObservedORM) SelectAnyRows(queryName string, query string, args QueryArgs, dest interface{}, qopts ...pg.QOpt) error {
return withObservedQueryAndMaybeResults(o, queryName, func() (interface{}, error) {
return dest, o.ORM.SelectAnyRows(queryName, query, args, dest, qopts...)
})
}

func withObservedQueryAndResults[T any](o *ObservedORM, queryName string, query func() ([]T, error)) ([]T, error) {
results, err := withObservedQuery(o, queryName, query)
if err == nil {
Expand All @@ -234,6 +246,20 @@ func withObservedQueryAndResults[T any](o *ObservedORM, queryName string, query
return results, err
}

func withObservedQueryAndMaybeResults(o *ObservedORM, queryName string, query func() (interface{}, error)) error {
results, err := withObservedQuery(o, queryName, query)
if err != nil {
return err
}

if resultsSlice, ok := results.([]interface{}); ok {
o.datasetSize.
WithLabelValues(o.chainId, queryName).
Set(float64(len(resultsSlice)))
}
return nil
}

func withObservedQuery[T any](o *ObservedORM, queryName string, query func() (T, error)) (T, error) {
queryStarted := time.Now()
defer func() {
Expand Down
Loading

0 comments on commit 71ddc06

Please sign in to comment.