Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Checkpoint #67

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 27 additions & 9 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,19 @@ import (
"sync"

"github.com/relab/hotstuff/consensus"
"github.com/relab/hotstuff/internal/proto/hotstuffpb"
)

// blockChain stores a limited amount of blocks in a map.
// blocks are evicted in LRU order.
type blockChain struct {
mods *consensus.Modules
mut sync.Mutex
pruneHeight consensus.View
blocks map[consensus.Hash]*consensus.Block
blockAtHeight map[consensus.View]*consensus.Block
pendingFetch map[consensus.Hash]context.CancelFunc // allows a pending fetch operation to be cancelled
mods *consensus.Modules
mut sync.Mutex
pruneHeight consensus.View
previousCheckPointedView consensus.View
blocks map[consensus.Hash]*consensus.Block
blockAtHeight map[consensus.View]*consensus.Block
pendingFetch map[consensus.Hash]context.CancelFunc // allows a pending fetch operation to be cancelled
}

// InitConsensusModule gives the module a reference to the Modules object.
Expand All @@ -29,14 +31,30 @@ func (chain *blockChain) InitConsensusModule(mods *consensus.Modules, _ *consens
// Blocks are dropped in least recently used order.
func New() consensus.BlockChain {
bc := &blockChain{
blocks: make(map[consensus.Hash]*consensus.Block),
blockAtHeight: make(map[consensus.View]*consensus.Block),
pendingFetch: make(map[consensus.Hash]context.CancelFunc),
blocks: make(map[consensus.Hash]*consensus.Block),
blockAtHeight: make(map[consensus.View]*consensus.Block),
pendingFetch: make(map[consensus.Hash]context.CancelFunc),
previousCheckPointedView: consensus.GetGenesis().View(),
}
bc.Store(consensus.GetGenesis())
return bc
}

// CreateSnapShot is invoked to indicate the application to create a snapshot for the current
// committed blocks. This is a sample implementation and a placeholder.
func (chain *blockChain) CreateSnapShot() {
currentCommittedView := chain.mods.Consensus().CommittedBlock().View()
for h := chain.previousCheckPointedView; h < currentCommittedView; h++ {
block := chain.blockAtHeight[h]
if block == nil {
continue
}
chain.mods.MetricsLogger().Log(hotstuffpb.BlockToProto(chain.blockAtHeight[h]))
}
chain.mods.Logger().Info("Checkpoint completed until View ", currentCommittedView)
chain.previousCheckPointedView = currentCommittedView
}

// Store stores a block in the blockchain
func (chain *blockChain) Store(block *consensus.Block) {
chain.mut.Lock()
Expand Down
78 changes: 78 additions & 0 deletions commandcache/checkpointcc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package commandcache

import (
"context"

"github.com/relab/hotstuff/consensus"
"github.com/relab/hotstuff/internal/proto/clientpb"
)

// CheckPointCmdCache piggybacks the command cache implementation to also provide checkpoint service for the replicas.
// Though it is implemented as separate file to the existing commandcache, it is planned to merge them.
type CheckPointCmdCache struct {
mods *consensus.Modules
cmdCache *CmdCache
checkPointRotationIndex int
highestCheckPointViewIndex uint64
}

// NewCC initializes the checkpoint command cache.
func NewCC(cc *CmdCache, checkPointIndex int) *CheckPointCmdCache {
return &CheckPointCmdCache{
cmdCache: cc,
checkPointRotationIndex: checkPointIndex,
}
}

// InitConsensusModule gives the module access to the other modules.
func (c *CheckPointCmdCache) InitConsensusModule(mods *consensus.Modules, _ *consensus.OptionsBuilder) {
c.mods = mods
}

// AddCommand adds the command to the cache
func (c *CheckPointCmdCache) AddCommand(cmd *clientpb.Command) {
c.cmdCache.AddCommand(cmd)
}

// Get returns a batch of commands to propose.
func (c *CheckPointCmdCache) Get(ctx context.Context) (cmd consensus.Command, ok bool) {
view := uint64(c.mods.Synchronizer().View())
batch, ok := c.cmdCache.getBatch(ctx)
if !ok {
return cmd, ok
}
if view%uint64(c.checkPointRotationIndex) == 0 ||
view-uint64(c.checkPointRotationIndex) > c.highestCheckPointViewIndex {
batch.IsCheckPointRequest = true
view = view - (view % uint64(c.checkPointRotationIndex))
batch.CheckPointViewNumber = uint64(view)
}
return c.cmdCache.marshalBatch(batch)
}

// Accept returns true if the replica can accept the batch.
func (c *CheckPointCmdCache) Accept(cmd consensus.Command) bool {
batch, ok := c.cmdCache.unmarshalCommand(cmd)
if !ok {
c.mods.Logger().Info("Failed to unmarshal a command to batch")
return false
}
if batch.IsCheckPointRequest && batch.CheckPointViewNumber > c.highestCheckPointViewIndex {
c.mods.BlockChain().CreateSnapShot()
c.highestCheckPointViewIndex = batch.CheckPointViewNumber
}
return c.cmdCache.Accept(cmd)
}

// Proposed updates the serial numbers such that we will not accept the given batch again.
func (c *CheckPointCmdCache) Proposed(cmd consensus.Command) {
c.cmdCache.Proposed(cmd)
}

// GetHighestCheckPointedView returns the view in which the checkpoint is completed.
func (c *CheckPointCmdCache) GetHighestCheckPointedView() consensus.View {
return consensus.View(c.highestCheckPointViewIndex)
}

var _ consensus.Acceptor = (*CheckPointCmdCache)(nil)
var _ consensus.CommandQueue = (*CheckPointCmdCache)(nil)
77 changes: 49 additions & 28 deletions replica/cmdcache.go → commandcache/commandcache.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package replica
package commandcache

import (
"container/list"
Expand All @@ -7,13 +7,13 @@ import (

"github.com/relab/hotstuff/consensus"
"github.com/relab/hotstuff/internal/proto/clientpb"
"github.com/relab/hotstuff/modules"
"google.golang.org/protobuf/proto"
)

type cmdCache struct {
// CmdCache caches the commands to be processed by the protocol.
type CmdCache struct {
mut sync.Mutex
mods *modules.Modules
mods *consensus.Modules
c chan struct{}
batchSize int
serialNumbers map[uint32]uint64 // highest proposed serial number per client ID
Expand All @@ -22,8 +22,9 @@ type cmdCache struct {
unmarshaler proto.UnmarshalOptions
}

func newCmdCache(batchSize int) *cmdCache {
return &cmdCache{
// New returns the initialized command cache
func New(batchSize int) *CmdCache {
return &CmdCache{
c: make(chan struct{}),
batchSize: batchSize,
serialNumbers: make(map[uint32]uint64),
Expand All @@ -32,12 +33,13 @@ func newCmdCache(batchSize int) *cmdCache {
}
}

// InitModule gives the module access to the other modules.
func (c *cmdCache) InitModule(mods *modules.Modules) {
// InitConsensusModule gives the module access to the other modules.
func (c *CmdCache) InitConsensusModule(mods *consensus.Modules, _ *consensus.OptionsBuilder) {
c.mods = mods
}

func (c *cmdCache) addCommand(cmd *clientpb.Command) {
// AddCommand adds the commands to the cache.
func (c *CmdCache) AddCommand(cmd *clientpb.Command) {
c.mut.Lock()
defer c.mut.Unlock()
if serialNo := c.serialNumbers[cmd.GetClientID()]; serialNo >= cmd.GetSequenceNumber() {
Expand All @@ -54,9 +56,10 @@ func (c *cmdCache) addCommand(cmd *clientpb.Command) {
}
}

// Get returns a batch of commands to propose.
func (c *cmdCache) Get(ctx context.Context) (cmd consensus.Command, ok bool) {
batch := new(clientpb.Batch)
// getBatch: fetches the batch, available for processing
func (c *CmdCache) getBatch(ctx context.Context) (batch *clientpb.Batch, ok bool) {

batch = new(clientpb.Batch)

c.mut.Lock()
awaitBatch:
Expand All @@ -80,7 +83,7 @@ awaitBatch:
}
c.cache.Remove(elem)
cmd := elem.Value.(*clientpb.Command)
if serialNo := c.serialNumbers[cmd.GetClientID()]; serialNo >= cmd.GetSequenceNumber() {
if serialNo := c.serialNumbers[uint32(cmd.ClientID)]; serialNo >= cmd.SequenceNumber {
// command is too old
i--
continue
Expand All @@ -92,51 +95,63 @@ awaitBatch:
if len(batch.Commands) == 0 {
goto awaitBatch
}

defer c.mut.Unlock()
return batch, true
}

// Get returns a batch of commands to propose.
func (c *CmdCache) Get(ctx context.Context) (cmd consensus.Command, ok bool) {
batch, _ := c.getBatch(ctx)
return c.marshalBatch(batch)
}

// marshalBatch: Internal method used to marshal a batch of commands to a single command string.
func (c *CmdCache) marshalBatch(batch *clientpb.Batch) (cmd consensus.Command, ok bool) {
// otherwise, we should have at least one command
b, err := c.marshaler.Marshal(batch)
if err != nil {
c.mods.Logger().Errorf("Failed to marshal batch: %v", err)
return "", false
}

cmd = consensus.Command(b)
return cmd, true
}

// Accept returns true if the replica can accept the batch.
func (c *cmdCache) Accept(cmd consensus.Command) bool {
batch := new(clientpb.Batch)
// unmarshalCommand: Internal method used to unmarshal a string of command to the underlying batch.
func (c *CmdCache) unmarshalCommand(cmd consensus.Command) (batch *clientpb.Batch, ok bool) {
// otherwise, we should have at least one command
batch = new(clientpb.Batch)
err := c.unmarshaler.Unmarshal([]byte(cmd), batch)
if err != nil {
c.mods.Logger().Errorf("Failed to unmarshal batch: %v", err)
return false
return batch, false
}
return batch, true
}

// Accept returns true if the replica can accept the batch.
func (c *CmdCache) Accept(cmd consensus.Command) bool {
batch, ok := c.unmarshalCommand(cmd)
if !ok {
return false
}
c.mut.Lock()
defer c.mut.Unlock()

for _, cmd := range batch.GetCommands() {
if serialNo := c.serialNumbers[cmd.GetClientID()]; serialNo >= cmd.GetSequenceNumber() {
// command is too old, can't accept
return false
}
}

return true
}

// Proposed updates the serial numbers such that we will not accept the given batch again.
func (c *cmdCache) Proposed(cmd consensus.Command) {
batch := new(clientpb.Batch)
err := c.unmarshaler.Unmarshal([]byte(cmd), batch)
if err != nil {
c.mods.Logger().Errorf("Failed to unmarshal batch: %v", err)
func (c *CmdCache) Proposed(cmd consensus.Command) {
batch, ok := c.unmarshalCommand(cmd)
if !ok {
return
}

c.mut.Lock()
defer c.mut.Unlock()

Expand All @@ -147,4 +162,10 @@ func (c *cmdCache) Proposed(cmd consensus.Command) {
}
}

var _ consensus.Acceptor = (*cmdCache)(nil)
// GetHighestCheckPointedView returns the View ID in which the checkpoint completed.
func (c *CmdCache) GetHighestCheckPointedView() consensus.View {
return consensus.GetGenesis().View()
}

var _ consensus.Acceptor = (*CmdCache)(nil)
var _ consensus.CommandQueue = (*CmdCache)(nil)
1 change: 0 additions & 1 deletion consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ func (cs *consensusBase) OnPropose(proposal ProposeMsg) {
cs.mods.Logger().Info("OnPropose: command not accepted")
return
}

// block is safe and was accepted
cs.mods.BlockChain().Store(block)

Expand Down
9 changes: 9 additions & 0 deletions consensus/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/relab/hotstuff"
"github.com/relab/hotstuff/internal/proto/clientpb"
"github.com/relab/hotstuff/modules"
)

Expand Down Expand Up @@ -199,6 +200,8 @@ type CommandQueue interface {
// It may run until the context is cancelled.
// If no command is available, the 'ok' return value should be false.
Get(ctx context.Context) (cmd Command, ok bool)

AddCommand(*clientpb.Command)
}

//go:generate mockgen -destination=../internal/mocks/acceptor_mock.go -package=mocks . Acceptor
Expand All @@ -210,6 +213,9 @@ type Acceptor interface {
// Proposed tells the acceptor that the propose phase for the given command succeeded, and it should no longer be
// accepted in the future.
Proposed(Command)
// Should we create a new interface for this?
// GetHighestCheckPointedView returns the view in which the checkpoint is performed
GetHighestCheckPointedView() View
}

//go:generate mockgen -destination=../internal/mocks/executor_mock.go -package=mocks . Executor
Expand Down Expand Up @@ -314,6 +320,9 @@ type BlockChain interface {
// Prunes blocks from the in-memory tree up to the specified height.
// Returns a set of forked blocks (blocks that were on a different branch, and thus not committed).
PruneToHeight(height View) (forkedBlocks []*Block)

// CreateSnapShot invokes the application to create the snapshot of the application
CreateSnapShot()
}

//go:generate mockgen -destination=../internal/mocks/replica_mock.go -package=mocks . Replica
Expand Down
4 changes: 4 additions & 0 deletions internal/orchestration/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/relab/hotstuff/backend"
"github.com/relab/hotstuff/blockchain"
"github.com/relab/hotstuff/client"
"github.com/relab/hotstuff/commandcache"
"github.com/relab/hotstuff/consensus"
"github.com/relab/hotstuff/consensus/byzantine"
"github.com/relab/hotstuff/crypto"
Expand Down Expand Up @@ -192,13 +193,16 @@ func (w *Worker) createReplica(opts *orchestrationpb.ReplicaOpts) (*replica.Repl
float64(opts.GetTimeoutMultiplier()),
))

commandCache := commandcache.New(int(opts.BatchSize))
//checkpointCC := commandcache.NewCC(commandCache, 1000)
builder.Register(
consensus.New(consensusRules),
crypto.NewCache(cryptoImpl, 100), // TODO: consider making this configurable
leaderRotation,
sync,
w.metricsLogger,
blockchain.New(),
commandCache,
logging.New("hs"+strconv.Itoa(int(opts.GetID()))),
)

Expand Down
Loading