From 896553ac792cb6bf35dce747b57d6a163e9d6e19 Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Tue, 6 Aug 2024 07:07:31 -0700 Subject: [PATCH] feat: peer message counts (#334) * feat: peer message counts * add remove peer messages --- cmd/p2p/sensor/sensor.go | 102 ++++++++++++++++++++++++++++++++++++--- p2p/log.go | 76 ++++++++++++++--------------- p2p/protocol.go | 20 ++++---- p2p/rlpx.go | 28 +++++------ 4 files changed, 158 insertions(+), 68 deletions(-) diff --git a/cmd/p2p/sensor/sensor.go b/cmd/p2p/sensor/sensor.go index 8dc484d4..93661669 100644 --- a/cmd/p2p/sensor/sensor.go +++ b/cmd/p2p/sensor/sensor.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "os/signal" + "slices" "sync" "syscall" "time" @@ -17,6 +18,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/forkid" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/eth/protocols/eth" ethp2p "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/nat" @@ -24,6 +26,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" + dto "github.com/prometheus/client_model/go" "github.com/rs/zerolog/log" "github.com/spf13/cobra" @@ -180,7 +183,7 @@ var SensorCmd = &cobra.Command{ Namespace: "sensor", Name: "messages", Help: "The number and type of messages the sensor has received", - }, []string{"code", "message"}) + }, []string{"message", "url"}) opts := p2p.EthProtocolOptions{ Context: cmd.Context(), @@ -247,12 +250,15 @@ var SensorCmd = &cobra.Command{ peers[node.ID()] = node.URLv4() } - go handleAPI(&server) + go handleAPI(&server, msgCounter) for { select { case <-ticker.C: peersGauge.Set(float64(server.PeerCount())) + if err := removePeerMessages(msgCounter, server.Peers()); err != nil { + log.Error().Err(err).Msg("Failed to clean up peer messages") + } case peer := <-opts.Peers: // Update the peer list and the nodes file. if _, ok := peers[peer.ID()]; !ok { @@ -276,6 +282,10 @@ var SensorCmd = &cobra.Command{ }, } +// handlePprof starts a server for performance profiling using pprof on the +// specified port. This allows for real-time monitoring and analysis of the +// sensor's performance. The port number is configured through +// inputSensorParams.PprofPort. An error is logged if the server fails to start. func handlePprof() { addr := fmt.Sprintf(":%d", inputSensorParams.PprofPort) if err := http.ListenAndServe(addr, nil); err != nil { @@ -283,6 +293,11 @@ func handlePprof() { } } +// handlePrometheus starts a server to expose Prometheus metrics at the /metrics +// endpoint. This enables Prometheus to scrape and collect metrics data for +// monitoring purposes. The port number is configured through +// inputSensorParams.PrometheusPort. An error is logged if the server fails to +// start. func handlePrometheus() { http.Handle("/metrics", promhttp.Handler()) addr := fmt.Sprintf(":%d", inputSensorParams.PrometheusPort) @@ -291,7 +306,10 @@ func handlePrometheus() { } } -func handleAPI(server *ethp2p.Server) { +// handleAPI sets up the API for interacting with the sensor. The `/peers` +// endpoint returns a list of all peers connected to the sensor, including the +// types and counts of eth packets sent by each peer. +func handleAPI(server *ethp2p.Server, counter *prometheus.CounterVec) { http.HandleFunc("/peers", func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) @@ -301,12 +319,13 @@ func handleAPI(server *ethp2p.Server) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) - urls := []string{} + peers := make(map[string]p2p.MessageCount) for _, peer := range server.Peers() { - urls = append(urls, peer.Node().URLv4()) + url := peer.Node().URLv4() + peers[url] = getPeerMessages(url, counter) } - err := json.NewEncoder(w).Encode(urls) + err := json.NewEncoder(w).Encode(peers) if err != nil { log.Error().Err(err).Msg("Failed to encode peers") } @@ -318,6 +337,77 @@ func handleAPI(server *ethp2p.Server) { } } +// getPeerMessages retrieves the count of various types of eth packets sent by a +// peer. +func getPeerMessages(url string, counter *prometheus.CounterVec) p2p.MessageCount { + return p2p.MessageCount{ + BlockHeaders: getCounterValue(new(eth.BlockHeadersPacket), url, counter), + BlockBodies: getCounterValue(new(eth.BlockBodiesPacket), url, counter), + Blocks: getCounterValue(new(eth.NewBlockPacket), url, counter), + BlockHashes: getCounterValue(new(eth.NewBlockHashesPacket), url, counter), + BlockHeaderRequests: getCounterValue(new(eth.GetBlockHeadersPacket), url, counter), + BlockBodiesRequests: getCounterValue(new(eth.GetBlockBodiesPacket), url, counter), + Transactions: getCounterValue(new(eth.TransactionsPacket), url, counter) + + getCounterValue(new(eth.PooledTransactionsPacket), url, counter), + TransactionHashes: getCounterValue(new(eth.NewPooledTransactionHashesPacket67), url, counter) + + getCounterValue(new(eth.NewPooledTransactionHashesPacket68), url, counter), + TransactionRequests: getCounterValue(new(eth.GetPooledTransactionsRequest), url, counter), + } +} + +// getCounterValue retrieves the count of packets for a specific type from the +// Prometheus counter. +func getCounterValue(packet eth.Packet, url string, counter *prometheus.CounterVec) int64 { + metric := &dto.Metric{} + + err := counter.WithLabelValues(packet.Name(), url).Write(metric) + if err != nil { + log.Error().Err(err).Send() + return 0 + } + + return int64(metric.GetCounter().GetValue()) +} + +// removePeerMessages removes all the counters of peers that disconnected from +// the sensor. This prevents the metrics list from infinitely growing. +func removePeerMessages(counter *prometheus.CounterVec, peers []*ethp2p.Peer) error { + urls := []string{} + for _, peer := range peers { + urls = append(urls, peer.Node().URLv4()) + } + + families, err := prometheus.DefaultGatherer.Gather() + if err != nil { + return err + } + + var family *dto.MetricFamily + for _, f := range families { + if f.GetName() == "sensor_messages" { + family = f + break + } + } + + if family == nil { + return errors.New("could not find sensor_messages metric family") + } + + for _, metric := range family.GetMetric() { + for _, label := range metric.GetLabel() { + url := label.GetValue() + if label.GetName() != "url" || slices.Contains(urls, url) { + continue + } + + counter.DeletePartialMatch(prometheus.Labels{"url": url}) + } + } + + return nil +} + // getLatestBlock will get the latest block from an RPC provider. func getLatestBlock(url string) (*rpctypes.RawBlockResponse, error) { client, err := rpc.Dial(url) diff --git a/p2p/log.go b/p2p/log.go index 06d227eb..c814e230 100644 --- a/p2p/log.go +++ b/p2p/log.go @@ -9,53 +9,53 @@ import ( // logging. It can be used to count the different types of messages received // across all peer connections to provide a summary. type MessageCount struct { - BlockHeaders int32 `json:",omitempty"` - BlockBodies int32 `json:",omitempty"` - Blocks int32 `json:",omitempty"` - BlockHashes int32 `json:",omitempty"` - BlockHeaderRequests int32 `json:",omitempty"` - BlockBodiesRequests int32 `json:",omitempty"` - Transactions int32 `json:",omitempty"` - TransactionHashes int32 `json:",omitempty"` - TransactionRequests int32 `json:",omitempty"` - Pings int32 `json:",omitempty"` - Errors int32 `json:",omitempty"` - Disconnects int32 `json:",omitempty"` + BlockHeaders int64 `json:"block_headers,omitempty"` + BlockBodies int64 `json:"block_bodies,omitempty"` + Blocks int64 `json:"blocks,omitempty"` + BlockHashes int64 `json:"block_hashes,omitempty"` + BlockHeaderRequests int64 `json:"block_header_requests,omitempty"` + BlockBodiesRequests int64 `json:"block_bodies_requests,omitempty"` + Transactions int64 `json:"transactions,omitempty"` + TransactionHashes int64 `json:"transaction_hashes,omitempty"` + TransactionRequests int64 `json:"transaction_requests,omitempty"` + Pings int64 `json:"pings,omitempty"` + Errors int64 `json:"errors,omitempty"` + Disconnects int64 `json:"disconnects,omitempty"` } // Load takes a snapshot of all the counts in a thread-safe manner. Make sure // you call this and read from the returned object. func (count *MessageCount) Load() MessageCount { return MessageCount{ - BlockHeaders: atomic.LoadInt32(&count.BlockHeaders), - BlockBodies: atomic.LoadInt32(&count.BlockBodies), - Blocks: atomic.LoadInt32(&count.Blocks), - BlockHashes: atomic.LoadInt32(&count.BlockHashes), - BlockHeaderRequests: atomic.LoadInt32(&count.BlockHeaderRequests), - BlockBodiesRequests: atomic.LoadInt32(&count.BlockBodiesRequests), - Transactions: atomic.LoadInt32(&count.Transactions), - TransactionHashes: atomic.LoadInt32(&count.TransactionHashes), - TransactionRequests: atomic.LoadInt32(&count.TransactionRequests), - Pings: atomic.LoadInt32(&count.Pings), - Errors: atomic.LoadInt32(&count.Errors), - Disconnects: atomic.LoadInt32(&count.Disconnects), + BlockHeaders: atomic.LoadInt64(&count.BlockHeaders), + BlockBodies: atomic.LoadInt64(&count.BlockBodies), + Blocks: atomic.LoadInt64(&count.Blocks), + BlockHashes: atomic.LoadInt64(&count.BlockHashes), + BlockHeaderRequests: atomic.LoadInt64(&count.BlockHeaderRequests), + BlockBodiesRequests: atomic.LoadInt64(&count.BlockBodiesRequests), + Transactions: atomic.LoadInt64(&count.Transactions), + TransactionHashes: atomic.LoadInt64(&count.TransactionHashes), + TransactionRequests: atomic.LoadInt64(&count.TransactionRequests), + Pings: atomic.LoadInt64(&count.Pings), + Errors: atomic.LoadInt64(&count.Errors), + Disconnects: atomic.LoadInt64(&count.Disconnects), } } // Clear clears all of the counts from the message counter. func (count *MessageCount) Clear() { - atomic.StoreInt32(&count.BlockHeaders, 0) - atomic.StoreInt32(&count.BlockBodies, 0) - atomic.StoreInt32(&count.Blocks, 0) - atomic.StoreInt32(&count.BlockHashes, 0) - atomic.StoreInt32(&count.BlockHeaderRequests, 0) - atomic.StoreInt32(&count.BlockBodiesRequests, 0) - atomic.StoreInt32(&count.Transactions, 0) - atomic.StoreInt32(&count.TransactionHashes, 0) - atomic.StoreInt32(&count.TransactionRequests, 0) - atomic.StoreInt32(&count.Pings, 0) - atomic.StoreInt32(&count.Errors, 0) - atomic.StoreInt32(&count.Disconnects, 0) + atomic.StoreInt64(&count.BlockHeaders, 0) + atomic.StoreInt64(&count.BlockBodies, 0) + atomic.StoreInt64(&count.Blocks, 0) + atomic.StoreInt64(&count.BlockHashes, 0) + atomic.StoreInt64(&count.BlockHeaderRequests, 0) + atomic.StoreInt64(&count.BlockBodiesRequests, 0) + atomic.StoreInt64(&count.Transactions, 0) + atomic.StoreInt64(&count.TransactionHashes, 0) + atomic.StoreInt64(&count.TransactionRequests, 0) + atomic.StoreInt64(&count.Pings, 0) + atomic.StoreInt64(&count.Errors, 0) + atomic.StoreInt64(&count.Disconnects, 0) } // IsEmpty checks whether the sum of all the counts is empty. Make sure to call @@ -76,8 +76,8 @@ func (c *MessageCount) IsEmpty() bool { ) == 0 } -func sum(ints ...int32) int32 { - var sum int32 = 0 +func sum(ints ...int64) int64 { + var sum int64 = 0 for _, i := range ints { sum += i } diff --git a/p2p/protocol.go b/p2p/protocol.go index ceb604ff..da56963c 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -294,7 +294,7 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error { return err } - c.counter.WithLabelValues(fmt.Sprint(msg.Code), packet.Name()).Add(float64(len(packet))) + c.counter.WithLabelValues(packet.Name(), c.node.URLv4()).Add(float64(len(packet))) hashes := make([]common.Hash, 0, len(packet)) for _, hash := range packet { @@ -315,7 +315,7 @@ func (c *conn) handleTransactions(ctx context.Context, msg ethp2p.Msg) error { return err } - c.counter.WithLabelValues(fmt.Sprint(msg.Code), txs.Name()).Add(float64(len(txs))) + c.counter.WithLabelValues(txs.Name(), c.node.URLv4()).Add(float64(len(txs))) c.db.WriteTransactions(ctx, c.node, txs) @@ -328,7 +328,7 @@ func (c *conn) handleGetBlockHeaders(msg ethp2p.Msg) error { return err } - c.counter.WithLabelValues(fmt.Sprint(msg.Code), request.Name()).Inc() + c.counter.WithLabelValues(request.Name(), c.node.URLv4()).Inc() return ethp2p.Send( c.rw, @@ -344,7 +344,7 @@ func (c *conn) handleBlockHeaders(ctx context.Context, msg ethp2p.Msg) error { } headers := packet.BlockHeadersRequest - c.counter.WithLabelValues(fmt.Sprint(msg.Code), packet.Name()).Add(float64(len(headers))) + c.counter.WithLabelValues(packet.Name(), c.node.URLv4()).Add(float64(len(headers))) for _, header := range headers { if err := c.getParentBlock(ctx, header); err != nil { @@ -363,7 +363,7 @@ func (c *conn) handleGetBlockBodies(msg ethp2p.Msg) error { return err } - c.counter.WithLabelValues(fmt.Sprint(msg.Code), request.Name()).Add(float64(len(request.GetBlockBodiesRequest))) + c.counter.WithLabelValues(request.Name(), c.node.URLv4()).Add(float64(len(request.GetBlockBodiesRequest))) return ethp2p.Send( c.rw, @@ -382,7 +382,7 @@ func (c *conn) handleBlockBodies(ctx context.Context, msg ethp2p.Msg) error { return nil } - c.counter.WithLabelValues(fmt.Sprint(msg.Code), packet.Name()).Add(float64(len(packet.BlockBodiesResponse))) + c.counter.WithLabelValues(packet.Name(), c.node.URLv4()).Add(float64(len(packet.BlockBodiesResponse))) var hash *common.Hash for e := c.requests.Front(); e != nil; e = e.Next() { @@ -411,7 +411,7 @@ func (c *conn) handleNewBlock(ctx context.Context, msg ethp2p.Msg) error { return err } - c.counter.WithLabelValues(fmt.Sprint(msg.Code), block.Name()).Inc() + c.counter.WithLabelValues(block.Name(), c.node.URLv4()).Inc() // Set the head block if newer. c.headMutex.Lock() @@ -442,7 +442,7 @@ func (c *conn) handleGetPooledTransactions(msg ethp2p.Msg) error { return err } - c.counter.WithLabelValues(fmt.Sprint(msg.Code), request.Name()).Add(float64(len(request.GetPooledTransactionsRequest))) + c.counter.WithLabelValues(request.Name(), c.node.URLv4()).Add(float64(len(request.GetPooledTransactionsRequest))) return ethp2p.Send( c.rw, @@ -473,7 +473,7 @@ func (c *conn) handleNewPooledTransactionHashes(version uint, msg ethp2p.Msg) er return errors.New("protocol version not found") } - c.counter.WithLabelValues(fmt.Sprint(msg.Code), name).Add(float64(len(hashes))) + c.counter.WithLabelValues(name, c.node.URLv4()).Add(float64(len(hashes))) if !c.db.ShouldWriteTransactions() || !c.db.ShouldWriteTransactionEvents() { return nil @@ -492,7 +492,7 @@ func (c *conn) handlePooledTransactions(ctx context.Context, msg ethp2p.Msg) err return err } - c.counter.WithLabelValues(fmt.Sprint(msg.Code), packet.Name()).Add(float64(len(packet.PooledTransactionsResponse))) + c.counter.WithLabelValues(packet.Name(), c.node.URLv4()).Add(float64(len(packet.PooledTransactionsResponse))) c.db.WriteTransactions(ctx, c.node, packet.PooledTransactionsResponse) diff --git a/p2p/rlpx.go b/p2p/rlpx.go index 7bba4ae7..ce95876d 100644 --- a/p2p/rlpx.go +++ b/p2p/rlpx.go @@ -157,17 +157,17 @@ func (c *rlpxConn) ReadAndServe(count *MessageCount) error { switch msg := c.Read().(type) { case *Ping: - atomic.AddInt32(&count.Pings, 1) + atomic.AddInt64(&count.Pings, 1) c.logger.Trace().Msg("Received Ping") if err := c.Write(&Pong{}); err != nil { c.logger.Error().Err(err).Msg("Failed to write Pong response") } case *BlockHeaders: - atomic.AddInt32(&count.BlockHeaders, int32(len(msg.BlockHeadersRequest))) + atomic.AddInt64(&count.BlockHeaders, int64(len(msg.BlockHeadersRequest))) c.logger.Trace().Msgf("Received %v BlockHeaders", len(msg.BlockHeadersRequest)) case *GetBlockHeaders: - atomic.AddInt32(&count.BlockHeaderRequests, 1) + atomic.AddInt64(&count.BlockHeaderRequests, 1) c.logger.Trace().Msgf("Received GetBlockHeaders request") res := &BlockHeaders{ @@ -178,10 +178,10 @@ func (c *rlpxConn) ReadAndServe(count *MessageCount) error { return err } case *BlockBodies: - atomic.AddInt32(&count.BlockBodies, int32(len(msg.BlockBodiesResponse))) + atomic.AddInt64(&count.BlockBodies, int64(len(msg.BlockBodiesResponse))) c.logger.Trace().Msgf("Received %v BlockBodies", len(msg.BlockBodiesResponse)) case *GetBlockBodies: - atomic.AddInt32(&count.BlockBodiesRequests, int32(len(msg.GetBlockBodiesRequest))) + atomic.AddInt64(&count.BlockBodiesRequests, int64(len(msg.GetBlockBodiesRequest))) c.logger.Trace().Msgf("Received %v GetBlockBodies request", len(msg.GetBlockBodiesRequest)) res := &BlockBodies{ @@ -191,7 +191,7 @@ func (c *rlpxConn) ReadAndServe(count *MessageCount) error { c.logger.Error().Err(err).Msg("Failed to write BlockBodies response") } case *NewBlockHashes: - atomic.AddInt32(&count.BlockHashes, int32(len(*msg))) + atomic.AddInt64(&count.BlockHashes, int64(len(*msg))) c.logger.Trace().Msgf("Received %v NewBlockHashes", len(*msg)) for _, hash := range *msg { @@ -218,13 +218,13 @@ func (c *rlpxConn) ReadAndServe(count *MessageCount) error { } case *NewBlock: - atomic.AddInt32(&count.Blocks, 1) + atomic.AddInt64(&count.Blocks, 1) c.logger.Trace().Str("hash", msg.Block.Hash().Hex()).Msg("Received NewBlock") case *Transactions: - atomic.AddInt32(&count.Transactions, int32(len(*msg))) + atomic.AddInt64(&count.Transactions, int64(len(*msg))) c.logger.Trace().Msgf("Received %v Transactions", len(*msg)) case *PooledTransactions: - atomic.AddInt32(&count.Transactions, int32(len(msg.PooledTransactionsResponse))) + atomic.AddInt64(&count.Transactions, int64(len(msg.PooledTransactionsResponse))) c.logger.Trace().Msgf("Received %v PooledTransactions", len(msg.PooledTransactionsResponse)) case *NewPooledTransactionHashes: if err := c.processNewPooledTransactionHashes(count, msg.Hashes); err != nil { @@ -235,7 +235,7 @@ func (c *rlpxConn) ReadAndServe(count *MessageCount) error { return err } case *GetPooledTransactions: - atomic.AddInt32(&count.TransactionRequests, int32(len(msg.GetPooledTransactionsRequest))) + atomic.AddInt64(&count.TransactionRequests, int64(len(msg.GetPooledTransactionsRequest))) c.logger.Trace().Msgf("Received %v GetPooledTransactions request", len(msg.GetPooledTransactionsRequest)) res := &PooledTransactions{ @@ -245,17 +245,17 @@ func (c *rlpxConn) ReadAndServe(count *MessageCount) error { c.logger.Error().Err(err).Msg("Failed to write PooledTransactions response") } case *Error: - atomic.AddInt32(&count.Errors, 1) + atomic.AddInt64(&count.Errors, 1) c.logger.Trace().Err(msg.Unwrap()).Msg("Received Error") if !strings.Contains(msg.Error(), "timeout") { return msg.Unwrap() } case *Disconnect: - atomic.AddInt32(&count.Disconnects, 1) + atomic.AddInt64(&count.Disconnects, 1) c.logger.Debug().Msgf("Disconnect received: %v", msg) case *Disconnects: - atomic.AddInt32(&count.Disconnects, 1) + atomic.AddInt64(&count.Disconnects, 1) c.logger.Debug().Msgf("Disconnect received: %v", msg) default: c.logger.Info().Interface("msg", msg).Int("code", msg.Code()).Msg("Received message") @@ -267,7 +267,7 @@ func (c *rlpxConn) ReadAndServe(count *MessageCount) error { // processNewPooledTransactionHashes processes NewPooledTransactionHashes // messages by requesting the transaction bodies. func (c *rlpxConn) processNewPooledTransactionHashes(count *MessageCount, hashes []common.Hash) error { - atomic.AddInt32(&count.TransactionHashes, int32(len(hashes))) + atomic.AddInt64(&count.TransactionHashes, int64(len(hashes))) c.logger.Trace().Msgf("Received %v NewPooledTransactionHashes", len(hashes)) req := &GetPooledTransactions{