Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consensus: change Quorum to use Certificates with RatificationResult #1346

Merged
merged 14 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions consensus/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ impl Aggregator {
.votes_for(signer)
.ok_or(AggregatorError::NotCommitteeMember)?;

let (aggr_sign, cluster) =
self.0.entry((msg_step, vote.clone())).or_default();
let (aggr_sign, cluster) = self.0.entry((msg_step, *vote)).or_default();

// Each committee has 64 slots.
//
Expand All @@ -86,7 +85,7 @@ impl Aggregator {

debug!(
event = "vote aggregated",
%vote,
?vote,
from = signer.to_bs58(),
added = weight,
total,
Expand All @@ -107,11 +106,11 @@ impl Aggregator {
_ => committee.majority_quorum(),
};

let quorum_reached = total > quorum_target;
let quorum_reached = total >= quorum_target;
if quorum_reached {
tracing::info!(
event = "quorum reached",
%vote,
?vote,
total,
target = quorum_target,
bitset,
Expand Down
16 changes: 6 additions & 10 deletions consensus/src/commons.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ pub enum ConsensusError {
InvalidPrevBlockHash(Hash),
InvalidQuorumType,
InvalidVote(Vote),
InvalidMsgIteration(u8),
FutureEvent,
PastEvent,
NotCommitteeMember,
Expand Down Expand Up @@ -141,21 +142,16 @@ impl QuorumMsgSender {
}

/// Sends an quorum (internally) to the quorum loop.
pub(crate) async fn send(&self, msg: Message) {
pub(crate) async fn send_quorum(&self, msg: Message) {
match &msg.payload {
// TODO: Change me accordingly to https://github.com/dusk-network/rusk/issues/1268
Payload::Quorum(q)
if !q.validation.is_empty()
&& !q.ratification.is_empty()
&& q.vote != Vote::NoCandidate =>
{
Payload::Quorum(q) if !q.cert.ratification.is_empty() => {
tracing::debug!(
event = "send quorum_msg",
vote = %q.vote,
vote = ?q.vote(),
round = msg.header.round,
iteration = msg.header.iteration,
validation = ?q.validation,
ratification = ?q.ratification,
validation = ?q.cert.validation,
ratification = ?q.cert.ratification,
);
}
_ => return,
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
// During execution of any step we may encounter that an
// quorum is generated for a former or current iteration.
if msg.topic() == Topics::Quorum {
sender.send(msg.clone()).await;
sender.send_quorum(msg.clone()).await;
}
}

Expand Down
4 changes: 2 additions & 2 deletions consensus/src/execution_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,10 @@ impl<'a, DB: Database, T: Operations + 'static> ExecutionCtx<'a, DB, T> {
event = "quorum",
src = "prev_step",
msg_step = m.get_step(),
vote = %q.vote,
vote = ?q.vote(),
);

self.quorum_sender.send(m).await;
self.quorum_sender.send_quorum(m).await;
}

Payload::ValidationResult(validation_result) => {
Expand Down
16 changes: 12 additions & 4 deletions consensus/src/quorum/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::queue::Queue;
use crate::user::committee::CommitteeSet;
use crate::user::provisioners::Provisioners;
use node_data::ledger::{to_str, Block, Certificate};
use node_data::message::payload::Vote;
use node_data::message::payload::{RatificationResult, Vote};
use node_data::message::{AsyncQueue, Message, Payload, Status};

use crate::quorum::verifiers;
Expand Down Expand Up @@ -158,14 +158,22 @@ impl<'p, D: Database> Executor<'p, D> {
.await
.ok()?;

debug!(
event = "quorum_collected",
result = ?quorum.cert.result,
iter = quorum.header.iteration,
round = quorum.header.round,
);

// Publish the quorum
self.publish(msg.clone()).await;
goshawk-3 marked this conversation as resolved.
Show resolved Hide resolved

if let Vote::Valid(hash) = quorum.vote {
if let RatificationResult::Success(Vote::Valid(hash)) =
&quorum.cert.result
{
// Create winning block
debug!("generate block from quorum msg");
let cert = quorum.generate_certificate();
return self.create_winning_block(&hash, &cert).await;
return self.create_winning_block(hash, &quorum.cert).await;
}
}

Expand Down
15 changes: 8 additions & 7 deletions consensus/src/quorum/verifiers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ pub async fn verify_quorum(
// Verify validation
verify_step_votes(
&quorum.header,
&quorum.vote,
&quorum.validation,
quorum.vote(),
&quorum.cert.validation,
committees_set,
seed,
StepName::Validation,
Expand All @@ -38,7 +38,7 @@ pub async fn verify_quorum(
.map_err(|e| {
error!(
desc = "invalid validation",
sv = ?quorum.validation,
sv = ?quorum.cert.validation,
hdr = ?quorum.header,
);
e
Expand All @@ -47,8 +47,8 @@ pub async fn verify_quorum(
// Verify ratification
verify_step_votes(
&quorum.header,
&quorum.vote,
&quorum.ratification,
quorum.vote(),
&quorum.cert.ratification,
committees_set,
seed,
StepName::Ratification,
Expand All @@ -57,7 +57,7 @@ pub async fn verify_quorum(
.map_err(|e| {
error!(
desc = "invalid ratification",
sv = ?quorum.ratification,
sv = ?quorum.cert.ratification,
hdr = ?quorum.header,
);
e
Expand Down Expand Up @@ -135,13 +135,14 @@ pub fn verify_votes(
target_quorum,
};

if !quorum_result.quorum_reached() {
if vote != &Vote::NoQuorum && !quorum_result.quorum_reached() {
fed-franz marked this conversation as resolved.
Show resolved Hide resolved
tracing::error!(
desc = "vote_set_too_small",
committee = format!("{:#?}", sub_committee),
bitset,
target_quorum,
total,
?vote
);
return Err(StepSigError::VoteSetTooSmall);
}
Expand Down
116 changes: 49 additions & 67 deletions consensus/src/ratification/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,15 @@ use crate::commons::{ConsensusError, RoundUpdate};
use crate::msg_handler::{HandleMsgOutput, MsgHandler};
use crate::step_votes_reg::SafeCertificateInfoRegistry;
use async_trait::async_trait;
use node_data::ledger::Certificate;
use node_data::{ledger, StepName};
use tracing::{error, warn};

use crate::aggregator::Aggregator;

use crate::iteration_ctx::RoundCommittees;
use crate::quorum::verifiers::verify_votes;
use node_data::message::payload::{
QuorumType, Ratification, ValidationResult, Vote,
};
use node_data::message::payload::{Ratification, ValidationResult, Vote};
use node_data::message::{
payload, ConsensusHeader, Message, Payload, StepMessage,
};
Expand Down Expand Up @@ -68,54 +67,43 @@ impl MsgHandler for RatificationHandler {
if iteration != self.curr_iteration {
// Message that belongs to step from the past must be handled with
// collect_from_past fn
warn!(
event = "drop message",
reason = "invalid iteration number",
msg_iteration = iteration,
);
return Ok(HandleMsgOutput::Pending);
return Err(ConsensusError::InvalidMsgIteration(iteration));
}

// Collect vote, if msg payload is of ratification type
let collect_vote = self.aggregator.collect_vote(
committee,
p.sign_info(),
&p.vote,
p.get_step(),
);

match collect_vote {
Ok((sv, quorum_reached)) => {
// Record any signature in global registry
_ = self.sv_registry.lock().await.add_step_votes(
iteration,
&p.vote,
sv,
StepName::Ratification,
quorum_reached,
committee.excluded().expect("Generator to be excluded"),
);

if quorum_reached {
return Ok(HandleMsgOutput::Ready(self.build_quorum_msg(
ru,
iteration,
p.vote,
*p.validation_result.sv(),
sv,
)));
}
}
Err(error) => {
let (sv, quorum_reached) = self
.aggregator
.collect_vote(committee, p.sign_info(), &p.vote, p.get_step())
.map_err(|error| {
warn!(
event = "Cannot collect vote",
?error,
from = p.sign_info().signer.to_bs58(),
vote = %p.vote,
?p.vote,
msg_step = p.get_step(),
msg_round = p.header().round,
);
}
ConsensusError::InvalidVote(p.vote)
})?;

// Record any signature in global registry
_ = self.sv_registry.lock().await.add_step_votes(
iteration,
&p.vote,
sv,
StepName::Ratification,
quorum_reached,
committee.excluded().expect("Generator to be excluded"),
);

if quorum_reached {
return Ok(HandleMsgOutput::Ready(self.build_quorum_msg(
ru,
iteration,
p.vote,
*p.validation_result.sv(),
sv,
)));
}

Ok(HandleMsgOutput::Pending)
Expand Down Expand Up @@ -159,7 +147,7 @@ impl MsgHandler for RatificationHandler {
event = "Cannot collect vote",
?error,
from = p.sign_info().signer.to_bs58(),
vote = %p.vote,
vote = ?p.vote,
msg_step = p.get_step(),
msg_round = p.header().round,
);
Expand Down Expand Up @@ -201,9 +189,11 @@ impl RatificationHandler {

let quorum = payload::Quorum {
header,
vote,
validation,
ratification,
cert: Certificate {
result: vote.into(),
validation,
ratification,
},
};

Message::new_quorum(quorum)
Expand Down Expand Up @@ -232,27 +222,19 @@ impl RatificationHandler {
round_committees: &RoundCommittees,
result: &ValidationResult,
) -> Result<(), ConsensusError> {
// TODO: Check all quorums
match result.quorum() {
QuorumType::Valid | QuorumType::NoCandidate => {
if let Some(validation_committee) =
round_committees.get_validation_committee(iter)
{
verify_votes(
header,
StepName::Validation,
result.vote(),
result.sv(),
validation_committee,
)?;

return Ok(());
} else {
error!("could not get validation committee");
}
}
_ => {}
}
Err(ConsensusError::InvalidValidation(result.quorum()))
let validation_committee = round_committees
.get_validation_committee(iter)
.ok_or_else(|| {
error!("could not get validation committee");
ConsensusError::InvalidValidation(result.quorum())
})?;
verify_votes(
header,
StepName::Validation,
result.vote(),
result.sv(),
validation_committee,
)?;
Ok(())
}
}
4 changes: 2 additions & 2 deletions consensus/src/ratification/step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub fn build_ratification_payload(
let sign_info = message::SignInfo::default();
let mut ratification = message::payload::Ratification {
header,
vote: result.vote().clone(),
vote: *result.vote(),
sign_info,
validation_result: result.clone(),
timestamp: get_current_timestamp(),
Expand Down Expand Up @@ -137,7 +137,7 @@ impl<T: Operations + 'static, DB: Database> RatificationStep<T, DB> {
handler.validation_result(),
ctx.outbound.clone(),
)
.instrument(tracing::info_span!("ratification", %vote))
.instrument(tracing::info_span!("ratification", ?vote))
.await;

// Collect my own vote
Expand Down
Loading
Loading