Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BFT] model.Proposal refactoring #6526

Merged
merged 17 commits into from
Oct 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions consensus/hotstuff/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type VoteAggregationViolationConsumer interface {
// Prerequisites:
// Implementation must be concurrency safe; Non-blocking;
// and must handle repetition of the same events (with some processing overhead).
OnVoteForInvalidBlockDetected(vote *model.Vote, invalidProposal *model.Proposal)
OnVoteForInvalidBlockDetected(vote *model.Vote, invalidProposal *model.SignedProposal)
}

// TimeoutAggregationViolationConsumer consumes outbound notifications about Active Pacemaker violations specifically
Expand Down Expand Up @@ -138,7 +138,7 @@ type ParticipantConsumer interface {
// Prerequisites:
// Implementation must be concurrency safe; Non-blocking;
// and must handle repetition of the same events (with some processing overhead).
OnReceiveProposal(currentView uint64, proposal *model.Proposal)
OnReceiveProposal(currentView uint64, proposal *model.SignedProposal)

// OnReceiveQc notifications are produced by the EventHandler when it starts processing a
// QuorumCertificate [QC] constructed by the node's internal vote aggregator.
Expand Down
2 changes: 1 addition & 1 deletion consensus/hotstuff/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type EventHandler interface {
// consensus participant.
// All inputs should be validated before feeding into this function. Assuming trusted data.
// No errors are expected during normal operation.
OnReceiveProposal(proposal *model.Proposal) error
OnReceiveProposal(proposal *model.SignedProposal) error

// OnLocalTimeout handles a local timeout event by creating a model.TimeoutObject and broadcasting it.
// No errors are expected during normal operation.
Expand Down
8 changes: 4 additions & 4 deletions consensus/hotstuff/eventhandler/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (e *EventHandler) OnReceiveTc(tc *flow.TimeoutCertificate) error {
// consensus participant.
// All inputs should be validated before feeding into this function. Assuming trusted data.
// No errors are expected during normal operation.
func (e *EventHandler) OnReceiveProposal(proposal *model.Proposal) error {
func (e *EventHandler) OnReceiveProposal(proposal *model.SignedProposal) error {
block := proposal.Block
curView := e.paceMaker.CurView()
log := e.log.With().
Expand Down Expand Up @@ -429,7 +429,7 @@ func (e *EventHandler) proposeForNewViewIfPrimary() error {
lastViewTC = nil
}

// Construct Own Proposal
// Construct Own SignedProposal
// CAUTION, design constraints:
// (i) We cannot process our own proposal within the `EventHandler` right away.
// (ii) We cannot add our own proposal to Forks here right away.
Expand Down Expand Up @@ -491,7 +491,7 @@ func (e *EventHandler) proposeForNewViewIfPrimary() error {
// It is called AFTER the block has been stored or found in Forks
// It checks whether to vote for this block.
// No errors are expected during normal operation.
func (e *EventHandler) processBlockForCurrentView(proposal *model.Proposal) error {
func (e *EventHandler) processBlockForCurrentView(proposal *model.SignedProposal) error {
// sanity check that block is really for the current view:
curView := e.paceMaker.CurView()
block := proposal.Block
Expand Down Expand Up @@ -526,7 +526,7 @@ func (e *EventHandler) processBlockForCurrentView(proposal *model.Proposal) erro
// ownVote generates and forwards the own vote, if we decide to vote.
// Any errors are potential symptoms of uncovered edge cases or corrupted internal state (fatal).
// No errors are expected during normal operation.
func (e *EventHandler) ownVote(proposal *model.Proposal, curView uint64, nextLeader flow.Identifier) error {
func (e *EventHandler) ownVote(proposal *model.SignedProposal, curView uint64, nextLeader flow.Identifier) error {
block := proposal.Block
log := e.log.With().
Uint64("block_view", block.View).
Expand Down
29 changes: 12 additions & 17 deletions consensus/hotstuff/eventhandler/event_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,14 @@ func NewSafetyRules(t *testing.T) *SafetyRules {

// SafetyRules will not vote for any block, unless the blockID exists in votable map
safetyRules.On("ProduceVote", mock.Anything, mock.Anything).Return(
func(block *model.Proposal, _ uint64) *model.Vote {
func(block *model.SignedProposal, _ uint64) *model.Vote {
_, ok := safetyRules.votable[block.Block.BlockID]
if !ok {
return nil
}
return createVote(block.Block)
},
func(block *model.Proposal, _ uint64) error {
func(block *model.SignedProposal, _ uint64) error {
_, ok := safetyRules.votable[block.Block.BlockID]
if !ok {
return model.NewNoVoteErrorf("block not found")
Expand Down Expand Up @@ -179,7 +179,7 @@ func NewForks(t *testing.T, finalized uint64) *Forks {
}

f.On("AddValidatedBlock", mock.Anything).Return(func(proposal *model.Block) error {
log.Info().Msgf("forks.AddValidatedBlock received Proposal for view: %v, QC: %v\n", proposal.View, proposal.QC.View)
log.Info().Msgf("forks.AddValidatedBlock received Block proposal for view: %v, QC: %v\n", proposal.View, proposal.QC.View)
return f.addProposal(proposal)
}).Maybe()

Expand Down Expand Up @@ -228,14 +228,12 @@ type BlockProducer struct {
}

func (b *BlockProducer) MakeBlockProposal(view uint64, qc *flow.QuorumCertificate, lastViewTC *flow.TimeoutCertificate) (*flow.Header, error) {
return model.ProposalToFlow(&model.Proposal{
Block: helper.MakeBlock(
return helper.SignedProposalToFlow(helper.MakeSignedProposal(helper.WithProposal(
helper.MakeProposal(helper.WithBlock(helper.MakeBlock(
helper.WithBlockView(view),
helper.WithBlockQC(qc),
helper.WithBlockProposer(b.proposerID),
),
LastViewTC: lastViewTC,
}), nil
helper.WithBlockProposer(b.proposerID))),
helper.WithLastViewTC(lastViewTC))))), nil
}

func TestEventHandler(t *testing.T) {
Expand All @@ -258,8 +256,8 @@ type EventHandlerSuite struct {

initView uint64 // the current view at the beginning of the test case
endView uint64 // the expected current view at the end of the test case
parentProposal *model.Proposal
votingProposal *model.Proposal
parentProposal *model.SignedProposal
votingProposal *model.SignedProposal
qc *flow.QuorumCertificate
tc *flow.TimeoutCertificate
newview *model.NewViewEvent
Expand Down Expand Up @@ -670,7 +668,7 @@ func (es *EventHandlerSuite) TestOnReceiveTc_NextLeaderProposes() {

// proposed block should contain valid newest QC and lastViewTC
expectedNewestQC := es.paceMaker.NewestQC()
proposal := model.ProposalFromFlow(header)
proposal := model.SignedProposalFromFlow(header)
require.Equal(es.T(), expectedNewestQC, proposal.Block.QC)
require.Equal(es.T(), es.paceMaker.LastViewTC(), proposal.LastViewTC)
}).Once()
Expand Down Expand Up @@ -1033,10 +1031,7 @@ func createVote(block *model.Block) *model.Vote {
}
}

func createProposal(view uint64, qcview uint64) *model.Proposal {
func createProposal(view uint64, qcview uint64) *model.SignedProposal {
block := createBlockWithQC(view, qcview)
return &model.Proposal{
Block: block,
SigData: nil,
}
return helper.MakeSignedProposal(helper.WithProposal(helper.MakeProposal(helper.WithBlock(block))))
}
4 changes: 2 additions & 2 deletions consensus/hotstuff/eventloop/event_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
// it contains an attached insertionTime that is used to measure how long we have waited between queening proposal and
// actually processing by `EventHandler`.
type queuedProposal struct {
proposal *model.Proposal
proposal *model.SignedProposal
insertionTime time.Time
}

Expand Down Expand Up @@ -263,7 +263,7 @@ func (el *EventLoop) loop(ctx context.Context) error {
}

// SubmitProposal pushes the received block to the proposals channel
func (el *EventLoop) SubmitProposal(proposal *model.Proposal) {
func (el *EventLoop) SubmitProposal(proposal *model.SignedProposal) {
queueItem := queuedProposal{
proposal: proposal,
insertionTime: time.Now(),
Expand Down
8 changes: 4 additions & 4 deletions consensus/hotstuff/eventloop/event_loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (s *EventLoopTestSuite) TestReadyDone() {

// Test_SubmitQC tests that submitted proposal is eventually sent to event handler for processing
func (s *EventLoopTestSuite) Test_SubmitProposal() {
proposal := helper.MakeProposal()
proposal := helper.MakeSignedProposal()
processed := atomic.NewBool(false)
s.eh.On("OnReceiveProposal", proposal).Run(func(args mock.Arguments) {
processed.Store(true)
Expand Down Expand Up @@ -229,7 +229,7 @@ func TestEventLoop_Timeout(t *testing.T) {
go func() {
defer wg.Done()
for !processed.Load() {
eventLoop.SubmitProposal(helper.MakeProposal())
eventLoop.SubmitProposal(helper.MakeSignedProposal())
}
}()

Expand Down Expand Up @@ -258,7 +258,7 @@ func TestReadyDoneWithStartTime(t *testing.T) {
require.NoError(t, err)

done := make(chan struct{})
eh.On("OnReceiveProposal", mock.AnythingOfType("*model.Proposal")).Run(func(args mock.Arguments) {
eh.On("OnReceiveProposal", mock.AnythingOfType("*model.SignedProposal")).Run(func(args mock.Arguments) {
require.True(t, time.Now().After(startTime))
close(done)
}).Return(nil).Once()
Expand All @@ -271,7 +271,7 @@ func TestReadyDoneWithStartTime(t *testing.T) {

parentBlock := unittest.BlockHeaderFixture()
block := unittest.BlockHeaderWithParentFixture(parentBlock)
eventLoop.SubmitProposal(model.ProposalFromFlow(block))
eventLoop.SubmitProposal(model.SignedProposalFromFlow(block))

unittest.RequireCloseBefore(t, done, startTimeDuration+100*time.Millisecond, "proposal wasn't received")
cancel()
Expand Down
1 change: 0 additions & 1 deletion consensus/hotstuff/forks/block_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ func (bb *BlockBuilder) Proposals() ([]*model.Proposal, error) {
PayloadHash: payloadHash,
},
LastViewTC: lastViewTC,
SigData: nil,
}
proposal.Block.BlockID = makeBlockID(proposal.Block)

Expand Down
2 changes: 1 addition & 1 deletion consensus/hotstuff/forks/forks.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func (f *Forks) checkForAdvancingFinalization(certifiedBlock *model.CertifiedBlo
parentBlock := parentVertex.(*BlockContainer).Block()

// Note: we assume that all stored blocks pass Forks.EnsureBlockIsValidExtension(block);
// specifically, that Proposal's ViewNumber is strictly monotonically
// specifically, that block's ViewNumber is strictly monotonically
// increasing which is enforced by LevelledForest.VerifyVertex(...)
// We denote:
// * a DIRECT 1-chain as '<-'
Expand Down
6 changes: 3 additions & 3 deletions consensus/hotstuff/forks/forks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func TestFinalize_Multiple2Chains(t *testing.T) {
}

// TestFinalize_OrphanedFork tests that we can finalize a block which causes a conflicting fork to be orphaned.
// We ingest the the following block tree:
// We ingest the following block tree:
//
// [◄(1) 2] [◄(2) 3]
// [◄(2) 4] [◄(4) 5] [◄(5) 6]
Expand Down Expand Up @@ -389,7 +389,7 @@ func TestIgnoreBlocksBelowFinalizedView(t *testing.T) {
}

// TestDoubleProposal tests that the DoubleProposal notification is emitted when two different
// blocks for the same view are added. We ingest the the following block tree:
// blocks for the same view are added. We ingest the following block tree:
//
// / [◄(1) 2]
// [1]
Expand Down Expand Up @@ -460,7 +460,7 @@ func TestConflictingQCs(t *testing.T) {
}

// TestConflictingFinalizedForks checks that finalizing 2 conflicting forks should return model.ByzantineThresholdExceededError
// We ingest the the following block tree:
// We ingest the following block tree:
//
// [◄(1) 2] [◄(2) 3] [◄(3) 4] [◄(4) 5]
// [◄(2) 6] [◄(6) 7] [◄(7) 8]
Expand Down
51 changes: 47 additions & 4 deletions consensus/hotstuff/helper/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,25 +56,42 @@ func WithBlockQC(qc *flow.QuorumCertificate) func(*model.Block) {
}
}

func MakeSignedProposal(options ...func(*model.SignedProposal)) *model.SignedProposal {
proposal := &model.SignedProposal{
Proposal: *MakeProposal(),
SigData: unittest.SignatureFixture(),
}
for _, option := range options {
option(proposal)
}
return proposal
}

func MakeProposal(options ...func(*model.Proposal)) *model.Proposal {
proposal := &model.Proposal{
Block: MakeBlock(),
SigData: unittest.SignatureFixture(),
Block: MakeBlock(),
LastViewTC: nil,
}
for _, option := range options {
option(proposal)
}
return proposal
}

func WithProposal(proposal *model.Proposal) func(*model.SignedProposal) {
return func(signedProposal *model.SignedProposal) {
signedProposal.Proposal = *proposal
}
}

func WithBlock(block *model.Block) func(*model.Proposal) {
return func(proposal *model.Proposal) {
proposal.Block = block
}
}

func WithSigData(sigData []byte) func(*model.Proposal) {
return func(proposal *model.Proposal) {
func WithSigData(sigData []byte) func(*model.SignedProposal) {
return func(proposal *model.SignedProposal) {
proposal.SigData = sigData
}
}
Expand All @@ -84,3 +101,29 @@ func WithLastViewTC(lastViewTC *flow.TimeoutCertificate) func(*model.Proposal) {
proposal.LastViewTC = lastViewTC
}
}

// SignedProposalToFlow turns a block proposal into a flow header.
//
// CAUTION: This function is only suitable for TESTING purposes ONLY.
// In the conversion from `flow.Header` to HoStuff's `model.Block` we loose information
// (e.g. `ChainID` and `Height` are not included in `model.Block`) and hence the conversion
// is *not reversible*. This is on purpose, because we wanted to only expose data to
// HotStuff that HotStuff really needs.
func SignedProposalToFlow(proposal *model.SignedProposal) *flow.Header {

block := proposal.Block
header := &flow.Header{
ParentID: block.QC.BlockID,
PayloadHash: block.PayloadHash,
Timestamp: block.Timestamp,
View: block.View,
ParentView: block.QC.View,
ParentVoterIndices: block.QC.SignerIndices,
ParentVoterSigData: block.QC.SigData,
ProposerID: block.ProposerID,
ProposerSigData: proposal.SigData,
LastViewTC: proposal.LastViewTC,
}

return header
}
2 changes: 1 addition & 1 deletion consensus/hotstuff/integration/connect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func Connect(t *testing.T, instances []*Instance) {
}

// convert into proposal immediately
proposal := model.ProposalFromFlow(header)
proposal := model.SignedProposalFromFlow(header)

// store locally and loop back to engine for processing
sender.ProcessBlock(proposal)
Expand Down
18 changes: 9 additions & 9 deletions consensus/hotstuff/integration/filters_test.go
durkmurder marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

// VoteFilter is a filter function for dropping Votes.
// Return value `true` implies that the the given Vote should be
// Return value `true` implies that the given Vote should be
// dropped, while `false` indicates that the Vote should be received.
type VoteFilter func(*model.Vote) bool

Expand All @@ -34,34 +34,34 @@ func BlockVotesBy(voterID flow.Identifier) VoteFilter {
}

// ProposalFilter is a filter function for dropping Proposals.
// Return value `true` implies that the the given Proposal should be
// dropped, while `false` indicates that the Proposal should be received.
type ProposalFilter func(*model.Proposal) bool
// Return value `true` implies that the given SignedProposal should be
// dropped, while `false` indicates that the SignedProposal should be received.
type ProposalFilter func(*model.SignedProposal) bool

func BlockNoProposals(*model.Proposal) bool {
func BlockNoProposals(*model.SignedProposal) bool {
return false
}

func BlockAllProposals(*model.Proposal) bool {
func BlockAllProposals(*model.SignedProposal) bool {
return true
}

// BlockProposalRandomly drops proposals randomly with a probability of `dropProbability` ∈ [0,1]
func BlockProposalRandomly(dropProbability float64) ProposalFilter {
return func(*model.Proposal) bool {
return func(*model.SignedProposal) bool {
return rand.Float64() < dropProbability
}
}

// BlockProposalsBy drops all proposals originating from the specified `proposerID`
func BlockProposalsBy(proposerID flow.Identifier) ProposalFilter {
return func(proposal *model.Proposal) bool {
return func(proposal *model.SignedProposal) bool {
return proposal.Block.ProposerID == proposerID
}
}

// TimeoutObjectFilter is a filter function for dropping TimeoutObjects.
// Return value `true` implies that the the given TimeoutObject should be
// Return value `true` implies that the given TimeoutObject should be
// dropped, while `false` indicates that the TimeoutObject should be received.
type TimeoutObjectFilter func(*model.TimeoutObject) bool

Expand Down
Loading
Loading