diff --git a/.gitignore b/.gitignore index e2bde68b..6ce9162a 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,5 @@ coverage.out *.swo wallets.json + +*.key diff --git a/p2p/protocol.go b/p2p/protocol.go index 551b2559..ec2d0d47 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -41,6 +41,9 @@ type conn struct { requests *list.List requestNum uint64 + // Linked list of seen block hashes with timestamps. + blockHashes *list.List + // oldestBlock stores the first block the sensor has seen so when fetching // parent blocks, it does not request blocks older than this. oldestBlock *types.Header @@ -72,6 +75,14 @@ type HeadBlock struct { Time uint64 } +type BlockHashEntry struct { + hash common.Hash + time time.Time +} + +// blockHashTTL defines the time-to-live for block hash entries in blockHashes list. +var blockHashTTL = 10 * time.Minute + // NewEthProctocol creates the new eth protocol. This will handle writing the // status exchange, message handling, and writing blocks/txs to the database. func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol { @@ -81,17 +92,18 @@ func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol { Length: 17, Run: func(p *ethp2p.Peer, rw ethp2p.MsgReadWriter) error { c := conn{ - sensorID: opts.SensorID, - node: p.Node(), - logger: log.With().Str("peer", p.Node().URLv4()).Logger(), - rw: rw, - db: opts.Database, - requests: list.New(), - requestNum: 0, - head: opts.Head, - headMutex: opts.HeadMutex, - counter: opts.MsgCounter, - name: p.Fullname(), + sensorID: opts.SensorID, + node: p.Node(), + logger: log.With().Str("peer", p.Node().URLv4()).Logger(), + rw: rw, + db: opts.Database, + requests: list.New(), + requestNum: 0, + head: opts.Head, + headMutex: opts.HeadMutex, + counter: opts.MsgCounter, + name: p.Fullname(), + blockHashes: list.New(), } c.headMutex.RLock() @@ -304,19 +316,65 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error { c.counter.WithLabelValues(packet.Name(), c.node.URLv4(), c.name).Add(float64(len(packet))) - hashes := make([]common.Hash, 0, len(packet)) - for _, hash := range packet { - hashes = append(hashes, hash.Hash) - if err := c.getBlockData(hash.Hash); err != nil { + // Collect unique hashes for database write. + uniqueHashes := make([]common.Hash, 0, len(packet)) + + for _, entry := range packet { + hash := entry.Hash + + // Check if we've seen the hash and remove old entries + if c.hasSeenBlockHash(hash) { + continue + } + + // Attempt to fetch block data first + if err := c.getBlockData(hash); err != nil { return err } + + // Now that we've successfully fetched, record the new block hash + c.addBlockHash(hash) + uniqueHashes = append(uniqueHashes, hash) } - c.db.WriteBlockHashes(ctx, c.node, hashes, tfs) + // Write only unique hashes to the database. + if len(uniqueHashes) > 0 { + c.db.WriteBlockHashes(ctx, c.node, uniqueHashes, tfs) + } return nil } +// addBlockHash adds a new block hash with a timestamp to the blockHashes list. +func (c *conn) addBlockHash(hash common.Hash) { + now := time.Now() + + // Add the new block hash entry to the list. + c.blockHashes.PushBack(BlockHashEntry{ + hash: hash, + time: now, + }) +} + +// Helper method to check if a block hash is already in blockHashes. +func (c *conn) hasSeenBlockHash(hash common.Hash) bool { + now := time.Now() + for e := c.blockHashes.Front(); e != nil; e = e.Next() { + entry := e.Value.(BlockHashEntry) + // Check if the hash matches. We can short circuit here because there will + // be block hashes that we haven't seen before, which will make a full + // iteration of the blockHashes linked list. + if entry.hash.Cmp(hash) == 0 { + return true + } + // Remove entries older than blockHashTTL. + if now.Sub(entry.time) > blockHashTTL { + c.blockHashes.Remove(e) + } + } + return false +} + func (c *conn) handleTransactions(ctx context.Context, msg ethp2p.Msg) error { var txs eth.TransactionsPacket if err := msg.Decode(&txs); err != nil {