diff --git a/node/src/chain.rs b/node/src/chain.rs index b6c7b4c56f..9e3e068b23 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -12,6 +12,7 @@ mod genesis; mod header_validation; mod metrics; +mod stall_chain_fsm; use self::acceptor::Acceptor; use self::fsm::SimpleFSM; @@ -44,7 +45,6 @@ const TOPICS: &[u8] = &[ Topics::Quorum as u8, ]; -const ACCEPT_BLOCK_TIMEOUT_SEC: Duration = Duration::from_secs(20); const HEARTBEAT_SEC: Duration = Duration::from_secs(1); pub struct ChainSrv { @@ -110,14 +110,11 @@ impl acc.write().await.spawn_task().await; // Start-up FSM instance - let mut fsm = SimpleFSM::new(acc.clone(), network.clone()); + let mut fsm = SimpleFSM::new(acc.clone(), network.clone()).await; let outbound_chan = acc.read().await.get_outbound_chan().await; let result_chan = acc.read().await.get_result_chan().await; - // Accept_Block timeout is activated when a node is unable to accept a - // valid block within a specified time frame. - let mut timeout = Self::next_timeout(); let mut heartbeat = Instant::now().checked_add(HEARTBEAT_SEC).unwrap(); // Message loop for Chain context @@ -156,10 +153,7 @@ impl // By disabling block broadcast, a block may be received from a peer // only after explicit request (on demand). match fsm.on_block_event(*blk, msg.metadata).await { - Ok(None) => {} - Ok(Some(_)) => { - timeout = Self::next_timeout(); - } + Ok(_) => {} Err(err) => { error!(event = "fsm::on_event failed", src = "wire", err = ?err); } @@ -181,11 +175,7 @@ impl warn!("msg discarded: {e}"); } match fsm.on_quorum_msg(payload, &msg).await { - Ok(None) => {} - Ok(Some(_)) => { - // block accepted, timeout reset - timeout = Self::next_timeout(); - } + Ok(_) => {} Err(err) => { warn!(event = "quorum msg", ?err); } @@ -203,11 +193,7 @@ impl // the winner block will be compiled and redirected to the Acceptor. if let Payload::Quorum(quorum) = &msg.payload { match fsm.on_quorum_msg(quorum, &msg).await { - Ok(None) => {} - Ok(Some(_)) => { - // block accepted, timeout reset - timeout = Self::next_timeout(); - } + Ok(_) => {} Err(err) => { warn!(event = "handle quorum msg from internal consensus failed", ?err); } @@ -217,11 +203,6 @@ impl if let Err(e) = network.read().await.broadcast(&msg).await { warn!("Unable to re-route message {e}"); } - }, - // Handles accept_block_timeout event - _ = sleep_until(timeout) => { - fsm.on_idle(ACCEPT_BLOCK_TIMEOUT_SEC).await; - timeout = Self::next_timeout(); }, // Handles heartbeat event _ = sleep_until(heartbeat) => { @@ -315,10 +296,4 @@ impl ChainSrv { Ok(block) } - - fn next_timeout() -> Instant { - Instant::now() - .checked_add(ACCEPT_BLOCK_TIMEOUT_SEC) - .unwrap() - } } diff --git a/node/src/chain/acceptor.rs b/node/src/chain/acceptor.rs index 33c6f2863c..558d919d05 100644 --- a/node/src/chain/acceptor.rs +++ b/node/src/chain/acceptor.rs @@ -20,6 +20,7 @@ use node_data::ledger::{ use node_data::message::AsyncQueue; use node_data::message::Payload; +use core::panic; use dusk_consensus::operations::Voter; use execution_core::stake::{Withdraw, STAKE_CONTRACT}; use metrics::{counter, gauge, histogram}; @@ -60,7 +61,7 @@ pub(crate) enum RevertTarget { /// Acceptor also manages the initialization and lifespan of Consensus task. pub(crate) struct Acceptor { /// The tip - tip: RwLock, + pub(crate) tip: RwLock, /// Provisioners needed to verify next block pub(crate) provisioners_list: RwLock, @@ -70,7 +71,7 @@ pub(crate) struct Acceptor { pub(crate) db: Arc>, pub(crate) vm: Arc>, - network: Arc>, + pub(crate) network: Arc>, event_sender: Sender, } @@ -833,7 +834,7 @@ impl Acceptor { // The blockchain tip after reverting let (blk, (_, label)) = self.db.read().await.update(|t| { let mut height = curr_height; - while height != 0 { + loop { let b = Ledger::fetch_block_by_height(t, height)? .ok_or_else(|| anyhow::anyhow!("could not fetch block"))?; let h = b.header(); @@ -846,6 +847,11 @@ impl Acceptor { return Ok((b, label)); } + // the target_state_hash could not be found + if height == 0 { + panic!("revert to genesis block failed"); + } + info!( event = "block deleted", height = h.height, @@ -870,8 +876,6 @@ impl Acceptor { height -= 1; } - - Err(anyhow!("not found")) })?; if blk.header().state_hash != target_state_hash { @@ -931,7 +935,8 @@ impl Acceptor { } pub(crate) async fn get_latest_final_block(&self) -> Result { - let tip = self.tip.read().await; + let tip: tokio::sync::RwLockReadGuard<'_, BlockWithLabel> = + self.tip.read().await; if tip.is_final() { return Ok(tip.inner().clone()); } @@ -960,8 +965,8 @@ impl Acceptor { Ok(final_block) } - pub(crate) async fn get_curr_iteration(&self) -> u8 { - self.tip.read().await.inner().header().iteration + pub(crate) async fn get_curr_tip(&self) -> BlockWithLabel { + self.tip.read().await.clone() } pub(crate) async fn get_result_chan( @@ -1068,6 +1073,49 @@ impl Acceptor { histogram!("dusk_block_disk_size").record(block_size_on_disk as f64); } + + /// Verifies if a block with header `local` can be replaced with a block + /// with header `new` + pub(crate) async fn verify_header_against_local( + &self, + local: &ledger::Header, + new: &ledger::Header, + ) -> Result<()> { + let prev_header = self.db.read().await.view(|t| { + let prev_hash = &local.prev_block_hash; + t.fetch_block_header(prev_hash)?.ok_or(anyhow::anyhow!( + "Unable to find block with hash {}", + to_str(prev_hash) + )) + })?; + + let provisioners_list = self + .vm + .read() + .await + .get_provisioners(prev_header.state_hash)?; + + let mut provisioners_list = ContextProvisioners::new(provisioners_list); + + let changed_provisioners = self + .vm + .read() + .await + .get_changed_provisioners(prev_header.state_hash)?; + provisioners_list.apply_changes(changed_provisioners); + + // Ensure header of the new block is valid according to prev_block + // header + let _ = verify_block_header( + self.db.clone(), + &prev_header, + &provisioners_list, + new, + ) + .await?; + + Ok(()) + } } async fn broadcast(network: &Arc>, msg: &Message) { diff --git a/node/src/chain/fallback.rs b/node/src/chain/fallback.rs index 860dc213ec..7de9bec33e 100644 --- a/node/src/chain/fallback.rs +++ b/node/src/chain/fallback.rs @@ -5,14 +5,12 @@ // Copyright (c) DUSK NETWORK. All rights reserved. use anyhow::{anyhow, Result}; -use dusk_consensus::user::provisioners::ContextProvisioners; -use node_data::ledger::{to_str, Header}; +use node_data::ledger::Header; use std::cmp::Ordering; use tracing::info; use crate::{ - chain::acceptor, - database::{self, Ledger}, + database::{self}, vm, Network, }; @@ -42,17 +40,6 @@ impl<'a, N: Network, DB: database::DB, VM: vm::VMExecution> local: &Header, remote: &Header, revert_target: RevertTarget, - ) -> Result<()> { - self.verify_header(local, remote).await?; - self.acc.try_revert(revert_target).await - } - - /// Verifies if a block with header `local` can be replaced with a block - /// with header `remote` - async fn verify_header( - &self, - local: &Header, - remote: &Header, ) -> Result<()> { match (local.height, remote.iteration.cmp(&local.iteration)) { (0, _) => Err(anyhow!("cannot fallback over genesis block")), @@ -68,14 +55,6 @@ impl<'a, N: Network, DB: database::DB, VM: vm::VMExecution> _ => Ok(()), }?; - let prev_header = self.acc.db.read().await.view(|t| { - let prev_hash = &local.prev_block_hash; - t.fetch_block_header(prev_hash)?.ok_or(anyhow::anyhow!( - "Unable to find block with hash {}", - to_str(prev_hash) - )) - })?; - info!( event = "execute fallback checks", height = local.height, @@ -83,33 +62,7 @@ impl<'a, N: Network, DB: database::DB, VM: vm::VMExecution> target_iter = remote.iteration, ); - let provisioners_list = self - .acc - .vm - .read() - .await - .get_provisioners(prev_header.state_hash)?; - - let mut provisioners_list = ContextProvisioners::new(provisioners_list); - - let changed_provisioners = self - .acc - .vm - .read() - .await - .get_changed_provisioners(prev_header.state_hash)?; - provisioners_list.apply_changes(changed_provisioners); - - // Ensure header of the new block is valid according to prev_block - // header - let _ = acceptor::verify_block_header( - self.acc.db.clone(), - &prev_header, - &provisioners_list, - remote, - ) - .await?; - - Ok(()) + self.acc.verify_header_against_local(local, remote).await?; + self.acc.try_revert(revert_target).await } } diff --git a/node/src/chain/fsm.rs b/node/src/chain/fsm.rs index 20eef7c2cd..d6bde5861c 100644 --- a/node/src/chain/fsm.rs +++ b/node/src/chain/fsm.rs @@ -5,6 +5,7 @@ // Copyright (c) DUSK NETWORK. All rights reserved. use super::acceptor::{Acceptor, RevertTarget}; +use super::stall_chain_fsm::{self, StalledChainFSM}; use crate::chain::fallback; use crate::database; use crate::{vm, Network}; @@ -33,7 +34,6 @@ const DEFAULT_ATT_CACHE_EXPIRY: Duration = Duration::from_secs(60); /// Maximum number of hops between the requester and the node that contains the /// requested resource const DEFAULT_HOPS_LIMIT: u16 = 16; -pub(crate) const REDUNDANCY_PEER_FACTOR: usize = 5; type SharedHashSet = Arc>>; @@ -80,65 +80,34 @@ pub(crate) struct SimpleFSM { /// Attestations cached from received Quorum messages attestations_cache: HashMap<[u8; 32], (Attestation, Instant)>, + + /// State machine to detect a stalled state of the chain + stalled_sm: StalledChainFSM, } impl SimpleFSM { - pub fn new( + pub async fn new( acc: Arc>>, network: Arc>, ) -> Self { let blacklisted_blocks = Arc::new(RwLock::new(HashSet::new())); + let stalled_sm = StalledChainFSM::new_with_acc(acc.clone()).await; + let curr = State::InSync(InSyncImpl::::new( + acc.clone(), + network.clone(), + blacklisted_blocks.clone(), + )); Self { - curr: State::InSync(InSyncImpl::::new( - acc.clone(), - network.clone(), - blacklisted_blocks.clone(), - )), + curr, acc, - network, + network: network.clone(), blacklisted_blocks, attestations_cache: Default::default(), + stalled_sm, } } - pub async fn on_idle(&mut self, timeout: Duration) { - let acc = self.acc.read().await; - let tip_height = acc.get_curr_height().await; - let iter = acc.get_curr_iteration().await; - if let Ok(last_finalized) = acc.get_latest_final_block().await { - info!( - event = "fsm::idle", - tip_height, - iter, - timeout_sec = timeout.as_secs(), - "finalized_height" = last_finalized.header().height, - ); - - // Clear up all blacklisted blocks - self.blacklisted_blocks.write().await.clear(); - - // Request missing blocks since my last finalized block - let locator = last_finalized.header().hash; - let get_blocks = GetBlocks::new(locator).into(); - if let Err(e) = self - .network - .read() - .await - .send_to_alive_peers(get_blocks, REDUNDANCY_PEER_FACTOR) - .await - { - warn!("Unable to request GetBlocks {e}"); - } - } else { - error!("could not request blocks"); - } - - let now = Instant::now(); - self.attestations_cache - .retain(|_, (_, expiry)| *expiry > now); - } - pub async fn on_failed_consensus(&mut self) { self.acc.write().await.restart_consensus().await; } @@ -209,14 +178,88 @@ impl SimpleFSM { } } } - } + // Try to detect a stalled chain + // Generally speaking, if a node is receiving future blocks from the + // network but it cannot accept a new block for long time, then + // it might be a sign of a getting stalled on non-main branch. + + let res = self.stalled_sm.on_block_received(blk).await.clone(); + + match res { + stall_chain_fsm::State::StalledOnFork( + local_hash_at_fork, + remote_blk, + ) => { + info!( + event = "stalled on fork", + local_hash = to_str(&local_hash_at_fork), + remote_hash = to_str(&remote_blk.header().hash), + remote_height = remote_blk.header().height, + ); + let mut acc = self.acc.write().await; + + let prev_local_state_root = + acc.db.read().await.view(|t| { + let local_blk = t + .fetch_block_header(&local_hash_at_fork)? + .expect("local hash should exist"); + + let prev_blk = t + .fetch_block_header(&local_blk.prev_block_hash)? + .expect("prev block hash should exist"); + + anyhow::Ok(prev_blk.state_hash) + })?; + + match acc + .try_revert(RevertTarget::Commit(prev_local_state_root)) + .await + { + Ok(_) => { + counter!("dusk_revert_count").increment(1); + info!(event = "reverted to last finalized"); + + info!( + event = "recovery block", + height = remote_blk.header().height, + hash = to_str(&remote_blk.header().hash), + ); + + acc.try_accept_block(&remote_blk, true).await?; + + // Black list the block hash to avoid accepting it + // again due to fallback execution + self.blacklisted_blocks + .write() + .await + .insert(local_hash_at_fork); + + // Reset the stalled chain FSM + self.stalled_sm.reset(remote_blk.header().clone()); + } + Err(e) => { + error!( + event = "revert failed", + err = format!("{:?}", e) + ); + return Ok(None); + } + } + } + stall_chain_fsm::State::Stalled => { + self.blacklisted_blocks.write().await.clear(); + } + _ => {} + } + } // FIXME: The block should return only if accepted. The current issue is // that the impl of State::on_block_event doesn't return always the // accepted block, so we can't rely on them // // Due to this issue, we reset the outer timeout even if we are not // accepting the received block + Ok(blk) } @@ -250,6 +293,11 @@ impl SimpleFSM { quorum: &payload::Quorum, msg: &Message, ) -> anyhow::Result> { + // Clean up attestation cache + let now = Instant::now(); + self.attestations_cache + .retain(|_, (_, expiry)| *expiry > now); + // FIXME: We should return the whole outcome for this quorum // Basically we need to inform the upper layer if the received quorum is // valid (even if it's a FailedQuorum) @@ -466,7 +514,6 @@ impl InSyncImpl { } // Ensure that the block height is higher than the last finalized - // TODO: Retrieve the block from memory if remote_height <= acc.get_latest_final_block().await?.header().height { diff --git a/node/src/chain/stall_chain_fsm.rs b/node/src/chain/stall_chain_fsm.rs new file mode 100644 index 0000000000..8ec3a92cc2 --- /dev/null +++ b/node/src/chain/stall_chain_fsm.rs @@ -0,0 +1,250 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// +// Copyright (c) DUSK NETWORK. All rights reserved. + +use node_data::{ + ledger::{to_str, Block, Header}, + message::payload::Inv, +}; +use std::sync::Arc; +use tokio::sync::RwLock; +use tracing::{error, info, trace, warn}; + +use crate::{ + database::{self, Ledger}, + vm::VMExecution, + Network, +}; + +use super::acceptor::Acceptor; + +const STALLED_TIMEOUT: u64 = 60; // seconds + +#[derive(Debug, Clone, PartialEq)] +pub(crate) enum State { + /// Blocks are being accepted + Running, + /// No block has been accepted recently + /// + /// Node might be stuck on non-main branch and might need to recover + Stalled, + /// Node is disconnected from the main branch + StalledOnFork([u8; 32], Box), +} + +/// Implements a simple FSM to detect a stalled state of the chain +pub(crate) struct StalledChainFSM +{ + acc: Arc>>, + + state: State, + + /// Tip of the chain with timestamp + tip: (Header, u64), +} + +impl StalledChainFSM { + pub(crate) async fn new_with_acc( + acc: Arc>>, + ) -> Self { + let tip = acc.read().await.get_curr_tip().await; + + let mut sm = Self { + state: State::Running, + tip: Default::default(), + acc, + }; + + sm.update_tip(tip.inner().header().clone()); + sm + } + + pub(crate) fn reset(&mut self, tip: Header) { + if self.tip.0.hash != tip.hash { + self.update_tip(tip); + self.state_transition(State::Running); + } + } + + /// Handles block received event + /// + /// Returns the new state of the FSM after processing the block + pub(crate) async fn on_block_received(&mut self, blk: &Block) -> &State { + trace!( + event = "chain.block_received", + hash = to_str(&blk.header().hash), + height = blk.header().height, + iter = blk.header().iteration, + ); + + let tip = self + .acc + .read() + .await + .get_curr_tip() + .await + .inner() + .header() + .clone(); + + if self.tip.0.hash != tip.hash { + // Tip has changed, which means a new block is accepted either due + // to normal block acceptance or fallback execution + self.update_tip(tip); + self.state_transition(State::Running); + } + + let curr = &self.state; + match curr { + State::Running => self.on_running().await, + State::Stalled => { + if let Err(err) = self.on_stalled(blk).await { + error!("Error while processing block: {:?}", err); + } + } + State::StalledOnFork(_, _) => warn!("Stalled on fork"), + }; + + &self.state + } + + /// Handles a running state + async fn on_running(&mut self) { + if self.tip.1 + STALLED_TIMEOUT < node_data::get_current_timestamp() { + // While we are still receiving blocks, no block + // has been accepted for a long time (tip has not changed + // recently) + self.on_accept_block_timeout().await + } + } + + /// Handles block from wire in the `Stalled` state + async fn on_stalled(&mut self, new_blk: &Block) -> anyhow::Result<()> { + if new_blk.header().height > self.tip.0.height { + // Block is newer than the local tip block + return Ok(()); + } + + let db: Arc> = self.acc.read().await.db.clone(); + let exists = db + .read() + .await + .view(|t| t.get_block_exists(&new_blk.header().hash))?; + + if exists { + // Block already exists in ledger + return Ok(()); + } + + let local_blk = db + .read() + .await + .view(|t| t.fetch_block_by_height(new_blk.header().height))? + .expect("local block should exist"); + + let remote_blk = new_blk; + + // If we are here, this might be a block from the main + // branch + let res = self + .acc + .read() + .await + .verify_header_against_local( + local_blk.header(), + remote_blk.header(), + ) + .await; + + if let Err(err) = res { + error!( + event = "recovery failed", + local_hash = to_str(&local_blk.header().hash), + local_height = local_blk.header().height, + remote_hash = to_str(&remote_blk.header().hash), + remote_height = remote_blk.header().height, + err = format!("verification err: {:?}", err) + ); + } else { + self.state_transition(State::StalledOnFork( + local_blk.header().hash, + Box::new(remote_blk.clone()), + )); + return Ok(()); + } + + Ok(()) + } + + /// Handles block acceptance timeout + /// + /// Request missing blocks since last finalized block + async fn on_accept_block_timeout(&mut self) { + let (_, last_final_block) = self.last_final_block().await; + let from = last_final_block + 1; + let to = self.tip.0.height + 1; + + info!(event = "chain.requesting_blocks", from, to,); + + let mut inv = Inv::new(0); + for height in from..to { + inv.add_block_from_height(height); + } + + let network = self.acc.read().await.network.clone(); + if let Err(e) = network.read().await.flood_request(&inv, None, 8).await + { + warn!("Unable to request GetBlocks {e}"); + return; + } + + self.state_transition(State::Stalled); + } + + fn update_tip(&mut self, tip: Header) { + self.tip.0 = tip; + self.tip.1 = node_data::get_current_timestamp(); + } + + /// Changes curr state and logs the transition event + fn state_transition(&mut self, state: State) -> &State { + if state == self.state { + return &self.state; + } + + self.state = state; + + let state_str: &str = match &self.state { + State::Running => "running", + State::Stalled => "stalled", + State::StalledOnFork(_, _) => "stalled_on_fork", + }; + + let hdr = &self.tip.0; + info!( + event = format!("chain.{}", state_str), + tip_hash = to_str(&hdr.hash), + tip_height = hdr.height, + tip_iter = hdr.iteration, + tip_updated_at = self.tip.1, + ); + + &self.state + } + + async fn last_final_block(&self) -> ([u8; 32], u64) { + let hdr = self + .acc + .read() + .await + .get_latest_final_block() + .await + .unwrap() + .header() + .clone(); + + (hdr.hash, hdr.height) + } +}