Skip to content

Commit

Permalink
node: Ensure all blocks from local_final until current_tip are collected
Browse files Browse the repository at this point in the history
  • Loading branch information
Goshawk committed Sep 5, 2024
1 parent 133d058 commit ee90625
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 52 deletions.
2 changes: 1 addition & 1 deletion node/src/chain/fsm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> SimpleFSM<N, DB, VM> {

// Try to detect a stalled chain
// Generally speaking, if a node is receiving future blocks from the
// network but it could not accept a new block for long time, then
// network but it cannot accept a new block for long time, then
// it might be a sign of a getting stalled on non-main branch.
if let stall_chain_fsm::State::StalledOnFork =
self.stalled_sm.on_block_received(blk).await
Expand Down
98 changes: 47 additions & 51 deletions node/src/chain/stall_chain_fsm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ use super::acceptor::Acceptor;

use super::fsm::REDUNDANCY_PEER_FACTOR;

const CONSECUTIVE_BLOCKS_THRESHOLD: usize = 5;
const STALLED_TIMEOUT: u64 = 30; // seconds
const STALLED_TIMEOUT: u64 = 60; // seconds

#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) enum State {
Expand Down Expand Up @@ -96,14 +95,17 @@ impl<DB: database::DB, N: Network, VM: VMExecution> StalledChainFSM<DB, N, VM> {
}

self.update_tip(tip.inner().clone());

self.state_transition(State::Running);
}

let curr = self.state;
match curr {
State::Running => self.on_running().await,
State::Stalled => self.on_stalled(blk).await,
State::Stalled => {
if let Err(err) = self.on_stalled(blk).await {
error!("Error while processing block: {:?}", err);
}
}
State::StalledOnFork => warn!("Stalled on fork"),
};

Expand Down Expand Up @@ -131,29 +133,20 @@ impl<DB: database::DB, N: Network, VM: VMExecution> StalledChainFSM<DB, N, VM> {
}

/// Handles block from wire in the `Stalled` state
async fn on_stalled(&mut self, new_blk: &Block) {
async fn on_stalled(&mut self, new_blk: &Block) -> anyhow::Result<()> {
let key = new_blk.header().height;
self.recovery_blocks
.entry(key)
.or_insert_with(|| new_blk.clone());

if self.recovery_blocks.len() < CONSECUTIVE_BLOCKS_THRESHOLD {
// Not enough consecutive blocks collected yet
return;
}

// Check recovery blocks contains at most N consecutive blocks
let mut prev = self.latest_finalized.header().height;

let consecutive = self.recovery_blocks.keys().all(|&key| {
let is_consecutive: bool = key == prev + 1;
prev = key;
is_consecutive
});

if !consecutive {
// recovery blocks are missing
return;
// Ensure all blocks from local_final until current_tip are
// collected
let from = self.latest_finalized.header().height;
let to = self.tip.0.header().height + 1;
for height in from..to {
if !self.recovery_blocks.contains_key(&height) {
return Ok(()); // wait for more blocks
}
}

// Detect if collected blocks are valid
Expand All @@ -163,30 +156,34 @@ impl<DB: database::DB, N: Network, VM: VMExecution> StalledChainFSM<DB, N, VM> {
let exists = db
.read()
.await
.view(|t| t.get_block_exists(&blk.header().hash))
.unwrap(); // TODO:
.view(|t| t.get_block_exists(&blk.header().hash))?;

if !exists {
if exists {
// Block already exists in ledger
continue;
}

let local_blk = db
.read()
.await
.view(|t| t.fetch_block_header(&blk.header().prev_block_hash))
.unwrap();
.view(|t| t.fetch_block_by_height(blk.header().height))?;

if local_blk.is_none() {
// Block is invalid
error!(
event = "revert failed",
hash = to_str(&blk.header().hash),
err = format!("could not find prev block")
);
let local_blk = match local_blk {
Some(blk) => blk,
None => {
error!(
event = "recovery failed",
hash = to_str(&blk.header().hash),
err = format!(
"could not find local block at height {}",
blk.header().height
)
);
return Ok(());
}
};

return;
}
let main_branch_blk = blk;

// If we are here, most probably this is a block from the main
// branch
Expand All @@ -195,26 +192,25 @@ impl<DB: database::DB, N: Network, VM: VMExecution> StalledChainFSM<DB, N, VM> {
.read()
.await
.verify_header_against_local(
local_blk.as_ref().unwrap(),
blk.header(),
local_blk.header(),
main_branch_blk.header(),
)
.await;

match res {
Ok(_) => {
self.state_transition(State::StalledOnFork);
return;
}
Err(err) => {
// Block is invalid
error!(
event = "revert failed", // TODO:
hash = to_str(&blk.header().hash),
err = format!("{:?}", err)
);
}
if let Err(err) = res {
error!(
event = "recovery failed",
local_hash = to_str(&local_blk.header().hash),
remote_hash = to_str(&blk.header().hash),
err = format!("verification err: {:?}", err)
);
} else {
self.state_transition(State::StalledOnFork);
return Ok(());
}
}

Ok(())
}

/// Handles block acceptance timeout
Expand Down

0 comments on commit ee90625

Please sign in to comment.