diff --git a/consensus/src/commons.rs b/consensus/src/commons.rs index 558d5aa63e..36f06a5734 100644 --- a/consensus/src/commons.rs +++ b/consensus/src/commons.rs @@ -139,6 +139,8 @@ impl From for ConsensusError { #[async_trait::async_trait] pub trait Database: Send + Sync { fn store_candidate_block(&mut self, b: Block); + async fn get_last_iter(&self) -> (Hash, u8); + async fn store_last_iter(&mut self, data: (Hash, u8)); } #[derive(Clone)] diff --git a/consensus/src/consensus.rs b/consensus/src/consensus.rs index 8f84add44c..23efa671d4 100644 --- a/consensus/src/consensus.rs +++ b/consensus/src/consensus.rs @@ -10,6 +10,7 @@ use crate::operations::Operations; use crate::phase::Phase; use node_data::message::{AsyncQueue, Message, Topics}; +use node_data::StepName; use crate::execution_ctx::ExecutionCtx; use crate::proposal; @@ -184,8 +185,39 @@ impl Consensus { ru.base_timeouts.clone(), ); + let (prev_block_hash, saved_iter) = + db.lock().await.get_last_iter().await; + + if ru.hash() == prev_block_hash { + // If starting from `saved_iter`, we regenerate all committees + // in case they are needed to process past-iteration messages in + // Emergency Mode + while iter <= saved_iter { + iter_ctx.on_begin(iter); + iter_ctx.generate_committee( + StepName::Proposal, + provisioners.as_ref(), + ru.seed(), + ); + iter_ctx.generate_committee( + StepName::Validation, + provisioners.as_ref(), + ru.seed(), + ); + iter_ctx.generate_committee( + StepName::Ratification, + provisioners.as_ref(), + ru.seed(), + ); + iter += 1; + } + + debug!(event = "restored iteration", ru.round, iter); + } + loop { Self::consensus_delay().await; + db.lock().await.store_last_iter((ru.hash(), iter)).await; iter_ctx.on_begin(iter); @@ -197,6 +229,13 @@ impl Consensus { // phase. phase.reinitialize(msg, ru.round, iter).await; + // Generate step committee + iter_ctx.generate_committee( + step_name, + provisioners.as_ref(), + ru.seed(), + ); + // Construct phase execution context let ctx = ExecutionCtx::new( &mut iter_ctx, diff --git a/consensus/src/execution_ctx.rs b/consensus/src/execution_ctx.rs index 016fcf05af..49ed6b43bc 100644 --- a/consensus/src/execution_ctx.rs +++ b/consensus/src/execution_ctx.rs @@ -13,7 +13,6 @@ use crate::queue::MsgRegistry; use crate::step_votes_reg::SafeAttestationInfoRegistry; use crate::user::committee::Committee; use crate::user::provisioners::Provisioners; -use crate::user::sortition; use node_data::bls::PublicKeyBytes; use node_data::ledger::Block; @@ -106,10 +105,6 @@ impl<'a, T: Operations + 'static> ExecutionCtx<'a, T> { committee.is_member(&self.round_update.pubkey_bls) } - pub(crate) fn save_committee(&mut self, step: u8, committee: Committee) { - self.iter_ctx.committees.insert(step, committee); - } - pub(crate) fn get_current_committee(&self) -> Option<&Committee> { self.iter_ctx.committees.get_committee(self.step()) } @@ -456,19 +451,6 @@ impl<'a, T: Operations + 'static> ExecutionCtx<'a, T> { None } - pub fn get_sortition_config( - &self, - exclusion: Vec, - ) -> sortition::Config { - sortition::Config::new( - self.round_update.seed(), - self.round_update.round, - self.iteration, - self.step_name(), - exclusion, - ) - } - /// Reports step elapsed time to the client async fn report_elapsed_time(&mut self) { let elapsed = self diff --git a/consensus/src/iteration_ctx.rs b/consensus/src/iteration_ctx.rs index 071fcb4685..9625abd524 100644 --- a/consensus/src/iteration_ctx.rs +++ b/consensus/src/iteration_ctx.rs @@ -7,15 +7,18 @@ use crate::commons::{RoundUpdate, TimeoutSet}; use std::cmp; -use crate::config::{MAX_STEP_TIMEOUT, TIMEOUT_INCREASE}; +use crate::config::{CONSENSUS_MAX_ITER, MAX_STEP_TIMEOUT, TIMEOUT_INCREASE}; use crate::msg_handler::HandleMsgOutput; use crate::msg_handler::MsgHandler; use crate::user::committee::Committee; +use crate::user::provisioners::Provisioners; +use crate::user::sortition; use crate::{ratification, validation}; use node_data::bls::PublicKeyBytes; +use node_data::ledger::Seed; use node_data::message::Message; use std::collections::HashMap; use std::ops::Add; @@ -131,6 +134,101 @@ impl IterationCtx { .expect("valid timeout per step") } + fn get_sortition_config( + &self, + seed: Seed, + step_name: StepName, + exclusion: Vec, + ) -> sortition::Config { + sortition::Config::new( + seed, self.round, self.iter, step_name, exclusion, + ) + } + + pub(crate) fn generate_committee( + &mut self, + step_name: StepName, + provisioners: &Provisioners, + seed: Seed, + ) { + let iteration = self.iter; + let step = step_name.to_step(iteration); + + // Check if we already generated the committee. + // This will be usually the case for all Proposal steps after + // iteration 0 + if self.committees.get_committee(step).is_some() { + return; + } + + // For Validation and Ratification steps we need the next-iteration + // generator for the exclusion list. So we extract, it if necessary. + // + // This is not necessary in the last iteration, so we skip it + if step_name != StepName::Proposal && iteration < CONSENSUS_MAX_ITER - 1 + { + let prop = StepName::Proposal; + let next_prop_step = prop.to_step(iteration + 1); + + // Check if this committee has been already generated. + // This will be typically the case when executing the Ratification + // step after the Validation one + if self.committees.get_committee(next_prop_step).is_none() { + let mut next_cfg = + self.get_sortition_config(seed, prop, vec![]); + next_cfg.step = next_prop_step; + + let next_generator = Committee::new(provisioners, &next_cfg); + self.committees.insert(next_prop_step, next_generator); + } + } + + // Fill up exclusion list + // + // We exclude the generators for the current iteration and the next one + // to avoid conflict of interests + let exclusion = match step_name { + StepName::Proposal => vec![], + _ => { + let mut exclusion_list = vec![]; + // Exclude generator for current iteration + let cur_generator = self + .get_generator(iteration) + .expect("Proposal committee to be already generated"); + + exclusion_list.push(cur_generator); + + // Exclude generator for next iteration + if iteration < CONSENSUS_MAX_ITER - 1 { + let next_generator = + self.get_generator(iteration + 1).expect( + "Next Proposal committee to be already generated", + ); + + exclusion_list.push(next_generator); + } + + exclusion_list + } + }; + + // Generate the committee for the current step + // If the step is Proposal, the only extracted member is the generator + // For Validation and Ratification steps, extracted members are + // delegated to vote on the candidate block + let step_committee = Committee::new( + provisioners, + &self.get_sortition_config(seed, step_name, exclusion), + ); + + debug!( + event = "committee_generated", + members = format!("{}", &step_committee) + ); + + self.committees.insert(step, step_committee); + } + pub(crate) fn get_generator(&self, iter: u8) -> Option { let step = StepName::Proposal.to_step(iter); self.committees diff --git a/consensus/src/phase.rs b/consensus/src/phase.rs index bb0a159311..53e99e37f7 100644 --- a/consensus/src/phase.rs +++ b/consensus/src/phase.rs @@ -5,10 +5,8 @@ // Copyright (c) DUSK NETWORK. All rights reserved. use crate::commons::{ConsensusError, Database}; -use crate::config::CONSENSUS_MAX_ITER; use crate::execution_ctx::ExecutionCtx; use crate::operations::Operations; -use crate::user::committee::Committee; use crate::{proposal, ratification, validation}; use node_data::message::Message; use node_data::StepName; @@ -58,64 +56,9 @@ impl Phase { ) -> Result { ctx.set_start_time(); - let step_name = ctx.step_name(); let timeout = ctx.iter_ctx.get_timeout(ctx.step_name()); debug!(event = "execute_step", ?timeout); - let exclusion = match step_name { - StepName::Proposal => vec![], - _ => { - let mut exclusion_list = vec![]; - let generator = ctx - .iter_ctx - .get_generator(ctx.iteration) - .expect("Proposal committee to be already generated"); - - exclusion_list.push(generator); - - if ctx.iteration < CONSENSUS_MAX_ITER { - let next_generator = - ctx.iter_ctx.get_generator(ctx.iteration + 1).expect( - "Next Proposal committee to be already generated", - ); - - exclusion_list.push(next_generator); - } - - exclusion_list - } - }; - - // Perform deterministic_sortition to generate committee of size=N. - // The extracted members are the provisioners eligible to vote on this - // particular round and step. In the context of Proposal phase, - // the extracted member is the one eligible to generate the candidate - // block. - let step_committee = Committee::new( - ctx.provisioners, - &ctx.get_sortition_config(exclusion), - ); - - if let StepName::Proposal = step_name { - if ctx.iteration < CONSENSUS_MAX_ITER { - let mut cfg_next_iteration = ctx.get_sortition_config(vec![]); - cfg_next_iteration.step = - StepName::Proposal.to_step(ctx.iteration + 1); - - ctx.save_committee( - cfg_next_iteration.step, - Committee::new(ctx.provisioners, &cfg_next_iteration), - ); - } - } - - debug!( - event = "committee_generated", - members = format!("{}", &step_committee) - ); - - ctx.save_committee(ctx.step(), step_committee); - // Execute step await_phase!(self, run(ctx)) } diff --git a/node/src/chain/consensus.rs b/node/src/chain/consensus.rs index d851447a4f..176a5570a1 100644 --- a/node/src/chain/consensus.rs +++ b/node/src/chain/consensus.rs @@ -14,7 +14,7 @@ use dusk_consensus::operations::{ }; use dusk_consensus::queue::MsgRegistry; use dusk_consensus::user::provisioners::ContextProvisioners; -use node_data::ledger::{Block, Fault, Header}; +use node_data::ledger::{Block, Fault, Hash, Header}; use node_data::message::AsyncQueue; use tokio::sync::{oneshot, Mutex, RwLock}; @@ -24,7 +24,7 @@ use tracing::{debug, info, trace, warn}; use crate::chain::header_validation::Validator; use crate::chain::metrics::AverageElapsedTime; use crate::database::rocksdb::{ - MD_AVG_PROPOSAL, MD_AVG_RATIFICATION, MD_AVG_VALIDATION, + MD_AVG_PROPOSAL, MD_AVG_RATIFICATION, MD_AVG_VALIDATION, MD_LAST_ITER, }; use metrics::gauge; use node_data::{ledger, Serializable, StepName}; @@ -211,6 +211,42 @@ impl dusk_consensus::commons::Database for CandidateDB { } } } + async fn get_last_iter(&self) -> (Hash, u8) { + let data = self + .db + .read() + .await + .view(|t| t.op_read(MD_LAST_ITER)) + .unwrap_or_else(|e| { + warn!("Cannot read last_iter from database {e:?}"); + None + }) + .filter(|v| v.len() == 33) + .unwrap_or_else(|| { + warn!("No last_iter saved, falling back to default"); + [0u8; 33].to_vec() + }); + + let mut hash = [0u8; 32]; + hash.copy_from_slice(&data[0..32]); + + let iter = data[32]; + + (hash, iter) + } + async fn store_last_iter(&mut self, (hash, iter): (Hash, u8)) { + let mut to_store = hash.to_vec(); + to_store.push(iter); + + if let Err(e) = self + .db + .read() + .await + .update(|t| t.op_write(MD_LAST_ITER, to_store)) + { + warn!("Cannot write last_iter to database {e:?}"); + } + } } /// Implements Executor trait to mock Contract Storage calls. diff --git a/node/src/database/rocksdb.rs b/node/src/database/rocksdb.rs index 08c7723444..e3b8b83e9d 100644 --- a/node/src/database/rocksdb.rs +++ b/node/src/database/rocksdb.rs @@ -53,6 +53,7 @@ pub const MD_STATE_ROOT_KEY: &[u8] = b"state_hash_key"; pub const MD_AVG_VALIDATION: &[u8] = b"avg_validation_time"; pub const MD_AVG_RATIFICATION: &[u8] = b"avg_ratification_time"; pub const MD_AVG_PROPOSAL: &[u8] = b"avg_proposal_time"; +pub const MD_LAST_ITER: &[u8] = b"consensus_last_iter"; #[derive(Clone)] pub struct Backend {