Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node: improve fetch_faults operation #1982

Merged
merged 6 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions node-data/src/ledger/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ impl Block {
&self.faults
}

pub fn into_faults(self) -> Vec<Fault> {
self.faults
}

pub fn set_attestation(&mut self, att: Attestation) {
self.header.att = att;
}
Expand Down
5 changes: 2 additions & 3 deletions node/src/chain/acceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,6 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
"Cannot get header for last finalized block hash {}",
to_str(&lfb_hash)
))?
.0
.state_hash;

// A block is considered stable when is either Confirmed or Attested
Expand Down Expand Up @@ -709,7 +708,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {

let state_hash = db
.fetch_block_header(&hash)?
.map(|(h, _)| h.state_hash)
.map(|h| h.state_hash)
.ok_or(anyhow!(
"Cannot get header for hash {}",
to_str(&hash)
Expand Down Expand Up @@ -974,7 +973,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
.view(|t| {
let res = t
.fetch_block_header(&header.prev_block_hash)?
.map(|(prev, _)| prev.seed);
.map(|prev| prev.seed);

anyhow::Ok::<Option<Seed>>(res)
})?
Expand Down
10 changes: 4 additions & 6 deletions node/src/chain/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,10 @@ impl<'a, N: Network, DB: database::DB, VM: vm::VMExecution>

let prev_header = self.acc.db.read().await.view(|t| {
let prev_hash = &local.prev_block_hash;
t.fetch_block_header(prev_hash)?
.map(|(header, _)| header)
.ok_or(anyhow::anyhow!(
"Unable to find block with hash {}",
to_str(prev_hash)
))
t.fetch_block_header(prev_hash)?.ok_or(anyhow::anyhow!(
"Unable to find block with hash {}",
to_str(prev_hash)
))
})?;

info!(
Expand Down
6 changes: 3 additions & 3 deletions node/src/chain/fsm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> InSyncImpl<DB, VM, N> {
//
// Then we fallback to N_B.PrevBlock and accept N_B
let result = acc.db.read().await.view(|t| {
if let Some((prev_header, _)) =
if let Some(prev_header) =
t.fetch_block_header(&remote_blk.header().prev_block_hash)?
{
let local_height = prev_header.height + 1;
Expand Down Expand Up @@ -555,9 +555,9 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> InSyncImpl<DB, VM, N> {
.fetch_block_header(
&remote_blk.header().prev_block_hash,
)?
.map(|(prev_header, _)| prev_header.state_hash);
.map(|prev| prev.state_hash);

anyhow::Ok::<Option<[u8; 32]>>(res)
anyhow::Ok(res)
})?
.ok_or_else(|| {
anyhow::anyhow!("could not retrieve state_hash")
Expand Down
4 changes: 2 additions & 2 deletions node/src/chain/header_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ pub async fn verify_faults<DB: database::DB>(
db.read()
.await
.view(|db| {
let (prev_header, _) = db
let prev_header = db
.fetch_block_header(&fault_header.prev_block_hash)?
.ok_or(anyhow::anyhow!("Slashing a non accepted header"))?;
if prev_header.height != fault_header.round - 1 {
Expand All @@ -308,7 +308,7 @@ pub async fn verify_faults<DB: database::DB>(
// id directly This needs the fault id to be
// changed into "HEIGHT|TYPE|PROV_KEY"
let stored_faults =
db.fetch_faults(fault_header.round - EPOCH)?;
db.fetch_faults_by_block(fault_header.round - EPOCH)?;
if stored_faults.iter().any(|other| f.same(other)) {
anyhow::bail!("Double fault detected");
}
Expand Down
20 changes: 13 additions & 7 deletions node/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,15 @@ use std::path::Path;
pub mod rocksdb;

use anyhow::Result;
use node_data::ledger::{self, Fault};
use node_data::ledger::{Label, SpentTransaction};
use node_data::ledger::{self, Fault, Label, SpentTransaction};
use serde::{Deserialize, Serialize};

pub struct LightBlock {
pub header: ledger::Header,
pub transactions_ids: Vec<[u8; 32]>,
pub faults_ids: Vec<[u8; 32]>,
}

pub trait DB: Send + Sync + 'static {
type P<'a>: Persist;

Expand Down Expand Up @@ -57,10 +62,10 @@ pub trait Ledger {
) -> Result<usize>;

fn delete_block(&self, b: &ledger::Block) -> Result<()>;
fn fetch_block_header(
&self,
hash: &[u8],
) -> Result<Option<(ledger::Header, Vec<[u8; 32]>)>>;
fn fetch_block_header(&self, hash: &[u8])
-> Result<Option<ledger::Header>>;

fn fetch_light_block(&self, hash: &[u8]) -> Result<Option<LightBlock>>;

fn fetch_block(&self, hash: &[u8]) -> Result<Option<ledger::Block>>;
fn fetch_block_hash_by_height(
Expand Down Expand Up @@ -93,7 +98,8 @@ pub trait Ledger {
label: Label,
) -> Result<()>;

fn fetch_faults(&self, start_height: u64) -> Result<Vec<Fault>>;
fn fetch_faults_by_block(&self, start_height: u64) -> Result<Vec<Fault>>;
fn fetch_faults(&self, faults_ids: &[[u8; 32]]) -> Result<Vec<Fault>>;
}

pub trait Candidate {
Expand Down
74 changes: 49 additions & 25 deletions node/src/database/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
//
// Copyright (c) DUSK NETWORK. All rights reserved.

use super::{Candidate, DatabaseOptions, Ledger, Metadata, Persist, DB};
use super::{
Candidate, DatabaseOptions, Ledger, LightBlock, Metadata, Persist, DB,
};
use anyhow::Result;
use std::cell::RefCell;

use node_data::ledger::{self, Fault, Label, SpentTransaction};
use node_data::ledger::{self, Fault, Header, Label, SpentTransaction};
use node_data::Serializable;

use crate::database::Mempool;
Expand Down Expand Up @@ -289,7 +291,7 @@ impl<'db, DB: DBAccess> Ledger for DBTransaction<'db, DB> {
let cf = self.ledger_cf;

let mut buf = vec![];
HeaderRecord {
LightBlock {
header: header.clone(),
transactions_ids: txs
.iter()
Expand Down Expand Up @@ -335,23 +337,22 @@ impl<'db, DB: DBAccess> Ledger for DBTransaction<'db, DB> {
Ok(self.get_size())
}

fn fetch_faults(&self, start_height: u64) -> Result<Vec<Fault>> {
fn fetch_faults_by_block(&self, start_height: u64) -> Result<Vec<Fault>> {
let mut faults = vec![];
let mut hash = self
.op_read(MD_HASH_KEY)?
.ok_or(anyhow::anyhow!("Cannot read tip"))?;

loop {
let block = self.fetch_block(&hash)?.ok_or(anyhow::anyhow!(
"Cannot read block {}",
hex::encode(&hash)
))?;
let block = self.fetch_light_block(&hash)?.ok_or(
anyhow::anyhow!("Cannot read block {}", hex::encode(&hash)),
)?;

let block_height = block.header().height;
let block_height = block.header.height;

if block_height >= start_height {
hash = block.header().prev_block_hash.to_vec();
faults.extend(block.into_faults());
hash = block.header.prev_block_hash.to_vec();
faults.extend(self.fetch_faults(&block.faults_ids)?);
} else {
break;
}
Expand Down Expand Up @@ -400,10 +401,32 @@ impl<'db, DB: DBAccess> Ledger for DBTransaction<'db, DB> {
Ok(self.snapshot.get_cf(self.ledger_cf, hash)?.is_some())
}

fn fetch_faults(&self, faults_ids: &[[u8; 32]]) -> Result<Vec<Fault>> {
if faults_ids.is_empty() {
return Ok(vec![]);
}
let ids = faults_ids
.iter()
.map(|id| (self.ledger_faults_cf, id))
.collect::<Vec<_>>();

// Retrieve all faults ID with single call
let faults_buffer = self.snapshot.multi_get_cf(ids);

let mut faults = vec![];
for buf in faults_buffer {
let buf = buf?.unwrap();
let fault = ledger::Fault::read(&mut &buf.to_vec()[..])?;
faults.push(fault);
}

Ok(faults)
}

fn fetch_block(&self, hash: &[u8]) -> Result<Option<ledger::Block>> {
match self.snapshot.get_cf(self.ledger_cf, hash)? {
Some(blob) => {
let record = HeaderRecord::read(&mut &blob[..])?;
let record = LightBlock::read(&mut &blob[..])?;

// Retrieve all transactions buffers with single call
let txs_buffers = self.snapshot.multi_get_cf(
Expand Down Expand Up @@ -446,14 +469,21 @@ impl<'db, DB: DBAccess> Ledger for DBTransaction<'db, DB> {
}
}

fn fetch_block_header(
&self,
hash: &[u8],
) -> Result<Option<(ledger::Header, Vec<[u8; 32]>)>> {
fn fetch_light_block(&self, hash: &[u8]) -> Result<Option<LightBlock>> {
match self.snapshot.get_cf(self.ledger_cf, hash)? {
Some(blob) => {
let record = HeaderRecord::read(&mut &blob[..])?;
Ok(Some((record.header, record.transactions_ids)))
let record = LightBlock::read(&mut &blob[..])?;
Ok(Some(record))
}
None => Ok(None),
}
}

fn fetch_block_header(&self, hash: &[u8]) -> Result<Option<Header>> {
match self.snapshot.get_cf(self.ledger_cf, hash)? {
Some(blob) => {
let record = Header::read(&mut &blob[..])?;
Ok(Some(record))
}
None => Ok(None),
}
Expand Down Expand Up @@ -909,13 +939,7 @@ fn deserialize_key<R: Read>(r: &mut R) -> Result<(u64, [u8; 32])> {
Ok((value, hash))
}

struct HeaderRecord {
header: ledger::Header,
transactions_ids: Vec<[u8; 32]>,
faults_ids: Vec<[u8; 32]>,
}

impl node_data::Serializable for HeaderRecord {
impl node_data::Serializable for LightBlock {
fn write<W: Write>(&self, w: &mut W) -> io::Result<()> {
// Write block header
self.header.write(w)?;
Expand Down
23 changes: 11 additions & 12 deletions rusk/src/lib/http/chain/graphql/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ pub async fn last_block(ctx: &Context<'_>) -> FieldResult<Block> {
let hash = t.op_read(MD_HASH_KEY)?;
match hash {
None => Ok(None),
Some(hash) => t.fetch_block_header(&hash),
Some(hash) => t.fetch_light_block(&hash),
}
})?;

block
.map(|(header, txs_id)| Block::new(header, txs_id))
.map(Block::from)
.ok_or_else(|| FieldError::new("Cannot find last block"))
}

Expand All @@ -49,8 +49,8 @@ pub async fn block_by_hash(
) -> OptResult<Block> {
let db = ctx.data::<DBContext>()?;
let hash = hex::decode(hash)?;
let block = db.read().await.view(|t| t.fetch_block_header(&hash))?;
Ok(block.map(|(header, txs_id)| Block::new(header, txs_id)))
let header = db.read().await.view(|t| t.fetch_light_block(&hash))?;
Ok(header.map(Block::from))
}

pub async fn last_blocks(
Expand All @@ -67,11 +67,11 @@ pub async fn last_blocks(
let mut blocks = vec![last_block];
let mut count = count - 1;
while (count > 0) {
match t.fetch_block_header(&hash_to_search)? {
match t.fetch_light_block(&hash_to_search)? {
None => break,
Some((header, txs_id)) => {
hash_to_search = header.prev_block_hash;
blocks.push(Block::new(header, txs_id));
Some(h) => {
hash_to_search = h.header.prev_block_hash;
blocks.push(Block::from(h));
count -= 1;
}
}
Expand All @@ -95,10 +95,9 @@ pub async fn blocks_range(
hash_to_search = t.fetch_block_hash_by_height(height)?;
}
if let Some(hash) = hash_to_search {
let (header, txs_id) =
t.fetch_block_header(&hash)?.expect("Block to be found");
hash_to_search = header.prev_block_hash.into();
blocks.push(Block::new(header, txs_id))
let h = t.fetch_light_block(&hash)?.expect("Block to be found");
hash_to_search = h.header.prev_block_hash.into();
blocks.push(Block::from(h))
}
}
Ok::<_, anyhow::Error>(blocks)
Expand Down
18 changes: 10 additions & 8 deletions rusk/src/lib/http/chain/graphql/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,23 @@
use std::ops::Deref;

use async_graphql::{FieldError, FieldResult, Object, SimpleObject};
use node::database::{Ledger, DB};
use node::database::{Ledger, LightBlock, DB};

pub struct Block {
header: node_data::ledger::Header,
txs_id: Vec<[u8; 32]>,
}

impl Block {
pub fn new(
header: node_data::ledger::Header,
txs_id: Vec<[u8; 32]>,
) -> Self {
Self { header, txs_id }
impl From<LightBlock> for Block {
fn from(value: LightBlock) -> Self {
Self {
header: value.header,
txs_id: value.transactions_ids,
}
}
}

impl Block {
pub fn header(&self) -> &node_data::ledger::Header {
&self.header
}
Expand Down Expand Up @@ -206,7 +208,7 @@ impl SpentTransaction {
let db = ctx.data::<super::DBContext>()?.read().await;
let block_height = self.0.block_height;

let (header, _) = db.view(|t| {
let header = db.view(|t| {
let block_hash =
t.fetch_block_hash_by_height(block_height)?.ok_or_else(
|| FieldError::new("Cannot find block hash by height"),
Expand Down
8 changes: 4 additions & 4 deletions rusk/src/lib/http/chain/graphql/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ pub async fn last_transactions(
let mut txs = vec![];
let mut current_block =
t.op_read(MD_HASH_KEY).and_then(|res| match res {
Some(hash) => t.fetch_block_header(&hash),
Some(hash) => t.fetch_light_block(&hash),
None => Ok(None),
})?;

while let Some((header, block_txs)) = current_block {
for txs_id in block_txs {
while let Some(h) = current_block {
for txs_id in h.transactions_ids {
let tx =
t.get_ledger_tx_by_hash(&txs_id)?.ok_or_else(|| {
FieldError::new("Cannot find transaction")
Expand All @@ -48,7 +48,7 @@ pub async fn last_transactions(
return Ok::<_, async_graphql::Error>(txs);
}
}
current_block = t.fetch_block_header(&header.prev_block_hash)?;
current_block = t.fetch_light_block(&h.header.prev_block_hash)?;
}

Ok::<_, async_graphql::Error>(txs)
Expand Down
Loading