Skip to content

Commit

Permalink
Merge pull request #1313 from dusk-network/fix-1286
Browse files Browse the repository at this point in the history
Recover contract state on node restart
  • Loading branch information
goshawk-3 authored Mar 3, 2022
2 parents c50bb95 + df6795f commit a8879ac
Show file tree
Hide file tree
Showing 20 changed files with 295 additions and 249 deletions.
4 changes: 3 additions & 1 deletion cmd/dusk/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func LaunchChain(ctx context.Context, cl *loop.Consensus, proxy transactions.Pro

// Perform database sanity check to ensure that it is rational before
// bootstrapping all node subsystems
if err := l.PerformSanityCheck(0, 10, 0); err != nil {
if err := l.SanityCheckBlockchain(0, 10); err != nil {
return nil, err
}

Expand Down Expand Up @@ -115,6 +115,8 @@ func Setup() *Server {

proxy, ruskConn := setupGRPCClients(gctx)

log.Info("grpc connection with rusk service established")

m := mempool.NewMempool(db, eventBus, rpcBus, proxy.Prober())
m.Run(parentCtx)

Expand Down
3 changes: 2 additions & 1 deletion harness/engine/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,9 @@ func (n *Network) start(nodeDir string, name string, arg ...string) error {

// CREATE THE RUSK STATE for the local rusk
stateExec := name + "-recovery-state"
cmd := exec.Command(stateExec)
cmd := exec.Command(stateExec, "-w", "-f")
cmd.Env = append(envWithNoRusk, "TMPDIR="+nodeDir, "RUSK_PROFILE_PATH="+nodeDir)

cmd.Start()
cmd.Wait()

Expand Down
9 changes: 2 additions & 7 deletions harness/tests/localnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"encoding/hex"
"flag"
"fmt"
"io/ioutil"
"os"
"strconv"
"testing"
Expand Down Expand Up @@ -44,15 +43,11 @@ var (
//
// The network should be fully functioning and ready to accept messaging.
func TestMain(m *testing.M) {
var err error

flag.Parse()

// create the temp-dir workspace. Quit on error
workspace, err = ioutil.TempDir(os.TempDir(), "localnet-")
if err != nil {
quit(localNet, workspace, err)
}
workspace = os.TempDir() + "/localnet"
os.Mkdir(workspace, 0700)

// set the network size
if localNetSizeStr != "" {
Expand Down
206 changes: 160 additions & 46 deletions pkg/core/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,17 @@ var ErrBlockAlreadyAccepted = errors.New("discarded block from the past")

// Verifier performs checks on the blockchain and potentially new incoming block.
type Verifier interface {
// PerformSanityCheck on first N blocks and M last blocks.
PerformSanityCheck(startAt uint64, firstBlocksAmount uint64, lastBlockAmount uint64) error
// SanityCheckBlockchain on first N blocks and M last blocks.
SanityCheckBlockchain(startAt uint64, firstBlocksAmount uint64) error
// SanityCheckBlock will verify whether a block is valid according to the rules of the consensus.
SanityCheckBlock(prevBlock block.Block, blk block.Block) error
}

// Loader is an interface which abstracts away the storage used by the Chain to
// store the blockchain.
type Loader interface {
// LoadTip of the chain.
LoadTip() (*block.Block, error)
// LoadTip of the chain. Returns blockchain tip and persisted hash.
LoadTip() (*block.Block, []byte, error)
// Clear removes everything from the DB.
Clear() error
// Close the Loader and finalizes any pending connection.
Expand All @@ -68,8 +68,6 @@ type Loader interface {
Height() (uint64, error)
// BlockAt returns the block at a given height.
BlockAt(uint64) (block.Block, error)
// Append a block on the storage.
Append(*block.Block) error
}

// Chain represents the nodes blockchain.
Expand Down Expand Up @@ -131,20 +129,100 @@ func New(ctx context.Context, db database.DB, eventBus *eventbus.EventBus, rpcBu
return nil, err
}

if srv != nil {
node.RegisterChainServer(srv, chain)
}

chain.p = &provisioners

prevBlock, err := loader.LoadTip()
if err != nil {
if err := chain.syncWithRusk(); err != nil {
return nil, err
}

chain.tip = prevBlock
return chain, nil
}

if srv != nil {
node.RegisterChainServer(srv, chain)
func (c *Chain) syncWithRusk() error {
var (
err error
ruskStateHash []byte
persistedHash []byte
prevBlock *block.Block
)

ruskStateHash, err = c.proxy.Executor().GetStateRoot(c.ctx)
if err != nil {
return err
}

return chain, nil
prevBlock, persistedHash, err = c.loader.LoadTip()
if err != nil {
return err
}

// Detect if both services are on the different state
var persitedBlock *block.Block

err = c.db.View(func(t database.Transaction) error {
persitedBlock, err = t.FetchBlock(persistedHash)
if err != nil {
return err
}

if !bytes.Equal(persitedBlock.Header.StateHash, ruskStateHash) {
log.WithField("rusk", hex.EncodeToString(ruskStateHash)).
WithField("node", hex.EncodeToString(persitedBlock.Header.StateHash)).
Error("invalid state detected")
return errors.New("invalid state detected")
}

return err
})
if err != nil {
return err
}

// Update blockchain tip (in-memory)
c.tip = persitedBlock

// If both persisted block hash and latest blockchain block hash are the
// same then there is no need to execute sync-up.
if bytes.Equal(persistedHash, prevBlock.Header.Hash) {
return nil
}

// re-accept missing block in order to recover Rusk (unpersisted) state.
i := persitedBlock.Header.Height

for {
i++

var blk *block.Block

err = c.db.View(func(t database.Transaction) error {
var hash []byte

hash, err = t.FetchBlockHashByHeight(i)
if err != nil {
return err
}

blk, err = t.FetchBlock(hash)
return err
})

if err != nil {
break
}

// Re-accepting all blocks that have not been persisted in Rusk.
// This will re-execute accept/finalize accordingly and update chain tip.
if err := c.acceptBlock(*blk, false); err != nil {
return err
}
}

return nil
}

// ProcessBlockFromNetwork will handle blocks incoming from the network.
Expand Down Expand Up @@ -200,7 +278,7 @@ func (c *Chain) ProcessBlockFromNetwork(srcPeerID string, m message.Message) ([]
// from the network during out-of-sync state.
func (c *Chain) TryNextConsecutiveBlockOutSync(blk block.Block, kadcastHeight byte) error {
log.WithField("height", blk.Header.Height).Trace("accepting sync block")
return c.acceptBlock(blk)
return c.acceptBlock(blk, true)
}

// TryNextConsecutiveBlockInSync is the processing path for accepting a block
Expand Down Expand Up @@ -235,7 +313,7 @@ func (c *Chain) TryNextConsecutiveBlockIsValid(blk block.Block) error {

l := log.WithFields(fields)

return c.isValidBlock(blk, l)
return c.isValidBlock(blk, l, true)
}

// ProcessSyncTimerExpired called by outsync timer when a peer does not provide GetData response.
Expand Down Expand Up @@ -263,7 +341,7 @@ func (c *Chain) ProcessSyncTimerExpired(strPeerAddr string) error {
func (c *Chain) acceptSuccessiveBlock(blk block.Block, kadcastHeight byte) error {
log.WithField("height", blk.Header.Height).Trace("accepting succeeding block")

if err := c.acceptBlock(blk); err != nil {
if err := c.acceptBlock(blk, true); err != nil {
return err
}

Expand Down Expand Up @@ -317,7 +395,9 @@ func (c *Chain) runStateTransition(tipBlk, blk block.Block) (*block.Block, error
blk.Header.Height,
config.BlockGasLimit)
if err != nil {
l.WithError(err).Error("Error in executing the state transition")
l.WithError(err).
WithField("grpc", "finalize").
Error("Error in executing the state transition")
return block.NewBlock(), err
}
default:
Expand All @@ -328,7 +408,10 @@ func (c *Chain) runStateTransition(tipBlk, blk block.Block) (*block.Block, error
blk.Header.Height,
config.BlockGasLimit)
if err != nil {
l.WithError(err).Error("Error in executing the state transition")
l.WithError(err).
WithField("grpc", "accept").
Error("Error in executing the state transition")

return block.NewBlock(), err
}
}
Expand Down Expand Up @@ -390,12 +473,14 @@ func (c *Chain) sanityCheckStateHash() error {
return nil
}

func (c *Chain) isValidBlock(blk block.Block, l *logrus.Entry) error {
func (c *Chain) isValidBlock(blk block.Block, l *logrus.Entry, withSanityCheck bool) error {
l.Debug("verifying block")
// Check that stateless and stateful checks pass
if err := c.verifier.SanityCheckBlock(*c.tip, blk); err != nil {
l.WithError(err).Error("block verification failed")
return err
if withSanityCheck {
if err := c.verifier.SanityCheckBlock(*c.tip, blk); err != nil {
l.WithError(err).Error("block verification failed")
return err
}
}

// Check the certificate
Expand All @@ -417,7 +502,7 @@ func (c *Chain) isValidBlock(blk block.Block, l *logrus.Entry) error {
// 1. We have not seen it before
// 2. All stateless and stateful checks are true
// Returns nil, if checks passed and block was successfully saved.
func (c *Chain) acceptBlock(blk block.Block) error {
func (c *Chain) acceptBlock(blk block.Block, withSanityCheck bool) error {
fields := logger.Fields{
"event": "accept_block",
"height": blk.Header.Height,
Expand All @@ -430,7 +515,7 @@ func (c *Chain) acceptBlock(blk block.Block) error {
var err error

// 1. Ensure block fields and certificate are valid
if err = c.isValidBlock(blk, l); err != nil {
if err = c.isValidBlock(blk, l, withSanityCheck); err != nil {
l.WithError(err).Error("invalid block error")
return err
}
Expand All @@ -443,22 +528,68 @@ func (c *Chain) acceptBlock(blk block.Block) error {
return err
}

// 3. Store the approved block and update in-memory chain tip
l.Debug("storing block")
// 3. Persist the approved block and update in-memory chain tip
l.Debug("persisting block")

if err := c.loader.Append(b); err != nil {
l.WithError(err).Error("block storing failed")
if err := c.persist(b); err != nil {
l.WithError(err).Error("persisting block failed")
return err
}

c.tip = &blk
c.tip = b

// 5. Perform all post-events on accepting a block
c.postAcceptBlock(blk, l)
c.postAcceptBlock(*b, l)

return nil
}

// Persist persists a block in both Contract Storage state and dusk-blockchain db in atomic manner.
func (c *Chain) persist(b *block.Block) error {
var (
clog = log.WithFields(logger.Fields{
"event": "accept_block",
"height": b.Header.Height,
"hash": util.StringifyBytes(b.Header.Hash),
"curr_h": c.tip.Header.Height,
})

err error
pe = config.Get().State.PersistEvery
)

// Atomic persist
err = c.db.Update(func(t database.Transaction) error {
var p bool

if pe > 0 && b.Header.Height%pe == 0 {
// Mark it as a persisted block
p = true
}

// Persist block into dusk-blockchain database before any attempt to persist in Rusk.
// If StoreBlock fails, no change will be applied in Rusk.
// If Rusk.Persist fails, StoreBlock is rollbacked.
if err = t.StoreBlock(b, p); err != nil {
return err
}

// Persist Rusk state
if p {
if err = c.proxy.Executor().Persist(c.ctx, b.Header.StateHash); err != nil {
clog.WithError(err).Error("persisting contract state failed")
return err
}

clog.Debug("persisting contract state completed")
}

return nil
})

return err
}

// postAcceptBlock performs all post-events on accepting a block.
func (c *Chain) postAcceptBlock(blk block.Block, l *logrus.Entry) {
// 1. Notify other subsystems for the accepted block
Expand All @@ -482,23 +613,6 @@ func (c *Chain) postAcceptBlock(blk block.Block, l *logrus.Entry) {
go c.storeStakesInStormDB(blk.Header.Height)
}

// 4. Persist state
pe := config.Get().State.PersistEvery

if pe > 0 {
if blk.Header.Height%pe == 0 {
l.Debug("chain: persist state", blk.Header.Height)

if err := c.proxy.Executor().Persist(c.ctx, c.tip.Header.StateHash); err != nil {
// TODO: trigger resync procedure
// https://github.com/dusk-network/dusk-blockchain/issues/1286
l.WithError(err).Error("chain: persist state failed")
}
}
} else {
l.Warn("State won't persist! Set `state.persistEvery` to a positive value")
}

diagnostics.LogPublishErrors("chain/chain.go, topics.AcceptedBlock", errList)
l.Debug("procedure ended")
}
Expand Down
Loading

0 comments on commit a8879ac

Please sign in to comment.