From cd30bdb591ee9dcdd6d1001d0eb07f2bf7cba366 Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 10:48:59 +0300 Subject: [PATCH 01/17] node-data: Use different topic IDs for 1st and 2nd reduction message --- node-data/src/ledger.rs | 2 +- node-data/src/message.rs | 19 +++++++++++-------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/node-data/src/ledger.rs b/node-data/src/ledger.rs index f90beb06d2..ef368af39a 100644 --- a/node-data/src/ledger.rs +++ b/node-data/src/ledger.rs @@ -243,7 +243,7 @@ impl Block { } } -#[derive(Debug, Default, Clone, Eq, Hash, PartialEq)] +#[derive(Debug, Default, Clone, Copy, Eq, Hash, PartialEq)] #[cfg_attr(any(feature = "faker", test), derive(Dummy))] pub struct StepVotes { pub bitset: u64, diff --git a/node-data/src/message.rs b/node-data/src/message.rs index ce30718238..3f4c58af9d 100644 --- a/node-data/src/message.rs +++ b/node-data/src/message.rs @@ -118,7 +118,7 @@ impl Serializable for Message { Topics::NewBlock => { Payload::NewBlock(Box::new(payload::NewBlock::read(r)?)) } - Topics::Reduction => { + Topics::FirstReduction | Topics::SecondReduction => { Payload::Reduction(payload::Reduction::read(r)?) } Topics::Agreement => { @@ -312,7 +312,8 @@ fn is_consensus_msg(topic: u8) -> bool { matches!( Topics::from(topic), Topics::NewBlock - | Topics::Reduction + | Topics::FirstReduction + | Topics::SecondReduction | Topics::Agreement | Topics::AggrAgreement ) @@ -642,8 +643,8 @@ pub mod payload { /// Generates a certificate from agreement. 
pub fn generate_certificate(&self) -> Certificate { Certificate { - first_reduction: self.first_step.clone(), - second_reduction: self.second_step.clone(), + first_reduction: self.first_step, + second_reduction: self.second_step, } } } @@ -898,11 +899,12 @@ pub enum Topics { // Consensus main loop topics Candidate = 15, NewBlock = 16, - Reduction = 17, + FirstReduction = 17, + SecondReduction = 18, // Consensus Agreement loop topics - Agreement = 18, - AggrAgreement = 19, + Agreement = 19, + AggrAgreement = 20, #[default] Unknown = 255, @@ -919,7 +921,8 @@ impl From for Topics { map_topic!(v, Topics::Candidate); map_topic!(v, Topics::GetCandidate); map_topic!(v, Topics::NewBlock); - map_topic!(v, Topics::Reduction); + map_topic!(v, Topics::FirstReduction); + map_topic!(v, Topics::SecondReduction); map_topic!(v, Topics::Agreement); map_topic!(v, Topics::AggrAgreement); From c2c8bce557722ee7d6874c29fe95bbd332e0ec7f Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 10:51:59 +0300 Subject: [PATCH 02/17] consensus: Apply a delay in block generator based on EST call time execution --- consensus/src/selection/block_generator.rs | 34 ++++++++++++++-------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/consensus/src/selection/block_generator.rs b/consensus/src/selection/block_generator.rs index e691faf43b..c1107104d6 100644 --- a/consensus/src/selection/block_generator.rs +++ b/consensus/src/selection/block_generator.rs @@ -18,7 +18,7 @@ use node_data::ledger; use node_data::message::payload::NewBlock; use node_data::message::{Header, Message, Topics}; use std::sync::Arc; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use tokio::sync::Mutex; use tracing::{debug, info}; @@ -43,6 +43,8 @@ impl Generator { .sign(ru.pubkey_bls.inner(), &ru.seed.inner()[..]) .to_bytes(); + let start = Instant::now(); + let candidate = self .generate_block( &ru.pubkey_bls, @@ -54,6 +56,15 @@ impl 
Generator { ) .await?; + info!( + event = "gen_candidate", + hash = &to_str(&candidate.header().hash), + state_hash = &to_str(&candidate.header().state_hash), + dur = format!("{:?}ms", start.elapsed().as_millis()), + ); + + debug!("block: {:?}", &candidate); + let msg_header = Header { pubkey_bls: ru.pubkey_bls.clone(), round: ru.round, @@ -64,14 +75,6 @@ impl Generator { let signature = msg_header.sign(&ru.secret_key, ru.pubkey_bls.inner()); - info!( - event = "gen_candidate", - hash = &to_str(&candidate.header().hash), - state_hash = &to_str(&candidate.header().state_hash), - ); - - debug!("block: {:?}", &candidate); - Ok(Message::new_newblock( msg_header, NewBlock { @@ -90,9 +93,7 @@ impl Generator { prev_block_timestamp: i64, iteration: u8, ) -> Result { - // Delay next iteration execution so we avoid consensus-split situation. - tokio::time::sleep(Duration::from_millis(config::CONSENSUS_DELAY_MS)) - .await; + let start_time = Instant::now(); let result = self .executor @@ -128,6 +129,15 @@ impl Generator { iteration, }; + // Apply a delay in block generator accordingly + // In case EST call costs a second (assuming CONSENSUS_DELAY_MS=1000ms), + // we should not sleep here + if let Some(delay) = Duration::from_millis(config::CONSENSUS_DELAY_MS) + .checked_sub(start_time.elapsed()) + { + tokio::time::sleep(delay).await; + } + Ok(Block::new(blk_header, txs).expect("block should be valid")) } From d9798c7c3d259cf6bea844e70f4dc8807e5c5f3b Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 10:53:34 +0300 Subject: [PATCH 03/17] consensus: Implement StepVotesRegistry struct --- consensus/src/step_votes_reg.rs | 147 ++++++++++++++++++++++++++++++++ 1 file changed, 147 insertions(+) create mode 100644 consensus/src/step_votes_reg.rs diff --git a/consensus/src/step_votes_reg.rs b/consensus/src/step_votes_reg.rs new file mode 100644 index 0000000000..ebf7ebfe0c --- /dev/null +++ b/consensus/src/step_votes_reg.rs @@ -0,0 +1,147 @@ +// This Source Code Form is 
subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// +// Copyright (c) DUSK NETWORK. All rights reserved. + +use crate::commons::RoundUpdate; +use crate::config::CONSENSUS_MAX_ITER; +use node_data::ledger::to_str; +use node_data::ledger::StepVotes; +use node_data::message::{payload, Message, Topics}; +use std::fmt; +use std::sync::Arc; +use tokio::sync::Mutex; +use tracing::{debug, error}; + +pub(crate) enum SvType { + FirstReduction, + SecondReduction, +} + +#[derive(Default, Copy, Clone)] +struct SvEntry { + // represents candidate block hash + hash: Option<[u8; 32]>, + first_red_sv: StepVotes, + second_red_sv: StepVotes, +} + +impl fmt::Display for SvEntry { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let hash = self.hash.unwrap_or_default(); + + write!( + f, + "SvEntry: hash: {}, 1st_red: {:?}, 2nd_red: {:?}", + to_str(&hash), + self.first_red_sv, + self.second_red_sv, + ) + } +} + +impl SvEntry { + pub(crate) fn add_sv( + &mut self, + iter: u8, + hash: [u8; 32], + sv: StepVotes, + svt: SvType, + ) -> bool { + // Discard empty hashes + if hash == [0u8; 32] { + return false; + } + + if let Some(h) = self.hash { + if h != hash { + // Only one hash can be registered per a single iteration + error!(desc = "multiple candidates per iter"); + return false; + } + } else { + self.hash = Some(hash); + } + + match svt { + SvType::FirstReduction => self.first_red_sv = sv, + SvType::SecondReduction => self.second_red_sv = sv, + } + + debug!(event = "add_sv", iter, data = format!("{}", self)); + self.is_ready() + } + + fn is_ready(&self) -> bool { + !self.second_red_sv.is_empty() + && !self.first_red_sv.is_empty() + && self.hash.is_some() + } +} + +pub(crate) type SafeStepVotesRegistry = Arc>; + +pub(crate) struct StepVotesRegistry { + ru: RoundUpdate, + sv_table: [SvEntry; CONSENSUS_MAX_ITER as usize], +} + +impl StepVotesRegistry 
{ + pub(crate) fn new(ru: RoundUpdate) -> Self { + Self { + ru, + sv_table: [SvEntry::default(); CONSENSUS_MAX_ITER as usize], + } + } + + /// Adds step votes per iteration + /// Returns an agreement if both reductions for an iteration are available + pub(crate) fn add_step_votes( + &mut self, + step: u8, + hash: [u8; 32], + sv: StepVotes, + svt: SvType, + ) -> Option { + let iter_num = (step - 1) / 3 + 1; + if iter_num as usize >= self.sv_table.len() { + return None; + } + + let r = &mut self.sv_table[iter_num as usize]; + if r.add_sv(iter_num, hash, sv, svt) { + return Some(Self::build_agreement_msg( + self.ru.clone(), + iter_num, + *r, + )); + } + + None + } + + fn build_agreement_msg( + ru: RoundUpdate, + iteration: u8, + result: SvEntry, + ) -> Message { + let hdr = node_data::message::Header { + pubkey_bls: ru.pubkey_bls.clone(), + round: ru.round, + step: (iteration - 1) * 3 + 3, + block_hash: result.hash.unwrap_or_default(), + topic: Topics::Agreement as u8, + }; + + let signature = hdr.sign(&ru.secret_key, ru.pubkey_bls.inner()); + + let payload = payload::Agreement { + signature, + first_step: result.first_red_sv, + second_step: result.second_red_sv, + }; + + Message::new_agreement(hdr, payload) + } +} From ea8cf124ead19f3fb50a4ae481b25a7262f79151 Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 10:57:08 +0300 Subject: [PATCH 04/17] consensus: Record any generated step_votes in StepVotes registry --- consensus/src/firststep/handler.rs | 38 ++++++++++++++++++++---- consensus/src/secondstep/handler.rs | 45 +++++++++++++++++++++++------ consensus/src/selection/handler.rs | 14 ++++++++- 3 files changed, 81 insertions(+), 16 deletions(-) diff --git a/consensus/src/firststep/handler.rs b/consensus/src/firststep/handler.rs index 824c43bec3..b9a4a4754f 100644 --- a/consensus/src/firststep/handler.rs +++ b/consensus/src/firststep/handler.rs @@ -6,14 +6,15 @@ use std::sync::Arc; +use crate::aggregator::Aggregator; use crate::commons::{ConsensusError, 
Database, RoundUpdate}; use crate::msg_handler::{HandleMsgOutput, MsgHandler}; +use crate::step_votes_reg::{SafeStepVotesRegistry, SvType}; use async_trait::async_trait; use node_data::ledger; use node_data::ledger::{Block, StepVotes}; use tokio::sync::Mutex; -use crate::aggregator::Aggregator; use crate::user::committee::Committee; use node_data::message::{payload, Message, Payload}; @@ -45,25 +46,32 @@ fn final_result_with_timeout( )) } -#[derive(Default)] pub struct Reduction { + sv_registry: SafeStepVotesRegistry, + pub(crate) db: Arc>, pub(crate) aggr: Aggregator, pub(crate) candidate: Block, + curr_step: u8, } impl Reduction { - pub(crate) fn new(db: Arc>) -> Self { + pub(crate) fn new( + db: Arc>, + sv_registry: SafeStepVotesRegistry, + ) -> Self { Self { + sv_registry, db, aggr: Aggregator::default(), candidate: Block::default(), + curr_step: 0, } } - pub(crate) fn reset(&mut self) { - self.aggr = Aggregator::default(); + pub(crate) fn reset(&mut self, curr_step: u8) { self.candidate = Block::default(); + self.curr_step = curr_step; } } @@ -95,7 +103,7 @@ impl MsgHandler for Reduction { &mut self, msg: Message, _ru: &RoundUpdate, - _step: u8, + step: u8, committee: &Committee, ) -> Result { let signature = match &msg.payload { @@ -108,6 +116,24 @@ impl MsgHandler for Reduction { if let Some((hash, sv)) = self.aggr.collect_vote(committee, &msg.header, &signature) { + // Record result in global round registry of all non-Nil step_votes + if hash != [0u8; 32] { + if let Some(m) = self.sv_registry.lock().await.add_step_votes( + step, + hash, + sv, + SvType::FirstReduction, + ) { + return Ok(HandleMsgOutput::FinalResult(m)); + } + + // If step is different from the current one, this means + // * collect * func is being called from different iteration + if step != self.curr_step { + return Ok(HandleMsgOutput::Result(msg)); + } + } + // if the votes converged for an empty hash we invoke halt if hash == [0u8; 32] { tracing::warn!("votes converged for an empty 
hash"); diff --git a/consensus/src/secondstep/handler.rs b/consensus/src/secondstep/handler.rs index 48f23a2ff3..9180d00903 100644 --- a/consensus/src/secondstep/handler.rs +++ b/consensus/src/secondstep/handler.rs @@ -6,6 +6,7 @@ use crate::commons::{ConsensusError, RoundUpdate}; use crate::msg_handler::{HandleMsgOutput, MsgHandler}; +use crate::step_votes_reg::{SafeStepVotesRegistry, SvType}; use async_trait::async_trait; use node_data::ledger; use node_data::ledger::{Hash, Signature, StepVotes}; @@ -17,8 +18,11 @@ use node_data::message::{payload, Message, Payload, Topics}; use crate::user::committee::Committee; pub struct Reduction { - pub(crate) aggr: Aggregator, + pub(crate) sv_registry: SafeStepVotesRegistry, + + pub(crate) aggregator: Aggregator, pub(crate) first_step_votes: StepVotes, + pub(crate) curr_step: u8, } #[async_trait] @@ -59,12 +63,26 @@ impl MsgHandler for Reduction { }?; // Collect vote, if msg payload is of reduction type - if let Some((block_hash, second_step_votes)) = - self.aggr.collect_vote(committee, &msg.header, &signed_hash) + if let Some((block_hash, second_step_votes)) = self + .aggregator + .collect_vote(committee, &msg.header, &signed_hash) { - // At that point, we have reached a quorum for 2th_reduction on an - // empty on non-empty block. Return an empty message as - // this iteration terminates here. 
+ if block_hash != [0u8; 32] { + // Record result in global round results registry + if let Some(m) = self.sv_registry.lock().await.add_step_votes( + step, + block_hash, + second_step_votes, + SvType::SecondReduction, + ) { + return Ok(HandleMsgOutput::FinalResult(m)); + } + + if step != self.curr_step { + return Ok(HandleMsgOutput::Result(msg)); + } + } + return Ok(HandleMsgOutput::FinalResult(self.build_agreement_msg( ru, step, @@ -87,6 +105,15 @@ impl MsgHandler for Reduction { } impl Reduction { + pub(crate) fn new(sv_registry: SafeStepVotesRegistry) -> Self { + Self { + sv_registry, + aggregator: Default::default(), + first_step_votes: Default::default(), + curr_step: 0, + } + } + fn build_agreement_msg( &self, ru: &RoundUpdate, @@ -105,15 +132,15 @@ impl Reduction { let signature = hdr.sign(&ru.secret_key, ru.pubkey_bls.inner()); let payload = payload::Agreement { signature, - first_step: self.first_step_votes.clone(), + first_step: self.first_step_votes, second_step: second_step_votes, }; Message::new_agreement(hdr, payload) } - pub(crate) fn reset(&mut self) { - self.aggr = Aggregator::default(); + pub(crate) fn reset(&mut self, step: u8) { self.first_step_votes = StepVotes::default(); + self.curr_step = step; } } diff --git a/consensus/src/selection/handler.rs b/consensus/src/selection/handler.rs index a91e418b59..eae05febca 100644 --- a/consensus/src/selection/handler.rs +++ b/consensus/src/selection/handler.rs @@ -7,15 +7,17 @@ use crate::commons::{ConsensusError, Database, RoundUpdate}; use crate::merkle::merkle_root; use crate::msg_handler::{HandleMsgOutput, MsgHandler}; +use crate::step_votes_reg::SafeStepVotesRegistry; use crate::user::committee::Committee; use async_trait::async_trait; + use node_data::message::{Message, Payload}; use std::sync::Arc; use tokio::sync::Mutex; -#[derive(Debug, Default)] pub struct Selection { pub(crate) db: Arc>, + pub(crate) _sv_registry: SafeStepVotesRegistry, } #[async_trait] @@ -65,6 +67,16 @@ impl MsgHandler for 
Selection { } impl Selection { + pub(crate) fn new( + db: Arc>, + sv_registry: SafeStepVotesRegistry, + ) -> Self { + Self { + db, + _sv_registry: sv_registry, + } + } + fn verify_new_block( &self, msg: &Message, From 79f269567d9faddaed5aa66d38eca74afcb91f50 Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 10:58:12 +0300 Subject: [PATCH 05/17] consensus: Implement Committee::get_unique_members --- consensus/src/user/committee.rs | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/consensus/src/user/committee.rs b/consensus/src/user/committee.rs index 50348c33bf..7e3ceed318 100644 --- a/consensus/src/user/committee.rs +++ b/consensus/src/user/committee.rs @@ -10,7 +10,7 @@ use crate::user::sortition; use super::cluster::Cluster; use crate::config; use node_data::bls::PublicKey; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt; use std::mem; @@ -22,6 +22,17 @@ pub struct Committee { cfg: sortition::Config, } +impl Iterator for Committee { + type Item = PublicKey; + + fn next(&mut self) -> Option { + self.members + .iter() + .next() + .map(|(public_key, _)| public_key.clone()) + } +} + #[allow(unused)] impl Committee { /// Generates a new committee from the given provisioners state and @@ -176,6 +187,20 @@ impl CommitteeSet { self.get_or_create(cfg).is_member(pubkey) } + /// Returns number of all unique public keys + pub fn get_unique_members(&self) -> usize { + let mut merged = HashSet::new(); + self.committees.iter().for_each(|(_, committee)| { + committee.members.iter().for_each(|(m, s)| { + if *s > 0 { + merged.insert(m.bytes()); + } + }); + }); + + merged.len() + } + pub fn votes_for( &mut self, pubkey: &PublicKey, From a27b5f6f876fd509548eacccdf421e5c22ae6fc1 Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 11:00:20 +0300 Subject: [PATCH 06/17] consensus: Pass topic ID in spawn_send_reduction --- consensus/src/commons.rs | 4 +++- 1 
file changed, 3 insertions(+), 1 deletion(-) diff --git a/consensus/src/commons.rs b/consensus/src/commons.rs index cb01faee87..8c6b2371a0 100644 --- a/consensus/src/commons.rs +++ b/consensus/src/commons.rs @@ -11,6 +11,7 @@ use crate::contract_state::Operations; use node_data::ledger::*; use node_data::message; +use node_data::message::Topics; use tracing::Instrument; use crate::contract_state::CallParams; @@ -80,6 +81,7 @@ pub fn spawn_send_reduction( outbound: AsyncQueue, inbound: AsyncQueue, executor: Arc>, + topic: Topics, ) { let hash = to_str(&candidate.header().hash); @@ -168,7 +170,7 @@ pub fn spawn_send_reduction( round: ru.round, step, block_hash: hash, - topic: message::Topics::Reduction as u8, + topic: topic.into(), }; let signature = hdr.sign(&ru.secret_key, ru.pubkey_bls.inner()); From e18bdf3b9b168b0cc8cc2859bec6915a36f9217a Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 11:02:25 +0300 Subject: [PATCH 07/17] consensus: Make module handler pub(crate) --- consensus/src/firststep.rs | 2 +- consensus/src/lib.rs | 1 + consensus/src/secondstep.rs | 2 +- consensus/src/selection.rs | 2 +- 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/consensus/src/firststep.rs b/consensus/src/firststep.rs index fc2d27f0e5..5ac78db4d2 100644 --- a/consensus/src/firststep.rs +++ b/consensus/src/firststep.rs @@ -4,5 +4,5 @@ // // Copyright (c) DUSK NETWORK. All rights reserved. -mod handler; +pub(crate) mod handler; pub mod step; diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 93add84ba5..47d7a560c9 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -21,6 +21,7 @@ mod phase; mod queue; mod secondstep; mod selection; +mod step_votes_reg; pub mod merkle; diff --git a/consensus/src/secondstep.rs b/consensus/src/secondstep.rs index fc2d27f0e5..5ac78db4d2 100644 --- a/consensus/src/secondstep.rs +++ b/consensus/src/secondstep.rs @@ -4,5 +4,5 @@ // // Copyright (c) DUSK NETWORK. All rights reserved. 
-mod handler; +pub(crate) mod handler; pub mod step. diff --git a/consensus/src/selection.rs b/consensus/src/selection.rs index 98dff72df7..0f2ec9a427 100644 --- a/consensus/src/selection.rs +++ b/consensus/src/selection.rs @@ -5,5 +5,5 @@ // Copyright (c) DUSK NETWORK. All rights reserved. mod block_generator; -mod handler; +pub(crate) mod handler; pub mod step. From 395a47ab9b51a658a35540b7d4f6c5392e50143c Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 11:08:07 +0300 Subject: [PATCH 08/17] consensus: Introduce vote-for-former candidate patch - Save any generated committee for future use - Collect consensus messages from former iterations - Handle past messages in the context of the associated step - Try to vote for former candidate, if committee member - Remove a redundant phase::is_valid call --- consensus/src/execution_ctx.rs | 292 ++++++++++++++++++++++++++------- 1 file changed, 237 insertions(+), 55 deletions(-) diff --git a/consensus/src/execution_ctx.rs b/consensus/src/execution_ctx.rs index b26847222b..11cbda0045 100644 --- a/consensus/src/execution_ctx.rs +++ b/consensus/src/execution_ctx.rs @@ -4,7 +4,11 @@ // // Copyright (c) DUSK NETWORK. All rights reserved. 
+use crate::commons::spawn_send_reduction; +use crate::commons::Database; use crate::commons::{ConsensusError, RoundUpdate}; +use crate::config::CONSENSUS_MAX_TIMEOUT_MS; +use crate::contract_state::Operations; use crate::msg_handler::HandleMsgOutput::{ FinalResult, FinalResultWithTimeoutIncrease, }; @@ -13,21 +17,27 @@ use crate::queue::Queue; use crate::user::committee::Committee; use crate::user::provisioners::Provisioners; use crate::user::sortition; +use crate::{firststep, secondstep, selection}; +use node_data::ledger::Block; +use node_data::message::Payload; use node_data::message::{AsyncQueue, Message, Topics}; use std::cmp; -use tokio::task::JoinSet; - -use crate::config::CONSENSUS_MAX_TIMEOUT_MS; +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tokio::sync::Mutex; +use tokio::task::JoinSet; use tokio::time; use tokio::time::Instant; use tracing::{debug, error, info, trace}; /// Represents a shared state within a context of the exection of a single /// iteration. 
-pub struct IterationCtx { +pub struct IterationCtx { + first_reduction_handler: Arc>>, + sec_reduction_handler: Arc>, + selection_handler: Arc>>, + pub join_set: JoinSet<()>, /// verified candidate hash @@ -38,21 +48,78 @@ pub struct IterationCtx { round: u64, iter: u8, + + /// Stores any committee already generated in the execution of any + /// iteration of current round + committees: HashMap, } -impl IterationCtx { - pub fn new(round: u64, step: u8) -> Self { +impl IterationCtx { + pub fn new( + round: u64, + iter: u8, + selection_handler: Arc>>, + first_reduction_handler: Arc>>, + sec_reduction_handler: Arc>, + ) -> Self { Self { round, join_set: JoinSet::new(), - iter: step / 3 + 1, + iter, verified_hash: Arc::new(Mutex::new([0u8; 32])), + selection_handler, + first_reduction_handler, + sec_reduction_handler, + committees: Default::default(), } } -} -impl Drop for IterationCtx { - fn drop(&mut self) { + pub(crate) async fn collect_past_event( + &self, + ru: &RoundUpdate, + msg: &Message, + ) -> Option { + let committee = self.committees.get(&msg.header.step)?; + match msg.topic() { + node_data::message::Topics::NewBlock => { + let mut handler = self.selection_handler.lock().await; + _ = handler + .collect(msg.clone(), ru, msg.header.step, committee) + .await; + } + node_data::message::Topics::FirstReduction => { + let mut handler = self.first_reduction_handler.lock().await; + if let Ok(FinalResult(m)) = handler + .collect(msg.clone(), ru, msg.header.step, committee) + .await + { + return Some(m); + } + } + node_data::message::Topics::SecondReduction => { + let mut handler = self.sec_reduction_handler.lock().await; + if let Ok(FinalResult(m)) = handler + .collect(msg.clone(), ru, msg.header.step, committee) + .await + { + return Some(m); + } + } + _ => {} + }; + + None + } + + pub(crate) fn get_committee(&mut self, step: u8) -> Option<&Committee> { + self.committees.get(&step) + } + + pub(crate) fn on_iteration_begin(&mut self, iter: u8) { + self.iter = iter; + 
} + + pub(crate) fn on_iteration_end(&mut self) { debug!( event = "iter completed", len = self.join_set.len(), @@ -63,10 +130,16 @@ impl Drop for IterationCtx { } } +impl Drop for IterationCtx { + fn drop(&mut self) { + self.on_iteration_end(); + } +} + /// ExecutionCtx encapsulates all data needed by a single step to be fully /// executed. -pub struct ExecutionCtx<'a> { - pub iter_ctx: &'a mut IterationCtx, +pub struct ExecutionCtx<'a, DB: Database, T> { + pub iter_ctx: &'a mut IterationCtx, /// Messaging-related fields pub inbound: AsyncQueue, @@ -79,19 +152,22 @@ pub struct ExecutionCtx<'a> { // Round/Step parameters pub round_update: RoundUpdate, pub step: u8, + + executor: Arc>, } -impl<'a> ExecutionCtx<'a> { +impl<'a, DB: Database, T: Operations + 'static> ExecutionCtx<'a, DB, T> { /// Creates step execution context. #[allow(clippy::too_many_arguments)] pub fn new( - iter_ctx: &'a mut IterationCtx, + iter_ctx: &'a mut IterationCtx, inbound: AsyncQueue, outbound: AsyncQueue, future_msgs: Arc>>, provisioners: &'a mut Provisioners, round_update: RoundUpdate, step: u8, + executor: Arc>, ) -> Self { Self { iter_ctx, @@ -101,9 +177,14 @@ impl<'a> ExecutionCtx<'a> { provisioners, round_update, step, + executor, } } + pub(crate) fn save_committee(&mut self, step: u8, committee: Committee) { + self.iter_ctx.committees.insert(step, committee); + } + /// Runs a loop that collects both inbound messages and timeout event. 
/// /// It accepts an instance of MsgHandler impl (phase var) and calls its @@ -116,7 +197,7 @@ impl<'a> ExecutionCtx<'a> { pub async fn event_loop>( &mut self, committee: &Committee, - phase: &mut C, + phase: Arc>, timeout_millis: &mut u64, ) -> Result { debug!(event = "run event_loop"); @@ -137,7 +218,7 @@ impl<'a> ExecutionCtx<'a> { if let Some(step_result) = self .process_inbound_msg( committee, - phase, + phase.clone(), msg, timeout_millis, ) @@ -153,12 +234,102 @@ impl<'a> ExecutionCtx<'a> { info!(event = "timeout-ed"); Self::increase_timeout(timeout_millis); - return self.process_timeout_event(phase); + return self.process_timeout_event(phase.clone()).await; } } } } + pub(crate) async fn vote_for_former_candidate( + &mut self, + msg_step: u8, + candidate: &Block, + ) { + debug!( + event = "former candidate received", + hash = node_data::ledger::to_str(&candidate.header().hash), + msg_step, + ); + + if msg_step + 1 <= self.step { + self.try_vote(msg_step + 1, &candidate, Topics::FirstReduction); + } + + if msg_step + 2 <= self.step { + self.try_vote(msg_step + 2, &candidate, Topics::SecondReduction); + } + } + + fn try_vote(&mut self, step: u8, candidate: &Block, topic: Topics) { + if let Some(committee) = self.iter_ctx.get_committee(step) { + if committee.am_member() { + debug!( + event = "vote for former candidate", + step_topic = format!("{:?}", topic), + hash = node_data::ledger::to_str(&candidate.header().hash), + step, + ); + + spawn_send_reduction( + &mut self.iter_ctx.join_set, + Arc::new(Mutex::new([0u8; 32])), + candidate.clone(), + self.round_update.pubkey_bls.clone(), + self.round_update.clone(), + step, + self.outbound.clone(), + self.inbound.clone(), + self.executor.clone(), + topic, + ); + }; + } else { + error!( + event = "committee not found", + step = self.step, + msg_step = step + ); + } + } + + /// Process messages from past + async fn process_past_events(&mut self, msg: &Message) -> Option { + if msg.header.block_hash == [0u8; 32] + || 
msg.header.round != self.round_update.round + { + return None; + } + + if let Err(e) = self.outbound.send(msg.clone()).await { + error!("could not send msg due to {:?}", e); + } + + // Try to vote for candidate block from former iteration + if let Payload::NewBlock(p) = &msg.payload { + // TODO: Perform block header/ Certificate full verification + // To be addressed with another PR + + self.vote_for_former_candidate(msg.header.step, &p.candidate) + .await; + } + + if let Some(m) = self + .iter_ctx + .collect_past_event(&self.round_update, msg) + .await + { + if m.header.topic == Topics::Agreement as u8 { + debug!( + event = "agreement from former iter", + msg_step = m.header.step + ); + return Some(m); + } + } + + None + } + /// Delegates the received message to the Phase handler for further /// processing. /// @@ -167,17 +338,19 @@ impl<'a> ExecutionCtx<'a> { async fn process_inbound_msg>( &mut self, committee: &Committee, - phase: &mut C, + phase: Arc>, msg: Message, timeout_millis: &mut u64, ) -> Option { - // Check if a message is fully valid. If so, then it can be broadcast. 
- match phase.is_valid( + // Check if message is valid in the context of current step + let ret = phase.lock().await.is_valid( msg.clone(), &self.round_update, self.step, committee, - ) { + ); + + match ret { Ok(msg) => { // Re-publish the returned message self.outbound.send(msg).await.unwrap_or_else(|err| { @@ -198,23 +371,27 @@ impl<'a> ExecutionCtx<'a> { msg.header.step, msg, ); + + return None; } ConsensusError::PastEvent => { - trace!("discard message from past {:#?}", msg); + return self.process_past_events(&msg).await; } _ => { error!("phase handler err: {:?}", e); + return None; } } - - return None; } } - match phase - .collect(msg.clone(), &self.round_update, self.step, committee) + let ret = phase + .lock() .await - { + .collect(msg.clone(), &self.round_update, self.step, committee) + .await; + + match ret { Ok(output) => { trace!("message collected {:#?}", msg); @@ -233,7 +410,13 @@ impl<'a> ExecutionCtx<'a> { } } Err(e) => { - error!("phase collect return err: {:?}", e); + error!( + event = "failed collect", + err = format!("{:?}", e), + msg_topic = format!("{:?}", msg.topic()), + msg_step = msg.header.step, + msg_round = msg.header.round, + ); } } @@ -242,12 +425,14 @@ impl<'a> ExecutionCtx<'a> { /// Delegates the received event of timeout to the Phase handler for further /// processing. 
- fn process_timeout_event>( + async fn process_timeout_event>( &mut self, - phase: &mut C, + phase: Arc>, ) -> Result { - if let Ok(FinalResult(msg)) = - phase.handle_timeout(&self.round_update, self.step) + if let Ok(FinalResult(msg)) = phase + .lock() + .await + .handle_timeout(&self.round_update, self.step) { return Ok(msg); } @@ -262,7 +447,7 @@ impl<'a> ExecutionCtx<'a> { pub async fn handle_future_msgs>( &self, committee: &Committee, - phase: &mut C, + phase: Arc>, ) -> Option { if let Some(messages) = self .future_msgs @@ -275,39 +460,36 @@ impl<'a> ExecutionCtx<'a> { } for msg in messages { - if let Ok(msg) = phase.is_valid( + let ret = phase.lock().await.is_valid( msg, &self.round_update, self.step, committee, - ) { - // Check if a message is fully valid. If so, then it can be - // broadcast. - if let Ok(msg) = phase.is_valid( - msg.clone(), - &self.round_update, - self.step, - committee, - ) { - // Re-publish the drained message - debug!( - event = "republish", - src = "future_msgs", - msg_step = msg.header.step, - msg_round = msg.header.round, - msg_topic = - format!("{:?}", Topics::from(msg.header.topic)) - ); + ); + + if let Ok(msg) = ret { + // Re-publish a drained message + debug!( + event = "republish", + src = "future_msgs", + msg_step = msg.header.step, + msg_round = msg.header.round, + msg_topic = + format!("{:?}", Topics::from(msg.header.topic)) + ); - self.outbound.send(msg).await.unwrap_or_else(|err| { + self.outbound.send(msg.clone()).await.unwrap_or_else( + |err| { error!( "unable to re-publish a drained msg {:?}", err ) - }); - } + }, + ); if let Ok(FinalResult(msg)) = phase + .lock() + .await .collect(msg, &self.round_update, self.step, committee) .await { From c1f3118a58b7671ce32de70e94e71aa8db865935 Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 11:12:03 +0300 Subject: [PATCH 09/17] consensus: Upgrade main consensus loop - Share StepVotesRegistry instance amongst all steps - Share Message Handlers between Phase array and 
IterationCtx instance - Send agreement message for former iterations --- consensus/src/consensus.rs | 63 +++++++++++++++++++++++++++++++++----- 1 file changed, 55 insertions(+), 8 deletions(-) diff --git a/consensus/src/consensus.rs b/consensus/src/consensus.rs index 0ed81207b1..4b949ffb8a 100644 --- a/consensus/src/consensus.rs +++ b/consensus/src/consensus.rs @@ -7,8 +7,10 @@ use crate::commons::{ConsensusError, Database, RoundUpdate}; use crate::contract_state::Operations; use crate::phase::Phase; + use node_data::ledger::{to_str, Block}; -use node_data::message::{AsyncQueue, Message, Payload}; + +use node_data::message::{AsyncQueue, Message, Payload, Topics}; use crate::agreement::step; use crate::execution_ctx::{ExecutionCtx, IterationCtx}; @@ -18,6 +20,7 @@ use crate::{config, selection}; use crate::{firststep, secondstep}; use tracing::{error, Instrument}; +use crate::step_votes_reg::StepVotesRegistry; use std::sync::Arc; use tokio::sync::{oneshot, Mutex}; use tokio::task::JoinHandle; @@ -164,34 +167,69 @@ impl Consensus { future_msgs.lock().await.clear_round(ru.round - 1); } + let sv_registry = + Arc::new(Mutex::new(StepVotesRegistry::new(ru.clone()))); + + let sel_handler = + Arc::new(Mutex::new(selection::handler::Selection::new( + db.clone(), + sv_registry.clone(), + ))); + + let first_handler = + Arc::new(Mutex::new(firststep::handler::Reduction::new( + db.clone(), + sv_registry.clone(), + ))); + + let sec_handler = Arc::new(Mutex::new( + secondstep::handler::Reduction::new(sv_registry.clone()), + )); + let mut phases = [ Phase::Selection(selection::step::Selection::new( executor.clone(), db.clone(), + sel_handler.clone(), )), Phase::Reduction1(firststep::step::Reduction::new( executor.clone(), db.clone(), + first_handler.clone(), + )), + Phase::Reduction2(secondstep::step::Reduction::new( + executor.clone(), + sec_handler.clone(), )), - Phase::Reduction2(secondstep::step::Reduction::new(executor)), ]; // Consensus loop // Initialize and run consensus 
loop - let mut step: u8 = 0; + + let mut iter_num: u8 = 0; + let mut iter_ctx = IterationCtx::new( + ru.round, + iter_num, + sel_handler.clone(), + first_handler.clone(), + sec_handler.clone(), + ); loop { - let mut msg = Message::empty(); - let mut iter_ctx = IterationCtx::new(ru.round, step + 1); + iter_num += 1; + iter_ctx.on_iteration_begin(iter_num); + let mut msg = Message::empty(); // Execute a single iteration - for phase in phases.iter_mut() { - step += 1; + for pos in 0..phases.len() { + let phase = phases.get_mut(pos).unwrap(); + + let step = (iter_num - 1) * 3 + (pos as u8 + 1); let name = phase.name(); // Initialize new phase with message returned by previous // phase. - phase.reinitialize(&msg, ru.round, step); + phase.reinitialize(&msg, ru.round, step).await; // Construct phase execution context let ctx = ExecutionCtx::new( @@ -202,6 +240,7 @@ impl Consensus { &mut provisioners, ru.clone(), step, + executor.clone(), ); // Execute a phase. @@ -220,11 +259,19 @@ impl Consensus { )) .await?; + // During execution of any step we may encounter that an + // agreement is generated for a former iteration. + if msg.topic() == Topics::Agreement { + break; + } + if step >= config::CONSENSUS_MAX_STEP { return Err(ConsensusError::MaxStepReached); } } + iter_ctx.on_iteration_end(); + // Delegate (agreement) message result to agreement loop for // further processing. 
From b0c1d90d51d039c516cac4090a87fad2684f7b26 Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 11:13:38 +0300 Subject: [PATCH 10/17] consensus: Save any generated committee on step initialization --- consensus/src/phase.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/consensus/src/phase.rs b/consensus/src/phase.rs index 2af3d24120..0d8cec4959 100644 --- a/consensus/src/phase.rs +++ b/consensus/src/phase.rs @@ -7,6 +7,7 @@ use crate::commons::{ConsensusError, Database}; use crate::contract_state::Operations; use crate::execution_ctx::ExecutionCtx; + use node_data::message::Message; use crate::user::committee::Committee; @@ -42,19 +43,19 @@ macro_rules! call_phase { pub enum Phase { Selection(selection::step::Selection), Reduction1(firststep::step::Reduction), - Reduction2(secondstep::step::Reduction), + Reduction2(secondstep::step::Reduction), } impl Phase { - pub fn reinitialize(&mut self, msg: &Message, round: u64, step: u8) { + pub async fn reinitialize(&mut self, msg: &Message, round: u64, step: u8) { trace!(event = "init step", msg = format!("{:#?}", msg),); - call_phase!(self, reinitialize(msg, round, step)) + await_phase!(self, reinitialize(msg, round, step)) } pub async fn run( &mut self, - ctx: ExecutionCtx<'_>, + mut ctx: ExecutionCtx<'_, D, T>, ) -> Result { debug!(event = "execute_step", timeout = self.get_timeout()); @@ -76,6 +77,8 @@ impl Phase { members = format!("{}", &step_committee) ); + ctx.save_committee(ctx.step, step_committee.clone()); + await_phase!(self, run(ctx, step_committee)) } From 448e4a4453db6eee4e7a99b2e580c101cc84ed82 Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 11:24:49 +0300 Subject: [PATCH 11/17] consensus: Pass committee_size param to both verify_agreement and verify_step_votes --- consensus/src/agreement/verifiers.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/consensus/src/agreement/verifiers.rs b/consensus/src/agreement/verifiers.rs 
index 20dcc216a4..eb5353593d 100644 --- a/consensus/src/agreement/verifiers.rs +++ b/consensus/src/agreement/verifiers.rs @@ -11,6 +11,7 @@ use crate::user::committee::CommitteeSet; use crate::user::sortition; use bytes::Buf; +use crate::config; use dusk_bytes::Serializable; use node_data::bls::PublicKey; use node_data::message::{marshal_signable_vote, Header, Message, Payload}; @@ -76,6 +77,7 @@ pub async fn verify_agreement( seed, &msg.header, 0, + config::FIRST_REDUCTION_COMMITTEE_SIZE, ) .await .map_err(|e| { @@ -94,6 +96,7 @@ pub async fn verify_agreement( seed, &msg.header, 1, + config::SECOND_REDUCTION_COMMITTEE_SIZE, ) .await .map_err(|e| { @@ -118,13 +121,14 @@ pub async fn verify_step_votes( seed: Seed, hdr: &Header, step_offset: u8, + committee_size: usize, ) -> Result<(), Error> { if hdr.step == 0 { return Err(Error::InvalidStepNum); } let step = hdr.step - 1 + step_offset; - let cfg = sortition::Config::new(seed, hdr.round, step, 64); + let cfg = sortition::Config::new(seed, hdr.round, step, committee_size); verify_votes( &hdr.block_hash, From 5fb969d27388a1b43cd5c12caa792eaa876dd3ff Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 11:38:50 +0300 Subject: [PATCH 12/17] consensus: Decrease CONSENSUS_TIMEOUT_MS from 20s to 5s --- consensus/src/config.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/consensus/src/config.rs b/consensus/src/config.rs index 05b0b2f907..167ed2f7c4 100644 --- a/consensus/src/config.rs +++ b/consensus/src/config.rs @@ -6,10 +6,13 @@ /// Maximum number of steps Consensus runs per a single round. pub const CONSENSUS_MAX_STEP: u8 = 213; +/// Maximum number of iterations Consensus runs per a single round. +pub const CONSENSUS_MAX_ITER: u8 = CONSENSUS_MAX_STEP / 3; + /// Percentage number that determines a quorum. pub const CONSENSUS_QUORUM_THRESHOLD: f64 = 0.67; /// Initial step timeout in milliseconds. 
-pub const CONSENSUS_TIMEOUT_MS: u64 = 20 * 1000; +pub const CONSENSUS_TIMEOUT_MS: u64 = 5 * 1000; /// Maximum step timeout. pub const CONSENSUS_MAX_TIMEOUT_MS: u64 = 60 * 1000; @@ -21,6 +24,7 @@ pub const SECOND_REDUCTION_COMMITTEE_SIZE: usize = 64; /// Artifical delay on each selection step. pub const CONSENSUS_DELAY_MS: u64 = 1000; + /// Default number of workers to process agreements. pub const ACCUMULATOR_WORKERS_AMOUNT: usize = 6; pub const ACCUMULATOR_QUEUE_CAP: usize = 100; From f4d41c0786fda65c722d845cf98d48fd14dea881 Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 11:41:11 +0300 Subject: [PATCH 13/17] consensus: Wrap msg handlers in Arc> --- consensus/src/firststep/step.rs | 43 +++++++++++++++++---------- consensus/src/secondstep/step.rs | 51 ++++++++++++++++++++------------ consensus/src/selection/step.rs | 31 ++++++++++++------- 3 files changed, 81 insertions(+), 44 deletions(-) diff --git a/consensus/src/firststep/step.rs b/consensus/src/firststep/step.rs index 21fd1b9d99..7d461a8a53 100644 --- a/consensus/src/firststep/step.rs +++ b/consensus/src/firststep/step.rs @@ -9,9 +9,10 @@ use crate::config; use crate::contract_state::Operations; use crate::execution_ctx::ExecutionCtx; use crate::firststep::handler; + use crate::user::committee::Committee; use node_data::ledger::to_str; -use node_data::message::{Message, Payload}; +use node_data::message::{Message, Payload, Topics}; use std::ops::Deref; use std::sync::Arc; use tokio::sync::Mutex; @@ -20,24 +21,29 @@ use tracing::debug; #[allow(unused)] pub struct Reduction { timeout_millis: u64, - handler: handler::Reduction, + handler: Arc>>, executor: Arc>, } - impl Reduction { - pub fn new(executor: Arc>, db: Arc>) -> Self { + pub(crate) fn new( + executor: Arc>, + _db: Arc>, + handler: Arc>>, + ) -> Self { Self { timeout_millis: config::CONSENSUS_TIMEOUT_MS, - handler: handler::Reduction::new(db), + handler, executor, } } - pub fn reinitialize(&mut self, msg: &Message, round: u64, step: u8) { 
- self.handler.reset(); + pub async fn reinitialize(&mut self, msg: &Message, round: u64, step: u8) { + let mut handler = self.handler.lock().await; + + handler.reset(step); if let Payload::NewBlock(p) = msg.clone().payload { - self.handler.candidate = p.deref().candidate.clone(); + handler.candidate = p.deref().candidate.clone(); } debug!( @@ -46,39 +52,46 @@ impl Reduction { round = round, step = step, timeout = self.timeout_millis, - hash = to_str(&self.handler.candidate.header().hash), + hash = to_str(&handler.candidate.header().hash), ) } pub async fn run( &mut self, - mut ctx: ExecutionCtx<'_>, + mut ctx: ExecutionCtx<'_, DB, T>, committee: Committee, ) -> Result { if committee.am_member() { + let candidate = self.handler.lock().await.candidate.clone(); // Send reduction async spawn_send_reduction( &mut ctx.iter_ctx.join_set, ctx.iter_ctx.verified_hash.clone(), - self.handler.candidate.clone(), + candidate, committee.get_my_pubkey().clone(), ctx.round_update.clone(), ctx.step, ctx.outbound.clone(), ctx.inbound.clone(), self.executor.clone(), + Topics::FirstReduction, ); } // handle queued messages for current round and step. - if let Some(m) = - ctx.handle_future_msgs(&committee, &mut self.handler).await + if let Some(m) = ctx + .handle_future_msgs(&committee, self.handler.clone()) + .await { return Ok(m); } - ctx.event_loop(&committee, &mut self.handler, &mut self.timeout_millis) - .await + ctx.event_loop( + &committee, + self.handler.clone(), + &mut self.timeout_millis, + ) + .await } pub fn name(&self) -> &'static str { diff --git a/consensus/src/secondstep/step.rs b/consensus/src/secondstep/step.rs index e50c03b7bf..0ab89d2d0c 100644 --- a/consensus/src/secondstep/step.rs +++ b/consensus/src/secondstep/step.rs @@ -4,44 +4,51 @@ // // Copyright (c) DUSK NETWORK. All rights reserved. 
-use crate::commons::{spawn_send_reduction, ConsensusError}; +use crate::commons::{spawn_send_reduction, ConsensusError, Database}; use crate::config; use crate::contract_state::Operations; use crate::execution_ctx::ExecutionCtx; +use std::marker::PhantomData; + use crate::secondstep::handler; use crate::user::committee::Committee; use node_data::ledger::{to_str, Block}; -use node_data::message::{Message, Payload}; +use node_data::message::{Message, Payload, Topics}; use std::sync::Arc; use tokio::sync::Mutex; #[allow(unused)] -pub struct Reduction { - handler: handler::Reduction, +pub struct Reduction { + handler: Arc>, candidate: Option, timeout_millis: u64, executor: Arc>, + + marker: PhantomData, } -impl Reduction { - pub fn new(executor: Arc>) -> Self { +impl Reduction { + pub(crate) fn new( + executor: Arc>, + handler: Arc>, + ) -> Self { Self { - handler: handler::Reduction { - aggr: Default::default(), - first_step_votes: Default::default(), - }, + handler, candidate: None, timeout_millis: config::CONSENSUS_TIMEOUT_MS, executor, + marker: PhantomData, } } - pub fn reinitialize(&mut self, msg: &Message, round: u64, step: u8) { + pub async fn reinitialize(&mut self, msg: &Message, round: u64, step: u8) { + let mut handler = self.handler.lock().await; + self.candidate = None; - self.handler.reset(); + handler.reset(step); if let Payload::StepVotesWithCandidate(p) = msg.payload.clone() { - self.handler.first_step_votes = p.sv; + handler.first_step_votes = p.sv; self.candidate = Some(p.candidate); } @@ -59,13 +66,13 @@ impl Reduction { .header() .hash ), - fsv_bitset = self.handler.first_step_votes.bitset, + fsv_bitset = handler.first_step_votes.bitset, ) } pub async fn run( &mut self, - mut ctx: ExecutionCtx<'_>, + mut ctx: ExecutionCtx<'_, DB, T>, committee: Committee, ) -> Result { if committee.am_member() { @@ -81,19 +88,25 @@ impl Reduction { ctx.outbound.clone(), ctx.inbound.clone(), self.executor.clone(), + Topics::SecondReduction, ); } } // handle queued 
messages for current round and step. - if let Some(m) = - ctx.handle_future_msgs(&committee, &mut self.handler).await + if let Some(m) = ctx + .handle_future_msgs(&committee, self.handler.clone()) + .await { return Ok(m); } - ctx.event_loop(&committee, &mut self.handler, &mut self.timeout_millis) - .await + ctx.event_loop( + &committee, + self.handler.clone(), + &mut self.timeout_millis, + ) + .await } pub fn name(&self) -> &'static str { diff --git a/consensus/src/selection/step.rs b/consensus/src/selection/step.rs index 0762ce15eb..286ade4f6d 100644 --- a/consensus/src/selection/step.rs +++ b/consensus/src/selection/step.rs @@ -22,21 +22,25 @@ pub struct Selection where T: Operations, { - handler: handler::Selection, + handler: Arc>>, bg: Generator, timeout_millis: u64, } -impl Selection { - pub fn new(executor: Arc>, db: Arc>) -> Self { +impl Selection { + pub fn new( + executor: Arc>, + _db: Arc>, + handler: Arc>>, + ) -> Self { Self { timeout_millis: config::CONSENSUS_TIMEOUT_MS, - handler: handler::Selection { db }, + handler, bg: Generator::new(executor), } } - pub fn reinitialize(&mut self, _msg: &Message, round: u64, step: u8) { + pub async fn reinitialize(&mut self, _msg: &Message, round: u64, step: u8) { // To be aligned with the original impl, Selection does not double its // timeout settings self.timeout_millis = config::CONSENSUS_TIMEOUT_MS; @@ -52,7 +56,7 @@ impl Selection { pub async fn run( &mut self, - mut ctx: ExecutionCtx<'_>, + mut ctx: ExecutionCtx<'_, D, T>, committee: Committee, ) -> Result { if committee.am_member() { @@ -69,6 +73,8 @@ impl Selection { // register new candidate in local state match self .handler + .lock() + .await .collect( msg.clone(), &ctx.round_update, @@ -92,14 +98,19 @@ impl Selection { } // handle queued messages for current round and step. 
- if let Some(m) = - ctx.handle_future_msgs(&committee, &mut self.handler).await + if let Some(m) = ctx + .handle_future_msgs(&committee, self.handler.clone()) + .await { return Ok(m); } - ctx.event_loop(&committee, &mut self.handler, &mut self.timeout_millis) - .await + ctx.event_loop( + &committee, + self.handler.clone(), + &mut self.timeout_millis, + ) + .await } pub fn name(&self) -> &'static str { From 626fce2043a92c172572ba95c4b4ce42fd17dad5 Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 11:43:18 +0300 Subject: [PATCH 14/17] consensus: Add step number to aggregator key --- consensus/src/aggregator.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/consensus/src/aggregator.rs b/consensus/src/aggregator.rs index 87c2311f77..92c9c627a9 100644 --- a/consensus/src/aggregator.rs +++ b/consensus/src/aggregator.rs @@ -18,7 +18,9 @@ use tracing::{debug, error, warn}; /// voters.StepVotes Mapping of a block hash to both an aggregated signatures /// and a cluster of bls voters. #[derive(Default)] -pub struct Aggregator(BTreeMap)>); +pub struct Aggregator( + BTreeMap<(u8, Hash), (AggrSignature, Cluster)>, +); impl Aggregator { pub fn collect_vote( @@ -27,6 +29,7 @@ impl Aggregator { header: &Header, signature: &[u8; 48], ) -> Option<(Hash, StepVotes)> { + let msg_step = header.step; // Get weight for this pubkey bls. If votes_for returns None, it means // the key is not a committee member, respectively we should not // process a vote from it. @@ -35,7 +38,7 @@ impl Aggregator { let (aggr_sign, cluster) = self .0 - .entry(hash) + .entry((msg_step, hash)) .or_insert((AggrSignature::default(), Cluster::new())); // Each committee has 64 slots. 
If a Provisioner is extracted into @@ -173,8 +176,8 @@ mod tests { use rand::rngs::StdRng; use rand::SeedableRng; impl Aggregator { - pub fn get_total(&self, hash: Hash) -> Option { - if let Some(value) = self.0.get(&hash) { + pub fn get_total(&self, step: u8, hash: Hash) -> Option { + if let Some(value) = self.0.get(&(step, hash)) { return Some(value.1.total_occurrences()); } None @@ -264,12 +267,15 @@ mod tests { // Check collected votes assert!(a.collect_vote(&c, h, signature).is_none()); collected_votes += expected_votes[i]; - assert_eq!(a.get_total(block_hash), Some(collected_votes)); + assert_eq!(a.get_total(h.step, block_hash), Some(collected_votes)); // Ensure a duplicated vote is discarded if i == 0 { assert!(a.collect_vote(&c, h, signature).is_none()); - assert_eq!(a.get_total(block_hash), Some(collected_votes)); + assert_eq!( + a.get_total(h.step, block_hash), + Some(collected_votes) + ); } } } From 6c54f9e229c552f5e5e4d728b94ca19b7e699721 Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 11:47:10 +0300 Subject: [PATCH 15/17] consensus: Fix clippy --- consensus/src/execution_ctx.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/consensus/src/execution_ctx.rs b/consensus/src/execution_ctx.rs index 11cbda0045..d8062de8fa 100644 --- a/consensus/src/execution_ctx.rs +++ b/consensus/src/execution_ctx.rs @@ -251,12 +251,12 @@ impl<'a, DB: Database, T: Operations + 'static> ExecutionCtx<'a, DB, T> { msg_step, ); - if msg_step + 1 <= self.step { - self.try_vote(msg_step + 1, &candidate, Topics::FirstReduction); + if msg_step < self.step { + self.try_vote(msg_step + 1, candidate, Topics::FirstReduction); } if msg_step + 2 <= self.step { - self.try_vote(msg_step + 2, &candidate, Topics::SecondReduction); + self.try_vote(msg_step + 2, candidate, Topics::SecondReduction); } } From 2ab1822592af721ffea4b9286d84bf67304ae911 Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 11:55:23 +0300 Subject: [PATCH 16/17] node: 
Apply new msg IDs. Pass committee size to verify_step_votes. --- node/src/chain.rs | 3 ++- node/src/chain/acceptor.rs | 3 +++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/node/src/chain.rs b/node/src/chain.rs index 6d3999b9a2..bc1954a200 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -43,7 +43,8 @@ use self::fsm::SimpleFSM; const TOPICS: &[u8] = &[ Topics::Block as u8, Topics::NewBlock as u8, - Topics::Reduction as u8, + Topics::FirstReduction as u8, + Topics::SecondReduction as u8, Topics::AggrAgreement as u8, Topics::Agreement as u8, ]; diff --git a/node/src/chain/acceptor.rs b/node/src/chain/acceptor.rs index 98b2a6b185..435e3eeb41 100644 --- a/node/src/chain/acceptor.rs +++ b/node/src/chain/acceptor.rs @@ -31,6 +31,7 @@ use tokio::sync::{oneshot, Mutex, RwLock}; use tokio::task::JoinHandle; use tracing::{error, info, warn}; +use dusk_consensus::config; use std::any; use super::consensus::Task; @@ -454,6 +455,7 @@ async fn verify_block_cert( curr_seed, &hdr, 0, + config::FIRST_REDUCTION_COMMITTEE_SIZE, ) .await { @@ -467,6 +469,7 @@ async fn verify_block_cert( curr_seed, &hdr, 1, + config::SECOND_REDUCTION_COMMITTEE_SIZE, ) .await { From e597ea70a6b96cdbe93be1cbf3c0d1a4543c6d04 Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 13:30:55 +0300 Subject: [PATCH 17/17] consensus: Send agreement msg once it was generated without breaking the iteration loop --- consensus/src/consensus.rs | 10 ++++++---- consensus/src/execution_ctx.rs | 14 +++++--------- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/consensus/src/consensus.rs b/consensus/src/consensus.rs index 4b949ffb8a..3de6f2cdfa 100644 --- a/consensus/src/consensus.rs +++ b/consensus/src/consensus.rs @@ -260,9 +260,13 @@ impl Consensus { .await?; // During execution of any step we may encounter that an - // agreement is generated for a former iteration. + // agreement is generated for a former or current iteration. 
if msg.topic() == Topics::Agreement { - break; + Self::send_agreement( + &mut agr_inbound_queue, + msg.clone(), + ) + .await; } if step >= config::CONSENSUS_MAX_STEP { @@ -274,8 +278,6 @@ impl Consensus { // Delegate (agreement) message result to agreement loop for // further processing. - - Self::send_agreement(&mut agr_inbound_queue, msg.clone()).await; } }) } diff --git a/consensus/src/execution_ctx.rs b/consensus/src/execution_ctx.rs index d8062de8fa..87838a66c6 100644 --- a/consensus/src/execution_ctx.rs +++ b/consensus/src/execution_ctx.rs @@ -260,14 +260,14 @@ impl<'a, DB: Database, T: Operations + 'static> ExecutionCtx<'a, DB, T> { } } - fn try_vote(&mut self, step: u8, candidate: &Block, topic: Topics) { - if let Some(committee) = self.iter_ctx.get_committee(step) { + fn try_vote(&mut self, msg_step: u8, candidate: &Block, topic: Topics) { + if let Some(committee) = self.iter_ctx.get_committee(msg_step) { if committee.am_member() { debug!( event = "vote for former candidate", step_topic = format!("{:?}", topic), hash = node_data::ledger::to_str(&candidate.header().hash), - step, + msg_step, ); spawn_send_reduction( @@ -276,7 +276,7 @@ impl<'a, DB: Database, T: Operations + 'static> ExecutionCtx<'a, DB, T> { candidate.clone(), self.round_update.pubkey_bls.clone(), self.round_update.clone(), - step, + msg_step, self.outbound.clone(), self.inbound.clone(), self.executor.clone(), @@ -284,11 +284,7 @@ impl<'a, DB: Database, T: Operations + 'static> ExecutionCtx<'a, DB, T> { ); }; } else { - error!( - event = "committee not found", - step = self.step, - msg_step = step - ); + error!(event = "committee not found", step = self.step, msg_step); } }