Accumulate candidates and votes from previous iterations, same round #1038

Closed
wants to merge 35 commits into from
Changes from 23 commits
Commits (35)
2536c11
consensus: Reduction steps accumulates votes for any iteration
Sep 5, 2023
61475de
consensus: Initial impl of RoundCtx struct
Sep 5, 2023
b4a4eba
consensus: Integrate RoundCtx into both Reduction steps
Sep 5, 2023
426d49e
node-data: Add Copy trait impl
Sep 5, 2023
13a7b55
consensus: Use different topic IDs for first and second reduction mes…
Sep 5, 2023
ff00966
node-data: Use different topic IDs for first and second reduction mes…
Sep 5, 2023
9480e6b
node: Enable FirstReduction and SecondReduction messages in Chain
Sep 5, 2023
094a707
consensus: compile block with certificate
Sep 11, 2023
c0c7771
consensus: Collect past events and produce agreements from past itera…
Sep 11, 2023
784ff09
rusk-recovery: Enable 64 stakers in genesis
Sep 11, 2023
e9d99f6
consensus: Fix logging info
Sep 11, 2023
7fc0cfe
consensus: Fix double-lock issue
Sep 11, 2023
bd2cb8f
consensus: Implement vote_for_former_candidate
Sep 12, 2023
3176713
node: Broadcast any blocks equal to local tip
Sep 18, 2023
bc7822e
node: Broadcast any blocks equal to local tip
Sep 18, 2023
2d1a585
consensus: Vote on former candidate only if am_member of the step com…
Sep 18, 2023
d74daa8
node: Print blocks received in in_sync mode
Sep 18, 2023
9ef3a84
Merge branch 'master' into collect_past_messages
goshawk-3 Sep 18, 2023
1ade3a3
node: Improve fallback procedure
Sep 20, 2023
5d33233
node: Randomize the unique counter in network::send call
Sep 20, 2023
becdc5e
consensus: Fix clippy warnings
Sep 20, 2023
58495c4
consensus: Add step in Aggregator::get_total
Sep 20, 2023
3cf757d
consensus: Republish a drained message, if valid
Sep 21, 2023
d80c1ae
consensus: Return from firststep::collect call if step_votes belongs …
Sep 29, 2023
ad880cc
consensus: Return from secondstep::collect call if step_votes belongs…
Sep 29, 2023
38dc58d
consensus: Fix error discription
Sep 29, 2023
28f3e7a
node: Request missing blocks from multiple sources if dest_addr is un…
Sep 29, 2023
fc02bc7
consensus: Decrease default consensus_timeout to 5s
Oct 3, 2023
9f1d7da
node: Request updates from random peers when accept_block_timeout eve…
Oct 5, 2023
ce62586
consensus: Decrease default consensus_timeout_ms to 2000
Oct 5, 2023
bbcd402
consensus: Apply a delay in block generator accordingly
Oct 11, 2023
2b531f6
node: Trace duration of accept/finalize call execution
Oct 11, 2023
7ff415f
consensus: Move committee stores from phases into IterCtx
Oct 13, 2023
df539c1
consensus: Maintain the iteraton_ctx object throughout the duration o…
Oct 13, 2023
f45e193
consensus: Refactor/Rename round_ctx into StepVotesRegistry
Oct 13, 2023
2 changes: 1 addition & 1 deletion consensus/example/consensus_service.rs
@@ -174,7 +174,7 @@ pub fn fetch_blskeys_from_file(

// attempt to load and decode wallet
let ciphertext =
fs::read(&path).expect("path should be valid consensus keys file");
fs::read(path).expect("path should be valid consensus keys file");

// Decrypt
let iv = &ciphertext[..16];
18 changes: 12 additions & 6 deletions consensus/src/aggregator.rs
@@ -18,7 +18,9 @@ use tracing::{debug, error, warn};
/// voters.StepVotes Mapping of a block hash to both an aggregated signatures
/// and a cluster of bls voters.
#[derive(Default)]
pub struct Aggregator(BTreeMap<Hash, (AggrSignature, Cluster<PublicKey>)>);
pub struct Aggregator(
BTreeMap<(u8, Hash), (AggrSignature, Cluster<PublicKey>)>,

Contributor:
I'm not sure we need the "step" part in the key.
There's no collision between the block hashes...

Does it have another goal?

Contributor Author:
If this is not introduced, we accumulate votes for the empty hash across all iterations. Before this fix, I noticed that votes for the empty hash exceeded 67%.
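
To illustrate the collision (a standalone sketch, not the PR's Aggregator code; the names here are made up): with a hash-only key, NIL votes cast in different steps of the same round all accumulate under the single empty-hash entry, while a (step, hash) key keeps them separate.

```rust
use std::collections::BTreeMap;

type Hash = [u8; 32];
const EMPTY_HASH: Hash = [0u8; 32];

fn main() {
    // Keyed by hash only: NIL (empty-hash) votes from steps 1 and 4
    // of the same round land in one bucket and add up.
    let mut by_hash: BTreeMap<Hash, usize> = BTreeMap::new();
    for _step in [1u8, 4u8] {
        *by_hash.entry(EMPTY_HASH).or_insert(0) += 1;
    }
    assert_eq!(by_hash[&EMPTY_HASH], 2);

    // Keyed by (step, hash): the same NIL votes stay separate per step.
    let mut by_step_hash: BTreeMap<(u8, Hash), usize> = BTreeMap::new();
    for step in [1u8, 4u8] {
        *by_step_hash.entry((step, EMPTY_HASH)).or_insert(0) += 1;
    }
    assert_eq!(by_step_hash[&(1, EMPTY_HASH)], 1);
    assert_eq!(by_step_hash[&(4, EMPTY_HASH)], 1);
}
```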

Contributor:
I see.
This is due to the ambiguity of votes. The vote should be separate from the block hash, so as to distinguish the case of an invalid candidate from that of a candidate not being received. Still, in the case of an empty candidate, the iteration/step is indeed needed.

That said, I think nodes should not vote on empty hash. Instead, they should vote NIL only on invalid candidates.
That is:

  • if no candidate is received, do not vote
  • if an invalid candidate is received, vote NIL on the candidate

Note that voting NIL for not having received a block also conflicts with waiting for (and voting on) previous-iteration blocks, as it could lead to the same provisioner voting NIL (because it didn't receive the block) and then voting for the block's hash when receiving it during a later iteration.

Contributor Author:
> That said, I think nodes should not vote on empty hash.

What metrics do you expect to improve by implementing this change (not voting on the empty hash)? Why do you propose it here as a comment instead of posting a DIP with a better and more complete explanation?

Contributor:
I wrote it here as part of the discussion. DIP/Issues can always stem from comments :)
I'll write a proposal for this but, in short, I think voting on the empty hash yields the same issues as not collecting votes from previous iterations.
Either way, if we vote NIL when not receiving a block, we shouldn't vote on previous-iteration blocks, or we could produce a double (and opposite) vote on the same block.

Contributor:
Created an Issue here

Contributor:
As mentioned in the DIP Issue, relative to this PR, if the node votes NIL for a round/iteration after the timeout and then votes on the candidate when receiving it (vote_for_former_candidate), it would produce two votes for the same block, creating potential indeterminism (nodes can choose either of the two votes).

Contributor Author (goshawk-3, Sep 29, 2023):
Simply put:
A NIL vote means a vote for the tuple (empty hash, round, step).
A real vote means a vote for the tuple (candidate_block_hash, round, step).

> nodes can choose either of the two votes

Not the case at all. Nodes can reach quorum for an empty hash at iteration 1 and then move forward and reach quorum for the candidate block of iteration 1 while running the second iteration. The output in this case is a valid finalized block.

> creating potential indeterminism (nodes can choose either of the two votes).

There is no indeterminism, as these are not the same blocks. Also, voting NIL only helps provisioners move forward when a candidate block is not generated; it has no other impact.
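
A minimal sketch of the two vote shapes being discussed (the Vote type and its constructors are hypothetical, not the project's actual types): a NIL vote is just a vote whose hash field is the empty hash, so votes with the same round/step but different hashes target different tuples.

```rust
type Hash = [u8; 32];
const EMPTY_HASH: Hash = [0u8; 32];

/// A vote is always a (hash, round, step) tuple; a NIL vote is one
/// whose hash is the empty hash.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Vote {
    hash: Hash,
    round: u64,
    step: u8,
}

impl Vote {
    fn nil(round: u64, step: u8) -> Self {
        Self { hash: EMPTY_HASH, round, step }
    }

    fn for_candidate(hash: Hash, round: u64, step: u8) -> Self {
        Self { hash, round, step }
    }
}

fn main() {
    let candidate = [1u8; 32];
    let nil = Vote::nil(10, 2);
    let real = Vote::for_candidate(candidate, 10, 2);
    // Same round and step, but different hashes: under this reading they
    // target different tuples rather than forming a double vote on one block.
    assert_ne!(nil, real);
}
```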

Contributor:
We do not currently distinguish between NIL for an unknown candidate and NIL for an invalid block.
In both cases, the vote will be (round, step, empty_hash).
So, a NIL vote is effectively valid as a negative vote on the candidate, whichever that might be.

If we produce both a NIL and a candidate vote, they would be contradictory votes for the same block.
And if I receive both votes, I can use either for the quorum, and both would be valid.
In fact, in extreme cases, there could be a quorum on both NIL and the candidate.

Example 1:

  • I'm at round/step
  • I don't receive the block, so I vote NIL (empty,round,step)
  • I then receive the block and vote (candidate,round,step)

Now there are two votes for the same round-step.
Let's say the committee is split in half (half voted NIL, half voted the candidate) and my vote is the one deciding the quorum.
Nodes receiving the NIL vote first will reach quorum on NIL, and move to the next iteration;
Nodes receiving the candidate vote first will reach quorum on the block, and move to the next round.
Both groups of nodes will have a valid quorum for the same round and step, but one is for NIL and one is for the candidate.

Example 2:

  • let's say all nodes are at round/step
  • the candidate is delayed, so all provisioners vote NIL
  • then the block is broadcast and they all vote for it

Depending on which votes they receive first, nodes can either reach quorum on the candidate or on the empty hash.

> Not the case at all. Nodes can reach quorum for an empty hash at iteration 1 and then move forward and reach quorum for the candidate block of iteration 1 while running the second iteration. The output in this case is a valid finalized block.

You're only seeing the best-case scenario.
In this case you still have two valid quorums for the same round/iteration.
If a quorum is reached on iteration 2, it's possible that some nodes will move to the next round while others will accept the first-iteration block and move to the next round.

Contributor Author:
Reaching quorum on a tuple (empty_hash, round, step) does not impact reaching quorum for any candidate block of the same round.

I'd suggest we pair up (when you're available) and review the implementation together, to ensure we're on the same page.

);

impl Aggregator {
pub fn collect_vote(
@@ -27,6 +29,7 @@ impl Aggregator {
header: &Header,
signature: &[u8; 48],
) -> Option<(Hash, StepVotes)> {
let msg_step = header.step;
// Get weight for this pubkey bls. If votes_for returns None, it means
// the key is not a committee member, respectively we should not
// process a vote from it.
@@ -35,7 +38,7 @@

let (aggr_sign, cluster) = self
.0
.entry(hash)
.entry((msg_step, hash))
.or_insert((AggrSignature::default(), Cluster::new()));

// Each committee has 64 slots. If a Provisioner is extracted into
@@ -173,8 +176,8 @@ mod tests {
use rand::rngs::StdRng;
use rand::SeedableRng;
impl Aggregator {
pub fn get_total(&self, hash: Hash) -> Option<usize> {
if let Some(value) = self.0.get(&hash) {
pub fn get_total(&self, step: u8, hash: Hash) -> Option<usize> {
if let Some(value) = self.0.get(&(step, hash)) {
return Some(value.1.total_occurrences());
}
None
@@ -264,12 +267,15 @@
// Check collected votes
assert!(a.collect_vote(&c, h, signature).is_none());
collected_votes += expected_votes[i];
assert_eq!(a.get_total(block_hash), Some(collected_votes));
assert_eq!(a.get_total(h.step, block_hash), Some(collected_votes));

// Ensure a duplicated vote is discarded
if i == 0 {
assert!(a.collect_vote(&c, h, signature).is_none());
assert_eq!(a.get_total(block_hash), Some(collected_votes));
assert_eq!(
a.get_total(h.step, block_hash),
Some(collected_votes)
);
}
}
}
3 changes: 3 additions & 0 deletions consensus/src/agreement/accumulator.rs
@@ -4,6 +4,8 @@
//
// Copyright (c) DUSK NETWORK. All rights reserved.

#![allow(dead_code)]

use crate::agreement::verifiers;
use crate::user::committee::CommitteeSet;
use crate::user::sortition;
@@ -136,6 +138,7 @@ impl Accumulator {
/// # Panics
///
/// If workers pool is not spawned, this will panic.

pub async fn process(&mut self, msg: Message) {
assert!(!self.workers.is_empty());

22 changes: 18 additions & 4 deletions consensus/src/agreement/step.rs
@@ -194,15 +194,19 @@ impl<D: Database> Executor<D> {
}
Payload::Agreement(_) => {
// Accumulate the agreement
self.collect_agreement(acc, msg).await;
return self.collect_agreement(acc, msg).await;
}
_ => {}
};

None
}

async fn collect_agreement(&mut self, acc: &mut Accumulator, msg: Message) {
async fn collect_agreement(
&mut self,
_acc: &mut Accumulator,
msg: Message,
) -> Option<Block> {
// Publish the agreement
self.outbound_queue
.send(msg.clone())
@@ -211,8 +215,18 @@
error!("unable to publish a collected agreement msg {:?}", err)
});

// Accumulate the agreement
acc.process(msg.clone()).await;
debug!("compile block with certificate");

match msg.payload {
Payload::Agreement(payload) => {
let (cert, hash) =
(payload.generate_certificate(), msg.header.block_hash);

// Create winning block
self.create_winning_block(&hash, &cert).await
}
_ => None,
}
}

/// Collects accumulator output (a list of agreements) and publishes
4 changes: 3 additions & 1 deletion consensus/src/commons.rs
@@ -11,6 +11,7 @@ use crate::contract_state::Operations;

use node_data::ledger::*;
use node_data::message;
use node_data::message::Topics;
use tracing::Instrument;

use crate::contract_state::CallParams;
@@ -80,6 +81,7 @@ pub fn spawn_send_reduction<T: Operations + 'static>(
outbound: AsyncQueue<Message>,
inbound: AsyncQueue<Message>,
executor: Arc<Mutex<T>>,
topic: Topics,
) {
let hash = to_str(&candidate.header().hash);

@@ -168,7 +170,7 @@
round: ru.round,
step,
block_hash: hash,
topic: message::Topics::Reduction as u8,
topic: topic.into(),
};

let signature = hdr.sign(&ru.secret_key, ru.pubkey_bls.inner());
3 changes: 3 additions & 0 deletions consensus/src/config.rs
@@ -6,6 +6,9 @@

/// Maximum number of steps Consensus runs per a single round.
pub const CONSENSUS_MAX_STEP: u8 = 213;
/// Maximum number of iterations Consensus runs per a single round.
pub const CONSENSUS_MAX_ITER: u8 = CONSENSUS_MAX_STEP / 3;

/// Percentage number that determines a quorum.
pub const CONSENSUS_QUORUM_THRESHOLD: f64 = 0.67;
/// Initial step timeout in milliseconds.
58 changes: 51 additions & 7 deletions consensus/src/consensus.rs
@@ -7,8 +7,10 @@
use crate::commons::{ConsensusError, Database, RoundUpdate};
use crate::contract_state::Operations;
use crate::phase::Phase;

use node_data::ledger::{to_str, Block};
use node_data::message::{AsyncQueue, Message, Payload};

use node_data::message::{AsyncQueue, Message, Payload, Topics};

use crate::agreement::step;
use crate::execution_ctx::{ExecutionCtx, IterationCtx};
@@ -18,6 +20,7 @@ use crate::{config, selection};
use crate::{firststep, secondstep};
use tracing::{error, Instrument};

use crate::round_ctx::RoundCtx;
use std::sync::Arc;
use tokio::sync::{oneshot, Mutex};
use tokio::task::JoinHandle;
@@ -164,34 +167,68 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
future_msgs.lock().await.clear_round(ru.round - 1);
}

let round_ctx = Arc::new(Mutex::new(RoundCtx::new(ru.clone())));

let sel_handler =
Arc::new(Mutex::new(selection::handler::Selection::new(
db.clone(),
round_ctx.clone(),
)));

let first_handler =
Arc::new(Mutex::new(firststep::handler::Reduction::new(
db.clone(),
round_ctx.clone(),
)));

let sec_handler = Arc::new(Mutex::new(
secondstep::handler::Reduction::new(round_ctx.clone()),
));

let mut phases = [
Phase::Selection(selection::step::Selection::new(
executor.clone(),
db.clone(),
sel_handler.clone(),
)),
Phase::Reduction1(firststep::step::Reduction::new(
executor.clone(),
db.clone(),
first_handler.clone(),
)),
Phase::Reduction2(secondstep::step::Reduction::new(
executor.clone(),
sec_handler.clone(),
)),
Phase::Reduction2(secondstep::step::Reduction::new(executor)),
];

// Consensus loop
// Initialize and run consensus loop
let mut step: u8 = 0;

let mut iter: u8 = 0;

loop {
iter += 1;

let mut msg = Message::empty();
let mut iter_ctx = IterationCtx::new(ru.round, step + 1);
let mut iter_ctx = IterationCtx::new(
ru.round,
iter,
sel_handler.clone(),
first_handler.clone(),
sec_handler.clone(),
);

// Execute a single iteration
for phase in phases.iter_mut() {
step += 1;
for pos in 0..phases.len() {
let phase = phases.get_mut(pos).unwrap();

let step = (iter - 1) * 3 + (pos as u8 + 1);
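// i.e. iteration 1 covers steps 1..=3, iteration 2 covers steps 4..=6, and so on.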
let name = phase.name();

// Initialize new phase with message returned by previous
// phase.
phase.reinitialize(&msg, ru.round, step);
phase.reinitialize(&msg, ru.round, step).await;

// Construct phase execution context
let ctx = ExecutionCtx::new(
@@ -202,6 +239,7 @@
&mut provisioners,
ru.clone(),
step,
executor.clone(),
);

// Execute a phase.
@@ -220,6 +258,12 @@
))
.await?;

// During execution of any step, an agreement may already have been
// generated for a previous iteration.
if msg.topic() == Topics::Agreement {
break;
}

if step >= config::CONSENSUS_MAX_STEP {
return Err(ConsensusError::MaxStepReached);
}