Skip to content

Commit

Permalink
nits
Browse files Browse the repository at this point in the history
  • Loading branch information
joshua-kim committed Aug 15, 2023
1 parent 4fc0855 commit 1f8a298
Show file tree
Hide file tree
Showing 11 changed files with 113 additions and 46 deletions.
8 changes: 5 additions & 3 deletions gossip/bloom.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"hash"
"time"

safemath "github.com/ava-labs/avalanchego/utils/math"
bloomfilter "github.com/holiman/bloomfilter/v2"
"golang.org/x/exp/rand"
)
Expand All @@ -29,7 +30,8 @@ func NewBloomFilter(m uint64, p float64) (*BloomFilter, error) {

type BloomFilter struct {
Bloom *bloomfilter.Filter
Salt []byte
// Salt is provided to eventually unblock collisions in Bloom
Salt []byte
}

func (b *BloomFilter) Add(gossipable Gossipable) {
Expand Down Expand Up @@ -80,8 +82,8 @@ type hasher struct {
}

func (h hasher) Sum64() uint64 {
for i, salt := range h.salt {
h.hash[i] ^= salt
for i := 0; i < safemath.Min(len(h.hash), len(h.salt)); i++ {
h.hash[i] ^= h.salt[i]
}

return binary.BigEndian.Uint64(h.hash[:])
Expand Down
48 changes: 26 additions & 22 deletions gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,35 +60,39 @@ func (g *Gossiper[T, U]) Gossip(shutdownChan chan struct{}, shutdownWg *sync.Wai
for {
select {
case <-gossipTicker.C:
filter := g.set.GetFilter()
bloomBytes, err := filter.Bloom.MarshalBinary()
if err != nil {
log.Warn("failed to marshal bloom filter", "error", err)
continue
}

request := PullGossipRequest{
FilterBytes: bloomBytes,
SaltBytes: filter.Salt,
}
msgBytes, err := g.codec.Marshal(g.codecVersion, request)
if err != nil {
log.Warn("failed to marshal gossip request", "error", err)
continue
}

for i := 0; i < g.config.PollSize; i++ {
if err := g.client.AppRequestAny(context.TODO(), msgBytes, g.handleResponse); err != nil {
log.Warn("failed to gossip", "error", err)
continue
}
if err := g.gossip(); err != nil {
log.Warn("failed to gossip", "error", err)
}
case <-shutdownChan:
log.Debug("shutting down gossip")
return
}
}
}
func (g *Gossiper[T, U]) gossip() error {
filter := g.set.GetFilter()
bloomBytes, err := filter.Bloom.MarshalBinary()
if err != nil {
return err
}

request := PullGossipRequest{
FilterBytes: bloomBytes,
SaltBytes: filter.Salt,
}
msgBytes, err := g.codec.Marshal(g.codecVersion, request)
if err != nil {
return err
}

for i := 0; i < g.config.PollSize; i++ {
if err := g.client.AppRequestAny(context.TODO(), msgBytes, g.handleResponse); err != nil {
return err
}
}

return nil
}

func (g *Gossiper[T, U]) handleResponse(nodeID ids.NodeID, responseBytes []byte, err error) {
if err != nil {
Expand Down
17 changes: 11 additions & 6 deletions gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,19 +131,24 @@ func TestGossiperGossip(t *testing.T) {
PollSize: 1,
}
gossiper := NewGossiper[testTx, *testTx](config, requestSet, requestClient, cc, 0)
done := make(chan struct{})
wg := &sync.WaitGroup{}
wg.Add(1)
go gossiper.Gossip(done, wg)
received := set.Set[*testTx]{}
requestSet.onAdd = func(tx *testTx) {
received.Add(tx)
}

require.NoError(gossiper.gossip())
<-gossiped

require.Len(requestSet.set, len(tt.expected))
for _, expected := range tt.expected {
require.Contains(requestSet.set, expected)
}

close(done)
wg.Wait()
// we should not receive anything that we already had before we
// requested the gossip
for _, tx := range tt.requester {
require.NotContains(received, tx)
}
})
}
}
4 changes: 4 additions & 0 deletions gossip/test_gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,15 @@ func (t *testTx) Unmarshal(bytes []byte) error {
type testSet struct {
set set.Set[*testTx]
bloom *BloomFilter
onAdd func(tx *testTx)
}

func (t testSet) Add(gossipable *testTx) error {
t.set.Add(gossipable)
t.bloom.Add(gossipable)
if t.onAdd != nil {
t.onAdd(gossipable)
}
return nil
}

Expand Down
7 changes: 5 additions & 2 deletions peer/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,12 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u

var req message.Request
if _, err := n.codec.Unmarshal(request, &req); err != nil {
log.Debug("failed to unmarshal app request", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err)

// this might be a sdk request
if err := n.router.AppRequest(ctx, nodeID, requestID, deadline, request); err == nil {
log.Debug("failed to unmarshal app request", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err)
if err := n.router.AppRequest(ctx, nodeID, requestID, deadline, request); err != nil {
log.Debug("failed to handle app request", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err)
return nil
}

return nil
Expand Down
7 changes: 7 additions & 0 deletions plugin/evm/gossip_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ func (g *GossipEthTxPool) Subscribe(shutdownChan chan struct{}, shutdownWg *sync
g.bloom.Add(tx)
if gossip.ResetBloomFilterIfNeeded(g.bloom, txGossipBloomMaxFilledRatio) {
log.Debug("resetting bloom filter", "reason", "reached max filled ratio")

pending, _ := g.mempool.Content()
for _, pendingTxs := range pending {
for _, pendingTx := range pendingTxs {
g.bloom.Add(&GossipEthTx{Tx: pendingTx})
}
}
}
}
g.lock.Unlock()
Expand Down
4 changes: 4 additions & 0 deletions plugin/evm/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,10 @@ func (m *Mempool) addTx(tx *Tx, force bool) error {
m.bloom.Add(&GossipAtomicTx{Tx: tx})
if gossip.ResetBloomFilterIfNeeded(m.bloom, txGossipBloomMaxFilledRatio) {
log.Debug("resetting bloom filter", "reason", "reached max filled ratio")

for _, pendingTx := range m.txHeap.minHeap.items {
m.bloom.Add(&GossipAtomicTx{Tx: pendingTx.tx})
}
}

return nil
Expand Down
35 changes: 35 additions & 0 deletions plugin/evm/mempool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package evm

import (
"testing"

"github.com/ava-labs/avalanchego/ids"
"github.com/stretchr/testify/require"
)

func TestMempoolAddTx(t *testing.T) {
require := require.New(t)
m, err := NewMempool(ids.Empty, 5_000)
require.NoError(err)

txs := make([]*GossipAtomicTx, 0)
for i := 0; i < 3_000; i++ {
tx := &GossipAtomicTx{
Tx: &Tx{
UnsignedAtomicTx: &TestUnsignedTx{
IDV: ids.GenerateTestID(),
},
},
}

txs = append(txs, tx)
require.NoError(m.Add(tx))
}

for _, tx := range txs {
require.True(m.bloom.Has(tx))
}
}
10 changes: 5 additions & 5 deletions plugin/evm/message/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
var (
Codec codec.Manager
CrossChainCodec codec.Manager
SdkCodec codec.Manager
SDKCodec codec.Manager
)

func init() {
Expand Down Expand Up @@ -67,15 +67,15 @@ func init() {
panic(errs.Err)
}

SdkCodec = codec.NewManager(maxMessageSize)
SDKCodec = codec.NewManager(maxMessageSize)
sdkc := linearcodec.NewDefault()

errs = wrappers.Errs{}
errs.Add(
// p2p sdk gossip types
c.RegisterType(gossip.PullGossipRequest{}),
c.RegisterType(gossip.PullGossipResponse{}),
SdkCodec.RegisterCodec(Version, sdkc),
sdkc.RegisterType(gossip.PullGossipRequest{}),
sdkc.RegisterType(gossip.PullGossipResponse{}),
SDKCodec.RegisterCodec(Version, sdkc),
)

if errs.Errored() {
Expand Down
9 changes: 6 additions & 3 deletions plugin/evm/tx_gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/snow/engine/common"
commonEng "github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils"
"github.com/ava-labs/avalanchego/utils/crypto/secp256k1"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
Expand Down Expand Up @@ -58,7 +59,7 @@ func TestEthTxGossip(t *testing.T) {

// sender for the peer requesting gossip from [vm]
ctrl := gomock.NewController(t)
peerSender := common.NewMockSender(ctrl)
peerSender := commonEng.NewMockSender(ctrl)
router := p2p.NewRouter(logging.NoLog{}, peerSender)

// we're only making client requests, so we don't need a server handler
Expand All @@ -71,7 +72,9 @@ func TestEthTxGossip(t *testing.T) {
require.NoError(err)
request := gossip.PullGossipRequest{
FilterBytes: emptyBloomFilterBytes,
SaltBytes: utils.RandomBytes(10),
}

requestBytes, err := vm.networkCodec.Marshal(message.Version, request)
require.NoError(err)

Expand Down Expand Up @@ -150,7 +153,7 @@ func TestAtomicTxGossip(t *testing.T) {

// sender for the peer requesting gossip from [vm]
ctrl := gomock.NewController(t)
peerSender := common.NewMockSender(ctrl)
peerSender := commonEng.NewMockSender(ctrl)
router := p2p.NewRouter(logging.NoLog{}, peerSender)

// we're only making client requests, so we don't need a server handler
Expand Down
10 changes: 5 additions & 5 deletions plugin/evm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ const (
// threshold on how full a tx gossip bloom filter can get before it's reset
txGossipBloomMaxFilledRatio = 0.75
// maximum anticipated amount of entries in the tx gossip bloom filter
txGossipBloomMaxItems = 1_000
txGossipBloomMaxItems = 4096
// maximum false positive rate for lookups
txGossipBloomFalsePositiveRate = 0.001
)
Expand Down Expand Up @@ -979,14 +979,14 @@ func (vm *VM) initBlockBuilding() error {
vm.shutdownWg.Add(1)
go ethTxPool.Subscribe(vm.shutdownChan, &vm.shutdownWg)

ethTxGossipHandler := gossip.NewHandler[*GossipEthTx](ethTxPool, message.SdkCodec, message.Version)
ethTxGossipHandler := gossip.NewHandler[*GossipEthTx](ethTxPool, message.SDKCodec, message.Version)
ethTxGossipClient, err := vm.router.RegisterAppProtocol(ethTxGossipProtocol, ethTxGossipHandler)
if err != nil {
return err
}
vm.ethTxGossipClient = ethTxGossipClient

atomicTxGossipHandler := gossip.NewHandler[*GossipAtomicTx](vm.mempool, message.SdkCodec, message.Version)
atomicTxGossipHandler := gossip.NewHandler[*GossipAtomicTx](vm.mempool, message.SDKCodec, message.Version)
atomicTxGossipClient, err := vm.router.RegisterAppProtocol(atomicTxGossipProtocol, atomicTxGossipHandler)
if err != nil {
return err
Expand All @@ -997,7 +997,7 @@ func (vm *VM) initBlockBuilding() error {
txGossipConfig,
ethTxPool,
vm.ethTxGossipClient,
message.SdkCodec,
message.SDKCodec,
message.Version,
)
vm.shutdownWg.Add(1)
Expand All @@ -1007,7 +1007,7 @@ func (vm *VM) initBlockBuilding() error {
txGossipConfig,
vm.mempool,
vm.atomicTxGossipClient,
message.SdkCodec,
message.SDKCodec,
message.Version,
)
vm.shutdownWg.Add(1)
Expand Down

0 comments on commit 1f8a298

Please sign in to comment.