Skip to content

Commit

Permalink
Add metrics to rpcLogStreamer (#197)
Browse files Browse the repository at this point in the history
  • Loading branch information
neekolas authored Oct 8, 2024
1 parent 0c977d6 commit 7790ba8
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 3 deletions.
10 changes: 9 additions & 1 deletion pkg/blockchain/rpcLogStreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/xmtp/xmtpd/pkg/metrics"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -132,10 +133,12 @@ func (r *RpcLogStreamer) getNextPage(
config contractConfig,
fromBlock int,
) (logs []types.Log, nextBlock *int, err error) {
contractAddress := config.contractAddress.Hex()
highestBlock, err := r.client.BlockNumber(r.ctx)
if err != nil {
return nil, nil, err
}
metrics.EmitCurrentBlock(contractAddress, int(highestBlock))

highestBlockCanProcess := int(highestBlock) - LAG_FROM_HIGHEST_BLOCK
numOfBlocksToProcess := highestBlockCanProcess - fromBlock + 1
Expand All @@ -152,13 +155,18 @@ func (r *RpcLogStreamer) getNextPage(

// TODO:(nm) Use some more clever tactics to fetch the maximum number of logs at one times by parsing error messages
// See: https://github.com/joshstevens19/rindexer/blob/master/core/src/indexer/fetch_logs.rs#L504
logs, err = r.client.FilterLogs(r.ctx, buildFilterQuery(config, int64(fromBlock), int64(to)))
logs, err = metrics.MeasureGetLogs(contractAddress, func() ([]types.Log, error) {
return r.client.FilterLogs(r.ctx, buildFilterQuery(config, int64(fromBlock), int64(to)))
})

if err != nil {
return nil, nil, err
}

nextBlockNumber := to + 1

metrics.EmitNumLogsFound(contractAddress, len(logs))

return logs, &nextBlockNumber, nil
}

Expand Down
65 changes: 65 additions & 0 deletions pkg/metrics/indexer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package metrics

import (
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
)

var numLogsFound = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "xmtpd_log_streamer_logs",
Help: "Number of logs found by the log streamer",
},
[]string{"contract_address"},
)

var currentBlock = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "xmtpd_log_streamer_current_block",
Help: "Current block being processed by the log streamer",
},
[]string{"contract_address"},
)

var getLogsDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "xmtpd_log_streamer_get_logs_duration",
Help: "Duration of the get logs call",
Buckets: []float64{1, 10, 100, 500, 1000, 5000, 10000, 50000, 100000},
},
[]string{"contract_address"},
)

var getLogsRequests = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "xmtpd_log_streamer_get_logs_requests",
Help: "Number of get logs requests",
},
[]string{"contract_address", "success"},
)

func EmitNumLogsFound(contractAddress string, numLogs int) {
numLogsFound.With(prometheus.Labels{"contract_address": contractAddress}).Add(float64(numLogs))
}

func EmitCurrentBlock(contractAddress string, block int) {
currentBlock.With(prometheus.Labels{"contract_address": contractAddress}).Set(float64(block))
}

func EmitGetLogsDuration(contractAddress string, duration time.Duration) {
getLogsDuration.With(prometheus.Labels{"contract_address": contractAddress}).
Observe(float64(duration.Milliseconds()))
}

func MeasureGetLogs[Return any](contractAddress string, fn func() (Return, error)) (Return, error) {
start := time.Now()
ret, err := fn()
if err == nil {
EmitGetLogsDuration(contractAddress, time.Since(start))
}
getLogsRequests.With(prometheus.Labels{"contract_address": contractAddress, "success": strconv.FormatBool(err == nil)}).
Inc()
return ret, err
}
11 changes: 9 additions & 2 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package metrics
import (
"context"
"fmt"
"github.com/xmtp/xmtpd/pkg/tracing"
"net"
"net/http"

"github.com/xmtp/xmtpd/pkg/tracing"

"github.com/pires/go-proxyproto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -60,7 +61,13 @@ func (s *Server) Close() error {

func registerCollectors(reg prometheus.Registerer) {
//TODO: add metrics here
var cols []prometheus.Collector
cols := []prometheus.Collector{
numLogsFound,
currentBlock,
getLogsDuration,
getLogsRequests,
}

for _, col := range cols {
reg.MustRegister(col)
}
Expand Down

0 comments on commit 7790ba8

Please sign in to comment.