diff --git a/cmd/dusk/server.go b/cmd/dusk/server.go index f8f92e9f7..4d065b342 100644 --- a/cmd/dusk/server.go +++ b/cmd/dusk/server.go @@ -72,7 +72,7 @@ func LaunchChain(ctx context.Context, cl *loop.Consensus, proxy transactions.Pro // Perform database sanity check to ensure that it is rational before // bootstrapping all node subsystems - if err := l.PerformSanityCheck(0, 10, 0); err != nil { + if err := l.SanityCheckBlockchain(0, 10); err != nil { return nil, err } @@ -115,6 +115,8 @@ func Setup() *Server { proxy, ruskConn := setupGRPCClients(gctx) + log.Info("grpc connection with rusk service established") + m := mempool.NewMempool(db, eventBus, rpcBus, proxy.Prober()) m.Run(parentCtx) diff --git a/harness/engine/network.go b/harness/engine/network.go index 422dcfeb9..fcab0dbde 100644 --- a/harness/engine/network.go +++ b/harness/engine/network.go @@ -356,8 +356,9 @@ func (n *Network) start(nodeDir string, name string, arg ...string) error { // CREATE THE RUSK STATE for the local rusk stateExec := name + "-recovery-state" - cmd := exec.Command(stateExec) + cmd := exec.Command(stateExec, "-w", "-f") cmd.Env = append(envWithNoRusk, "TMPDIR="+nodeDir, "RUSK_PROFILE_PATH="+nodeDir) + cmd.Start() cmd.Wait() diff --git a/harness/tests/localnet_test.go b/harness/tests/localnet_test.go index b8e91eb13..d0ff7b9b2 100644 --- a/harness/tests/localnet_test.go +++ b/harness/tests/localnet_test.go @@ -11,7 +11,6 @@ import ( "encoding/hex" "flag" "fmt" - "io/ioutil" "os" "strconv" "testing" @@ -44,15 +43,11 @@ var ( // // The network should be fully functioning and ready to accept messaging. func TestMain(m *testing.M) { - var err error - flag.Parse() // create the temp-dir workspace. Quit on error - workspace, err = ioutil.TempDir(os.TempDir(), "localnet-") - if err != nil { - quit(localNet, workspace, err) - } + workspace = os.TempDir() + "/localnet" + os.Mkdir(workspace, 0700) // set the network size if localNetSizeStr != "" { diff --git a/pkg/core/chain/chain.go b/pkg/core/chain/chain.go index f6f6eadc2..b07ad2828 100644 --- a/pkg/core/chain/chain.go +++ b/pkg/core/chain/chain.go @@ -49,8 +49,8 @@ var ErrBlockAlreadyAccepted = errors.New("discarded block from the past") // Verifier performs checks on the blockchain and potentially new incoming block. type Verifier interface { - // PerformSanityCheck on first N blocks and M last blocks. - PerformSanityCheck(startAt uint64, firstBlocksAmount uint64, lastBlockAmount uint64) error + // SanityCheckBlockchain on first N blocks and M last blocks. + SanityCheckBlockchain(startAt uint64, firstBlocksAmount uint64) error // SanityCheckBlock will verify whether a block is valid according to the rules of the consensus. SanityCheckBlock(prevBlock block.Block, blk block.Block) error } @@ -58,8 +58,8 @@ type Verifier interface { // Loader is an interface which abstracts away the storage used by the Chain to // store the blockchain. type Loader interface { - // LoadTip of the chain. - LoadTip() (*block.Block, error) + // LoadTip of the chain. Returns blockchain tip and persisted hash. + LoadTip() (*block.Block, []byte, error) // Clear removes everything from the DB. Clear() error // Close the Loader and finalizes any pending connection. @@ -68,8 +68,6 @@ type Loader interface { Height() (uint64, error) // BlockAt returns the block at a given height. BlockAt(uint64) (block.Block, error) - // Append a block on the storage. - Append(*block.Block) error } // Chain represents the nodes blockchain. @@ -131,20 +129,100 @@ func New(ctx context.Context, db database.DB, eventBus *eventbus.EventBus, rpcBu return nil, err } + if srv != nil { + node.RegisterChainServer(srv, chain) + } + chain.p = &provisioners - prevBlock, err := loader.LoadTip() - if err != nil { + if err := chain.syncWithRusk(); err != nil { return nil, err } - chain.tip = prevBlock + return chain, nil +} - if srv != nil { - node.RegisterChainServer(srv, chain) +func (c *Chain) syncWithRusk() error { + var ( + err error + ruskStateHash []byte + persistedHash []byte + prevBlock *block.Block + ) + + ruskStateHash, err = c.proxy.Executor().GetStateRoot(c.ctx) + if err != nil { + return err } - return chain, nil + prevBlock, persistedHash, err = c.loader.LoadTip() + if err != nil { + return err + } + + // Detect if both services are on the different state + var persitedBlock *block.Block + + err = c.db.View(func(t database.Transaction) error { + persitedBlock, err = t.FetchBlock(persistedHash) + if err != nil { + return err + } + + if !bytes.Equal(persitedBlock.Header.StateHash, ruskStateHash) { + log.WithField("rusk", hex.EncodeToString(ruskStateHash)). + WithField("node", hex.EncodeToString(persitedBlock.Header.StateHash)). + Error("invalid state detected") + return errors.New("invalid state detected") + } + + return err + }) + if err != nil { + return err + } + + // Update blockchain tip (in-memory) + c.tip = persitedBlock + + // If both persisted block hash and latest blockchain block hash are the + // same then there is no need to execute sync-up. + if bytes.Equal(persistedHash, prevBlock.Header.Hash) { + return nil + } + + // re-accept missing block in order to recover Rusk (unpersisted) state. + i := persitedBlock.Header.Height + + for { + i++ + + var blk *block.Block + + err = c.db.View(func(t database.Transaction) error { + var hash []byte + + hash, err = t.FetchBlockHashByHeight(i) + if err != nil { + return err + } + + blk, err = t.FetchBlock(hash) + return err + }) + + if err != nil { + break + } + + // Re-accepting all blocks that have not been persisted in Rusk. + // This will re-execute accept/finalize accordingly and update chain tip. + if err := c.acceptBlock(*blk, false); err != nil { + return err + } + } + + return nil } // ProcessBlockFromNetwork will handle blocks incoming from the network. @@ -200,7 +278,7 @@ func (c *Chain) ProcessBlockFromNetwork(srcPeerID string, m message.Message) ([] // from the network during out-of-sync state. func (c *Chain) TryNextConsecutiveBlockOutSync(blk block.Block, kadcastHeight byte) error { log.WithField("height", blk.Header.Height).Trace("accepting sync block") - return c.acceptBlock(blk) + return c.acceptBlock(blk, true) } // TryNextConsecutiveBlockInSync is the processing path for accepting a block @@ -235,7 +313,7 @@ func (c *Chain) TryNextConsecutiveBlockIsValid(blk block.Block) error { l := log.WithFields(fields) - return c.isValidBlock(blk, l) + return c.isValidBlock(blk, l, true) } // ProcessSyncTimerExpired called by outsync timer when a peer does not provide GetData response. @@ -263,7 +341,7 @@ func (c *Chain) ProcessSyncTimerExpired(strPeerAddr string) error { func (c *Chain) acceptSuccessiveBlock(blk block.Block, kadcastHeight byte) error { log.WithField("height", blk.Header.Height).Trace("accepting succeeding block") - if err := c.acceptBlock(blk); err != nil { + if err := c.acceptBlock(blk, true); err != nil { return err } @@ -317,7 +395,9 @@ func (c *Chain) runStateTransition(tipBlk, blk block.Block) (*block.Block, error blk.Header.Height, config.BlockGasLimit) if err != nil { - l.WithError(err).Error("Error in executing the state transition") + l.WithError(err). + WithField("grpc", "finalize"). + Error("Error in executing the state transition") return block.NewBlock(), err } default: @@ -328,7 +408,10 @@ func (c *Chain) runStateTransition(tipBlk, blk block.Block) (*block.Block, error blk.Header.Height, config.BlockGasLimit) if err != nil { - l.WithError(err).Error("Error in executing the state transition") + l.WithError(err). + WithField("grpc", "accept"). + Error("Error in executing the state transition") + return block.NewBlock(), err } } @@ -390,12 +473,14 @@ func (c *Chain) sanityCheckStateHash() error { return nil } -func (c *Chain) isValidBlock(blk block.Block, l *logrus.Entry) error { +func (c *Chain) isValidBlock(blk block.Block, l *logrus.Entry, withSanityCheck bool) error { l.Debug("verifying block") // Check that stateless and stateful checks pass - if err := c.verifier.SanityCheckBlock(*c.tip, blk); err != nil { - l.WithError(err).Error("block verification failed") - return err + if withSanityCheck { + if err := c.verifier.SanityCheckBlock(*c.tip, blk); err != nil { + l.WithError(err).Error("block verification failed") + return err + } } // Check the certificate @@ -417,7 +502,7 @@ func (c *Chain) isValidBlock(blk block.Block, l *logrus.Entry) error { // 1. We have not seen it before // 2. All stateless and stateful checks are true // Returns nil, if checks passed and block was successfully saved. -func (c *Chain) acceptBlock(blk block.Block) error { +func (c *Chain) acceptBlock(blk block.Block, withSanityCheck bool) error { fields := logger.Fields{ "event": "accept_block", "height": blk.Header.Height, @@ -430,7 +515,7 @@ func (c *Chain) acceptBlock(blk block.Block) error { var err error // 1. Ensure block fields and certificate are valid - if err = c.isValidBlock(blk, l); err != nil { + if err = c.isValidBlock(blk, l, withSanityCheck); err != nil { l.WithError(err).Error("invalid block error") return err } @@ -443,22 +528,68 @@ func (c *Chain) acceptBlock(blk block.Block) error { return err } - // 3. Store the approved block and update in-memory chain tip - l.Debug("storing block") + // 3. Persist the approved block and update in-memory chain tip + l.Debug("persisting block") - if err := c.loader.Append(b); err != nil { - l.WithError(err).Error("block storing failed") + if err := c.persist(b); err != nil { + l.WithError(err).Error("persisting block failed") return err } - c.tip = &blk + c.tip = b // 5. Perform all post-events on accepting a block - c.postAcceptBlock(blk, l) + c.postAcceptBlock(*b, l) return nil } +// Persist persists a block in both Contract Storage state and dusk-blockchain db in atomic manner. +func (c *Chain) persist(b *block.Block) error { + var ( + clog = log.WithFields(logger.Fields{ + "event": "accept_block", + "height": b.Header.Height, + "hash": util.StringifyBytes(b.Header.Hash), + "curr_h": c.tip.Header.Height, + }) + + err error + pe = config.Get().State.PersistEvery + ) + + // Atomic persist + err = c.db.Update(func(t database.Transaction) error { + var p bool + + if pe > 0 && b.Header.Height%pe == 0 { + // Mark it as a persisted block + p = true + } + + // Persist block into dusk-blockchain database before any attempt to persist in Rusk. + // If StoreBlock fails, no change will be applied in Rusk. + // If Rusk.Persist fails, StoreBlock is rollbacked. + if err = t.StoreBlock(b, p); err != nil { + return err + } + + // Persist Rusk state + if p { + if err = c.proxy.Executor().Persist(c.ctx, b.Header.StateHash); err != nil { + clog.WithError(err).Error("persisting contract state failed") + return err + } + + clog.Debug("persisting contract state completed") + } + + return nil + }) + + return err +} + // postAcceptBlock performs all post-events on accepting a block. func (c *Chain) postAcceptBlock(blk block.Block, l *logrus.Entry) { // 1. Notify other subsystems for the accepted block @@ -482,23 +613,6 @@ func (c *Chain) postAcceptBlock(blk block.Block, l *logrus.Entry) { go c.storeStakesInStormDB(blk.Header.Height) } - // 4. Persist state - pe := config.Get().State.PersistEvery - - if pe > 0 { - if blk.Header.Height%pe == 0 { - l.Debug("chain: persist state", blk.Header.Height) - - if err := c.proxy.Executor().Persist(c.ctx, c.tip.Header.StateHash); err != nil { - // TODO: trigger resync procedure - // https://github.com/dusk-network/dusk-blockchain/issues/1286 - l.WithError(err).Error("chain: persist state failed") - } - } - } else { - l.Warn("State won't persist! Set `state.persistEvery` to a positive value") - } - diagnostics.LogPublishErrors("chain/chain.go, topics.AcceptedBlock", errList) l.Debug("procedure ended") } diff --git a/pkg/core/chain/chain_test.go b/pkg/core/chain/chain_test.go index 6810c8348..b9b470d90 100644 --- a/pkg/core/chain/chain_test.go +++ b/pkg/core/chain/chain_test.go @@ -152,7 +152,7 @@ func TestAcceptBlock(t *testing.T) { cert.Step = 5 blk.Header.Certificate = cert - assert.NoError(c.acceptBlock(*blk)) + assert.NoError(c.acceptBlock(*blk, true), true) // Should have `blk` as blockchain head now assert.True(bytes.Equal(blk.Header.Hash, c.tip.Header.Hash)) @@ -177,11 +177,11 @@ func TestFetchTip(t *testing.T) { _, chain := setupChainTest(t, 0) // on a modern chain, state(tip) must point at genesis - var s *database.State + var s *database.Registry err := chain.db.View(func(t database.Transaction) error { var err error - s, err = t.FetchState() + s, err = t.FetchRegistry() return err }) assert.NoError(err) diff --git a/pkg/core/chain/fallback.go b/pkg/core/chain/fallback.go index 9c106bdcb..5efb77b91 100644 --- a/pkg/core/chain/fallback.go +++ b/pkg/core/chain/fallback.go @@ -35,7 +35,7 @@ func (c *Chain) tryFallback(blk block.Block) error { c.tip = &prevBlk // Perform verify and accept block procedure - if err := c.acceptBlock(blk); err != nil { + if err := c.acceptBlock(blk, true); err != nil { c.tip = &oldTip return err } diff --git a/pkg/core/chain/loader.go b/pkg/core/chain/loader.go index eb553b521..c42290256 100644 --- a/pkg/core/chain/loader.go +++ b/pkg/core/chain/loader.go @@ -18,7 +18,7 @@ import ( const ( // SanityCheckHeight is the suggested amount of blocks to check when - // calling Loader.PerformSanityCheck. + // calling Loader.SanityCheckBlockchain. SanityCheckHeight uint64 = 10 ) @@ -78,13 +78,6 @@ func (l *DBLoader) Height() (uint64, error) { return height, err } -// Append stores a block in the DB. -func (l *DBLoader) Append(blk *block.Block) error { - return l.db.Update(func(t database.Transaction) error { - return t.StoreBlock(blk) - }) -} - // BlockAt returns the block stored at a given height. func (l *DBLoader) BlockAt(searchingHeight uint64) (block.Block, error) { var blk *block.Block @@ -124,40 +117,28 @@ func (l *DBLoader) Close(driver string) error { return drvr.Close() } -// PerformSanityCheck checks the head and the tail of the blockchain to avoid +// SanityCheckBlockchain checks the head and the tail of the blockchain to avoid // inconsistencies and a faulty bootstrap. -func (l *DBLoader) PerformSanityCheck(startAt, firstBlocksAmount, lastBlocksAmount uint64) error { +func (l *DBLoader) SanityCheckBlockchain(startAt, firstBlocksAmount uint64) error { var height uint64 - var prevBlock *block.Block - - if startAt > 0 { - return errors.New("performing sanity checks from arbitrary points is not supported yet") - } - if startAt == 0 { - prevBlock = l.genesis - } - - prevHeader := prevBlock.Header // Verify first N blocks err := l.db.View(func(t database.Transaction) error { - // This will most likely verify genesis, unless the startAt parameter - // is set to some other height. In case of genesis, failure here would mostly occur if mainnet node - // loads testnet blockchain - hash, err := t.FetchBlockHashByHeight(startAt) + h, err := t.FetchBlockHashByHeight(startAt) if err != nil { return err } - if !bytes.Equal(prevHeader.Hash, hash) { - return fmt.Errorf("invalid genesis block") + prevHeader, err := t.FetchBlockHeader(h) + if err != nil { + return err } - for height = 1; height <= firstBlocksAmount; height++ { + for height = startAt + 1; height <= firstBlocksAmount; height++ { hash, err := t.FetchBlockHashByHeight(height) if err == database.ErrBlockNotFound { - // seems we reach the tip + // we reach the tip return nil } @@ -183,27 +164,29 @@ func (l *DBLoader) PerformSanityCheck(startAt, firstBlocksAmount, lastBlocksAmou return err } - // TODO: Verify lastBlockAmount blocks + // TODO: Verify last blocks return nil } // LoadTip returns the tip of the chain. -func (l *DBLoader) LoadTip() (*block.Block, error) { +func (l *DBLoader) LoadTip() (*block.Block, []byte, error) { var tip *block.Block + var persistedHash []byte err := l.db.Update(func(t database.Transaction) error { - s, err := t.FetchState() + s, err := t.FetchRegistry() if err != nil { // TODO: maybe log the error here and diversify between empty // results and actual errors // Store Genesis Block, if a modern node runs - err = t.StoreBlock(l.genesis) + err = t.StoreBlock(l.genesis, true) if err != nil { return err } tip = l.genesis + persistedHash = l.genesis.Header.Hash return nil } @@ -219,10 +202,12 @@ func (l *DBLoader) LoadTip() (*block.Block, error) { } tip = &block.Block{Header: h, Txs: txs} + persistedHash = s.PersistedHash + return nil }) if err != nil { - return nil, err + return nil, nil, err } // Verify chain state. There shouldn't be any blocks higher than chainTip @@ -244,8 +229,8 @@ func (l *DBLoader) LoadTip() (*block.Block, error) { }) if err != nil { - return nil, err + return nil, nil, err } - return tip, nil + return tip, persistedHash, nil } diff --git a/pkg/core/chain/mock.go b/pkg/core/chain/mock.go index 0cecb5912..ccd25e940 100644 --- a/pkg/core/chain/mock.go +++ b/pkg/core/chain/mock.go @@ -13,8 +13,8 @@ import ( // MockVerifier is a mock for the chain.Verifier interface. type MockVerifier struct{} -// PerformSanityCheck on first N blocks and M last blocks. -func (v *MockVerifier) PerformSanityCheck(uint64, uint64, uint64) error { +// SanityCheckBlockchain on first N blocks and M last blocks. +func (v *MockVerifier) SanityCheckBlockchain(uint64, uint64) error { return nil } @@ -40,12 +40,12 @@ func (m *MockLoader) Height() (uint64, error) { } // LoadTip of the chain. -func (m *MockLoader) LoadTip() (*block.Block, error) { - return &m.blockchain[len(m.blockchain)], nil +func (m *MockLoader) LoadTip() (*block.Block, []byte, error) { + return &m.blockchain[len(m.blockchain)], nil, nil } -// PerformSanityCheck on first N blocks and M last blocks. -func (m *MockLoader) PerformSanityCheck(uint64, uint64, uint64) error { +// SanityCheckBlockchain on first N blocks and M last blocks. +func (m *MockLoader) SanityCheckBlockchain(uint64, uint64, uint64) error { return nil } diff --git a/pkg/core/chain/synchronizer_test.go b/pkg/core/chain/synchronizer_test.go index 9af912ee1..e5af1a76c 100644 --- a/pkg/core/chain/synchronizer_test.go +++ b/pkg/core/chain/synchronizer_test.go @@ -58,7 +58,7 @@ func setupSynchronizerTest() (*synchronizer, chan consensus.Results) { genesis := genesis.Decode() if err := db.Update(func(t database.Transaction) error { - return t.StoreBlock(genesis) + return t.StoreBlock(genesis, false) }); err != nil { panic(err) } diff --git a/pkg/core/consensus/testing/node.go b/pkg/core/consensus/testing/node.go index 8923e2416..3e156c1ea 100644 --- a/pkg/core/consensus/testing/node.go +++ b/pkg/core/consensus/testing/node.go @@ -36,7 +36,7 @@ func newNode(ctx context.Context, assert *assert.Assertions, eb *eventbus.EventB // the `proxy` either way. genesis := genesis.Decode() l := chain.NewDBLoader(db, genesis) - _, err := l.LoadTip() + _, _, err := l.LoadTip() assert.NoError(err) e := &consensus.Emitter{ diff --git a/pkg/core/database/README.md b/pkg/core/database/README.md index 5d6ef2d1b..80b04dfd1 100644 --- a/pkg/core/database/README.md +++ b/pkg/core/database/README.md @@ -80,6 +80,11 @@ _ = db.View(func(tx database.Tx) error { }) ``` +## Blockchain registry +Blockchain database supports (read/write) two additional KV records which store both hash of the blockchain latest block and hash of blockchain latest `persisted` block. A block has a meaning of a persisted block only if rusk.Persist grpc has been called for it successfully. + +`FetchRegistry` method from Transaction interface should be used to fetch the abovementioned KV records. + ## Additional features Additional features that can be provided by a Driver: diff --git a/pkg/core/database/heavy.md b/pkg/core/database/heavy.md index 0e79dfab7..b3b119918 100644 --- a/pkg/core/database/heavy.md +++ b/pkg/core/database/heavy.md @@ -10,16 +10,16 @@ For general concept explanation one can refer to /pkg/core/database/README.md. T | :---: | :---: | :---: | :---: | :---: | | 0x01 | HeaderHash | Header.Encode\(\) | 1 per block | | | 0x02 | HeaderHash + TxID | TxIndex + Tx.Encode\(\) | block txs count | | -| 0x04 | TxID | HeaderHash | block txs count | FetchBlockTxByHash | -| 0x05 | KeyImage | TxID | sum of block txs inputs | FetchKeyImageExists | | 0x03 | Height | HeaderHash | 1 per block | FetchBlockHashByHeight | -| 0x07 | State | Chain tip hash | 1 per chain | FetchState | +| 0x04 | TxID | HeaderHash | block txs count | FetchBlockTxByHash | +| 0x05 | Tip | Hash of latest block | 1 per chain | FetchRegistry | +| 0x06 | Persisted | Hash of latest persisted block | 1 per chain | FetchRegistry | ## K/V storage schema to store a candidate `pkg/core/block.Block` | Prefix | KEY | VALUE | Count | Used by | | :---: | :---: | :---: | :---: | :---: | -| 0x06 | HeaderHash + Height | Block.Encode\(\) | Many per blockchain | Store/Fetch/Delete CandidateBlock | +| 0x07 | HeaderHash | Block.Encode\(\) | Many per blockchain | Store/Fetch/Delete CandidateBlock | Table notation @@ -28,9 +28,3 @@ Table notation * \'+' operation - denotes concatenation of byte arrays * Tx.Encode\(\) - Encoded binary form of all Tx fields without TxID -## K/V storage schema to store block generator bid values - -| Prefix | KEY | VALUE | Count | Used by | -| :---: | :---: | :---: | :---: | :---: | -| 0x08 | ExpiryHeight | D + K | 1 per bidding transaction made by user | FetchBidValues | - diff --git a/pkg/core/database/heavy/transactions.go b/pkg/core/database/heavy/transactions.go index a3801e730..9c6d5718a 100644 --- a/pkg/core/database/heavy/transactions.go +++ b/pkg/core/database/heavy/transactions.go @@ -8,7 +8,6 @@ package heavy import ( "bytes" - "encoding/binary" "errors" "fmt" "math" @@ -50,14 +49,12 @@ var ( HeightPrefix = []byte{0x03} // TxIDPrefix is the prefix to identify the Transaction ID. TxIDPrefix = []byte{0x04} - // KeyImagePrefix is the prefix to identify the Key Image. - KeyImagePrefix = []byte{0x05} - // StatePrefix is the prefix to identify the State. - StatePrefix = []byte{0x06} - // OutputKeyPrefix is the prefix to identify the Output. - OutputKeyPrefix = []byte{0x07} + // TipPrefix is the prefix to identify the hash of the latest blockchain block. + TipPrefix = []byte{0x05} + // PersistedPrefix is the prefix to identify the hash of the latest blockchain persisted block. + PersistedPrefix = []byte{0x06} // CandidatePrefix is the prefix to identify Candidate messages. - CandidatePrefix = []byte{0x08} + CandidatePrefix = []byte{0x07} ) type transaction struct { @@ -83,7 +80,7 @@ type transaction struct { // // It is assumed that StoreBlock would be called much less times than Fetch* // APIs. Based on that, extra indexing data is put to provide fast-read lookups. -func (t transaction) StoreBlock(b *block.Block) error { +func (t transaction) StoreBlock(b *block.Block, persisted bool) error { if t.batch == nil { // t.batch is initialized only on a open, read-write transaction // (built with transaction.Update()). @@ -161,18 +158,22 @@ func (t transaction) StoreBlock(b *block.Block) error { } key = append(HeightPrefix, heightBuf.Bytes()...) - value = b.Header.Hash - t.put(key, value) + t.put(key, b.Header.Hash) - // Key = StatePrefix + // Key = TipPrefix // Value = Hash(chain tip) // // To support fetching blockchain tip - key = StatePrefix - value = b.Header.Hash + t.put(TipPrefix, b.Header.Hash) - t.put(key, value) + // Key = PersistedPrefix + // Value = Hash(chain tip) + // + // To support fetching blockchain persisted hash + if persisted { + t.put(PersistedPrefix, b.Header.Hash) + } return nil } @@ -222,39 +223,6 @@ func (t transaction) FetchBlockExists(hash []byte) (bool, error) { return exists, err } -// FetchOutputExists checks if an output exists in the db. -func (t transaction) FetchOutputExists(destkey []byte) (bool, error) { - key := append(OutputKeyPrefix, destkey...) - exists, err := t.snapshot.Has(key, nil) - - // goleveldb returns nilIfNotFound - // see also nilIfNotFound in leveldb/db.go - if !exists && err == nil { - // overwrite error message - err = database.ErrOutputNotFound - } - - return exists, err -} - -// FetchOutputUnlockHeight returns the unlockheight of an output. -func (t transaction) FetchOutputUnlockHeight(destkey []byte) (uint64, error) { - key := append(OutputKeyPrefix, destkey...) - - unlockHeightBytes, err := t.snapshot.Get(key, nil) - if err != nil { - return 0, err - } - - if len(unlockHeightBytes) != 8 { - return 0, errors.New("unlock height malformed") - } - - // output unlock height is the first 8 bytes - unlockHeight := binary.LittleEndian.Uint64(unlockHeightBytes[0:8]) - return unlockHeight, err -} - func (t transaction) FetchBlockHeader(hash []byte) (*block.Header, error) { key := append(HeaderPrefix, hash...) @@ -401,27 +369,6 @@ func (t transaction) FetchBlockTxByHash(txID []byte) (transactions.ContractCall, return nil, txIndex, nil, errors.New("block tx is available but fetching it fails") } -// FetchKeyImageExists checks if the KeyImage exists. If so, it also returns the -// hash of its corresponding tx. -// -// Due to performance concerns, the found tx is not verified. By explicitly -// calling FetchBlockTxByHash, a consumer can check if the tx is real. -func (t transaction) FetchKeyImageExists(keyImage []byte) (bool, []byte, error) { - key := append(KeyImagePrefix, keyImage...) - - txID, err := t.snapshot.Get(key, nil) - if err != nil { - if err == leveldb.ErrNotFound { - // overwrite error message - err = database.ErrKeyImageNotFound - } - - return false, nil, err - } - - return true, txID, nil -} - func (t transaction) FetchBlock(hash []byte) (*block.Block, error) { header, err := t.FetchBlockHeader(hash) if err != nil { @@ -439,11 +386,19 @@ func (t transaction) FetchBlock(hash []byte) (*block.Block, error) { }, nil } -func (t transaction) FetchState() (*database.State, error) { - key := StatePrefix +func (t transaction) FetchRegistry() (*database.Registry, error) { + tipHash, err := t.snapshot.Get(TipPrefix, nil) + if err == leveldb.ErrNotFound || len(tipHash) == 0 { + // overwrite error message + err = database.ErrStateNotFound + } - value, err := t.snapshot.Get(key, nil) - if err == leveldb.ErrNotFound || len(value) == 0 { + if err != nil { + return nil, err + } + + persistedHash, err := t.snapshot.Get(PersistedPrefix, nil) + if err == leveldb.ErrNotFound || len(persistedHash) == 0 { // overwrite error message err = database.ErrStateNotFound } @@ -452,11 +407,14 @@ func (t transaction) FetchState() (*database.State, error) { return nil, err } - return &database.State{TipHash: value}, nil + return &database.Registry{ + TipHash: tipHash, + PersistedHash: persistedHash, + }, nil } func (t transaction) FetchCurrentHeight() (uint64, error) { - state, err := t.FetchState() + state, err := t.FetchRegistry() if err != nil { return 0, err } diff --git a/pkg/core/database/interfaces.go b/pkg/core/database/interfaces.go index 52f1a5b65..4079b1cb8 100644 --- a/pkg/core/database/interfaces.go +++ b/pkg/core/database/interfaces.go @@ -66,18 +66,14 @@ type Transaction interface { FetchBlockTxByHash(txID []byte) (tx transactions.ContractCall, txIndex uint32, blockHeaderHash []byte, err error) FetchBlockHashByHeight(height uint64) ([]byte, error) FetchBlockExists(hash []byte) (bool, error) - // Fetch chain state information (chain tip hash). - FetchState() (*State, error) - - // Check if an input keyImage is already stored. If succeeds, it returns - // also txID the input belongs to. - FetchKeyImageExists(keyImage []byte) (exists bool, txID []byte, err error) + // Fetch chain registry (chain tip hash, persisted block etc). + FetchRegistry() (*Registry, error) // Read-write transactions // Store the next chain block in a append-only manner // Overwrites only if block with same hash already stored // Not to be called concurrently, as it updates chain tip. - StoreBlock(block *block.Block) error + StoreBlock(block *block.Block, persisted bool) error // FetchBlock will return a block, given a hash. FetchBlock(hash []byte) (*block.Block, error) @@ -86,14 +82,6 @@ type Transaction interface { // block in the database. FetchCurrentHeight() (uint64, error) - // FetchOutputExists returns whether or not an output exists for the - // given destination public key. - FetchOutputExists(destkey []byte) (bool, error) - - // FetchOutputUnlockHeight will return the unlock height for an output - // given a destination public key. - FetchOutputUnlockHeight(destkey []byte) (uint64, error) - // FetchBlockHeightSince try to find height of a block generated around // sinceUnixTime starting the search from height (tip - offset). FetchBlockHeightSince(sinceUnixTime int64, offset uint64) (uint64, error) @@ -131,8 +119,8 @@ type DB interface { Close() error } -// State represents a single db entry that provides chain metadata. This -// includes currently only chain tip hash but could be extended at later stage. -type State struct { - TipHash []byte +// Registry represents a set database records that provide chain metadata. +type Registry struct { + TipHash []byte + PersistedHash []byte } diff --git a/pkg/core/database/lite/database.go b/pkg/core/database/lite/database.go index 9a35be533..207e83a15 100755 --- a/pkg/core/database/lite/database.go +++ b/pkg/core/database/lite/database.go @@ -24,11 +24,10 @@ const ( blocksInd = iota txsInd txHashInd - keyImagesInd heightInd stateInd - outputKeyInd candidateInd + persistedInd maxInd ) diff --git a/pkg/core/database/lite/transactions.go b/pkg/core/database/lite/transactions.go index 9c15a7d6f..e7f73016a 100644 --- a/pkg/core/database/lite/transactions.go +++ b/pkg/core/database/lite/transactions.go @@ -8,7 +8,6 @@ package lite import ( "bytes" - "encoding/binary" "errors" "fmt" "math" @@ -30,7 +29,7 @@ type transaction struct { // map lookup operation on block per height, one can utilize a height as index // in a slice. // NB: A single slice of all blocks to be used to avoid all duplications. -func (t *transaction) StoreBlock(b *block.Block) error { +func (t *transaction) StoreBlock(b *block.Block, persisted bool) error { if !t.writable { return errors.New("read-only transaction") } @@ -82,6 +81,10 @@ func (t *transaction) StoreBlock(b *block.Block) error { // Map stateKey to chain state (tip) t.batch[stateInd][toKey(stateKey)] = b.Header.Hash + if persisted { + t.batch[persistedInd][toKey(stateKey)] = b.Header.Hash + } + return nil } @@ -185,36 +188,22 @@ func (t transaction) FetchBlockTxByHash(txID []byte) (transactions.ContractCall, return tx, txIndex, hash, err } -func (t transaction) FetchKeyImageExists(keyImage []byte) (bool, []byte, error) { - var txID []byte +func (t transaction) FetchRegistry() (*database.Registry, error) { + var hash []byte var exists bool - if txID, exists = t.db.storage[keyImagesInd][toKey(keyImage)]; !exists { - return false, nil, database.ErrKeyImageNotFound + if hash, exists = t.db.storage[stateInd][toKey(stateKey)]; !exists { + return nil, database.ErrStateNotFound } - return true, txID, nil -} - -func (t transaction) FetchOutputExists(destkey []byte) (bool, error) { - _, exists := t.db.storage[outputKeyInd][toKey(destkey)] - return exists, nil -} - -func (t transaction) FetchOutputUnlockHeight(destkey []byte) (uint64, error) { - unlockHeight, exists := t.db.storage[outputKeyInd][toKey(destkey)] - if !exists { - return 0, errors.New("this output does not exist") + if len(hash) == 0 { + return nil, database.ErrStateNotFound } - return binary.LittleEndian.Uint64(unlockHeight), nil -} - -func (t transaction) FetchState() (*database.State, error) { - var hash []byte - var exists bool + s := &database.Registry{} + s.TipHash = hash - if hash, exists = t.db.storage[stateInd][toKey(stateKey)]; !exists { + if hash, exists = t.db.storage[persistedInd][toKey(stateKey)]; !exists { return nil, database.ErrStateNotFound } @@ -222,8 +211,7 @@ func (t transaction) FetchState() (*database.State, error) { return nil, database.ErrStateNotFound } - s := &database.State{} - s.TipHash = hash + s.PersistedHash = hash return s, nil } @@ -260,7 +248,7 @@ func (t *transaction) FetchBlock(hash []byte) (*block.Block, error) { } func (t *transaction) FetchCurrentHeight() (uint64, error) { - state, err := t.FetchState() + state, err := t.FetchRegistry() if err != nil { return 0, err } diff --git a/pkg/core/database/testing/drivers_test.go b/pkg/core/database/testing/drivers_test.go index 2ba52baed..079dfb8fc 100644 --- a/pkg/core/database/testing/drivers_test.go +++ b/pkg/core/database/testing/drivers_test.go @@ -147,19 +147,31 @@ func _TestDriver(m *testing.M, driverName string) int { } func TestStoreBlock(test *testing.T) { + var ( + persistedHash []byte + done bool + ) + // Generate additional blocks to store - genBlocks, err := generateChainBlocks(2) + blks, err := generateChainBlocks(20) if err != nil { test.Fatal(err.Error()) } - done := false err = db.Update(func(t database.Transaction) error { - for _, block := range genBlocks { - err = t.StoreBlock(block) + for i, b := range blks { + + var persisted bool + if i%3 == 0 { + persisted = true + persistedHash = b.Header.Hash + } + + err = t.StoreBlock(b, persisted) if err != nil { return err } + } done = true return nil @@ -175,14 +187,19 @@ func TestStoreBlock(test *testing.T) { // Ensure chain tip is updated too err = db.View(func(t database.Transaction) error { - s, err1 := t.FetchState() + s, err1 := t.FetchRegistry() if err1 != nil { return err1 } - if !bytes.Equal(genBlocks[len(genBlocks)-1].Header.Hash, s.TipHash) { + if !bytes.Equal(blks[len(blks)-1].Header.Hash, s.TipHash) { return fmt.Errorf("invalid chain tip") } + + if !bytes.Equal(s.PersistedHash, persistedHash) { + return fmt.Errorf("invalid persisted hash") + } + return nil }) @@ -379,7 +396,7 @@ func TestAtomicUpdates(test *testing.T) { forcedError := errors.New("force majeure situation") err := db.Update(func(t database.Transaction) error { for height, block := range genBlocks { - err := t.StoreBlock(block) + err := t.StoreBlock(block, false) if err != nil { return err } @@ -426,7 +443,7 @@ func TestReadOnlyTx(test *testing.T) { // Try to call StoreBlock on a read-only DB Tx err := db.View(func(t database.Transaction) error { for _, block := range blocks { - err := t.StoreBlock(block) + err := t.StoreBlock(block, false) if err != nil { return err } @@ -501,7 +518,7 @@ func TestReadOnlyDB_Mode(test *testing.T) { // Store all blocks with read-write DB err = dbReadWrite.Update(func(t database.Transaction) error { for _, b := range genBlocks { - if e := t.StoreBlock(b); e != nil { + if e := t.StoreBlock(b, false); e != nil { return e } } @@ -532,7 +549,7 @@ func TestReadOnlyDB_Mode(test *testing.T) { // Ensure read-only DB cannot write err = dbReadOnly.Update(func(t database.Transaction) error { for _, block := range blocks { - err1 := t.StoreBlock(block) + err1 := t.StoreBlock(block, false) if err1 != nil { return err1 } diff --git a/pkg/core/database/testing/utils.go b/pkg/core/database/testing/utils.go index e80579e15..d540d8dd6 100644 --- a/pkg/core/database/testing/utils.go +++ b/pkg/core/database/testing/utils.go @@ -30,7 +30,7 @@ func storeBlocks(db database.DB, blocks []*block.Block) error { err := db.Update(func(t database.Transaction) error { for _, block := range blocks { // Store block - err := t.StoreBlock(block) + err := t.StoreBlock(block, false) if err != nil { fmt.Print(err.Error()) return err diff --git a/pkg/gql/query/query_test.go b/pkg/gql/query/query_test.go index d68d83a84..fba40a15c 100644 --- a/pkg/gql/query/query_test.go +++ b/pkg/gql/query/query_test.go @@ -147,7 +147,7 @@ func initializeDB(db database.DB) error { return db.Update(func(t database.Transaction) error { for _, block := range chain { - err := t.StoreBlock(block) + err := t.StoreBlock(block, true) if err != nil { return err } diff --git a/pkg/p2p/peer/responding/blockhashbroker_test.go b/pkg/p2p/peer/responding/blockhashbroker_test.go index eb4736090..11f26a18f 100644 --- a/pkg/p2p/peer/responding/blockhashbroker_test.go +++ b/pkg/p2p/peer/responding/blockhashbroker_test.go @@ -77,7 +77,7 @@ func createGetBlocks(locator []byte) message.Message { func storeBlocks(db database.DB, blocks []*block.Block) error { for _, blk := range blocks { err := db.Update(func(t database.Transaction) error { - return t.StoreBlock(blk) + return t.StoreBlock(blk, false) }) if err != nil { return err