Skip to content

Commit

Permalink
node: include faults into block
Browse files Browse the repository at this point in the history
- Add `faults` to `Block`
- Change `DB::store_block` to save faults in a separa CF
- Add `DB::fetch_faults` to retrieve latest faults
  • Loading branch information
herr-seppia committed Jul 4, 2024
1 parent 877b996 commit b9855d5
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 5 deletions.
7 changes: 6 additions & 1 deletion node/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,12 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> ChainSrv<N, DB, VM> {
let genesis_blk = genesis::generate_state(state);
db.write().await.update(|t| {
// Persist genesis block
t.store_block(genesis_blk.header(), &[], Label::Final(0))
t.store_block(
genesis_blk.header(),
&[],
&[],
Label::Final(0),
)
})?;

BlockWithLabel::new_with_label(genesis_blk, Label::Final(0))
Expand Down
3 changes: 2 additions & 1 deletion node/src/chain/acceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,8 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {

let label = rolling_results.0;
// Store block with updated transactions with Error and GasSpent
block_size_on_disk = db.store_block(header, &txs, label)?;
block_size_on_disk =
db.store_block(header, &txs, blk.faults(), label)?;

Ok((txs, rolling_results))
})?;
Expand Down
1 change: 1 addition & 0 deletions node/src/chain/genesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub(crate) fn generate_state(state_hash: [u8; 32]) -> Block {
..Default::default()
},
vec![],
vec![],
)
.expect("block should be valid")
}
5 changes: 4 additions & 1 deletion node/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::path::Path;
pub mod rocksdb;

use anyhow::Result;
use node_data::ledger;
use node_data::ledger::{self, Fault};
use node_data::ledger::{Label, SpentTransaction};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -52,6 +52,7 @@ pub trait Ledger {
&self,
header: &ledger::Header,
txs: &[SpentTransaction],
faults: &[Fault],
label: Label,
) -> Result<usize>;

Expand Down Expand Up @@ -91,6 +92,8 @@ pub trait Ledger {
hash: &[u8; 32],
label: Label,
) -> Result<()>;

fn fetch_faults(&self, start_height: u64) -> Result<Vec<Fault>>;
}

pub trait Candidate {
Expand Down
116 changes: 114 additions & 2 deletions node/src/database/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use super::{Candidate, DatabaseOptions, Ledger, Metadata, Persist, DB};
use anyhow::Result;
use std::cell::RefCell;

use node_data::ledger::{self, Label, SpentTransaction};
use node_data::ledger::{self, Fault, Label, SpentTransaction};
use node_data::Serializable;

use crate::database::Mempool;
Expand All @@ -32,6 +32,7 @@ use tracing::info;

const CF_LEDGER_HEADER: &str = "cf_ledger_header";
const CF_LEDGER_TXS: &str = "cf_ledger_txs";
const CF_LEDGER_FAULTS: &str = "cf_ledger_faults";
const CF_LEDGER_HEIGHT: &str = "cf_ledger_height";
const CF_CANDIDATES: &str = "cf_candidates";
const CF_CANDIDATES_HEIGHT: &str = "cf_candidates_height";
Expand Down Expand Up @@ -73,6 +74,11 @@ impl Backend {
.cf_handle(CF_LEDGER_TXS)
.expect("CF_LEDGER_TXS column family must exist");

let ledger_faults_cf = self
.rocksdb
.cf_handle(CF_LEDGER_FAULTS)
.expect("CF_LEDGER_FAULTS column family must exist");

let candidates_cf = self
.rocksdb
.cf_handle(CF_CANDIDATES)
Expand Down Expand Up @@ -116,6 +122,7 @@ impl Backend {
candidates_height_cf,
ledger_cf,
ledger_txs_cf,
ledger_faults_cf,
mempool_cf,
nullifiers_cf,
fees_cf,
Expand Down Expand Up @@ -179,6 +186,10 @@ impl DB for Backend {
blocks_cf_opts.clone(),
),
ColumnFamilyDescriptor::new(CF_LEDGER_TXS, blocks_cf_opts.clone()),
ColumnFamilyDescriptor::new(
CF_LEDGER_FAULTS,
blocks_cf_opts.clone(),
),
ColumnFamilyDescriptor::new(
CF_LEDGER_HEIGHT,
blocks_cf_opts.clone(),
Expand Down Expand Up @@ -249,6 +260,7 @@ pub struct DBTransaction<'db, DB: DBAccess> {

// Ledger column families
ledger_cf: &'db ColumnFamily,
ledger_faults_cf: &'db ColumnFamily,
ledger_txs_cf: &'db ColumnFamily,
ledger_height_cf: &'db ColumnFamily,

Expand All @@ -267,6 +279,7 @@ impl<'db, DB: DBAccess> Ledger for DBTransaction<'db, DB> {
&self,
header: &ledger::Header,
txs: &[SpentTransaction],
faults: &[Fault],
label: Label,
) -> Result<usize> {
// COLUMN FAMILY: CF_LEDGER_HEADER
Expand All @@ -282,6 +295,8 @@ impl<'db, DB: DBAccess> Ledger for DBTransaction<'db, DB> {
.iter()
.map(|t| t.inner.id())
.collect::<Vec<[u8; 32]>>(),

faults_ids: faults.iter().map(|f| f.hash()).collect::<Vec<_>>(),
}
.write(&mut buf)?;

Expand All @@ -303,11 +318,51 @@ impl<'db, DB: DBAccess> Ledger for DBTransaction<'db, DB> {
self.put_cf(cf, tx.inner.id(), d)?;
}
}

// COLUMN FAMILY: CF_LEDGER_FAULTS
{
let cf = self.ledger_faults_cf;

// store all block faults
for f in faults {
let mut d = vec![];
f.write(&mut d)?;
self.put_cf(cf, f.hash(), d)?;
}
}
self.store_block_label(header.height, &header.hash, label)?;

Ok(self.get_size())
}

fn fetch_faults(&self, start_height: u64) -> Result<Vec<Fault>> {
let mut faults = vec![];
let mut hash = self
.op_read(MD_HASH_KEY)?
.ok_or(anyhow::anyhow!("Cannot read tip"))?;

loop {
let block = self.fetch_block(&hash)?.ok_or(anyhow::anyhow!(
"Cannot read block {}",
hex::encode(&hash)
))?;

let block_height = block.header().height;

if block_height >= start_height {
hash = block.header().prev_block_hash.to_vec();
faults.extend(block.into_faults());
} else {
break;
}

if block_height == 0 {
break;
}
}
Ok(faults)
}

fn store_block_label(
&self,
height: u64,
Expand All @@ -332,6 +387,9 @@ impl<'db, DB: DBAccess> Ledger for DBTransaction<'db, DB> {
for tx in b.txs() {
self.inner.delete_cf(self.ledger_txs_cf, tx.id())?;
}
for f in b.faults() {
self.inner.delete_cf(self.ledger_faults_cf, f.hash())?;
}

self.inner.delete_cf(self.ledger_cf, b.header().hash)?;

Expand Down Expand Up @@ -364,8 +422,23 @@ impl<'db, DB: DBAccess> Ledger for DBTransaction<'db, DB> {
txs.push(tx.inner);
}

// Retrieve all faults ID with single call
let faults_buffer = self.snapshot.multi_get_cf(
record
.faults_ids
.iter()
.map(|id| (self.ledger_faults_cf, id))
.collect::<Vec<(&ColumnFamily, &[u8; 32])>>(),
);
let mut faults = vec![];
for buf in faults_buffer {
let buf = buf?.unwrap();
let fault = ledger::Fault::read(&mut &buf.to_vec()[..])?;
faults.push(fault);
}

Ok(Some(
ledger::Block::new(record.header, txs)
ledger::Block::new(record.header, txs, faults)
.expect("block should be valid"),
))
}
Expand Down Expand Up @@ -839,6 +912,7 @@ fn deserialize_key<R: Read>(r: &mut R) -> Result<(u64, [u8; 32])> {
struct HeaderRecord {
header: ledger::Header,
transactions_ids: Vec<[u8; 32]>,
faults_ids: Vec<[u8; 32]>,
}

impl node_data::Serializable for HeaderRecord {
Expand All @@ -855,6 +929,15 @@ impl node_data::Serializable for HeaderRecord {
w.write_all(tx_id)?;
}

// Write faults count
let len = self.faults_ids.len() as u32;
w.write_all(&len.to_le_bytes())?;

// Write faults hashes
for f_id in &self.faults_ids {
w.write_all(f_id)?;
}

Ok(())
}

Expand All @@ -877,9 +960,22 @@ impl node_data::Serializable for HeaderRecord {
transactions_ids.push(tx_id);
}

// Read faults count
let len = Self::read_u32_le(r)?;

// Read faults hashes
let mut faults_ids = vec![];
for _ in 0..len {
let mut f_id = [0u8; 32];
r.read_exact(&mut f_id[..])?;

faults_ids.push(f_id);
}

Ok(Self {
header,
transactions_ids,
faults_ids,
})
}
}
Expand Down Expand Up @@ -909,6 +1005,7 @@ mod tests {
txn.store_block(
b.header(),
&to_spent_txs(b.txs()),
b.faults(),
Label::Final(3),
)?;
Ok(())
Expand All @@ -928,6 +1025,15 @@ mod tests {
for pos in 0..b.txs().len() {
assert_eq!(db_blk.txs()[pos].id(), b.txs()[pos].id());
}

// Assert all faults are fully fetched from ledger as
// well.
for pos in 0..b.faults().len() {
assert_eq!(
db_blk.faults()[pos].hash(),
b.faults()[pos].hash()
);
}
});

assert!(db
Expand Down Expand Up @@ -956,6 +1062,7 @@ mod tests {
txn.store_block(
b.header(),
&to_spent_txs(b.txs()),
b.faults(),
Label::Final(3),
)
.expect("block to be stored");
Expand Down Expand Up @@ -985,6 +1092,7 @@ mod tests {
txn.store_block(
b.header(),
&to_spent_txs(b.txs()),
b.faults(),
Label::Final(3),
)
.unwrap();
Expand Down Expand Up @@ -1138,6 +1246,7 @@ mod tests {
txn.store_block(
b.header(),
&to_spent_txs(b.txs()),
b.faults(),
Label::Final(3),
)?;
Ok(())
Expand Down Expand Up @@ -1172,6 +1281,7 @@ mod tests {
txn.store_block(
b.header(),
&to_spent_txs(b.txs()),
b.faults(),
Label::Attested(3),
)?;
Ok(())
Expand Down Expand Up @@ -1202,6 +1312,7 @@ mod tests {
txn.store_block(
b.header(),
&to_spent_txs(b.txs()),
b.faults(),
Label::Attested(3),
)?;
Ok(())
Expand Down Expand Up @@ -1234,6 +1345,7 @@ mod tests {
ut.store_block(
b.header(),
&to_spent_txs(b.txs()),
b.faults(),
Label::Final(3),
)?;
Ok(())
Expand Down
1 change: 1 addition & 0 deletions rusk/tests/common/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ pub fn generator_procedure(
..Default::default()
},
txs,
vec![],
)
.expect("valid block");

Expand Down

0 comments on commit b9855d5

Please sign in to comment.