Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
herr-seppia committed Feb 12, 2024
1 parent 5344f0e commit 1aa23da
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 104 deletions.
7 changes: 4 additions & 3 deletions consensus/src/commons.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ pub enum ConsensusError {
InvalidPrevBlockHash(Hash),
InvalidQuorumType,
InvalidVote(Vote),
InvalidMsgIteration(u8),
FutureEvent,
PastEvent,
NotCommitteeMember,
Expand Down Expand Up @@ -141,17 +142,17 @@ impl QuorumMsgSender {
}

/// Sends an quorum (internally) to the quorum loop.
pub(crate) async fn send(&self, msg: Message) {
pub(crate) async fn send_quorum(&self, msg: Message) {
match &msg.payload {
// TODO: EP: Change me accordingly to https://github.com/dusk-network/rusk/issues/1268
Payload::Quorum(q)
if !q.validation.is_empty()
&& !q.ratification.is_empty()
&& q.vote != Vote::NoCandidate =>
&& q.vote() != &Vote::NoCandidate =>
{
tracing::debug!(
event = "send quorum_msg",
vote = %q.vote,
vote = %q.vote(),
round = msg.header.round,
iteration = msg.header.iteration,
validation = ?q.validation,
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
// During execution of any step we may encounter that an
// quorum is generated for a former or current iteration.
if msg.topic() == Topics::Quorum {
sender.send(msg.clone()).await;
sender.send_quorum(msg.clone()).await;
}
}

Expand Down
4 changes: 2 additions & 2 deletions consensus/src/execution_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,10 @@ impl<'a, DB: Database, T: Operations + 'static> ExecutionCtx<'a, DB, T> {
event = "quorum",
src = "prev_step",
msg_step = m.get_step(),
vote = %q.vote,
vote = %q.vote(),
);

self.quorum_sender.send(m).await;
self.quorum_sender.send_quorum(m).await;
}

Payload::ValidationResult(validation_result) => {
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/quorum/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,11 @@ impl<'p, D: Database> Executor<'p, D> {
// Publish the quorum
self.publish(msg.clone()).await;

if let Vote::Valid(hash) = quorum.vote {
if let Vote::Valid(hash) = quorum.vote() {
// Create winning block
debug!("generate block from quorum msg");
let cert = quorum.generate_certificate();
return self.create_winning_block(&hash, &cert).await;
return self.create_winning_block(hash, &cert).await;
}
}

Expand Down
4 changes: 2 additions & 2 deletions consensus/src/quorum/verifiers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub async fn verify_quorum(
// Verify validation
verify_step_votes(
&quorum.header,
&quorum.vote,
quorum.vote(),
&quorum.validation,
committees_set,
seed,
Expand All @@ -47,7 +47,7 @@ pub async fn verify_quorum(
// Verify ratification
verify_step_votes(
&quorum.header,
&quorum.vote,
quorum.vote(),
&quorum.ratification,
committees_set,
seed,
Expand Down
68 changes: 29 additions & 39 deletions consensus/src/ratification/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,54 +68,44 @@ impl MsgHandler for RatificationHandler {
if iteration != self.curr_iteration {
// Message that belongs to step from the past must be handled with
// collect_from_past fn
warn!(
event = "drop message",
reason = "invalid iteration number",
msg_iteration = iteration,
);
return Ok(HandleMsgOutput::Pending);
return Err(ConsensusError::InvalidMsgIteration(iteration));
}

// Collect vote, if msg payload is of ratification type
let collect_vote = self.aggregator.collect_vote(
committee,
p.sign_info(),
&p.vote,
p.get_step(),
);

match collect_vote {
Ok((sv, quorum_reached)) => {
// Record any signature in global registry
_ = self.sv_registry.lock().await.add_step_votes(
iteration,
&p.vote,
sv,
StepName::Ratification,
quorum_reached,
committee.excluded().expect("Generator to be excluded"),
);

if quorum_reached {
return Ok(HandleMsgOutput::Ready(self.build_quorum_msg(
ru,
iteration,
p.vote,
*p.validation_result.sv(),
sv,
)));
}
}
Err(error) => {
let (sv, quorum_reached) = self
.aggregator
.collect_vote(committee, p.sign_info(), &p.vote, p.get_step())
.map_err(|error| {
let vote = p.vote.clone();
warn!(
event = "Cannot collect vote",
?error,
from = p.sign_info().signer.to_bs58(),
vote = %p.vote,
%vote,
msg_step = p.get_step(),
msg_round = p.header().round,
);
}
ConsensusError::InvalidVote(vote)
})?;

// Record any signature in global registry
_ = self.sv_registry.lock().await.add_step_votes(
iteration,
&p.vote,
sv,
StepName::Ratification,
quorum_reached,
committee.excluded().expect("Generator to be excluded"),
);

if quorum_reached {
return Ok(HandleMsgOutput::Ready(self.build_quorum_msg(
ru,
iteration,
p.vote,
*p.validation_result.sv(),
sv,
)));
}

Ok(HandleMsgOutput::Pending)
Expand Down Expand Up @@ -201,7 +191,7 @@ impl RatificationHandler {

let quorum = payload::Quorum {
header,
vote,
ratification_result: vote.into(),
validation,
ratification,
};
Expand Down
12 changes: 6 additions & 6 deletions consensus/src/step_votes_reg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use crate::commons::RoundUpdate;
use node_data::bls::PublicKeyBytes;
use node_data::ledger::{Certificate, IterationInfo, StepVotes};
use node_data::message::payload::Vote;
use node_data::message::payload::{RatificationResult, Vote};
use node_data::message::{payload, Message};
use node_data::StepName;
use std::collections::HashMap;
Expand All @@ -18,7 +18,7 @@ use tracing::{debug, warn};

#[derive(Clone)]
struct CertificateInfo {
vote: Vote,
result: RatificationResult,
cert: Certificate,

quorum_reached_validation: bool,
Expand All @@ -29,8 +29,8 @@ impl fmt::Display for CertificateInfo {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"cert_info: {}, validation: ({:?},{:?}), ratification: ({:?},{:?}) ",
self.vote,
"cert_info: {:?}, validation: ({:?},{:?}), ratification: ({:?},{:?}) ",
self.result,
self.cert.validation,
self.quorum_reached_validation,
self.cert.ratification,
Expand All @@ -42,7 +42,7 @@ impl fmt::Display for CertificateInfo {
impl CertificateInfo {
pub(crate) fn new(vote: Vote) -> Self {
CertificateInfo {
vote,
result: vote.into(),
cert: Certificate::default(),
quorum_reached_validation: false,
quorum_reached_ratification: false,
Expand Down Expand Up @@ -185,7 +185,7 @@ impl CertInfoRegistry {

let payload = payload::Quorum {
header,
vote: result.vote.clone(),
ratification_result: result.result.clone(),
validation: result.cert.validation,
ratification: result.cert.ratification,
};
Expand Down
75 changes: 32 additions & 43 deletions consensus/src/validation/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,58 +96,47 @@ impl MsgHandler for ValidationHandler {
if iteration != self.curr_iteration {
// Message that belongs to step from the past must be handled with
// collect_from_past fn
warn!(
event = "drop message",
reason = "invalid iteration number",
msg_iteration = iteration,
);
return Ok(HandleMsgOutput::Pending);
return Err(ConsensusError::InvalidMsgIteration(iteration));
}

let collect_vote = self.aggr.collect_vote(
committee,
p.sign_info(),
&p.vote,
p.get_step(),
);
match collect_vote {
Ok((sv, quorum_reached)) => {
// Record result in global round registry
_ = self.sv_registry.lock().await.add_step_votes(
iteration,
&p.vote,
sv,
StepName::Validation,
quorum_reached,
committee.excluded().expect("Generator to be excluded"),
);

if quorum_reached {
let vote = p.vote;

let quorum_type = match vote {
Vote::NoCandidate => QuorumType::NoCandidate,
Vote::Invalid(_) => QuorumType::Invalid,
Vote::Valid(_) => QuorumType::Valid,
Vote::NoQuorum => {
return Err(ConsensusError::InvalidVote(vote));
}
};
info!(event = "quorum reached", %vote);
return Ok(final_result(sv, vote, quorum_type));
}
}

Err(error) => {
let (sv, quorum_reached) = self
.aggr
.collect_vote(committee, p.sign_info(), &p.vote, p.get_step())
.map_err(|error| {
let vote = p.vote.clone();
warn!(
event = "Cannot collect vote",
?error,
from = p.sign_info().signer.to_bs58(),
vote = %p.vote,
%vote,
msg_step = p.get_step(),
msg_round = p.header().round,
);
}
ConsensusError::InvalidVote(vote)
})?;
// Record result in global round registry
_ = self.sv_registry.lock().await.add_step_votes(
iteration,
&p.vote,
sv,
StepName::Validation,
quorum_reached,
committee.excluded().expect("Generator to be excluded"),
);

if quorum_reached {
let vote = p.vote;

let quorum_type = match vote {
Vote::NoCandidate => QuorumType::NoCandidate,
Vote::Invalid(_) => QuorumType::Invalid,
Vote::Valid(_) => QuorumType::Valid,
Vote::NoQuorum => {
return Err(ConsensusError::InvalidVote(vote));
}
};
info!(event = "quorum reached", %vote);
return Ok(final_result(sv, vote, quorum_type));
}

Ok(HandleMsgOutput::Pending)
Expand Down
46 changes: 45 additions & 1 deletion node-data/src/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::ledger::{
StepVotes, Transaction,
};
use crate::message::payload::{
QuorumType, Ratification, ValidationResult, Vote,
QuorumType, Ratification, RatificationResult, ValidationResult, Vote,
};
use crate::message::{ConsensusHeader, SignInfo};
use crate::Serializable;
Expand Down Expand Up @@ -194,6 +194,45 @@ impl Serializable for StepVotes {
}
}

impl Serializable for RatificationResult {
fn write<W: Write>(&self, w: &mut W) -> io::Result<()> {
match self {
RatificationResult::Fail(v) => {
w.write_all(&[0])?;
v.write(w)?;
}

RatificationResult::Success(v) => {
w.write_all(&[1])?;
v.write(w)?;
}
}

Ok(())
}

fn read<R: Read>(r: &mut R) -> io::Result<Self>
where
Self: Sized,
{
let result = match Self::read_u8(r)? {
0 => {
let vote = Vote::read(r)?;
Self::Fail(vote)
}
1 => {
let vote = Vote::read(r)?;
Self::Success(vote)
}
_ => Err(io::Error::new(
io::ErrorKind::InvalidData,
"Invalid RatificationResult",
))?,
};
Ok(result)
}
}

impl Serializable for IterationsInfo {
fn write<W: Write>(&self, w: &mut W) -> io::Result<()> {
let count = self.cert_list.len() as u8;
Expand Down Expand Up @@ -401,4 +440,9 @@ mod tests {
fn test_encoding_block() {
assert_serializable::<Block>();
}

#[test]
fn test_encoding_ratification_result() {
assert_serializable::<RatificationResult>();
}
}
Loading

0 comments on commit 1aa23da

Please sign in to comment.