Skip to content

Commit

Permalink
more gossip
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph committed Nov 25, 2023
1 parent 5891bba commit 23988e1
Show file tree
Hide file tree
Showing 11 changed files with 71 additions and 82 deletions.
38 changes: 17 additions & 21 deletions chains/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,8 +807,7 @@ func (m *manager) createAvalancheChain(
return nil, fmt.Errorf("error while fetching weight for subnet %s: %w", ctx.SubnetID, err)
}

subnetConfig := sb.Config()
consensusParams := subnetConfig.ConsensusParameters
consensusParams := sb.Config().ConsensusParameters
sampleK := consensusParams.K
if uint64(sampleK) > bootstrapWeight {
sampleK = int(bootstrapWeight)
Expand Down Expand Up @@ -860,14 +859,13 @@ func (m *manager) createAvalancheChain(
// Create engine, bootstrapper and state-syncer in this order,
// to make sure start callbacks are duly initialized
snowmanEngineConfig := smeng.Config{
Ctx: ctx,
AllGetsServer: snowGetHandler,
VM: vmWrappingProposerVM,
Sender: snowmanMessageSender,
Validators: vdrs,
Params: consensusParams,
FrontierPollSize: int(subnetConfig.FrontierPollSize),
Consensus: snowmanConsensus,
Ctx: ctx,
AllGetsServer: snowGetHandler,
VM: vmWrappingProposerVM,
Sender: snowmanMessageSender,
Validators: vdrs,
Params: consensusParams,
Consensus: snowmanConsensus,
}
snowmanEngine, err := smeng.New(snowmanEngineConfig)
if err != nil {
Expand Down Expand Up @@ -1155,8 +1153,7 @@ func (m *manager) createSnowmanChain(
return nil, fmt.Errorf("error while fetching weight for subnet %s: %w", ctx.SubnetID, err)
}

subnetConfig := sb.Config()
consensusParams := subnetConfig.ConsensusParameters
consensusParams := sb.Config().ConsensusParameters
sampleK := consensusParams.K
if uint64(sampleK) > bootstrapWeight {
sampleK = int(bootstrapWeight)
Expand Down Expand Up @@ -1208,15 +1205,14 @@ func (m *manager) createSnowmanChain(
// Create engine, bootstrapper and state-syncer in this order,
// to make sure start callbacks are duly initialized
engineConfig := smeng.Config{
Ctx: ctx,
AllGetsServer: snowGetHandler,
VM: vm,
Sender: messageSender,
Validators: vdrs,
Params: consensusParams,
FrontierPollSize: int(subnetConfig.FrontierPollSize),
Consensus: consensus,
PartialSync: m.PartialSyncPrimaryNetwork && ctx.ChainID == constants.PlatformChainID,
Ctx: ctx,
AllGetsServer: snowGetHandler,
VM: vm,
Sender: messageSender,
Validators: vdrs,
Params: consensusParams,
Consensus: consensus,
PartialSync: m.PartialSyncPrimaryNetwork && ctx.ChainID == constants.PlatformChainID,
}
engine, err := smeng.New(engineConfig)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ func getAdaptiveTimeoutConfig(v *viper.Viper) (timer.AdaptiveTimeoutConfig, erro

func getGossipConfig(v *viper.Viper) subnets.GossipConfig {
return subnets.GossipConfig{
FrontierPollSize: uint(v.GetUint32(ConsensusGossipFrontierPollSizeKey)),
AcceptedFrontierValidatorSize: uint(v.GetUint32(ConsensusGossipAcceptedFrontierValidatorSizeKey)),
AcceptedFrontierNonValidatorSize: uint(v.GetUint32(ConsensusGossipAcceptedFrontierNonValidatorSizeKey)),
AcceptedFrontierPeerSize: uint(v.GetUint32(ConsensusGossipAcceptedFrontierPeerSizeKey)),
Expand Down
3 changes: 1 addition & 2 deletions config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,7 @@ func addNodeFlags(fs *pflag.FlagSet) {
// Router
fs.Uint(ConsensusAppConcurrencyKey, constants.DefaultConsensusAppConcurrency, "Maximum number of goroutines to use when handling App messages on a chain")
fs.Duration(ConsensusShutdownTimeoutKey, constants.DefaultConsensusShutdownTimeout, "Timeout before killing an unresponsive chain")
fs.Duration(ConsensusGossipFrontierFrequencyKey, constants.DefaultFrontierGossipFrequency, "Frequency of polling and gossiping frontiers")
fs.Uint(ConsensusGossipFrontierPollSizeKey, constants.DefaultConsensusGossipFrontierPollSize, "Number of validators to query when polling for frontier changes")
fs.Duration(ConsensusGossipFrontierFrequencyKey, constants.DefaultFrontierGossipFrequency, "Frequency of polling and 10%% of gossiping of frontiers")
fs.Uint(ConsensusGossipAcceptedFrontierValidatorSizeKey, constants.DefaultConsensusGossipAcceptedFrontierValidatorSize, "Number of validators to gossip to when gossiping accepted frontier")
fs.Uint(ConsensusGossipAcceptedFrontierNonValidatorSizeKey, constants.DefaultConsensusGossipAcceptedFrontierNonValidatorSize, "Number of non-validators to gossip to when gossiping accepted frontier")
fs.Uint(ConsensusGossipAcceptedFrontierPeerSizeKey, constants.DefaultConsensusGossipAcceptedFrontierPeerSize, "Number of peers to gossip to when gossiping accepted frontier")
Expand Down
1 change: 0 additions & 1 deletion config/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ const (
ConsensusAppConcurrencyKey = "consensus-app-concurrency"
ConsensusShutdownTimeoutKey = "consensus-shutdown-timeout"
ConsensusGossipFrontierFrequencyKey = "consensus-frontier-gossip-frequency"
ConsensusGossipFrontierPollSizeKey = "consensus-frontier-gossip-poll-size"
ConsensusGossipAcceptedFrontierValidatorSizeKey = "consensus-accepted-frontier-gossip-validator-size"
ConsensusGossipAcceptedFrontierNonValidatorSizeKey = "consensus-accepted-frontier-gossip-non-validator-size"
ConsensusGossipAcceptedFrontierPeerSizeKey = "consensus-accepted-frontier-gossip-peer-size"
Expand Down
15 changes: 7 additions & 8 deletions snow/engine/snowman/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@ import (
type Config struct {
common.AllGetsServer

Ctx *snow.ConsensusContext
VM block.ChainVM
Sender common.Sender
Validators validators.Manager
Params snowball.Parameters
FrontierPollSize int
Consensus snowman.Consensus
PartialSync bool
Ctx *snow.ConsensusContext
VM block.ChainVM
Sender common.Sender
Validators validators.Manager
Params snowball.Parameters
Consensus snowman.Consensus
PartialSync bool
}
3 changes: 1 addition & 2 deletions snow/engine/snowman/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ func DefaultConfigs() Config {
MaxOutstandingItems: 1,
MaxItemProcessingTime: 1,
},
FrontierPollSize: 1,
Consensus: &snowman.Topological{},
Consensus: &snowman.Topological{},
}
}
75 changes: 42 additions & 33 deletions snow/engine/snowman/transitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ import (
"github.com/ava-labs/avalanchego/utils/wrappers"
)

const nonVerifiedCacheSize = 64 * units.MiB
const (
nonVerifiedCacheSize = 64 * units.MiB
putGossipPeriod = 10
)

var _ Engine = (*Transitive)(nil)

Expand All @@ -58,7 +61,9 @@ type Transitive struct {
common.AppHandler
validators.Connector

RequestID uint32
requestID uint32

gossipCounter int

// track outstanding preference requests
polls poll.Set
Expand Down Expand Up @@ -146,23 +151,7 @@ func newTransitive(config Config) (*Transitive, error) {
}

func (t *Transitive) Gossip(ctx context.Context) error {
// TODO: Remove periodic push gossip after v1.11.x is activated
lastAcceptedID, lastAcceptedHeight := t.Consensus.LastAccepted()

lastAccepted, err := t.GetBlock(ctx, lastAcceptedID)
if err != nil {
t.Ctx.Log.Warn("dropping gossip request",
zap.String("reason", "block couldn't be loaded"),
zap.Stringer("blkID", lastAcceptedID),
zap.Error(err),
)
return nil
}
t.Ctx.Log.Verbo("gossiping accepted block to the network",
zap.Stringer("blkID", lastAcceptedID),
)
t.Sender.SendGossip(ctx, lastAccepted.Bytes())

if numProcessing := t.Consensus.NumProcessing(); numProcessing > 0 {
t.Ctx.Log.Debug("skipping block gossip",
zap.String("reason", "blocks currently processing"),
Expand All @@ -175,11 +164,10 @@ func (t *Transitive) Gossip(ctx context.Context) error {
zap.Stringer("validators", t.Validators),
)

vdrIDs, err := t.Validators.Sample(t.Ctx.SubnetID, t.FrontierPollSize)
vdrIDs, err := t.Validators.Sample(t.Ctx.SubnetID, 1)
if err != nil {
t.Ctx.Log.Error("skipping block gossip",
zap.String("reason", "insufficient number of validators"),
zap.Int("size", t.FrontierPollSize),
zap.String("reason", "no validators"),
)
return nil
}
Expand All @@ -195,10 +183,31 @@ func (t *Transitive) Gossip(ctx context.Context) error {
return nil
}

t.RequestID++
t.requestID++
vdrSet := set.Of(vdrIDs...)
preferredID := t.Consensus.Preference()
t.Sender.SendPullQuery(ctx, vdrSet, t.RequestID, preferredID, nextHeightToAccept)
t.Sender.SendPullQuery(ctx, vdrSet, t.requestID, preferredID, nextHeightToAccept)

// TODO: Remove periodic push gossip after v1.11.x is activated
t.gossipCounter++
t.gossipCounter %= putGossipPeriod
if t.gossipCounter > 0 {
return nil
}

lastAccepted, err := t.GetBlock(ctx, lastAcceptedID)
if err != nil {
t.Ctx.Log.Warn("dropping gossip request",
zap.String("reason", "block couldn't be loaded"),
zap.Stringer("blkID", lastAcceptedID),
zap.Error(err),
)
return nil
}
t.Ctx.Log.Verbo("gossiping accepted block to the network",
zap.Stringer("blkID", lastAcceptedID),
)
t.Sender.SendGossip(ctx, lastAccepted.Bytes())
return nil
}

Expand Down Expand Up @@ -436,7 +445,7 @@ func (t *Transitive) Context() *snow.ConsensusContext {
}

func (t *Transitive) Start(ctx context.Context, startReqID uint32) error {
t.RequestID = startReqID
t.requestID = startReqID
lastAcceptedID, err := t.VM.LastAccepted(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -805,14 +814,14 @@ func (t *Transitive) sendRequest(ctx context.Context, nodeID ids.NodeID, blkID i
return
}

t.RequestID++
t.blkReqs.Add(nodeID, t.RequestID, blkID)
t.requestID++
t.blkReqs.Add(nodeID, t.requestID, blkID)
t.Ctx.Log.Verbo("sending Get request",
zap.Stringer("nodeID", nodeID),
zap.Uint32("requestID", t.RequestID),
zap.Uint32("requestID", t.requestID),
zap.Stringer("blkID", blkID),
)
t.Sender.SendGet(ctx, nodeID, t.RequestID, blkID)
t.Sender.SendGet(ctx, nodeID, t.requestID, blkID)

// Tracks performance statistics
t.metrics.numRequests.Set(float64(t.blkReqs.Len()))
Expand Down Expand Up @@ -854,21 +863,21 @@ func (t *Transitive) sendQuery(
}

vdrBag := bag.Of(vdrIDs...)
t.RequestID++
if !t.polls.Add(t.RequestID, vdrBag) {
t.requestID++
if !t.polls.Add(t.requestID, vdrBag) {
t.Ctx.Log.Error("dropped query for block",
zap.String("reason", "failed to add poll"),
zap.Stringer("blkID", blkID),
zap.Uint32("requestID", t.RequestID),
zap.Uint32("requestID", t.requestID),
)
return
}

vdrSet := set.Of(vdrIDs...)
if push {
t.Sender.SendPushQuery(ctx, vdrSet, t.RequestID, blkBytes, nextHeightToAccept)
t.Sender.SendPushQuery(ctx, vdrSet, t.requestID, blkBytes, nextHeightToAccept)
} else {
t.Sender.SendPullQuery(ctx, vdrSet, t.RequestID, blkID, nextHeightToAccept)
t.Sender.SendPullQuery(ctx, vdrSet, t.requestID, blkID, nextHeightToAccept)
}
}

Expand Down
10 changes: 1 addition & 9 deletions snow/engine/snowman/transitive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1304,23 +1304,15 @@ func TestEngineGossip(t *testing.T) {
return gBlk, nil
}

var (
calledSendPullQuery bool
calledSendGossip bool
)
var calledSendPullQuery bool
sender.SendPullQueryF = func(_ context.Context, nodeIDs set.Set[ids.NodeID], _ uint32, _ ids.ID, _ uint64) {
calledSendPullQuery = true
require.Equal(set.Of(nodeID), nodeIDs)
}
sender.SendGossipF = func(_ context.Context, blkBytes []byte) {
calledSendGossip = true
require.Equal(gBlk.Bytes(), blkBytes)
}

require.NoError(te.Gossip(context.Background()))

require.True(calledSendPullQuery)
require.True(calledSendGossip)
}

func TestEngineInvalidBlockIgnoredFromUnexpectedPeer(t *testing.T) {
Expand Down
1 change: 0 additions & 1 deletion subnets/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
var errAllowedNodesWhenNotValidatorOnly = errors.New("allowedNodes can only be set when ValidatorOnly is true")

type GossipConfig struct {
FrontierPollSize uint `json:"gossipFrontierPollSize" yaml:"gossipFrontierPollSize"`
AcceptedFrontierValidatorSize uint `json:"gossipAcceptedFrontierValidatorSize" yaml:"gossipAcceptedFrontierValidatorSize"`
AcceptedFrontierNonValidatorSize uint `json:"gossipAcceptedFrontierNonValidatorSize" yaml:"gossipAcceptedFrontierNonValidatorSize"`
AcceptedFrontierPeerSize uint `json:"gossipAcceptedFrontierPeerSize" yaml:"gossipAcceptedFrontierPeerSize"`
Expand Down
3 changes: 1 addition & 2 deletions utils/constants/networking.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ const (
// Router
DefaultConsensusAppConcurrency = 2
DefaultConsensusShutdownTimeout = time.Minute
DefaultFrontierGossipFrequency = 500 * time.Millisecond
DefaultConsensusGossipFrontierPollSize = 5
DefaultFrontierGossipFrequency = 100 * time.Millisecond
DefaultConsensusGossipAcceptedFrontierValidatorSize = 0
DefaultConsensusGossipAcceptedFrontierNonValidatorSize = 0
DefaultConsensusGossipAcceptedFrontierPeerSize = 1
Expand Down
3 changes: 1 addition & 2 deletions vms/platformvm/vm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1540,8 +1540,7 @@ func TestBootstrapPartiallyAccepted(t *testing.T) {
MaxOutstandingItems: 1,
MaxItemProcessingTime: 1,
},
FrontierPollSize: 1,
Consensus: &smcon.Topological{},
Consensus: &smcon.Topological{},
}
engine, err := smeng.New(engineConfig)
require.NoError(err)
Expand Down

0 comments on commit 23988e1

Please sign in to comment.