Skip to content

Commit

Permalink
Increase gossip size on first push (#2787)
Browse files Browse the repository at this point in the history
Signed-off-by: Stephen Buttolph <[email protected]>
Co-authored-by: Joshua Kim <[email protected]>
  • Loading branch information
StephenButtolph and joshua-kim authored Feb 29, 2024
1 parent d1312cb commit caef151
Show file tree
Hide file tree
Showing 12 changed files with 314 additions and 178 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ go 1.21
require (
github.com/DataDog/zstd v1.5.2
github.com/NYTimes/gziphandler v1.1.1
github.com/ava-labs/coreth v0.13.1-rc.3
github.com/ava-labs/coreth v0.13.1-rc.4
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34
github.com/btcsuite/btcd/btcutil v1.1.3
github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/ava-labs/coreth v0.13.1-rc.3 h1:d32AzRI5HLwfFEevhR/RU4QPRjdl7LIMvPxTwlMlsmM=
github.com/ava-labs/coreth v0.13.1-rc.3/go.mod h1:J1boUw9u7S3JrisnJ81PvrhhyZUBnS4WuxSeMtTuVU0=
github.com/ava-labs/coreth v0.13.1-rc.4 h1:/3LsQi64oet6uCoUhEkgEXXcAAZFGPMUNJGdU03XH30=
github.com/ava-labs/coreth v0.13.1-rc.4/go.mod h1:4y1igTe/sFOIrpAtXoY+AdmfftNHrmrhBBRVfGCAPcw=
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34 h1:mg9Uw6oZFJKytJxgxnl3uxZOs/SB8CVHg6Io4Tf99Zc=
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34/go.mod h1:pJxaT9bUgeRNVmNRgtCHb7sFDIRKy7CzTQVi8gGNT6g=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
Expand Down
133 changes: 74 additions & 59 deletions network/p2p/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,22 +263,19 @@ func NewPushGossiper[T Gossipable](
mempool Set[T],
client *p2p.Client,
metrics Metrics,
numValidators int,
numNonValidators int,
numPeers int,
gossipParams BranchingFactor,
regossipParams BranchingFactor,
discardedSize int,
targetGossipSize int,
maxRegossipFrequency time.Duration,
) (*PushGossiper[T], error) {
if err := gossipParams.Verify(); err != nil {
return nil, fmt.Errorf("invalid gossip params: %w", err)
}
if err := regossipParams.Verify(); err != nil {
return nil, fmt.Errorf("invalid regossip params: %w", err)
}
switch {
case numValidators < 0:
return nil, ErrInvalidNumValidators
case numNonValidators < 0:
return nil, ErrInvalidNumNonValidators
case numPeers < 0:
return nil, ErrInvalidNumPeers
case max(numValidators, numNonValidators, numPeers) == 0:
return nil, ErrInvalidNumToGossip
case discardedSize < 0:
return nil, ErrInvalidDiscardedSize
case targetGossipSize < 0:
Expand All @@ -292,9 +289,8 @@ func NewPushGossiper[T Gossipable](
set: mempool,
client: client,
metrics: metrics,
numValidators: numValidators,
numNonValidators: numNonValidators,
numPeers: numPeers,
gossipParams: gossipParams,
regossipParams: regossipParams,
targetGossipSize: targetGossipSize,
maxRegossipFrequency: maxRegossipFrequency,

Expand All @@ -312,9 +308,8 @@ type PushGossiper[T Gossipable] struct {
client *p2p.Client
metrics Metrics

numValidators int
numNonValidators int
numPeers int
gossipParams BranchingFactor
regossipParams BranchingFactor
targetGossipSize int
maxRegossipFrequency time.Duration

Expand All @@ -326,6 +321,27 @@ type PushGossiper[T Gossipable] struct {
discarded *cache.LRU[ids.ID, struct{}] // discarded attempts to avoid overgossiping transactions that are frequently dropped
}

type BranchingFactor struct {
Validators int
NonValidators int
Peers int
}

func (b *BranchingFactor) Verify() error {
switch {
case b.Validators < 0:
return ErrInvalidNumValidators
case b.NonValidators < 0:
return ErrInvalidNumNonValidators
case b.Peers < 0:
return ErrInvalidNumPeers
case max(b.Validators, b.NonValidators, b.Peers) == 0:
return ErrInvalidNumToGossip
default:
return nil
}
}

type tracking struct {
addedTime float64 // unix nanoseconds
lastGossiped time.Time
Expand All @@ -348,46 +364,46 @@ func (p *PushGossiper[T]) Gossip(ctx context.Context) error {
return nil
}

var (
sentBytes = 0
gossip = make([][]byte, 0, defaultGossipableCount)
)

// Iterate over all unsent gossipables.
for sentBytes < p.targetGossipSize {
gossipable, ok := p.toGossip.PopLeft()
if !ok {
break
}

// Ensure item is still in the set before we gossip.
gossipID := gossipable.GossipID()
tracking := p.tracking[gossipID]
if !p.set.Has(gossipID) {
delete(p.tracking, gossipID)
p.addedTimeSum -= tracking.addedTime
continue
}

bytes, err := p.marshaller.MarshalGossip(gossipable)
if err != nil {
delete(p.tracking, gossipID)
p.addedTimeSum -= tracking.addedTime
return err
}
if err := p.gossip(
ctx,
now,
p.gossipParams,
p.toGossip,
p.toRegossip,
&cache.Empty[ids.ID, struct{}]{}, // Don't mark dropped unsent transactions as discarded
); err != nil {
return fmt.Errorf("unexpected error during gossip: %w", err)
}

gossip = append(gossip, bytes)
sentBytes += len(bytes)
p.toRegossip.PushRight(gossipable)
tracking.lastGossiped = now
if err := p.gossip(
ctx,
now,
p.regossipParams,
p.toRegossip,
p.toRegossip,
p.discarded, // Mark dropped sent transactions as discarded
); err != nil {
return fmt.Errorf("unexpected error during regossip: %w", err)
}
return nil
}

maxLastGossipTimeToRegossip := now.Add(-p.maxRegossipFrequency)
func (p *PushGossiper[T]) gossip(
ctx context.Context,
now time.Time,
gossipParams BranchingFactor,
toGossip buffer.Deque[T],
toRegossip buffer.Deque[T],
discarded cache.Cacher[ids.ID, struct{}],
) error {
var (
sentBytes = 0
gossip = make([][]byte, 0, defaultGossipableCount)
maxLastGossipTimeToRegossip = now.Add(-p.maxRegossipFrequency)
)

// Iterate over all previously sent gossipables to fill any remaining space
// in the gossip batch.
for sentBytes < p.targetGossipSize {
gossipable, ok := p.toRegossip.PopLeft()
gossipable, ok := toGossip.PopLeft()
if !ok {
break
}
Expand All @@ -398,29 +414,28 @@ func (p *PushGossiper[T]) Gossip(ctx context.Context) error {
if !p.set.Has(gossipID) {
delete(p.tracking, gossipID)
p.addedTimeSum -= tracking.addedTime
p.discarded.Put(gossipID, struct{}{}) // only add to discarded if previously sent
discarded.Put(gossipID, struct{}{}) // Cache that the item was dropped
continue
}

// Ensure we don't attempt to send a gossipable too frequently.
if maxLastGossipTimeToRegossip.Before(tracking.lastGossiped) {
// Put the gossipable on the front of the queue to keep items sorted
// by last issuance time.
p.toRegossip.PushLeft(gossipable)
toGossip.PushLeft(gossipable)
break
}

bytes, err := p.marshaller.MarshalGossip(gossipable)
if err != nil {
// Should never happen because we've already sent this once.
delete(p.tracking, gossipID)
p.addedTimeSum -= tracking.addedTime
return err
}

gossip = append(gossip, bytes)
sentBytes += len(bytes)
p.toRegossip.PushRight(gossipable)
toRegossip.PushRight(gossipable)
tracking.lastGossiped = now
}

Expand Down Expand Up @@ -448,9 +463,9 @@ func (p *PushGossiper[T]) Gossip(ctx context.Context) error {
return p.client.AppGossip(
ctx,
msgBytes,
p.numValidators,
p.numNonValidators,
p.numPeers,
gossipParams.Validators,
gossipParams.NonValidators,
gossipParams.Peers,
)
}

Expand Down
Loading

0 comments on commit caef151

Please sign in to comment.