Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BFT] model.Proposal refactoring #6526

Merged
merged 17 commits into from
Oct 26, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions consensus/hotstuff/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type VoteAggregationViolationConsumer interface {
// Prerequisites:
// Implementation must be concurrency safe; Non-blocking;
// and must handle repetition of the same events (with some processing overhead).
OnVoteForInvalidBlockDetected(vote *model.Vote, invalidProposal *model.Proposal)
OnVoteForInvalidBlockDetected(vote *model.Vote, invalidProposal *model.SignedProposal)
}

// TimeoutAggregationViolationConsumer consumes outbound notifications about Active Pacemaker violations specifically
Expand Down Expand Up @@ -138,7 +138,7 @@ type ParticipantConsumer interface {
// Prerequisites:
// Implementation must be concurrency safe; Non-blocking;
// and must handle repetition of the same events (with some processing overhead).
OnReceiveProposal(currentView uint64, proposal *model.Proposal)
OnReceiveProposal(currentView uint64, proposal *model.SignedProposal)

// OnReceiveQc notifications are produced by the EventHandler when it starts processing a
// QuorumCertificate [QC] constructed by the node's internal vote aggregator.
Expand Down
2 changes: 1 addition & 1 deletion consensus/hotstuff/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type EventHandler interface {
// consensus participant.
// All inputs should be validated before feeding into this function. Assuming trusted data.
// No errors are expected during normal operation.
OnReceiveProposal(proposal *model.Proposal) error
OnReceiveProposal(proposal *model.SignedProposal) error

// OnLocalTimeout handles a local timeout event by creating a model.TimeoutObject and broadcasting it.
// No errors are expected during normal operation.
Expand Down
8 changes: 4 additions & 4 deletions consensus/hotstuff/eventhandler/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (e *EventHandler) OnReceiveTc(tc *flow.TimeoutCertificate) error {
// consensus participant.
// All inputs should be validated before feeding into this function. Assuming trusted data.
// No errors are expected during normal operation.
func (e *EventHandler) OnReceiveProposal(proposal *model.Proposal) error {
func (e *EventHandler) OnReceiveProposal(proposal *model.SignedProposal) error {
block := proposal.Block
curView := e.paceMaker.CurView()
log := e.log.With().
Expand Down Expand Up @@ -429,7 +429,7 @@ func (e *EventHandler) proposeForNewViewIfPrimary() error {
lastViewTC = nil
}

// Construct Own Proposal
// Construct Own SignedProposal
// CAUTION, design constraints:
// (i) We cannot process our own proposal within the `EventHandler` right away.
// (ii) We cannot add our own proposal to Forks here right away.
Expand Down Expand Up @@ -491,7 +491,7 @@ func (e *EventHandler) proposeForNewViewIfPrimary() error {
// It is called AFTER the block has been stored or found in Forks
// It checks whether to vote for this block.
// No errors are expected during normal operation.
func (e *EventHandler) processBlockForCurrentView(proposal *model.Proposal) error {
func (e *EventHandler) processBlockForCurrentView(proposal *model.SignedProposal) error {
// sanity check that block is really for the current view:
curView := e.paceMaker.CurView()
block := proposal.Block
Expand Down Expand Up @@ -526,7 +526,7 @@ func (e *EventHandler) processBlockForCurrentView(proposal *model.Proposal) erro
// ownVote generates and forwards the own vote, if we decide to vote.
// Any errors are potential symptoms of uncovered edge cases or corrupted internal state (fatal).
// No errors are expected during normal operation.
func (e *EventHandler) ownVote(proposal *model.Proposal, curView uint64, nextLeader flow.Identifier) error {
func (e *EventHandler) ownVote(proposal *model.SignedProposal, curView uint64, nextLeader flow.Identifier) error {
block := proposal.Block
log := e.log.With().
Uint64("block_view", block.View).
Expand Down
29 changes: 12 additions & 17 deletions consensus/hotstuff/eventhandler/event_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,14 @@ func NewSafetyRules(t *testing.T) *SafetyRules {

// SafetyRules will not vote for any block, unless the blockID exists in votable map
safetyRules.On("ProduceVote", mock.Anything, mock.Anything).Return(
func(block *model.Proposal, _ uint64) *model.Vote {
func(block *model.SignedProposal, _ uint64) *model.Vote {
_, ok := safetyRules.votable[block.Block.BlockID]
if !ok {
return nil
}
return createVote(block.Block)
},
func(block *model.Proposal, _ uint64) error {
func(block *model.SignedProposal, _ uint64) error {
_, ok := safetyRules.votable[block.Block.BlockID]
if !ok {
return model.NewNoVoteErrorf("block not found")
Expand Down Expand Up @@ -179,7 +179,7 @@ func NewForks(t *testing.T, finalized uint64) *Forks {
}

f.On("AddValidatedBlock", mock.Anything).Return(func(proposal *model.Block) error {
log.Info().Msgf("forks.AddValidatedBlock received Proposal for view: %v, QC: %v\n", proposal.View, proposal.QC.View)
log.Info().Msgf("forks.AddValidatedBlock received SignedProposal for view: %v, QC: %v\n", proposal.View, proposal.QC.View)
durkmurder marked this conversation as resolved.
Show resolved Hide resolved
return f.addProposal(proposal)
}).Maybe()

Expand Down Expand Up @@ -228,14 +228,12 @@ type BlockProducer struct {
}

func (b *BlockProducer) MakeBlockProposal(view uint64, qc *flow.QuorumCertificate, lastViewTC *flow.TimeoutCertificate) (*flow.Header, error) {
return model.ProposalToFlow(&model.Proposal{
Block: helper.MakeBlock(
return model.SignedProposalToFlow(helper.MakeSignedProposal(helper.WithProposal(
helper.MakeProposal(helper.WithBlock(helper.MakeBlock(
helper.WithBlockView(view),
helper.WithBlockQC(qc),
helper.WithBlockProposer(b.proposerID),
),
LastViewTC: lastViewTC,
}), nil
helper.WithBlockProposer(b.proposerID))),
helper.WithLastViewTC(lastViewTC))))), nil
}

func TestEventHandler(t *testing.T) {
Expand All @@ -258,8 +256,8 @@ type EventHandlerSuite struct {

initView uint64 // the current view at the beginning of the test case
endView uint64 // the expected current view at the end of the test case
parentProposal *model.Proposal
votingProposal *model.Proposal
parentProposal *model.SignedProposal
votingProposal *model.SignedProposal
qc *flow.QuorumCertificate
tc *flow.TimeoutCertificate
newview *model.NewViewEvent
Expand Down Expand Up @@ -670,7 +668,7 @@ func (es *EventHandlerSuite) TestOnReceiveTc_NextLeaderProposes() {

// proposed block should contain valid newest QC and lastViewTC
expectedNewestQC := es.paceMaker.NewestQC()
proposal := model.ProposalFromFlow(header)
proposal := model.SignedProposalFromFlow(header)
require.Equal(es.T(), expectedNewestQC, proposal.Block.QC)
require.Equal(es.T(), es.paceMaker.LastViewTC(), proposal.LastViewTC)
}).Once()
Expand Down Expand Up @@ -1033,10 +1031,7 @@ func createVote(block *model.Block) *model.Vote {
}
}

func createProposal(view uint64, qcview uint64) *model.Proposal {
func createProposal(view uint64, qcview uint64) *model.SignedProposal {
block := createBlockWithQC(view, qcview)
return &model.Proposal{
Block: block,
SigData: nil,
}
return helper.MakeSignedProposal(helper.WithProposal(helper.MakeProposal(helper.WithBlock(block))))
}
4 changes: 2 additions & 2 deletions consensus/hotstuff/eventloop/event_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
// it contains an attached insertionTime that is used to measure how long we have waited between queening proposal and
// actually processing by `EventHandler`.
type queuedProposal struct {
proposal *model.Proposal
proposal *model.SignedProposal
insertionTime time.Time
}

Expand Down Expand Up @@ -263,7 +263,7 @@ func (el *EventLoop) loop(ctx context.Context) error {
}

// SubmitProposal pushes the received block to the proposals channel
func (el *EventLoop) SubmitProposal(proposal *model.Proposal) {
func (el *EventLoop) SubmitProposal(proposal *model.SignedProposal) {
queueItem := queuedProposal{
proposal: proposal,
insertionTime: time.Now(),
Expand Down
8 changes: 4 additions & 4 deletions consensus/hotstuff/eventloop/event_loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (s *EventLoopTestSuite) TestReadyDone() {

// Test_SubmitQC tests that submitted proposal is eventually sent to event handler for processing
func (s *EventLoopTestSuite) Test_SubmitProposal() {
proposal := helper.MakeProposal()
proposal := helper.MakeSignedProposal()
processed := atomic.NewBool(false)
s.eh.On("OnReceiveProposal", proposal).Run(func(args mock.Arguments) {
processed.Store(true)
Expand Down Expand Up @@ -229,7 +229,7 @@ func TestEventLoop_Timeout(t *testing.T) {
go func() {
defer wg.Done()
for !processed.Load() {
eventLoop.SubmitProposal(helper.MakeProposal())
eventLoop.SubmitProposal(helper.MakeSignedProposal())
}
}()

Expand Down Expand Up @@ -258,7 +258,7 @@ func TestReadyDoneWithStartTime(t *testing.T) {
require.NoError(t, err)

done := make(chan struct{})
eh.On("OnReceiveProposal", mock.AnythingOfType("*model.Proposal")).Run(func(args mock.Arguments) {
eh.On("OnReceiveProposal", mock.AnythingOfType("*model.SignedProposal")).Run(func(args mock.Arguments) {
require.True(t, time.Now().After(startTime))
close(done)
}).Return(nil).Once()
Expand All @@ -271,7 +271,7 @@ func TestReadyDoneWithStartTime(t *testing.T) {

parentBlock := unittest.BlockHeaderFixture()
block := unittest.BlockHeaderWithParentFixture(parentBlock)
eventLoop.SubmitProposal(model.ProposalFromFlow(block))
eventLoop.SubmitProposal(model.SignedProposalFromFlow(block))

unittest.RequireCloseBefore(t, done, startTimeDuration+100*time.Millisecond, "proposal wasn't received")
cancel()
Expand Down
1 change: 0 additions & 1 deletion consensus/hotstuff/forks/block_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ func (bb *BlockBuilder) Proposals() ([]*model.Proposal, error) {
PayloadHash: payloadHash,
},
LastViewTC: lastViewTC,
SigData: nil,
}
proposal.Block.BlockID = makeBlockID(proposal.Block)

Expand Down
2 changes: 1 addition & 1 deletion consensus/hotstuff/forks/forks.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func (f *Forks) checkForAdvancingFinalization(certifiedBlock *model.CertifiedBlo
parentBlock := parentVertex.(*BlockContainer).Block()

// Note: we assume that all stored blocks pass Forks.EnsureBlockIsValidExtension(block);
// specifically, that Proposal's ViewNumber is strictly monotonically
// specifically, that SignedProposal's ViewNumber is strictly monotonically
durkmurder marked this conversation as resolved.
Show resolved Hide resolved
// increasing which is enforced by LevelledForest.VerifyVertex(...)
// We denote:
// * a DIRECT 1-chain as '<-'
Expand Down
25 changes: 21 additions & 4 deletions consensus/hotstuff/helper/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,25 +56,42 @@ func WithBlockQC(qc *flow.QuorumCertificate) func(*model.Block) {
}
}

func MakeSignedProposal(options ...func(*model.SignedProposal)) *model.SignedProposal {
proposal := &model.SignedProposal{
Proposal: *MakeProposal(),
SigData: unittest.SignatureFixture(),
}
for _, option := range options {
option(proposal)
}
return proposal
}

func MakeProposal(options ...func(*model.Proposal)) *model.Proposal {
proposal := &model.Proposal{
Block: MakeBlock(),
SigData: unittest.SignatureFixture(),
Block: MakeBlock(),
LastViewTC: nil,
}
for _, option := range options {
option(proposal)
}
return proposal
}

func WithProposal(proposal *model.Proposal) func(*model.SignedProposal) {
return func(signedProposal *model.SignedProposal) {
signedProposal.Proposal = *proposal
}
}

func WithBlock(block *model.Block) func(*model.Proposal) {
return func(proposal *model.Proposal) {
proposal.Block = block
}
}

func WithSigData(sigData []byte) func(*model.Proposal) {
return func(proposal *model.Proposal) {
func WithSigData(sigData []byte) func(*model.SignedProposal) {
return func(proposal *model.SignedProposal) {
proposal.SigData = sigData
}
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/hotstuff/integration/connect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func Connect(t *testing.T, instances []*Instance) {
}

// convert into proposal immediately
proposal := model.ProposalFromFlow(header)
proposal := model.SignedProposalFromFlow(header)

// store locally and loop back to engine for processing
sender.ProcessBlock(proposal)
Expand Down
14 changes: 7 additions & 7 deletions consensus/hotstuff/integration/filters_test.go
durkmurder marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -34,28 +34,28 @@ func BlockVotesBy(voterID flow.Identifier) VoteFilter {
}

// ProposalFilter is a filter function for dropping Proposals.
// Return value `true` implies that the the given Proposal should be
// dropped, while `false` indicates that the Proposal should be received.
type ProposalFilter func(*model.Proposal) bool
// Return value `true` implies that the the given SignedProposal should be
durkmurder marked this conversation as resolved.
Show resolved Hide resolved
// dropped, while `false` indicates that the SignedProposal should be received.
type ProposalFilter func(*model.SignedProposal) bool

func BlockNoProposals(*model.Proposal) bool {
func BlockNoProposals(*model.SignedProposal) bool {
return false
}

func BlockAllProposals(*model.Proposal) bool {
func BlockAllProposals(*model.SignedProposal) bool {
return true
}

// BlockProposalRandomly drops proposals randomly with a probability of `dropProbability` ∈ [0,1]
func BlockProposalRandomly(dropProbability float64) ProposalFilter {
return func(*model.Proposal) bool {
return func(*model.SignedProposal) bool {
return rand.Float64() < dropProbability
}
}

// BlockProposalsBy drops all proposals originating from the specified `proposerID`
func BlockProposalsBy(proposerID flow.Identifier) ProposalFilter {
return func(proposal *model.Proposal) bool {
return func(proposal *model.SignedProposal) bool {
return proposal.Block.ProposerID == proposerID
}
}
Expand Down
14 changes: 7 additions & 7 deletions consensus/hotstuff/integration/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type Instance struct {
queue chan interface{}
updatingBlocks sync.RWMutex
headers map[flow.Identifier]*flow.Header
pendings map[flow.Identifier]*model.Proposal // indexed by parent ID
pendings map[flow.Identifier]*model.SignedProposal // indexed by parent ID

// mocked dependencies
committee *mocks.DynamicCommittee
Expand Down Expand Up @@ -151,7 +151,7 @@ func NewInstance(t *testing.T, options ...Option) *Instance {
stop: cfg.StopCondition,

// instance data
pendings: make(map[flow.Identifier]*model.Proposal),
pendings: make(map[flow.Identifier]*model.SignedProposal),
headers: make(map[flow.Identifier]*flow.Header),
queue: make(chan interface{}, 1024),

Expand Down Expand Up @@ -294,7 +294,7 @@ func NewInstance(t *testing.T, options ...Option) *Instance {
}

// convert into proposal immediately
proposal := model.ProposalFromFlow(header)
proposal := model.SignedProposalFromFlow(header)

// store locally and loop back to engine for processing
in.ProcessBlock(proposal)
Expand Down Expand Up @@ -403,7 +403,7 @@ func NewInstance(t *testing.T, options ...Option) *Instance {
minRequiredWeight := committees.WeightThresholdToBuildQC(uint64(len(in.participants)) * weight)
voteProcessorFactory := mocks.NewVoteProcessorFactory(t)
voteProcessorFactory.On("Create", mock.Anything, mock.Anything).Return(
func(log zerolog.Logger, proposal *model.Proposal) hotstuff.VerifyingVoteProcessor {
func(log zerolog.Logger, proposal *model.SignedProposal) hotstuff.VerifyingVoteProcessor {
stakingSigAggtor := helper.MakeWeightedSignatureAggregator(weight)
stakingSigAggtor.On("Verify", mock.Anything, mock.Anything).Return(nil).Maybe()

Expand Down Expand Up @@ -597,7 +597,7 @@ func (in *Instance) Run() error {
}
case msg := <-in.queue:
switch m := msg.(type) {
case *model.Proposal:
case *model.SignedProposal:
// add block to aggregator
in.voteAggregator.AddBlock(m)
// then pass to event handler
Expand Down Expand Up @@ -629,15 +629,15 @@ func (in *Instance) Run() error {
}
}

func (in *Instance) ProcessBlock(proposal *model.Proposal) {
func (in *Instance) ProcessBlock(proposal *model.SignedProposal) {
in.updatingBlocks.Lock()
defer in.updatingBlocks.Unlock()
_, parentExists := in.headers[proposal.Block.QC.BlockID]

if parentExists {
next := proposal
for next != nil {
in.headers[next.Block.BlockID] = model.ProposalToFlow(next)
in.headers[next.Block.BlockID] = model.SignedProposalToFlow(next)

in.queue <- next
// keep processing the pending blocks
Expand Down
2 changes: 1 addition & 1 deletion consensus/hotstuff/mocks/consumer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions consensus/hotstuff/mocks/event_handler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading