Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: block_event dedupe using linked list #423

Merged
merged 12 commits into from
Nov 14, 2024
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ coverage.out
*.swo

wallets.json

*.key
94 changes: 78 additions & 16 deletions p2p/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type conn struct {
requests *list.List
requestNum uint64

// Linked list of seen block hashes with timestamps
rebelArtists marked this conversation as resolved.
Show resolved Hide resolved
blockHashes *list.List

// oldestBlock stores the first block the sensor has seen so when fetching
// parent blocks, it does not request blocks older than this.
oldestBlock *types.Header
Expand Down Expand Up @@ -72,6 +75,14 @@ type HeadBlock struct {
Time uint64
}

type BlockHashEntry struct {
hash common.Hash
time time.Time
}

// blockHashTTL defines the time-to-live for block hash entries in blockHashes list.
var blockHashTTL = 10 * time.Minute

// NewEthProctocol creates the new eth protocol. This will handle writing the
// status exchange, message handling, and writing blocks/txs to the database.
func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol {
Expand All @@ -81,17 +92,18 @@ func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol {
Length: 17,
Run: func(p *ethp2p.Peer, rw ethp2p.MsgReadWriter) error {
c := conn{
sensorID: opts.SensorID,
node: p.Node(),
logger: log.With().Str("peer", p.Node().URLv4()).Logger(),
rw: rw,
db: opts.Database,
requests: list.New(),
requestNum: 0,
head: opts.Head,
headMutex: opts.HeadMutex,
counter: opts.MsgCounter,
name: p.Fullname(),
sensorID: opts.SensorID,
node: p.Node(),
logger: log.With().Str("peer", p.Node().URLv4()).Logger(),
rw: rw,
db: opts.Database,
requests: list.New(),
requestNum: 0,
head: opts.Head,
headMutex: opts.HeadMutex,
counter: opts.MsgCounter,
name: p.Fullname(),
blockHashes: list.New(),
}

c.headMutex.RLock()
Expand Down Expand Up @@ -304,19 +316,69 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error {

c.counter.WithLabelValues(packet.Name(), c.node.URLv4(), c.name).Add(float64(len(packet)))

hashes := make([]common.Hash, 0, len(packet))
for _, hash := range packet {
hashes = append(hashes, hash.Hash)
if err := c.getBlockData(hash.Hash); err != nil {
// Collect unique hashes for database write.
uniqueHashes := make([]common.Hash, 0, len(packet))

for _, entry := range packet {
hash := entry.Hash
if c.hasSeenBlockHash(hash) {
c.logger.Info().Str("hash", hash.Hex()).Msg("Skipping duplicate block hash")
continue
}

// **Attempt to fetch block data first**
if err := c.getBlockData(hash); err != nil {
minhd-vu marked this conversation as resolved.
Show resolved Hide resolved
return err
}

// **Now that we've successfully fetched, record the new block hash**
c.addBlockHash(hash)
uniqueHashes = append(uniqueHashes, hash)
}

c.db.WriteBlockHashes(ctx, c.node, hashes, tfs)
// Write only unique hashes to the database.
if len(uniqueHashes) > 0 {
c.db.WriteBlockHashes(ctx, c.node, uniqueHashes, tfs)
}

return nil
}

// addBlockHash adds a new block hash with a timestamp to the blockHashes list.
func (c *conn) addBlockHash(hash common.Hash) {
now := time.Now()

// Remove all entries older than blockHashTTL
for e := c.blockHashes.Front(); e != nil; {
next := e.Next() // Save the next element
entry := e.Value.(BlockHashEntry)

if now.Sub(entry.time) <= blockHashTTL {
break
}

c.blockHashes.Remove(e)
e = next
}
rebelArtists marked this conversation as resolved.
Show resolved Hide resolved

// Add the new block hash entry to the list.
c.blockHashes.PushBack(BlockHashEntry{
hash: hash,
time: now,
})
}

// Helper method to check if a block hash is already in blockHashes.
func (c *conn) hasSeenBlockHash(hash common.Hash) bool {
for e := c.blockHashes.Front(); e != nil; e = e.Next() {
entry := e.Value.(BlockHashEntry)
if entry.hash.Cmp(hash) == 0 {
return true
}
}
return false
}

func (c *conn) handleTransactions(ctx context.Context, msg ethp2p.Msg) error {
var txs eth.TransactionsPacket
if err := msg.Decode(&txs); err != nil {
Expand Down
Loading