From 910fbd61affbbe5113bedd47f5b2e19d8755fe99 Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Wed, 23 Feb 2022 09:57:05 +0100 Subject: [PATCH 01/10] Increase block time to ~15sec --- pkg/config/consts.go | 2 +- pkg/core/consensus/blockgenerator/candidate/blockgenerator.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/config/consts.go b/pkg/config/consts.go index 6183077a0..2e7b49358 100644 --- a/pkg/config/consts.go +++ b/pkg/config/consts.go @@ -26,7 +26,7 @@ const ( MaxInvBlocks = 500 // Protocol-based consensus step time. - ConsensusTimeOut = 5 * time.Second + ConsensusTimeOut = 20 * time.Second // KadcastInitialHeight sets the default initial height for Kadcast broadcast algorithm. KadcastInitialHeight byte = 128 diff --git a/pkg/core/consensus/blockgenerator/candidate/blockgenerator.go b/pkg/core/consensus/blockgenerator/candidate/blockgenerator.go index a2937aea7..d5e5ad0b4 100644 --- a/pkg/core/consensus/blockgenerator/candidate/blockgenerator.go +++ b/pkg/core/consensus/blockgenerator/candidate/blockgenerator.go @@ -139,7 +139,7 @@ func (bg *generator) execute(ctx context.Context, txs []transactions.ContractCal // fetchOrTimeout will keep trying to FetchMempoolTxs() until either // we get some txs or or the timeout expires. func (bg *generator) fetchOrTimeout(keys [][]byte) ([]transactions.ContractCall, error) { - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() tick := time.NewTicker(500 * time.Millisecond) From 1ebe536b072716e39dc48310e4eb4c347a8d70d3 Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Wed, 2 Mar 2022 12:40:47 +0100 Subject: [PATCH 02/10] Update testnet state root --- pkg/config/genesis/generation.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/config/genesis/generation.go b/pkg/config/genesis/generation.go index 017332c34..bce0fb3d9 100644 --- a/pkg/config/genesis/generation.go +++ b/pkg/config/genesis/generation.go @@ -16,7 +16,7 @@ import ( ) // DEFAULT_STATE_ROOT is the state root result of "rusk make state". -const DEFAULT_STATE_ROOT string = "cae88d982bf63e0bb6c18ab1c43c643d7895d6b212f0d4e26414808673a1ccea" +const DEFAULT_STATE_ROOT string = "2a9fed740fb9978c5c39d0a45ad34543035514eb62f8351fdcb694c29687e3dd" // Generate a genesis block. The constitution of the block depends on the passed // config. From 6638cecae8359e1ede49209bf8880faba5f6bba4 Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 28 Feb 2022 13:43:53 +0200 Subject: [PATCH 03/10] Enforce state rebuild --- harness/engine/network.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/harness/engine/network.go b/harness/engine/network.go index 422dcfeb9..a8b490ab7 100644 --- a/harness/engine/network.go +++ b/harness/engine/network.go @@ -356,7 +356,7 @@ func (n *Network) start(nodeDir string, name string, arg ...string) error { // CREATE THE RUSK STATE for the local rusk stateExec := name + "-recovery-state" - cmd := exec.Command(stateExec) + cmd := exec.Command(stateExec, "-w", "-f") cmd.Env = append(envWithNoRusk, "TMPDIR="+nodeDir, "RUSK_PROFILE_PATH="+nodeDir) cmd.Start() cmd.Wait() From 1449c3e516d10b4c64f25bce0bd4bb25abd291bc Mon Sep 17 00:00:00 2001 From: goshawk Date: Wed, 23 Feb 2022 08:42:34 +0200 Subject: [PATCH 04/10] Implement a KV record - Persisted Hash, for both heavy and lite database drivers --- pkg/core/database/heavy/transactions.go | 46 ++++++++++++++++------- pkg/core/database/interfaces.go | 7 ++-- pkg/core/database/lite/database.go | 1 + pkg/core/database/lite/transactions.go | 16 +++++++- pkg/core/database/testing/drivers_test.go | 31 +++++++++++---- pkg/core/database/testing/utils.go | 2 +- 6 files changed, 76 insertions(+), 27 deletions(-) diff --git a/pkg/core/database/heavy/transactions.go b/pkg/core/database/heavy/transactions.go index a3801e730..87e6026eb 100644 --- a/pkg/core/database/heavy/transactions.go +++ b/pkg/core/database/heavy/transactions.go @@ -52,12 +52,14 @@ var ( TxIDPrefix = []byte{0x04} // KeyImagePrefix is the prefix to identify the Key Image. KeyImagePrefix = []byte{0x05} - // StatePrefix is the prefix to identify the State. - StatePrefix = []byte{0x06} - // OutputKeyPrefix is the prefix to identify the Output. + // TipPrefix is the prefix to identify the Blockchain tip hash. + TipPrefix = []byte{0x06} + // OutputKeyPrefix is the prefix to identify the Output. // TODO: Remove this OutputKeyPrefix = []byte{0x07} // CandidatePrefix is the prefix to identify Candidate messages. CandidatePrefix = []byte{0x08} + // + PersistedPrefix = []byte{0x09} ) type transaction struct { @@ -83,7 +85,7 @@ type transaction struct { // // It is assumed that StoreBlock would be called much less times than Fetch* // APIs. Based on that, extra indexing data is put to provide fast-read lookups. -func (t transaction) StoreBlock(b *block.Block) error { +func (t transaction) StoreBlock(b *block.Block, persisted bool) error { if t.batch == nil { // t.batch is initialized only on a open, read-write transaction // (built with transaction.Update()). @@ -161,18 +163,23 @@ func (t transaction) StoreBlock(b *block.Block) error { } key = append(HeightPrefix, heightBuf.Bytes()...) - value = b.Header.Hash - t.put(key, value) + t.put(key, b.Header.Hash) - // Key = StatePrefix + // Key = TipPrefix // Value = Hash(chain tip) // // To support fetching blockchain tip - key = StatePrefix - value = b.Header.Hash + t.put(TipPrefix, b.Header.Hash) - t.put(key, value) + if persisted { + // Key = PersistedPrefix + // Value = Hash(chain tip) + // + // To support fetching blockchain persisted hash + + t.put(PersistedPrefix, b.Header.Hash) + } return nil } @@ -440,10 +447,18 @@ func (t transaction) FetchBlock(hash []byte) (*block.Block, error) { } func (t transaction) FetchState() (*database.State, error) { - key := StatePrefix + tipHash, err := t.snapshot.Get(TipPrefix, nil) + if err == leveldb.ErrNotFound || len(tipHash) == 0 { + // overwrite error message + err = database.ErrStateNotFound + } - value, err := t.snapshot.Get(key, nil) - if err == leveldb.ErrNotFound || len(value) == 0 { + if err != nil { + return nil, err + } + + persistedHash, err := t.snapshot.Get(PersistedPrefix, nil) + if err == leveldb.ErrNotFound || len(persistedHash) == 0 { // overwrite error message err = database.ErrStateNotFound } @@ -452,7 +467,10 @@ func (t transaction) FetchState() (*database.State, error) { return nil, err } - return &database.State{TipHash: value}, nil + return &database.State{ + TipHash: tipHash, + PersistedHash: persistedHash, + }, nil } func (t transaction) FetchCurrentHeight() (uint64, error) { diff --git a/pkg/core/database/interfaces.go b/pkg/core/database/interfaces.go index 52f1a5b65..9545bef28 100644 --- a/pkg/core/database/interfaces.go +++ b/pkg/core/database/interfaces.go @@ -66,7 +66,7 @@ type Transaction interface { FetchBlockTxByHash(txID []byte) (tx transactions.ContractCall, txIndex uint32, blockHeaderHash []byte, err error) FetchBlockHashByHeight(height uint64) ([]byte, error) FetchBlockExists(hash []byte) (bool, error) - // Fetch chain state information (chain tip hash). + // Fetch chain state information (chain tip hash, persisted block etc). FetchState() (*State, error) // Check if an input keyImage is already stored. If succeeds, it returns @@ -77,7 +77,7 @@ type Transaction interface { // Store the next chain block in a append-only manner // Overwrites only if block with same hash already stored // Not to be called concurrently, as it updates chain tip. - StoreBlock(block *block.Block) error + StoreBlock(block *block.Block, persisted bool) error // FetchBlock will return a block, given a hash. FetchBlock(hash []byte) (*block.Block, error) @@ -134,5 +134,6 @@ type DB interface { // State represents a single db entry that provides chain metadata. This // includes currently only chain tip hash but could be extended at later stage. type State struct { - TipHash []byte + TipHash []byte + PersistedHash []byte } diff --git a/pkg/core/database/lite/database.go b/pkg/core/database/lite/database.go index 9a35be533..6027692c1 100755 --- a/pkg/core/database/lite/database.go +++ b/pkg/core/database/lite/database.go @@ -29,6 +29,7 @@ const ( stateInd outputKeyInd candidateInd + persistedInd maxInd ) diff --git a/pkg/core/database/lite/transactions.go b/pkg/core/database/lite/transactions.go index 9c15a7d6f..c27ced4cf 100644 --- a/pkg/core/database/lite/transactions.go +++ b/pkg/core/database/lite/transactions.go @@ -30,7 +30,7 @@ type transaction struct { // map lookup operation on block per height, one can utilize a height as index // in a slice. // NB: A single slice of all blocks to be used to avoid all duplications. -func (t *transaction) StoreBlock(b *block.Block) error { +func (t *transaction) StoreBlock(b *block.Block, persisted bool) error { if !t.writable { return errors.New("read-only transaction") } @@ -82,6 +82,10 @@ func (t *transaction) StoreBlock(b *block.Block) error { // Map stateKey to chain state (tip) t.batch[stateInd][toKey(stateKey)] = b.Header.Hash + if persisted { + t.batch[persistedInd][toKey(stateKey)] = b.Header.Hash + } + return nil } @@ -225,6 +229,16 @@ func (t transaction) FetchState() (*database.State, error) { s := &database.State{} s.TipHash = hash + if hash, exists = t.db.storage[persistedInd][toKey(stateKey)]; !exists { + return nil, database.ErrStateNotFound + } + + if len(hash) == 0 { + return nil, database.ErrStateNotFound + } + + s.PersistedHash = hash + return s, nil } diff --git a/pkg/core/database/testing/drivers_test.go b/pkg/core/database/testing/drivers_test.go index 2ba52baed..333355f86 100644 --- a/pkg/core/database/testing/drivers_test.go +++ b/pkg/core/database/testing/drivers_test.go @@ -148,18 +148,28 @@ func _TestDriver(m *testing.M, driverName string) int { func TestStoreBlock(test *testing.T) { // Generate additional blocks to store - genBlocks, err := generateChainBlocks(2) + blocks, err := generateChainBlocks(20) if err != nil { test.Fatal(err.Error()) } + var persistedHash []byte done := false + err = db.Update(func(t database.Transaction) error { - for _, block := range genBlocks { - err = t.StoreBlock(block) + for i, b := range blocks { + + var persisted bool + if i%3 == 0 { + persisted = true + persistedHash = b.Header.Hash + } + + err = t.StoreBlock(b, persisted) if err != nil { return err } + } done = true return nil @@ -180,9 +190,14 @@ func TestStoreBlock(test *testing.T) { return err1 } - if !bytes.Equal(genBlocks[len(genBlocks)-1].Header.Hash, s.TipHash) { + if !bytes.Equal(blocks[len(blocks)-1].Header.Hash, s.TipHash) { return fmt.Errorf("invalid chain tip") } + + if !bytes.Equal(s.PersistedHash, persistedHash) { + return fmt.Errorf("invalid persisted hash") + } + return nil }) @@ -379,7 +394,7 @@ func TestAtomicUpdates(test *testing.T) { forcedError := errors.New("force majeure situation") err := db.Update(func(t database.Transaction) error { for height, block := range genBlocks { - err := t.StoreBlock(block) + err := t.StoreBlock(block, false) if err != nil { return err } @@ -426,7 +441,7 @@ func TestReadOnlyTx(test *testing.T) { // Try to call StoreBlock on a read-only DB Tx err := db.View(func(t database.Transaction) error { for _, block := range blocks { - err := t.StoreBlock(block) + err := t.StoreBlock(block, false) if err != nil { return err } @@ -501,7 +516,7 @@ func TestReadOnlyDB_Mode(test *testing.T) { // Store all blocks with read-write DB err = dbReadWrite.Update(func(t database.Transaction) error { for _, b := range genBlocks { - if e := t.StoreBlock(b); e != nil { + if e := t.StoreBlock(b, false); e != nil { return e } } @@ -532,7 +547,7 @@ func TestReadOnlyDB_Mode(test *testing.T) { // Ensure read-only DB cannot write err = dbReadOnly.Update(func(t database.Transaction) error { for _, block := range blocks { - err1 := t.StoreBlock(block) + err1 := t.StoreBlock(block, false) if err1 != nil { return err1 } diff --git a/pkg/core/database/testing/utils.go b/pkg/core/database/testing/utils.go index e80579e15..d540d8dd6 100644 --- a/pkg/core/database/testing/utils.go +++ b/pkg/core/database/testing/utils.go @@ -30,7 +30,7 @@ func storeBlocks(db database.DB, blocks []*block.Block) error { err := db.Update(func(t database.Transaction) error { for _, block := range blocks { // Store block - err := t.StoreBlock(block) + err := t.StoreBlock(block, false) if err != nil { fmt.Print(err.Error()) return err From 6a741da8f71df7f3fea5a1f50158cbb2c14b02b8 Mon Sep 17 00:00:00 2001 From: goshawk Date: Wed, 23 Feb 2022 10:23:31 +0200 Subject: [PATCH 05/10] - Rename FetchState into FetchRegistry - Update database README - Clean up deprecated APIs --- pkg/core/database/README.md | 5 ++ pkg/core/database/heavy.md | 14 ++--- pkg/core/database/heavy/transactions.go | 75 +++-------------------- pkg/core/database/interfaces.go | 21 ++----- pkg/core/database/lite/transactions.go | 32 +--------- pkg/core/database/testing/drivers_test.go | 2 +- 6 files changed, 25 insertions(+), 124 deletions(-) diff --git a/pkg/core/database/README.md b/pkg/core/database/README.md index 5d6ef2d1b..80b04dfd1 100644 --- a/pkg/core/database/README.md +++ b/pkg/core/database/README.md @@ -80,6 +80,11 @@ _ = db.View(func(tx database.Tx) error { }) ``` +## Blockchain registry +Blockchain database supports (read/write) two additional KV records which store both hash of the blockchain latest block and hash of blockchain latest `persisted` block. A block has a meaning of a persisted block only if rusk.Persist grpc has been called for it successfully. + +`FetchRegistry` method from Transaction interface should be used to fetch the abovementioned KV records. + ## Additional features Additional features that can be provided by a Driver: diff --git a/pkg/core/database/heavy.md b/pkg/core/database/heavy.md index 0e79dfab7..b3b119918 100644 --- a/pkg/core/database/heavy.md +++ b/pkg/core/database/heavy.md @@ -10,16 +10,16 @@ For general concept explanation one can refer to /pkg/core/database/README.md. T | :---: | :---: | :---: | :---: | :---: | | 0x01 | HeaderHash | Header.Encode\(\) | 1 per block | | | 0x02 | HeaderHash + TxID | TxIndex + Tx.Encode\(\) | block txs count | | -| 0x04 | TxID | HeaderHash | block txs count | FetchBlockTxByHash | -| 0x05 | KeyImage | TxID | sum of block txs inputs | FetchKeyImageExists | | 0x03 | Height | HeaderHash | 1 per block | FetchBlockHashByHeight | -| 0x07 | State | Chain tip hash | 1 per chain | FetchState | +| 0x04 | TxID | HeaderHash | block txs count | FetchBlockTxByHash | +| 0x05 | Tip | Hash of latest block | 1 per chain | FetchRegistry | +| 0x06 | Persisted | Hash of latest persisted block | 1 per chain | FetchRegistry | ## K/V storage schema to store a candidate `pkg/core/block.Block` | Prefix | KEY | VALUE | Count | Used by | | :---: | :---: | :---: | :---: | :---: | -| 0x06 | HeaderHash + Height | Block.Encode\(\) | Many per blockchain | Store/Fetch/Delete CandidateBlock | +| 0x07 | HeaderHash | Block.Encode\(\) | Many per blockchain | Store/Fetch/Delete CandidateBlock | Table notation @@ -28,9 +28,3 @@ Table notation * \'+' operation - denotes concatenation of byte arrays * Tx.Encode\(\) - Encoded binary form of all Tx fields without TxID -## K/V storage schema to store block generator bid values - -| Prefix | KEY | VALUE | Count | Used by | -| :---: | :---: | :---: | :---: | :---: | -| 0x08 | ExpiryHeight | D + K | 1 per bidding transaction made by user | FetchBidValues | - diff --git a/pkg/core/database/heavy/transactions.go b/pkg/core/database/heavy/transactions.go index 87e6026eb..d447086e8 100644 --- a/pkg/core/database/heavy/transactions.go +++ b/pkg/core/database/heavy/transactions.go @@ -8,7 +8,6 @@ package heavy import ( "bytes" - "encoding/binary" "errors" "fmt" "math" @@ -50,16 +49,12 @@ var ( HeightPrefix = []byte{0x03} // TxIDPrefix is the prefix to identify the Transaction ID. TxIDPrefix = []byte{0x04} - // KeyImagePrefix is the prefix to identify the Key Image. - KeyImagePrefix = []byte{0x05} - // TipPrefix is the prefix to identify the Blockchain tip hash. - TipPrefix = []byte{0x06} - // OutputKeyPrefix is the prefix to identify the Output. // TODO: Remove this - OutputKeyPrefix = []byte{0x07} + // TipPrefix is the prefix to identify the hash of the latest blochchain block. + TipPrefix = []byte{0x05} + // PersistedPrefix is the prefix to identify the hash of the latest blochchain persisted block. + PersistedPrefix = []byte{0x06} // CandidatePrefix is the prefix to identify Candidate messages. - CandidatePrefix = []byte{0x08} - // - PersistedPrefix = []byte{0x09} + CandidatePrefix = []byte{0x07} ) type transaction struct { @@ -229,39 +224,6 @@ func (t transaction) FetchBlockExists(hash []byte) (bool, error) { return exists, err } -// FetchOutputExists checks if an output exists in the db. -func (t transaction) FetchOutputExists(destkey []byte) (bool, error) { - key := append(OutputKeyPrefix, destkey...) - exists, err := t.snapshot.Has(key, nil) - - // goleveldb returns nilIfNotFound - // see also nilIfNotFound in leveldb/db.go - if !exists && err == nil { - // overwrite error message - err = database.ErrOutputNotFound - } - - return exists, err -} - -// FetchOutputUnlockHeight returns the unlockheight of an output. -func (t transaction) FetchOutputUnlockHeight(destkey []byte) (uint64, error) { - key := append(OutputKeyPrefix, destkey...) - - unlockHeightBytes, err := t.snapshot.Get(key, nil) - if err != nil { - return 0, err - } - - if len(unlockHeightBytes) != 8 { - return 0, errors.New("unlock height malformed") - } - - // output unlock height is the first 8 bytes - unlockHeight := binary.LittleEndian.Uint64(unlockHeightBytes[0:8]) - return unlockHeight, err -} - func (t transaction) FetchBlockHeader(hash []byte) (*block.Header, error) { key := append(HeaderPrefix, hash...) @@ -408,27 +370,6 @@ func (t transaction) FetchBlockTxByHash(txID []byte) (transactions.ContractCall, return nil, txIndex, nil, errors.New("block tx is available but fetching it fails") } -// FetchKeyImageExists checks if the KeyImage exists. If so, it also returns the -// hash of its corresponding tx. -// -// Due to performance concerns, the found tx is not verified. By explicitly -// calling FetchBlockTxByHash, a consumer can check if the tx is real. -func (t transaction) FetchKeyImageExists(keyImage []byte) (bool, []byte, error) { - key := append(KeyImagePrefix, keyImage...) - - txID, err := t.snapshot.Get(key, nil) - if err != nil { - if err == leveldb.ErrNotFound { - // overwrite error message - err = database.ErrKeyImageNotFound - } - - return false, nil, err - } - - return true, txID, nil -} - func (t transaction) FetchBlock(hash []byte) (*block.Block, error) { header, err := t.FetchBlockHeader(hash) if err != nil { @@ -446,7 +387,7 @@ func (t transaction) FetchBlock(hash []byte) (*block.Block, error) { }, nil } -func (t transaction) FetchState() (*database.State, error) { +func (t transaction) FetchRegistry() (*database.Registry, error) { tipHash, err := t.snapshot.Get(TipPrefix, nil) if err == leveldb.ErrNotFound || len(tipHash) == 0 { // overwrite error message @@ -467,14 +408,14 @@ func (t transaction) FetchState() (*database.State, error) { return nil, err } - return &database.State{ + return &database.Registry{ TipHash: tipHash, PersistedHash: persistedHash, }, nil } func (t transaction) FetchCurrentHeight() (uint64, error) { - state, err := t.FetchState() + state, err := t.FetchRegistry() if err != nil { return 0, err } diff --git a/pkg/core/database/interfaces.go b/pkg/core/database/interfaces.go index 9545bef28..4079b1cb8 100644 --- a/pkg/core/database/interfaces.go +++ b/pkg/core/database/interfaces.go @@ -66,12 +66,8 @@ type Transaction interface { FetchBlockTxByHash(txID []byte) (tx transactions.ContractCall, txIndex uint32, blockHeaderHash []byte, err error) FetchBlockHashByHeight(height uint64) ([]byte, error) FetchBlockExists(hash []byte) (bool, error) - // Fetch chain state information (chain tip hash, persisted block etc). - FetchState() (*State, error) - - // Check if an input keyImage is already stored. If succeeds, it returns - // also txID the input belongs to. - FetchKeyImageExists(keyImage []byte) (exists bool, txID []byte, err error) + // Fetch chain registry (chain tip hash, persisted block etc). + FetchRegistry() (*Registry, error) // Read-write transactions // Store the next chain block in a append-only manner @@ -86,14 +82,6 @@ type Transaction interface { // block in the database. FetchCurrentHeight() (uint64, error) - // FetchOutputExists returns whether or not an output exists for the - // given destination public key. - FetchOutputExists(destkey []byte) (bool, error) - - // FetchOutputUnlockHeight will return the unlock height for an output - // given a destination public key. - FetchOutputUnlockHeight(destkey []byte) (uint64, error) - // FetchBlockHeightSince try to find height of a block generated around // sinceUnixTime starting the search from height (tip - offset). FetchBlockHeightSince(sinceUnixTime int64, offset uint64) (uint64, error) @@ -131,9 +119,8 @@ type DB interface { Close() error } -// State represents a single db entry that provides chain metadata. This -// includes currently only chain tip hash but could be extended at later stage. -type State struct { +// Registry represents a set database records that provide chain metadata. +type Registry struct { TipHash []byte PersistedHash []byte } diff --git a/pkg/core/database/lite/transactions.go b/pkg/core/database/lite/transactions.go index c27ced4cf..e7f73016a 100644 --- a/pkg/core/database/lite/transactions.go +++ b/pkg/core/database/lite/transactions.go @@ -8,7 +8,6 @@ package lite import ( "bytes" - "encoding/binary" "errors" "fmt" "math" @@ -189,32 +188,7 @@ func (t transaction) FetchBlockTxByHash(txID []byte) (transactions.ContractCall, return tx, txIndex, hash, err } -func (t transaction) FetchKeyImageExists(keyImage []byte) (bool, []byte, error) { - var txID []byte - var exists bool - - if txID, exists = t.db.storage[keyImagesInd][toKey(keyImage)]; !exists { - return false, nil, database.ErrKeyImageNotFound - } - - return true, txID, nil -} - -func (t transaction) FetchOutputExists(destkey []byte) (bool, error) { - _, exists := t.db.storage[outputKeyInd][toKey(destkey)] - return exists, nil -} - -func (t transaction) FetchOutputUnlockHeight(destkey []byte) (uint64, error) { - unlockHeight, exists := t.db.storage[outputKeyInd][toKey(destkey)] - if !exists { - return 0, errors.New("this output does not exist") - } - - return binary.LittleEndian.Uint64(unlockHeight), nil -} - -func (t transaction) FetchState() (*database.State, error) { +func (t transaction) FetchRegistry() (*database.Registry, error) { var hash []byte var exists bool @@ -226,7 +200,7 @@ func (t transaction) FetchState() (*database.State, error) { return nil, database.ErrStateNotFound } - s := &database.State{} + s := &database.Registry{} s.TipHash = hash if hash, exists = t.db.storage[persistedInd][toKey(stateKey)]; !exists { @@ -274,7 +248,7 @@ func (t *transaction) FetchBlock(hash []byte) (*block.Block, error) { } func (t *transaction) FetchCurrentHeight() (uint64, error) { - state, err := t.FetchState() + state, err := t.FetchRegistry() if err != nil { return 0, err } diff --git a/pkg/core/database/testing/drivers_test.go b/pkg/core/database/testing/drivers_test.go index 333355f86..75d5bc2a6 100644 --- a/pkg/core/database/testing/drivers_test.go +++ b/pkg/core/database/testing/drivers_test.go @@ -185,7 +185,7 @@ func TestStoreBlock(test *testing.T) { // Ensure chain tip is updated too err = db.View(func(t database.Transaction) error { - s, err1 := t.FetchState() + s, err1 := t.FetchRegistry() if err1 != nil { return err1 } From c22d34648a55fdf39de233b55bbae35acc0da7dd Mon Sep 17 00:00:00 2001 From: goshawk Date: Wed, 23 Feb 2022 11:50:18 +0200 Subject: [PATCH 06/10] - Persist a block in both contract storage state and dusk-blockchain in ACID-compliant manner - database.StoreBlock is populated with persisted flag. --- pkg/core/chain/chain.go | 96 ++++++++++++------- pkg/core/chain/chain_test.go | 4 +- pkg/core/chain/loader.go | 23 ++--- pkg/core/chain/mock.go | 4 +- pkg/core/chain/synchronizer_test.go | 2 +- pkg/core/consensus/testing/node.go | 2 +- pkg/gql/query/query_test.go | 2 +- .../peer/responding/blockhashbroker_test.go | 2 +- 8 files changed, 82 insertions(+), 53 deletions(-) diff --git a/pkg/core/chain/chain.go b/pkg/core/chain/chain.go index f6f6eadc2..32539f22f 100644 --- a/pkg/core/chain/chain.go +++ b/pkg/core/chain/chain.go @@ -59,7 +59,7 @@ type Verifier interface { // store the blockchain. type Loader interface { // LoadTip of the chain. - LoadTip() (*block.Block, error) + LoadTip() (*block.Block, []byte, error) // Clear removes everything from the DB. Clear() error // Close the Loader and finalizes any pending connection. @@ -68,8 +68,6 @@ type Loader interface { Height() (uint64, error) // BlockAt returns the block at a given height. BlockAt(uint64) (block.Block, error) - // Append a block on the storage. - Append(*block.Block) error } // Chain represents the nodes blockchain. @@ -131,22 +129,24 @@ func New(ctx context.Context, db database.DB, eventBus *eventbus.EventBus, rpcBu return nil, err } - chain.p = &provisioners - - prevBlock, err := loader.LoadTip() - if err != nil { - return nil, err + if srv != nil { + node.RegisterChainServer(srv, chain) } - chain.tip = prevBlock + chain.p = &provisioners - if srv != nil { - node.RegisterChainServer(srv, chain) + if err := chain.syncWithRusk(); err != nil { + return nil, err } return chain, nil } +func (c *Chain) syncWithRusk() error { + // TODO: + return nil +} + // ProcessBlockFromNetwork will handle blocks incoming from the network. // It will allow the chain to enter sync mode if it detects that we are behind, // which will cancel the running consensus loop and attempt to reach the new @@ -443,11 +443,11 @@ func (c *Chain) acceptBlock(blk block.Block) error { return err } - // 3. Store the approved block and update in-memory chain tip - l.Debug("storing block") + // 3. Persist the approved block and update in-memory chain tip + l.Debug("persisting block") - if err := c.loader.Append(b); err != nil { - l.WithError(err).Error("block storing failed") + if err := c.persist(b); err != nil { + l.WithError(err).Error("persisting block failed") return err } @@ -459,6 +459,55 @@ func (c *Chain) acceptBlock(blk block.Block) error { return nil } +// Persist persists a block in both Contract Storage state and dusk-blockchain in ACID-compliant manner. +func (c *Chain) persist(b *block.Block) error { + var ( + fields = logger.Fields{ + "event": "accept_block", + "height": b.Header.Height, + "hash": util.StringifyBytes(b.Header.Hash), + "curr_h": c.tip.Header.Height, + } + log = log.WithFields(fields) + + err error + pe = config.Get().State.PersistEvery + ) + + // Persisting + + err = c.db.Update(func(t database.Transaction) error { + var persisted bool + + if pe > 0 && b.Header.Height%pe == 0 { + // Mark it as a persisted block + persisted = true + } + + // Persist block into dusk-blockchain database before any attempt to persist in Rusk. + // If StoreBlock fails, no change will be applied in Rusk. + // If Rusk.Persist fails, StoreBlock is rollbacked. + err := t.StoreBlock(b, persisted) + if err != nil { + return err + } + + // Persist Rusk state + if persisted { + if err := c.proxy.Executor().Persist(c.ctx, b.Header.StateHash); err != nil { + log.WithError(err).Error("persisting contract state failed") + return err + } + + log.Debug("persisting contract state completed") + } + + return nil + }) + + return err +} + // postAcceptBlock performs all post-events on accepting a block. func (c *Chain) postAcceptBlock(blk block.Block, l *logrus.Entry) { // 1. Notify other subsystems for the accepted block @@ -482,23 +531,6 @@ func (c *Chain) postAcceptBlock(blk block.Block, l *logrus.Entry) { go c.storeStakesInStormDB(blk.Header.Height) } - // 4. Persist state - pe := config.Get().State.PersistEvery - - if pe > 0 { - if blk.Header.Height%pe == 0 { - l.Debug("chain: persist state", blk.Header.Height) - - if err := c.proxy.Executor().Persist(c.ctx, c.tip.Header.StateHash); err != nil { - // TODO: trigger resync procedure - // https://github.com/dusk-network/dusk-blockchain/issues/1286 - l.WithError(err).Error("chain: persist state failed") - } - } - } else { - l.Warn("State won't persist! Set `state.persistEvery` to a positive value") - } - diagnostics.LogPublishErrors("chain/chain.go, topics.AcceptedBlock", errList) l.Debug("procedure ended") } diff --git a/pkg/core/chain/chain_test.go b/pkg/core/chain/chain_test.go index 6810c8348..bf2b23bdc 100644 --- a/pkg/core/chain/chain_test.go +++ b/pkg/core/chain/chain_test.go @@ -177,11 +177,11 @@ func TestFetchTip(t *testing.T) { _, chain := setupChainTest(t, 0) // on a modern chain, state(tip) must point at genesis - var s *database.State + var s *database.Registry err := chain.db.View(func(t database.Transaction) error { var err error - s, err = t.FetchState() + s, err = t.FetchRegistry() return err }) assert.NoError(err) diff --git a/pkg/core/chain/loader.go b/pkg/core/chain/loader.go index eb553b521..356907f60 100644 --- a/pkg/core/chain/loader.go +++ b/pkg/core/chain/loader.go @@ -78,13 +78,6 @@ func (l *DBLoader) Height() (uint64, error) { return height, err } -// Append stores a block in the DB. -func (l *DBLoader) Append(blk *block.Block) error { - return l.db.Update(func(t database.Transaction) error { - return t.StoreBlock(blk) - }) -} - // BlockAt returns the block stored at a given height. func (l *DBLoader) BlockAt(searchingHeight uint64) (block.Block, error) { var blk *block.Block @@ -188,22 +181,24 @@ func (l *DBLoader) PerformSanityCheck(startAt, firstBlocksAmount, lastBlocksAmou } // LoadTip returns the tip of the chain. -func (l *DBLoader) LoadTip() (*block.Block, error) { +func (l *DBLoader) LoadTip() (*block.Block, []byte, error) { var tip *block.Block + var persistedHash []byte err := l.db.Update(func(t database.Transaction) error { - s, err := t.FetchState() + s, err := t.FetchRegistry() if err != nil { // TODO: maybe log the error here and diversify between empty // results and actual errors // Store Genesis Block, if a modern node runs - err = t.StoreBlock(l.genesis) + err = t.StoreBlock(l.genesis, true) if err != nil { return err } tip = l.genesis + persistedHash = l.genesis.Header.Hash return nil } @@ -219,10 +214,12 @@ func (l *DBLoader) LoadTip() (*block.Block, error) { } tip = &block.Block{Header: h, Txs: txs} + persistedHash = s.PersistedHash + return nil }) if err != nil { - return nil, err + return nil, nil, err } // Verify chain state. There shouldn't be any blocks higher than chainTip @@ -244,8 +241,8 @@ func (l *DBLoader) LoadTip() (*block.Block, error) { }) if err != nil { - return nil, err + return nil, nil, err } - return tip, nil + return tip, persistedHash, nil } diff --git a/pkg/core/chain/mock.go b/pkg/core/chain/mock.go index 0cecb5912..a2c2e202e 100644 --- a/pkg/core/chain/mock.go +++ b/pkg/core/chain/mock.go @@ -40,8 +40,8 @@ func (m *MockLoader) Height() (uint64, error) { } // LoadTip of the chain. -func (m *MockLoader) LoadTip() (*block.Block, error) { - return &m.blockchain[len(m.blockchain)], nil +func (m *MockLoader) LoadTip() (*block.Block, []byte, error) { + return &m.blockchain[len(m.blockchain)], nil, nil } // PerformSanityCheck on first N blocks and M last blocks. diff --git a/pkg/core/chain/synchronizer_test.go b/pkg/core/chain/synchronizer_test.go index 9af912ee1..e5af1a76c 100644 --- a/pkg/core/chain/synchronizer_test.go +++ b/pkg/core/chain/synchronizer_test.go @@ -58,7 +58,7 @@ func setupSynchronizerTest() (*synchronizer, chan consensus.Results) { genesis := genesis.Decode() if err := db.Update(func(t database.Transaction) error { - return t.StoreBlock(genesis) + return t.StoreBlock(genesis, false) }); err != nil { panic(err) } diff --git a/pkg/core/consensus/testing/node.go b/pkg/core/consensus/testing/node.go index 8923e2416..3e156c1ea 100644 --- a/pkg/core/consensus/testing/node.go +++ b/pkg/core/consensus/testing/node.go @@ -36,7 +36,7 @@ func newNode(ctx context.Context, assert *assert.Assertions, eb *eventbus.EventB // the `proxy` either way. genesis := genesis.Decode() l := chain.NewDBLoader(db, genesis) - _, err := l.LoadTip() + _, _, err := l.LoadTip() assert.NoError(err) e := &consensus.Emitter{ diff --git a/pkg/gql/query/query_test.go b/pkg/gql/query/query_test.go index d68d83a84..ff7bbf588 100644 --- a/pkg/gql/query/query_test.go +++ b/pkg/gql/query/query_test.go @@ -147,7 +147,7 @@ func initializeDB(db database.DB) error { return db.Update(func(t database.Transaction) error { for _, block := range chain { - err := t.StoreBlock(block) + err := t.StoreBlock(block, false) if err != nil { return err } diff --git a/pkg/p2p/peer/responding/blockhashbroker_test.go b/pkg/p2p/peer/responding/blockhashbroker_test.go index eb4736090..11f26a18f 100644 --- a/pkg/p2p/peer/responding/blockhashbroker_test.go +++ b/pkg/p2p/peer/responding/blockhashbroker_test.go @@ -77,7 +77,7 @@ func createGetBlocks(locator []byte) message.Message { func storeBlocks(db database.DB, blocks []*block.Block) error { for _, blk := range blocks { err := db.Update(func(t database.Transaction) error { - return t.StoreBlock(blk) + return t.StoreBlock(blk, false) }) if err != nil { return err From f5d154a9b2ca384b2536bd099d6599ce63a95130 Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 28 Feb 2022 12:15:40 +0200 Subject: [PATCH 07/10] Sync up with rusk state at startup --- cmd/dusk/server.go | 2 + harness/engine/network.go | 1 + harness/tests/localnet_test.go | 8 +-- pkg/core/chain/chain.go | 113 +++++++++++++++++++++++++++------ pkg/core/chain/chain_test.go | 2 +- pkg/core/chain/fallback.go | 2 +- pkg/gql/query/query_test.go | 2 +- 7 files changed, 100 insertions(+), 30 deletions(-) diff --git a/cmd/dusk/server.go b/cmd/dusk/server.go index f8f92e9f7..376548b55 100644 --- a/cmd/dusk/server.go +++ b/cmd/dusk/server.go @@ -115,6 +115,8 @@ func Setup() *Server { proxy, ruskConn := setupGRPCClients(gctx) + log.Info("grpc connection with rusk service established") + m := mempool.NewMempool(db, eventBus, rpcBus, proxy.Prober()) m.Run(parentCtx) diff --git a/harness/engine/network.go b/harness/engine/network.go index a8b490ab7..fcab0dbde 100644 --- a/harness/engine/network.go +++ b/harness/engine/network.go @@ -358,6 +358,7 @@ func (n *Network) start(nodeDir string, name string, arg ...string) error { stateExec := name + "-recovery-state" cmd := exec.Command(stateExec, "-w", "-f") cmd.Env = append(envWithNoRusk, "TMPDIR="+nodeDir, "RUSK_PROFILE_PATH="+nodeDir) + cmd.Start() cmd.Wait() diff --git a/harness/tests/localnet_test.go b/harness/tests/localnet_test.go index b8e91eb13..4c1a1f259 100644 --- a/harness/tests/localnet_test.go +++ b/harness/tests/localnet_test.go @@ -11,7 +11,6 @@ import ( "encoding/hex" "flag" "fmt" - "io/ioutil" "os" "strconv" "testing" @@ -44,15 +43,12 @@ var ( // // The network should be fully functioning and ready to accept messaging. func TestMain(m *testing.M) { - var err error flag.Parse() // create the temp-dir workspace. Quit on error - workspace, err = ioutil.TempDir(os.TempDir(), "localnet-") - if err != nil { - quit(localNet, workspace, err) - } + workspace = os.TempDir() + "/localnet" + os.Mkdir(workspace, 0700) // set the network size if localNetSizeStr != "" { diff --git a/pkg/core/chain/chain.go b/pkg/core/chain/chain.go index 32539f22f..e7916513e 100644 --- a/pkg/core/chain/chain.go +++ b/pkg/core/chain/chain.go @@ -143,7 +143,72 @@ func New(ctx context.Context, db database.DB, eventBus *eventbus.EventBus, rpcBu } func (c *Chain) syncWithRusk() error { - // TODO: + ruskStateHash, err := c.proxy.Executor().GetStateRoot(c.ctx) + if err != nil { + return err + } + + prevBlock, persistedHash, err := c.loader.LoadTip() + if err != nil { + return err + } + + // Detect if both services are on the different state + var persitedBlock *block.Block + err = c.db.View(func(t database.Transaction) error { + persitedBlock, err = t.FetchBlock(persistedHash) + if err != nil { + return err + } + + if !bytes.Equal(persitedBlock.Header.StateHash, ruskStateHash) { + log.WithField("rusk", hex.EncodeToString(ruskStateHash)). + WithField("node", hex.EncodeToString(persitedBlock.Header.StateHash)). + Error("invalid state detected") + return errors.New("invalid state detected") + } + + return err + }) + if err != nil { + return err + } + + // Update blockchain tip (in-memory) + c.tip = persitedBlock + + // If both persisted block hash and latest blockchain block hash are the + // same then there is no need to execute sync-up. + if bytes.Equal(persistedHash, prevBlock.Header.Hash) { + return nil + } + + // re-accept missing block in order to recover Rusk (unpersisted) state. + i := persitedBlock.Header.Height + for { + i++ + + var blk *block.Block + err = c.db.View(func(t database.Transaction) error { + hash, err := t.FetchBlockHashByHeight(i) + if err != nil { + return err + } + + blk, err = t.FetchBlock(hash) + return err + }) + if err != nil { + break + } + + // Re-accepting all blocks that have not been persisted in Rusk. + // This will re-execute accept/finalize accordingly and update chain tip. + if err := c.acceptBlock(*blk, false); err != nil { + return err + } + } + return nil } @@ -200,7 +265,7 @@ func (c *Chain) ProcessBlockFromNetwork(srcPeerID string, m message.Message) ([] // from the network during out-of-sync state. func (c *Chain) TryNextConsecutiveBlockOutSync(blk block.Block, kadcastHeight byte) error { log.WithField("height", blk.Header.Height).Trace("accepting sync block") - return c.acceptBlock(blk) + return c.acceptBlock(blk, true) } // TryNextConsecutiveBlockInSync is the processing path for accepting a block @@ -235,7 +300,7 @@ func (c *Chain) TryNextConsecutiveBlockIsValid(blk block.Block) error { l := log.WithFields(fields) - return c.isValidBlock(blk, l) + return c.isValidBlock(blk, l, true) } // ProcessSyncTimerExpired called by outsync timer when a peer does not provide GetData response. @@ -263,7 +328,7 @@ func (c *Chain) ProcessSyncTimerExpired(strPeerAddr string) error { func (c *Chain) acceptSuccessiveBlock(blk block.Block, kadcastHeight byte) error { log.WithField("height", blk.Header.Height).Trace("accepting succeeding block") - if err := c.acceptBlock(blk); err != nil { + if err := c.acceptBlock(blk, true); err != nil { return err } @@ -317,7 +382,9 @@ func (c *Chain) runStateTransition(tipBlk, blk block.Block) (*block.Block, error blk.Header.Height, config.BlockGasLimit) if err != nil { - l.WithError(err).Error("Error in executing the state transition") + l.WithError(err). + WithField("grpc", "finalize"). + Error("Error in executing the state transition") return block.NewBlock(), err } default: @@ -328,7 +395,10 @@ func (c *Chain) runStateTransition(tipBlk, blk block.Block) (*block.Block, error blk.Header.Height, config.BlockGasLimit) if err != nil { - l.WithError(err).Error("Error in executing the state transition") + l.WithError(err). + WithField("grpc", "accept"). + Error("Error in executing the state transition") + return block.NewBlock(), err } } @@ -390,12 +460,14 @@ func (c *Chain) sanityCheckStateHash() error { return nil } -func (c *Chain) isValidBlock(blk block.Block, l *logrus.Entry) error { +func (c *Chain) isValidBlock(blk block.Block, l *logrus.Entry, withSanityCheck bool) error { l.Debug("verifying block") // Check that stateless and stateful checks pass - if err := c.verifier.SanityCheckBlock(*c.tip, blk); err != nil { - l.WithError(err).Error("block verification failed") - return err + if withSanityCheck { + if err := c.verifier.SanityCheckBlock(*c.tip, blk); err != nil { + l.WithError(err).Error("block verification failed") + return err + } } // Check the certificate @@ -417,7 +489,7 @@ func (c *Chain) isValidBlock(blk block.Block, l *logrus.Entry) error { // 1. We have not seen it before // 2. All stateless and stateful checks are true // Returns nil, if checks passed and block was successfully saved. -func (c *Chain) acceptBlock(blk block.Block) error { +func (c *Chain) acceptBlock(blk block.Block, withSanityCheck bool) error { fields := logger.Fields{ "event": "accept_block", "height": blk.Header.Height, @@ -430,7 +502,7 @@ func (c *Chain) acceptBlock(blk block.Block) error { var err error // 1. Ensure block fields and certificate are valid - if err = c.isValidBlock(blk, l); err != nil { + if err = c.isValidBlock(blk, l, withSanityCheck); err != nil { l.WithError(err).Error("invalid block error") return err } @@ -451,15 +523,15 @@ func (c *Chain) acceptBlock(blk block.Block) error { return err } - c.tip = &blk + c.tip = b // 5. Perform all post-events on accepting a block - c.postAcceptBlock(blk, l) + c.postAcceptBlock(*b, l) return nil } -// Persist persists a block in both Contract Storage state and dusk-blockchain in ACID-compliant manner. +// Persist persists a block in both Contract Storage state and dusk-blockchain db in atomic manner. func (c *Chain) persist(b *block.Block) error { var ( fields = logger.Fields{ @@ -474,26 +546,25 @@ func (c *Chain) persist(b *block.Block) error { pe = config.Get().State.PersistEvery ) - // Persisting - + // Atomic persist err = c.db.Update(func(t database.Transaction) error { - var persisted bool + var p bool if pe > 0 && b.Header.Height%pe == 0 { // Mark it as a persisted block - persisted = true + p = true } // Persist block into dusk-blockchain database before any attempt to persist in Rusk. // If StoreBlock fails, no change will be applied in Rusk. // If Rusk.Persist fails, StoreBlock is rollbacked. - err := t.StoreBlock(b, persisted) + err := t.StoreBlock(b, p) if err != nil { return err } // Persist Rusk state - if persisted { + if p { if err := c.proxy.Executor().Persist(c.ctx, b.Header.StateHash); err != nil { log.WithError(err).Error("persisting contract state failed") return err diff --git a/pkg/core/chain/chain_test.go b/pkg/core/chain/chain_test.go index bf2b23bdc..b9b470d90 100644 --- a/pkg/core/chain/chain_test.go +++ b/pkg/core/chain/chain_test.go @@ -152,7 +152,7 @@ func TestAcceptBlock(t *testing.T) { cert.Step = 5 blk.Header.Certificate = cert - assert.NoError(c.acceptBlock(*blk)) + assert.NoError(c.acceptBlock(*blk, true), true) // Should have `blk` as blockchain head now assert.True(bytes.Equal(blk.Header.Hash, c.tip.Header.Hash)) diff --git a/pkg/core/chain/fallback.go b/pkg/core/chain/fallback.go index 9c106bdcb..5efb77b91 100644 --- a/pkg/core/chain/fallback.go +++ b/pkg/core/chain/fallback.go @@ -35,7 +35,7 @@ func (c *Chain) tryFallback(blk block.Block) error { c.tip = &prevBlk // Perform verify and accept block procedure - if err := c.acceptBlock(blk); err != nil { + if err := c.acceptBlock(blk, true); err != nil { c.tip = &oldTip return err } diff --git a/pkg/gql/query/query_test.go b/pkg/gql/query/query_test.go index ff7bbf588..fba40a15c 100644 --- a/pkg/gql/query/query_test.go +++ b/pkg/gql/query/query_test.go @@ -147,7 +147,7 @@ func initializeDB(db database.DB) error { return db.Update(func(t database.Transaction) error { for _, block := range chain { - err := t.StoreBlock(block, false) + err := t.StoreBlock(block, true) if err != nil { return err } From 13a6e5079eb04991ad9639c9f3bb1885c50cb254 Mon Sep 17 00:00:00 2001 From: goshawk Date: Tue, 1 Mar 2022 14:32:08 +0200 Subject: [PATCH 08/10] Resolve lint errors --- harness/tests/localnet_test.go | 1 - pkg/core/chain/chain.go | 33 +++++++++++++++-------- pkg/core/database/heavy/transactions.go | 13 +++++---- pkg/core/database/lite/database.go | 2 -- pkg/core/database/testing/drivers_test.go | 14 +++++----- 5 files changed, 36 insertions(+), 27 deletions(-) diff --git a/harness/tests/localnet_test.go b/harness/tests/localnet_test.go index 4c1a1f259..d0ff7b9b2 100644 --- a/harness/tests/localnet_test.go +++ b/harness/tests/localnet_test.go @@ -43,7 +43,6 @@ var ( // // The network should be fully functioning and ready to accept messaging. func TestMain(m *testing.M) { - flag.Parse() // create the temp-dir workspace. Quit on error diff --git a/pkg/core/chain/chain.go b/pkg/core/chain/chain.go index e7916513e..7fc29e437 100644 --- a/pkg/core/chain/chain.go +++ b/pkg/core/chain/chain.go @@ -143,18 +143,26 @@ func New(ctx context.Context, db database.DB, eventBus *eventbus.EventBus, rpcBu } func (c *Chain) syncWithRusk() error { - ruskStateHash, err := c.proxy.Executor().GetStateRoot(c.ctx) + var ( + err error + ruskStateHash []byte + persistedHash []byte + prevBlock *block.Block + ) + + ruskStateHash, err = c.proxy.Executor().GetStateRoot(c.ctx) if err != nil { return err } - prevBlock, persistedHash, err := c.loader.LoadTip() + prevBlock, persistedHash, err = c.loader.LoadTip() if err != nil { return err } // Detect if both services are on the different state var persitedBlock *block.Block + err = c.db.View(func(t database.Transaction) error { persitedBlock, err = t.FetchBlock(persistedHash) if err != nil { @@ -185,12 +193,16 @@ func (c *Chain) syncWithRusk() error { // re-accept missing block in order to recover Rusk (unpersisted) state. i := persitedBlock.Header.Height + for { i++ var blk *block.Block + err = c.db.View(func(t database.Transaction) error { - hash, err := t.FetchBlockHashByHeight(i) + var hash []byte + + hash, err = t.FetchBlockHashByHeight(i) if err != nil { return err } @@ -198,6 +210,7 @@ func (c *Chain) syncWithRusk() error { blk, err = t.FetchBlock(hash) return err }) + if err != nil { break } @@ -534,13 +547,12 @@ func (c *Chain) acceptBlock(blk block.Block, withSanityCheck bool) error { // Persist persists a block in both Contract Storage state and dusk-blockchain db in atomic manner. func (c *Chain) persist(b *block.Block) error { var ( - fields = logger.Fields{ + clog = log.WithFields(logger.Fields{ "event": "accept_block", "height": b.Header.Height, "hash": util.StringifyBytes(b.Header.Hash), "curr_h": c.tip.Header.Height, - } - log = log.WithFields(fields) + }) err error pe = config.Get().State.PersistEvery @@ -558,19 +570,18 @@ func (c *Chain) persist(b *block.Block) error { // Persist block into dusk-blockchain database before any attempt to persist in Rusk. // If StoreBlock fails, no change will be applied in Rusk. // If Rusk.Persist fails, StoreBlock is rollbacked. - err := t.StoreBlock(b, p) - if err != nil { + if err = t.StoreBlock(b, p); err != nil { return err } // Persist Rusk state if p { - if err := c.proxy.Executor().Persist(c.ctx, b.Header.StateHash); err != nil { - log.WithError(err).Error("persisting contract state failed") + if err = c.proxy.Executor().Persist(c.ctx, b.Header.StateHash); err != nil { + clog.WithError(err).Error("persisting contract state failed") return err } - log.Debug("persisting contract state completed") + clog.Debug("persisting contract state completed") } return nil diff --git a/pkg/core/database/heavy/transactions.go b/pkg/core/database/heavy/transactions.go index d447086e8..9c6d5718a 100644 --- a/pkg/core/database/heavy/transactions.go +++ b/pkg/core/database/heavy/transactions.go @@ -49,9 +49,9 @@ var ( HeightPrefix = []byte{0x03} // TxIDPrefix is the prefix to identify the Transaction ID. TxIDPrefix = []byte{0x04} - // TipPrefix is the prefix to identify the hash of the latest blochchain block. + // TipPrefix is the prefix to identify the hash of the latest blockchain block. TipPrefix = []byte{0x05} - // PersistedPrefix is the prefix to identify the hash of the latest blochchain persisted block. + // PersistedPrefix is the prefix to identify the hash of the latest blockchain persisted block. PersistedPrefix = []byte{0x06} // CandidatePrefix is the prefix to identify Candidate messages. CandidatePrefix = []byte{0x07} @@ -167,12 +167,11 @@ func (t transaction) StoreBlock(b *block.Block, persisted bool) error { // To support fetching blockchain tip t.put(TipPrefix, b.Header.Hash) + // Key = PersistedPrefix + // Value = Hash(chain tip) + // + // To support fetching blockchain persisted hash if persisted { - // Key = PersistedPrefix - // Value = Hash(chain tip) - // - // To support fetching blockchain persisted hash - t.put(PersistedPrefix, b.Header.Hash) } diff --git a/pkg/core/database/lite/database.go b/pkg/core/database/lite/database.go index 6027692c1..207e83a15 100755 --- a/pkg/core/database/lite/database.go +++ b/pkg/core/database/lite/database.go @@ -24,10 +24,8 @@ const ( blocksInd = iota txsInd txHashInd - keyImagesInd heightInd stateInd - outputKeyInd candidateInd persistedInd maxInd diff --git a/pkg/core/database/testing/drivers_test.go b/pkg/core/database/testing/drivers_test.go index 75d5bc2a6..079dfb8fc 100644 --- a/pkg/core/database/testing/drivers_test.go +++ b/pkg/core/database/testing/drivers_test.go @@ -147,17 +147,19 @@ func _TestDriver(m *testing.M, driverName string) int { } func TestStoreBlock(test *testing.T) { + var ( + persistedHash []byte + done bool + ) + // Generate additional blocks to store - blocks, err := generateChainBlocks(20) + blks, err := generateChainBlocks(20) if err != nil { test.Fatal(err.Error()) } - var persistedHash []byte - done := false - err = db.Update(func(t database.Transaction) error { - for i, b := range blocks { + for i, b := range blks { var persisted bool if i%3 == 0 { @@ -190,7 +192,7 @@ func TestStoreBlock(test *testing.T) { return err1 } - if !bytes.Equal(blocks[len(blocks)-1].Header.Hash, s.TipHash) { + if !bytes.Equal(blks[len(blks)-1].Header.Hash, s.TipHash) { return fmt.Errorf("invalid chain tip") } From 83a988c3f7f6122da52c952ca386d1ad61bb62e1 Mon Sep 17 00:00:00 2001 From: goshawk Date: Tue, 1 Mar 2022 14:52:43 +0200 Subject: [PATCH 09/10] Disable PerformSanityCheck temporarily --- cmd/dusk/server.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/dusk/server.go b/cmd/dusk/server.go index 376548b55..9125fb0a6 100644 --- a/cmd/dusk/server.go +++ b/cmd/dusk/server.go @@ -72,9 +72,9 @@ func LaunchChain(ctx context.Context, cl *loop.Consensus, proxy transactions.Pro // Perform database sanity check to ensure that it is rational before // bootstrapping all node subsystems - if err := l.PerformSanityCheck(0, 10, 0); err != nil { - return nil, err - } + //if err := l.PerformSanityCheck(0, 10, 0); err != nil { + // return nil, err + //} return chainProcess, nil } From d9b6f1e204001104d7cb375d3fa17c7d867991a0 Mon Sep 17 00:00:00 2001 From: goshawk Date: Wed, 2 Mar 2022 09:43:37 +0200 Subject: [PATCH 10/10] Simplify PerformSanityCheck/SanityCheckBlockchain to check blocks from height 1 --- cmd/dusk/server.go | 6 +++--- pkg/core/chain/chain.go | 6 +++--- pkg/core/chain/loader.go | 32 ++++++++++---------------------- pkg/core/chain/mock.go | 8 ++++---- 4 files changed, 20 insertions(+), 32 deletions(-) diff --git a/cmd/dusk/server.go b/cmd/dusk/server.go index 9125fb0a6..4d065b342 100644 --- a/cmd/dusk/server.go +++ b/cmd/dusk/server.go @@ -72,9 +72,9 @@ func LaunchChain(ctx context.Context, cl *loop.Consensus, proxy transactions.Pro // Perform database sanity check to ensure that it is rational before // bootstrapping all node subsystems - //if err := l.PerformSanityCheck(0, 10, 0); err != nil { - // return nil, err - //} + if err := l.SanityCheckBlockchain(0, 10); err != nil { + return nil, err + } return chainProcess, nil } diff --git a/pkg/core/chain/chain.go b/pkg/core/chain/chain.go index 7fc29e437..b07ad2828 100644 --- a/pkg/core/chain/chain.go +++ b/pkg/core/chain/chain.go @@ -49,8 +49,8 @@ var ErrBlockAlreadyAccepted = errors.New("discarded block from the past") // Verifier performs checks on the blockchain and potentially new incoming block. type Verifier interface { - // PerformSanityCheck on first N blocks and M last blocks. - PerformSanityCheck(startAt uint64, firstBlocksAmount uint64, lastBlockAmount uint64) error + // SanityCheckBlockchain on first N blocks and M last blocks. + SanityCheckBlockchain(startAt uint64, firstBlocksAmount uint64) error // SanityCheckBlock will verify whether a block is valid according to the rules of the consensus. SanityCheckBlock(prevBlock block.Block, blk block.Block) error } @@ -58,7 +58,7 @@ type Verifier interface { // Loader is an interface which abstracts away the storage used by the Chain to // store the blockchain. type Loader interface { - // LoadTip of the chain. + // LoadTip of the chain. Returns blockchain tip and persisted hash. LoadTip() (*block.Block, []byte, error) // Clear removes everything from the DB. Clear() error diff --git a/pkg/core/chain/loader.go b/pkg/core/chain/loader.go index 356907f60..c42290256 100644 --- a/pkg/core/chain/loader.go +++ b/pkg/core/chain/loader.go @@ -18,7 +18,7 @@ import ( const ( // SanityCheckHeight is the suggested amount of blocks to check when - // calling Loader.PerformSanityCheck. + // calling Loader.SanityCheckBlockchain. SanityCheckHeight uint64 = 10 ) @@ -117,40 +117,28 @@ func (l *DBLoader) Close(driver string) error { return drvr.Close() } -// PerformSanityCheck checks the head and the tail of the blockchain to avoid +// SanityCheckBlockchain checks the head and the tail of the blockchain to avoid // inconsistencies and a faulty bootstrap. -func (l *DBLoader) PerformSanityCheck(startAt, firstBlocksAmount, lastBlocksAmount uint64) error { +func (l *DBLoader) SanityCheckBlockchain(startAt, firstBlocksAmount uint64) error { var height uint64 - var prevBlock *block.Block - if startAt > 0 { - return errors.New("performing sanity checks from arbitrary points is not supported yet") - } - - if startAt == 0 { - prevBlock = l.genesis - } - - prevHeader := prevBlock.Header // Verify first N blocks err := l.db.View(func(t database.Transaction) error { - // This will most likely verify genesis, unless the startAt parameter - // is set to some other height. In case of genesis, failure here would mostly occur if mainnet node - // loads testnet blockchain - hash, err := t.FetchBlockHashByHeight(startAt) + h, err := t.FetchBlockHashByHeight(startAt) if err != nil { return err } - if !bytes.Equal(prevHeader.Hash, hash) { - return fmt.Errorf("invalid genesis block") + prevHeader, err := t.FetchBlockHeader(h) + if err != nil { + return err } - for height = 1; height <= firstBlocksAmount; height++ { + for height = startAt + 1; height <= firstBlocksAmount; height++ { hash, err := t.FetchBlockHashByHeight(height) if err == database.ErrBlockNotFound { - // seems we reach the tip + // we reach the tip return nil } @@ -176,7 +164,7 @@ func (l *DBLoader) PerformSanityCheck(startAt, firstBlocksAmount, lastBlocksAmou return err } - // TODO: Verify lastBlockAmount blocks + // TODO: Verify last blocks return nil } diff --git a/pkg/core/chain/mock.go b/pkg/core/chain/mock.go index a2c2e202e..ccd25e940 100644 --- a/pkg/core/chain/mock.go +++ b/pkg/core/chain/mock.go @@ -13,8 +13,8 @@ import ( // MockVerifier is a mock for the chain.Verifier interface. type MockVerifier struct{} -// PerformSanityCheck on first N blocks and M last blocks. -func (v *MockVerifier) PerformSanityCheck(uint64, uint64, uint64) error { +// SanityCheckBlockchain on first N blocks and M last blocks. +func (v *MockVerifier) SanityCheckBlockchain(uint64, uint64) error { return nil } @@ -44,8 +44,8 @@ func (m *MockLoader) LoadTip() (*block.Block, []byte, error) { return &m.blockchain[len(m.blockchain)], nil, nil } -// PerformSanityCheck on first N blocks and M last blocks. -func (m *MockLoader) PerformSanityCheck(uint64, uint64, uint64) error { +// SanityCheckBlockchain on first N blocks and M last blocks. +func (m *MockLoader) SanityCheckBlockchain(uint64, uint64, uint64) error { return nil }