Skip to content

Commit

Permalink
Merge pull request #2229 from dusk-network/1908-do-not-start-from-ite…
Browse files Browse the repository at this point in the history
…ration-0-when-the-node-restarts

consensus: Start from saved iteration on restart
  • Loading branch information
fed-franz authored Sep 6, 2024
2 parents 7e52446 + 69c2165 commit 9825441
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 78 deletions.
2 changes: 2 additions & 0 deletions consensus/src/commons.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ impl From<BlsSigError> for ConsensusError {
#[async_trait::async_trait]
pub trait Database: Send + Sync {
fn store_candidate_block(&mut self, b: Block);
async fn get_last_iter(&self) -> (Hash, u8);
async fn store_last_iter(&mut self, data: (Hash, u8));
}

#[derive(Clone)]
Expand Down
39 changes: 39 additions & 0 deletions consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::operations::Operations;
use crate::phase::Phase;

use node_data::message::{AsyncQueue, Message, Topics};
use node_data::StepName;

use crate::execution_ctx::ExecutionCtx;
use crate::proposal;
Expand Down Expand Up @@ -184,8 +185,39 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
ru.base_timeouts.clone(),
);

let (prev_block_hash, saved_iter) =
db.lock().await.get_last_iter().await;

if ru.hash() == prev_block_hash {
// If starting from `saved_iter`, we regenerate all committees
// in case they are needed to process past-iteration messages in
// Emergency Mode
while iter <= saved_iter {
iter_ctx.on_begin(iter);
iter_ctx.generate_committee(
StepName::Proposal,
provisioners.as_ref(),
ru.seed(),
);
iter_ctx.generate_committee(
StepName::Validation,
provisioners.as_ref(),
ru.seed(),
);
iter_ctx.generate_committee(
StepName::Ratification,
provisioners.as_ref(),
ru.seed(),
);
iter += 1;
}

debug!(event = "restored iteration", ru.round, iter);
}

loop {
Self::consensus_delay().await;
db.lock().await.store_last_iter((ru.hash(), iter)).await;

iter_ctx.on_begin(iter);

Expand All @@ -197,6 +229,13 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
// phase.
phase.reinitialize(msg, ru.round, iter).await;

// Generate step committee
iter_ctx.generate_committee(
step_name,
provisioners.as_ref(),
ru.seed(),
);

// Construct phase execution context
let ctx = ExecutionCtx::new(
&mut iter_ctx,
Expand Down
18 changes: 0 additions & 18 deletions consensus/src/execution_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::queue::MsgRegistry;
use crate::step_votes_reg::SafeAttestationInfoRegistry;
use crate::user::committee::Committee;
use crate::user::provisioners::Provisioners;
use crate::user::sortition;

use node_data::bls::PublicKeyBytes;
use node_data::ledger::Block;
Expand Down Expand Up @@ -106,10 +105,6 @@ impl<'a, T: Operations + 'static> ExecutionCtx<'a, T> {
committee.is_member(&self.round_update.pubkey_bls)
}

pub(crate) fn save_committee(&mut self, step: u8, committee: Committee) {
self.iter_ctx.committees.insert(step, committee);
}

pub(crate) fn get_current_committee(&self) -> Option<&Committee> {
self.iter_ctx.committees.get_committee(self.step())
}
Expand Down Expand Up @@ -456,19 +451,6 @@ impl<'a, T: Operations + 'static> ExecutionCtx<'a, T> {
None
}

pub fn get_sortition_config(
&self,
exclusion: Vec<PublicKeyBytes>,
) -> sortition::Config {
sortition::Config::new(
self.round_update.seed(),
self.round_update.round,
self.iteration,
self.step_name(),
exclusion,
)
}

/// Reports step elapsed time to the client
async fn report_elapsed_time(&mut self) {
let elapsed = self
Expand Down
100 changes: 99 additions & 1 deletion consensus/src/iteration_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@
use crate::commons::{RoundUpdate, TimeoutSet};
use std::cmp;

use crate::config::{MAX_STEP_TIMEOUT, TIMEOUT_INCREASE};
use crate::config::{CONSENSUS_MAX_ITER, MAX_STEP_TIMEOUT, TIMEOUT_INCREASE};
use crate::msg_handler::HandleMsgOutput;
use crate::msg_handler::MsgHandler;

use crate::user::committee::Committee;
use crate::user::provisioners::Provisioners;
use crate::user::sortition;

use crate::{ratification, validation};
use node_data::bls::PublicKeyBytes;

use node_data::ledger::Seed;
use node_data::message::Message;
use std::collections::HashMap;
use std::ops::Add;
Expand Down Expand Up @@ -131,6 +134,101 @@ impl IterationCtx {
.expect("valid timeout per step")
}

fn get_sortition_config(
&self,
seed: Seed,
step_name: StepName,
exclusion: Vec<PublicKeyBytes>,
) -> sortition::Config {
sortition::Config::new(
seed, self.round, self.iter, step_name, exclusion,
)
}

pub(crate) fn generate_committee(
&mut self,
step_name: StepName,
provisioners: &Provisioners,
seed: Seed,
) {
let iteration = self.iter;
let step = step_name.to_step(iteration);

// Check if we already generated the committee.
// This will be usually the case for all Proposal steps after
// iteration 0
if self.committees.get_committee(step).is_some() {
return;
}

// For Validation and Ratification steps we need the next-iteration
// generator for the exclusion list. So we extract, it if necessary.
//
// This is not necessary in the last iteration, so we skip it
if step_name != StepName::Proposal && iteration < CONSENSUS_MAX_ITER - 1
{
let prop = StepName::Proposal;
let next_prop_step = prop.to_step(iteration + 1);

// Check if this committee has been already generated.
// This will be typically the case when executing the Ratification
// step after the Validation one
if self.committees.get_committee(next_prop_step).is_none() {
let mut next_cfg =
self.get_sortition_config(seed, prop, vec![]);
next_cfg.step = next_prop_step;

let next_generator = Committee::new(provisioners, &next_cfg);
self.committees.insert(next_prop_step, next_generator);
}
}

// Fill up exclusion list
//
// We exclude the generators for the current iteration and the next one
// to avoid conflict of interests
let exclusion = match step_name {
StepName::Proposal => vec![],
_ => {
let mut exclusion_list = vec![];
// Exclude generator for current iteration
let cur_generator = self
.get_generator(iteration)
.expect("Proposal committee to be already generated");

exclusion_list.push(cur_generator);

// Exclude generator for next iteration
if iteration < CONSENSUS_MAX_ITER - 1 {
let next_generator =
self.get_generator(iteration + 1).expect(
"Next Proposal committee to be already generated",
);

exclusion_list.push(next_generator);
}

exclusion_list
}
};

// Generate the committee for the current step
// If the step is Proposal, the only extracted member is the generator
// For Validation and Ratification steps, extracted members are
// delegated to vote on the candidate block
let step_committee = Committee::new(
provisioners,
&self.get_sortition_config(seed, step_name, exclusion),
);

debug!(
event = "committee_generated",
members = format!("{}", &step_committee)
);

self.committees.insert(step, step_committee);
}

pub(crate) fn get_generator(&self, iter: u8) -> Option<PublicKeyBytes> {
let step = StepName::Proposal.to_step(iter);
self.committees
Expand Down
57 changes: 0 additions & 57 deletions consensus/src/phase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@
// Copyright (c) DUSK NETWORK. All rights reserved.

use crate::commons::{ConsensusError, Database};
use crate::config::CONSENSUS_MAX_ITER;
use crate::execution_ctx::ExecutionCtx;
use crate::operations::Operations;
use crate::user::committee::Committee;
use crate::{proposal, ratification, validation};
use node_data::message::Message;
use node_data::StepName;
Expand Down Expand Up @@ -58,64 +56,9 @@ impl<T: Operations + 'static, D: Database + 'static> Phase<T, D> {
) -> Result<Message, ConsensusError> {
ctx.set_start_time();

let step_name = ctx.step_name();
let timeout = ctx.iter_ctx.get_timeout(ctx.step_name());
debug!(event = "execute_step", ?timeout);

let exclusion = match step_name {
StepName::Proposal => vec![],
_ => {
let mut exclusion_list = vec![];
let generator = ctx
.iter_ctx
.get_generator(ctx.iteration)
.expect("Proposal committee to be already generated");

exclusion_list.push(generator);

if ctx.iteration < CONSENSUS_MAX_ITER {
let next_generator =
ctx.iter_ctx.get_generator(ctx.iteration + 1).expect(
"Next Proposal committee to be already generated",
);

exclusion_list.push(next_generator);
}

exclusion_list
}
};

// Perform deterministic_sortition to generate committee of size=N.
// The extracted members are the provisioners eligible to vote on this
// particular round and step. In the context of Proposal phase,
// the extracted member is the one eligible to generate the candidate
// block.
let step_committee = Committee::new(
ctx.provisioners,
&ctx.get_sortition_config(exclusion),
);

if let StepName::Proposal = step_name {
if ctx.iteration < CONSENSUS_MAX_ITER {
let mut cfg_next_iteration = ctx.get_sortition_config(vec![]);
cfg_next_iteration.step =
StepName::Proposal.to_step(ctx.iteration + 1);

ctx.save_committee(
cfg_next_iteration.step,
Committee::new(ctx.provisioners, &cfg_next_iteration),
);
}
}

debug!(
event = "committee_generated",
members = format!("{}", &step_committee)
);

ctx.save_committee(ctx.step(), step_committee);

// Execute step
await_phase!(self, run(ctx))
}
Expand Down
40 changes: 38 additions & 2 deletions node/src/chain/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use dusk_consensus::operations::{
};
use dusk_consensus::queue::MsgRegistry;
use dusk_consensus::user::provisioners::ContextProvisioners;
use node_data::ledger::{Block, Fault, Header};
use node_data::ledger::{Block, Fault, Hash, Header};
use node_data::message::AsyncQueue;

use tokio::sync::{oneshot, Mutex, RwLock};
Expand All @@ -24,7 +24,7 @@ use tracing::{debug, info, trace, warn};
use crate::chain::header_validation::Validator;
use crate::chain::metrics::AverageElapsedTime;
use crate::database::rocksdb::{
MD_AVG_PROPOSAL, MD_AVG_RATIFICATION, MD_AVG_VALIDATION,
MD_AVG_PROPOSAL, MD_AVG_RATIFICATION, MD_AVG_VALIDATION, MD_LAST_ITER,
};
use metrics::gauge;
use node_data::{ledger, Serializable, StepName};
Expand Down Expand Up @@ -211,6 +211,42 @@ impl<DB: database::DB> dusk_consensus::commons::Database for CandidateDB<DB> {
}
}
}
async fn get_last_iter(&self) -> (Hash, u8) {
let data = self
.db
.read()
.await
.view(|t| t.op_read(MD_LAST_ITER))
.unwrap_or_else(|e| {
warn!("Cannot read last_iter from database {e:?}");
None
})
.filter(|v| v.len() == 33)
.unwrap_or_else(|| {
warn!("No last_iter saved, falling back to default");
[0u8; 33].to_vec()
});

let mut hash = [0u8; 32];
hash.copy_from_slice(&data[0..32]);

let iter = data[32];

(hash, iter)
}
async fn store_last_iter(&mut self, (hash, iter): (Hash, u8)) {
let mut to_store = hash.to_vec();
to_store.push(iter);

if let Err(e) = self
.db
.read()
.await
.update(|t| t.op_write(MD_LAST_ITER, to_store))
{
warn!("Cannot write last_iter to database {e:?}");
}
}
}

/// Implements Executor trait to mock Contract Storage calls.
Expand Down
1 change: 1 addition & 0 deletions node/src/database/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub const MD_STATE_ROOT_KEY: &[u8] = b"state_hash_key";
pub const MD_AVG_VALIDATION: &[u8] = b"avg_validation_time";
pub const MD_AVG_RATIFICATION: &[u8] = b"avg_ratification_time";
pub const MD_AVG_PROPOSAL: &[u8] = b"avg_proposal_time";
pub const MD_LAST_ITER: &[u8] = b"consensus_last_iter";

#[derive(Clone)]
pub struct Backend {
Expand Down

0 comments on commit 9825441

Please sign in to comment.