From 9cae72f25492c90c4e4054f316cdf99c1aa3a4af Mon Sep 17 00:00:00 2001 From: Goshawk Date: Tue, 27 Aug 2024 16:48:11 +0300 Subject: [PATCH 01/12] node: Implement a FSM for stalled_chain states --- node/src/chain/stall_chain_fsm.rs | 121 ++++++++++++++++++++++++++++++ 1 file changed, 121 insertions(+) create mode 100644 node/src/chain/stall_chain_fsm.rs diff --git a/node/src/chain/stall_chain_fsm.rs b/node/src/chain/stall_chain_fsm.rs new file mode 100644 index 0000000000..5dbe3b98b7 --- /dev/null +++ b/node/src/chain/stall_chain_fsm.rs @@ -0,0 +1,121 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// +// Copyright (c) DUSK NETWORK. All rights reserved. + +use node_data::ledger::Block; +use std::collections::BTreeMap; + +const CONSECUTIVE_BLOCKS_THRESHOLD: usize = 5; + +#[derive(Debug, Clone, Copy, PartialEq)] +pub(crate) enum State { + /// Blocks are being accepted + Running, + /// No block has been accepted recently + /// + /// A chain progress may be stalled for any reasons listed below: + /// - `disconnected` Node is not receiving any messages from the network + /// - `open_consensus` Network is struggling to produce a block for many + /// iterations + /// - `higher_iteration_branch` Network has moved forward with block of a + /// higher iteration + Stalled, + /// Node is disconnected from the main branch + StalledOnFork, +} + +/// Implements a simple FSM to detect a stalled state of the chain +pub(crate) struct StalledChainFSM { + state: State, + recovery_blocks: BTreeMap, + + latest_finalized: Option, + tip: Option, +} + +impl StalledChainFSM { + pub(crate) fn new() -> Self { + Self { + state: State::Running, + recovery_blocks: BTreeMap::new(), + latest_finalized: None, + tip: None, + } + } + + pub(crate) fn on_block_received(&mut self, blk: &Block) -> State { + let curr = self.state; + match curr { + State::Stalled => self.on_stalled(blk), + State::StalledOnFork | State::Running => curr, + } + } + + /// Returns recovery blocks as a vector + pub(crate) fn recovery_blocks(&self) -> Vec { + self.recovery_blocks.values().cloned().collect() + } + + /// Handles block from wire in the `Stalled` state + pub(crate) fn on_stalled(&mut self, blk: &Block) -> State { + let key = blk.header().height; + self.recovery_blocks + .entry(key) + .or_insert_with(|| blk.clone()); + + if self.recovery_blocks.len() < CONSECUTIVE_BLOCKS_THRESHOLD { + return State::Stalled; + } + + // Check recovery blocks contains at most N consecutive blocks + let mut prev = self + .latest_finalized + .as_ref() + .map(|b| b.header().height) + .unwrap_or(0); // TODO: + + let consecutive = self.recovery_blocks.keys().all(|&key| { + let is_consecutive = key == prev + 1; + prev = key; + is_consecutive + }); + + if !consecutive { + // Not enough consecutive blocks collected yet + return State::Stalled; + } + + // Detect if collected blocks are valid + if self + .recovery_blocks + .iter() + .all(|(_, blk)| self.dry_run_accept_block(blk)) + { + State::StalledOnFork + } else { + State::Stalled + } + } + + pub(crate) fn on_block_accepted(&mut self, blk: &Block, is_final: bool) { + self.state = State::Running; + self.recovery_blocks.clear(); + + if is_final { + self.latest_finalized = Some(blk.clone()); + } + + self.tip = Some(blk.clone()); + } + + pub(crate) fn on_accept_block_timeout(&mut self) { + self.state = State::Stalled; + } + + pub(crate) fn dry_run_accept_block(&self, _blk: &Block) -> bool { + // TODO: Implement dry-run accept block + false + } +} From ff069636658a81eb7d555aa1d2b7cfb24db0c025 Mon Sep 17 00:00:00 2001 From: Goshawk Date: Tue, 27 Aug 2024 16:49:12 +0300 Subject: [PATCH 02/12] node: Intergrate stalled chain FSM --- node/src/chain.rs | 1 + node/src/chain/fsm.rs | 41 +++++++++++++++++++++++++++++++++++++++-- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/node/src/chain.rs b/node/src/chain.rs index b6c7b4c56f..a7938149d1 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -12,6 +12,7 @@ mod genesis; mod header_validation; mod metrics; +mod stall_chain_fsm; use self::acceptor::Acceptor; use self::fsm::SimpleFSM; diff --git a/node/src/chain/fsm.rs b/node/src/chain/fsm.rs index 0d86668390..4c996db587 100644 --- a/node/src/chain/fsm.rs +++ b/node/src/chain/fsm.rs @@ -5,6 +5,7 @@ // Copyright (c) DUSK NETWORK. All rights reserved. use super::acceptor::{Acceptor, RevertTarget}; +use super::stall_chain_fsm::{self, StalledChainFSM}; use crate::chain::fallback; use crate::database; use crate::{vm, Network}; @@ -80,6 +81,8 @@ pub(crate) struct SimpleFSM { /// Attestations cached from received Quorum messages attestations_cache: HashMap<[u8; 32], (Attestation, Instant)>, + + stalled_chain_fsm: StalledChainFSM, } impl SimpleFSM { @@ -99,6 +102,7 @@ impl SimpleFSM { network, blacklisted_blocks, attestations_cache: Default::default(), + stalled_chain_fsm: StalledChainFSM::new(), } } @@ -118,6 +122,8 @@ impl SimpleFSM { // Clear up all blacklisted blocks self.blacklisted_blocks.write().await.clear(); + self.stalled_chain_fsm.on_accept_block_timeout(); + // Request missing blocks since my last finalized block let get_blocks = Message::new_get_blocks(GetBlocks { locator: last_finalized.header().hash, @@ -158,6 +164,32 @@ impl SimpleFSM { ) -> anyhow::Result> { let block_hash = &blk.header().hash; + if let stall_chain_fsm::State::StalledOnFork = + self.stalled_chain_fsm.on_block_received(&blk) + { + info!( + event = "stalled on fork", + hash = to_str(&blk.header().hash), + height = blk.header().height, + iter = blk.header().iteration, + ); + + let mut acc = self.acc.write().await; + match acc.try_revert(RevertTarget::LastFinalizedState).await { + Ok(_) => { + counter!("dusk_revert_count").increment(1); + + for blk in self.stalled_chain_fsm.recovery_blocks() { + acc.try_accept_block(&blk, true).await?; + } + } + Err(e) => { + error!(event = "revert failed", err = format!("{:?}", e)); + return Ok(None); + } + } + } + // Filter out blocks that have already been marked as // blacklisted upon successful fallback execution. if self.blacklisted_blocks.read().await.contains(block_hash) { @@ -211,13 +243,19 @@ impl SimpleFSM { } } } - // FIXME: The block should return only if accepted. The current issue is // that the impl of State::on_block_event doesn't return always the // accepted block, so we can't rely on them // // Due to this issue, we reset the outer timeout even if we are not // accepting the received block + + // TODO: Ensure block returned is the one that was accepted + let is_final = false; // TODO: + if let Some(blk) = blk.as_ref() { + self.stalled_chain_fsm.on_block_accepted(blk, is_final); + } + Ok(blk) } @@ -467,7 +505,6 @@ impl InSyncImpl { } // Ensure that the block height is higher than the last finalized - // TODO: Retrieve the block from memory if remote_height <= acc.get_latest_final_block().await?.header().height { From 5c584fc01b31b7061552ec5ae4aa5cad38149f6e Mon Sep 17 00:00:00 2001 From: Goshawk Date: Wed, 4 Sep 2024 16:52:08 +0300 Subject: [PATCH 03/12] node: Detect an occurrence of stalling on a non-main fork --- node/src/chain/stall_chain_fsm.rs | 228 +++++++++++++++++++++++------- 1 file changed, 175 insertions(+), 53 deletions(-) diff --git a/node/src/chain/stall_chain_fsm.rs b/node/src/chain/stall_chain_fsm.rs index 5dbe3b98b7..8736d8c3c2 100644 --- a/node/src/chain/stall_chain_fsm.rs +++ b/node/src/chain/stall_chain_fsm.rs @@ -4,10 +4,30 @@ // // Copyright (c) DUSK NETWORK. All rights reserved. -use node_data::ledger::Block; -use std::collections::BTreeMap; +use node_data::{ + ledger::{to_str, Block}, + message::{payload::GetBlocks, Message}, +}; +use std::{ + collections::BTreeMap, + sync::Arc, + time::{SystemTime, UNIX_EPOCH}, +}; +use tokio::sync::RwLock; +use tracing::{error, warn}; + +use crate::{ + database::{self, Ledger}, + vm::VMExecution, + Network, +}; + +use super::acceptor::Acceptor; + +use super::fsm::REDUNDANCY_PEER_FACTOR; const CONSECUTIVE_BLOCKS_THRESHOLD: usize = 5; +const STALLED_TIMEOUT: u64 = 30; // seconds #[derive(Debug, Clone, Copy, PartialEq)] pub(crate) enum State { @@ -15,42 +35,72 @@ pub(crate) enum State { Running, /// No block has been accepted recently /// - /// A chain progress may be stalled for any reasons listed below: - /// - `disconnected` Node is not receiving any messages from the network - /// - `open_consensus` Network is struggling to produce a block for many - /// iterations - /// - `higher_iteration_branch` Network has moved forward with block of a - /// higher iteration + /// Node might be stuck on non-main branch and might need to recover Stalled, /// Node is disconnected from the main branch StalledOnFork, } /// Implements a simple FSM to detect a stalled state of the chain -pub(crate) struct StalledChainFSM { +pub(crate) struct StalledChainFSM +{ + acc: Arc>>, + state: State, recovery_blocks: BTreeMap, - latest_finalized: Option, - tip: Option, + /// Latest finalized block + latest_finalized: Block, + + /// Tip of the chain with timestamp + tip: (Block, u64), } -impl StalledChainFSM { - pub(crate) fn new() -> Self { - Self { +impl StalledChainFSM { + pub(crate) fn new( + acc: Arc>>, + latest_finalized: Block, + tip: Block, + ) -> Self { + let mut sm = Self { state: State::Running, recovery_blocks: BTreeMap::new(), - latest_finalized: None, - tip: None, - } + tip: Default::default(), + latest_finalized, + acc, + }; + + sm.update_tip(tip); + sm } - pub(crate) fn on_block_received(&mut self, blk: &Block) -> State { - let curr = self.state; - match curr { - State::Stalled => self.on_stalled(blk), - State::StalledOnFork | State::Running => curr, + /// Handles block received event + /// + /// Returns the new state of the FSM after processing the block + pub(crate) async fn on_block_received(&mut self, blk: &Block) -> State { + let tip = self.acc.read().await.get_curr_tip().await; + + if self.tip.0.header().hash != tip.inner().header().hash { + // Tip has changed, which means a new block is accepted either due + // to normal block acceptance or fallback execution + self.recovery_blocks.clear(); + + if tip.is_final() { + self.latest_finalized = tip.inner().clone(); + } + + self.update_tip(tip.inner().clone()); + self.state = State::Running; } + + let curr = self.state; + self.state = match curr { + State::Running => self.on_running().await, + State::Stalled => self.on_stalled(blk).await, + State::StalledOnFork => curr, + }; + + self.state } /// Returns recovery blocks as a vector @@ -58,64 +108,136 @@ impl StalledChainFSM { self.recovery_blocks.values().cloned().collect() } + /// Handles a running state + async fn on_running(&self) -> State { + if self.tip.1 + STALLED_TIMEOUT + < SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() + { + // While we are still receiving blocks, no block + // has been accepted for a long time (tip has not changed + // recently) + return self.on_accept_block_timeout().await; + } + + State::Running + } + /// Handles block from wire in the `Stalled` state - pub(crate) fn on_stalled(&mut self, blk: &Block) -> State { - let key = blk.header().height; + async fn on_stalled(&mut self, new_blk: &Block) -> State { + let key = new_blk.header().height; self.recovery_blocks .entry(key) - .or_insert_with(|| blk.clone()); + .or_insert_with(|| new_blk.clone()); if self.recovery_blocks.len() < CONSECUTIVE_BLOCKS_THRESHOLD { + // Not enough consecutive blocks collected yet return State::Stalled; } // Check recovery blocks contains at most N consecutive blocks - let mut prev = self - .latest_finalized - .as_ref() - .map(|b| b.header().height) - .unwrap_or(0); // TODO: + let mut prev = self.latest_finalized.header().height; let consecutive = self.recovery_blocks.keys().all(|&key| { - let is_consecutive = key == prev + 1; + let is_consecutive: bool = key == prev + 1; prev = key; is_consecutive }); if !consecutive { - // Not enough consecutive blocks collected yet + // recovery blocks are missing return State::Stalled; } + let db = &self.acc.read().await.db; + // Detect if collected blocks are valid - if self - .recovery_blocks - .iter() - .all(|(_, blk)| self.dry_run_accept_block(blk)) - { - State::StalledOnFork - } else { - State::Stalled + for (_, blk) in self.recovery_blocks.iter() { + let exists = db + .read() + .await + .view(|t| t.get_block_exists(&blk.header().hash)) + .unwrap(); // TODO: + + if !exists { + // Block already exists in ledger + continue; + } + + let local_blk = db + .read() + .await + .view(|t| t.fetch_block_header(&blk.header().prev_block_hash)) + .unwrap(); + + if local_blk.is_none() { + // Block is invalid + error!( + event = "revert failed", + hash = to_str(&blk.header().hash), + err = format!("could not find prev block") + ); + + return State::Stalled; + } + + // If we are here, most probably this is a block from the main + // branch + match self + .acc + .read() + .await + .verify_header_against_local( + local_blk.as_ref().unwrap(), + blk.header(), + ) + .await + { + Ok(_) => return State::StalledOnFork, + Err(err) => { + // Block is invalid + error!( + event = "revert failed", // TODO: + hash = to_str(&blk.header().hash), + err = format!("{:?}", err) + ); + } + } } + + State::Stalled } - pub(crate) fn on_block_accepted(&mut self, blk: &Block, is_final: bool) { - self.state = State::Running; - self.recovery_blocks.clear(); + /// Handles block acceptance timeout + /// + /// Request missing blocks since last finalized block + async fn on_accept_block_timeout(&self) -> State { + // Request missing blocks since my last finalized block + let get_blocks = Message::new_get_blocks(GetBlocks { + locator: self.latest_finalized.header().hash, + }); - if is_final { - self.latest_finalized = Some(blk.clone()); + let network = &self.acc.read().await.network; + if let Err(e) = network + .read() + .await + .send_to_alive_peers(&get_blocks, REDUNDANCY_PEER_FACTOR) + .await + { + warn!("Unable to request GetBlocks {e}"); + return State::Running; } - self.tip = Some(blk.clone()); - } - - pub(crate) fn on_accept_block_timeout(&mut self) { - self.state = State::Stalled; + State::Stalled } - pub(crate) fn dry_run_accept_block(&self, _blk: &Block) -> bool { - // TODO: Implement dry-run accept block - false + fn update_tip(&mut self, tip: Block) { + self.tip.0 = tip; + self.tip.1 = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); } } From 9abdcb6a99b8b0cf0cb8fb1f06da7b5aa35e8ab7 Mon Sep 17 00:00:00 2001 From: Goshawk Date: Wed, 4 Sep 2024 16:53:10 +0300 Subject: [PATCH 04/12] node: Move Fallback::verify_header to Acceptor::verify_header_against_local --- node/src/chain/acceptor.rs | 51 ++++++++++++++++++++++- node/src/chain/fallback.rs | 55 ++---------------------- node/src/chain/fsm.rs | 85 +++++++++++++++++++++----------------- 3 files changed, 100 insertions(+), 91 deletions(-) diff --git a/node/src/chain/acceptor.rs b/node/src/chain/acceptor.rs index 0f9244a658..1c5fc4ea9d 100644 --- a/node/src/chain/acceptor.rs +++ b/node/src/chain/acceptor.rs @@ -60,7 +60,7 @@ pub(crate) enum RevertTarget { /// Acceptor also manages the initialization and lifespan of Consensus task. pub(crate) struct Acceptor { /// The tip - tip: RwLock, + pub(crate) tip: RwLock, /// Provisioners needed to verify next block pub(crate) provisioners_list: RwLock, @@ -70,7 +70,7 @@ pub(crate) struct Acceptor { pub(crate) db: Arc>, pub(crate) vm: Arc>, - network: Arc>, + pub(crate) network: Arc>, event_sender: Sender, } @@ -952,6 +952,10 @@ impl Acceptor { self.tip.read().await.inner().header().iteration } + pub(crate) async fn get_curr_tip(&self) -> BlockWithLabel { + self.tip.read().await.clone() + } + pub(crate) async fn get_result_chan( &self, ) -> AsyncQueue> { @@ -1056,6 +1060,49 @@ impl Acceptor { histogram!("dusk_block_disk_size").record(block_size_on_disk as f64); } + + /// Verifies if a block with header `local` can be replaced with a block + /// with header `new` + pub(crate) async fn verify_header_against_local( + &self, + local: &ledger::Header, + new: &ledger::Header, + ) -> Result<()> { + let prev_header = self.db.read().await.view(|t| { + let prev_hash = &local.prev_block_hash; + t.fetch_block_header(prev_hash)?.ok_or(anyhow::anyhow!( + "Unable to find block with hash {}", + to_str(prev_hash) + )) + })?; + + let provisioners_list = self + .vm + .read() + .await + .get_provisioners(prev_header.state_hash)?; + + let mut provisioners_list = ContextProvisioners::new(provisioners_list); + + let changed_provisioners = self + .vm + .read() + .await + .get_changed_provisioners(prev_header.state_hash)?; + provisioners_list.apply_changes(changed_provisioners); + + // Ensure header of the new block is valid according to prev_block + // header + let _ = verify_block_header( + self.db.clone(), + &prev_header, + &provisioners_list, + new, + ) + .await?; + + Ok(()) + } } async fn broadcast(network: &Arc>, msg: &Message) { diff --git a/node/src/chain/fallback.rs b/node/src/chain/fallback.rs index 860dc213ec..7de9bec33e 100644 --- a/node/src/chain/fallback.rs +++ b/node/src/chain/fallback.rs @@ -5,14 +5,12 @@ // Copyright (c) DUSK NETWORK. All rights reserved. use anyhow::{anyhow, Result}; -use dusk_consensus::user::provisioners::ContextProvisioners; -use node_data::ledger::{to_str, Header}; +use node_data::ledger::Header; use std::cmp::Ordering; use tracing::info; use crate::{ - chain::acceptor, - database::{self, Ledger}, + database::{self}, vm, Network, }; @@ -42,17 +40,6 @@ impl<'a, N: Network, DB: database::DB, VM: vm::VMExecution> local: &Header, remote: &Header, revert_target: RevertTarget, - ) -> Result<()> { - self.verify_header(local, remote).await?; - self.acc.try_revert(revert_target).await - } - - /// Verifies if a block with header `local` can be replaced with a block - /// with header `remote` - async fn verify_header( - &self, - local: &Header, - remote: &Header, ) -> Result<()> { match (local.height, remote.iteration.cmp(&local.iteration)) { (0, _) => Err(anyhow!("cannot fallback over genesis block")), @@ -68,14 +55,6 @@ impl<'a, N: Network, DB: database::DB, VM: vm::VMExecution> _ => Ok(()), }?; - let prev_header = self.acc.db.read().await.view(|t| { - let prev_hash = &local.prev_block_hash; - t.fetch_block_header(prev_hash)?.ok_or(anyhow::anyhow!( - "Unable to find block with hash {}", - to_str(prev_hash) - )) - })?; - info!( event = "execute fallback checks", height = local.height, @@ -83,33 +62,7 @@ impl<'a, N: Network, DB: database::DB, VM: vm::VMExecution> target_iter = remote.iteration, ); - let provisioners_list = self - .acc - .vm - .read() - .await - .get_provisioners(prev_header.state_hash)?; - - let mut provisioners_list = ContextProvisioners::new(provisioners_list); - - let changed_provisioners = self - .acc - .vm - .read() - .await - .get_changed_provisioners(prev_header.state_hash)?; - provisioners_list.apply_changes(changed_provisioners); - - // Ensure header of the new block is valid according to prev_block - // header - let _ = acceptor::verify_block_header( - self.acc.db.clone(), - &prev_header, - &provisioners_list, - remote, - ) - .await?; - - Ok(()) + self.acc.verify_header_against_local(local, remote).await?; + self.acc.try_revert(revert_target).await } } diff --git a/node/src/chain/fsm.rs b/node/src/chain/fsm.rs index 4c996db587..bca8e7b787 100644 --- a/node/src/chain/fsm.rs +++ b/node/src/chain/fsm.rs @@ -82,7 +82,8 @@ pub(crate) struct SimpleFSM { /// Attestations cached from received Quorum messages attestations_cache: HashMap<[u8; 32], (Attestation, Instant)>, - stalled_chain_fsm: StalledChainFSM, + /// State machine to detect a stalled state of the chain + stalled_sm: StalledChainFSM, } impl SimpleFSM { @@ -98,11 +99,15 @@ impl SimpleFSM { network.clone(), blacklisted_blocks.clone(), )), - acc, - network, + acc: acc.clone(), + network: network.clone(), blacklisted_blocks, attestations_cache: Default::default(), - stalled_chain_fsm: StalledChainFSM::new(), + stalled_sm: StalledChainFSM::new( + acc, + Block::default(), + Block::default(), /* TODO: */ + ), } } @@ -122,8 +127,6 @@ impl SimpleFSM { // Clear up all blacklisted blocks self.blacklisted_blocks.write().await.clear(); - self.stalled_chain_fsm.on_accept_block_timeout(); - // Request missing blocks since my last finalized block let get_blocks = Message::new_get_blocks(GetBlocks { locator: last_finalized.header().hash, @@ -164,32 +167,6 @@ impl SimpleFSM { ) -> anyhow::Result> { let block_hash = &blk.header().hash; - if let stall_chain_fsm::State::StalledOnFork = - self.stalled_chain_fsm.on_block_received(&blk) - { - info!( - event = "stalled on fork", - hash = to_str(&blk.header().hash), - height = blk.header().height, - iter = blk.header().iteration, - ); - - let mut acc = self.acc.write().await; - match acc.try_revert(RevertTarget::LastFinalizedState).await { - Ok(_) => { - counter!("dusk_revert_count").increment(1); - - for blk in self.stalled_chain_fsm.recovery_blocks() { - acc.try_accept_block(&blk, true).await?; - } - } - Err(e) => { - error!(event = "revert failed", err = format!("{:?}", e)); - return Ok(None); - } - } - } - // Filter out blocks that have already been marked as // blacklisted upon successful fallback execution. if self.blacklisted_blocks.read().await.contains(block_hash) { @@ -242,6 +219,44 @@ impl SimpleFSM { } } } + + // Process block event in the context of stalled chain FSM + if let stall_chain_fsm::State::StalledOnFork = + self.stalled_sm.on_block_received(&blk).await + { + info!( + event = "stalled on fork", + hash = to_str(&blk.header().hash), + height = blk.header().height, + iter = blk.header().iteration, + ); + + let mut acc = self.acc.write().await; + match acc.try_revert(RevertTarget::LastFinalizedState).await { + Ok(_) => { + counter!("dusk_revert_count").increment(1); + + info!(event = "reverted to last finalized"); + + for blk in self.stalled_sm.recovery_blocks() { + info!( + event = "recovery block", + height = blk.header().height, + hash = to_str(&blk.header().hash), + ); + + acc.try_accept_block(&blk, true).await?; + } + } + Err(e) => { + error!( + event = "revert failed", + err = format!("{:?}", e) + ); + return Ok(None); + } + } + } } // FIXME: The block should return only if accepted. The current issue is // that the impl of State::on_block_event doesn't return always the @@ -250,12 +265,6 @@ impl SimpleFSM { // Due to this issue, we reset the outer timeout even if we are not // accepting the received block - // TODO: Ensure block returned is the one that was accepted - let is_final = false; // TODO: - if let Some(blk) = blk.as_ref() { - self.stalled_chain_fsm.on_block_accepted(blk, is_final); - } - Ok(blk) } From 5d3a072528ddf1ce6380a24ceb35c33db8b0c2c4 Mon Sep 17 00:00:00 2001 From: Goshawk Date: Wed, 4 Sep 2024 17:04:11 +0300 Subject: [PATCH 05/12] node: Initialze properly stalled state-machine --- node/src/chain.rs | 2 +- node/src/chain/fsm.rs | 24 +++++++++++------------- node/src/chain/stall_chain_fsm.rs | 14 ++++++++++---- 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/node/src/chain.rs b/node/src/chain.rs index a7938149d1..efb79d4d55 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -111,7 +111,7 @@ impl acc.write().await.spawn_task().await; // Start-up FSM instance - let mut fsm = SimpleFSM::new(acc.clone(), network.clone()); + let mut fsm = SimpleFSM::new(acc.clone(), network.clone()).await; let outbound_chan = acc.read().await.get_outbound_chan().await; let result_chan = acc.read().await.get_result_chan().await; diff --git a/node/src/chain/fsm.rs b/node/src/chain/fsm.rs index bca8e7b787..49fb1767cf 100644 --- a/node/src/chain/fsm.rs +++ b/node/src/chain/fsm.rs @@ -87,27 +87,25 @@ pub(crate) struct SimpleFSM { } impl SimpleFSM { - pub fn new( + pub async fn new( acc: Arc>>, network: Arc>, ) -> Self { let blacklisted_blocks = Arc::new(RwLock::new(HashSet::new())); + let stalled_sm = StalledChainFSM::new_with_acc(acc.clone()).await; + let curr = State::InSync(InSyncImpl::::new( + acc.clone(), + network.clone(), + blacklisted_blocks.clone(), + )); Self { - curr: State::InSync(InSyncImpl::::new( - acc.clone(), - network.clone(), - blacklisted_blocks.clone(), - )), - acc: acc.clone(), + curr, + acc, network: network.clone(), blacklisted_blocks, attestations_cache: Default::default(), - stalled_sm: StalledChainFSM::new( - acc, - Block::default(), - Block::default(), /* TODO: */ - ), + stalled_sm, } } @@ -222,7 +220,7 @@ impl SimpleFSM { // Process block event in the context of stalled chain FSM if let stall_chain_fsm::State::StalledOnFork = - self.stalled_sm.on_block_received(&blk).await + self.stalled_sm.on_block_received(blk).await { info!( event = "stalled on fork", diff --git a/node/src/chain/stall_chain_fsm.rs b/node/src/chain/stall_chain_fsm.rs index 8736d8c3c2..a0f3d2a07a 100644 --- a/node/src/chain/stall_chain_fsm.rs +++ b/node/src/chain/stall_chain_fsm.rs @@ -57,11 +57,17 @@ pub(crate) struct StalledChainFSM } impl StalledChainFSM { - pub(crate) fn new( + pub(crate) async fn new_with_acc( acc: Arc>>, - latest_finalized: Block, - tip: Block, ) -> Self { + let tip = acc.read().await.get_curr_tip().await; + let latest_finalized = acc + .read() + .await + .get_latest_final_block() + .await + .expect("latest final block should exist"); + let mut sm = Self { state: State::Running, recovery_blocks: BTreeMap::new(), @@ -70,7 +76,7 @@ impl StalledChainFSM { acc, }; - sm.update_tip(tip); + sm.update_tip(tip.inner().clone()); sm } From 11f1167b7c5cb37adea254e14dec9d7b4f3bbcf9 Mon Sep 17 00:00:00 2001 From: Goshawk Date: Thu, 5 Sep 2024 11:18:25 +0300 Subject: [PATCH 06/12] node: Remove 'request missing blocks' from on_idle --- node/src/chain/fsm.rs | 26 ++++---------------------- 1 file changed, 4 insertions(+), 22 deletions(-) diff --git a/node/src/chain/fsm.rs b/node/src/chain/fsm.rs index 49fb1767cf..3bf5376386 100644 --- a/node/src/chain/fsm.rs +++ b/node/src/chain/fsm.rs @@ -124,20 +124,6 @@ impl SimpleFSM { // Clear up all blacklisted blocks self.blacklisted_blocks.write().await.clear(); - - // Request missing blocks since my last finalized block - let get_blocks = Message::new_get_blocks(GetBlocks { - locator: last_finalized.header().hash, - }); - if let Err(e) = self - .network - .read() - .await - .send_to_alive_peers(&get_blocks, REDUNDANCY_PEER_FACTOR) - .await - { - warn!("Unable to request GetBlocks {e}"); - } } else { error!("could not request blocks"); } @@ -218,17 +204,13 @@ impl SimpleFSM { } } - // Process block event in the context of stalled chain FSM + // Try to detect a stalled chain + // Generally speaking, if a node is receiving future blocks from the + // network but it could not accept a new block for long time, then + // it might be a sign of a getting stalled on non-main branch. if let stall_chain_fsm::State::StalledOnFork = self.stalled_sm.on_block_received(blk).await { - info!( - event = "stalled on fork", - hash = to_str(&blk.header().hash), - height = blk.header().height, - iter = blk.header().iteration, - ); - let mut acc = self.acc.write().await; match acc.try_revert(RevertTarget::LastFinalizedState).await { Ok(_) => { From 133d0586074d8d437d699cefb09c2915a50cb303 Mon Sep 17 00:00:00 2001 From: Goshawk Date: Thu, 5 Sep 2024 12:16:19 +0300 Subject: [PATCH 07/12] node: Logs any state transition event --- node/src/chain/stall_chain_fsm.rs | 78 +++++++++++++++++++++---------- 1 file changed, 54 insertions(+), 24 deletions(-) diff --git a/node/src/chain/stall_chain_fsm.rs b/node/src/chain/stall_chain_fsm.rs index a0f3d2a07a..2a85fd18cb 100644 --- a/node/src/chain/stall_chain_fsm.rs +++ b/node/src/chain/stall_chain_fsm.rs @@ -14,7 +14,7 @@ use std::{ time::{SystemTime, UNIX_EPOCH}, }; use tokio::sync::RwLock; -use tracing::{error, warn}; +use tracing::{error, info, warn}; use crate::{ database::{self, Ledger}, @@ -96,14 +96,15 @@ impl StalledChainFSM { } self.update_tip(tip.inner().clone()); - self.state = State::Running; + + self.state_transition(State::Running); } let curr = self.state; - self.state = match curr { + match curr { State::Running => self.on_running().await, State::Stalled => self.on_stalled(blk).await, - State::StalledOnFork => curr, + State::StalledOnFork => warn!("Stalled on fork"), }; self.state @@ -115,7 +116,7 @@ impl StalledChainFSM { } /// Handles a running state - async fn on_running(&self) -> State { + async fn on_running(&mut self) { if self.tip.1 + STALLED_TIMEOUT < SystemTime::now() .duration_since(UNIX_EPOCH) @@ -125,14 +126,12 @@ impl StalledChainFSM { // While we are still receiving blocks, no block // has been accepted for a long time (tip has not changed // recently) - return self.on_accept_block_timeout().await; + self.on_accept_block_timeout().await } - - State::Running } /// Handles block from wire in the `Stalled` state - async fn on_stalled(&mut self, new_blk: &Block) -> State { + async fn on_stalled(&mut self, new_blk: &Block) { let key = new_blk.header().height; self.recovery_blocks .entry(key) @@ -140,7 +139,7 @@ impl StalledChainFSM { if self.recovery_blocks.len() < CONSECUTIVE_BLOCKS_THRESHOLD { // Not enough consecutive blocks collected yet - return State::Stalled; + return; } // Check recovery blocks contains at most N consecutive blocks @@ -154,13 +153,13 @@ impl StalledChainFSM { if !consecutive { // recovery blocks are missing - return State::Stalled; + return; } - let db = &self.acc.read().await.db; - // Detect if collected blocks are valid for (_, blk) in self.recovery_blocks.iter() { + let db: Arc> = self.acc.read().await.db.clone(); + let exists = db .read() .await @@ -186,12 +185,12 @@ impl StalledChainFSM { err = format!("could not find prev block") ); - return State::Stalled; + return; } // If we are here, most probably this is a block from the main // branch - match self + let res = self .acc .read() .await @@ -199,9 +198,13 @@ impl StalledChainFSM { local_blk.as_ref().unwrap(), blk.header(), ) - .await - { - Ok(_) => return State::StalledOnFork, + .await; + + match res { + Ok(_) => { + self.state_transition(State::StalledOnFork); + return; + } Err(err) => { // Block is invalid error!( @@ -212,20 +215,18 @@ impl StalledChainFSM { } } } - - State::Stalled } /// Handles block acceptance timeout /// /// Request missing blocks since last finalized block - async fn on_accept_block_timeout(&self) -> State { + async fn on_accept_block_timeout(&mut self) { // Request missing blocks since my last finalized block let get_blocks = Message::new_get_blocks(GetBlocks { locator: self.latest_finalized.header().hash, }); - let network = &self.acc.read().await.network; + let network = self.acc.read().await.network.clone(); if let Err(e) = network .read() .await @@ -233,10 +234,10 @@ impl StalledChainFSM { .await { warn!("Unable to request GetBlocks {e}"); - return State::Running; + return; } - State::Stalled + self.state_transition(State::Stalled); } fn update_tip(&mut self, tip: Block) { @@ -246,4 +247,33 @@ impl StalledChainFSM { .unwrap() .as_secs(); } + + /// Changes curr state and logs the transition event + fn state_transition(&mut self, state: State) -> State { + if state == self.state { + return state; + } + + self.state = state; + + let state_str: &str = match state { + State::Running => "running", + State::Stalled => "stalled", + State::StalledOnFork => "stalled_on_fork", + }; + + let hdr = self.tip.0.header(); + info!( + event = format!("chain.{}", state_str), + tip_hash = to_str(&hdr.hash), + tip_height = hdr.height, + tip_iter = hdr.iteration, + tip_updated_at = self.tip.1, + recovery_blocks = self.recovery_blocks.len(), + final_block = to_str(&self.latest_finalized.header().hash), + final_block_height = self.latest_finalized.header().height, + ); + + state + } } From ee90625547046036e439ffd05fa877e268a7e68f Mon Sep 17 00:00:00 2001 From: Goshawk Date: Thu, 5 Sep 2024 16:22:02 +0300 Subject: [PATCH 08/12] node: Ensure all blocks from local_final until current_tip are collected --- node/src/chain/fsm.rs | 2 +- node/src/chain/stall_chain_fsm.rs | 98 +++++++++++++++---------------- 2 files changed, 48 insertions(+), 52 deletions(-) diff --git a/node/src/chain/fsm.rs b/node/src/chain/fsm.rs index 3bf5376386..b10b904a4a 100644 --- a/node/src/chain/fsm.rs +++ b/node/src/chain/fsm.rs @@ -206,7 +206,7 @@ impl SimpleFSM { // Try to detect a stalled chain // Generally speaking, if a node is receiving future blocks from the - // network but it could not accept a new block for long time, then + // network but it cannot accept a new block for long time, then // it might be a sign of a getting stalled on non-main branch. if let stall_chain_fsm::State::StalledOnFork = self.stalled_sm.on_block_received(blk).await diff --git a/node/src/chain/stall_chain_fsm.rs b/node/src/chain/stall_chain_fsm.rs index 2a85fd18cb..5b09fcf4ca 100644 --- a/node/src/chain/stall_chain_fsm.rs +++ b/node/src/chain/stall_chain_fsm.rs @@ -26,8 +26,7 @@ use super::acceptor::Acceptor; use super::fsm::REDUNDANCY_PEER_FACTOR; -const CONSECUTIVE_BLOCKS_THRESHOLD: usize = 5; -const STALLED_TIMEOUT: u64 = 30; // seconds +const STALLED_TIMEOUT: u64 = 60; // seconds #[derive(Debug, Clone, Copy, PartialEq)] pub(crate) enum State { @@ -96,14 +95,17 @@ impl StalledChainFSM { } self.update_tip(tip.inner().clone()); - self.state_transition(State::Running); } let curr = self.state; match curr { State::Running => self.on_running().await, - State::Stalled => self.on_stalled(blk).await, + State::Stalled => { + if let Err(err) = self.on_stalled(blk).await { + error!("Error while processing block: {:?}", err); + } + } State::StalledOnFork => warn!("Stalled on fork"), }; @@ -131,29 +133,20 @@ impl StalledChainFSM { } /// Handles block from wire in the `Stalled` state - async fn on_stalled(&mut self, new_blk: &Block) { + async fn on_stalled(&mut self, new_blk: &Block) -> anyhow::Result<()> { let key = new_blk.header().height; self.recovery_blocks .entry(key) .or_insert_with(|| new_blk.clone()); - if self.recovery_blocks.len() < CONSECUTIVE_BLOCKS_THRESHOLD { - // Not enough consecutive blocks collected yet - return; - } - - // Check recovery blocks contains at most N consecutive blocks - let mut prev = self.latest_finalized.header().height; - - let consecutive = self.recovery_blocks.keys().all(|&key| { - let is_consecutive: bool = key == prev + 1; - prev = key; - is_consecutive - }); - - if !consecutive { - // recovery blocks are missing - return; + // Ensure all blocks from local_final until current_tip are + // collected + let from = self.latest_finalized.header().height; + let to = self.tip.0.header().height + 1; + for height in from..to { + if !self.recovery_blocks.contains_key(&height) { + return Ok(()); // wait for more blocks + } } // Detect if collected blocks are valid @@ -163,10 +156,9 @@ impl StalledChainFSM { let exists = db .read() .await - .view(|t| t.get_block_exists(&blk.header().hash)) - .unwrap(); // TODO: + .view(|t| t.get_block_exists(&blk.header().hash))?; - if !exists { + if exists { // Block already exists in ledger continue; } @@ -174,19 +166,24 @@ impl StalledChainFSM { let local_blk = db .read() .await - .view(|t| t.fetch_block_header(&blk.header().prev_block_hash)) - .unwrap(); + .view(|t| t.fetch_block_by_height(blk.header().height))?; - if local_blk.is_none() { - // Block is invalid - error!( - event = "revert failed", - hash = to_str(&blk.header().hash), - err = format!("could not find prev block") - ); + let local_blk = match local_blk { + Some(blk) => blk, + None => { + error!( + event = "recovery failed", + hash = to_str(&blk.header().hash), + err = format!( + "could not find local block at height {}", + blk.header().height + ) + ); + return Ok(()); + } + }; - return; - } + let main_branch_blk = blk; // If we are here, most probably this is a block from the main // branch @@ -195,26 +192,25 @@ impl StalledChainFSM { .read() .await .verify_header_against_local( - local_blk.as_ref().unwrap(), - blk.header(), + local_blk.header(), + main_branch_blk.header(), ) .await; - match res { - Ok(_) => { - self.state_transition(State::StalledOnFork); - return; - } - Err(err) => { - // Block is invalid - error!( - event = "revert failed", // TODO: - hash = to_str(&blk.header().hash), - err = format!("{:?}", err) - ); - } + if let Err(err) = res { + error!( + event = "recovery failed", + local_hash = to_str(&local_blk.header().hash), + remote_hash = to_str(&blk.header().hash), + err = format!("verification err: {:?}", err) + ); + } else { + self.state_transition(State::StalledOnFork); + return Ok(()); } } + + Ok(()) } /// Handles block acceptance timeout From eccb4e3bca67db0c250fe3c45298dbd307b78e28 Mon Sep 17 00:00:00 2001 From: Goshawk Date: Fri, 6 Sep 2024 14:41:24 +0300 Subject: [PATCH 09/12] node: Request blocks by height in stall_chain_fsm --- node/src/chain/stall_chain_fsm.rs | 58 ++++++++++++++++++++++++------- 1 file changed, 45 insertions(+), 13 deletions(-) diff --git a/node/src/chain/stall_chain_fsm.rs b/node/src/chain/stall_chain_fsm.rs index 5b09fcf4ca..95e06c71fc 100644 --- a/node/src/chain/stall_chain_fsm.rs +++ b/node/src/chain/stall_chain_fsm.rs @@ -6,7 +6,10 @@ use node_data::{ ledger::{to_str, Block}, - message::{payload::GetBlocks, Message}, + message::{ + payload::{GetBlocks, Inv}, + Message, + }, }; use std::{ collections::BTreeMap, @@ -14,7 +17,7 @@ use std::{ time::{SystemTime, UNIX_EPOCH}, }; use tokio::sync::RwLock; -use tracing::{error, info, warn}; +use tracing::{error, info, trace, warn}; use crate::{ database::{self, Ledger}, @@ -83,6 +86,13 @@ impl StalledChainFSM { /// /// Returns the new state of the FSM after processing the block pub(crate) async fn on_block_received(&mut self, blk: &Block) -> State { + trace!( + event = "chain.block_received", + hash = to_str(&blk.header().hash), + height = blk.header().height, + iter = blk.header().iteration, + ); + let tip = self.acc.read().await.get_curr_tip().await; if self.tip.0.header().hash != tip.inner().header().hash { @@ -139,18 +149,37 @@ impl StalledChainFSM { .entry(key) .or_insert_with(|| new_blk.clone()); + info!( + event = "chain.recovery_block_added", + hash = to_str(&new_blk.header().hash), + height = new_blk.header().height, + iter = new_blk.header().iteration, + ); + // Ensure all blocks from local_final until current_tip are // collected - let from = self.latest_finalized.header().height; + let from = self.latest_finalized.header().height + 1; let to = self.tip.0.header().height + 1; for height in from..to { if !self.recovery_blocks.contains_key(&height) { + info!(event = "chain.missing_recovery_block", height); return Ok(()); // wait for more blocks } } + info!( + event = "chain.all_recovery_block_collected", + hash = to_str(&new_blk.header().hash), + height = new_blk.header().height, + iter = new_blk.header().iteration, + ); + // Detect if collected blocks are valid for (_, blk) in self.recovery_blocks.iter() { + if blk.header().height > self.tip.0.header().height { + continue; + } + let db: Arc> = self.acc.read().await.db.clone(); let exists = db @@ -201,7 +230,9 @@ impl StalledChainFSM { error!( event = "recovery failed", local_hash = to_str(&local_blk.header().hash), - remote_hash = to_str(&blk.header().hash), + local_height = local_blk.header().height, + remote_hash = to_str(&main_branch_blk.header().hash), + remote_height = main_branch_blk.header().height, err = format!("verification err: {:?}", err) ); } else { @@ -217,17 +248,18 @@ impl StalledChainFSM { /// /// Request missing blocks since last finalized block async fn on_accept_block_timeout(&mut self) { - // Request missing blocks since my last finalized block - let get_blocks = Message::new_get_blocks(GetBlocks { - locator: self.latest_finalized.header().hash, - }); + let from = self.latest_finalized.header().height + 1; + let to = self.tip.0.header().height + 1; + + info!(event = "chain.requesting_blocks", from, to,); + + let mut inv = Inv::new(0); + for height in from..to { + inv.add_block_from_height(height); + } let network = self.acc.read().await.network.clone(); - if let Err(e) = network - .read() - .await - .send_to_alive_peers(&get_blocks, REDUNDANCY_PEER_FACTOR) - .await + if let Err(e) = network.read().await.flood_request(&inv, None, 8).await { warn!("Unable to request GetBlocks {e}"); return; From 606795b5a27af9299764f1b42595d52bde76a132 Mon Sep 17 00:00:00 2001 From: Goshawk Date: Fri, 6 Sep 2024 14:47:07 +0300 Subject: [PATCH 10/12] node: Return genesis when it's the only finalized state --- node/src/chain/acceptor.rs | 9 ++++++--- node/src/chain/fsm.rs | 1 - node/src/chain/stall_chain_fsm.rs | 7 +------ 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/node/src/chain/acceptor.rs b/node/src/chain/acceptor.rs index 1c5fc4ea9d..f60e1e6742 100644 --- a/node/src/chain/acceptor.rs +++ b/node/src/chain/acceptor.rs @@ -821,7 +821,7 @@ impl Acceptor { // The blockchain tip after reverting let (blk, (_, label)) = self.db.read().await.update(|t| { let mut height = curr_height; - while height != 0 { + loop { let b = Ledger::fetch_block_by_height(t, height)? .ok_or_else(|| anyhow::anyhow!("could not fetch block"))?; let h = b.header(); @@ -830,6 +830,11 @@ impl Acceptor { || anyhow::anyhow!("could not fetch block label"), )?; + // If we are at genesis block, we can stop here + if b.header().height == 0 { + return Ok((b, label)); + } + if h.state_hash == target_state_hash { return Ok((b, label)); } @@ -858,8 +863,6 @@ impl Acceptor { height -= 1; } - - Err(anyhow!("not found")) })?; if blk.header().state_hash != target_state_hash { diff --git a/node/src/chain/fsm.rs b/node/src/chain/fsm.rs index b10b904a4a..036db72a56 100644 --- a/node/src/chain/fsm.rs +++ b/node/src/chain/fsm.rs @@ -34,7 +34,6 @@ const DEFAULT_ATT_CACHE_EXPIRY: Duration = Duration::from_secs(60); /// Maximum number of hops between the requester and the node that contains the /// requested resource const DEFAULT_HOPS_LIMIT: u16 = 16; -pub(crate) const REDUNDANCY_PEER_FACTOR: usize = 5; type SharedHashSet = Arc>>; diff --git a/node/src/chain/stall_chain_fsm.rs b/node/src/chain/stall_chain_fsm.rs index 95e06c71fc..7d910a3688 100644 --- a/node/src/chain/stall_chain_fsm.rs +++ b/node/src/chain/stall_chain_fsm.rs @@ -6,10 +6,7 @@ use node_data::{ ledger::{to_str, Block}, - message::{ - payload::{GetBlocks, Inv}, - Message, - }, + message::payload::Inv, }; use std::{ collections::BTreeMap, @@ -27,8 +24,6 @@ use crate::{ use super::acceptor::Acceptor; -use super::fsm::REDUNDANCY_PEER_FACTOR; - const STALLED_TIMEOUT: u64 = 60; // seconds #[derive(Debug, Clone, Copy, PartialEq)] From 353f8e33881d55a042e65ba86ed80991283b8950 Mon Sep 17 00:00:00 2001 From: Goshawk Date: Tue, 10 Sep 2024 10:19:27 +0300 Subject: [PATCH 11/12] node: Handle a potential race-condition. - Revert to prev_state_hash commit. - Blacklist the blk from non-main branch - Read final block from DB --- node/src/chain/acceptor.rs | 12 +- node/src/chain/fsm.rs | 85 +++++++--- node/src/chain/stall_chain_fsm.rs | 250 ++++++++++++------------------ 3 files changed, 171 insertions(+), 176 deletions(-) diff --git a/node/src/chain/acceptor.rs b/node/src/chain/acceptor.rs index f60e1e6742..256507c311 100644 --- a/node/src/chain/acceptor.rs +++ b/node/src/chain/acceptor.rs @@ -20,6 +20,7 @@ use node_data::ledger::{ use node_data::message::AsyncQueue; use node_data::message::Payload; +use core::panic; use dusk_consensus::operations::Voter; use execution_core::stake::{Withdraw, STAKE_CONTRACT}; use metrics::{counter, gauge, histogram}; @@ -830,13 +831,13 @@ impl Acceptor { || anyhow::anyhow!("could not fetch block label"), )?; - // If we are at genesis block, we can stop here - if b.header().height == 0 { + if h.state_hash == target_state_hash { return Ok((b, label)); } - if h.state_hash == target_state_hash { - return Ok((b, label)); + // the target_state_hash could not be found + if height == 0 { + panic!("revert to genesis block failed"); } info!( @@ -922,7 +923,8 @@ impl Acceptor { } pub(crate) async fn get_latest_final_block(&self) -> Result { - let tip = self.tip.read().await; + let tip: tokio::sync::RwLockReadGuard<'_, BlockWithLabel> = + self.tip.read().await; if tip.is_final() { return Ok(tip.inner().clone()); } diff --git a/node/src/chain/fsm.rs b/node/src/chain/fsm.rs index 036db72a56..3b0ec8b9a3 100644 --- a/node/src/chain/fsm.rs +++ b/node/src/chain/fsm.rs @@ -124,7 +124,7 @@ impl SimpleFSM { // Clear up all blacklisted blocks self.blacklisted_blocks.write().await.clear(); } else { - error!("could not request blocks"); + error!("could not get final block"); } let now = Instant::now(); @@ -207,34 +207,74 @@ impl SimpleFSM { // Generally speaking, if a node is receiving future blocks from the // network but it cannot accept a new block for long time, then // it might be a sign of a getting stalled on non-main branch. - if let stall_chain_fsm::State::StalledOnFork = - self.stalled_sm.on_block_received(blk).await - { - let mut acc = self.acc.write().await; - match acc.try_revert(RevertTarget::LastFinalizedState).await { - Ok(_) => { - counter!("dusk_revert_count").increment(1); - info!(event = "reverted to last finalized"); + let res = self.stalled_sm.on_block_received(blk).await.clone(); + + match res { + stall_chain_fsm::State::StalledOnFork( + local_hash_at_fork, + remote_blk, + ) => { + info!( + event = "stalled on fork", + local_hash = to_str(&local_hash_at_fork), + remote_hash = to_str(&remote_blk.header().hash), + remote_height = remote_blk.header().height, + ); + let mut acc = self.acc.write().await; + + let prev_local_state_root = + acc.db.read().await.view(|t| { + let local_blk = t + .fetch_block_header(&local_hash_at_fork)? + .expect("local hash should exist"); + + let prev_blk = t + .fetch_block_header(&local_blk.prev_block_hash)? + .expect("prev block hash should exist"); + + anyhow::Ok(prev_blk.state_hash) + })?; + + match acc + .try_revert(RevertTarget::Commit(prev_local_state_root)) + .await + { + Ok(_) => { + counter!("dusk_revert_count").increment(1); + info!(event = "reverted to last finalized"); - for blk in self.stalled_sm.recovery_blocks() { info!( event = "recovery block", - height = blk.header().height, - hash = to_str(&blk.header().hash), + height = remote_blk.header().height, + hash = to_str(&remote_blk.header().hash), ); - acc.try_accept_block(&blk, true).await?; + acc.try_accept_block(&remote_blk, true).await?; + + // Black list the block hash to avoid accepting it + // again due to fallback execution + self.blacklisted_blocks + .write() + .await + .insert(local_hash_at_fork); + + // Reset the stalled chain FSM + self.stalled_sm.reset(remote_blk.header().clone()); + } + Err(e) => { + error!( + event = "revert failed", + err = format!("{:?}", e) + ); + return Ok(None); } - } - Err(e) => { - error!( - event = "revert failed", - err = format!("{:?}", e) - ); - return Ok(None); } } + stall_chain_fsm::State::Stalled => { + self.blacklisted_blocks.write().await.clear(); + } + _ => {} } } // FIXME: The block should return only if accepted. The current issue is @@ -277,6 +317,11 @@ impl SimpleFSM { quorum: &payload::Quorum, msg: &Message, ) -> anyhow::Result> { + // Clean up attestation cache + let now = Instant::now(); + self.attestations_cache + .retain(|_, (_, expiry)| *expiry > now); + // FIXME: We should return the whole outcome for this quorum // Basically we need to inform the upper layer if the received quorum is // valid (even if it's a FailedQuorum) diff --git a/node/src/chain/stall_chain_fsm.rs b/node/src/chain/stall_chain_fsm.rs index 7d910a3688..8ec3a92cc2 100644 --- a/node/src/chain/stall_chain_fsm.rs +++ b/node/src/chain/stall_chain_fsm.rs @@ -5,14 +5,10 @@ // Copyright (c) DUSK NETWORK. All rights reserved. use node_data::{ - ledger::{to_str, Block}, + ledger::{to_str, Block, Header}, message::payload::Inv, }; -use std::{ - collections::BTreeMap, - sync::Arc, - time::{SystemTime, UNIX_EPOCH}, -}; +use std::sync::Arc; use tokio::sync::RwLock; use tracing::{error, info, trace, warn}; @@ -26,7 +22,7 @@ use super::acceptor::Acceptor; const STALLED_TIMEOUT: u64 = 60; // seconds -#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(Debug, Clone, PartialEq)] pub(crate) enum State { /// Blocks are being accepted Running, @@ -35,7 +31,7 @@ pub(crate) enum State { /// Node might be stuck on non-main branch and might need to recover Stalled, /// Node is disconnected from the main branch - StalledOnFork, + StalledOnFork([u8; 32], Box), } /// Implements a simple FSM to detect a stalled state of the chain @@ -44,13 +40,9 @@ pub(crate) struct StalledChainFSM acc: Arc>>, state: State, - recovery_blocks: BTreeMap, - - /// Latest finalized block - latest_finalized: Block, /// Tip of the chain with timestamp - tip: (Block, u64), + tip: (Header, u64), } impl StalledChainFSM { @@ -58,29 +50,28 @@ impl StalledChainFSM { acc: Arc>>, ) -> Self { let tip = acc.read().await.get_curr_tip().await; - let latest_finalized = acc - .read() - .await - .get_latest_final_block() - .await - .expect("latest final block should exist"); let mut sm = Self { state: State::Running, - recovery_blocks: BTreeMap::new(), tip: Default::default(), - latest_finalized, acc, }; - sm.update_tip(tip.inner().clone()); + sm.update_tip(tip.inner().header().clone()); sm } + pub(crate) fn reset(&mut self, tip: Header) { + if self.tip.0.hash != tip.hash { + self.update_tip(tip); + self.state_transition(State::Running); + } + } + /// Handles block received event /// /// Returns the new state of the FSM after processing the block - pub(crate) async fn on_block_received(&mut self, blk: &Block) -> State { + pub(crate) async fn on_block_received(&mut self, blk: &Block) -> &State { trace!( event = "chain.block_received", hash = to_str(&blk.header().hash), @@ -88,22 +79,24 @@ impl StalledChainFSM { iter = blk.header().iteration, ); - let tip = self.acc.read().await.get_curr_tip().await; + let tip = self + .acc + .read() + .await + .get_curr_tip() + .await + .inner() + .header() + .clone(); - if self.tip.0.header().hash != tip.inner().header().hash { + if self.tip.0.hash != tip.hash { // Tip has changed, which means a new block is accepted either due // to normal block acceptance or fallback execution - self.recovery_blocks.clear(); - - if tip.is_final() { - self.latest_finalized = tip.inner().clone(); - } - - self.update_tip(tip.inner().clone()); + self.update_tip(tip); self.state_transition(State::Running); } - let curr = self.state; + let curr = &self.state; match curr { State::Running => self.on_running().await, State::Stalled => { @@ -111,25 +104,15 @@ impl StalledChainFSM { error!("Error while processing block: {:?}", err); } } - State::StalledOnFork => warn!("Stalled on fork"), + State::StalledOnFork(_, _) => warn!("Stalled on fork"), }; - self.state - } - - /// Returns recovery blocks as a vector - pub(crate) fn recovery_blocks(&self) -> Vec { - self.recovery_blocks.values().cloned().collect() + &self.state } /// Handles a running state async fn on_running(&mut self) { - if self.tip.1 + STALLED_TIMEOUT - < SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs() - { + if self.tip.1 + STALLED_TIMEOUT < node_data::get_current_timestamp() { // While we are still receiving blocks, no block // has been accepted for a long time (tip has not changed // recently) @@ -139,101 +122,57 @@ impl StalledChainFSM { /// Handles block from wire in the `Stalled` state async fn on_stalled(&mut self, new_blk: &Block) -> anyhow::Result<()> { - let key = new_blk.header().height; - self.recovery_blocks - .entry(key) - .or_insert_with(|| new_blk.clone()); - - info!( - event = "chain.recovery_block_added", - hash = to_str(&new_blk.header().hash), - height = new_blk.header().height, - iter = new_blk.header().iteration, - ); - - // Ensure all blocks from local_final until current_tip are - // collected - let from = self.latest_finalized.header().height + 1; - let to = self.tip.0.header().height + 1; - for height in from..to { - if !self.recovery_blocks.contains_key(&height) { - info!(event = "chain.missing_recovery_block", height); - return Ok(()); // wait for more blocks - } + if new_blk.header().height > self.tip.0.height { + // Block is newer than the local tip block + return Ok(()); } - info!( - event = "chain.all_recovery_block_collected", - hash = to_str(&new_blk.header().hash), - height = new_blk.header().height, - iter = new_blk.header().iteration, - ); - - // Detect if collected blocks are valid - for (_, blk) in self.recovery_blocks.iter() { - if blk.header().height > self.tip.0.header().height { - continue; - } + let db: Arc> = self.acc.read().await.db.clone(); + let exists = db + .read() + .await + .view(|t| t.get_block_exists(&new_blk.header().hash))?; - let db: Arc> = self.acc.read().await.db.clone(); + if exists { + // Block already exists in ledger + return Ok(()); + } - let exists = db - .read() - .await - .view(|t| t.get_block_exists(&blk.header().hash))?; + let local_blk = db + .read() + .await + .view(|t| t.fetch_block_by_height(new_blk.header().height))? + .expect("local block should exist"); - if exists { - // Block already exists in ledger - continue; - } + let remote_blk = new_blk; - let local_blk = db - .read() - .await - .view(|t| t.fetch_block_by_height(blk.header().height))?; - - let local_blk = match local_blk { - Some(blk) => blk, - None => { - error!( - event = "recovery failed", - hash = to_str(&blk.header().hash), - err = format!( - "could not find local block at height {}", - blk.header().height - ) - ); - return Ok(()); - } - }; - - let main_branch_blk = blk; - - // If we are here, most probably this is a block from the main - // branch - let res = self - .acc - .read() - .await - .verify_header_against_local( - local_blk.header(), - main_branch_blk.header(), - ) - .await; - - if let Err(err) = res { - error!( - event = "recovery failed", - local_hash = to_str(&local_blk.header().hash), - local_height = local_blk.header().height, - remote_hash = to_str(&main_branch_blk.header().hash), - remote_height = main_branch_blk.header().height, - err = format!("verification err: {:?}", err) - ); - } else { - self.state_transition(State::StalledOnFork); - return Ok(()); - } + // If we are here, this might be a block from the main + // branch + let res = self + .acc + .read() + .await + .verify_header_against_local( + local_blk.header(), + remote_blk.header(), + ) + .await; + + if let Err(err) = res { + error!( + event = "recovery failed", + local_hash = to_str(&local_blk.header().hash), + local_height = local_blk.header().height, + remote_hash = to_str(&remote_blk.header().hash), + remote_height = remote_blk.header().height, + err = format!("verification err: {:?}", err) + ); + } else { + self.state_transition(State::StalledOnFork( + local_blk.header().hash, + Box::new(remote_blk.clone()), + )); + return Ok(()); } Ok(()) @@ -243,8 +182,9 @@ impl StalledChainFSM { /// /// Request missing blocks since last finalized block async fn on_accept_block_timeout(&mut self) { - let from = self.latest_finalized.header().height + 1; - let to = self.tip.0.header().height + 1; + let (_, last_final_block) = self.last_final_block().await; + let from = last_final_block + 1; + let to = self.tip.0.height + 1; info!(event = "chain.requesting_blocks", from, to,); @@ -263,40 +203,48 @@ impl StalledChainFSM { self.state_transition(State::Stalled); } - fn update_tip(&mut self, tip: Block) { + fn update_tip(&mut self, tip: Header) { self.tip.0 = tip; - self.tip.1 = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs(); + self.tip.1 = node_data::get_current_timestamp(); } /// Changes curr state and logs the transition event - fn state_transition(&mut self, state: State) -> State { + fn state_transition(&mut self, state: State) -> &State { if state == self.state { - return state; + return &self.state; } self.state = state; - let state_str: &str = match state { + let state_str: &str = match &self.state { State::Running => "running", State::Stalled => "stalled", - State::StalledOnFork => "stalled_on_fork", + State::StalledOnFork(_, _) => "stalled_on_fork", }; - let hdr = self.tip.0.header(); + let hdr = &self.tip.0; info!( event = format!("chain.{}", state_str), tip_hash = to_str(&hdr.hash), tip_height = hdr.height, tip_iter = hdr.iteration, tip_updated_at = self.tip.1, - recovery_blocks = self.recovery_blocks.len(), - final_block = to_str(&self.latest_finalized.header().hash), - final_block_height = self.latest_finalized.header().height, ); - state + &self.state + } + + async fn last_final_block(&self) -> ([u8; 32], u64) { + let hdr = self + .acc + .read() + .await + .get_latest_final_block() + .await + .unwrap() + .header() + .clone(); + + (hdr.hash, hdr.height) } } From a2d047de265fb238a2de8d941d8b9128bf1a3767 Mon Sep 17 00:00:00 2001 From: Goshawk Date: Tue, 10 Sep 2024 13:18:23 +0300 Subject: [PATCH 12/12] node: Deprecate ACCEPT_BLOCK_TIMEOUT feature due to stalled_fsm impl --- node/src/chain.rs | 32 +++----------------------------- node/src/chain/acceptor.rs | 4 ---- node/src/chain/fsm.rs | 24 ------------------------ 3 files changed, 3 insertions(+), 57 deletions(-) diff --git a/node/src/chain.rs b/node/src/chain.rs index efb79d4d55..9e3e068b23 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -45,7 +45,6 @@ const TOPICS: &[u8] = &[ Topics::Quorum as u8, ]; -const ACCEPT_BLOCK_TIMEOUT_SEC: Duration = Duration::from_secs(20); const HEARTBEAT_SEC: Duration = Duration::from_secs(1); pub struct ChainSrv { @@ -116,9 +115,6 @@ impl let outbound_chan = acc.read().await.get_outbound_chan().await; let result_chan = acc.read().await.get_result_chan().await; - // Accept_Block timeout is activated when a node is unable to accept a - // valid block within a specified time frame. - let mut timeout = Self::next_timeout(); let mut heartbeat = Instant::now().checked_add(HEARTBEAT_SEC).unwrap(); // Message loop for Chain context @@ -157,10 +153,7 @@ impl // By disabling block broadcast, a block may be received from a peer // only after explicit request (on demand). match fsm.on_block_event(*blk, msg.metadata).await { - Ok(None) => {} - Ok(Some(_)) => { - timeout = Self::next_timeout(); - } + Ok(_) => {} Err(err) => { error!(event = "fsm::on_event failed", src = "wire", err = ?err); } @@ -182,11 +175,7 @@ impl warn!("msg discarded: {e}"); } match fsm.on_quorum_msg(payload, &msg).await { - Ok(None) => {} - Ok(Some(_)) => { - // block accepted, timeout reset - timeout = Self::next_timeout(); - } + Ok(_) => {} Err(err) => { warn!(event = "quorum msg", ?err); } @@ -204,11 +193,7 @@ impl // the winner block will be compiled and redirected to the Acceptor. if let Payload::Quorum(quorum) = &msg.payload { match fsm.on_quorum_msg(quorum, &msg).await { - Ok(None) => {} - Ok(Some(_)) => { - // block accepted, timeout reset - timeout = Self::next_timeout(); - } + Ok(_) => {} Err(err) => { warn!(event = "handle quorum msg from internal consensus failed", ?err); } @@ -218,11 +203,6 @@ impl if let Err(e) = network.read().await.broadcast(&msg).await { warn!("Unable to re-route message {e}"); } - }, - // Handles accept_block_timeout event - _ = sleep_until(timeout) => { - fsm.on_idle(ACCEPT_BLOCK_TIMEOUT_SEC).await; - timeout = Self::next_timeout(); }, // Handles heartbeat event _ = sleep_until(heartbeat) => { @@ -316,10 +296,4 @@ impl ChainSrv { Ok(block) } - - fn next_timeout() -> Instant { - Instant::now() - .checked_add(ACCEPT_BLOCK_TIMEOUT_SEC) - .unwrap() - } } diff --git a/node/src/chain/acceptor.rs b/node/src/chain/acceptor.rs index 256507c311..20959a7c17 100644 --- a/node/src/chain/acceptor.rs +++ b/node/src/chain/acceptor.rs @@ -953,10 +953,6 @@ impl Acceptor { Ok(final_block) } - pub(crate) async fn get_curr_iteration(&self) -> u8 { - self.tip.read().await.inner().header().iteration - } - pub(crate) async fn get_curr_tip(&self) -> BlockWithLabel { self.tip.read().await.clone() } diff --git a/node/src/chain/fsm.rs b/node/src/chain/fsm.rs index 3b0ec8b9a3..492aa621b3 100644 --- a/node/src/chain/fsm.rs +++ b/node/src/chain/fsm.rs @@ -108,30 +108,6 @@ impl SimpleFSM { } } - pub async fn on_idle(&mut self, timeout: Duration) { - let acc = self.acc.read().await; - let tip_height = acc.get_curr_height().await; - let iter = acc.get_curr_iteration().await; - if let Ok(last_finalized) = acc.get_latest_final_block().await { - info!( - event = "fsm::idle", - tip_height, - iter, - timeout_sec = timeout.as_secs(), - "finalized_height" = last_finalized.header().height, - ); - - // Clear up all blacklisted blocks - self.blacklisted_blocks.write().await.clear(); - } else { - error!("could not get final block"); - } - - let now = Instant::now(); - self.attestations_cache - .retain(|_, (_, expiry)| *expiry > now); - } - pub async fn on_failed_consensus(&mut self) { self.acc.write().await.restart_consensus().await; }