From 1f8a298323cf2ad8ef7a26a556e6dcd309fe435c Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Tue, 15 Aug 2023 15:12:56 -0400 Subject: [PATCH] nits --- gossip/bloom.go | 8 +++--- gossip/gossip.go | 48 +++++++++++++++++++----------------- gossip/gossip_test.go | 17 ++++++++----- gossip/test_gossip.go | 4 +++ peer/network.go | 7 ++++-- plugin/evm/gossip_mempool.go | 7 ++++++ plugin/evm/mempool.go | 4 +++ plugin/evm/mempool_test.go | 35 ++++++++++++++++++++++++++ plugin/evm/message/codec.go | 10 ++++---- plugin/evm/tx_gossip_test.go | 9 ++++--- plugin/evm/vm.go | 10 ++++---- 11 files changed, 113 insertions(+), 46 deletions(-) create mode 100644 plugin/evm/mempool_test.go diff --git a/gossip/bloom.go b/gossip/bloom.go index 09b74e0ca4..453c0bbf89 100644 --- a/gossip/bloom.go +++ b/gossip/bloom.go @@ -8,6 +8,7 @@ import ( "hash" "time" + safemath "github.com/ava-labs/avalanchego/utils/math" bloomfilter "github.com/holiman/bloomfilter/v2" "golang.org/x/exp/rand" ) @@ -29,7 +30,8 @@ func NewBloomFilter(m uint64, p float64) (*BloomFilter, error) { type BloomFilter struct { Bloom *bloomfilter.Filter - Salt []byte + // Salt is provided to eventually unblock collisions in Bloom + Salt []byte } func (b *BloomFilter) Add(gossipable Gossipable) { @@ -80,8 +82,8 @@ type hasher struct { } func (h hasher) Sum64() uint64 { - for i, salt := range h.salt { - h.hash[i] ^= salt + for i := 0; i < safemath.Min(len(h.hash), len(h.salt)); i++ { + h.hash[i] ^= h.salt[i] } return binary.BigEndian.Uint64(h.hash[:]) diff --git a/gossip/gossip.go b/gossip/gossip.go index 5a4fe7f616..475531e1a8 100644 --- a/gossip/gossip.go +++ b/gossip/gossip.go @@ -60,28 +60,8 @@ func (g *Gossiper[T, U]) Gossip(shutdownChan chan struct{}, shutdownWg *sync.Wai for { select { case <-gossipTicker.C: - filter := g.set.GetFilter() - bloomBytes, err := filter.Bloom.MarshalBinary() - if err != nil { - log.Warn("failed to marshal bloom filter", "error", err) - continue - } - - request := PullGossipRequest{ - FilterBytes: bloomBytes, - SaltBytes: filter.Salt, - } - msgBytes, err := g.codec.Marshal(g.codecVersion, request) - if err != nil { - log.Warn("failed to marshal gossip request", "error", err) - continue - } - - for i := 0; i < g.config.PollSize; i++ { - if err := g.client.AppRequestAny(context.TODO(), msgBytes, g.handleResponse); err != nil { - log.Warn("failed to gossip", "error", err) - continue - } + if err := g.gossip(); err != nil { + log.Warn("failed to gossip", "error", err) } case <-shutdownChan: log.Debug("shutting down gossip") @@ -89,6 +69,30 @@ func (g *Gossiper[T, U]) Gossip(shutdownChan chan struct{}, shutdownWg *sync.Wai } } } +func (g *Gossiper[T, U]) gossip() error { + filter := g.set.GetFilter() + bloomBytes, err := filter.Bloom.MarshalBinary() + if err != nil { + return err + } + + request := PullGossipRequest{ + FilterBytes: bloomBytes, + SaltBytes: filter.Salt, + } + msgBytes, err := g.codec.Marshal(g.codecVersion, request) + if err != nil { + return err + } + + for i := 0; i < g.config.PollSize; i++ { + if err := g.client.AppRequestAny(context.TODO(), msgBytes, g.handleResponse); err != nil { + return err + } + } + + return nil +} func (g *Gossiper[T, U]) handleResponse(nodeID ids.NodeID, responseBytes []byte, err error) { if err != nil { diff --git a/gossip/gossip_test.go b/gossip/gossip_test.go index 346e31f696..44bfcc8010 100644 --- a/gossip/gossip_test.go +++ b/gossip/gossip_test.go @@ -131,10 +131,12 @@ func TestGossiperGossip(t *testing.T) { PollSize: 1, } gossiper := NewGossiper[testTx, *testTx](config, requestSet, requestClient, cc, 0) - done := make(chan struct{}) - wg := &sync.WaitGroup{} - wg.Add(1) - go gossiper.Gossip(done, wg) + received := set.Set[*testTx]{} + requestSet.onAdd = func(tx *testTx) { + received.Add(tx) + } + + require.NoError(gossiper.gossip()) <-gossiped require.Len(requestSet.set, len(tt.expected)) @@ -142,8 +144,11 @@ func TestGossiperGossip(t *testing.T) { require.Contains(requestSet.set, expected) } - close(done) - wg.Wait() + // we should not receive anything that we already had before we + // requested the gossip + for _, tx := range tt.requester { + require.NotContains(received, tx) + } }) } } diff --git a/gossip/test_gossip.go b/gossip/test_gossip.go index 873c04cc2d..e246c978a2 100644 --- a/gossip/test_gossip.go +++ b/gossip/test_gossip.go @@ -33,11 +33,15 @@ func (t *testTx) Unmarshal(bytes []byte) error { type testSet struct { set set.Set[*testTx] bloom *BloomFilter + onAdd func(tx *testTx) } func (t testSet) Add(gossipable *testTx) error { t.set.Add(gossipable) t.bloom.Add(gossipable) + if t.onAdd != nil { + t.onAdd(gossipable) + } return nil } diff --git a/peer/network.go b/peer/network.go index cec0a17af4..f9fda1f294 100644 --- a/peer/network.go +++ b/peer/network.go @@ -339,9 +339,12 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u var req message.Request if _, err := n.codec.Unmarshal(request, &req); err != nil { + log.Debug("failed to unmarshal app request", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err) + // this might be a sdk request - if err := n.router.AppRequest(ctx, nodeID, requestID, deadline, request); err == nil { - log.Debug("failed to unmarshal app request", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err) + if err := n.router.AppRequest(ctx, nodeID, requestID, deadline, request); err != nil { + log.Debug("failed to handle app request", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err) + return nil } return nil diff --git a/plugin/evm/gossip_mempool.go b/plugin/evm/gossip_mempool.go index cfc90f9837..16266e2566 100644 --- a/plugin/evm/gossip_mempool.go +++ b/plugin/evm/gossip_mempool.go @@ -78,6 +78,13 @@ func (g *GossipEthTxPool) Subscribe(shutdownChan chan struct{}, shutdownWg *sync g.bloom.Add(tx) if gossip.ResetBloomFilterIfNeeded(g.bloom, txGossipBloomMaxFilledRatio) { log.Debug("resetting bloom filter", "reason", "reached max filled ratio") + + pending, _ := g.mempool.Content() + for _, pendingTxs := range pending { + for _, pendingTx := range pendingTxs { + g.bloom.Add(&GossipEthTx{Tx: pendingTx}) + } + } } } g.lock.Unlock() diff --git a/plugin/evm/mempool.go b/plugin/evm/mempool.go index 94e3e6d7d2..1b555c023d 100644 --- a/plugin/evm/mempool.go +++ b/plugin/evm/mempool.go @@ -284,6 +284,10 @@ func (m *Mempool) addTx(tx *Tx, force bool) error { m.bloom.Add(&GossipAtomicTx{Tx: tx}) if gossip.ResetBloomFilterIfNeeded(m.bloom, txGossipBloomMaxFilledRatio) { log.Debug("resetting bloom filter", "reason", "reached max filled ratio") + + for _, pendingTx := range m.txHeap.minHeap.items { + m.bloom.Add(&GossipAtomicTx{Tx: pendingTx.tx}) + } } return nil diff --git a/plugin/evm/mempool_test.go b/plugin/evm/mempool_test.go new file mode 100644 index 0000000000..6d719a0ee2 --- /dev/null +++ b/plugin/evm/mempool_test.go @@ -0,0 +1,35 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package evm + +import ( + "testing" + + "github.com/ava-labs/avalanchego/ids" + "github.com/stretchr/testify/require" +) + +func TestMempoolAddTx(t *testing.T) { + require := require.New(t) + m, err := NewMempool(ids.Empty, 5_000) + require.NoError(err) + + txs := make([]*GossipAtomicTx, 0) + for i := 0; i < 3_000; i++ { + tx := &GossipAtomicTx{ + Tx: &Tx{ + UnsignedAtomicTx: &TestUnsignedTx{ + IDV: ids.GenerateTestID(), + }, + }, + } + + txs = append(txs, tx) + require.NoError(m.Add(tx)) + } + + for _, tx := range txs { + require.True(m.bloom.Has(tx)) + } +} diff --git a/plugin/evm/message/codec.go b/plugin/evm/message/codec.go index 55d8da8a51..804fb3190e 100644 --- a/plugin/evm/message/codec.go +++ b/plugin/evm/message/codec.go @@ -20,7 +20,7 @@ const ( var ( Codec codec.Manager CrossChainCodec codec.Manager - SdkCodec codec.Manager + SDKCodec codec.Manager ) func init() { @@ -67,15 +67,15 @@ func init() { panic(errs.Err) } - SdkCodec = codec.NewManager(maxMessageSize) + SDKCodec = codec.NewManager(maxMessageSize) sdkc := linearcodec.NewDefault() errs = wrappers.Errs{} errs.Add( // p2p sdk gossip types - c.RegisterType(gossip.PullGossipRequest{}), - c.RegisterType(gossip.PullGossipResponse{}), - SdkCodec.RegisterCodec(Version, sdkc), + sdkc.RegisterType(gossip.PullGossipRequest{}), + sdkc.RegisterType(gossip.PullGossipResponse{}), + SDKCodec.RegisterCodec(Version, sdkc), ) if errs.Errored() { diff --git a/plugin/evm/tx_gossip_test.go b/plugin/evm/tx_gossip_test.go index f5c789c361..f1e73314cc 100644 --- a/plugin/evm/tx_gossip_test.go +++ b/plugin/evm/tx_gossip_test.go @@ -12,7 +12,8 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/network/p2p" - "github.com/ava-labs/avalanchego/snow/engine/common" + commonEng "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/utils" "github.com/ava-labs/avalanchego/utils/crypto/secp256k1" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/set" @@ -58,7 +59,7 @@ func TestEthTxGossip(t *testing.T) { // sender for the peer requesting gossip from [vm] ctrl := gomock.NewController(t) - peerSender := common.NewMockSender(ctrl) + peerSender := commonEng.NewMockSender(ctrl) router := p2p.NewRouter(logging.NoLog{}, peerSender) // we're only making client requests, so we don't need a server handler @@ -71,7 +72,9 @@ func TestEthTxGossip(t *testing.T) { require.NoError(err) request := gossip.PullGossipRequest{ FilterBytes: emptyBloomFilterBytes, + SaltBytes: utils.RandomBytes(10), } + requestBytes, err := vm.networkCodec.Marshal(message.Version, request) require.NoError(err) @@ -150,7 +153,7 @@ func TestAtomicTxGossip(t *testing.T) { // sender for the peer requesting gossip from [vm] ctrl := gomock.NewController(t) - peerSender := common.NewMockSender(ctrl) + peerSender := commonEng.NewMockSender(ctrl) router := p2p.NewRouter(logging.NoLog{}, peerSender) // we're only making client requests, so we don't need a server handler diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go index d0561c03ed..a0c4a1ec30 100644 --- a/plugin/evm/vm.go +++ b/plugin/evm/vm.go @@ -137,7 +137,7 @@ const ( // threshold on how full a tx gossip bloom filter can get before it's reset txGossipBloomMaxFilledRatio = 0.75 // maximum anticipated amount of entries in the tx gossip bloom filter - txGossipBloomMaxItems = 1_000 + txGossipBloomMaxItems = 4096 // maximum false positive rate for lookups txGossipBloomFalsePositiveRate = 0.001 ) @@ -979,14 +979,14 @@ func (vm *VM) initBlockBuilding() error { vm.shutdownWg.Add(1) go ethTxPool.Subscribe(vm.shutdownChan, &vm.shutdownWg) - ethTxGossipHandler := gossip.NewHandler[*GossipEthTx](ethTxPool, message.SdkCodec, message.Version) + ethTxGossipHandler := gossip.NewHandler[*GossipEthTx](ethTxPool, message.SDKCodec, message.Version) ethTxGossipClient, err := vm.router.RegisterAppProtocol(ethTxGossipProtocol, ethTxGossipHandler) if err != nil { return err } vm.ethTxGossipClient = ethTxGossipClient - atomicTxGossipHandler := gossip.NewHandler[*GossipAtomicTx](vm.mempool, message.SdkCodec, message.Version) + atomicTxGossipHandler := gossip.NewHandler[*GossipAtomicTx](vm.mempool, message.SDKCodec, message.Version) atomicTxGossipClient, err := vm.router.RegisterAppProtocol(atomicTxGossipProtocol, atomicTxGossipHandler) if err != nil { return err @@ -997,7 +997,7 @@ func (vm *VM) initBlockBuilding() error { txGossipConfig, ethTxPool, vm.ethTxGossipClient, - message.SdkCodec, + message.SDKCodec, message.Version, ) vm.shutdownWg.Add(1) @@ -1007,7 +1007,7 @@ func (vm *VM) initBlockBuilding() error { txGossipConfig, vm.mempool, vm.atomicTxGossipClient, - message.SdkCodec, + message.SDKCodec, message.Version, ) vm.shutdownWg.Add(1)