Skip to content

Commit

Permalink
node: Request blocks by height in stall_chain_fsm
Browse files Browse the repository at this point in the history
  • Loading branch information
Goshawk committed Sep 6, 2024
1 parent ee90625 commit eccb4e3
Showing 1 changed file with 45 additions and 13 deletions.
58 changes: 45 additions & 13 deletions node/src/chain/stall_chain_fsm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@

use node_data::{
ledger::{to_str, Block},
message::{payload::GetBlocks, Message},
message::{
payload::{GetBlocks, Inv},
Message,
},
};
use std::{
collections::BTreeMap,
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};
use tokio::sync::RwLock;
use tracing::{error, info, warn};
use tracing::{error, info, trace, warn};

use crate::{
database::{self, Ledger},
Expand Down Expand Up @@ -83,6 +86,13 @@ impl<DB: database::DB, N: Network, VM: VMExecution> StalledChainFSM<DB, N, VM> {
///
/// Returns the new state of the FSM after processing the block
pub(crate) async fn on_block_received(&mut self, blk: &Block) -> State {
trace!(
event = "chain.block_received",
hash = to_str(&blk.header().hash),
height = blk.header().height,
iter = blk.header().iteration,
);

let tip = self.acc.read().await.get_curr_tip().await;

if self.tip.0.header().hash != tip.inner().header().hash {
Expand Down Expand Up @@ -139,18 +149,37 @@ impl<DB: database::DB, N: Network, VM: VMExecution> StalledChainFSM<DB, N, VM> {
.entry(key)
.or_insert_with(|| new_blk.clone());

info!(
event = "chain.recovery_block_added",
hash = to_str(&new_blk.header().hash),
height = new_blk.header().height,
iter = new_blk.header().iteration,
);

// Ensure all blocks from local_final until current_tip are
// collected
let from = self.latest_finalized.header().height;
let from = self.latest_finalized.header().height + 1;
let to = self.tip.0.header().height + 1;
for height in from..to {
if !self.recovery_blocks.contains_key(&height) {
info!(event = "chain.missing_recovery_block", height);
return Ok(()); // wait for more blocks
}
}

info!(
event = "chain.all_recovery_block_collected",
hash = to_str(&new_blk.header().hash),
height = new_blk.header().height,
iter = new_blk.header().iteration,
);

// Detect if collected blocks are valid
for (_, blk) in self.recovery_blocks.iter() {
if blk.header().height > self.tip.0.header().height {
continue;
}

let db: Arc<RwLock<DB>> = self.acc.read().await.db.clone();

let exists = db
Expand Down Expand Up @@ -201,7 +230,9 @@ impl<DB: database::DB, N: Network, VM: VMExecution> StalledChainFSM<DB, N, VM> {
error!(
event = "recovery failed",
local_hash = to_str(&local_blk.header().hash),
remote_hash = to_str(&blk.header().hash),
local_height = local_blk.header().height,
remote_hash = to_str(&main_branch_blk.header().hash),
remote_height = main_branch_blk.header().height,
err = format!("verification err: {:?}", err)
);
} else {
Expand All @@ -217,17 +248,18 @@ impl<DB: database::DB, N: Network, VM: VMExecution> StalledChainFSM<DB, N, VM> {
///
/// Request missing blocks since last finalized block
async fn on_accept_block_timeout(&mut self) {
// Request missing blocks since my last finalized block
let get_blocks = Message::new_get_blocks(GetBlocks {
locator: self.latest_finalized.header().hash,
});
let from = self.latest_finalized.header().height + 1;
let to = self.tip.0.header().height + 1;

info!(event = "chain.requesting_blocks", from, to,);

let mut inv = Inv::new(0);
for height in from..to {
inv.add_block_from_height(height);
}

let network = self.acc.read().await.network.clone();
if let Err(e) = network
.read()
.await
.send_to_alive_peers(&get_blocks, REDUNDANCY_PEER_FACTOR)
.await
if let Err(e) = network.read().await.flood_request(&inv, None, 8).await
{
warn!("Unable to request GetBlocks {e}");
return;
Expand Down

0 comments on commit eccb4e3

Please sign in to comment.