Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace periodic push gossip with pull gossip for block discovery #2365

Closed
wants to merge 15 commits into from
38 changes: 21 additions & 17 deletions chains/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,8 @@ func (m *manager) createAvalancheChain(
return nil, fmt.Errorf("error while fetching weight for subnet %s: %w", ctx.SubnetID, err)
}

consensusParams := sb.Config().ConsensusParameters
subnetConfig := sb.Config()
consensusParams := subnetConfig.ConsensusParameters
sampleK := consensusParams.K
if uint64(sampleK) > bootstrapWeight {
sampleK = int(bootstrapWeight)
Expand Down Expand Up @@ -859,13 +860,14 @@ func (m *manager) createAvalancheChain(
// Create engine, bootstrapper and state-syncer in this order,
// to make sure start callbacks are duly initialized
snowmanEngineConfig := smeng.Config{
Ctx: ctx,
AllGetsServer: snowGetHandler,
VM: vmWrappingProposerVM,
Sender: snowmanMessageSender,
Validators: vdrs,
Params: consensusParams,
Consensus: snowmanConsensus,
Ctx: ctx,
AllGetsServer: snowGetHandler,
VM: vmWrappingProposerVM,
Sender: snowmanMessageSender,
Validators: vdrs,
Params: consensusParams,
AcceptedFrontierPollSize: int(subnetConfig.AcceptedFrontierPollSize),
Consensus: snowmanConsensus,
}
snowmanEngine, err := smeng.New(snowmanEngineConfig)
if err != nil {
Expand Down Expand Up @@ -1153,7 +1155,8 @@ func (m *manager) createSnowmanChain(
return nil, fmt.Errorf("error while fetching weight for subnet %s: %w", ctx.SubnetID, err)
}

consensusParams := sb.Config().ConsensusParameters
subnetConfig := sb.Config()
consensusParams := subnetConfig.ConsensusParameters
sampleK := consensusParams.K
if uint64(sampleK) > bootstrapWeight {
sampleK = int(bootstrapWeight)
Expand Down Expand Up @@ -1205,14 +1208,15 @@ func (m *manager) createSnowmanChain(
// Create engine, bootstrapper and state-syncer in this order,
// to make sure start callbacks are duly initialized
engineConfig := smeng.Config{
Ctx: ctx,
AllGetsServer: snowGetHandler,
VM: vm,
Sender: messageSender,
Validators: vdrs,
Params: consensusParams,
Consensus: consensus,
PartialSync: m.PartialSyncPrimaryNetwork && ctx.ChainID == constants.PlatformChainID,
Ctx: ctx,
AllGetsServer: snowGetHandler,
VM: vm,
Sender: messageSender,
Validators: vdrs,
Params: consensusParams,
AcceptedFrontierPollSize: int(subnetConfig.AcceptedFrontierPollSize),
Consensus: consensus,
PartialSync: m.PartialSyncPrimaryNetwork && ctx.ChainID == constants.PlatformChainID,
}
engine, err := smeng.New(engineConfig)
if err != nil {
Expand Down
13 changes: 9 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ const (
subnetConfigFileExt = ".json"
ipResolutionTimeout = 30 * time.Second

ipcDeprecationMsg = "IPC API is deprecated"
keystoreDeprecationMsg = "keystore API is deprecated"
ipcDeprecationMsg = "IPC API is deprecated"
keystoreDeprecationMsg = "keystore API is deprecated"
acceptedFrontierGossipDeprecationMsg = "push based accepted frontier gossip is deprecated"
)

var (
Expand All @@ -72,6 +73,9 @@ var (
IpcsChainIDsKey: ipcDeprecationMsg,
IpcsPathKey: ipcDeprecationMsg,
KeystoreAPIEnabledKey: keystoreDeprecationMsg,
ConsensusGossipAcceptedFrontierValidatorSizeKey: acceptedFrontierGossipDeprecationMsg,
ConsensusGossipAcceptedFrontierNonValidatorSizeKey: acceptedFrontierGossipDeprecationMsg,
ConsensusGossipAcceptedFrontierPeerSizeKey: acceptedFrontierGossipDeprecationMsg,
}

errSybilProtectionDisabledStakerWeights = errors.New("sybil protection disabled weights must be positive")
Expand Down Expand Up @@ -304,6 +308,7 @@ func getAdaptiveTimeoutConfig(v *viper.Viper) (timer.AdaptiveTimeoutConfig, erro

func getGossipConfig(v *viper.Viper) subnets.GossipConfig {
return subnets.GossipConfig{
AcceptedFrontierPollSize: uint(v.GetUint32(ConsensusGossipAcceptedFrontierPollSizeKey)),
AcceptedFrontierValidatorSize: uint(v.GetUint32(ConsensusGossipAcceptedFrontierValidatorSizeKey)),
AcceptedFrontierNonValidatorSize: uint(v.GetUint32(ConsensusGossipAcceptedFrontierNonValidatorSizeKey)),
AcceptedFrontierPeerSize: uint(v.GetUint32(ConsensusGossipAcceptedFrontierPeerSizeKey)),
Expand Down Expand Up @@ -1320,9 +1325,9 @@ func GetNodeConfig(v *viper.Viper) (node.Config, error) {
}

// Gossiping
nodeConfig.AcceptedFrontierGossipFrequency = v.GetDuration(ConsensusAcceptedFrontierGossipFrequencyKey)
nodeConfig.AcceptedFrontierGossipFrequency = v.GetDuration(ConsensusGossipAcceptedFrontierFrequencyKey)
if nodeConfig.AcceptedFrontierGossipFrequency < 0 {
return node.Config{}, fmt.Errorf("%s must be >= 0", ConsensusAcceptedFrontierGossipFrequencyKey)
return node.Config{}, fmt.Errorf("%s must be >= 0", ConsensusGossipAcceptedFrontierFrequencyKey)
}

// App handling
Expand Down
3 changes: 2 additions & 1 deletion config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,10 @@ func addNodeFlags(fs *pflag.FlagSet) {
fs.Duration(BenchlistMinFailingDurationKey, constants.DefaultBenchlistMinFailingDuration, "Minimum amount of time messages to a peer must be failing before the peer is benched")

// Router
fs.Duration(ConsensusAcceptedFrontierGossipFrequencyKey, constants.DefaultAcceptedFrontierGossipFrequency, "Frequency of gossiping accepted frontiers")
fs.Uint(ConsensusAppConcurrencyKey, constants.DefaultConsensusAppConcurrency, "Maximum number of goroutines to use when handling App messages on a chain")
fs.Duration(ConsensusShutdownTimeoutKey, constants.DefaultConsensusShutdownTimeout, "Timeout before killing an unresponsive chain")
fs.Duration(ConsensusGossipAcceptedFrontierFrequencyKey, constants.DefaultAcceptedFrontierGossipFrequency, "Frequency of polling and gossiping accepted frontiers")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Felt like this should be next to the related configs

fs.Uint(ConsensusGossipAcceptedFrontierPollSizeKey, constants.DefaultConsensusGossipAcceptedFrontierPollSize, "Number of validators to query when polling for accepted frontier changes")
fs.Uint(ConsensusGossipAcceptedFrontierValidatorSizeKey, constants.DefaultConsensusGossipAcceptedFrontierValidatorSize, "Number of validators to gossip to when gossiping accepted frontier")
fs.Uint(ConsensusGossipAcceptedFrontierNonValidatorSizeKey, constants.DefaultConsensusGossipAcceptedFrontierNonValidatorSize, "Number of non-validators to gossip to when gossiping accepted frontier")
fs.Uint(ConsensusGossipAcceptedFrontierPeerSizeKey, constants.DefaultConsensusGossipAcceptedFrontierPeerSize, "Number of peers to gossip to when gossiping accepted frontier")
Expand Down
5 changes: 3 additions & 2 deletions config/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,10 @@ const (
IpcsChainIDsKey = "ipcs-chain-ids"
IpcsPathKey = "ipcs-path"
MeterVMsEnabledKey = "meter-vms-enabled"
ConsensusAcceptedFrontierGossipFrequencyKey = "consensus-accepted-frontier-gossip-frequency"
ConsensusAppConcurrencyKey = "consensus-app-concurrency"
ConsensusShutdownTimeoutKey = "consensus-shutdown-timeout"
ConsensusGossipAcceptedFrontierFrequencyKey = "consensus-accepted-frontier-gossip-frequency"
ConsensusGossipAcceptedFrontierPollSizeKey = "consensus-accepted-frontier-gossip-poll-size"
ConsensusGossipAcceptedFrontierValidatorSizeKey = "consensus-accepted-frontier-gossip-validator-size"
ConsensusGossipAcceptedFrontierNonValidatorSizeKey = "consensus-accepted-frontier-gossip-non-validator-size"
ConsensusGossipAcceptedFrontierPeerSizeKey = "consensus-accepted-frontier-gossip-peer-size"
Expand All @@ -154,7 +156,6 @@ const (
AppGossipValidatorSizeKey = "consensus-app-gossip-validator-size"
AppGossipNonValidatorSizeKey = "consensus-app-gossip-non-validator-size"
AppGossipPeerSizeKey = "consensus-app-gossip-peer-size"
ConsensusShutdownTimeoutKey = "consensus-shutdown-timeout"
ProposerVMUseCurrentHeightKey = "proposervm-use-current-height"
FdLimitKey = "fd-limit"
IndexEnabledKey = "index-enabled"
Expand Down
15 changes: 13 additions & 2 deletions snow/consensus/snowman/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type metrics struct {
// before being accepted
latAccepted metric.Averager
blockSizeAcceptedSum prometheus.Gauge
buildLatencyAccepted prometheus.Gauge

// pollsRejected tracks the number of polls that a block was in processing
// for before being rejected
Expand Down Expand Up @@ -126,6 +127,11 @@ func newMetrics(
Name: "blks_accepted_container_size_sum",
Help: "cumulative size of all accepted blocks",
}),
buildLatencyAccepted: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Name: "blks_build_accept_latency",
Help: "time (in ns) from the timestamp of a block to the time it was accepted",
}),

pollsRejected: metric.NewAveragerWithErrs(
namespace,
Expand Down Expand Up @@ -176,6 +182,7 @@ func newMetrics(
reg.Register(m.lastAcceptedTimestamp),
reg.Register(m.numProcessing),
reg.Register(m.blockSizeAcceptedSum),
reg.Register(m.buildLatencyAccepted),
reg.Register(m.blockSizeRejectedSum),
reg.Register(m.numSuccessfulPolls),
reg.Register(m.numFailedPolls),
Expand Down Expand Up @@ -218,10 +225,14 @@ func (m *metrics) Accepted(

m.pollsAccepted.Observe(float64(pollNumber - start.pollNumber))

duration := time.Since(start.time)
m.latAccepted.Observe(float64(duration))
now := time.Now()
processingDuration := now.Sub(start.time)
m.latAccepted.Observe(float64(processingDuration))

m.blockSizeAcceptedSum.Add(float64(blockSize))

builtDuration := now.Sub(timestamp)
m.buildLatencyAccepted.Add(float64(builtDuration))
}

func (m *metrics) Rejected(blkID ids.ID, pollNumber uint64, blockSize int) {
Expand Down
4 changes: 3 additions & 1 deletion snow/consensus/snowman/topological.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,9 +622,11 @@ func (ts *Topological) acceptPreferredChild(ctx context.Context, n *snowmanBlock
}

height := child.Height()
timestamp := child.Timestamp()
ts.ctx.Log.Trace("accepting block",
zap.Stringer("blkID", pref),
zap.Uint64("height", height),
zap.Time("timestamp", timestamp),
)
if err := child.Accept(ctx); err != nil {
return err
Expand All @@ -641,7 +643,7 @@ func (ts *Topological) acceptPreferredChild(ctx context.Context, n *snowmanBlock
ts.metrics.Accepted(
pref,
height,
child.Timestamp(),
timestamp,
ts.pollNumber,
len(bytes),
)
Expand Down
3 changes: 2 additions & 1 deletion snow/engine/snowman/bootstrap/block_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ func (b *blockJob) Execute(ctx context.Context) error {
b.numAccepted.Inc()
b.log.Trace("accepting block in bootstrapping",
zap.Stringer("blkID", blkID),
zap.Uint64("blkHeight", b.blk.Height()),
zap.Uint64("height", b.blk.Height()),
zap.Time("timestamp", b.blk.Timestamp()),
)
if err := b.blk.Accept(ctx); err != nil {
b.log.Debug("failed to accept block during bootstrapping",
Expand Down
15 changes: 8 additions & 7 deletions snow/engine/snowman/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ import (
type Config struct {
common.AllGetsServer

Ctx *snow.ConsensusContext
VM block.ChainVM
Sender common.Sender
Validators validators.Manager
Params snowball.Parameters
Consensus snowman.Consensus
PartialSync bool
Ctx *snow.ConsensusContext
VM block.ChainVM
Sender common.Sender
Validators validators.Manager
Params snowball.Parameters
AcceptedFrontierPollSize int
Consensus snowman.Consensus
PartialSync bool
}
72 changes: 50 additions & 22 deletions snow/engine/snowman/transitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,55 @@ func newTransitive(config Config) (*Transitive, error) {
return t, t.metrics.Initialize("", config.Ctx.Registerer)
}

func (t *Transitive) Gossip(ctx context.Context) error {
t.Ctx.Log.Verbo("sampling from validators",
zap.Stringer("validators", t.Validators),
)

vdrIDs, err := t.Validators.Sample(t.Ctx.SubnetID, t.AcceptedFrontierPollSize)
if err != nil {
t.Ctx.Log.Error("dropped sample for block gossip",
zap.String("reason", "insufficient number of validators"),
zap.Int("size", t.AcceptedFrontierPollSize),
)
return nil
}

t.RequestID++
vdrSet := set.Of(vdrIDs...)
t.Sender.SendGetAcceptedFrontier(ctx, vdrSet, t.RequestID)

// TODO: Remove periodic push gossip after v1.11.x is activated
blkID, err := t.VM.LastAccepted(ctx)
if err != nil {
return err
}

blk, err := t.GetBlock(ctx, blkID)
if err != nil {
t.Ctx.Log.Warn("dropping gossip request",
zap.String("reason", "block couldn't be loaded"),
zap.Stringer("blkID", blkID),
zap.Error(err),
)
return nil
}
t.Ctx.Log.Verbo("gossiping accepted block to the network",
zap.Stringer("blkID", blkID),
)
t.Sender.SendGossip(ctx, blk.Bytes())
return nil
}

func (t *Transitive) AcceptedFrontier(ctx context.Context, nodeID ids.NodeID, _ uint32, blkID ids.ID) error {
_, err := t.issueFromByID(ctx, nodeID, blkID)
return err
}

func (*Transitive) GetAcceptedFrontierFailed(context.Context, ids.NodeID, uint32) error {
return nil
}

func (t *Transitive) Put(ctx context.Context, nodeID ids.NodeID, requestID uint32, blkBytes []byte) error {
blk, err := t.VM.ParseBlock(ctx, blkBytes)
if err != nil {
Expand Down Expand Up @@ -346,28 +395,6 @@ func (*Transitive) Timeout(context.Context) error {
return nil
}

func (t *Transitive) Gossip(ctx context.Context) error {
blkID, err := t.VM.LastAccepted(ctx)
if err != nil {
return err
}

blk, err := t.GetBlock(ctx, blkID)
if err != nil {
t.Ctx.Log.Warn("dropping gossip request",
zap.String("reason", "block couldn't be loaded"),
zap.Stringer("blkID", blkID),
zap.Error(err),
)
return nil
}
t.Ctx.Log.Verbo("gossiping accepted block to the network",
zap.Stringer("blkID", blkID),
)
t.Sender.SendGossip(ctx, blk.Bytes())
return nil
}

func (*Transitive) Halt(context.Context) {}

func (t *Transitive) Shutdown(ctx context.Context) error {
Expand Down Expand Up @@ -801,6 +828,7 @@ func (t *Transitive) sendQuery(
t.Ctx.Log.Error("dropped query for block",
zap.String("reason", "insufficient number of validators"),
zap.Stringer("blkID", blkID),
zap.Int("size", t.Params.K),
)
return
}
Expand Down
1 change: 1 addition & 0 deletions subnets/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
var errAllowedNodesWhenNotValidatorOnly = errors.New("allowedNodes can only be set when ValidatorOnly is true")

type GossipConfig struct {
AcceptedFrontierPollSize uint `json:"gossipAcceptedFrontierPollSize" yaml:"gossipAcceptedFrontierPollSize"`
AcceptedFrontierValidatorSize uint `json:"gossipAcceptedFrontierValidatorSize" yaml:"gossipAcceptedFrontierValidatorSize"`
AcceptedFrontierNonValidatorSize uint `json:"gossipAcceptedFrontierNonValidatorSize" yaml:"gossipAcceptedFrontierNonValidatorSize"`
AcceptedFrontierPeerSize uint `json:"gossipAcceptedFrontierPeerSize" yaml:"gossipAcceptedFrontierPeerSize"`
Expand Down
5 changes: 3 additions & 2 deletions utils/constants/networking.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,13 @@ const (
DefaultBenchlistMinFailingDuration = 2*time.Minute + 30*time.Second

// Router
DefaultAcceptedFrontierGossipFrequency = 10 * time.Second
DefaultConsensusAppConcurrency = 2
DefaultConsensusShutdownTimeout = time.Minute
DefaultAcceptedFrontierGossipFrequency = 500 * time.Millisecond
DefaultConsensusGossipAcceptedFrontierPollSize = 1
DefaultConsensusGossipAcceptedFrontierValidatorSize = 0
DefaultConsensusGossipAcceptedFrontierNonValidatorSize = 0
DefaultConsensusGossipAcceptedFrontierPeerSize = 15
DefaultConsensusGossipAcceptedFrontierPeerSize = 1
DefaultConsensusGossipOnAcceptValidatorSize = 0
DefaultConsensusGossipOnAcceptNonValidatorSize = 0
DefaultConsensusGossipOnAcceptPeerSize = 10
Expand Down
3 changes: 2 additions & 1 deletion vms/platformvm/vm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1540,7 +1540,8 @@ func TestBootstrapPartiallyAccepted(t *testing.T) {
MaxOutstandingItems: 1,
MaxItemProcessingTime: 1,
},
Consensus: &smcon.Topological{},
AcceptedFrontierPollSize: 1,
Consensus: &smcon.Topological{},
}
engine, err := smeng.New(engineConfig)
require.NoError(err)
Expand Down
Loading