Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: block_event dedupe using linked list #423

Merged
merged 12 commits into from
Nov 14, 2024
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ coverage.out
*.swo

wallets.json

*.key
91 changes: 75 additions & 16 deletions p2p/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type conn struct {
requests *list.List
requestNum uint64

// Linked list of seen block hashes with timestamps.
blockHashes *list.List

// oldestBlock stores the first block the sensor has seen so when fetching
// parent blocks, it does not request blocks older than this.
oldestBlock *types.Header
Expand Down Expand Up @@ -72,6 +75,14 @@ type HeadBlock struct {
Time uint64
}

type BlockHashEntry struct {
hash common.Hash
time time.Time
}

// blockHashTTL defines the time-to-live for block hash entries in blockHashes list.
var blockHashTTL = 10 * time.Minute

// NewEthProctocol creates the new eth protocol. This will handle writing the
// status exchange, message handling, and writing blocks/txs to the database.
func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol {
Expand All @@ -81,17 +92,18 @@ func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol {
Length: 17,
Run: func(p *ethp2p.Peer, rw ethp2p.MsgReadWriter) error {
c := conn{
sensorID: opts.SensorID,
node: p.Node(),
logger: log.With().Str("peer", p.Node().URLv4()).Logger(),
rw: rw,
db: opts.Database,
requests: list.New(),
requestNum: 0,
head: opts.Head,
headMutex: opts.HeadMutex,
counter: opts.MsgCounter,
name: p.Fullname(),
sensorID: opts.SensorID,
node: p.Node(),
logger: log.With().Str("peer", p.Node().URLv4()).Logger(),
rw: rw,
db: opts.Database,
requests: list.New(),
requestNum: 0,
head: opts.Head,
headMutex: opts.HeadMutex,
counter: opts.MsgCounter,
name: p.Fullname(),
blockHashes: list.New(),
}

c.headMutex.RLock()
Expand Down Expand Up @@ -304,19 +316,66 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error {

c.counter.WithLabelValues(packet.Name(), c.node.URLv4(), c.name).Add(float64(len(packet)))

hashes := make([]common.Hash, 0, len(packet))
for _, hash := range packet {
hashes = append(hashes, hash.Hash)
if err := c.getBlockData(hash.Hash); err != nil {
// Collect unique hashes for database write.
uniqueHashes := make([]common.Hash, 0, len(packet))

for _, entry := range packet {
hash := entry.Hash

// Check if we've seen the hash and remove old entries
if c.hasSeenBlockHash(hash) {
c.logger.Info().Str("hash", hash.Hex()).Msg("Skipping duplicate block hash")
continue
}

// Attempt to fetch block data first
if err := c.getBlockData(hash); err != nil {
minhd-vu marked this conversation as resolved.
Show resolved Hide resolved
return err
}

// Now that we've successfully fetched, record the new block hash
c.addBlockHash(hash)
uniqueHashes = append(uniqueHashes, hash)
}

c.db.WriteBlockHashes(ctx, c.node, hashes, tfs)
// Write only unique hashes to the database.
if len(uniqueHashes) > 0 {
c.db.WriteBlockHashes(ctx, c.node, uniqueHashes, tfs)
}

return nil
}

// addBlockHash adds a new block hash with a timestamp to the blockHashes list.
func (c *conn) addBlockHash(hash common.Hash) {
now := time.Now()

// Add the new block hash entry to the list.
c.blockHashes.PushBack(BlockHashEntry{
hash: hash,
time: now,
})
}

// Helper method to check if a block hash is already in blockHashes.
func (c *conn) hasSeenBlockHash(hash common.Hash) bool {
now := time.Now()
for e := c.blockHashes.Front(); e != nil; e = e.Next() {
entry := e.Value.(BlockHashEntry)
// Check if the hash matches. We can short circuit here because there will
// be block hashes that we haven't seen before, which will make a full
// iteration of the blockHashes linked list.
if entry.hash.Cmp(hash) == 0 {
return true
}
// Remove entries older than blockHashTTL.
if now.Sub(entry.time) > blockHashTTL {
c.blockHashes.Remove(e)
}
}
return false
}

func (c *conn) handleTransactions(ctx context.Context, msg ethp2p.Msg) error {
var txs eth.TransactionsPacket
if err := msg.Decode(&txs); err != nil {
Expand Down
Loading