From 8c71a3cf6c5ee57c70aacbe6f9bd2d2b486719da Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Thu, 19 Sep 2024 06:57:25 -0700 Subject: [PATCH] feat: write peers to datastore (#374) * write peers to datastore * update docs --- cmd/p2p/sensor/sensor.go | 6 +++++ doc/polycli_p2p_sensor.md | 1 + p2p/database/database.go | 5 ++++ p2p/database/datastore.go | 48 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 60 insertions(+) diff --git a/cmd/p2p/sensor/sensor.go b/cmd/p2p/sensor/sensor.go index fe6e43da..c841c205 100644 --- a/cmd/p2p/sensor/sensor.go +++ b/cmd/p2p/sensor/sensor.go @@ -1,6 +1,7 @@ package sensor import ( + "context" "crypto/ecdsa" "encoding/json" "errors" @@ -50,6 +51,7 @@ type ( ShouldWriteBlockEvents bool ShouldWriteTransactions bool ShouldWriteTransactionEvents bool + ShouldWritePeers bool ShouldRunPprof bool PprofPort uint ShouldRunPrometheus bool @@ -166,6 +168,7 @@ var SensorCmd = &cobra.Command{ ShouldWriteBlockEvents: inputSensorParams.ShouldWriteBlockEvents, ShouldWriteTransactions: inputSensorParams.ShouldWriteTransactions, ShouldWriteTransactionEvents: inputSensorParams.ShouldWriteTransactionEvents, + ShouldWritePeers: inputSensorParams.ShouldWritePeers, TTL: inputSensorParams.TTL, }) @@ -268,6 +271,8 @@ var SensorCmd = &cobra.Command{ if err := removePeerMessages(msgCounter, server.Peers()); err != nil { log.Error().Err(err).Msg("Failed to clean up peer messages") } + + db.WritePeers(context.Background(), server.Peers()) case peer := <-opts.Peers: // Update the peer list and the nodes file. if _, ok := peers[peer.ID()]; !ok { @@ -481,6 +486,7 @@ increase CPU and memory usage.`) SensorCmd.Flags().BoolVar(&inputSensorParams.ShouldWriteTransactionEvents, "write-tx-events", true, `Whether to write transaction events to the database. This option could significantly increase CPU and memory usage.`) + SensorCmd.Flags().BoolVar(&inputSensorParams.ShouldWritePeers, "write-peers", true, "Whether to write peers to the database") SensorCmd.Flags().BoolVar(&inputSensorParams.ShouldRunPprof, "pprof", false, "Whether to run pprof") SensorCmd.Flags().UintVar(&inputSensorParams.PprofPort, "pprof-port", 6060, "Port pprof runs on") SensorCmd.Flags().BoolVar(&inputSensorParams.ShouldRunPrometheus, "prom", true, "Whether to run Prometheus") diff --git a/doc/polycli_p2p_sensor.md b/doc/polycli_p2p_sensor.md index 9fae2611..3ffd8126 100644 --- a/doc/polycli_p2p_sensor.md +++ b/doc/polycli_p2p_sensor.md @@ -55,6 +55,7 @@ If no nodes.json file exists, it will be created. --ttl duration Time to live (default 336h0m0s) --write-block-events Whether to write block events to the database (default true) -B, --write-blocks Whether to write blocks to the database (default true) + --write-peers Whether to write peers to the database (default true) --write-tx-events Whether to write transaction events to the database. This option could significantly increase CPU and memory usage. (default true) -t, --write-txs Whether to write transactions to the database. This option could significantly diff --git a/p2p/database/database.go b/p2p/database/database.go index d78afbd7..c5334551 100644 --- a/p2p/database/database.go +++ b/p2p/database/database.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/protocols/eth" + "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" ) @@ -35,6 +36,9 @@ type Database interface { // ShouldWriteTransactionEvents return true, respectively. WriteTransactions(context.Context, *enode.Node, []*types.Transaction) + // WritePeers will write the connected peers to the database. + WritePeers(context.Context, []*p2p.Peer) + // HasBlock will return whether the block is in the database. If the database // client has not been initialized this will always return true. HasBlock(context.Context, common.Hash) bool @@ -44,6 +48,7 @@ type Database interface { ShouldWriteBlockEvents() bool ShouldWriteTransactions() bool ShouldWriteTransactionEvents() bool + ShouldWritePeers() bool // NodeList will return a list of enode URLs. NodeList(ctx context.Context, limit int) ([]string, error) diff --git a/p2p/database/datastore.go b/p2p/database/datastore.go index ba9fe685..3744bfa8 100644 --- a/p2p/database/datastore.go +++ b/p2p/database/datastore.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/protocols/eth" + "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/rs/zerolog/log" "google.golang.org/api/iterator" @@ -21,6 +22,7 @@ const ( BlockEventsKind = "block_events" TransactionsKind = "transactions" TransactionEventsKind = "transaction_events" + PeersKind = "peers" MaxAttempts = 10 ) @@ -34,6 +36,7 @@ type Datastore struct { shouldWriteBlockEvents bool shouldWriteTransactions bool shouldWriteTransactionEvents bool + shouldWritePeers bool jobs chan struct{} ttl time.Duration } @@ -101,6 +104,13 @@ type DatastoreTransaction struct { Type int16 } +type DatastorePeer struct { + URL string + LastSeenBy string + TimeLastSeen time.Time + TTL time.Time +} + // DatastoreOptions is used when creating a NewDatastore. type DatastoreOptions struct { ProjectID string @@ -111,6 +121,7 @@ type DatastoreOptions struct { ShouldWriteBlockEvents bool ShouldWriteTransactions bool ShouldWriteTransactionEvents bool + ShouldWritePeers bool TTL time.Duration } @@ -130,6 +141,7 @@ func NewDatastore(ctx context.Context, opts DatastoreOptions) Database { shouldWriteBlockEvents: opts.ShouldWriteBlockEvents, shouldWriteTransactions: opts.ShouldWriteTransactions, shouldWriteTransactionEvents: opts.ShouldWriteTransactionEvents, + shouldWritePeers: opts.ShouldWritePeers, jobs: make(chan struct{}, opts.MaxConcurrency), ttl: opts.TTL, } @@ -234,6 +246,38 @@ func (d *Datastore) WriteTransactions(ctx context.Context, peer *enode.Node, txs } } +// WritePeers writes the connected peers to datastore. +func (d *Datastore) WritePeers(ctx context.Context, peers []*p2p.Peer) { + if d.client == nil || !d.ShouldWritePeers() { + return + } + + d.jobs <- struct{}{} + go func() { + + keys := make([]*datastore.Key, 0, len(peers)) + dsPeers := make([]*DatastorePeer, 0, len(peers)) + now := time.Now() + + for _, peer := range peers { + keys = append(keys, datastore.NameKey(PeersKind, peer.ID().String(), nil)) + dsPeers = append(dsPeers, &DatastorePeer{ + URL: peer.Node().URLv4(), + LastSeenBy: d.sensorID, + TimeLastSeen: now, + TTL: now.Add(d.ttl), + }) + } + + _, err := d.client.PutMulti(ctx, keys, dsPeers) + if err != nil { + log.Error().Err(err).Msg("Failed to write peers") + } + + <-d.jobs + }() +} + func (d *Datastore) MaxConcurrentWrites() int { return d.maxConcurrency } @@ -254,6 +298,10 @@ func (d *Datastore) ShouldWriteTransactionEvents() bool { return d.shouldWriteTransactionEvents } +func (d *Datastore) ShouldWritePeers() bool { + return d.shouldWritePeers +} + func (d *Datastore) HasBlock(ctx context.Context, hash common.Hash) bool { if d.client == nil { return true