From cc8bc6c924812020ac28f4c7ef3bf0a13300920a Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Mon, 12 Feb 2024 15:36:40 -0500 Subject: [PATCH] [DVT-1266][DVT-1213] Fix sensor network memory leak and add prom metrics (#207) * update pprof addr * set block requests limit * use time based removal * use time.Since() * move cleanup inside getBlockData * add prom metrics * update docs --- cmd/p2p/sensor/sensor.go | 38 ++++++++++++++++++++++++++----- doc/polycli_p2p_sensor.md | 2 ++ p2p/protocol.go | 47 +++++++++++++++++++++++---------------- p2p/rlpx.go | 1 + 4 files changed, 64 insertions(+), 24 deletions(-) diff --git a/cmd/p2p/sensor/sensor.go b/cmd/p2p/sensor/sensor.go index f2092ce3..472969bc 100644 --- a/cmd/p2p/sensor/sensor.go +++ b/cmd/p2p/sensor/sensor.go @@ -22,6 +22,9 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/nat" "github.com/ethereum/go-ethereum/rpc" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/zerolog/log" "github.com/spf13/cobra" @@ -47,6 +50,8 @@ type ( ShouldWriteTransactionEvents bool ShouldRunPprof bool PprofPort uint + ShouldRunPrometheus bool + PrometheusPort uint KeyFile string Port int DiscoveryPort int @@ -106,12 +111,23 @@ var SensorCmd = &cobra.Command{ if inputSensorParams.ShouldRunPprof { go func() { - if pprofErr := http.ListenAndServe(fmt.Sprintf("localhost:%v", inputSensorParams.PprofPort), nil); pprofErr != nil { + addr := fmt.Sprintf(":%v", inputSensorParams.PprofPort) + if pprofErr := http.ListenAndServe(addr, nil); pprofErr != nil { log.Error().Err(pprofErr).Msg("Failed to start pprof") } }() } + if inputSensorParams.ShouldRunPrometheus { + go func() { + http.Handle("/metrics", promhttp.Handler()) + addr := fmt.Sprintf(":%v", inputSensorParams.PrometheusPort) + if promErr := http.ListenAndServe(addr, nil); promErr != nil { + log.Error().Err(promErr).Msg("Failed to start Prometheus handler") + } + }() + } + inputSensorParams.privateKey, err = crypto.GenerateKey() if err != nil { return err @@ -173,6 +189,18 @@ var SensorCmd = &cobra.Command{ Number: block.Number.ToUint64(), } + peersGauge := promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "sensor", + Name: "peers", + Help: "The number of peers the sensor is connected to", + }) + + msgCounter := promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "sensor", + Name: "messages", + Help: "The number and type of messages the sensor has received", + }, []string{"code", "message"}) + opts := p2p.EthProtocolOptions{ Context: cmd.Context(), Database: db, @@ -184,8 +212,8 @@ var SensorCmd = &cobra.Command{ Peers: make(chan *enode.Node), Head: &head, HeadMutex: &sync.RWMutex{}, - Count: &p2p.MessageCount{}, ForkID: forkid.ID{Hash: [4]byte(inputSensorParams.ForkID)}, + MsgCounter: msgCounter, } config := ethp2p.Config{ @@ -242,9 +270,7 @@ var SensorCmd = &cobra.Command{ for { select { case <-ticker.C: - count := opts.Count.Load() - opts.Count.Clear() - log.Info().Interface("peers", server.PeerCount()).Interface("counts", count).Send() + peersGauge.Set(float64(server.PeerCount())) case peer := <-opts.Peers: // Update the peer list and the nodes file. if _, ok := peers[peer.ID()]; !ok { @@ -326,6 +352,8 @@ increase CPU and memory usage.`) significantly increase CPU and memory usage.`) SensorCmd.Flags().BoolVar(&inputSensorParams.ShouldRunPprof, "pprof", false, "Whether to run pprof") SensorCmd.Flags().UintVar(&inputSensorParams.PprofPort, "pprof-port", 6060, "Port pprof runs on") + SensorCmd.Flags().BoolVar(&inputSensorParams.ShouldRunPrometheus, "prom", true, "Whether to run Prometheus") + SensorCmd.Flags().UintVar(&inputSensorParams.PrometheusPort, "prom-port", 2112, "Port Prometheus runs on") SensorCmd.Flags().StringVarP(&inputSensorParams.KeyFile, "key-file", "k", "", "Private key file") SensorCmd.Flags().IntVar(&inputSensorParams.Port, "port", 30303, "TCP network listening port") SensorCmd.Flags().IntVar(&inputSensorParams.DiscoveryPort, "discovery-port", 30303, "UDP P2P discovery port") diff --git a/doc/polycli_p2p_sensor.md b/doc/polycli_p2p_sensor.md index 0c785815..215bd623 100644 --- a/doc/polycli_p2p_sensor.md +++ b/doc/polycli_p2p_sensor.md @@ -43,6 +43,8 @@ If no nodes.json file exists, it will be created. --pprof Whether to run pprof --pprof-port uint Port pprof runs on (default 6060) -p, --project-id string GCP project ID + --prom Whether to run Prometheus (default true) + --prom-port uint Port Prometheus runs on (default 2112) --quick-start Whether to load the nodes.json as static nodes to quickly start the network. This produces faster development cycles but can prevent the sensor from being to connect to new peers if the nodes.json file is large. diff --git a/p2p/protocol.go b/p2p/protocol.go index f6a8c674..fd81639f 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -8,7 +8,6 @@ import ( "fmt" "math/big" "sync" - "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -18,6 +17,7 @@ import ( "github.com/ethereum/go-ethereum/eth/protocols/eth" ethp2p "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -33,7 +33,7 @@ type conn struct { db database.Database head *HeadBlock headMutex *sync.RWMutex - count *MessageCount + counter *prometheus.CounterVec // requests is used to store the request ID and the block hash. This is used // when fetching block bodies because the eth protocol block bodies do not @@ -56,8 +56,8 @@ type EthProtocolOptions struct { SensorID string NetworkID uint64 Peers chan *enode.Node - Count *MessageCount ForkID forkid.ID + MsgCounter *prometheus.CounterVec // Head keeps track of the current head block of the chain. This is required // when doing the status exchange. @@ -91,7 +91,7 @@ func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol { requestNum: 0, head: opts.Head, headMutex: opts.HeadMutex, - count: opts.Count, + counter: opts.MsgCounter, } c.headMutex.RLock() @@ -242,11 +242,21 @@ func (c *conn) getBlockData(hash common.Hash) error { return err } + for e := c.requests.Front(); e != nil; e = e.Next() { + r := e.Value.(request) + + if time.Since(r.time).Minutes() > 10 { + c.requests.Remove(e) + } + } + c.requestNum++ c.requests.PushBack(request{ requestID: c.requestNum, hash: hash, + time: time.Now(), }) + bodiesRequest := &GetBlockBodies{ RequestId: c.requestNum, GetBlockBodiesRequest: []common.Hash{hash}, @@ -286,7 +296,7 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error { return err } - atomic.AddInt32(&c.count.BlockHashes, int32(len(packet))) + c.counter.WithLabelValues(fmt.Sprint(msg.Code), packet.Name()).Add(float64(len(packet))) hashes := make([]common.Hash, 0, len(packet)) for _, hash := range packet { @@ -307,7 +317,7 @@ func (c *conn) handleTransactions(ctx context.Context, msg ethp2p.Msg) error { return err } - atomic.AddInt32(&c.count.Transactions, int32(len(txs))) + c.counter.WithLabelValues(fmt.Sprint(msg.Code), txs.Name()).Add(float64(len(txs))) c.db.WriteTransactions(ctx, c.node, txs) @@ -320,7 +330,7 @@ func (c *conn) handleGetBlockHeaders(msg ethp2p.Msg) error { return err } - atomic.AddInt32(&c.count.BlockHeaderRequests, 1) + c.counter.WithLabelValues(fmt.Sprint(msg.Code), request.Name()).Inc() return ethp2p.Send( c.rw, @@ -336,7 +346,7 @@ func (c *conn) handleBlockHeaders(ctx context.Context, msg ethp2p.Msg) error { } headers := packet.BlockHeadersRequest - atomic.AddInt32(&c.count.BlockHeaders, int32(len(headers))) + c.counter.WithLabelValues(fmt.Sprint(msg.Code), packet.Name()).Add(float64(len(headers))) for _, header := range headers { if err := c.getParentBlock(ctx, header); err != nil { @@ -355,7 +365,7 @@ func (c *conn) handleGetBlockBodies(msg ethp2p.Msg) error { return err } - atomic.AddInt32(&c.count.BlockBodiesRequests, int32(len(request.GetBlockBodiesRequest))) + c.counter.WithLabelValues(fmt.Sprint(msg.Code), request.Name()).Add(float64(len(request.GetBlockBodiesRequest))) return ethp2p.Send( c.rw, @@ -374,15 +384,11 @@ func (c *conn) handleBlockBodies(ctx context.Context, msg ethp2p.Msg) error { return nil } - atomic.AddInt32(&c.count.BlockBodies, int32(len(packet.BlockBodiesResponse))) + c.counter.WithLabelValues(fmt.Sprint(msg.Code), packet.Name()).Add(float64(len(packet.BlockBodiesResponse))) var hash *common.Hash for e := c.requests.Front(); e != nil; e = e.Next() { - r, ok := e.Value.(request) - if !ok { - c.logger.Error().Msg("Request type assertion failed") - continue - } + r := e.Value.(request) if r.requestID == packet.RequestId { hash = &r.hash @@ -407,7 +413,7 @@ func (c *conn) handleNewBlock(ctx context.Context, msg ethp2p.Msg) error { return err } - atomic.AddInt32(&c.count.Blocks, 1) + c.counter.WithLabelValues(fmt.Sprint(msg.Code), block.Name()).Inc() // Set the head block if newer. c.headMutex.Lock() @@ -438,7 +444,7 @@ func (c *conn) handleGetPooledTransactions(msg ethp2p.Msg) error { return err } - atomic.AddInt32(&c.count.TransactionRequests, int32(len(request.GetPooledTransactionsRequest))) + c.counter.WithLabelValues(fmt.Sprint(msg.Code), request.Name()).Add(float64(len(request.GetPooledTransactionsRequest))) return ethp2p.Send( c.rw, @@ -448,6 +454,7 @@ func (c *conn) handleGetPooledTransactions(msg ethp2p.Msg) error { func (c *conn) handleNewPooledTransactionHashes(ctx context.Context, version uint, msg ethp2p.Msg) error { var hashes []common.Hash + var name string switch version { case 66, 67: @@ -456,17 +463,19 @@ func (c *conn) handleNewPooledTransactionHashes(ctx context.Context, version uin return err } hashes = txs + name = txs.Name() case 68: var txs eth.NewPooledTransactionHashesPacket68 if err := msg.Decode(&txs); err != nil { return err } hashes = txs.Hashes + name = txs.Name() default: return errors.New("protocol version not found") } - atomic.AddInt32(&c.count.TransactionHashes, int32(len(hashes))) + c.counter.WithLabelValues(fmt.Sprint(msg.Code), name).Add(float64(len(hashes))) if !c.db.ShouldWriteTransactions() || !c.db.ShouldWriteTransactionEvents() { return nil @@ -485,7 +494,7 @@ func (c *conn) handlePooledTransactions(ctx context.Context, msg ethp2p.Msg) err return err } - atomic.AddInt32(&c.count.Transactions, int32(len(packet.PooledTransactionsResponse))) + c.counter.WithLabelValues(fmt.Sprint(msg.Code), packet.Name()).Add(float64(len(packet.PooledTransactionsResponse))) c.db.WriteTransactions(ctx, c.node, packet.PooledTransactionsResponse) diff --git a/p2p/rlpx.go b/p2p/rlpx.go index 72417098..5cd822d8 100644 --- a/p2p/rlpx.go +++ b/p2p/rlpx.go @@ -142,6 +142,7 @@ loop: type request struct { requestID uint64 hash common.Hash + time time.Time } // ReadAndServe reads messages from peers and writes it to a database.