From b9855d5f0ad0e6e1a6d18b14e1367954a4458008 Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Thu, 4 Jul 2024 16:00:30 +0200 Subject: [PATCH] node: include faults into block - Add `faults` to `Block` - Change `DB::store_block` to save faults in a separa CF - Add `DB::fetch_faults` to retrieve latest faults --- node/src/chain.rs | 7 ++- node/src/chain/acceptor.rs | 3 +- node/src/chain/genesis.rs | 1 + node/src/database.rs | 5 +- node/src/database/rocksdb.rs | 116 ++++++++++++++++++++++++++++++++++- rusk/tests/common/state.rs | 1 + 6 files changed, 128 insertions(+), 5 deletions(-) diff --git a/node/src/chain.rs b/node/src/chain.rs index 7971cb8133..4261c86e37 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -259,7 +259,12 @@ impl ChainSrv { let genesis_blk = genesis::generate_state(state); db.write().await.update(|t| { // Persist genesis block - t.store_block(genesis_blk.header(), &[], Label::Final(0)) + t.store_block( + genesis_blk.header(), + &[], + &[], + Label::Final(0), + ) })?; BlockWithLabel::new_with_label(genesis_blk, Label::Final(0)) diff --git a/node/src/chain/acceptor.rs b/node/src/chain/acceptor.rs index 9d580e0d74..ae7e498217 100644 --- a/node/src/chain/acceptor.rs +++ b/node/src/chain/acceptor.rs @@ -470,7 +470,8 @@ impl Acceptor { let label = rolling_results.0; // Store block with updated transactions with Error and GasSpent - block_size_on_disk = db.store_block(header, &txs, label)?; + block_size_on_disk = + db.store_block(header, &txs, blk.faults(), label)?; Ok((txs, rolling_results)) })?; diff --git a/node/src/chain/genesis.rs b/node/src/chain/genesis.rs index 6d52cfb07c..d676bdd1cc 100644 --- a/node/src/chain/genesis.rs +++ b/node/src/chain/genesis.rs @@ -16,6 +16,7 @@ pub(crate) fn generate_state(state_hash: [u8; 32]) -> Block { ..Default::default() }, vec![], + vec![], ) .expect("block should be valid") } diff --git a/node/src/database.rs b/node/src/database.rs index e7d3da43ba..5b7012ba5a 100644 --- a/node/src/database.rs +++ b/node/src/database.rs @@ -10,7 +10,7 @@ use std::path::Path; pub mod rocksdb; use anyhow::Result; -use node_data::ledger; +use node_data::ledger::{self, Fault}; use node_data::ledger::{Label, SpentTransaction}; use serde::{Deserialize, Serialize}; @@ -52,6 +52,7 @@ pub trait Ledger { &self, header: &ledger::Header, txs: &[SpentTransaction], + faults: &[Fault], label: Label, ) -> Result; @@ -91,6 +92,8 @@ pub trait Ledger { hash: &[u8; 32], label: Label, ) -> Result<()>; + + fn fetch_faults(&self, start_height: u64) -> Result>; } pub trait Candidate { diff --git a/node/src/database/rocksdb.rs b/node/src/database/rocksdb.rs index c630e8f172..60043967e3 100644 --- a/node/src/database/rocksdb.rs +++ b/node/src/database/rocksdb.rs @@ -8,7 +8,7 @@ use super::{Candidate, DatabaseOptions, Ledger, Metadata, Persist, DB}; use anyhow::Result; use std::cell::RefCell; -use node_data::ledger::{self, Label, SpentTransaction}; +use node_data::ledger::{self, Fault, Label, SpentTransaction}; use node_data::Serializable; use crate::database::Mempool; @@ -32,6 +32,7 @@ use tracing::info; const CF_LEDGER_HEADER: &str = "cf_ledger_header"; const CF_LEDGER_TXS: &str = "cf_ledger_txs"; +const CF_LEDGER_FAULTS: &str = "cf_ledger_faults"; const CF_LEDGER_HEIGHT: &str = "cf_ledger_height"; const CF_CANDIDATES: &str = "cf_candidates"; const CF_CANDIDATES_HEIGHT: &str = "cf_candidates_height"; @@ -73,6 +74,11 @@ impl Backend { .cf_handle(CF_LEDGER_TXS) .expect("CF_LEDGER_TXS column family must exist"); + let ledger_faults_cf = self + .rocksdb + .cf_handle(CF_LEDGER_FAULTS) + .expect("CF_LEDGER_FAULTS column family must exist"); + let candidates_cf = self .rocksdb .cf_handle(CF_CANDIDATES) @@ -116,6 +122,7 @@ impl Backend { candidates_height_cf, ledger_cf, ledger_txs_cf, + ledger_faults_cf, mempool_cf, nullifiers_cf, fees_cf, @@ -179,6 +186,10 @@ impl DB for Backend { blocks_cf_opts.clone(), ), ColumnFamilyDescriptor::new(CF_LEDGER_TXS, blocks_cf_opts.clone()), + ColumnFamilyDescriptor::new( + CF_LEDGER_FAULTS, + blocks_cf_opts.clone(), + ), ColumnFamilyDescriptor::new( CF_LEDGER_HEIGHT, blocks_cf_opts.clone(), @@ -249,6 +260,7 @@ pub struct DBTransaction<'db, DB: DBAccess> { // Ledger column families ledger_cf: &'db ColumnFamily, + ledger_faults_cf: &'db ColumnFamily, ledger_txs_cf: &'db ColumnFamily, ledger_height_cf: &'db ColumnFamily, @@ -267,6 +279,7 @@ impl<'db, DB: DBAccess> Ledger for DBTransaction<'db, DB> { &self, header: &ledger::Header, txs: &[SpentTransaction], + faults: &[Fault], label: Label, ) -> Result { // COLUMN FAMILY: CF_LEDGER_HEADER @@ -282,6 +295,8 @@ impl<'db, DB: DBAccess> Ledger for DBTransaction<'db, DB> { .iter() .map(|t| t.inner.id()) .collect::>(), + + faults_ids: faults.iter().map(|f| f.hash()).collect::>(), } .write(&mut buf)?; @@ -303,11 +318,51 @@ impl<'db, DB: DBAccess> Ledger for DBTransaction<'db, DB> { self.put_cf(cf, tx.inner.id(), d)?; } } + + // COLUMN FAMILY: CF_LEDGER_FAULTS + { + let cf = self.ledger_faults_cf; + + // store all block faults + for f in faults { + let mut d = vec![]; + f.write(&mut d)?; + self.put_cf(cf, f.hash(), d)?; + } + } self.store_block_label(header.height, &header.hash, label)?; Ok(self.get_size()) } + fn fetch_faults(&self, start_height: u64) -> Result> { + let mut faults = vec![]; + let mut hash = self + .op_read(MD_HASH_KEY)? + .ok_or(anyhow::anyhow!("Cannot read tip"))?; + + loop { + let block = self.fetch_block(&hash)?.ok_or(anyhow::anyhow!( + "Cannot read block {}", + hex::encode(&hash) + ))?; + + let block_height = block.header().height; + + if block_height >= start_height { + hash = block.header().prev_block_hash.to_vec(); + faults.extend(block.into_faults()); + } else { + break; + } + + if block_height == 0 { + break; + } + } + Ok(faults) + } + fn store_block_label( &self, height: u64, @@ -332,6 +387,9 @@ impl<'db, DB: DBAccess> Ledger for DBTransaction<'db, DB> { for tx in b.txs() { self.inner.delete_cf(self.ledger_txs_cf, tx.id())?; } + for f in b.faults() { + self.inner.delete_cf(self.ledger_faults_cf, f.hash())?; + } self.inner.delete_cf(self.ledger_cf, b.header().hash)?; @@ -364,8 +422,23 @@ impl<'db, DB: DBAccess> Ledger for DBTransaction<'db, DB> { txs.push(tx.inner); } + // Retrieve all faults ID with single call + let faults_buffer = self.snapshot.multi_get_cf( + record + .faults_ids + .iter() + .map(|id| (self.ledger_faults_cf, id)) + .collect::>(), + ); + let mut faults = vec![]; + for buf in faults_buffer { + let buf = buf?.unwrap(); + let fault = ledger::Fault::read(&mut &buf.to_vec()[..])?; + faults.push(fault); + } + Ok(Some( - ledger::Block::new(record.header, txs) + ledger::Block::new(record.header, txs, faults) .expect("block should be valid"), )) } @@ -839,6 +912,7 @@ fn deserialize_key(r: &mut R) -> Result<(u64, [u8; 32])> { struct HeaderRecord { header: ledger::Header, transactions_ids: Vec<[u8; 32]>, + faults_ids: Vec<[u8; 32]>, } impl node_data::Serializable for HeaderRecord { @@ -855,6 +929,15 @@ impl node_data::Serializable for HeaderRecord { w.write_all(tx_id)?; } + // Write faults count + let len = self.faults_ids.len() as u32; + w.write_all(&len.to_le_bytes())?; + + // Write faults hashes + for f_id in &self.faults_ids { + w.write_all(f_id)?; + } + Ok(()) } @@ -877,9 +960,22 @@ impl node_data::Serializable for HeaderRecord { transactions_ids.push(tx_id); } + // Read faults count + let len = Self::read_u32_le(r)?; + + // Read faults hashes + let mut faults_ids = vec![]; + for _ in 0..len { + let mut f_id = [0u8; 32]; + r.read_exact(&mut f_id[..])?; + + faults_ids.push(f_id); + } + Ok(Self { header, transactions_ids, + faults_ids, }) } } @@ -909,6 +1005,7 @@ mod tests { txn.store_block( b.header(), &to_spent_txs(b.txs()), + b.faults(), Label::Final(3), )?; Ok(()) @@ -928,6 +1025,15 @@ mod tests { for pos in 0..b.txs().len() { assert_eq!(db_blk.txs()[pos].id(), b.txs()[pos].id()); } + + // Assert all faults are fully fetched from ledger as + // well. + for pos in 0..b.faults().len() { + assert_eq!( + db_blk.faults()[pos].hash(), + b.faults()[pos].hash() + ); + } }); assert!(db @@ -956,6 +1062,7 @@ mod tests { txn.store_block( b.header(), &to_spent_txs(b.txs()), + b.faults(), Label::Final(3), ) .expect("block to be stored"); @@ -985,6 +1092,7 @@ mod tests { txn.store_block( b.header(), &to_spent_txs(b.txs()), + b.faults(), Label::Final(3), ) .unwrap(); @@ -1138,6 +1246,7 @@ mod tests { txn.store_block( b.header(), &to_spent_txs(b.txs()), + b.faults(), Label::Final(3), )?; Ok(()) @@ -1172,6 +1281,7 @@ mod tests { txn.store_block( b.header(), &to_spent_txs(b.txs()), + b.faults(), Label::Attested(3), )?; Ok(()) @@ -1202,6 +1312,7 @@ mod tests { txn.store_block( b.header(), &to_spent_txs(b.txs()), + b.faults(), Label::Attested(3), )?; Ok(()) @@ -1234,6 +1345,7 @@ mod tests { ut.store_block( b.header(), &to_spent_txs(b.txs()), + b.faults(), Label::Final(3), )?; Ok(()) diff --git a/rusk/tests/common/state.rs b/rusk/tests/common/state.rs index 3491ee9f6d..0ac06a4567 100644 --- a/rusk/tests/common/state.rs +++ b/rusk/tests/common/state.rs @@ -132,6 +132,7 @@ pub fn generator_procedure( ..Default::default() }, txs, + vec![], ) .expect("valid block");