diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index 1dbb6eb8..4a9587d8 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -6,17 +6,19 @@ import ( "sync" "github.com/relab/hotstuff/consensus" + "github.com/relab/hotstuff/internal/proto/hotstuffpb" ) // blockChain stores a limited amount of blocks in a map. // blocks are evicted in LRU order. type blockChain struct { - mods *consensus.Modules - mut sync.Mutex - pruneHeight consensus.View - blocks map[consensus.Hash]*consensus.Block - blockAtHeight map[consensus.View]*consensus.Block - pendingFetch map[consensus.Hash]context.CancelFunc // allows a pending fetch operation to be cancelled + mods *consensus.Modules + mut sync.Mutex + pruneHeight consensus.View + previousCheckPointedView consensus.View + blocks map[consensus.Hash]*consensus.Block + blockAtHeight map[consensus.View]*consensus.Block + pendingFetch map[consensus.Hash]context.CancelFunc // allows a pending fetch operation to be cancelled } // InitConsensusModule gives the module a reference to the Modules object. @@ -29,14 +31,30 @@ func (chain *blockChain) InitConsensusModule(mods *consensus.Modules, _ *consens // Blocks are dropped in least recently used order. func New() consensus.BlockChain { bc := &blockChain{ - blocks: make(map[consensus.Hash]*consensus.Block), - blockAtHeight: make(map[consensus.View]*consensus.Block), - pendingFetch: make(map[consensus.Hash]context.CancelFunc), + blocks: make(map[consensus.Hash]*consensus.Block), + blockAtHeight: make(map[consensus.View]*consensus.Block), + pendingFetch: make(map[consensus.Hash]context.CancelFunc), + previousCheckPointedView: consensus.GetGenesis().View(), } bc.Store(consensus.GetGenesis()) return bc } +// CreateSnapShot is invoked to indicate the application to create a snapshot for the current +// committed blocks. This is a sample implementation and a placeholder. +func (chain *blockChain) CreateSnapShot() { + currentCommittedView := chain.mods.Consensus().CommittedBlock().View() + for h := chain.previousCheckPointedView; h < currentCommittedView; h++ { + block := chain.blockAtHeight[h] + if block == nil { + continue + } + chain.mods.MetricsLogger().Log(hotstuffpb.BlockToProto(chain.blockAtHeight[h])) + } + chain.mods.Logger().Info("Checkpoint completed until View ", currentCommittedView) + chain.previousCheckPointedView = currentCommittedView +} + // Store stores a block in the blockchain func (chain *blockChain) Store(block *consensus.Block) { chain.mut.Lock() diff --git a/commandcache/checkpointcc.go b/commandcache/checkpointcc.go new file mode 100644 index 00000000..0467ef08 --- /dev/null +++ b/commandcache/checkpointcc.go @@ -0,0 +1,78 @@ +package commandcache + +import ( + "context" + + "github.com/relab/hotstuff/consensus" + "github.com/relab/hotstuff/internal/proto/clientpb" +) + +// CheckPointCmdCache piggybacks the command cache implementation to also provide checkpoint service for the replicas. +// Though it is implemented as separate file to the existing commandcache, it is planned to merge them. +type CheckPointCmdCache struct { + mods *consensus.Modules + cmdCache *CmdCache + checkPointRotationIndex int + highestCheckPointViewIndex uint64 +} + +// NewCC initializes the checkpoint command cache. +func NewCC(cc *CmdCache, checkPointIndex int) *CheckPointCmdCache { + return &CheckPointCmdCache{ + cmdCache: cc, + checkPointRotationIndex: checkPointIndex, + } +} + +// InitConsensusModule gives the module access to the other modules. +func (c *CheckPointCmdCache) InitConsensusModule(mods *consensus.Modules, _ *consensus.OptionsBuilder) { + c.mods = mods +} + +// AddCommand adds the command to the cache +func (c *CheckPointCmdCache) AddCommand(cmd *clientpb.Command) { + c.cmdCache.AddCommand(cmd) +} + +// Get returns a batch of commands to propose. +func (c *CheckPointCmdCache) Get(ctx context.Context) (cmd consensus.Command, ok bool) { + view := uint64(c.mods.Synchronizer().View()) + batch, ok := c.cmdCache.getBatch(ctx) + if !ok { + return cmd, ok + } + if view%uint64(c.checkPointRotationIndex) == 0 || + view-uint64(c.checkPointRotationIndex) > c.highestCheckPointViewIndex { + batch.IsCheckPointRequest = true + view = view - (view % uint64(c.checkPointRotationIndex)) + batch.CheckPointViewNumber = uint64(view) + } + return c.cmdCache.marshalBatch(batch) +} + +// Accept returns true if the replica can accept the batch. +func (c *CheckPointCmdCache) Accept(cmd consensus.Command) bool { + batch, ok := c.cmdCache.unmarshalCommand(cmd) + if !ok { + c.mods.Logger().Info("Failed to unmarshal a command to batch") + return false + } + if batch.IsCheckPointRequest && batch.CheckPointViewNumber > c.highestCheckPointViewIndex { + c.mods.BlockChain().CreateSnapShot() + c.highestCheckPointViewIndex = batch.CheckPointViewNumber + } + return c.cmdCache.Accept(cmd) +} + +// Proposed updates the serial numbers such that we will not accept the given batch again. +func (c *CheckPointCmdCache) Proposed(cmd consensus.Command) { + c.cmdCache.Proposed(cmd) +} + +// GetHighestCheckPointedView returns the view in which the checkpoint is completed. +func (c *CheckPointCmdCache) GetHighestCheckPointedView() consensus.View { + return consensus.View(c.highestCheckPointViewIndex) +} + +var _ consensus.Acceptor = (*CheckPointCmdCache)(nil) +var _ consensus.CommandQueue = (*CheckPointCmdCache)(nil) diff --git a/replica/cmdcache.go b/commandcache/commandcache.go similarity index 60% rename from replica/cmdcache.go rename to commandcache/commandcache.go index 6e25b4f8..e84848ab 100644 --- a/replica/cmdcache.go +++ b/commandcache/commandcache.go @@ -1,4 +1,4 @@ -package replica +package commandcache import ( "container/list" @@ -7,13 +7,13 @@ import ( "github.com/relab/hotstuff/consensus" "github.com/relab/hotstuff/internal/proto/clientpb" - "github.com/relab/hotstuff/modules" "google.golang.org/protobuf/proto" ) -type cmdCache struct { +// CmdCache caches the commands to be processed by the protocol. +type CmdCache struct { mut sync.Mutex - mods *modules.Modules + mods *consensus.Modules c chan struct{} batchSize int serialNumbers map[uint32]uint64 // highest proposed serial number per client ID @@ -22,8 +22,9 @@ type cmdCache struct { unmarshaler proto.UnmarshalOptions } -func newCmdCache(batchSize int) *cmdCache { - return &cmdCache{ +// New returns the initialized command cache +func New(batchSize int) *CmdCache { + return &CmdCache{ c: make(chan struct{}), batchSize: batchSize, serialNumbers: make(map[uint32]uint64), @@ -32,12 +33,13 @@ func newCmdCache(batchSize int) *cmdCache { } } -// InitModule gives the module access to the other modules. -func (c *cmdCache) InitModule(mods *modules.Modules) { +// InitConsensusModule gives the module access to the other modules. +func (c *CmdCache) InitConsensusModule(mods *consensus.Modules, _ *consensus.OptionsBuilder) { c.mods = mods } -func (c *cmdCache) addCommand(cmd *clientpb.Command) { +// AddCommand adds the commands to the cache. +func (c *CmdCache) AddCommand(cmd *clientpb.Command) { c.mut.Lock() defer c.mut.Unlock() if serialNo := c.serialNumbers[cmd.GetClientID()]; serialNo >= cmd.GetSequenceNumber() { @@ -54,9 +56,10 @@ func (c *cmdCache) addCommand(cmd *clientpb.Command) { } } -// Get returns a batch of commands to propose. -func (c *cmdCache) Get(ctx context.Context) (cmd consensus.Command, ok bool) { - batch := new(clientpb.Batch) +// getBatch: fetches the batch, available for processing +func (c *CmdCache) getBatch(ctx context.Context) (batch *clientpb.Batch, ok bool) { + + batch = new(clientpb.Batch) c.mut.Lock() awaitBatch: @@ -80,7 +83,7 @@ awaitBatch: } c.cache.Remove(elem) cmd := elem.Value.(*clientpb.Command) - if serialNo := c.serialNumbers[cmd.GetClientID()]; serialNo >= cmd.GetSequenceNumber() { + if serialNo := c.serialNumbers[uint32(cmd.ClientID)]; serialNo >= cmd.SequenceNumber { // command is too old i-- continue @@ -92,51 +95,63 @@ awaitBatch: if len(batch.Commands) == 0 { goto awaitBatch } - defer c.mut.Unlock() + return batch, true +} + +// Get returns a batch of commands to propose. +func (c *CmdCache) Get(ctx context.Context) (cmd consensus.Command, ok bool) { + batch, _ := c.getBatch(ctx) + return c.marshalBatch(batch) +} +// marshalBatch: Internal method used to marshal a batch of commands to a single command string. +func (c *CmdCache) marshalBatch(batch *clientpb.Batch) (cmd consensus.Command, ok bool) { // otherwise, we should have at least one command b, err := c.marshaler.Marshal(batch) if err != nil { c.mods.Logger().Errorf("Failed to marshal batch: %v", err) return "", false } - cmd = consensus.Command(b) return cmd, true } -// Accept returns true if the replica can accept the batch. -func (c *cmdCache) Accept(cmd consensus.Command) bool { - batch := new(clientpb.Batch) +// unmarshalCommand: Internal method used to unmarshal a string of command to the underlying batch. +func (c *CmdCache) unmarshalCommand(cmd consensus.Command) (batch *clientpb.Batch, ok bool) { + // otherwise, we should have at least one command + batch = new(clientpb.Batch) err := c.unmarshaler.Unmarshal([]byte(cmd), batch) if err != nil { c.mods.Logger().Errorf("Failed to unmarshal batch: %v", err) - return false + return batch, false } + return batch, true +} +// Accept returns true if the replica can accept the batch. +func (c *CmdCache) Accept(cmd consensus.Command) bool { + batch, ok := c.unmarshalCommand(cmd) + if !ok { + return false + } c.mut.Lock() defer c.mut.Unlock() - for _, cmd := range batch.GetCommands() { if serialNo := c.serialNumbers[cmd.GetClientID()]; serialNo >= cmd.GetSequenceNumber() { // command is too old, can't accept return false } } - return true } // Proposed updates the serial numbers such that we will not accept the given batch again. -func (c *cmdCache) Proposed(cmd consensus.Command) { - batch := new(clientpb.Batch) - err := c.unmarshaler.Unmarshal([]byte(cmd), batch) - if err != nil { - c.mods.Logger().Errorf("Failed to unmarshal batch: %v", err) +func (c *CmdCache) Proposed(cmd consensus.Command) { + batch, ok := c.unmarshalCommand(cmd) + if !ok { return } - c.mut.Lock() defer c.mut.Unlock() @@ -147,4 +162,10 @@ func (c *cmdCache) Proposed(cmd consensus.Command) { } } -var _ consensus.Acceptor = (*cmdCache)(nil) +// GetHighestCheckPointedView returns the View ID in which the checkpoint completed. +func (c *CmdCache) GetHighestCheckPointedView() consensus.View { + return consensus.GetGenesis().View() +} + +var _ consensus.Acceptor = (*CmdCache)(nil) +var _ consensus.CommandQueue = (*CmdCache)(nil) diff --git a/consensus/consensus.go b/consensus/consensus.go index 62206d62..44636328 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -169,7 +169,6 @@ func (cs *consensusBase) OnPropose(proposal ProposeMsg) { cs.mods.Logger().Info("OnPropose: command not accepted") return } - // block is safe and was accepted cs.mods.BlockChain().Store(block) diff --git a/consensus/modules.go b/consensus/modules.go index e776c422..230b6e5f 100644 --- a/consensus/modules.go +++ b/consensus/modules.go @@ -4,6 +4,7 @@ import ( "context" "github.com/relab/hotstuff" + "github.com/relab/hotstuff/internal/proto/clientpb" "github.com/relab/hotstuff/modules" ) @@ -199,6 +200,8 @@ type CommandQueue interface { // It may run until the context is cancelled. // If no command is available, the 'ok' return value should be false. Get(ctx context.Context) (cmd Command, ok bool) + + AddCommand(*clientpb.Command) } //go:generate mockgen -destination=../internal/mocks/acceptor_mock.go -package=mocks . Acceptor @@ -210,6 +213,9 @@ type Acceptor interface { // Proposed tells the acceptor that the propose phase for the given command succeeded, and it should no longer be // accepted in the future. Proposed(Command) + // Should we create a new interface for this? + // GetHighestCheckPointedView returns the view in which the checkpoint is performed + GetHighestCheckPointedView() View } //go:generate mockgen -destination=../internal/mocks/executor_mock.go -package=mocks . Executor @@ -314,6 +320,9 @@ type BlockChain interface { // Prunes blocks from the in-memory tree up to the specified height. // Returns a set of forked blocks (blocks that were on a different branch, and thus not committed). PruneToHeight(height View) (forkedBlocks []*Block) + + // CreateSnapShot invokes the application to create the snapshot of the application + CreateSnapShot() } //go:generate mockgen -destination=../internal/mocks/replica_mock.go -package=mocks . Replica diff --git a/internal/orchestration/worker.go b/internal/orchestration/worker.go index 68a4654e..4603ec0b 100644 --- a/internal/orchestration/worker.go +++ b/internal/orchestration/worker.go @@ -15,6 +15,7 @@ import ( "github.com/relab/hotstuff/backend" "github.com/relab/hotstuff/blockchain" "github.com/relab/hotstuff/client" + "github.com/relab/hotstuff/commandcache" "github.com/relab/hotstuff/consensus" "github.com/relab/hotstuff/consensus/byzantine" "github.com/relab/hotstuff/crypto" @@ -192,6 +193,8 @@ func (w *Worker) createReplica(opts *orchestrationpb.ReplicaOpts) (*replica.Repl float64(opts.GetTimeoutMultiplier()), )) + commandCache := commandcache.New(int(opts.BatchSize)) + //checkpointCC := commandcache.NewCC(commandCache, 1000) builder.Register( consensus.New(consensusRules), crypto.NewCache(cryptoImpl, 100), // TODO: consider making this configurable @@ -199,6 +202,7 @@ func (w *Worker) createReplica(opts *orchestrationpb.ReplicaOpts) (*replica.Repl sync, w.metricsLogger, blockchain.New(), + commandCache, logging.New("hs"+strconv.Itoa(int(opts.GetID()))), ) diff --git a/internal/proto/clientpb/client.pb.go b/internal/proto/clientpb/client.pb.go index 888c2ac8..a2924198 100644 --- a/internal/proto/clientpb/client.pb.go +++ b/internal/proto/clientpb/client.pb.go @@ -93,7 +93,9 @@ type Batch struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Commands []*Command `protobuf:"bytes,1,rep,name=Commands,proto3" json:"Commands,omitempty"` + Commands []*Command `protobuf:"bytes,1,rep,name=Commands,proto3" json:"Commands,omitempty"` + IsCheckPointRequest bool `protobuf:"varint,2,opt,name=IsCheckPointRequest,proto3" json:"IsCheckPointRequest,omitempty"` + CheckPointViewNumber uint64 `protobuf:"varint,3,opt,name=CheckPointViewNumber,proto3" json:"CheckPointViewNumber,omitempty"` } func (x *Batch) Reset() { @@ -135,6 +137,20 @@ func (x *Batch) GetCommands() []*Command { return nil } +func (x *Batch) GetIsCheckPointRequest() bool { + if x != nil { + return x.IsCheckPointRequest + } + return false +} + +func (x *Batch) GetCheckPointViewNumber() uint64 { + if x != nil { + return x.CheckPointViewNumber + } + return 0 +} + var File_internal_proto_clientpb_client_proto protoreflect.FileDescriptor var file_internal_proto_clientpb_client_proto_rawDesc = []byte{ @@ -149,20 +165,26 @@ var file_internal_proto_clientpb_client_proto_rawDesc = []byte{ 0x49, 0x44, 0x12, 0x26, 0x0a, 0x0e, 0x53, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0e, 0x53, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x44, 0x61, - 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x22, 0x36, - 0x0a, 0x05, 0x42, 0x61, 0x74, 0x63, 0x68, 0x12, 0x2d, 0x0a, 0x08, 0x43, 0x6f, 0x6d, 0x6d, 0x61, - 0x6e, 0x64, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x63, 0x6c, 0x69, 0x65, - 0x6e, 0x74, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x08, 0x43, 0x6f, - 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x73, 0x32, 0x4c, 0x0a, 0x06, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, - 0x12, 0x42, 0x0a, 0x0b, 0x45, 0x78, 0x65, 0x63, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, - 0x11, 0x2e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, - 0x6e, 0x64, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x08, 0xa0, 0xb5, 0x18, 0x01, - 0xd0, 0xb5, 0x18, 0x01, 0x42, 0x33, 0x5a, 0x31, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, - 0x6f, 0x6d, 0x2f, 0x72, 0x65, 0x6c, 0x61, 0x62, 0x2f, 0x68, 0x6f, 0x74, 0x73, 0x74, 0x75, 0x66, - 0x66, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2f, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x33, + 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x22, 0x9c, + 0x01, 0x0a, 0x05, 0x42, 0x61, 0x74, 0x63, 0x68, 0x12, 0x2d, 0x0a, 0x08, 0x43, 0x6f, 0x6d, 0x6d, + 0x61, 0x6e, 0x64, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x63, 0x6c, 0x69, + 0x65, 0x6e, 0x74, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x08, 0x43, + 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x73, 0x12, 0x30, 0x0a, 0x13, 0x49, 0x73, 0x43, 0x68, 0x65, + 0x63, 0x6b, 0x50, 0x6f, 0x69, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x13, 0x49, 0x73, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x50, 0x6f, 0x69, + 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x32, 0x0a, 0x14, 0x43, 0x68, 0x65, + 0x63, 0x6b, 0x50, 0x6f, 0x69, 0x6e, 0x74, 0x56, 0x69, 0x65, 0x77, 0x4e, 0x75, 0x6d, 0x62, 0x65, + 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x14, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x50, 0x6f, + 0x69, 0x6e, 0x74, 0x56, 0x69, 0x65, 0x77, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x32, 0x4c, 0x0a, + 0x06, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x12, 0x42, 0x0a, 0x0b, 0x45, 0x78, 0x65, 0x63, 0x43, + 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x11, 0x2e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x70, + 0x62, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, + 0x79, 0x22, 0x08, 0xa0, 0xb5, 0x18, 0x01, 0xd0, 0xb5, 0x18, 0x01, 0x42, 0x33, 0x5a, 0x31, 0x67, + 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x65, 0x6c, 0x61, 0x62, 0x2f, + 0x68, 0x6f, 0x74, 0x73, 0x74, 0x75, 0x66, 0x66, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, + 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x70, 0x62, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/internal/proto/clientpb/client.proto b/internal/proto/clientpb/client.proto index 05badc4b..9d089447 100644 --- a/internal/proto/clientpb/client.proto +++ b/internal/proto/clientpb/client.proto @@ -26,4 +26,8 @@ message Command { } // Batch is a list of commands to be executed -message Batch { repeated Command Commands = 1; } +message Batch { + repeated Command Commands = 1; + bool IsCheckPointRequest = 2; + uint64 CheckPointViewNumber = 3; +} diff --git a/internal/proto/hotstuffpb/convert_test.go b/internal/testutil/convert_test.go similarity index 68% rename from internal/proto/hotstuffpb/convert_test.go rename to internal/testutil/convert_test.go index 927ee7bd..af0767e8 100644 --- a/internal/proto/hotstuffpb/convert_test.go +++ b/internal/testutil/convert_test.go @@ -1,4 +1,4 @@ -package hotstuffpb +package testutil import ( "bytes" @@ -8,13 +8,13 @@ import ( "github.com/relab/hotstuff/consensus" "github.com/relab/hotstuff/crypto" "github.com/relab/hotstuff/crypto/bls12" - "github.com/relab/hotstuff/internal/testutil" + "github.com/relab/hotstuff/internal/proto/hotstuffpb" ) func TestConvertPartialCert(t *testing.T) { ctrl := gomock.NewController(t) - builder := testutil.TestModules(t, ctrl, 1, testutil.GenerateECDSAKey(t)) + builder := TestModules(t, ctrl, 1, GenerateECDSAKey(t)) hs := builder.Build() signer := hs.Crypto() @@ -23,8 +23,8 @@ func TestConvertPartialCert(t *testing.T) { t.Fatal(err) } - pb := PartialCertToProto(want) - got := PartialCertFromProto(pb) + pb := hotstuffpb.PartialCertToProto(want) + got := hotstuffpb.PartialCertFromProto(pb) if !bytes.Equal(want.ToBytes(), got.ToBytes()) { t.Error("Certificates don't match.") @@ -34,20 +34,20 @@ func TestConvertPartialCert(t *testing.T) { func TestConvertQuorumCert(t *testing.T) { ctrl := gomock.NewController(t) - builders := testutil.CreateBuilders(t, ctrl, 4) + builders := CreateBuilders(t, ctrl, 4) hl := builders.Build() b1 := consensus.NewBlock(consensus.GetGenesis().Hash(), consensus.NewQuorumCert(nil, 0, consensus.GetGenesis().Hash()), "", 1, 1) - signatures := testutil.CreatePCs(t, b1, hl.Signers()) + signatures := CreatePCs(t, b1, hl.Signers()) want, err := hl[0].Crypto().CreateQuorumCert(b1, signatures) if err != nil { t.Fatal(err) } - pb := QuorumCertToProto(want) - got := QuorumCertFromProto(pb) + pb := hotstuffpb.QuorumCertToProto(want) + got := hotstuffpb.QuorumCertFromProto(pb) if !bytes.Equal(want.ToBytes(), got.ToBytes()) { t.Error("Certificates don't match.") @@ -57,8 +57,8 @@ func TestConvertQuorumCert(t *testing.T) { func TestConvertBlock(t *testing.T) { qc := consensus.NewQuorumCert(nil, 0, consensus.Hash{}) want := consensus.NewBlock(consensus.GetGenesis().Hash(), qc, "", 1, 1) - pb := BlockToProto(want) - got := BlockFromProto(pb) + pb := hotstuffpb.BlockToProto(want) + got := hotstuffpb.BlockFromProto(pb) if want.Hash() != got.Hash() { t.Error("Hashes don't match.") @@ -68,16 +68,16 @@ func TestConvertBlock(t *testing.T) { func TestConvertTimeoutCertBLS12(t *testing.T) { ctrl := gomock.NewController(t) - builders := testutil.CreateBuilders(t, ctrl, 4, testutil.GenerateKeys(t, 4, testutil.GenerateBLS12Key)...) + builders := CreateBuilders(t, ctrl, 4, GenerateKeys(t, 4, GenerateBLS12Key)...) for i := range builders { builders[i].Register(crypto.New(bls12.New())) } hl := builders.Build() - tc1 := testutil.CreateTC(t, 1, hl.Signers()) + tc1 := CreateTC(t, 1, hl.Signers()) - pb := TimeoutCertToProto(tc1) - tc2 := TimeoutCertFromProto(pb) + pb := hotstuffpb.TimeoutCertToProto(tc1) + tc2 := hotstuffpb.TimeoutCertFromProto(pb) if !hl[0].Crypto().VerifyTimeoutCert(tc2) { t.Fatal("Failed to verify timeout cert") diff --git a/replica/clientsrv.go b/replica/clientsrv.go index 4bce35d5..2ee03ee3 100644 --- a/replica/clientsrv.go +++ b/replica/clientsrv.go @@ -18,12 +18,12 @@ import ( // clientSrv serves a client. type clientSrv struct { - mut sync.Mutex - mods *modules.Modules - srv *gorums.Server - awaitingCmds map[cmdID]chan<- error - cmdCache *cmdCache - hash hash.Hash + mut sync.Mutex + mods *modules.Modules + consensusMods *consensus.Modules + srv *gorums.Server + awaitingCmds map[cmdID]chan<- error + hash hash.Hash } // newClientServer returns a new client server. @@ -31,17 +31,19 @@ func newClientServer(conf Config, srvOpts []gorums.ServerOption) (srv *clientSrv srv = &clientSrv{ awaitingCmds: make(map[cmdID]chan<- error), srv: gorums.NewServer(srvOpts...), - cmdCache: newCmdCache(int(conf.BatchSize)), hash: sha256.New(), } clientpb.RegisterClientServer(srv.srv, srv) return srv } +func (srv *clientSrv) SetConsensusModules(consensusMods *consensus.Modules) { + srv.consensusMods = consensusMods +} + // InitModule gives the module access to the other modules. func (srv *clientSrv) InitModule(mods *modules.Modules) { srv.mods = mods - srv.cmdCache.InitModule(mods) } func (srv *clientSrv) Start(addr string) error { @@ -73,8 +75,7 @@ func (srv *clientSrv) ExecCommand(ctx gorums.ServerCtx, cmd *clientpb.Command) ( srv.mut.Lock() srv.awaitingCmds[id] = c srv.mut.Unlock() - - srv.cmdCache.addCommand(cmd) + srv.consensusMods.CommandQueue().AddCommand(cmd) ctx.Release() err := <-c return &emptypb.Empty{}, err diff --git a/replica/replica.go b/replica/replica.go index 5f986157..730b190c 100644 --- a/replica/replica.go +++ b/replica/replica.go @@ -46,11 +46,10 @@ type Config struct { // Replica is a participant in the consensus protocol. type Replica struct { - clientSrv *clientSrv - cfg *backend.Config - hsSrv *backend.Server - hs *consensus.Modules - + clientSrv *clientSrv + cfg *backend.Config + hsSrv *backend.Server + hs *consensus.Modules execHandlers map[cmdID]func(*emptypb.Empty, error) cancel context.CancelFunc done chan struct{} @@ -99,13 +98,13 @@ func New(conf Config, builder consensus.Builder) (replica *Replica) { srv.cfg = backend.NewConfig(creds, managerOpts...) builder.Register( - srv.cfg, // configuration - srv.hsSrv, // event handling - srv.clientSrv, // executor - srv.clientSrv.cmdCache, // acceptor and command queue + srv.cfg, // configuration + srv.hsSrv, // event handling + srv.clientSrv, // executor + //srv.clientSrv.cmdCache, // acceptor and command queue ) srv.hs = builder.Build() - + srv.clientSrv.SetConsensusModules(srv.hs) return srv } diff --git a/twins/scenario.go b/twins/scenario.go index ff465096..af95c4dd 100644 --- a/twins/scenario.go +++ b/twins/scenario.go @@ -9,6 +9,7 @@ import ( "github.com/relab/hotstuff" "github.com/relab/hotstuff/consensus" + "github.com/relab/hotstuff/internal/proto/clientpb" ) // View specifies the leader id an the partition scenario for a single round of consensus. @@ -146,6 +147,10 @@ func (commandModule) Accept(_ consensus.Command) bool { return true } +func (commandModule) GetHighestCheckPointedView() consensus.View { + return consensus.View(0) +} + // Proposed tells the acceptor that the propose phase for the given command succeeded, and it should no longer be // accepted in the future. func (commandModule) Proposed(_ consensus.Command) {} @@ -162,4 +167,8 @@ func (cm commandModule) Exec(block *consensus.Block) { cm.node.executedBlocks = append(cm.node.executedBlocks, block) } -func (commandModule) Fork(block *consensus.Block) {} +func (commandModule) AddCommand(_ *clientpb.Command) {} +func (commandModule) Fork(block *consensus.Block) {} + +var _ consensus.Acceptor = (*commandModule)(nil) +var _ consensus.CommandQueue = (*commandModule)(nil)