Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipelining #78

Open
wants to merge 25 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
1d1683e
add block to syncInfo
leandernikolaus Sep 26, 2022
3de76bd
simplify updateHighQC
leandernikolaus Sep 26, 2022
854d64a
changed updateHighQC
leandernikolaus Sep 26, 2022
95dcd52
pipelining in synchronizer
leandernikolaus Oct 2, 2022
f7519b0
pipelined commit rule
leandernikolaus Oct 2, 2022
0745f44
fix updatehighqc
leandernikolaus Oct 2, 2022
a1598c0
pass new leaf block to syncronizer
leandernikolaus Oct 2, 2022
38d65c3
collect certificates for multile blocks
leandernikolaus Oct 2, 2022
2e8d598
fix leafblock handling in syncronizer
leandernikolaus Oct 2, 2022
d2508b2
send vote to correct leader
leandernikolaus Oct 3, 2022
b7cd66b
fix bug on timeout
leandernikolaus Oct 3, 2022
600a0f7
reorder synchronizer
leandernikolaus Oct 4, 2022
7813a4e
fixed typos
leandernikolaus Oct 4, 2022
eb95aa1
add isInPipelineStretch function
leandernikolaus Oct 4, 2022
9b77e81
pipelinedViews flag
leandernikolaus Oct 4, 2022
e13b461
added some Twins tests and fixes for bugs found.
leandernikolaus Oct 7, 2022
df3fc32
fixing comments on synchronizer
leandernikolaus Oct 7, 2022
1c0b9a2
fix mock synchronizer
leandernikolaus Oct 7, 2022
b3c68c9
fix error handling in twins/network.go
leandernikolaus Oct 7, 2022
aae49db
0 is an ok value for a view.
leandernikolaus Oct 7, 2022
c563b9c
check leafblock for view and logging
leandernikolaus Oct 11, 2022
5a92037
fixing out of order proposals
leandernikolaus Oct 12, 2022
7e59d24
separate nextView and currentView to avoid advancing on local timeout
leandernikolaus Oct 13, 2022
fb04815
fix pr comments
leandernikolaus Oct 13, 2022
8420756
fix fhsbug test
leandernikolaus Oct 13, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions consensus/chainedhotstuff/chainedhotstuff.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,46 @@ func (hs *ChainedHotStuff) CommitRule(block *hotstuff.Block) *hotstuff.Block {
return nil
}

// normal hotstuff
if block1.Parent() == block2.Hash() && block2.Parent() == block3.Hash() {
hs.logger.Debug("DECIDE: ", block3)
return block3
}

// pipelined rule
// check if all blocks between b1 and b3 are confirmed
qcs := make(map[hotstuff.Hash]bool, block.View()-block3.View())
for loopblock := block; loopblock.View() > block1.View(); {
qcs[loopblock.QuorumCert().BlockHash()] = true
parent, ok := hs.blockChain.LocalGet(loopblock.Parent())
if !ok {
hs.logger.Info("CommitRule: unable to retrieve parent block")
return nil
}
loopblock = parent
}
for loopblock := block1; loopblock.View() > block3.View(); {
if !qcs[loopblock.Hash()] {
hs.logger.Info("CommitRule: unconfirmed parent block")
return nil
}
qcs[loopblock.QuorumCert().BlockHash()] = true
parent, ok := hs.blockChain.LocalGet(loopblock.Parent())
if !ok {
hs.logger.Info("CommitRule: unable to retrieve parent block")
return nil
}
if parent.View() != loopblock.View()-1 {
hs.logger.Info("CommitRule: found view without block")
return nil
}
if parent.Hash() == block3.Hash() {
hs.logger.Debug("DECIDE: ", block3)
return block3
}
loopblock = parent
}

return nil
}

Expand Down
4 changes: 2 additions & 2 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (cs *consensusBase) OnPropose(proposal hotstuff.ProposeMsg) { //nolint:gocy
cs.commit(b)
}
if !didAdvanceView {
cs.synchronizer.AdvanceView(hotstuff.NewSyncInfo().WithQC(block.QuorumCert()))
cs.synchronizer.AdvanceView(hotstuff.NewSyncInfo().WithQC(block.QuorumCert()).WithBlock(block))
}
}()

Expand All @@ -240,7 +240,7 @@ func (cs *consensusBase) OnPropose(proposal hotstuff.ProposeMsg) { //nolint:gocy
return
}

leaderID := cs.leaderRotation.GetLeader(cs.lastVote + 1)
leaderID := cs.leaderRotation.GetLeader(block.View() + cs.synchronizer.PipelinedViews())
if leaderID == cs.opts.ID() {
cs.eventLoop.AddEvent(hotstuff.VoteMsg{ID: cs.opts.ID(), PartialCert: pc})
return
Expand Down
2 changes: 1 addition & 1 deletion consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestVote(t *testing.T) {
ctrl := gomock.NewController(t)
bl := testutil.CreateBuilders(t, ctrl, n)
cs := mocks.NewMockConsensus(ctrl)
bl[0].Add(synchronizer.New(testutil.FixedTimeout(1000)), cs)
bl[0].Add(synchronizer.New(testutil.FixedTimeout(1000), 1), cs)
hl := bl.Build()
hs := hl[0]

Expand Down
4 changes: 2 additions & 2 deletions consensus/votingmachine.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (vm *VotingMachine) OnVote(vote hotstuff.VoteMsg) {
}
}

if block.View() <= vm.synchronizer.LeafBlock().View() {
if block.View() <= vm.synchronizer.HighQC().View() {
// too old
return
}
Expand All @@ -101,7 +101,7 @@ func (vm *VotingMachine) verifyCert(cert hotstuff.PartialCert, block *hotstuff.B
// delete any pending QCs with lower height than bLeaf
for k := range vm.verifiedVotes {
if block, ok := vm.blockChain.LocalGet(k); ok {
if block.View() <= vm.synchronizer.LeafBlock().View() {
if block.View() <= vm.synchronizer.HighQC().View() {
delete(vm.verifiedVotes, k)
}
} else {
Expand Down
2 changes: 2 additions & 0 deletions internal/cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func init() {
runCmd.Flags().String("leader-rotation", "round-robin", "name of the leader rotation algorithm")
runCmd.Flags().Int64("shared-seed", 0, "Shared random number generator seed")
runCmd.Flags().StringSlice("modules", nil, "Name additional modules to be loaded.")
runCmd.Flags().Int("pipelined-views", 1, "number of blocks/views proposed concurrently")

runCmd.Flags().Bool("worker", false, "run a local worker")
runCmd.Flags().StringSlice("hosts", nil, "the remote hosts to run the experiment on via ssh")
Expand Down Expand Up @@ -116,6 +117,7 @@ func runController() {
MaxTimeout: durationpb.New(viper.GetDuration("max-timeout")),
SharedSeed: viper.GetInt64("shared-seed"),
Modules: viper.GetStringSlice("modules"),
PipelinedViews: viper.GetUint32("pipelined-views"),
},
ClientOpts: &orchestrationpb.ClientOpts{
UseTLS: true,
Expand Down
2 changes: 1 addition & 1 deletion internal/orchestration/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (w *Worker) createReplica(opts *orchestrationpb.ReplicaOpts) (*replica.Repl
float64(opts.GetInitialTimeout().AsDuration().Nanoseconds())/float64(time.Millisecond),
float64(opts.GetMaxTimeout().AsDuration().Nanoseconds())/float64(time.Millisecond),
float64(opts.GetTimeoutMultiplier()),
))
), opts.GetPipelinedViews())

builder.Add(
eventloop.New(1000),
Expand Down
403 changes: 203 additions & 200 deletions internal/proto/orchestrationpb/orchestration.pb.go

Large diffs are not rendered by default.

10 changes: 6 additions & 4 deletions internal/proto/orchestrationpb/orchestration.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ message ReplicaOpts {
// Determines whether TLS should be used.
bool UseTLS = 4;
// The replica's TLS certificate.
optional bytes Certificate = 5;
bytes Certificate = 5;
// The private key of the TLS certificate.
optional bytes CertificateKey = 6;
bytes CertificateKey = 6;
// The certificate authority that created the TLS certificates.
optional bytes CertificateAuthority = 7;
bytes CertificateAuthority = 7;
// The name of the crypto implementation to use.
string Crypto = 8;
// The name of the consensus implementation to use.
Expand All @@ -51,6 +51,8 @@ message ReplicaOpts {
int64 SharedSeed = 20;
// A list of modules to load.
repeated string Modules = 21;
// Number of blocks proposed concurrently
uint32 PipelinedViews = 22;
}

// ReplicaInfo is the information that the replicas need about each other.
Expand Down Expand Up @@ -124,7 +126,7 @@ message StartClientRequest {
// The clients to create.
map<uint32, ClientOpts> Clients = 1;
// The certificate authority that created the TLS certificates.
optional bytes CertificateAuthority = 7;
bytes CertificateAuthority = 7;
// The replicas to connect to.
map<uint32, ReplicaInfo> Configuration = 10;
}
Expand Down
2 changes: 2 additions & 0 deletions modules/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ type Synchronizer interface {
HighQC() hotstuff.QuorumCert
// LeafBlock returns the current leaf block.
LeafBlock() *hotstuff.Block
// PipelinedViews returns the number of concurrently executed views.
PipelinedViews() hotstuff.View
// Start starts the synchronizer with the given context.
Start(context.Context)
}
Expand Down
79 changes: 67 additions & 12 deletions synchronizer/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ type Synchronizer struct {
duration ViewDuration
timer *time.Timer

// number of concurrent views, 0 and 1 both give no concurrency.
pipelinedViews hotstuff.View

viewCtx context.Context // a context that is cancelled at the end of the current view
cancelCtx context.CancelFunc

Expand Down Expand Up @@ -85,12 +88,14 @@ func (s *Synchronizer) InitModule(mods *modules.Core) {
}

// New creates a new Synchronizer.
func New(viewDuration ViewDuration) modules.Synchronizer {
func New(viewDuration ViewDuration, pipelinedViews uint32) modules.Synchronizer {
ctx, cancel := context.WithCancel(context.Background())
return &Synchronizer{
leafBlock: hotstuff.GetGenesis(),
currentView: 1,

pipelinedViews: hotstuff.View(pipelinedViews),

viewCtx: ctx,
cancelCtx: cancel,

Expand Down Expand Up @@ -120,6 +125,14 @@ func (s *Synchronizer) Start(ctx context.Context) {
}
}

// PipelinedViews returns 1 or the number of concurrent views in the pipeline
func (s *Synchronizer) PipelinedViews() hotstuff.View {
if s.pipelinedViews < 1 {
return 1
}
return hotstuff.View(s.pipelinedViews)
}

// HighQC returns the highest known QC.
func (s *Synchronizer) HighQC() hotstuff.QuorumCert {
return s.highQC
Expand Down Expand Up @@ -192,6 +205,11 @@ func (s *Synchronizer) OnLocalTimeout() {
s.consensus.StopVoting(s.currentView)

s.configuration.Timeout(timeoutMsg)

if s.isInPipelineStretch(s.currentView) {
s.AdvanceView(hotstuff.NewSyncInfo().WithTimeoutView(s.currentView))
}

s.OnRemoteTimeout(timeoutMsg)
}

Expand Down Expand Up @@ -315,6 +333,20 @@ func (s *Synchronizer) AdvanceView(syncInfo hotstuff.SyncInfo) {
}
}

b, ok := syncInfo.Block()
meling marked this conversation as resolved.
Show resolved Hide resolved
meling marked this conversation as resolved.
Show resolved Hide resolved
if ok {
s.updateLeafBlock(b)
}

if s.isInPipelineStretch(s.leafBlock.View()) {
v = s.leafBlock.View()
}

if timeoutV, ok := syncInfo.TimeoutView(); ok &&
meling marked this conversation as resolved.
Show resolved Hide resolved
timeoutV > v && s.isInPipelineStretch(timeoutV) {
v = timeoutV
}

if v < s.currentView {
return
}
Expand All @@ -339,31 +371,54 @@ func (s *Synchronizer) AdvanceView(syncInfo hotstuff.SyncInfo) {

leader := s.leaderRotation.GetLeader(s.currentView)
if leader == s.opts.ID() {
s.consensus.Propose(syncInfo)
s.consensus.Propose(syncInfo.WithQC(s.highQC))
} else if replica, ok := s.configuration.Replica(leader); ok {
replica.NewView(syncInfo)
}
}

// isInPipelineStretch checks wether the given view lies
// less then the pipelinestretch from the highQC
func (s *Synchronizer) isInPipelineStretch(v hotstuff.View) bool {
meling marked this conversation as resolved.
Show resolved Hide resolved
return v > s.highQC.View() && v < s.highQC.View()+s.pipelinedViews
}

// updateHighQC attempts to update the highQC, but does not verify the qc first.
// This method is meant to be used instead of the exported UpdateHighQC internally
// in this package when the qc has already been verified.
// This method ensures, the block of the highQC is always available locally.
func (s *Synchronizer) updateHighQC(qc hotstuff.QuorumCert) {
newBlock, ok := s.blockChain.Get(qc.BlockHash())
if !ok {
s.logger.Info("updateHighQC: Could not find block referenced by new QC!")
return
if qc.View() > s.highQC.View() {
highQCBlock, ok := s.blockChain.Get(s.highQC.BlockHash())
if !ok {
s.logger.Info("updateHighQC: Could not find block referenced by new QC!")
return
}

s.highQC = qc
s.logger.Debug("HighQC updated")

if s.leafBlock.View() < s.highQC.View() || !s.blockChain.Extends(s.leafBlock, highQCBlock) {
s.leafBlock = highQCBlock
}
}
}

oldBlock, ok := s.blockChain.Get(s.highQC.BlockHash())
// updateLeafBlock attempts to update the leafBlock.
// This method ensures, leafblock extends highQC.
meling marked this conversation as resolved.
Show resolved Hide resolved
func (s *Synchronizer) updateLeafBlock(b *hotstuff.Block) {
highQCBlock, ok := s.blockChain.Get(s.highQC.BlockHash())
if !ok {
s.logger.Panic("Block from the old highQC missing from chain")
s.logger.Error("updateLeafBlock: Could not find block referenced by new QC!")
return
}

if newBlock.View() > oldBlock.View() {
s.highQC = qc
s.leafBlock = newBlock
s.logger.Debug("HighQC updated")
if !s.blockChain.Extends(b, highQCBlock) {
s.logger.Info("updateLeafBlock: new block did not extend highQC!")
return
}
if b.View() > s.leafBlock.View() {
s.leafBlock = b
}
}

Expand Down
4 changes: 2 additions & 2 deletions synchronizer/synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestAdvanceViewQC(t *testing.T) {
const n = 4
ctrl := gomock.NewController(t)
builders := testutil.CreateBuilders(t, ctrl, n)
s := New(testutil.FixedTimeout(1000))
s := New(testutil.FixedTimeout(1000), 1)
hs := mocks.NewMockConsensus(ctrl)
builders[0].Add(s, hs)

Expand Down Expand Up @@ -50,7 +50,7 @@ func TestAdvanceViewTC(t *testing.T) {
const n = 4
ctrl := gomock.NewController(t)
builders := testutil.CreateBuilders(t, ctrl, n)
s := New(testutil.FixedTimeout(100))
s := New(testutil.FixedTimeout(100), 1)
hs := mocks.NewMockConsensus(ctrl)
builders[0].Add(s, hs)

Expand Down
2 changes: 1 addition & 1 deletion twins/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (n *Network) createTwinsNodes(nodes []NodeID, scenario Scenario, consensusN
consensus.New(consensusModule),
consensus.NewVotingMachine(),
crypto.NewCache(ecdsa.New(), 100),
synchronizer.New(FixedTimeout(0)),
synchronizer.New(FixedTimeout(0), 1),
logging.NewWithDest(&node.log, fmt.Sprintf("r%dn%d", nodeID.ReplicaID, nodeID.NetworkID)),
// twins-specific:
&configuration{network: n, node: node},
Expand Down
27 changes: 22 additions & 5 deletions twins/scenario_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,23 @@ func TestBasicScenario(t *testing.T) {
for i := 1; i <= 4; i++ {
allNodesSet.Add(uint32(i))
}
lonely := make(NodeSet)
lonely.Add(3)

others := make(NodeSet)
for i := 1; i <= 4; i++ {
if !lonely.Contains(uint32(i)) {
others.Add(uint32(i))
}
}

s = append(s, View{Leader: 1, Partitions: []NodeSet{allNodesSet}})
s = append(s, View{Leader: 1, Partitions: []NodeSet{allNodesSet}})
s = append(s, View{Leader: 1, Partitions: []NodeSet{allNodesSet}})
s = append(s, View{Leader: 1, Partitions: []NodeSet{allNodesSet}})
s = append(s, View{Leader: 2, Partitions: []NodeSet{allNodesSet}})
s = append(s, View{Leader: 3, Partitions: []NodeSet{lonely, others}})
s = append(s, View{Leader: 4, Partitions: []NodeSet{allNodesSet}})
// s = append(s, View{Leader: 1, Partitions: []NodeSet{allNodesSet}})
// s = append(s, View{Leader: 2, Partitions: []NodeSet{allNodesSet}})
// s = append(s, View{Leader: 3, Partitions: []NodeSet{allNodesSet}})

result, err := ExecuteScenario(s, 4, 0, 100, "chainedhotstuff")
if err != nil {
Expand All @@ -26,7 +39,11 @@ func TestBasicScenario(t *testing.T) {
t.Errorf("Expected no safety violations")
}

if result.Commits != 1 {
t.Error("Expected one commit")
if result.Commits < 1 {
t.Error("Expected at least one commit")
}

if result.Commits > 1 {
t.Error("Expected only one commit")
}
}
Loading