From cd30bdb591ee9dcdd6d1001d0eb07f2bf7cba366 Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 10:48:59 +0300 Subject: [PATCH 01/31] node-data: Use different topic IDs for 1st and 2nd reduction message --- node-data/src/ledger.rs | 2 +- node-data/src/message.rs | 19 +++++++++++-------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/node-data/src/ledger.rs b/node-data/src/ledger.rs index f90beb06d2..ef368af39a 100644 --- a/node-data/src/ledger.rs +++ b/node-data/src/ledger.rs @@ -243,7 +243,7 @@ impl Block { } } -#[derive(Debug, Default, Clone, Eq, Hash, PartialEq)] +#[derive(Debug, Default, Clone, Copy, Eq, Hash, PartialEq)] #[cfg_attr(any(feature = "faker", test), derive(Dummy))] pub struct StepVotes { pub bitset: u64, diff --git a/node-data/src/message.rs b/node-data/src/message.rs index ce30718238..3f4c58af9d 100644 --- a/node-data/src/message.rs +++ b/node-data/src/message.rs @@ -118,7 +118,7 @@ impl Serializable for Message { Topics::NewBlock => { Payload::NewBlock(Box::new(payload::NewBlock::read(r)?)) } - Topics::Reduction => { + Topics::FirstReduction | Topics::SecondReduction => { Payload::Reduction(payload::Reduction::read(r)?) } Topics::Agreement => { @@ -312,7 +312,8 @@ fn is_consensus_msg(topic: u8) -> bool { matches!( Topics::from(topic), Topics::NewBlock - | Topics::Reduction + | Topics::FirstReduction + | Topics::SecondReduction | Topics::Agreement | Topics::AggrAgreement ) @@ -642,8 +643,8 @@ pub mod payload { /// Generates a certificate from agreement. pub fn generate_certificate(&self) -> Certificate { Certificate { - first_reduction: self.first_step.clone(), - second_reduction: self.second_step.clone(), + first_reduction: self.first_step, + second_reduction: self.second_step, } } } @@ -898,11 +899,12 @@ pub enum Topics { // Consensus main loop topics Candidate = 15, NewBlock = 16, - Reduction = 17, + FirstReduction = 17, + SecondReduction = 18, // Consensus Agreement loop topics - Agreement = 18, - AggrAgreement = 19, + Agreement = 19, + AggrAgreement = 20, #[default] Unknown = 255, @@ -919,7 +921,8 @@ impl From for Topics { map_topic!(v, Topics::Candidate); map_topic!(v, Topics::GetCandidate); map_topic!(v, Topics::NewBlock); - map_topic!(v, Topics::Reduction); + map_topic!(v, Topics::FirstReduction); + map_topic!(v, Topics::SecondReduction); map_topic!(v, Topics::Agreement); map_topic!(v, Topics::AggrAgreement); From c2c8bce557722ee7d6874c29fe95bbd332e0ec7f Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 10:51:59 +0300 Subject: [PATCH 02/31] consensus: Apply a delay in block generator based on EST call time execution --- consensus/src/selection/block_generator.rs | 34 ++++++++++++++-------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/consensus/src/selection/block_generator.rs b/consensus/src/selection/block_generator.rs index e691faf43b..c1107104d6 100644 --- a/consensus/src/selection/block_generator.rs +++ b/consensus/src/selection/block_generator.rs @@ -18,7 +18,7 @@ use node_data::ledger; use node_data::message::payload::NewBlock; use node_data::message::{Header, Message, Topics}; use std::sync::Arc; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use tokio::sync::Mutex; use tracing::{debug, info}; @@ -43,6 +43,8 @@ impl Generator { .sign(ru.pubkey_bls.inner(), &ru.seed.inner()[..]) .to_bytes(); + let start = Instant::now(); + let candidate = self .generate_block( &ru.pubkey_bls, @@ -54,6 +56,15 @@ impl Generator { ) .await?; + info!( + event = "gen_candidate", + hash = &to_str(&candidate.header().hash), + state_hash = &to_str(&candidate.header().state_hash), + dur = format!("{:?}ms", start.elapsed().as_millis()), + ); + + debug!("block: {:?}", &candidate); + let msg_header = Header { pubkey_bls: ru.pubkey_bls.clone(), round: ru.round, @@ -64,14 +75,6 @@ impl Generator { let signature = msg_header.sign(&ru.secret_key, ru.pubkey_bls.inner()); - info!( - event = "gen_candidate", - hash = &to_str(&candidate.header().hash), - state_hash = &to_str(&candidate.header().state_hash), - ); - - debug!("block: {:?}", &candidate); - Ok(Message::new_newblock( msg_header, NewBlock { @@ -90,9 +93,7 @@ impl Generator { prev_block_timestamp: i64, iteration: u8, ) -> Result { - // Delay next iteration execution so we avoid consensus-split situation. - tokio::time::sleep(Duration::from_millis(config::CONSENSUS_DELAY_MS)) - .await; + let start_time = Instant::now(); let result = self .executor @@ -128,6 +129,15 @@ impl Generator { iteration, }; + // Apply a delay in block generator accordingly + // In case EST call costs a second (assuming CONSENSUS_DELAY_MS=1000ms), + // we should not sleep here + if let Some(delay) = Duration::from_millis(config::CONSENSUS_DELAY_MS) + .checked_sub(start_time.elapsed()) + { + tokio::time::sleep(delay).await; + } + Ok(Block::new(blk_header, txs).expect("block should be valid")) } From d9798c7c3d259cf6bea844e70f4dc8807e5c5f3b Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 10:53:34 +0300 Subject: [PATCH 03/31] consensus: Implement StepVotesRegistry struct --- consensus/src/step_votes_reg.rs | 147 ++++++++++++++++++++++++++++++++ 1 file changed, 147 insertions(+) create mode 100644 consensus/src/step_votes_reg.rs diff --git a/consensus/src/step_votes_reg.rs b/consensus/src/step_votes_reg.rs new file mode 100644 index 0000000000..ebf7ebfe0c --- /dev/null +++ b/consensus/src/step_votes_reg.rs @@ -0,0 +1,147 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// +// Copyright (c) DUSK NETWORK. All rights reserved. + +use crate::commons::RoundUpdate; +use crate::config::CONSENSUS_MAX_ITER; +use node_data::ledger::to_str; +use node_data::ledger::StepVotes; +use node_data::message::{payload, Message, Topics}; +use std::fmt; +use std::sync::Arc; +use tokio::sync::Mutex; +use tracing::{debug, error}; + +pub(crate) enum SvType { + FirstReduction, + SecondReduction, +} + +#[derive(Default, Copy, Clone)] +struct SvEntry { + // represents candidate block hash + hash: Option<[u8; 32]>, + first_red_sv: StepVotes, + second_red_sv: StepVotes, +} + +impl fmt::Display for SvEntry { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let hash = self.hash.unwrap_or_default(); + + write!( + f, + "SvEntry: hash: {}, 1st_red: {:?}, 2nd_red: {:?}", + to_str(&hash), + self.first_red_sv, + self.second_red_sv, + ) + } +} + +impl SvEntry { + pub(crate) fn add_sv( + &mut self, + iter: u8, + hash: [u8; 32], + sv: StepVotes, + svt: SvType, + ) -> bool { + // Discard empty hashes + if hash == [0u8; 32] { + return false; + } + + if let Some(h) = self.hash { + if h != hash { + // Only one hash can be registered per a single iteration + error!(desc = "multiple candidates per iter"); + return false; + } + } else { + self.hash = Some(hash); + } + + match svt { + SvType::FirstReduction => self.first_red_sv = sv, + SvType::SecondReduction => self.second_red_sv = sv, + } + + debug!(event = "add_sv", iter, data = format!("{}", self)); + self.is_ready() + } + + fn is_ready(&self) -> bool { + !self.second_red_sv.is_empty() + && !self.first_red_sv.is_empty() + && self.hash.is_some() + } +} + +pub(crate) type SafeStepVotesRegistry = Arc>; + +pub(crate) struct StepVotesRegistry { + ru: RoundUpdate, + sv_table: [SvEntry; CONSENSUS_MAX_ITER as usize], +} + +impl StepVotesRegistry { + pub(crate) fn new(ru: RoundUpdate) -> Self { + Self { + ru, + sv_table: [SvEntry::default(); CONSENSUS_MAX_ITER as usize], + } + } + + /// Adds step votes per iteration + /// Returns an agreement if both reductions for an iteration are available + pub(crate) fn add_step_votes( + &mut self, + step: u8, + hash: [u8; 32], + sv: StepVotes, + svt: SvType, + ) -> Option { + let iter_num = (step - 1) / 3 + 1; + if iter_num as usize >= self.sv_table.len() { + return None; + } + + let r = &mut self.sv_table[iter_num as usize]; + if r.add_sv(iter_num, hash, sv, svt) { + return Some(Self::build_agreement_msg( + self.ru.clone(), + iter_num, + *r, + )); + } + + None + } + + fn build_agreement_msg( + ru: RoundUpdate, + iteration: u8, + result: SvEntry, + ) -> Message { + let hdr = node_data::message::Header { + pubkey_bls: ru.pubkey_bls.clone(), + round: ru.round, + step: (iteration - 1) * 3 + 3, + block_hash: result.hash.unwrap_or_default(), + topic: Topics::Agreement as u8, + }; + + let signature = hdr.sign(&ru.secret_key, ru.pubkey_bls.inner()); + + let payload = payload::Agreement { + signature, + first_step: result.first_red_sv, + second_step: result.second_red_sv, + }; + + Message::new_agreement(hdr, payload) + } +} From ea8cf124ead19f3fb50a4ae481b25a7262f79151 Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 10:57:08 +0300 Subject: [PATCH 04/31] consensus: Record any generated step_votes in StepVotes registry --- consensus/src/firststep/handler.rs | 38 ++++++++++++++++++++---- consensus/src/secondstep/handler.rs | 45 +++++++++++++++++++++++------ consensus/src/selection/handler.rs | 14 ++++++++- 3 files changed, 81 insertions(+), 16 deletions(-) diff --git a/consensus/src/firststep/handler.rs b/consensus/src/firststep/handler.rs index 824c43bec3..b9a4a4754f 100644 --- a/consensus/src/firststep/handler.rs +++ b/consensus/src/firststep/handler.rs @@ -6,14 +6,15 @@ use std::sync::Arc; +use crate::aggregator::Aggregator; use crate::commons::{ConsensusError, Database, RoundUpdate}; use crate::msg_handler::{HandleMsgOutput, MsgHandler}; +use crate::step_votes_reg::{SafeStepVotesRegistry, SvType}; use async_trait::async_trait; use node_data::ledger; use node_data::ledger::{Block, StepVotes}; use tokio::sync::Mutex; -use crate::aggregator::Aggregator; use crate::user::committee::Committee; use node_data::message::{payload, Message, Payload}; @@ -45,25 +46,32 @@ fn final_result_with_timeout( )) } -#[derive(Default)] pub struct Reduction { + sv_registry: SafeStepVotesRegistry, + pub(crate) db: Arc>, pub(crate) aggr: Aggregator, pub(crate) candidate: Block, + curr_step: u8, } impl Reduction { - pub(crate) fn new(db: Arc>) -> Self { + pub(crate) fn new( + db: Arc>, + sv_registry: SafeStepVotesRegistry, + ) -> Self { Self { + sv_registry, db, aggr: Aggregator::default(), candidate: Block::default(), + curr_step: 0, } } - pub(crate) fn reset(&mut self) { - self.aggr = Aggregator::default(); + pub(crate) fn reset(&mut self, curr_step: u8) { self.candidate = Block::default(); + self.curr_step = curr_step; } } @@ -95,7 +103,7 @@ impl MsgHandler for Reduction { &mut self, msg: Message, _ru: &RoundUpdate, - _step: u8, + step: u8, committee: &Committee, ) -> Result { let signature = match &msg.payload { @@ -108,6 +116,24 @@ impl MsgHandler for Reduction { if let Some((hash, sv)) = self.aggr.collect_vote(committee, &msg.header, &signature) { + // Record result in global round registry of all non-Nil step_votes + if hash != [0u8; 32] { + if let Some(m) = self.sv_registry.lock().await.add_step_votes( + step, + hash, + sv, + SvType::FirstReduction, + ) { + return Ok(HandleMsgOutput::FinalResult(m)); + } + + // If step is different from the current one, this means + // * collect * func is being called from different iteration + if step != self.curr_step { + return Ok(HandleMsgOutput::Result(msg)); + } + } + // if the votes converged for an empty hash we invoke halt if hash == [0u8; 32] { tracing::warn!("votes converged for an empty hash"); diff --git a/consensus/src/secondstep/handler.rs b/consensus/src/secondstep/handler.rs index 48f23a2ff3..9180d00903 100644 --- a/consensus/src/secondstep/handler.rs +++ b/consensus/src/secondstep/handler.rs @@ -6,6 +6,7 @@ use crate::commons::{ConsensusError, RoundUpdate}; use crate::msg_handler::{HandleMsgOutput, MsgHandler}; +use crate::step_votes_reg::{SafeStepVotesRegistry, SvType}; use async_trait::async_trait; use node_data::ledger; use node_data::ledger::{Hash, Signature, StepVotes}; @@ -17,8 +18,11 @@ use node_data::message::{payload, Message, Payload, Topics}; use crate::user::committee::Committee; pub struct Reduction { - pub(crate) aggr: Aggregator, + pub(crate) sv_registry: SafeStepVotesRegistry, + + pub(crate) aggregator: Aggregator, pub(crate) first_step_votes: StepVotes, + pub(crate) curr_step: u8, } #[async_trait] @@ -59,12 +63,26 @@ impl MsgHandler for Reduction { }?; // Collect vote, if msg payload is of reduction type - if let Some((block_hash, second_step_votes)) = - self.aggr.collect_vote(committee, &msg.header, &signed_hash) + if let Some((block_hash, second_step_votes)) = self + .aggregator + .collect_vote(committee, &msg.header, &signed_hash) { - // At that point, we have reached a quorum for 2th_reduction on an - // empty on non-empty block. Return an empty message as - // this iteration terminates here. + if block_hash != [0u8; 32] { + // Record result in global round results registry + if let Some(m) = self.sv_registry.lock().await.add_step_votes( + step, + block_hash, + second_step_votes, + SvType::SecondReduction, + ) { + return Ok(HandleMsgOutput::FinalResult(m)); + } + + if step != self.curr_step { + return Ok(HandleMsgOutput::Result(msg)); + } + } + return Ok(HandleMsgOutput::FinalResult(self.build_agreement_msg( ru, step, @@ -87,6 +105,15 @@ impl MsgHandler for Reduction { } impl Reduction { + pub(crate) fn new(sv_registry: SafeStepVotesRegistry) -> Self { + Self { + sv_registry, + aggregator: Default::default(), + first_step_votes: Default::default(), + curr_step: 0, + } + } + fn build_agreement_msg( &self, ru: &RoundUpdate, @@ -105,15 +132,15 @@ impl Reduction { let signature = hdr.sign(&ru.secret_key, ru.pubkey_bls.inner()); let payload = payload::Agreement { signature, - first_step: self.first_step_votes.clone(), + first_step: self.first_step_votes, second_step: second_step_votes, }; Message::new_agreement(hdr, payload) } - pub(crate) fn reset(&mut self) { - self.aggr = Aggregator::default(); + pub(crate) fn reset(&mut self, step: u8) { self.first_step_votes = StepVotes::default(); + self.curr_step = step; } } diff --git a/consensus/src/selection/handler.rs b/consensus/src/selection/handler.rs index a91e418b59..eae05febca 100644 --- a/consensus/src/selection/handler.rs +++ b/consensus/src/selection/handler.rs @@ -7,15 +7,17 @@ use crate::commons::{ConsensusError, Database, RoundUpdate}; use crate::merkle::merkle_root; use crate::msg_handler::{HandleMsgOutput, MsgHandler}; +use crate::step_votes_reg::SafeStepVotesRegistry; use crate::user::committee::Committee; use async_trait::async_trait; + use node_data::message::{Message, Payload}; use std::sync::Arc; use tokio::sync::Mutex; -#[derive(Debug, Default)] pub struct Selection { pub(crate) db: Arc>, + pub(crate) _sv_registry: SafeStepVotesRegistry, } #[async_trait] @@ -65,6 +67,16 @@ impl MsgHandler for Selection { } impl Selection { + pub(crate) fn new( + db: Arc>, + sv_registry: SafeStepVotesRegistry, + ) -> Self { + Self { + db, + _sv_registry: sv_registry, + } + } + fn verify_new_block( &self, msg: &Message, From 79f269567d9faddaed5aa66d38eca74afcb91f50 Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 10:58:12 +0300 Subject: [PATCH 05/31] consensus: Implement Committee::get_unique_members --- consensus/src/user/committee.rs | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/consensus/src/user/committee.rs b/consensus/src/user/committee.rs index 50348c33bf..7e3ceed318 100644 --- a/consensus/src/user/committee.rs +++ b/consensus/src/user/committee.rs @@ -10,7 +10,7 @@ use crate::user::sortition; use super::cluster::Cluster; use crate::config; use node_data::bls::PublicKey; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt; use std::mem; @@ -22,6 +22,17 @@ pub struct Committee { cfg: sortition::Config, } +impl Iterator for Committee { + type Item = PublicKey; + + fn next(&mut self) -> Option { + self.members + .iter() + .next() + .map(|(public_key, _)| public_key.clone()) + } +} + #[allow(unused)] impl Committee { /// Generates a new committee from the given provisioners state and @@ -176,6 +187,20 @@ impl CommitteeSet { self.get_or_create(cfg).is_member(pubkey) } + /// Returns number of all unique public keys + pub fn get_unique_members(&self) -> usize { + let mut merged = HashSet::new(); + self.committees.iter().for_each(|(_, committee)| { + committee.members.iter().for_each(|(m, s)| { + if *s > 0 { + merged.insert(m.bytes()); + } + }); + }); + + merged.len() + } + pub fn votes_for( &mut self, pubkey: &PublicKey, From a27b5f6f876fd509548eacccdf421e5c22ae6fc1 Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 11:00:20 +0300 Subject: [PATCH 06/31] consensus: Pass topic ID in spawn_send_reduction --- consensus/src/commons.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/consensus/src/commons.rs b/consensus/src/commons.rs index cb01faee87..8c6b2371a0 100644 --- a/consensus/src/commons.rs +++ b/consensus/src/commons.rs @@ -11,6 +11,7 @@ use crate::contract_state::Operations; use node_data::ledger::*; use node_data::message; +use node_data::message::Topics; use tracing::Instrument; use crate::contract_state::CallParams; @@ -80,6 +81,7 @@ pub fn spawn_send_reduction( outbound: AsyncQueue, inbound: AsyncQueue, executor: Arc>, + topic: Topics, ) { let hash = to_str(&candidate.header().hash); @@ -168,7 +170,7 @@ pub fn spawn_send_reduction( round: ru.round, step, block_hash: hash, - topic: message::Topics::Reduction as u8, + topic: topic.into(), }; let signature = hdr.sign(&ru.secret_key, ru.pubkey_bls.inner()); From e18bdf3b9b168b0cc8cc2859bec6915a36f9217a Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 11:02:25 +0300 Subject: [PATCH 07/31] consensus: Make module handler pub(crate) --- consensus/src/firststep.rs | 2 +- consensus/src/lib.rs | 1 + consensus/src/secondstep.rs | 2 +- consensus/src/selection.rs | 2 +- 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/consensus/src/firststep.rs b/consensus/src/firststep.rs index fc2d27f0e5..5ac78db4d2 100644 --- a/consensus/src/firststep.rs +++ b/consensus/src/firststep.rs @@ -4,5 +4,5 @@ // // Copyright (c) DUSK NETWORK. All rights reserved. -mod handler; +pub(crate) mod handler; pub mod step; diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 93add84ba5..47d7a560c9 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -21,6 +21,7 @@ mod phase; mod queue; mod secondstep; mod selection; +mod step_votes_reg; pub mod merkle; diff --git a/consensus/src/secondstep.rs b/consensus/src/secondstep.rs index fc2d27f0e5..5ac78db4d2 100644 --- a/consensus/src/secondstep.rs +++ b/consensus/src/secondstep.rs @@ -4,5 +4,5 @@ // // Copyright (c) DUSK NETWORK. All rights reserved. -mod handler; +pub(crate) mod handler; pub mod step; diff --git a/consensus/src/selection.rs b/consensus/src/selection.rs index 98dff72df7..0f2ec9a427 100644 --- a/consensus/src/selection.rs +++ b/consensus/src/selection.rs @@ -5,5 +5,5 @@ // Copyright (c) DUSK NETWORK. All rights reserved. mod block_generator; -mod handler; +pub(crate) mod handler; pub mod step; From 395a47ab9b51a658a35540b7d4f6c5392e50143c Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 11:08:07 +0300 Subject: [PATCH 08/31] consensus: Introduce vote-for-former candidate patch - Save any generated committee for future use - Collect consensus messages from former iterations - Handle past messages in the context of tue associated step - Try to vote for former candidate, if committee member - Remove a redundant phase::is_valid call --- consensus/src/execution_ctx.rs | 292 ++++++++++++++++++++++++++------- 1 file changed, 237 insertions(+), 55 deletions(-) diff --git a/consensus/src/execution_ctx.rs b/consensus/src/execution_ctx.rs index b26847222b..11cbda0045 100644 --- a/consensus/src/execution_ctx.rs +++ b/consensus/src/execution_ctx.rs @@ -4,7 +4,11 @@ // // Copyright (c) DUSK NETWORK. All rights reserved. +use crate::commons::spawn_send_reduction; +use crate::commons::Database; use crate::commons::{ConsensusError, RoundUpdate}; +use crate::config::CONSENSUS_MAX_TIMEOUT_MS; +use crate::contract_state::Operations; use crate::msg_handler::HandleMsgOutput::{ FinalResult, FinalResultWithTimeoutIncrease, }; @@ -13,21 +17,27 @@ use crate::queue::Queue; use crate::user::committee::Committee; use crate::user::provisioners::Provisioners; use crate::user::sortition; +use crate::{firststep, secondstep, selection}; +use node_data::ledger::Block; +use node_data::message::Payload; use node_data::message::{AsyncQueue, Message, Topics}; use std::cmp; -use tokio::task::JoinSet; - -use crate::config::CONSENSUS_MAX_TIMEOUT_MS; +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tokio::sync::Mutex; +use tokio::task::JoinSet; use tokio::time; use tokio::time::Instant; use tracing::{debug, error, info, trace}; /// Represents a shared state within a context of the exection of a single /// iteration. -pub struct IterationCtx { +pub struct IterationCtx { + first_reduction_handler: Arc>>, + sec_reduction_handler: Arc>, + selection_handler: Arc>>, + pub join_set: JoinSet<()>, /// verified candidate hash @@ -38,21 +48,78 @@ pub struct IterationCtx { round: u64, iter: u8, + + /// Stores any committee already generated in the execution of any + /// iteration of current round + committees: HashMap, } -impl IterationCtx { - pub fn new(round: u64, step: u8) -> Self { +impl IterationCtx { + pub fn new( + round: u64, + iter: u8, + selection_handler: Arc>>, + first_reduction_handler: Arc>>, + sec_reduction_handler: Arc>, + ) -> Self { Self { round, join_set: JoinSet::new(), - iter: step / 3 + 1, + iter, verified_hash: Arc::new(Mutex::new([0u8; 32])), + selection_handler, + first_reduction_handler, + sec_reduction_handler, + committees: Default::default(), } } -} -impl Drop for IterationCtx { - fn drop(&mut self) { + pub(crate) async fn collect_past_event( + &self, + ru: &RoundUpdate, + msg: &Message, + ) -> Option { + let committee = self.committees.get(&msg.header.step)?; + match msg.topic() { + node_data::message::Topics::NewBlock => { + let mut handler = self.selection_handler.lock().await; + _ = handler + .collect(msg.clone(), ru, msg.header.step, committee) + .await; + } + node_data::message::Topics::FirstReduction => { + let mut handler = self.first_reduction_handler.lock().await; + if let Ok(FinalResult(m)) = handler + .collect(msg.clone(), ru, msg.header.step, committee) + .await + { + return Some(m); + } + } + node_data::message::Topics::SecondReduction => { + let mut handler = self.sec_reduction_handler.lock().await; + if let Ok(FinalResult(m)) = handler + .collect(msg.clone(), ru, msg.header.step, committee) + .await + { + return Some(m); + } + } + _ => {} + }; + + None + } + + pub(crate) fn get_committee(&mut self, step: u8) -> Option<&Committee> { + self.committees.get(&step) + } + + pub(crate) fn on_iteration_begin(&mut self, iter: u8) { + self.iter = iter; + } + + pub(crate) fn on_iteration_end(&mut self) { debug!( event = "iter completed", len = self.join_set.len(), @@ -63,10 +130,16 @@ impl Drop for IterationCtx { } } +impl Drop for IterationCtx { + fn drop(&mut self) { + self.on_iteration_end(); + } +} + /// ExecutionCtx encapsulates all data needed by a single step to be fully /// executed. -pub struct ExecutionCtx<'a> { - pub iter_ctx: &'a mut IterationCtx, +pub struct ExecutionCtx<'a, DB: Database, T> { + pub iter_ctx: &'a mut IterationCtx, /// Messaging-related fields pub inbound: AsyncQueue, @@ -79,19 +152,22 @@ pub struct ExecutionCtx<'a> { // Round/Step parameters pub round_update: RoundUpdate, pub step: u8, + + executor: Arc>, } -impl<'a> ExecutionCtx<'a> { +impl<'a, DB: Database, T: Operations + 'static> ExecutionCtx<'a, DB, T> { /// Creates step execution context. #[allow(clippy::too_many_arguments)] pub fn new( - iter_ctx: &'a mut IterationCtx, + iter_ctx: &'a mut IterationCtx, inbound: AsyncQueue, outbound: AsyncQueue, future_msgs: Arc>>, provisioners: &'a mut Provisioners, round_update: RoundUpdate, step: u8, + executor: Arc>, ) -> Self { Self { iter_ctx, @@ -101,9 +177,14 @@ impl<'a> ExecutionCtx<'a> { provisioners, round_update, step, + executor, } } + pub(crate) fn save_committee(&mut self, step: u8, committee: Committee) { + self.iter_ctx.committees.insert(step, committee); + } + /// Runs a loop that collects both inbound messages and timeout event. /// /// It accepts an instance of MsgHandler impl (phase var) and calls its @@ -116,7 +197,7 @@ impl<'a> ExecutionCtx<'a> { pub async fn event_loop>( &mut self, committee: &Committee, - phase: &mut C, + phase: Arc>, timeout_millis: &mut u64, ) -> Result { debug!(event = "run event_loop"); @@ -137,7 +218,7 @@ impl<'a> ExecutionCtx<'a> { if let Some(step_result) = self .process_inbound_msg( committee, - phase, + phase.clone(), msg, timeout_millis, ) @@ -153,12 +234,102 @@ impl<'a> ExecutionCtx<'a> { info!(event = "timeout-ed"); Self::increase_timeout(timeout_millis); - return self.process_timeout_event(phase); + return self.process_timeout_event(phase.clone()).await; } } } } + pub(crate) async fn vote_for_former_candidate( + &mut self, + msg_step: u8, + candidate: &Block, + ) { + debug!( + event = "former candidate received", + hash = node_data::ledger::to_str(&candidate.header().hash), + msg_step, + ); + + if msg_step + 1 <= self.step { + self.try_vote(msg_step + 1, &candidate, Topics::FirstReduction); + } + + if msg_step + 2 <= self.step { + self.try_vote(msg_step + 2, &candidate, Topics::SecondReduction); + } + } + + fn try_vote(&mut self, step: u8, candidate: &Block, topic: Topics) { + if let Some(committee) = self.iter_ctx.get_committee(step) { + if committee.am_member() { + debug!( + event = "vote for former candidate", + step_topic = format!("{:?}", topic), + hash = node_data::ledger::to_str(&candidate.header().hash), + step, + ); + + spawn_send_reduction( + &mut self.iter_ctx.join_set, + Arc::new(Mutex::new([0u8; 32])), + candidate.clone(), + self.round_update.pubkey_bls.clone(), + self.round_update.clone(), + step, + self.outbound.clone(), + self.inbound.clone(), + self.executor.clone(), + topic, + ); + }; + } else { + error!( + event = "committee not found", + step = self.step, + msg_step = step + ); + } + } + + /// Process messages from past + async fn process_past_events(&mut self, msg: &Message) -> Option { + if msg.header.block_hash == [0u8; 32] + || msg.header.round != self.round_update.round + { + return None; + } + + if let Err(e) = self.outbound.send(msg.clone()).await { + error!("could not send msg due to {:?}", e); + } + + // Try to vote for candidate block from former iteration + if let Payload::NewBlock(p) = &msg.payload { + // TODO: Perform block header/ Certificate full verification + // To be addressed with another PR + + self.vote_for_former_candidate(msg.header.step, &p.candidate) + .await; + } + + if let Some(m) = self + .iter_ctx + .collect_past_event(&self.round_update, msg) + .await + { + if m.header.topic == Topics::Agreement as u8 { + debug!( + event = "agreement from former iter", + msg_step = m.header.step + ); + return Some(m); + } + } + + None + } + /// Delegates the received message to the Phase handler for further /// processing. /// @@ -167,17 +338,19 @@ impl<'a> ExecutionCtx<'a> { async fn process_inbound_msg>( &mut self, committee: &Committee, - phase: &mut C, + phase: Arc>, msg: Message, timeout_millis: &mut u64, ) -> Option { - // Check if a message is fully valid. If so, then it can be broadcast. - match phase.is_valid( + // Check if message is valid in the context of current step + let ret = phase.lock().await.is_valid( msg.clone(), &self.round_update, self.step, committee, - ) { + ); + + match ret { Ok(msg) => { // Re-publish the returned message self.outbound.send(msg).await.unwrap_or_else(|err| { @@ -198,23 +371,27 @@ impl<'a> ExecutionCtx<'a> { msg.header.step, msg, ); + + return None; } ConsensusError::PastEvent => { - trace!("discard message from past {:#?}", msg); + return self.process_past_events(&msg).await; } _ => { error!("phase handler err: {:?}", e); + return None; } } - - return None; } } - match phase - .collect(msg.clone(), &self.round_update, self.step, committee) + let ret = phase + .lock() .await - { + .collect(msg.clone(), &self.round_update, self.step, committee) + .await; + + match ret { Ok(output) => { trace!("message collected {:#?}", msg); @@ -233,7 +410,13 @@ impl<'a> ExecutionCtx<'a> { } } Err(e) => { - error!("phase collect return err: {:?}", e); + error!( + event = "failed collect", + err = format!("{:?}", e), + msg_topic = format!("{:?}", msg.topic()), + msg_step = msg.header.step, + msg_round = msg.header.round, + ); } } @@ -242,12 +425,14 @@ impl<'a> ExecutionCtx<'a> { /// Delegates the received event of timeout to the Phase handler for further /// processing. - fn process_timeout_event>( + async fn process_timeout_event>( &mut self, - phase: &mut C, + phase: Arc>, ) -> Result { - if let Ok(FinalResult(msg)) = - phase.handle_timeout(&self.round_update, self.step) + if let Ok(FinalResult(msg)) = phase + .lock() + .await + .handle_timeout(&self.round_update, self.step) { return Ok(msg); } @@ -262,7 +447,7 @@ impl<'a> ExecutionCtx<'a> { pub async fn handle_future_msgs>( &self, committee: &Committee, - phase: &mut C, + phase: Arc>, ) -> Option { if let Some(messages) = self .future_msgs @@ -275,39 +460,36 @@ impl<'a> ExecutionCtx<'a> { } for msg in messages { - if let Ok(msg) = phase.is_valid( + let ret = phase.lock().await.is_valid( msg, &self.round_update, self.step, committee, - ) { - // Check if a message is fully valid. If so, then it can be - // broadcast. - if let Ok(msg) = phase.is_valid( - msg.clone(), - &self.round_update, - self.step, - committee, - ) { - // Re-publish the drained message - debug!( - event = "republish", - src = "future_msgs", - msg_step = msg.header.step, - msg_round = msg.header.round, - msg_topic = - format!("{:?}", Topics::from(msg.header.topic)) - ); + ); + + if let Ok(msg) = ret { + // Re-publish a drained message + debug!( + event = "republish", + src = "future_msgs", + msg_step = msg.header.step, + msg_round = msg.header.round, + msg_topic = + format!("{:?}", Topics::from(msg.header.topic)) + ); - self.outbound.send(msg).await.unwrap_or_else(|err| { + self.outbound.send(msg.clone()).await.unwrap_or_else( + |err| { error!( "unable to re-publish a drained msg {:?}", err ) - }); - } + }, + ); if let Ok(FinalResult(msg)) = phase + .lock() + .await .collect(msg, &self.round_update, self.step, committee) .await { From c1f3118a58b7671ce32de70e94e71aa8db865935 Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 11:12:03 +0300 Subject: [PATCH 09/31] consensus: Upgrade main consensus loop - Share StepVotesRegistry instance amongst all steps - Share Message Handlers between Phase array and IterationCtx instance - Send agreement message for former iterations --- consensus/src/consensus.rs | 63 +++++++++++++++++++++++++++++++++----- 1 file changed, 55 insertions(+), 8 deletions(-) diff --git a/consensus/src/consensus.rs b/consensus/src/consensus.rs index 0ed81207b1..4b949ffb8a 100644 --- a/consensus/src/consensus.rs +++ b/consensus/src/consensus.rs @@ -7,8 +7,10 @@ use crate::commons::{ConsensusError, Database, RoundUpdate}; use crate::contract_state::Operations; use crate::phase::Phase; + use node_data::ledger::{to_str, Block}; -use node_data::message::{AsyncQueue, Message, Payload}; + +use node_data::message::{AsyncQueue, Message, Payload, Topics}; use crate::agreement::step; use crate::execution_ctx::{ExecutionCtx, IterationCtx}; @@ -18,6 +20,7 @@ use crate::{config, selection}; use crate::{firststep, secondstep}; use tracing::{error, Instrument}; +use crate::step_votes_reg::StepVotesRegistry; use std::sync::Arc; use tokio::sync::{oneshot, Mutex}; use tokio::task::JoinHandle; @@ -164,34 +167,69 @@ impl Consensus { future_msgs.lock().await.clear_round(ru.round - 1); } + let sv_registry = + Arc::new(Mutex::new(StepVotesRegistry::new(ru.clone()))); + + let sel_handler = + Arc::new(Mutex::new(selection::handler::Selection::new( + db.clone(), + sv_registry.clone(), + ))); + + let first_handler = + Arc::new(Mutex::new(firststep::handler::Reduction::new( + db.clone(), + sv_registry.clone(), + ))); + + let sec_handler = Arc::new(Mutex::new( + secondstep::handler::Reduction::new(sv_registry.clone()), + )); + let mut phases = [ Phase::Selection(selection::step::Selection::new( executor.clone(), db.clone(), + sel_handler.clone(), )), Phase::Reduction1(firststep::step::Reduction::new( executor.clone(), db.clone(), + first_handler.clone(), + )), + Phase::Reduction2(secondstep::step::Reduction::new( + executor.clone(), + sec_handler.clone(), )), - Phase::Reduction2(secondstep::step::Reduction::new(executor)), ]; // Consensus loop // Initialize and run consensus loop - let mut step: u8 = 0; + + let mut iter_num: u8 = 0; + let mut iter_ctx = IterationCtx::new( + ru.round, + iter_num, + sel_handler.clone(), + first_handler.clone(), + sec_handler.clone(), + ); loop { - let mut msg = Message::empty(); - let mut iter_ctx = IterationCtx::new(ru.round, step + 1); + iter_num += 1; + iter_ctx.on_iteration_begin(iter_num); + let mut msg = Message::empty(); // Execute a single iteration - for phase in phases.iter_mut() { - step += 1; + for pos in 0..phases.len() { + let phase = phases.get_mut(pos).unwrap(); + + let step = (iter_num - 1) * 3 + (pos as u8 + 1); let name = phase.name(); // Initialize new phase with message returned by previous // phase. - phase.reinitialize(&msg, ru.round, step); + phase.reinitialize(&msg, ru.round, step).await; // Construct phase execution context let ctx = ExecutionCtx::new( @@ -202,6 +240,7 @@ impl Consensus { &mut provisioners, ru.clone(), step, + executor.clone(), ); // Execute a phase. @@ -220,11 +259,19 @@ impl Consensus { )) .await?; + // During execution of any step we may encounter that an + // agreement is generated for a former iteration. + if msg.topic() == Topics::Agreement { + break; + } + if step >= config::CONSENSUS_MAX_STEP { return Err(ConsensusError::MaxStepReached); } } + iter_ctx.on_iteration_end(); + // Delegate (agreement) message result to agreement loop for // further processing. From b0c1d90d51d039c516cac4090a87fad2684f7b26 Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 11:13:38 +0300 Subject: [PATCH 10/31] consensus: Save any generated committee on step initialization --- consensus/src/phase.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/consensus/src/phase.rs b/consensus/src/phase.rs index 2af3d24120..0d8cec4959 100644 --- a/consensus/src/phase.rs +++ b/consensus/src/phase.rs @@ -7,6 +7,7 @@ use crate::commons::{ConsensusError, Database}; use crate::contract_state::Operations; use crate::execution_ctx::ExecutionCtx; + use node_data::message::Message; use crate::user::committee::Committee; @@ -42,19 +43,19 @@ macro_rules! call_phase { pub enum Phase { Selection(selection::step::Selection), Reduction1(firststep::step::Reduction), - Reduction2(secondstep::step::Reduction), + Reduction2(secondstep::step::Reduction), } impl Phase { - pub fn reinitialize(&mut self, msg: &Message, round: u64, step: u8) { + pub async fn reinitialize(&mut self, msg: &Message, round: u64, step: u8) { trace!(event = "init step", msg = format!("{:#?}", msg),); - call_phase!(self, reinitialize(msg, round, step)) + await_phase!(self, reinitialize(msg, round, step)) } pub async fn run( &mut self, - ctx: ExecutionCtx<'_>, + mut ctx: ExecutionCtx<'_, D, T>, ) -> Result { debug!(event = "execute_step", timeout = self.get_timeout()); @@ -76,6 +77,8 @@ impl Phase { members = format!("{}", &step_committee) ); + ctx.save_committee(ctx.step, step_committee.clone()); + await_phase!(self, run(ctx, step_committee)) } From 448e4a4453db6eee4e7a99b2e580c101cc84ed82 Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 11:24:49 +0300 Subject: [PATCH 11/31] consensus: Pass committee_size param to both verify_agreement and verify_step_votes --- consensus/src/agreement/verifiers.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/consensus/src/agreement/verifiers.rs b/consensus/src/agreement/verifiers.rs index 20dcc216a4..eb5353593d 100644 --- a/consensus/src/agreement/verifiers.rs +++ b/consensus/src/agreement/verifiers.rs @@ -11,6 +11,7 @@ use crate::user::committee::CommitteeSet; use crate::user::sortition; use bytes::Buf; +use crate::config; use dusk_bytes::Serializable; use node_data::bls::PublicKey; use node_data::message::{marshal_signable_vote, Header, Message, Payload}; @@ -76,6 +77,7 @@ pub async fn verify_agreement( seed, &msg.header, 0, + config::FIRST_REDUCTION_COMMITTEE_SIZE, ) .await .map_err(|e| { @@ -94,6 +96,7 @@ pub async fn verify_agreement( seed, &msg.header, 1, + config::SECOND_REDUCTION_COMMITTEE_SIZE, ) .await .map_err(|e| { @@ -118,13 +121,14 @@ pub async fn verify_step_votes( seed: Seed, hdr: &Header, step_offset: u8, + committee_size: usize, ) -> Result<(), Error> { if hdr.step == 0 { return Err(Error::InvalidStepNum); } let step = hdr.step - 1 + step_offset; - let cfg = sortition::Config::new(seed, hdr.round, step, 64); + let cfg = sortition::Config::new(seed, hdr.round, step, committee_size); verify_votes( &hdr.block_hash, From 5fb969d27388a1b43cd5c12caa792eaa876dd3ff Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 11:38:50 +0300 Subject: [PATCH 12/31] consensus: Decrease CONSENSUS_TIMEOUT_MS from 20s to 5s --- consensus/src/config.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/consensus/src/config.rs b/consensus/src/config.rs index 05b0b2f907..167ed2f7c4 100644 --- a/consensus/src/config.rs +++ b/consensus/src/config.rs @@ -6,10 +6,13 @@ /// Maximum number of steps Consensus runs per a single round. pub const CONSENSUS_MAX_STEP: u8 = 213; +/// Maximum number of iterations Consensus runs per a single round. +pub const CONSENSUS_MAX_ITER: u8 = CONSENSUS_MAX_STEP / 3; + /// Percentage number that determines a quorum. pub const CONSENSUS_QUORUM_THRESHOLD: f64 = 0.67; /// Initial step timeout in milliseconds. -pub const CONSENSUS_TIMEOUT_MS: u64 = 20 * 1000; +pub const CONSENSUS_TIMEOUT_MS: u64 = 5 * 1000; /// Maximum step timeout. pub const CONSENSUS_MAX_TIMEOUT_MS: u64 = 60 * 1000; @@ -21,6 +24,7 @@ pub const SECOND_REDUCTION_COMMITTEE_SIZE: usize = 64; /// Artifical delay on each selection step. pub const CONSENSUS_DELAY_MS: u64 = 1000; + /// Default number of workers to process agreements. pub const ACCUMULATOR_WORKERS_AMOUNT: usize = 6; pub const ACCUMULATOR_QUEUE_CAP: usize = 100; From f4d41c0786fda65c722d845cf98d48fd14dea881 Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 11:41:11 +0300 Subject: [PATCH 13/31] consensus: Wrap msg handlers in Arc> --- consensus/src/firststep/step.rs | 43 +++++++++++++++++---------- consensus/src/secondstep/step.rs | 51 ++++++++++++++++++++------------ consensus/src/selection/step.rs | 31 ++++++++++++------- 3 files changed, 81 insertions(+), 44 deletions(-) diff --git a/consensus/src/firststep/step.rs b/consensus/src/firststep/step.rs index 21fd1b9d99..7d461a8a53 100644 --- a/consensus/src/firststep/step.rs +++ b/consensus/src/firststep/step.rs @@ -9,9 +9,10 @@ use crate::config; use crate::contract_state::Operations; use crate::execution_ctx::ExecutionCtx; use crate::firststep::handler; + use crate::user::committee::Committee; use node_data::ledger::to_str; -use node_data::message::{Message, Payload}; +use node_data::message::{Message, Payload, Topics}; use std::ops::Deref; use std::sync::Arc; use tokio::sync::Mutex; @@ -20,24 +21,29 @@ use tracing::debug; #[allow(unused)] pub struct Reduction { timeout_millis: u64, - handler: handler::Reduction, + handler: Arc>>, executor: Arc>, } - impl Reduction { - pub fn new(executor: Arc>, db: Arc>) -> Self { + pub(crate) fn new( + executor: Arc>, + _db: Arc>, + handler: Arc>>, + ) -> Self { Self { timeout_millis: config::CONSENSUS_TIMEOUT_MS, - handler: handler::Reduction::new(db), + handler, executor, } } - pub fn reinitialize(&mut self, msg: &Message, round: u64, step: u8) { - self.handler.reset(); + pub async fn reinitialize(&mut self, msg: &Message, round: u64, step: u8) { + let mut handler = self.handler.lock().await; + + handler.reset(step); if let Payload::NewBlock(p) = msg.clone().payload { - self.handler.candidate = p.deref().candidate.clone(); + handler.candidate = p.deref().candidate.clone(); } debug!( @@ -46,39 +52,46 @@ impl Reduction { round = round, step = step, timeout = self.timeout_millis, - hash = to_str(&self.handler.candidate.header().hash), + hash = to_str(&handler.candidate.header().hash), ) } pub async fn run( &mut self, - mut ctx: ExecutionCtx<'_>, + mut ctx: ExecutionCtx<'_, DB, T>, committee: Committee, ) -> Result { if committee.am_member() { + let candidate = self.handler.lock().await.candidate.clone(); // Send reduction async spawn_send_reduction( &mut ctx.iter_ctx.join_set, ctx.iter_ctx.verified_hash.clone(), - self.handler.candidate.clone(), + candidate, committee.get_my_pubkey().clone(), ctx.round_update.clone(), ctx.step, ctx.outbound.clone(), ctx.inbound.clone(), self.executor.clone(), + Topics::FirstReduction, ); } // handle queued messages for current round and step. - if let Some(m) = - ctx.handle_future_msgs(&committee, &mut self.handler).await + if let Some(m) = ctx + .handle_future_msgs(&committee, self.handler.clone()) + .await { return Ok(m); } - ctx.event_loop(&committee, &mut self.handler, &mut self.timeout_millis) - .await + ctx.event_loop( + &committee, + self.handler.clone(), + &mut self.timeout_millis, + ) + .await } pub fn name(&self) -> &'static str { diff --git a/consensus/src/secondstep/step.rs b/consensus/src/secondstep/step.rs index e50c03b7bf..0ab89d2d0c 100644 --- a/consensus/src/secondstep/step.rs +++ b/consensus/src/secondstep/step.rs @@ -4,44 +4,51 @@ // // Copyright (c) DUSK NETWORK. All rights reserved. -use crate::commons::{spawn_send_reduction, ConsensusError}; +use crate::commons::{spawn_send_reduction, ConsensusError, Database}; use crate::config; use crate::contract_state::Operations; use crate::execution_ctx::ExecutionCtx; +use std::marker::PhantomData; + use crate::secondstep::handler; use crate::user::committee::Committee; use node_data::ledger::{to_str, Block}; -use node_data::message::{Message, Payload}; +use node_data::message::{Message, Payload, Topics}; use std::sync::Arc; use tokio::sync::Mutex; #[allow(unused)] -pub struct Reduction { - handler: handler::Reduction, +pub struct Reduction { + handler: Arc>, candidate: Option, timeout_millis: u64, executor: Arc>, + + marker: PhantomData, } -impl Reduction { - pub fn new(executor: Arc>) -> Self { +impl Reduction { + pub(crate) fn new( + executor: Arc>, + handler: Arc>, + ) -> Self { Self { - handler: handler::Reduction { - aggr: Default::default(), - first_step_votes: Default::default(), - }, + handler, candidate: None, timeout_millis: config::CONSENSUS_TIMEOUT_MS, executor, + marker: PhantomData, } } - pub fn reinitialize(&mut self, msg: &Message, round: u64, step: u8) { + pub async fn reinitialize(&mut self, msg: &Message, round: u64, step: u8) { + let mut handler = self.handler.lock().await; + self.candidate = None; - self.handler.reset(); + handler.reset(step); if let Payload::StepVotesWithCandidate(p) = msg.payload.clone() { - self.handler.first_step_votes = p.sv; + handler.first_step_votes = p.sv; self.candidate = Some(p.candidate); } @@ -59,13 +66,13 @@ impl Reduction { .header() .hash ), - fsv_bitset = self.handler.first_step_votes.bitset, + fsv_bitset = handler.first_step_votes.bitset, ) } pub async fn run( &mut self, - mut ctx: ExecutionCtx<'_>, + mut ctx: ExecutionCtx<'_, DB, T>, committee: Committee, ) -> Result { if committee.am_member() { @@ -81,19 +88,25 @@ impl Reduction { ctx.outbound.clone(), ctx.inbound.clone(), self.executor.clone(), + Topics::SecondReduction, ); } } // handle queued messages for current round and step. - if let Some(m) = - ctx.handle_future_msgs(&committee, &mut self.handler).await + if let Some(m) = ctx + .handle_future_msgs(&committee, self.handler.clone()) + .await { return Ok(m); } - ctx.event_loop(&committee, &mut self.handler, &mut self.timeout_millis) - .await + ctx.event_loop( + &committee, + self.handler.clone(), + &mut self.timeout_millis, + ) + .await } pub fn name(&self) -> &'static str { diff --git a/consensus/src/selection/step.rs b/consensus/src/selection/step.rs index 0762ce15eb..286ade4f6d 100644 --- a/consensus/src/selection/step.rs +++ b/consensus/src/selection/step.rs @@ -22,21 +22,25 @@ pub struct Selection where T: Operations, { - handler: handler::Selection, + handler: Arc>>, bg: Generator, timeout_millis: u64, } -impl Selection { - pub fn new(executor: Arc>, db: Arc>) -> Self { +impl Selection { + pub fn new( + executor: Arc>, + _db: Arc>, + handler: Arc>>, + ) -> Self { Self { timeout_millis: config::CONSENSUS_TIMEOUT_MS, - handler: handler::Selection { db }, + handler, bg: Generator::new(executor), } } - pub fn reinitialize(&mut self, _msg: &Message, round: u64, step: u8) { + pub async fn reinitialize(&mut self, _msg: &Message, round: u64, step: u8) { // To be aligned with the original impl, Selection does not double its // timeout settings self.timeout_millis = config::CONSENSUS_TIMEOUT_MS; @@ -52,7 +56,7 @@ impl Selection { pub async fn run( &mut self, - mut ctx: ExecutionCtx<'_>, + mut ctx: ExecutionCtx<'_, D, T>, committee: Committee, ) -> Result { if committee.am_member() { @@ -69,6 +73,8 @@ impl Selection { // register new candidate in local state match self .handler + .lock() + .await .collect( msg.clone(), &ctx.round_update, @@ -92,14 +98,19 @@ impl Selection { } // handle queued messages for current round and step. - if let Some(m) = - ctx.handle_future_msgs(&committee, &mut self.handler).await + if let Some(m) = ctx + .handle_future_msgs(&committee, self.handler.clone()) + .await { return Ok(m); } - ctx.event_loop(&committee, &mut self.handler, &mut self.timeout_millis) - .await + ctx.event_loop( + &committee, + self.handler.clone(), + &mut self.timeout_millis, + ) + .await } pub fn name(&self) -> &'static str { From 626fce2043a92c172572ba95c4b4ce42fd17dad5 Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 11:43:18 +0300 Subject: [PATCH 14/31] consensus: Add step number to aggregator key --- consensus/src/aggregator.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/consensus/src/aggregator.rs b/consensus/src/aggregator.rs index 87c2311f77..92c9c627a9 100644 --- a/consensus/src/aggregator.rs +++ b/consensus/src/aggregator.rs @@ -18,7 +18,9 @@ use tracing::{debug, error, warn}; /// voters.StepVotes Mapping of a block hash to both an aggregated signatures /// and a cluster of bls voters. #[derive(Default)] -pub struct Aggregator(BTreeMap)>); +pub struct Aggregator( + BTreeMap<(u8, Hash), (AggrSignature, Cluster)>, +); impl Aggregator { pub fn collect_vote( @@ -27,6 +29,7 @@ impl Aggregator { header: &Header, signature: &[u8; 48], ) -> Option<(Hash, StepVotes)> { + let msg_step = header.step; // Get weight for this pubkey bls. If votes_for returns None, it means // the key is not a committee member, respectively we should not // process a vote from it. @@ -35,7 +38,7 @@ impl Aggregator { let (aggr_sign, cluster) = self .0 - .entry(hash) + .entry((msg_step, hash)) .or_insert((AggrSignature::default(), Cluster::new())); // Each committee has 64 slots. If a Provisioner is extracted into @@ -173,8 +176,8 @@ mod tests { use rand::rngs::StdRng; use rand::SeedableRng; impl Aggregator { - pub fn get_total(&self, hash: Hash) -> Option { - if let Some(value) = self.0.get(&hash) { + pub fn get_total(&self, step: u8, hash: Hash) -> Option { + if let Some(value) = self.0.get(&(step, hash)) { return Some(value.1.total_occurrences()); } None @@ -264,12 +267,15 @@ mod tests { // Check collected votes assert!(a.collect_vote(&c, h, signature).is_none()); collected_votes += expected_votes[i]; - assert_eq!(a.get_total(block_hash), Some(collected_votes)); + assert_eq!(a.get_total(h.step, block_hash), Some(collected_votes)); // Ensure a duplicated vote is discarded if i == 0 { assert!(a.collect_vote(&c, h, signature).is_none()); - assert_eq!(a.get_total(block_hash), Some(collected_votes)); + assert_eq!( + a.get_total(h.step, block_hash), + Some(collected_votes) + ); } } } From 6c54f9e229c552f5e5e4d728b94ca19b7e699721 Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 11:47:10 +0300 Subject: [PATCH 15/31] consensus: Fix clippy --- consensus/src/execution_ctx.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/consensus/src/execution_ctx.rs b/consensus/src/execution_ctx.rs index 11cbda0045..d8062de8fa 100644 --- a/consensus/src/execution_ctx.rs +++ b/consensus/src/execution_ctx.rs @@ -251,12 +251,12 @@ impl<'a, DB: Database, T: Operations + 'static> ExecutionCtx<'a, DB, T> { msg_step, ); - if msg_step + 1 <= self.step { - self.try_vote(msg_step + 1, &candidate, Topics::FirstReduction); + if msg_step < self.step { + self.try_vote(msg_step + 1, candidate, Topics::FirstReduction); } if msg_step + 2 <= self.step { - self.try_vote(msg_step + 2, &candidate, Topics::SecondReduction); + self.try_vote(msg_step + 2, candidate, Topics::SecondReduction); } } From 2ab1822592af721ffea4b9286d84bf67304ae911 Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 11:55:23 +0300 Subject: [PATCH 16/31] node: Apply new msg IDs. Pass committee size to verify_step_votes. --- node/src/chain.rs | 3 ++- node/src/chain/acceptor.rs | 3 +++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/node/src/chain.rs b/node/src/chain.rs index 6d3999b9a2..bc1954a200 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -43,7 +43,8 @@ use self::fsm::SimpleFSM; const TOPICS: &[u8] = &[ Topics::Block as u8, Topics::NewBlock as u8, - Topics::Reduction as u8, + Topics::FirstReduction as u8, + Topics::SecondReduction as u8, Topics::AggrAgreement as u8, Topics::Agreement as u8, ]; diff --git a/node/src/chain/acceptor.rs b/node/src/chain/acceptor.rs index 98b2a6b185..435e3eeb41 100644 --- a/node/src/chain/acceptor.rs +++ b/node/src/chain/acceptor.rs @@ -31,6 +31,7 @@ use tokio::sync::{oneshot, Mutex, RwLock}; use tokio::task::JoinHandle; use tracing::{error, info, warn}; +use dusk_consensus::config; use std::any; use super::consensus::Task; @@ -454,6 +455,7 @@ async fn verify_block_cert( curr_seed, &hdr, 0, + config::FIRST_REDUCTION_COMMITTEE_SIZE, ) .await { @@ -467,6 +469,7 @@ async fn verify_block_cert( curr_seed, &hdr, 1, + config::SECOND_REDUCTION_COMMITTEE_SIZE, ) .await { From e597ea70a6b96cdbe93be1cbf3c0d1a4543c6d04 Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 23 Oct 2023 13:30:55 +0300 Subject: [PATCH 17/31] consensus: Send agreement msg once it was generated without breaking the iteration loop --- consensus/src/consensus.rs | 10 ++++++---- consensus/src/execution_ctx.rs | 14 +++++--------- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/consensus/src/consensus.rs b/consensus/src/consensus.rs index 4b949ffb8a..3de6f2cdfa 100644 --- a/consensus/src/consensus.rs +++ b/consensus/src/consensus.rs @@ -260,9 +260,13 @@ impl Consensus { .await?; // During execution of any step we may encounter that an - // agreement is generated for a former iteration. + // agreement is generated for a former or current iteration. if msg.topic() == Topics::Agreement { - break; + Self::send_agreement( + &mut agr_inbound_queue, + msg.clone(), + ) + .await; } if step >= config::CONSENSUS_MAX_STEP { @@ -274,8 +278,6 @@ impl Consensus { // Delegate (agreement) message result to agreement loop for // further processing. - - Self::send_agreement(&mut agr_inbound_queue, msg.clone()).await; } }) } diff --git a/consensus/src/execution_ctx.rs b/consensus/src/execution_ctx.rs index d8062de8fa..87838a66c6 100644 --- a/consensus/src/execution_ctx.rs +++ b/consensus/src/execution_ctx.rs @@ -260,14 +260,14 @@ impl<'a, DB: Database, T: Operations + 'static> ExecutionCtx<'a, DB, T> { } } - fn try_vote(&mut self, step: u8, candidate: &Block, topic: Topics) { - if let Some(committee) = self.iter_ctx.get_committee(step) { + fn try_vote(&mut self, msg_step: u8, candidate: &Block, topic: Topics) { + if let Some(committee) = self.iter_ctx.get_committee(msg_step) { if committee.am_member() { debug!( event = "vote for former candidate", step_topic = format!("{:?}", topic), hash = node_data::ledger::to_str(&candidate.header().hash), - step, + msg_step, ); spawn_send_reduction( @@ -276,7 +276,7 @@ impl<'a, DB: Database, T: Operations + 'static> ExecutionCtx<'a, DB, T> { candidate.clone(), self.round_update.pubkey_bls.clone(), self.round_update.clone(), - step, + msg_step, self.outbound.clone(), self.inbound.clone(), self.executor.clone(), @@ -284,11 +284,7 @@ impl<'a, DB: Database, T: Operations + 'static> ExecutionCtx<'a, DB, T> { ); }; } else { - error!( - event = "committee not found", - step = self.step, - msg_step = step - ); + error!(event = "committee not found", step = self.step, msg_step); } } From 4cb9ce5a80186cb66ff5d99b63e65c7fa84f27d9 Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Mon, 23 Oct 2023 15:24:47 +0200 Subject: [PATCH 18/31] rusk-prover: bump to `0.3.0` version The plonk upgrade made by #1091 should be considered a major upgrade --- rusk-prover/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rusk-prover/Cargo.toml b/rusk-prover/Cargo.toml index 7cddd90c67..84353071bb 100644 --- a/rusk-prover/Cargo.toml +++ b/rusk-prover/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rusk-prover" -version = "0.2.0" +version = "0.3.0" edition = "2021" autobins = false From 5fea76786f34e1fe92d56904217598ebf69b11a1 Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Mon, 23 Oct 2023 15:25:06 +0200 Subject: [PATCH 19/31] rusk: upgrade `rusk-prover` to `0.3.0` version --- rusk/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rusk/Cargo.toml b/rusk/Cargo.toml index cb7d0e90dd..ad179fdb8a 100644 --- a/rusk/Cargo.toml +++ b/rusk/Cargo.toml @@ -62,7 +62,7 @@ async-trait = "0.1" transfer-circuits = { version = "0.5", path = "../circuits/transfer" } rusk-profile = { version = "0.6", path = "../rusk-profile" } rusk-abi = { version = "0.11", path = "../rusk-abi", default-features = false, features = ["host"] } -rusk-prover = { version = "0.2", path = "../rusk-prover" } +rusk-prover = { version = "0.3", path = "../rusk-prover" } node = { version = "0.1", path = "../node" } dusk-consensus = { version = "0.1.1-rc.3", path = "../consensus" } node-data = { version = "0.1", path = "../node-data" } From b1d323558ad73b98516201231308848d50bef1d1 Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Tue, 24 Oct 2023 16:43:48 +0200 Subject: [PATCH 20/31] rusk: update dusk consensus key See also dusk-network/wallet-cli#214 --- rusk/src/assets/dusk.cpk | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rusk/src/assets/dusk.cpk b/rusk/src/assets/dusk.cpk index d1ab51151e..89c3862df5 100644 --- a/rusk/src/assets/dusk.cpk +++ b/rusk/src/assets/dusk.cpk @@ -1 +1 @@ -`dl;`N5x֥7͂P̾C s)+5y!O+7BaIԾYae^*P \ No newline at end of file +ND Date: Tue, 24 Oct 2023 16:44:29 +0200 Subject: [PATCH 21/31] examples: update consensus key See also dusk-network/wallet-cli#214 --- examples/consensus.keys | Bin 240 -> 240 bytes examples/genesis.toml | 6 +++--- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/consensus.keys b/examples/consensus.keys index d368bb4c2e427d02ff8b1013eddcdea03a37c836..44be476a45fadd8c7f82bf1b81aeb51d1bb0958e 100644 GIT binary patch literal 240 zcmV7ULgRFjqUDCJqN#Szuvq$(PF!N_ZS z>4p%@SKu#L>0!3Hf~#87&~oW3mOe!C8GbJ2)tDSmYa7a{R*Je=ldVq4qeY%%JoAJ;a~9+xitpY=ZaPlABQYIMqV^U)1IlX2@f zsizp{)ulIoI~tj2r!sgt!y-m|@L<4yXnp=cVx_x5SinwIY{|Fh--fwU*8MxdrnkOI zEL59ieL|ccr>avcoKJ&c({!(!S;ds`5QyJ;Q!^PE7S424`!q|a|WjgtAa0?oERVMTXd5q+<-0Dd0pn5|~`w<1x#0Fyh{ qvT>a@@j=i+&y$JwC`V04X-e*KH Date: Tue, 24 Oct 2023 16:48:00 +0200 Subject: [PATCH 22/31] node-data: Fix consensus keys decrypt Resolves #1101 --- node-data/Cargo.toml | 4 +++- node-data/src/bls.rs | 41 ++++++++++++++++++++++++++--------------- 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/node-data/Cargo.toml b/node-data/Cargo.toml index d5e4c0dc37..bebe437ca2 100644 --- a/node-data/Cargo.toml +++ b/node-data/Cargo.toml @@ -7,7 +7,8 @@ edition = "2021" phoenix-core = { version = "0.21", features = ["alloc"] } dusk-bytes = "^0.1" bytes = "0.6" -sha3 = { version = "0.10" } +sha3 = "0.10" +sha2 = "0.10" fake = { version = "2.5", features = ['derive'], optional = true } rand = { version = "0.8", optional = true } hex = { version = "0.4", optional = true } @@ -24,6 +25,7 @@ base64 = "0.13" async-channel = "1.7" chrono = "0.4" bs58 = { version = "0.4" } +tracing = "0.1" [dev-dependencies] diff --git a/node-data/src/bls.rs b/node-data/src/bls.rs index 21248a740a..43c0b9c665 100644 --- a/node-data/src/bls.rs +++ b/node-data/src/bls.rs @@ -6,16 +6,18 @@ use aes::Aes256; use block_modes::block_padding::Pkcs7; -use block_modes::{BlockMode, Cbc}; +use block_modes::{BlockMode, BlockModeError, Cbc}; use dusk_bls12_381_sign::SecretKey; use dusk_bytes::DeserializableSlice; use dusk_bytes::Serializable; use rand::rngs::StdRng; use rand_core::SeedableRng; +use sha2::{Digest, Sha256}; use std::cmp::Ordering; use std::fs; use std::path::PathBuf; +use tracing::warn; pub const PUBLIC_BLS_SIZE: usize = dusk_bls12_381_sign::PublicKey::SIZE; @@ -106,10 +108,8 @@ pub fn load_keys( path: String, pwd: String, ) -> (dusk_bls12_381_sign::SecretKey, PublicKey) { - let pwd = blake3::hash(pwd.as_bytes()); - let path_buf = PathBuf::from(path); - let (pk, sk) = read_from_file(path_buf, pwd); + let (pk, sk) = read_from_file(path_buf, &pwd); (sk, PublicKey::new(pk)) } @@ -119,13 +119,12 @@ pub fn load_keys( /// Panics on any error. fn read_from_file( path: PathBuf, - pwd: blake3::Hash, + pwd: &str, ) -> ( dusk_bls12_381_sign::PublicKey, dusk_bls12_381_sign::SecretKey, ) { use serde::Deserialize; - type Aes256Cbc = Cbc; /// Bls key pair helper structure #[derive(Deserialize)] @@ -139,13 +138,18 @@ fn read_from_file( let ciphertext = fs::read(path).expect("path should be valid consensus keys file"); - // Decrypt - let iv = &ciphertext[..16]; - let enc = &ciphertext[16..]; + let mut hasher = Sha256::new(); + hasher.update(pwd.as_bytes()); + let hashed_pwd = hasher.finalize().to_vec(); - let cipher = - Aes256Cbc::new_from_slices(pwd.as_bytes(), iv).expect("valid data"); - let bytes = cipher.decrypt_vec(enc).expect("pwd should be valid"); + let bytes = decrypt(&ciphertext[..], &hashed_pwd).unwrap_or_else(|_| { + let hashed_pwd = blake3::hash(pwd.as_bytes()); + let bytes = decrypt(&ciphertext[..], hashed_pwd.as_bytes()) + .expect("Invalid consensus keys password"); + warn!("Your consensus keys are in the old format"); + warn!("Consider to export them using a new version of the wallet"); + bytes + }); let keys: BlsKeyPair = serde_json::from_slice(&bytes).expect("keys files should contain json"); @@ -163,6 +167,15 @@ fn read_from_file( (pk, sk) } +fn decrypt(data: &[u8], pwd: &[u8]) -> Result, BlockModeError> { + type Aes256Cbc = Cbc; + let iv = &data[..16]; + let enc = &data[16..]; + + let cipher = Aes256Cbc::new_from_slices(pwd, iv).expect("valid data"); + cipher.decrypt_vec(enc) +} + /// Loads wallet files from $DUSK_WALLET_DIR and returns a vector of all loaded /// consensus keys. /// @@ -175,14 +188,12 @@ pub fn load_provisioners_keys( let dir = std::env::var("DUSK_WALLET_DIR").unwrap(); let pwd = std::env::var("DUSK_CONSENSUS_KEYS_PASS").unwrap(); - let pwd = blake3::hash(pwd.as_bytes()); - for i in 0..n { let mut path = dir.clone(); path.push_str(&format!("node_{i}.keys")); let path_buf = PathBuf::from(path); - let (pk, sk) = read_from_file(path_buf, pwd); + let (pk, sk) = read_from_file(path_buf, &pwd); keys.push((sk, PublicKey::new(pk))); } From c334ec4d54767290755161b69f627c3a1adb6ba8 Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Thu, 26 Oct 2023 09:49:57 +0200 Subject: [PATCH 23/31] rusk: upgrade `rustc_tools_util` to `v0.3` --- rusk/Cargo.toml | 2 +- rusk/build.rs | 13 +------------ 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/rusk/Cargo.toml b/rusk/Cargo.toml index ad179fdb8a..87d95187f5 100644 --- a/rusk/Cargo.toml +++ b/rusk/Cargo.toml @@ -82,7 +82,7 @@ rusk-recovery = { version = "0.6", path = "../rusk-recovery", features = ["state ff = { version = "0.13", default-features = false } [build-dependencies] -rustc_tools_util = "=0.2.0" +rustc_tools_util = "0.3" [features] default = ["ephemeral"] diff --git a/rusk/build.rs b/rusk/build.rs index ff4f078d00..f2c5c4a9b8 100644 --- a/rusk/build.rs +++ b/rusk/build.rs @@ -12,18 +12,7 @@ fn main() -> Result<(), Box> { println!("cargo:rerun-if-changed=../Cargo.lock"); // Get crate version + commit + toolchain for `-v` arg support. - println!( - "cargo:rustc-env=GIT_HASH={}", - rustc_tools_util::get_commit_hash().unwrap_or_default() - ); - println!( - "cargo:rustc-env=COMMIT_DATE={}", - rustc_tools_util::get_commit_date().unwrap_or_default() - ); - println!( - "cargo:rustc-env=RUSTC_RELEASE_CHANNEL={}", - rustc_tools_util::get_channel().unwrap_or_default() - ); + rustc_tools_util::setup_version_info!(); Ok(()) } From 694a75d031e53246b5a2b2d1ea328216114d33bd Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Thu, 26 Oct 2023 09:53:12 +0200 Subject: [PATCH 24/31] rusk: improve `version` module - Add support for pre-release info - Add `crate::VERSION` - Move `version` module to the lib --- rusk/src/bin/args.rs | 4 +--- rusk/src/bin/main.rs | 1 - rusk/src/lib/lib.rs | 3 +++ rusk/src/{bin => lib}/version.rs | 22 +++++++++++++--------- 4 files changed, 17 insertions(+), 13 deletions(-) rename rusk/src/{bin => lib}/version.rs (56%) diff --git a/rusk/src/bin/args.rs b/rusk/src/bin/args.rs index acd71c43d7..ee6f731b08 100644 --- a/rusk/src/bin/args.rs +++ b/rusk/src/bin/args.rs @@ -9,12 +9,10 @@ use std::path::PathBuf; use clap::builder::PossibleValuesParser; use clap::Parser; -use crate::version::VERSION_BUILD; - #[derive(Parser, Debug)] #[command( author="Dusk Network B.V. All Rights Reserved.", - version = &VERSION_BUILD[..], + version = &rusk::VERSION_BUILD[..], about = "Rusk server node", )] pub struct Args { diff --git a/rusk/src/bin/main.rs b/rusk/src/bin/main.rs index 1eea90e906..905a6d3ccc 100644 --- a/rusk/src/bin/main.rs +++ b/rusk/src/bin/main.rs @@ -10,7 +10,6 @@ mod args; mod config; #[cfg(feature = "ephemeral")] mod ephemeral; -mod version; use clap::Parser; use node::database::rocksdb; diff --git a/rusk/src/lib/lib.rs b/rusk/src/lib/lib.rs index 09f245882b..56ebdbaf0c 100644 --- a/rusk/src/lib/lib.rs +++ b/rusk/src/lib/lib.rs @@ -17,6 +17,7 @@ pub mod chain; pub mod error; pub mod http; pub mod prover; +mod version; mod vm; use dusk_bytes::DeserializableSlice; @@ -44,6 +45,8 @@ use rusk_abi::{ use rusk_profile::to_rusk_state_id_path; use sha3::{Digest, Sha3_256}; +pub use version::{VERSION, VERSION_BUILD}; + const A: usize = 4; pub type Result = core::result::Result; diff --git a/rusk/src/bin/version.rs b/rusk/src/lib/version.rs similarity index 56% rename from rusk/src/bin/version.rs rename to rusk/src/lib/version.rs index d24b399f23..c94e927b47 100644 --- a/rusk/src/bin/version.rs +++ b/rusk/src/lib/version.rs @@ -9,25 +9,29 @@ use std::sync::LazyLock; -use rustc_tools_util::VersionInfo; - #[inline] -pub(crate) fn show_version(info: VersionInfo) -> String { - let version = format!("{}.{}.{}", info.major, info.minor, info.patch); +pub(crate) fn show_version(verbose: bool) -> String { + let info = rustc_tools_util::get_version_info!(); + let pre = std::env!("CARGO_PKG_VERSION_PRE"); + let version = if pre.is_empty() { + format!("{}.{}.{}", info.major, info.minor, info.patch) + } else { + format!("{}.{}.{}-{}", info.major, info.minor, info.patch, pre) + }; let build = format!( "{} {}", info.commit_hash.unwrap_or_default(), info.commit_date.unwrap_or_default() ); - if build.len() > 1 { + if verbose && build.trim().len() > 1 { format!("{version} ({build})") } else { version } } -pub static VERSION_BUILD: LazyLock = LazyLock::new(|| { - let info = rustc_tools_util::get_version_info!(); - show_version(info) -}); +pub static VERSION_BUILD: LazyLock = + LazyLock::new(|| show_version(true)); + +pub static VERSION: LazyLock = LazyLock::new(|| show_version(false)); From 1b6ab0a74450a5ec60f517dcf9f6945fe162279b Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Thu, 26 Oct 2023 10:00:27 +0200 Subject: [PATCH 25/31] rusk: Fix HTTP header parsing - Fix parsing for unquoted strings - Fix parsing for empty values --- rusk/src/lib/http.rs | 6 +++++- rusk/src/lib/http/event.rs | 15 ++++++++++----- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/rusk/src/lib/http.rs b/rusk/src/lib/http.rs index eb6722c983..533d621bd1 100644 --- a/rusk/src/lib/http.rs +++ b/rusk/src/lib/http.rs @@ -361,7 +361,11 @@ where for (k, v) in x_headers { let k = HeaderName::from_str(&k)?; - let v = HeaderValue::from_str(&v.to_string())?; + let v = match v { + serde_json::Value::String(s) => HeaderValue::from_str(&s), + serde_json::Value::Null => HeaderValue::from_str(""), + _ => HeaderValue::from_str(&v.to_string()), + }?; resp.headers_mut().append(k, v); } diff --git a/rusk/src/lib/http/event.rs b/rusk/src/lib/http/event.rs index 748629accb..26946a3856 100644 --- a/rusk/src/lib/http/event.rs +++ b/rusk/src/lib/http/event.rs @@ -125,11 +125,16 @@ impl MessageRequest { let headers = req .headers() .iter() - .filter_map(|(k, v)| { - let a = v.as_bytes(); - serde_json::from_slice::(a) - .ok() - .map(|v| (k.to_string(), v)) + .map(|(k, v)| { + let v = if v.is_empty() { + serde_json::Value::Null + } else { + serde_json::from_slice::(v.as_bytes()) + .unwrap_or(serde_json::Value::String( + v.to_str().unwrap().to_string(), + )) + }; + (k.to_string().to_lowercase(), v) }) .collect(); let (event, is_binary) = Event::from_request(req).await?; From 9af68fb7f2ad7f93c7a5c170baead156705a1b80 Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Thu, 26 Oct 2023 10:05:05 +0200 Subject: [PATCH 26/31] rusk: Add HTTP version handshake Resolves: #1072 --- rusk/src/lib/http.rs | 29 ++++++++++++++++++----------- rusk/src/lib/http/event.rs | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 11 deletions(-) diff --git a/rusk/src/lib/http.rs b/rusk/src/lib/http.rs index 533d621bd1..ee0e89991b 100644 --- a/rusk/src/lib/http.rs +++ b/rusk/src/lib/http.rs @@ -44,10 +44,12 @@ use tungstenite::protocol::{CloseFrame, Message}; use futures_util::{stream, SinkExt, StreamExt}; use crate::chain::RuskNode; -use crate::Rusk; +use crate::{Rusk, VERSION}; use self::event::MessageRequest; +const RUSK_VERSION_HEADER: &str = "Rusk-Version"; + pub struct HttpServer { handle: task::JoinHandle<()>, local_addr: SocketAddr, @@ -92,6 +94,7 @@ impl HandleRequest for DataSources { "Received {:?}:{} request", request.event.target, request.event.topic ); + request.check_rusk_version()?; match request.event.to_route() { (Target::Contract(_), ..) | (_, "rusk", _) => { self.rusk.handle_request(request).await @@ -348,7 +351,7 @@ where let (execution_request, is_binary) = MessageRequest::from_request(req).await?; - let x_headers = execution_request.x_headers(); + let mut resp_headers = execution_request.x_headers(); let (responder, mut receiver) = mpsc::unbounded_channel(); handle_execution(sources, execution_request, responder).await; @@ -357,9 +360,10 @@ where .recv() .await .expect("An execution should always return a response"); + resp_headers.extend(execution_response.headers.clone()); let mut resp = execution_response.into_http(is_binary)?; - for (k, v) in x_headers { + for (k, v) in resp_headers { let k = HeaderName::from_str(&k)?; let v = match v { serde_json::Value::String(s) => HeaderValue::from_str(&s), @@ -380,7 +384,7 @@ async fn handle_execution( ) where H: HandleRequest, { - let rsp = sources + let mut rsp = sources .handle(&request) .await .map(|data| EventResponse { @@ -390,6 +394,7 @@ async fn handle_execution( }) .unwrap_or_else(|e| request.to_error(e.to_string())); + rsp.set_header(RUSK_VERSION_HEADER, serde_json::json!(*VERSION)); let _ = responder.send(rsp); } @@ -499,27 +504,26 @@ mod tests { data: RequestData::Text("Not used".into()), topic: "stream".into(), }; - let headers: serde_json::Map = + let request_x_header: serde_json::Map = serde_json::from_str(r#"{"X-requestid": "100"}"#) .expect("headers to be serialized"); let request = MessageRequest { event, - headers: headers.clone(), + headers: request_x_header.clone(), }; let request = serde_json::to_string(&request).unwrap(); stream - .write_message(Message::Text(request)) + .send(Message::Text(request)) .expect("Sending request to the server should succeed"); let mut responses = vec![]; - // Vec::::with_capacity(request_num); while responses.len() < STREAMED_DATA.len() { let msg = stream - .read_message() + .read() .expect("Response should be received without error"); let msg = match msg { @@ -528,9 +532,12 @@ mod tests { }; let response: EventResponse = serde_json::from_str(&msg) .expect("Response should deserialize successfully"); + + let mut response_x_header = response.headers.clone(); + response_x_header.retain(|k, _| k.to_lowercase().starts_with("x-")); assert_eq!( - response.headers, headers, - "x- headers to be propagated back" + response_x_header, request_x_header, + "x-headers to be propagated back" ); assert!(matches!(response.error, None), "There should be noerror"); match response.data { diff --git a/rusk/src/lib/http/event.rs b/rusk/src/lib/http/event.rs index 26946a3856..d326457391 100644 --- a/rusk/src/lib/http/event.rs +++ b/rusk/src/lib/http/event.rs @@ -4,9 +4,11 @@ // // Copyright (c) DUSK NETWORK. All rights reserved. +use super::RUSK_VERSION_HEADER; use futures_util::{stream, StreamExt}; use hyper::header::{InvalidHeaderName, InvalidHeaderValue}; use hyper::Body; +use semver::{Version, VersionReq}; use serde::{Deserialize, Serialize}; use serde_with::{self, serde_as}; use std::collections::HashMap; @@ -143,6 +145,23 @@ impl MessageRequest { Ok((req, is_binary)) } + + pub fn check_rusk_version(&self) -> anyhow::Result<()> { + if let Some(v) = self.header(RUSK_VERSION_HEADER) { + let req = match v.as_str() { + Some(v) => VersionReq::from_str(v), + None => VersionReq::from_str(&v.to_string()), + }?; + + let current = Version::from_str(&crate::VERSION)?; + if !req.matches(¤t) { + return Err(anyhow::anyhow!( + "Mismatched rusk version: requested {req} - current {current}", + )); + } + } + Ok(()) + } } #[derive(Debug, Deserialize, Serialize)] @@ -199,6 +218,20 @@ impl MessageResponse { Ok(hyper::Response::new(body)) } + + pub fn set_header(&mut self, key: &str, value: serde_json::Value) { + // search for the key in a case-insensitive way + let v = self + .headers + .iter_mut() + .find_map(|(k, v)| k.eq_ignore_ascii_case(key).then_some(v)); + + if let Some(v) = v { + *v = value; + } else { + self.headers.insert(key.into(), value); + } + } } #[derive(Debug, Serialize, Deserialize)] From 72df460de5716bd9736376f9c5f27fa14bc0249d Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Thu, 26 Oct 2023 10:05:22 +0200 Subject: [PATCH 27/31] rusk: bump version to `0.7.0-rc` --- rusk/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rusk/Cargo.toml b/rusk/Cargo.toml index 87d95187f5..1aa9018a33 100644 --- a/rusk/Cargo.toml +++ b/rusk/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rusk" -version = "0.6.0" +version = "0.7.0-rc.0" edition = "2021" autobins = false From 75a9a03843371ec55dab035d2695cd5129f4ec79 Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Thu, 26 Oct 2023 16:50:12 +0200 Subject: [PATCH 28/31] rusk: add HTTP support for `application/json` content type --- rusk/src/lib/http/chain.rs | 4 ++-- rusk/src/lib/http/event.rs | 41 ++++++++++++++++++++++++++++---------- rusk/src/lib/http/rusk.rs | 6 +++--- 3 files changed, 36 insertions(+), 15 deletions(-) diff --git a/rusk/src/lib/http/chain.rs b/rusk/src/lib/http/chain.rs index cd6c3aa171..ec18cc2ca0 100644 --- a/rusk/src/lib/http/chain.rs +++ b/rusk/src/lib/http/chain.rs @@ -88,7 +88,7 @@ impl RuskNode { if !errors.is_empty() { return Err(anyhow::anyhow!("{errors:?}")); } - let data = serde_json::to_string(&data) + let data = serde_json::to_value(&data) .map_err(|e| anyhow::anyhow!("Cannot parse response {e}"))?; Ok(data.into()) } @@ -108,6 +108,6 @@ impl RuskNode { async fn alive_nodes(&self, amount: usize) -> anyhow::Result { let nodes = self.0.network().read().await.alive_nodes(amount).await; let nodes: Vec<_> = nodes.iter().map(|n| n.to_string()).collect(); - Ok(serde_json::to_string(&nodes)?.into()) + Ok(serde_json::to_value(nodes)?.into()) } } diff --git a/rusk/src/lib/http/event.rs b/rusk/src/lib/http/event.rs index d326457391..519bb942c7 100644 --- a/rusk/src/lib/http/event.rs +++ b/rusk/src/lib/http/event.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use std::fmt::{Display, Formatter}; use std::str::FromStr; use std::sync::mpsc; +use tungstenite::http::HeaderValue; /// A request sent by the websocket client. #[derive(Debug, Serialize, Deserialize)] @@ -194,6 +195,8 @@ impl MessageResponse { .body(hyper::Body::from(error.to_string()))?); } + let mut headers = HashMap::new(); + let body = { match self.data { ResponseData::Binary(wrapper) => { @@ -204,19 +207,28 @@ impl MessageResponse { Body::from(data) } ResponseData::Text(text) => Body::from(text), - ResponseData::Channel(channel) => Body::wrap_stream( - stream::iter(channel).map(move |e| match is_binary { - true => Ok::<_, anyhow::Error>(e), - false => Ok::<_, anyhow::Error>( - hex::encode(e).as_bytes().to_vec(), - ), - }), // Ok::<_, anyhow::Error>), - ), + ResponseData::Json(value) => { + headers.insert(CONTENT_TYPE, CONTENT_TYPE_JSON.clone()); + Body::from(value.to_string()) + } + ResponseData::Channel(channel) => { + Body::wrap_stream(stream::iter(channel).map(move |e| { + match is_binary { + true => Ok::<_, anyhow::Error>(e), + false => Ok::<_, anyhow::Error>( + hex::encode(e).as_bytes().to_vec(), + ), + } + })) + } ResponseData::None => Body::empty(), } }; - - Ok(hyper::Response::new(body)) + let mut response = hyper::Response::new(body); + for (k, v) in headers { + response.headers_mut().insert(k, v); + } + Ok(response) } pub fn set_header(&mut self, key: &str, value: serde_json::Value) { @@ -276,12 +288,19 @@ impl From> for RequestData { pub enum ResponseData { Binary(BinaryWrapper), Text(String), + Json(serde_json::Value), #[serde(skip)] Channel(mpsc::Receiver>), #[default] None, } +impl From for ResponseData { + fn from(value: serde_json::Value) -> Self { + Self::Json(value) + } +} + impl From for ResponseData { fn from(text: String) -> Self { Self::Text(text) @@ -348,6 +367,8 @@ impl Event { } const CONTENT_TYPE: &str = "Content-Type"; const CONTENT_TYPE_BINARY: &str = "application/octet-stream"; +static CONTENT_TYPE_JSON: HeaderValue = + HeaderValue::from_static("application/json"); fn parse_len(bytes: &[u8]) -> anyhow::Result<(usize, &[u8])> { if bytes.len() < 4 { diff --git a/rusk/src/lib/http/rusk.rs b/rusk/src/lib/http/rusk.rs index 09f2daaf8c..ef6773cbc6 100644 --- a/rusk/src/lib/http/rusk.rs +++ b/rusk/src/lib/http/rusk.rs @@ -106,7 +106,7 @@ impl Rusk { } fn get_provisioners(&self) -> anyhow::Result { - let prov = self + let prov: Vec<_> = self .provisioners() .unwrap() .iter() @@ -119,9 +119,9 @@ impl Rusk { key, }) }) - .collect::>(); + .collect(); - Ok(serde_json::to_string(&prov)?.into()) + Ok(serde_json::to_value(prov)?.into()) } } From 822c965194f4dcac1fbe1c52bf6f4370752afdd6 Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Thu, 26 Oct 2023 16:43:20 +0200 Subject: [PATCH 29/31] node: add `Network::conf()` --- node/src/network.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/node/src/network.rs b/node/src/network.rs index 8e3e500b4c..e80d791c2c 100644 --- a/node/src/network.rs +++ b/node/src/network.rs @@ -148,6 +148,10 @@ impl Kadcast { pub async fn alive_nodes(&self, amount: usize) -> Vec { self.peer.alive_nodes(amount).await } + + pub fn conf(&self) -> &Config { + &self.conf + } } #[async_trait] From d2bab2e9c098b743b73a3deb0c2aa24f3aeecae4 Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Thu, 26 Oct 2023 16:51:41 +0200 Subject: [PATCH 30/31] rusk: add `info` HTTP endpoint --- rusk/src/lib/chain.rs | 5 +++++ rusk/src/lib/http/chain.rs | 15 +++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/rusk/src/lib/chain.rs b/rusk/src/lib/chain.rs index 5aedaccd25..946081688f 100644 --- a/rusk/src/lib/chain.rs +++ b/rusk/src/lib/chain.rs @@ -13,8 +13,13 @@ use crate::Rusk; #[derive(Clone)] pub struct RuskNode(pub node::Node, Backend, Rusk>); + impl RuskNode { pub fn db(&self) -> Arc> { self.0.database() as Arc> } + + pub fn network(&self) -> Arc>> { + self.0.network() as Arc>> + } } diff --git a/rusk/src/lib/http/chain.rs b/rusk/src/lib/http/chain.rs index ec18cc2ca0..311016d08f 100644 --- a/rusk/src/lib/http/chain.rs +++ b/rusk/src/lib/http/chain.rs @@ -26,6 +26,7 @@ use super::event::{ Event, MessageRequest, MessageResponse, RequestData, ResponseData, Target, }; use crate::http::RuskNode; +use crate::{VERSION, VERSION_BUILD}; const GQL_VAR_PREFIX: &str = "rusk-gqlvar-"; @@ -61,6 +62,7 @@ impl RuskNode { let amount = request.event.data.as_string().trim().parse()?; self.alive_nodes(amount).await } + (Target::Host(_), "Chain", "info") => self.get_info().await, _ => anyhow::bail!("Unsupported"), } } @@ -110,4 +112,17 @@ impl RuskNode { let nodes: Vec<_> = nodes.iter().map(|n| n.to_string()).collect(); Ok(serde_json::to_value(nodes)?.into()) } + + async fn get_info(&self) -> anyhow::Result { + let mut info: HashMap<&str, serde_json::Value> = HashMap::new(); + info.insert("version", VERSION.as_str().into()); + info.insert("version_build", VERSION_BUILD.as_str().into()); + + let n_conf = self.network().read().await.conf().clone(); + info.insert("bootstrapping_nodes", n_conf.bootstrapping_nodes.into()); + info.insert("chain_id", n_conf.kadcast_id.into()); + info.insert("kadcast_address", n_conf.public_address.into()); + + Ok(serde_json::to_value(&info)?.into()) + } } From 7186742603e05faa59ca20ed59d4d3ec11d3ae5c Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Thu, 26 Oct 2023 18:19:53 +0200 Subject: [PATCH 31/31] rusk: fix binary HTTP event detection --- rusk/src/lib/http/event.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/rusk/src/lib/http/event.rs b/rusk/src/lib/http/event.rs index 519bb942c7..0e5728eda0 100644 --- a/rusk/src/lib/http/event.rs +++ b/rusk/src/lib/http/event.rs @@ -346,9 +346,8 @@ impl Event { let is_binary = parts .headers .get(CONTENT_TYPE) - .and_then(|h| { - h.to_str().ok().map(|s| s.starts_with(CONTENT_TYPE_BINARY)) - }) + .and_then(|h| h.to_str().ok()) + .map(|v| v.eq_ignore_ascii_case(CONTENT_TYPE_BINARY)) .unwrap_or_default(); let target = parts.uri.path().try_into()?;