Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace periodic push gossip with pull gossip for block discovery #2365

Closed
wants to merge 15 commits into from
38 changes: 21 additions & 17 deletions chains/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,8 @@ func (m *manager) createAvalancheChain(
return nil, fmt.Errorf("error while fetching weight for subnet %s: %w", ctx.SubnetID, err)
}

consensusParams := sb.Config().ConsensusParameters
subnetConfig := sb.Config()
consensusParams := subnetConfig.ConsensusParameters
sampleK := consensusParams.K
if uint64(sampleK) > bootstrapWeight {
sampleK = int(bootstrapWeight)
Expand Down Expand Up @@ -859,13 +860,14 @@ func (m *manager) createAvalancheChain(
// Create engine, bootstrapper and state-syncer in this order,
// to make sure start callbacks are duly initialized
snowmanEngineConfig := smeng.Config{
Ctx: ctx,
AllGetsServer: snowGetHandler,
VM: vmWrappingProposerVM,
Sender: snowmanMessageSender,
Validators: vdrs,
Params: consensusParams,
Consensus: snowmanConsensus,
Ctx: ctx,
AllGetsServer: snowGetHandler,
VM: vmWrappingProposerVM,
Sender: snowmanMessageSender,
Validators: vdrs,
Params: consensusParams,
AcceptedFrontierPollSize: int(subnetConfig.AcceptedFrontierPollSize),
Consensus: snowmanConsensus,
}
snowmanEngine, err := smeng.New(snowmanEngineConfig)
if err != nil {
Expand Down Expand Up @@ -1153,7 +1155,8 @@ func (m *manager) createSnowmanChain(
return nil, fmt.Errorf("error while fetching weight for subnet %s: %w", ctx.SubnetID, err)
}

consensusParams := sb.Config().ConsensusParameters
subnetConfig := sb.Config()
consensusParams := subnetConfig.ConsensusParameters
sampleK := consensusParams.K
if uint64(sampleK) > bootstrapWeight {
sampleK = int(bootstrapWeight)
Expand Down Expand Up @@ -1205,14 +1208,15 @@ func (m *manager) createSnowmanChain(
// Create engine, bootstrapper and state-syncer in this order,
// to make sure start callbacks are duly initialized
engineConfig := smeng.Config{
Ctx: ctx,
AllGetsServer: snowGetHandler,
VM: vm,
Sender: messageSender,
Validators: vdrs,
Params: consensusParams,
Consensus: consensus,
PartialSync: m.PartialSyncPrimaryNetwork && ctx.ChainID == constants.PlatformChainID,
Ctx: ctx,
AllGetsServer: snowGetHandler,
VM: vm,
Sender: messageSender,
Validators: vdrs,
Params: consensusParams,
AcceptedFrontierPollSize: int(subnetConfig.AcceptedFrontierPollSize),
Consensus: consensus,
PartialSync: m.PartialSyncPrimaryNetwork && ctx.ChainID == constants.PlatformChainID,
}
engine, err := smeng.New(engineConfig)
if err != nil {
Expand Down
13 changes: 9 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ const (
subnetConfigFileExt = ".json"
ipResolutionTimeout = 30 * time.Second

ipcDeprecationMsg = "IPC API is deprecated"
keystoreDeprecationMsg = "keystore API is deprecated"
ipcDeprecationMsg = "IPC API is deprecated"
keystoreDeprecationMsg = "keystore API is deprecated"
acceptedFrontierGossipDeprecationMsg = "push based accepted frontier gossip is deprecated"
)

var (
Expand All @@ -72,6 +73,9 @@ var (
IpcsChainIDsKey: ipcDeprecationMsg,
IpcsPathKey: ipcDeprecationMsg,
KeystoreAPIEnabledKey: keystoreDeprecationMsg,
ConsensusGossipAcceptedFrontierValidatorSizeKey: acceptedFrontierGossipDeprecationMsg,
ConsensusGossipAcceptedFrontierNonValidatorSizeKey: acceptedFrontierGossipDeprecationMsg,
ConsensusGossipAcceptedFrontierPeerSizeKey: acceptedFrontierGossipDeprecationMsg,
}

errSybilProtectionDisabledStakerWeights = errors.New("sybil protection disabled weights must be positive")
Expand Down Expand Up @@ -304,6 +308,7 @@ func getAdaptiveTimeoutConfig(v *viper.Viper) (timer.AdaptiveTimeoutConfig, erro

func getGossipConfig(v *viper.Viper) subnets.GossipConfig {
return subnets.GossipConfig{
AcceptedFrontierPollSize: uint(v.GetUint32(ConsensusGossipAcceptedFrontierPollSizeKey)),
AcceptedFrontierValidatorSize: uint(v.GetUint32(ConsensusGossipAcceptedFrontierValidatorSizeKey)),
AcceptedFrontierNonValidatorSize: uint(v.GetUint32(ConsensusGossipAcceptedFrontierNonValidatorSizeKey)),
AcceptedFrontierPeerSize: uint(v.GetUint32(ConsensusGossipAcceptedFrontierPeerSizeKey)),
Expand Down Expand Up @@ -1320,9 +1325,9 @@ func GetNodeConfig(v *viper.Viper) (node.Config, error) {
}

// Gossiping
nodeConfig.AcceptedFrontierGossipFrequency = v.GetDuration(ConsensusAcceptedFrontierGossipFrequencyKey)
nodeConfig.AcceptedFrontierGossipFrequency = v.GetDuration(ConsensusGossipAcceptedFrontierFrequencyKey)
if nodeConfig.AcceptedFrontierGossipFrequency < 0 {
return node.Config{}, fmt.Errorf("%s must be >= 0", ConsensusAcceptedFrontierGossipFrequencyKey)
return node.Config{}, fmt.Errorf("%s must be >= 0", ConsensusGossipAcceptedFrontierFrequencyKey)
}

// App handling
Expand Down
3 changes: 2 additions & 1 deletion config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,10 @@ func addNodeFlags(fs *pflag.FlagSet) {
fs.Duration(BenchlistMinFailingDurationKey, constants.DefaultBenchlistMinFailingDuration, "Minimum amount of time messages to a peer must be failing before the peer is benched")

// Router
fs.Duration(ConsensusAcceptedFrontierGossipFrequencyKey, constants.DefaultAcceptedFrontierGossipFrequency, "Frequency of gossiping accepted frontiers")
fs.Uint(ConsensusAppConcurrencyKey, constants.DefaultConsensusAppConcurrency, "Maximum number of goroutines to use when handling App messages on a chain")
fs.Duration(ConsensusShutdownTimeoutKey, constants.DefaultConsensusShutdownTimeout, "Timeout before killing an unresponsive chain")
fs.Duration(ConsensusGossipAcceptedFrontierFrequencyKey, constants.DefaultAcceptedFrontierGossipFrequency, "Frequency of polling and gossiping accepted frontiers")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Felt like this should be next to the related configs

fs.Uint(ConsensusGossipAcceptedFrontierPollSizeKey, constants.DefaultConsensusGossipAcceptedFrontierPollSize, "Number of validators to query when polling for accepted frontier changes")
fs.Uint(ConsensusGossipAcceptedFrontierValidatorSizeKey, constants.DefaultConsensusGossipAcceptedFrontierValidatorSize, "Number of validators to gossip to when gossiping accepted frontier")
fs.Uint(ConsensusGossipAcceptedFrontierNonValidatorSizeKey, constants.DefaultConsensusGossipAcceptedFrontierNonValidatorSize, "Number of non-validators to gossip to when gossiping accepted frontier")
fs.Uint(ConsensusGossipAcceptedFrontierPeerSizeKey, constants.DefaultConsensusGossipAcceptedFrontierPeerSize, "Number of peers to gossip to when gossiping accepted frontier")
Expand Down
5 changes: 3 additions & 2 deletions config/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,10 @@ const (
IpcsChainIDsKey = "ipcs-chain-ids"
IpcsPathKey = "ipcs-path"
MeterVMsEnabledKey = "meter-vms-enabled"
ConsensusAcceptedFrontierGossipFrequencyKey = "consensus-accepted-frontier-gossip-frequency"
ConsensusAppConcurrencyKey = "consensus-app-concurrency"
ConsensusShutdownTimeoutKey = "consensus-shutdown-timeout"
ConsensusGossipAcceptedFrontierFrequencyKey = "consensus-accepted-frontier-gossip-frequency"
ConsensusGossipAcceptedFrontierPollSizeKey = "consensus-accepted-frontier-gossip-poll-size"
ConsensusGossipAcceptedFrontierValidatorSizeKey = "consensus-accepted-frontier-gossip-validator-size"
ConsensusGossipAcceptedFrontierNonValidatorSizeKey = "consensus-accepted-frontier-gossip-non-validator-size"
ConsensusGossipAcceptedFrontierPeerSizeKey = "consensus-accepted-frontier-gossip-peer-size"
Expand All @@ -154,7 +156,6 @@ const (
AppGossipValidatorSizeKey = "consensus-app-gossip-validator-size"
AppGossipNonValidatorSizeKey = "consensus-app-gossip-non-validator-size"
AppGossipPeerSizeKey = "consensus-app-gossip-peer-size"
ConsensusShutdownTimeoutKey = "consensus-shutdown-timeout"
ProposerVMUseCurrentHeightKey = "proposervm-use-current-height"
FdLimitKey = "fd-limit"
IndexEnabledKey = "index-enabled"
Expand Down
15 changes: 8 additions & 7 deletions snow/engine/snowman/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ import (
type Config struct {
common.AllGetsServer

Ctx *snow.ConsensusContext
VM block.ChainVM
Sender common.Sender
Validators validators.Manager
Params snowball.Parameters
Consensus snowman.Consensus
PartialSync bool
Ctx *snow.ConsensusContext
VM block.ChainVM
Sender common.Sender
Validators validators.Manager
Params snowball.Parameters
AcceptedFrontierPollSize int
Consensus snowman.Consensus
PartialSync bool
}
3 changes: 2 additions & 1 deletion snow/engine/snowman/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func DefaultConfigs() Config {
MaxOutstandingItems: 1,
MaxItemProcessingTime: 1,
},
Consensus: &snowman.Topological{},
AcceptedFrontierPollSize: 1,
Consensus: &snowman.Topological{},
}
}
74 changes: 50 additions & 24 deletions snow/engine/snowman/transitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ type Transitive struct {
// list of NoOpsHandler for messages dropped by engine
common.StateSummaryFrontierHandler
common.AcceptedStateSummaryHandler
common.AcceptedFrontierHandler
common.AcceptedHandler
common.AncestorsHandler
common.AppHandler
Expand Down Expand Up @@ -130,7 +129,6 @@ func newTransitive(config Config) (*Transitive, error) {
Config: config,
StateSummaryFrontierHandler: common.NewNoOpStateSummaryFrontierHandler(config.Ctx.Log),
AcceptedStateSummaryHandler: common.NewNoOpAcceptedStateSummaryHandler(config.Ctx.Log),
AcceptedFrontierHandler: common.NewNoOpAcceptedFrontierHandler(config.Ctx.Log),
AcceptedHandler: common.NewNoOpAcceptedHandler(config.Ctx.Log),
AncestorsHandler: common.NewNoOpAncestorsHandler(config.Ctx.Log),
AppHandler: config.VM,
Expand All @@ -145,6 +143,55 @@ func newTransitive(config Config) (*Transitive, error) {
return t, t.metrics.Initialize("", config.Ctx.Registerer)
}

func (t *Transitive) Gossip(ctx context.Context) error {
t.Ctx.Log.Verbo("sampling from validators",
zap.Stringer("validators", t.Validators),
)

vdrIDs, err := t.Validators.Sample(t.Ctx.SubnetID, t.AcceptedFrontierPollSize)
if err != nil {
t.Ctx.Log.Error("dropped sample for block gossip",
zap.String("reason", "insufficient number of validators"),
zap.Int("size", t.AcceptedFrontierPollSize),
)
return nil
}

t.RequestID++
vdrSet := set.Of(vdrIDs...)
t.Sender.SendGetAcceptedFrontier(ctx, vdrSet, t.RequestID)

// TODO: Remove periodic push gossip after v1.11.x is activated
blkID, err := t.VM.LastAccepted(ctx)
if err != nil {
return err
}

blk, err := t.GetBlock(ctx, blkID)
if err != nil {
t.Ctx.Log.Warn("dropping gossip request",
zap.String("reason", "block couldn't be loaded"),
zap.Stringer("blkID", blkID),
zap.Error(err),
)
return nil
}
t.Ctx.Log.Verbo("gossiping accepted block to the network",
zap.Stringer("blkID", blkID),
)
t.Sender.SendGossip(ctx, blk.Bytes())
return nil
}

func (t *Transitive) AcceptedFrontier(ctx context.Context, nodeID ids.NodeID, _ uint32, blkID ids.ID) error {
_, err := t.issueFromByID(ctx, nodeID, blkID)
return err
}

func (*Transitive) GetAcceptedFrontierFailed(context.Context, ids.NodeID, uint32) error {
return nil
}

func (t *Transitive) Put(ctx context.Context, nodeID ids.NodeID, requestID uint32, blkBytes []byte) error {
blk, err := t.VM.ParseBlock(ctx, blkBytes)
if err != nil {
Expand Down Expand Up @@ -346,28 +393,6 @@ func (*Transitive) Timeout(context.Context) error {
return nil
}

func (t *Transitive) Gossip(ctx context.Context) error {
blkID, err := t.VM.LastAccepted(ctx)
if err != nil {
return err
}

blk, err := t.GetBlock(ctx, blkID)
if err != nil {
t.Ctx.Log.Warn("dropping gossip request",
zap.String("reason", "block couldn't be loaded"),
zap.Stringer("blkID", blkID),
zap.Error(err),
)
return nil
}
t.Ctx.Log.Verbo("gossiping accepted block to the network",
zap.Stringer("blkID", blkID),
)
t.Sender.SendGossip(ctx, blk.Bytes())
return nil
}

func (*Transitive) Halt(context.Context) {}

func (t *Transitive) Shutdown(ctx context.Context) error {
Expand Down Expand Up @@ -801,6 +826,7 @@ func (t *Transitive) sendQuery(
t.Ctx.Log.Error("dropped query for block",
zap.String("reason", "insufficient number of validators"),
zap.Stringer("blkID", blkID),
zap.Int("size", t.Params.K),
)
return
}
Expand Down
16 changes: 12 additions & 4 deletions snow/engine/snowman/transitive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1294,7 +1294,7 @@ func TestEngineUndeclaredDependencyDeadlock(t *testing.T) {
func TestEngineGossip(t *testing.T) {
require := require.New(t)

_, _, sender, vm, te, gBlk := setupDefaultConfig(t)
nodeID, _, sender, vm, te, gBlk := setupDefaultConfig(t)

vm.LastAcceptedF = func(context.Context) (ids.ID, error) {
return gBlk.ID(), nil
Expand All @@ -1304,15 +1304,23 @@ func TestEngineGossip(t *testing.T) {
return gBlk, nil
}

called := new(bool)
var (
calledSendGetAcceptedFrontier bool
calledSendGossip bool
)
sender.SendGetAcceptedFrontierF = func(_ context.Context, nodeIDs set.Set[ids.NodeID], _ uint32) {
calledSendGetAcceptedFrontier = true
require.Equal(set.Of(nodeID), nodeIDs)
}
sender.SendGossipF = func(_ context.Context, blkBytes []byte) {
*called = true
calledSendGossip = true
require.Equal(gBlk.Bytes(), blkBytes)
}

require.NoError(te.Gossip(context.Background()))

require.True(*called)
require.True(calledSendGetAcceptedFrontier)
require.True(calledSendGossip)
}

func TestEngineInvalidBlockIgnoredFromUnexpectedPeer(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions subnets/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
var errAllowedNodesWhenNotValidatorOnly = errors.New("allowedNodes can only be set when ValidatorOnly is true")

type GossipConfig struct {
AcceptedFrontierPollSize uint `json:"gossipAcceptedFrontierPollSize" yaml:"gossipAcceptedFrontierPollSize"`
AcceptedFrontierValidatorSize uint `json:"gossipAcceptedFrontierValidatorSize" yaml:"gossipAcceptedFrontierValidatorSize"`
AcceptedFrontierNonValidatorSize uint `json:"gossipAcceptedFrontierNonValidatorSize" yaml:"gossipAcceptedFrontierNonValidatorSize"`
AcceptedFrontierPeerSize uint `json:"gossipAcceptedFrontierPeerSize" yaml:"gossipAcceptedFrontierPeerSize"`
Expand Down
5 changes: 3 additions & 2 deletions utils/constants/networking.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,13 @@ const (
DefaultBenchlistMinFailingDuration = 2*time.Minute + 30*time.Second

// Router
DefaultAcceptedFrontierGossipFrequency = 10 * time.Second
DefaultConsensusAppConcurrency = 2
DefaultConsensusShutdownTimeout = time.Minute
DefaultAcceptedFrontierGossipFrequency = 500 * time.Millisecond
DefaultConsensusGossipAcceptedFrontierPollSize = 3
DefaultConsensusGossipAcceptedFrontierValidatorSize = 0
DefaultConsensusGossipAcceptedFrontierNonValidatorSize = 0
DefaultConsensusGossipAcceptedFrontierPeerSize = 15
DefaultConsensusGossipAcceptedFrontierPeerSize = 1
DefaultConsensusGossipOnAcceptValidatorSize = 0
DefaultConsensusGossipOnAcceptNonValidatorSize = 0
DefaultConsensusGossipOnAcceptPeerSize = 10
Expand Down
3 changes: 2 additions & 1 deletion vms/platformvm/vm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1540,7 +1540,8 @@ func TestBootstrapPartiallyAccepted(t *testing.T) {
MaxOutstandingItems: 1,
MaxItemProcessingTime: 1,
},
Consensus: &smcon.Topological{},
AcceptedFrontierPollSize: 1,
Consensus: &smcon.Topological{},
}
engine, err := smeng.New(engineConfig)
require.NoError(err)
Expand Down
Loading