Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce failed_certificates field in candidate block #1137

Merged
merged 50 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
5093140
consensus: Share step_votes registry with ExecutionCtx
Nov 15, 2023
1fb22ab
consensus: Impl get_nil_certificates
Nov 15, 2023
a024af4
consensus: Pass failed_certificates to block_generator
Nov 15, 2023
8be1672
consensus: Rename FinalResult to Ready and Result to Pending in Handl…
Nov 15, 2023
a5c23c1
consensus: Collect any reduction message from former iteration in a s…
Nov 15, 2023
c9e4cc7
consensus: Enable NIL voting for 2nd reduction
Nov 15, 2023
1679187
consensus: Wrap up send_agreement function into AgreementSender struct
Nov 21, 2023
584bdba
consensus: Send an agreement msg if quorum of a former iteration is r…
Nov 21, 2023
c51f350
node-data: Impl failed_iterations field of type IterationsInfo in Blo…
Nov 22, 2023
36abc52
consensus: Use nil_quorum fn for empty hash
Nov 22, 2023
b9a80b4
consensus: Pass failed_iterations in generate_block fn
Nov 22, 2023
885c970
node: Verify failed iterations list
Nov 22, 2023
9bc09d5
consensus: Iterations greater than RELAX_ITERATION_THRESHOLD shall no…
Nov 22, 2023
d0188f8
consensus: Refactor step_votes_registry:
Nov 22, 2023
35a97c4
node-data: Make Certificate Copyable
Nov 22, 2023
b72f74f
consensus: Add flags to mark quorum_reached state
Nov 23, 2023
9c265c1
consensus: Record signatures with/without quorum
Nov 23, 2023
35a1c0a
consensus: Aggregator always returns quorum_reached flag with current…
Nov 23, 2023
86a0560
consensus: Make quorum_check optional for verify_votes fn
Nov 23, 2023
c447f22
node: Enable quorum check for both winner cert and prev_block_cert. D…
Nov 23, 2023
c14e0c6
consensus: Resolve unit-test failure
Nov 23, 2023
3912145
consensus: Share step_votes registry with ExecutionCtx
Nov 15, 2023
cd1ff8d
consensus: Impl get_nil_certificates
Nov 15, 2023
0c6cc64
consensus: Pass failed_certificates to block_generator
Nov 15, 2023
741b3f4
consensus: Rename FinalResult to Ready and Result to Pending in Handl…
Nov 15, 2023
5735dd0
consensus: Collect any reduction message from former iteration in a s…
Nov 15, 2023
601deb2
consensus: Enable NIL voting for 2nd reduction
Nov 15, 2023
08b784d
consensus: Wrap up send_agreement function into AgreementSender struct
Nov 21, 2023
644411f
consensus: Send an agreement msg if quorum of a former iteration is r…
Nov 21, 2023
42ffca2
node-data: Impl failed_iterations field of type IterationsInfo in Blo…
Nov 22, 2023
3712004
consensus: Use nil_quorum fn for empty hash
Nov 22, 2023
0b8c06b
consensus: Pass failed_iterations in generate_block fn
Nov 22, 2023
04f7850
node: Verify failed iterations list
Nov 22, 2023
f2deb5c
consensus: Iterations greater than RELAX_ITERATION_THRESHOLD shall no…
Nov 22, 2023
ab34e1a
consensus: Refactor step_votes_registry:
Nov 22, 2023
7fc1f97
node-data: Make Certificate Copyable
Nov 22, 2023
be24fbe
consensus: Add flags to mark quorum_reached state
Nov 23, 2023
a392ce7
consensus: Record signatures with/without quorum
Nov 23, 2023
22dc6af
consensus: Aggregator always returns quorum_reached flag with current…
Nov 23, 2023
052ea7d
consensus: Make quorum_check optional for verify_votes fn
Nov 23, 2023
d3f54d5
node: Enable quorum check for both winner cert and prev_block_cert. D…
Nov 23, 2023
c02dc83
consensus: Resolve unit-test failure
Nov 23, 2023
3c3f5f0
Merge branch 'master' into failed_iterations
goshawk-3 Nov 24, 2023
0ef65cf
Merge branch 'master' into failed_iterations
goshawk-3 Nov 27, 2023
5d179cb
Merge branch 'failed_iterations' of github.com:dusk-network/rusk into…
Nov 28, 2023
80062fd
consensus: Remove aggr_agreement file to fix merge conflicts
Nov 28, 2023
3ca6371
Merge branch 'master' into failed_iterations
goshawk-3 Nov 28, 2023
a514bd5
consensus: Rename AgreementInfo to CertificateInfo together with all …
Nov 28, 2023
e86c26c
Update consensus/src/step_votes_reg.rs
goshawk-3 Nov 30, 2023
0a5845a
consensus: Address PR comments
Nov 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions consensus/example/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,8 @@ fn spawn_node(
txroot: [0u8; 32],
gas_limit: 0,
iteration: 0,
prev_block_cert: Default::default(),
hash: [0u8; 32],
cert: Default::default(),
..Default::default()
},
vec![],
)
Expand Down
41 changes: 23 additions & 18 deletions consensus/src/aggregator.rs
fed-franz marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl Aggregator {
committee: &Committee,
header: &Header,
signature: &[u8; 48],
) -> Option<(Hash, StepVotes)> {
) -> Option<(Hash, StepVotes, bool)> {
let msg_step = header.step;
// Get weight for this pubkey bls. If votes_for returns None, it means
// the key is not a committee member, respectively we should not
Expand Down Expand Up @@ -94,17 +94,17 @@ impl Aggregator {
}
};

if quorum_reached {
let s = aggr_sign
.aggregated_bytes()
.expect("Signature to exist after quorum reached");
let bitset = committee.bits(cluster);
let s = aggr_sign
.aggregated_bytes()
.expect("Signature to exist after aggregating");
let bitset = committee.bits(cluster);

let step_votes = StepVotes {
bitset,
aggregate_signature: Signature::from(s),
};
let step_votes = StepVotes {
bitset,
aggregate_signature: Signature::from(s),
};

if quorum_reached {
tracing::info!(
event = "reduction, quorum reached",
hash = to_str(&hash),
Expand All @@ -114,9 +114,9 @@ impl Aggregator {
step = header.step,
signature = to_str(&s),
);

return Some((hash, step_votes));
}

return Some((hash, step_votes, quorum_reached));
}

None
Expand Down Expand Up @@ -291,10 +291,12 @@ mod tests {
// Last member's vote should reach the quorum
if i == winning_index {
// (hash, sv) is only returned in case we reach the quorum
let (hash, sv) = a
let (hash, sv, quorum_reached) = a
.collect_vote(&c, h, signature)
.expect("failed to reach quorum");

assert_eq!(quorum_reached, true, "quorum should be reached");

// Check expected block hash
assert_eq!(hash, block_hash);

Expand All @@ -307,17 +309,20 @@ mod tests {
}

// Check collected votes
assert!(a.collect_vote(&c, h, signature).is_none());
let (_, _, quorum_reached) =
a.collect_vote(&c, h, signature).unwrap();

assert_eq!(
quorum_reached, false,
"quorum should not be reached yet"
);

collected_votes += expected_votes[i];
assert_eq!(a.get_total(h.step, block_hash), Some(collected_votes));

// Ensure a duplicated vote is discarded
if i == 0 {
assert!(a.collect_vote(&c, h, signature).is_none());
assert_eq!(
a.get_total(h.step, block_hash),
Some(collected_votes)
);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/agreement/step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ impl<D: Database> Executor<D> {
match self.db.lock().await.get_candidate_block_by_hash(hash).await {
Ok(mut block) => {
debug!(event = "winner block retrieved", hash = to_str(hash));
block.set_certificate(cert.clone());
block.set_certificate(*cert);

Some(block)
}
Expand Down
40 changes: 25 additions & 15 deletions consensus/src/agreement/verifiers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ pub async fn verify_agreement(
&msg.header,
0,
config::FIRST_REDUCTION_COMMITTEE_SIZE,
true,
)
.await
.map_err(|e| {
Expand All @@ -97,6 +98,7 @@ pub async fn verify_agreement(
&msg.header,
1,
config::SECOND_REDUCTION_COMMITTEE_SIZE,
true,
)
.await
.map_err(|e| {
Expand All @@ -122,6 +124,7 @@ pub async fn verify_step_votes(
hdr: &Header,
step_offset: u8,
committee_size: usize,
enable_quorum_check: bool,
) -> Result<(), Error> {
if hdr.step == 0 {
return Err(Error::InvalidStepNum);
Expand All @@ -136,35 +139,42 @@ pub async fn verify_step_votes(
&sv.aggregate_signature.inner(),
committees_set,
&cfg,
enable_quorum_check,
)
.await
}

#[allow(clippy::too_many_arguments)]
pub async fn verify_votes(
block_hash: &[u8; 32],
bitset: u64,
signature: &[u8; 48],
committees_set: &Arc<Mutex<CommitteeSet>>,
cfg: &sortition::Config,
enable_quorum_check: bool,
) -> Result<(), Error> {
// TODO: This should be refactored into a structure when #1118 issue is
// implemented
let sub_committee = {
// Scoped guard to fetch committee data quickly
let mut guard = committees_set.lock().await;

let sub_committee = guard.intersect(bitset, cfg);
let target_quorum = guard.quorum(cfg);
let total = guard.total_occurrences(&sub_committee, cfg);

if total < target_quorum {
tracing::error!(
desc = "vote_set_too_small",
committee = format!("{:#?}", sub_committee),
cfg = format!("{:#?}", cfg),
bitset = bitset,
target_quorum = target_quorum,
total = total,
);
Err(Error::VoteSetTooSmall(cfg.step))

if enable_quorum_check {
let target_quorum = guard.quorum(cfg);
let total = guard.total_occurrences(&sub_committee, cfg);
if total < target_quorum {
tracing::error!(
desc = "vote_set_too_small",
committee = format!("{:#?}", sub_committee),
cfg = format!("{:#?}", cfg),
bitset = bitset,
target_quorum = target_quorum,
total = total,
);
Err(Error::VoteSetTooSmall(cfg.step))
} else {
Ok(sub_committee)
}
} else {
Ok(sub_committee)
}
Expand Down
52 changes: 49 additions & 3 deletions consensus/src/commons.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::contract_state::Operations;

use node_data::ledger::*;
use node_data::message;
use node_data::message::Topics;
use node_data::message::{Payload, Topics};
use tracing::Instrument;

use crate::contract_state::CallParams;
Expand Down Expand Up @@ -87,9 +87,11 @@ pub enum ConsensusError {
ChildTaskTerminated,
Canceled,
}

/// Makes an attempt to cast a vote for a specified candidate block if VST call
/// passes. If candidate block is default, it casts a NIL vote, without calling
/// VST API
#[allow(clippy::too_many_arguments)]
pub fn spawn_send_reduction<T: Operations + 'static>(
pub fn spawn_cast_vote<T: Operations + 'static>(
join_set: &mut JoinSet<()>,
verified_hash: Arc<Mutex<[u8; 32]>>,
candidate: Block,
Expand Down Expand Up @@ -270,3 +272,47 @@ impl IterCounter for u8 {
self * Self::STEP_NUM + pos as u8
}
}

#[derive(Clone)]
pub(crate) struct AgreementSender {
queue: AsyncQueue<Message>,
}

impl AgreementSender {
pub(crate) fn new(queue: AsyncQueue<Message>) -> Self {
Self { queue }
}

/// Sends an agreement (internally) to the agreement loop.
pub(crate) async fn send(&self, msg: Message) -> bool {
herr-seppia marked this conversation as resolved.
Show resolved Hide resolved
if let Payload::Agreement(agreement) = &msg.payload {
if agreement.signature == [0u8; 48]
|| agreement.first_step.is_empty()
|| agreement.second_step.is_empty()
|| msg.header.block_hash == [0; 32]
{
return false;
}

tracing::debug!(
event = "send agreement",
hash = to_str(&msg.header.block_hash),
round = msg.header.round,
step = msg.header.step,
first = format!("{:#?}", agreement.first_step),
second = format!("{:#?}", agreement.second_step),
signature = to_str(&agreement.signature),
);

let _ = self
.queue
.send(msg.clone())
.await
.map_err(|e| error!("send agreement failed with {:?}", e));

return true;
}

false
}
}
2 changes: 2 additions & 0 deletions consensus/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,5 @@ pub const SECOND_REDUCTION_COMMITTEE_SIZE: usize = 64;
pub const CONSENSUS_DELAY_MS: u64 = 1000;

pub const DEFAULT_BLOCK_GAS_LIMIT: u64 = 5 * 1_000_000_000;

pub const RELAX_ITERATION_THRESHOLD: u8 = 10;
fed-franz marked this conversation as resolved.
Show resolved Hide resolved
65 changes: 17 additions & 48 deletions consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,25 @@
//
// Copyright (c) DUSK NETWORK. All rights reserved.

use crate::commons::{ConsensusError, Database, IterCounter, RoundUpdate};
use crate::commons::{
AgreementSender, ConsensusError, Database, IterCounter, RoundUpdate,
};
use crate::contract_state::Operations;
use crate::phase::Phase;

use node_data::ledger::{to_str, Block};
use node_data::ledger::Block;

use node_data::message::{AsyncQueue, Message, Payload, Topics};
use node_data::message::{AsyncQueue, Message, Topics};

use crate::agreement::step;
use crate::execution_ctx::{ExecutionCtx, IterationCtx};
use crate::queue::Queue;
use crate::selection;
use crate::user::provisioners::Provisioners;
use crate::{firststep, secondstep};
use tracing::{error, Instrument};
use tracing::Instrument;

use crate::step_votes_reg::StepVotesRegistry;
use crate::step_votes_reg::CertInfoRegistry;
use std::sync::Arc;
use tokio::sync::{oneshot, Mutex};
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -113,12 +115,12 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
self.db.clone(),
);

let sender =
AgreementSender::new(self.agreement_process.inbound_queue.clone());

// Consensus loop - generation-selection-reduction loop
let mut main_task_handle = self.spawn_main_loop(
ru,
provisioners,
self.agreement_process.inbound_queue.clone(),
);
let mut main_task_handle =
self.spawn_main_loop(ru, provisioners, sender);

// Wait for any of the tasks to complete.
let result;
Expand Down Expand Up @@ -154,7 +156,7 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
&mut self,
ru: RoundUpdate,
mut provisioners: Provisioners,
mut agr_inbound_queue: AsyncQueue<Message>,
sender: AgreementSender,
) -> JoinHandle<Result<Block, ConsensusError>> {
let inbound = self.inbound.clone();
let outbound = self.outbound.clone();
Expand All @@ -168,7 +170,7 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
}

let sv_registry =
Arc::new(Mutex::new(StepVotesRegistry::new(ru.clone())));
Arc::new(Mutex::new(CertInfoRegistry::new(ru.clone())));

let sel_handler =
Arc::new(Mutex::new(selection::handler::Selection::new(
Expand Down Expand Up @@ -240,6 +242,8 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
ru.clone(),
step,
executor.clone(),
sv_registry.clone(),
sender.clone(),
);

// Execute a phase.
Expand All @@ -261,11 +265,7 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
// During execution of any step we may encounter that an
// agreement is generated for a former or current iteration.
if msg.topic() == Topics::Agreement {
Self::send_agreement(
&mut agr_inbound_queue,
msg.clone(),
)
.await;
sender.send(msg.clone()).await;
}
}

Expand All @@ -277,37 +277,6 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
}
})
}

/// Sends an agreement (internally) to the agreement loop.
async fn send_agreement(
agr_inbound_queue: &mut AsyncQueue<Message>,
msg: Message,
) {
if let Payload::Agreement(payload) = &msg.payload {
if payload.signature == [0u8; 48]
|| payload.first_step.is_empty()
|| payload.second_step.is_empty()
|| msg.header.block_hash == [0; 32]
{
return;
}

tracing::debug!(
event = "send agreement",
hash = to_str(&msg.header.block_hash),
round = msg.header.round,
step = msg.header.step,
first = format!("{:#?}", payload.first_step),
second = format!("{:#?}", payload.second_step),
signature = to_str(&payload.signature),
);

let _ = agr_inbound_queue
.send(msg.clone())
.await
.map_err(|e| error!("send agreement failed with {:?}", e));
}
}
}

#[inline]
Expand Down
Loading