Skip to content

Commit

Permalink
Merge branch 'master' into on_idle
Browse files Browse the repository at this point in the history
  • Loading branch information
goshawk-3 authored Nov 2, 2023
2 parents ce97809 + 3d6dd9c commit de1dd16
Show file tree
Hide file tree
Showing 42 changed files with 883 additions and 247 deletions.
18 changes: 12 additions & 6 deletions consensus/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use tracing::{debug, error, warn};
/// voters.StepVotes Mapping of a block hash to both an aggregated signatures
/// and a cluster of bls voters.
#[derive(Default)]
pub struct Aggregator(BTreeMap<Hash, (AggrSignature, Cluster<PublicKey>)>);
pub struct Aggregator(
BTreeMap<(u8, Hash), (AggrSignature, Cluster<PublicKey>)>,
);

impl Aggregator {
pub fn collect_vote(
Expand All @@ -27,6 +29,7 @@ impl Aggregator {
header: &Header,
signature: &[u8; 48],
) -> Option<(Hash, StepVotes)> {
let msg_step = header.step;
// Get weight for this pubkey bls. If votes_for returns None, it means
// the key is not a committee member, respectively we should not
// process a vote from it.
Expand All @@ -35,7 +38,7 @@ impl Aggregator {

let (aggr_sign, cluster) = self
.0
.entry(hash)
.entry((msg_step, hash))
.or_insert((AggrSignature::default(), Cluster::new()));

// Each committee has 64 slots. If a Provisioner is extracted into
Expand Down Expand Up @@ -172,8 +175,8 @@ mod tests {
use node_data::ledger::Seed;
use node_data::message;
impl Aggregator {
pub fn get_total(&self, hash: Hash) -> Option<usize> {
if let Some(value) = self.0.get(&hash) {
pub fn get_total(&self, step: u8, hash: Hash) -> Option<usize> {
if let Some(value) = self.0.get(&(step, hash)) {
return Some(value.1.total_occurrences());
}
None
Expand Down Expand Up @@ -296,12 +299,15 @@ mod tests {
// Check collected votes
assert!(a.collect_vote(&c, h, signature).is_none());
collected_votes += expected_votes[i];
assert_eq!(a.get_total(block_hash), Some(collected_votes));
assert_eq!(a.get_total(h.step, block_hash), Some(collected_votes));

// Ensure a duplicated vote is discarded
if i == 0 {
assert!(a.collect_vote(&c, h, signature).is_none());
assert_eq!(a.get_total(block_hash), Some(collected_votes));
assert_eq!(
a.get_total(h.step, block_hash),
Some(collected_votes)
);
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion consensus/src/agreement/verifiers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::user::committee::CommitteeSet;
use crate::user::sortition;
use bytes::Buf;

use crate::config;
use dusk_bytes::Serializable;
use node_data::bls::PublicKey;
use node_data::message::{marshal_signable_vote, Header, Message, Payload};
Expand Down Expand Up @@ -76,6 +77,7 @@ pub async fn verify_agreement(
seed,
&msg.header,
0,
config::FIRST_REDUCTION_COMMITTEE_SIZE,
)
.await
.map_err(|e| {
Expand All @@ -94,6 +96,7 @@ pub async fn verify_agreement(
seed,
&msg.header,
1,
config::SECOND_REDUCTION_COMMITTEE_SIZE,
)
.await
.map_err(|e| {
Expand All @@ -118,13 +121,14 @@ pub async fn verify_step_votes(
seed: Seed,
hdr: &Header,
step_offset: u8,
committee_size: usize,
) -> Result<(), Error> {
if hdr.step == 0 {
return Err(Error::InvalidStepNum);
}

let step = hdr.step - 1 + step_offset;
let cfg = sortition::Config::new(seed, hdr.round, step, 64);
let cfg = sortition::Config::new(seed, hdr.round, step, committee_size);

verify_votes(
&hdr.block_hash,
Expand Down
4 changes: 3 additions & 1 deletion consensus/src/commons.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::contract_state::Operations;

use node_data::ledger::*;
use node_data::message;
use node_data::message::Topics;
use tracing::Instrument;

use crate::contract_state::CallParams;
Expand Down Expand Up @@ -80,6 +81,7 @@ pub fn spawn_send_reduction<T: Operations + 'static>(
outbound: AsyncQueue<Message>,
inbound: AsyncQueue<Message>,
executor: Arc<Mutex<T>>,
topic: Topics,
) {
let hash = to_str(&candidate.header().hash);

Expand Down Expand Up @@ -168,7 +170,7 @@ pub fn spawn_send_reduction<T: Operations + 'static>(
round: ru.round,
step,
block_hash: hash,
topic: message::Topics::Reduction as u8,
topic: topic.into(),
};

let signature = hdr.sign(&ru.secret_key, ru.pubkey_bls.inner());
Expand Down
6 changes: 5 additions & 1 deletion consensus/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@

/// Maximum number of steps Consensus runs per a single round.
pub const CONSENSUS_MAX_STEP: u8 = 213;
/// Maximum number of iterations Consensus runs per a single round.
pub const CONSENSUS_MAX_ITER: u8 = CONSENSUS_MAX_STEP / 3;

/// Percentage number that determines a quorum.
pub const CONSENSUS_QUORUM_THRESHOLD: f64 = 0.67;
/// Initial step timeout in milliseconds.
pub const CONSENSUS_TIMEOUT_MS: u64 = 20 * 1000;
pub const CONSENSUS_TIMEOUT_MS: u64 = 5 * 1000;

/// Maximum step timeout.
pub const CONSENSUS_MAX_TIMEOUT_MS: u64 = 60 * 1000;
Expand All @@ -21,6 +24,7 @@ pub const SECOND_REDUCTION_COMMITTEE_SIZE: usize = 64;

/// Artifical delay on each selection step.
pub const CONSENSUS_DELAY_MS: u64 = 1000;

/// Default number of workers to process agreements.
pub const ACCUMULATOR_WORKERS_AMOUNT: usize = 6;
pub const ACCUMULATOR_QUEUE_CAP: usize = 100;
Expand Down
69 changes: 59 additions & 10 deletions consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
use crate::commons::{ConsensusError, Database, RoundUpdate};
use crate::contract_state::Operations;
use crate::phase::Phase;

use node_data::ledger::{to_str, Block};
use node_data::message::{AsyncQueue, Message, Payload};

use node_data::message::{AsyncQueue, Message, Payload, Topics};

use crate::agreement::step;
use crate::execution_ctx::{ExecutionCtx, IterationCtx};
Expand All @@ -18,6 +20,7 @@ use crate::{config, selection};
use crate::{firststep, secondstep};
use tracing::{error, Instrument};

use crate::step_votes_reg::StepVotesRegistry;
use std::sync::Arc;
use tokio::sync::{oneshot, Mutex};
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -164,34 +167,69 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
future_msgs.lock().await.clear_round(ru.round - 1);
}

let sv_registry =
Arc::new(Mutex::new(StepVotesRegistry::new(ru.clone())));

let sel_handler =
Arc::new(Mutex::new(selection::handler::Selection::new(
db.clone(),
sv_registry.clone(),
)));

let first_handler =
Arc::new(Mutex::new(firststep::handler::Reduction::new(
db.clone(),
sv_registry.clone(),
)));

let sec_handler = Arc::new(Mutex::new(
secondstep::handler::Reduction::new(sv_registry.clone()),
));

let mut phases = [
Phase::Selection(selection::step::Selection::new(
executor.clone(),
db.clone(),
sel_handler.clone(),
)),
Phase::Reduction1(firststep::step::Reduction::new(
executor.clone(),
db.clone(),
first_handler.clone(),
)),
Phase::Reduction2(secondstep::step::Reduction::new(
executor.clone(),
sec_handler.clone(),
)),
Phase::Reduction2(secondstep::step::Reduction::new(executor)),
];

// Consensus loop
// Initialize and run consensus loop
let mut step: u8 = 0;

let mut iter_num: u8 = 0;
let mut iter_ctx = IterationCtx::new(
ru.round,
iter_num,
sel_handler.clone(),
first_handler.clone(),
sec_handler.clone(),
);

loop {
let mut msg = Message::empty();
let mut iter_ctx = IterationCtx::new(ru.round, step + 1);
iter_num += 1;
iter_ctx.on_iteration_begin(iter_num);

let mut msg = Message::empty();
// Execute a single iteration
for phase in phases.iter_mut() {
step += 1;
for pos in 0..phases.len() {
let phase = phases.get_mut(pos).unwrap();

let step = (iter_num - 1) * 3 + (pos as u8 + 1);
let name = phase.name();

// Initialize new phase with message returned by previous
// phase.
phase.reinitialize(&msg, ru.round, step);
phase.reinitialize(&msg, ru.round, step).await;

// Construct phase execution context
let ctx = ExecutionCtx::new(
Expand All @@ -202,6 +240,7 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
&mut provisioners,
ru.clone(),
step,
executor.clone(),
);

// Execute a phase.
Expand All @@ -220,15 +259,25 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
))
.await?;

// During execution of any step we may encounter that an
// agreement is generated for a former or current iteration.
if msg.topic() == Topics::Agreement {
Self::send_agreement(
&mut agr_inbound_queue,
msg.clone(),
)
.await;
}

if step >= config::CONSENSUS_MAX_STEP {
return Err(ConsensusError::MaxStepReached);
}
}

iter_ctx.on_iteration_end();

// Delegate (agreement) message result to agreement loop for
// further processing.

Self::send_agreement(&mut agr_inbound_queue, msg.clone()).await;
}
})
}
Expand Down
Loading

0 comments on commit de1dd16

Please sign in to comment.