From 7790ba885dd3d4cd8d9fcde1aa6a6b09ec8059f3 Mon Sep 17 00:00:00 2001 From: Nicholas Molnar <65710+neekolas@users.noreply.github.com> Date: Tue, 8 Oct 2024 15:16:41 -0700 Subject: [PATCH] Add metrics to rpcLogStreamer (#197) --- pkg/blockchain/rpcLogStreamer.go | 10 ++++- pkg/metrics/indexer.go | 65 ++++++++++++++++++++++++++++++++ pkg/metrics/metrics.go | 11 +++++- 3 files changed, 83 insertions(+), 3 deletions(-) create mode 100644 pkg/metrics/indexer.go diff --git a/pkg/blockchain/rpcLogStreamer.go b/pkg/blockchain/rpcLogStreamer.go index 75b8b3df..42ed576d 100644 --- a/pkg/blockchain/rpcLogStreamer.go +++ b/pkg/blockchain/rpcLogStreamer.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" + "github.com/xmtp/xmtpd/pkg/metrics" "go.uber.org/zap" ) @@ -132,10 +133,12 @@ func (r *RpcLogStreamer) getNextPage( config contractConfig, fromBlock int, ) (logs []types.Log, nextBlock *int, err error) { + contractAddress := config.contractAddress.Hex() highestBlock, err := r.client.BlockNumber(r.ctx) if err != nil { return nil, nil, err } + metrics.EmitCurrentBlock(contractAddress, int(highestBlock)) highestBlockCanProcess := int(highestBlock) - LAG_FROM_HIGHEST_BLOCK numOfBlocksToProcess := highestBlockCanProcess - fromBlock + 1 @@ -152,13 +155,18 @@ func (r *RpcLogStreamer) getNextPage( // TODO:(nm) Use some more clever tactics to fetch the maximum number of logs at one times by parsing error messages // See: https://github.com/joshstevens19/rindexer/blob/master/core/src/indexer/fetch_logs.rs#L504 - logs, err = r.client.FilterLogs(r.ctx, buildFilterQuery(config, int64(fromBlock), int64(to))) + logs, err = metrics.MeasureGetLogs(contractAddress, func() ([]types.Log, error) { + return r.client.FilterLogs(r.ctx, buildFilterQuery(config, int64(fromBlock), int64(to))) + }) + if err != nil { return nil, nil, err } nextBlockNumber := to + 1 + metrics.EmitNumLogsFound(contractAddress, len(logs)) + return logs, &nextBlockNumber, nil } diff --git a/pkg/metrics/indexer.go b/pkg/metrics/indexer.go new file mode 100644 index 00000000..2eeeb0fb --- /dev/null +++ b/pkg/metrics/indexer.go @@ -0,0 +1,65 @@ +package metrics + +import ( + "strconv" + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +var numLogsFound = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "xmtpd_log_streamer_logs", + Help: "Number of logs found by the log streamer", + }, + []string{"contract_address"}, +) + +var currentBlock = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "xmtpd_log_streamer_current_block", + Help: "Current block being processed by the log streamer", + }, + []string{"contract_address"}, +) + +var getLogsDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "xmtpd_log_streamer_get_logs_duration", + Help: "Duration of the get logs call", + Buckets: []float64{1, 10, 100, 500, 1000, 5000, 10000, 50000, 100000}, + }, + []string{"contract_address"}, +) + +var getLogsRequests = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "xmtpd_log_streamer_get_logs_requests", + Help: "Number of get logs requests", + }, + []string{"contract_address", "success"}, +) + +func EmitNumLogsFound(contractAddress string, numLogs int) { + numLogsFound.With(prometheus.Labels{"contract_address": contractAddress}).Add(float64(numLogs)) +} + +func EmitCurrentBlock(contractAddress string, block int) { + currentBlock.With(prometheus.Labels{"contract_address": contractAddress}).Set(float64(block)) +} + +func EmitGetLogsDuration(contractAddress string, duration time.Duration) { + getLogsDuration.With(prometheus.Labels{"contract_address": contractAddress}). + Observe(float64(duration.Milliseconds())) +} + +func MeasureGetLogs[Return any](contractAddress string, fn func() (Return, error)) (Return, error) { + start := time.Now() + ret, err := fn() + if err == nil { + EmitGetLogsDuration(contractAddress, time.Since(start)) + } + getLogsRequests.With(prometheus.Labels{"contract_address": contractAddress, "success": strconv.FormatBool(err == nil)}). + Inc() + return ret, err +} diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 6b4818e3..0344e564 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -3,10 +3,11 @@ package metrics import ( "context" "fmt" - "github.com/xmtp/xmtpd/pkg/tracing" "net" "net/http" + "github.com/xmtp/xmtpd/pkg/tracing" + "github.com/pires/go-proxyproto" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -60,7 +61,13 @@ func (s *Server) Close() error { func registerCollectors(reg prometheus.Registerer) { //TODO: add metrics here - var cols []prometheus.Collector + cols := []prometheus.Collector{ + numLogsFound, + currentBlock, + getLogsDuration, + getLogsRequests, + } + for _, col := range cols { reg.MustRegister(col) }