Skip to content

Commit

Permalink
Merge onflow#4174
Browse files Browse the repository at this point in the history
4174: [BFT] Reporting basic consensus protocol violations r=durkmurder a=durkmurder

### Context

In order to address [reporting of protocol violations](onflow#4125) this PR refactors existing code and introduces new interface for reporting basic consensus protocol violations that can and have to be detected by all node types. 

As part of this PR:
- Updated structuring of our pub-sub consumers. Now protocol violations are moved to a separate interface as well as finalization events, introduced `FollowerConsumer` which combines separate interfaces usable by consensus follower(`FinalizationConsumer` and `BaseProtocolViolationConsumer`). 
- Updated distributors to reflect new changes
- Updated tests and documentation
- Connected slashing violations to a specific consumer that for now logs them
- Updated all of compliance engines(follower, consensus, collection) to report invalid blocks if they were detected during processing

Co-authored-by: Yurii Oleksyshyn <[email protected]>
Co-authored-by: Alexander Hentschel <[email protected]>
  • Loading branch information
3 people authored May 12, 2023
2 parents 18a786a + bbc49b8 commit 0ae2b5a
Show file tree
Hide file tree
Showing 79 changed files with 1,724 additions and 945 deletions.
22 changes: 11 additions & 11 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ type FlowAccessNodeBuilder struct {
FollowerState protocol.FollowerState
SyncCore *chainsync.Core
RpcEng *rpc.Engine
FinalizationDistributor *consensuspubsub.FinalizationDistributor
FollowerDistributor *consensuspubsub.FollowerDistributor
CollectionRPC access.AccessAPIClient
TransactionTimings *stdmap.TransactionTimings
CollectionsToMarkFinalized *stdmap.Times
Expand Down Expand Up @@ -316,7 +316,7 @@ func (builder *FlowAccessNodeBuilder) buildFollowerCore() *FlowAccessNodeBuilder
node.Logger,
node.Storage.Headers,
final,
builder.FinalizationDistributor,
builder.FollowerDistributor,
node.RootBlock.Header,
node.RootQC,
builder.Finalized,
Expand Down Expand Up @@ -344,7 +344,7 @@ func (builder *FlowAccessNodeBuilder) buildFollowerEngine() *FlowAccessNodeBuild
node.Logger,
node.Metrics.Mempool,
heroCacheCollector,
builder.FinalizationDistributor,
builder.FollowerDistributor,
builder.FollowerState,
builder.FollowerCore,
builder.Validator,
Expand Down Expand Up @@ -540,7 +540,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionDataRequester() *FlowAccessN
builder.executionDataConfig,
)

builder.FinalizationDistributor.AddOnBlockFinalizedConsumer(builder.ExecutionDataRequester.OnBlockFinalized)
builder.FollowerDistributor.AddOnBlockFinalizedConsumer(builder.ExecutionDataRequester.OnBlockFinalized)
builder.ExecutionDataRequester.AddOnExecutionDataReceivedConsumer(execDataDistributor.OnExecutionDataReceived)

return builder.ExecutionDataRequester, nil
Expand Down Expand Up @@ -593,12 +593,12 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionDataRequester() *FlowAccessN
}

func FlowAccessNode(nodeBuilder *cmd.FlowNodeBuilder) *FlowAccessNodeBuilder {
dist := consensuspubsub.NewFinalizationDistributor()
dist.AddConsumer(notifications.NewSlashingViolationsConsumer(nodeBuilder.Logger))
dist := consensuspubsub.NewFollowerDistributor()
dist.AddProposalViolationConsumer(notifications.NewSlashingViolationsConsumer(nodeBuilder.Logger))
return &FlowAccessNodeBuilder{
AccessNodeConfig: DefaultAccessNodeConfig(),
FlowNodeBuilder: nodeBuilder,
FinalizationDistributor: dist,
AccessNodeConfig: DefaultAccessNodeConfig(),
FlowNodeBuilder: nodeBuilder,
FollowerDistributor: dist,
}
}

Expand Down Expand Up @@ -967,7 +967,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
if err != nil {
return nil, err
}
builder.FinalizationDistributor.AddOnBlockFinalizedConsumer(builder.RpcEng.OnFinalizedBlock)
builder.FollowerDistributor.AddOnBlockFinalizedConsumer(builder.RpcEng.OnFinalizedBlock)

return builder.RpcEng, nil
}).
Expand Down Expand Up @@ -1009,7 +1009,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return nil, err
}
builder.RequestEng.WithHandle(builder.IngestEng.OnCollection)
builder.FinalizationDistributor.AddConsumer(builder.IngestEng)
builder.FollowerDistributor.AddOnBlockFinalizedConsumer(builder.IngestEng.OnFinalizedBlock)

return builder.IngestEng, nil
}).
Expand Down
14 changes: 7 additions & 7 deletions cmd/collection/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ func main() {
rpcConf rpc.Config
clusterComplianceConfig modulecompliance.Config

pools *epochpool.TransactionPools // epoch-scoped transaction pools
finalizationDistributor *pubsub.FinalizationDistributor
pools *epochpool.TransactionPools // epoch-scoped transaction pools
followerDistributor *pubsub.FollowerDistributor

push *pusher.Engine
ing *ingest.Engine
Expand Down Expand Up @@ -173,9 +173,9 @@ func main() {

nodeBuilder.
PreInit(cmd.DynamicStartPreInit).
Module("finalization distributor", func(node *cmd.NodeConfig) error {
finalizationDistributor = pubsub.NewFinalizationDistributor()
finalizationDistributor.AddConsumer(notifications.NewSlashingViolationsConsumer(node.Logger))
Module("follower distributor", func(node *cmd.NodeConfig) error {
followerDistributor = pubsub.NewFollowerDistributor()
followerDistributor.AddProposalViolationConsumer(notifications.NewSlashingViolationsConsumer(node.Logger))
return nil
}).
Module("mutable follower state", func(node *cmd.NodeConfig) error {
Expand Down Expand Up @@ -281,7 +281,7 @@ func main() {
node.Logger,
node.Storage.Headers,
finalizer,
finalizationDistributor,
followerDistributor,
node.RootBlock.Header,
node.RootQC,
finalized,
Expand All @@ -308,7 +308,7 @@ func main() {
node.Logger,
node.Metrics.Mempool,
heroCacheCollector,
finalizationDistributor,
followerDistributor,
followerState,
followerCore,
validator,
Expand Down
98 changes: 55 additions & 43 deletions cmd/consensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,30 +101,30 @@ func main() {
insecureAccessAPI bool
accessNodeIDS []string

err error
mutableState protocol.ParticipantState
beaconPrivateKey *encodable.RandomBeaconPrivKey
guarantees mempool.Guarantees
receipts mempool.ExecutionTree
seals mempool.IncorporatedResultSeals
pendingReceipts mempool.PendingReceipts
receiptRequester *requester.Engine
syncCore *chainsync.Core
comp *compliance.Engine
hot module.HotStuff
conMetrics module.ConsensusMetrics
mainMetrics module.HotstuffMetrics
receiptValidator module.ReceiptValidator
chunkAssigner *chmodule.ChunkAssigner
finalizationDistributor *pubsub.FinalizationDistributor
dkgBrokerTunnel *dkgmodule.BrokerTunnel
blockTimer protocol.BlockTimer
committee *committees.Consensus
epochLookup *epochs.EpochLookup
hotstuffModules *consensus.HotstuffModules
dkgState *bstorage.DKGState
safeBeaconKeys *bstorage.SafeBeaconPrivateKeys
getSealingConfigs module.SealingConfigsGetter
err error
mutableState protocol.ParticipantState
beaconPrivateKey *encodable.RandomBeaconPrivKey
guarantees mempool.Guarantees
receipts mempool.ExecutionTree
seals mempool.IncorporatedResultSeals
pendingReceipts mempool.PendingReceipts
receiptRequester *requester.Engine
syncCore *chainsync.Core
comp *compliance.Engine
hot module.HotStuff
conMetrics module.ConsensusMetrics
mainMetrics module.HotstuffMetrics
receiptValidator module.ReceiptValidator
chunkAssigner *chmodule.ChunkAssigner
followerDistributor *pubsub.FollowerDistributor
dkgBrokerTunnel *dkgmodule.BrokerTunnel
blockTimer protocol.BlockTimer
committee *committees.Consensus
epochLookup *epochs.EpochLookup
hotstuffModules *consensus.HotstuffModules
dkgState *bstorage.DKGState
safeBeaconKeys *bstorage.SafeBeaconPrivateKeys
getSealingConfigs module.SealingConfigsGetter
)

nodeBuilder := cmd.FlowNode(flow.RoleConsensus.String())
Expand Down Expand Up @@ -367,9 +367,8 @@ func main() {
syncCore, err = chainsync.New(node.Logger, node.SyncCoreConfig, metrics.NewChainSyncCollector(node.RootChainID), node.RootChainID)
return err
}).
Module("finalization distributor", func(node *cmd.NodeConfig) error {
finalizationDistributor = pubsub.NewFinalizationDistributor()
finalizationDistributor.AddConsumer(notifications.NewSlashingViolationsConsumer(nodeBuilder.Logger))
Module("follower distributor", func(node *cmd.NodeConfig) error {
followerDistributor = pubsub.NewFollowerDistributor()
return nil
}).
Module("machine account config", func(node *cmd.NodeConfig) error {
Expand Down Expand Up @@ -435,8 +434,8 @@ func main() {
)

// subscribe for finalization events from hotstuff
finalizationDistributor.AddOnBlockFinalizedConsumer(e.OnFinalizedBlock)
finalizationDistributor.AddOnBlockIncorporatedConsumer(e.OnBlockIncorporated)
followerDistributor.AddOnBlockFinalizedConsumer(e.OnFinalizedBlock)
followerDistributor.AddOnBlockIncorporatedConsumer(e.OnBlockIncorporated)

return e, err
}).
Expand Down Expand Up @@ -490,8 +489,8 @@ func main() {

// subscribe engine to inputs from other node-internal components
receiptRequester.WithHandle(e.HandleReceipt)
finalizationDistributor.AddOnBlockFinalizedConsumer(e.OnFinalizedBlock)
finalizationDistributor.AddOnBlockIncorporatedConsumer(e.OnBlockIncorporated)
followerDistributor.AddOnBlockFinalizedConsumer(e.OnFinalizedBlock)
followerDistributor.AddOnBlockIncorporatedConsumer(e.OnBlockIncorporated)

return e, err
}).
Expand Down Expand Up @@ -557,13 +556,18 @@ func main() {
// create consensus logger
logger := createLogger(node.Logger, node.RootChainID)

telemetryConsumer := notifications.NewTelemetryConsumer(logger)
slashingViolationConsumer := notifications.NewSlashingViolationsConsumer(nodeBuilder.Logger)
followerDistributor.AddProposalViolationConsumer(slashingViolationConsumer)

// initialize a logging notifier for hotstuff
notifier := createNotifier(
logger,
mainMetrics,
)

notifier.AddConsumer(finalizationDistributor)
notifier.AddParticipantConsumer(telemetryConsumer)
notifier.AddFollowerConsumer(followerDistributor)

// initialize the persister
persist := persister.New(node.DB, node.RootChainID)
Expand All @@ -585,27 +589,35 @@ func main() {
return nil, err
}

qcDistributor := pubsub.NewQCCreatedDistributor()
// create producer and connect it to consumers
voteAggregationDistributor := pubsub.NewVoteAggregationDistributor()
voteAggregationDistributor.AddVoteCollectorConsumer(telemetryConsumer)
voteAggregationDistributor.AddVoteAggregationViolationConsumer(slashingViolationConsumer)

validator := consensus.NewValidator(mainMetrics, wrappedCommittee)
voteProcessorFactory := votecollector.NewCombinedVoteProcessorFactory(wrappedCommittee, qcDistributor.OnQcConstructedFromVotes)
voteProcessorFactory := votecollector.NewCombinedVoteProcessorFactory(wrappedCommittee, voteAggregationDistributor.OnQcConstructedFromVotes)
lowestViewForVoteProcessing := finalizedBlock.View + 1
voteAggregator, err := consensus.NewVoteAggregator(
logger,
mainMetrics,
node.Metrics.Engine,
node.Metrics.Mempool,
lowestViewForVoteProcessing,
notifier,
voteAggregationDistributor,
voteProcessorFactory,
finalizationDistributor)
followerDistributor)
if err != nil {
return nil, fmt.Errorf("could not initialize vote aggregator: %w", err)
}

timeoutCollectorDistributor := pubsub.NewTimeoutCollectorDistributor()
// create producer and connect it to consumers
timeoutAggregationDistributor := pubsub.NewTimeoutAggregationDistributor()
timeoutAggregationDistributor.AddTimeoutCollectorConsumer(telemetryConsumer)
timeoutAggregationDistributor.AddTimeoutAggregationViolationConsumer(slashingViolationConsumer)

timeoutProcessorFactory := timeoutcollector.NewTimeoutProcessorFactory(
logger,
timeoutCollectorDistributor,
timeoutAggregationDistributor,
committee,
validator,
msig.ConsensusTimeoutTag,
Expand All @@ -617,7 +629,7 @@ func main() {
node.Metrics.Mempool,
notifier,
timeoutProcessorFactory,
timeoutCollectorDistributor,
timeoutAggregationDistributor,
lowestViewForVoteProcessing,
)
if err != nil {
Expand All @@ -629,9 +641,8 @@ func main() {
Committee: wrappedCommittee,
Signer: signer,
Persist: persist,
QCCreatedDistributor: qcDistributor,
FinalizationDistributor: finalizationDistributor,
TimeoutCollectorDistributor: timeoutCollectorDistributor,
VoteCollectorDistributor: voteAggregationDistributor.VoteCollectorDistributor,
TimeoutCollectorDistributor: timeoutAggregationDistributor.TimeoutCollectorDistributor,
Forks: forks,
Validator: validator,
VoteAggregator: voteAggregator,
Expand Down Expand Up @@ -708,6 +719,7 @@ func main() {
node.Metrics.Mempool,
mainMetrics,
node.Metrics.Compliance,
followerDistributor,
node.Tracer,
node.Storage.Headers,
node.Storage.Payloads,
Expand All @@ -733,7 +745,7 @@ func main() {
if err != nil {
return nil, fmt.Errorf("could not initialize compliance engine: %w", err)
}
finalizationDistributor.AddConsumer(comp)
followerDistributor.AddOnBlockFinalizedConsumer(comp.OnFinalizedBlock)

return comp, nil
}).
Expand Down
2 changes: 0 additions & 2 deletions cmd/consensus/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@ func createLogger(log zerolog.Logger, chainID flow.ChainID) zerolog.Logger {

// createNotifier creates a pubsub distributor and connects it to consensus consumers.
func createNotifier(log zerolog.Logger, metrics module.HotstuffMetrics) *pubsub.Distributor {
telemetryConsumer := notifications.NewTelemetryConsumer(log)
metricsConsumer := metricsconsumer.NewMetricsConsumer(metrics)
logsConsumer := notifications.NewLogConsumer(log)
dis := pubsub.NewDistributor()
dis.AddConsumer(telemetryConsumer)
dis.AddConsumer(metricsConsumer)
dis.AddConsumer(logsConsumer)
return dis
Expand Down
Loading

0 comments on commit 0ae2b5a

Please sign in to comment.