Skip to content

Commit

Permalink
Merge branch 'master' into fix-1904
Browse files Browse the repository at this point in the history
  • Loading branch information
goshawk-3 authored Sep 6, 2024
2 parents 606795b + f2b8fc0 commit 65ca957
Show file tree
Hide file tree
Showing 252 changed files with 10,444 additions and 4,150 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/benchmarks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ jobs:
with:
filters: |
run-ci:
- 'rusk/**/*
- 'node/**/*
predicate-quantifier: "any"
- 'rusk/**'
- 'node/**'
- '.github/workflows/benchmarks.yml'
benchmark:
needs: changes
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/ruskwallet_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ jobs:
with:
filters: |
run-ci:
- 'wallet-core/**'
- 'execution-core/**'
- 'rusk-wallet/**'
- '.github/workflows/ruskwallet_ci.yml'
fmt:
Expand Down
1 change: 0 additions & 1 deletion consensus/src/commons.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ impl From<BlsSigError> for ConsensusError {
#[async_trait::async_trait]
pub trait Database: Send + Sync {
fn store_candidate_block(&mut self, b: Block);
fn delete_candidate_blocks(&mut self);
}

#[derive(Clone)]
Expand Down
3 changes: 1 addition & 2 deletions consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
Phase::Proposal(proposal::step::ProposalStep::new(
executor.clone(),
db.clone(),
proposal_handler.clone(),
proposal_handler,
)),
Phase::Validation(validation::step::ValidationStep::new(
executor.clone(),
Expand All @@ -179,7 +179,6 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
let mut iter_ctx = IterationCtx::new(
ru.round,
iter,
proposal_handler,
validation_handler,
ratification_handler,
ru.base_timeouts.clone(),
Expand Down
75 changes: 37 additions & 38 deletions consensus/src/execution_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//
// Copyright (c) DUSK NETWORK. All rights reserved.

use crate::commons::{ConsensusError, Database, QuorumMsgSender, RoundUpdate};
use crate::commons::{ConsensusError, QuorumMsgSender, RoundUpdate};

use crate::iteration_ctx::IterationCtx;
use crate::msg_handler::{HandleMsgOutput, MsgHandler};
Expand Down Expand Up @@ -34,8 +34,8 @@ use tracing::{debug, error, info, trace, warn};

/// ExecutionCtx encapsulates all data needed in the execution of consensus
/// messages handlers.
pub struct ExecutionCtx<'a, DB: Database, T> {
pub iter_ctx: &'a mut IterationCtx<DB>,
pub struct ExecutionCtx<'a, T> {
pub iter_ctx: &'a mut IterationCtx,

/// Messaging-related fields
pub inbound: AsyncQueue<Message>,
Expand All @@ -57,11 +57,11 @@ pub struct ExecutionCtx<'a, DB: Database, T> {
quorum_sender: QuorumMsgSender,
}

impl<'a, DB: Database, T: Operations + 'static> ExecutionCtx<'a, DB, T> {
impl<'a, T: Operations + 'static> ExecutionCtx<'a, T> {
/// Creates step execution context.
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
iter_ctx: &'a mut IterationCtx<DB>,
iter_ctx: &'a mut IterationCtx,
inbound: AsyncQueue<Message>,
outbound: AsyncQueue<Message>,
future_msgs: Arc<Mutex<MsgRegistry<Message>>>,
Expand Down Expand Up @@ -220,7 +220,7 @@ impl<'a, DB: Database, T: Operations + 'static> ExecutionCtx<'a, DB, T> {

if let Some(committee) = self.iter_ctx.committees.get_committee(step) {
if self.am_member(committee) {
RatificationStep::<DB>::try_vote(
RatificationStep::try_vote(
&self.round_update,
msg_iteration,
validation,
Expand Down Expand Up @@ -248,41 +248,40 @@ impl<'a, DB: Database, T: Operations + 'static> ExecutionCtx<'a, DB, T> {
async fn on_emergency_mode(&mut self, msg: Message) {
self.outbound.try_send(msg.clone());

// Try to cast validation vote for a candidate block from former
// iteration
// Try to vote for a candidate block from former iteration
if let Payload::Candidate(p) = &msg.payload {
self.try_cast_validation_vote(&p.candidate).await;
}

let msg_iteration = msg.header.iteration;

// Collect message from a previous iteration/step.
if let Some(m) = self
.iter_ctx
.collect_past_event(&self.round_update, msg)
.await
{
match &m.payload {
Payload::Quorum(q) => {
debug!(
event = "quorum",
src = "prev_step",
msg_step = m.get_step(),
vote = ?q.vote(),
);

self.quorum_sender.send_quorum(m).await;
}
} else {
let msg_iteration = msg.header.iteration;

// Collect message from a previous iteration/step.
if let Some(m) = self
.iter_ctx
.collect_past_event(&self.round_update, msg)
.await
{
match &m.payload {
Payload::Quorum(q) => {
debug!(
event = "quorum",
src = "prev_step",
msg_step = m.get_step(),
vote = ?q.vote(),
);

self.quorum_sender.send_quorum(m).await;
}

Payload::ValidationResult(validation_result) => {
self.try_cast_ratification_vote(
msg_iteration,
validation_result,
)
.await
}
_ => {
// Not supported.
Payload::ValidationResult(validation_result) => {
self.try_cast_ratification_vote(
msg_iteration,
validation_result,
)
.await
}
_ => {
// Not supported.
}
}
}
}
Expand Down
18 changes: 4 additions & 14 deletions consensus/src/iteration_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
//
// Copyright (c) DUSK NETWORK. All rights reserved.

use crate::commons::Database;
use crate::commons::{RoundUpdate, TimeoutSet};
use std::cmp;

Expand All @@ -14,7 +13,7 @@ use crate::msg_handler::MsgHandler;

use crate::user::committee::Committee;

use crate::{proposal, ratification, validation};
use crate::{ratification, validation};
use node_data::bls::PublicKeyBytes;

use node_data::message::Message;
Expand Down Expand Up @@ -60,11 +59,10 @@ impl RoundCommittees {

/// Represents a shared state within a context of the execution of a single
/// iteration.
pub struct IterationCtx<DB: Database> {
pub struct IterationCtx {
validation_handler: Arc<Mutex<validation::handler::ValidationHandler>>,
ratification_handler:
Arc<Mutex<ratification::handler::RatificationHandler>>,
proposal_handler: Arc<Mutex<proposal::handler::ProposalHandler<DB>>>,

pub join_set: JoinSet<()>,

Expand All @@ -79,11 +77,10 @@ pub struct IterationCtx<DB: Database> {
timeouts: TimeoutSet,
}

impl<D: Database> IterationCtx<D> {
impl IterationCtx {
pub fn new(
round: u64,
iter: u8,
proposal_handler: Arc<Mutex<proposal::handler::ProposalHandler<D>>>,
validation_handler: Arc<Mutex<validation::handler::ValidationHandler>>,
ratification_handler: Arc<
Mutex<ratification::handler::RatificationHandler>,
Expand All @@ -94,7 +91,6 @@ impl<D: Database> IterationCtx<D> {
round,
join_set: JoinSet::new(),
iter,
proposal_handler,
validation_handler,
ratification_handler,
committees: Default::default(),
Expand Down Expand Up @@ -152,12 +148,6 @@ impl<D: Database> IterationCtx<D> {
let generator = self.get_generator(msg.header.iteration);

match msg.topic() {
node_data::message::Topics::Candidate => {
let mut handler = self.proposal_handler.lock().await;
_ = handler
.collect_from_past(msg, ru, committee, generator)
.await;
}
node_data::message::Topics::Validation => {
let mut handler = self.validation_handler.lock().await;
if let Ok(HandleMsgOutput::Ready(m)) = handler
Expand All @@ -183,7 +173,7 @@ impl<D: Database> IterationCtx<D> {
}
}

impl<DB: Database> Drop for IterationCtx<DB> {
impl Drop for IterationCtx {
fn drop(&mut self) {
self.on_close();
}
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/phase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ macro_rules! await_phase {
pub enum Phase<T: Operations, D: Database> {
Proposal(proposal::step::ProposalStep<T, D>),
Validation(validation::step::ValidationStep<T>),
Ratification(ratification::step::RatificationStep<D>),
Ratification(ratification::step::RatificationStep),
}

impl<T: Operations + 'static, D: Database + 'static> Phase<T, D> {
Expand All @@ -54,7 +54,7 @@ impl<T: Operations + 'static, D: Database + 'static> Phase<T, D> {

pub async fn run(
&mut self,
mut ctx: ExecutionCtx<'_, D, T>,
mut ctx: ExecutionCtx<'_, T>,
) -> Result<Message, ConsensusError> {
ctx.set_start_time();

Expand Down
2 changes: 1 addition & 1 deletion consensus/src/proposal/block_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl<T: Operations> Generator<T> {

candidate.sign(&ru.secret_key, ru.pubkey_bls.inner());

Ok(Message::new_candidate(candidate))
Ok(candidate.into())
}

async fn generate_block(
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/proposal/step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl<T: Operations + 'static, D: Database> ProposalStep<T, D> {

pub async fn run(
&mut self,
mut ctx: ExecutionCtx<'_, D, T>,
mut ctx: ExecutionCtx<'_, T>,
) -> Result<Message, ConsensusError> {
let committee = ctx
.get_current_committee()
Expand Down
29 changes: 0 additions & 29 deletions consensus/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,6 @@ impl<T: Debug + Clone> MsgRegistry<T> {
self.0.remove(&round);
}

/// Removes all messages that belong to a round greater than the specified.
pub fn remove_msgs_greater_than(&mut self, round: u64) {
self.0.split_off(&round);
}

/// Removes all messages that do not belong to the range (closed interval)
/// of keys
pub fn remove_msgs_out_of_range(&mut self, start_round: u64, offset: u64) {
Expand Down Expand Up @@ -121,30 +116,6 @@ mod tests {
assert!(reg.drain_msg_by_round_step(round, 2).is_none());
}

#[test]
fn test_remove() {
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
struct Item(i32);

let round = 100;

let mut reg = MsgRegistry::<Item>::default();
reg.put_msg(round + 1, 1, Item(1));
reg.put_msg(round + 2, 1, Item(1));
reg.put_msg(round + 3, 1, Item(1));
reg.put_msg(round, 1, Item(1));

reg.remove_msgs_greater_than(round + 2);

assert!(reg.drain_msg_by_round_step(round, 1).is_some());
assert!(reg.drain_msg_by_round_step(round + 1, 1).is_some());

assert!(reg.drain_msg_by_round_step(round + 2, 1).is_none());
assert!(reg.drain_msg_by_round_step(round + 3, 1).is_none());

assert_eq!(reg.msg_count(), 0);
}

#[test]
fn test_remove_msgs_out_of_range() {
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/ratification/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ impl RatificationHandler {
},
};

Message::new_quorum(quorum)
quorum.into()
}

pub(crate) fn reset(&mut self, iter: u8, validation: ValidationResult) {
Expand Down
19 changes: 7 additions & 12 deletions consensus/src/ratification/step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@
//
// Copyright (c) DUSK NETWORK. All rights reserved.

use crate::commons::{ConsensusError, Database, RoundUpdate};
use crate::commons::{ConsensusError, RoundUpdate};
use crate::execution_ctx::ExecutionCtx;
use crate::operations::Operations;
use std::marker::PhantomData;

use crate::msg_handler::{HandleMsgOutput, MsgHandler};
use crate::ratification::handler;
Expand All @@ -19,12 +18,11 @@ use tokio::sync::Mutex;

use tracing::{info, Instrument};

pub struct RatificationStep<DB> {
pub struct RatificationStep {
handler: Arc<Mutex<handler::RatificationHandler>>,
marker: PhantomData<DB>,
}

impl<DB: Database> RatificationStep<DB> {
impl RatificationStep {
pub async fn try_vote(
ru: &RoundUpdate,
iteration: u8,
Expand All @@ -35,7 +33,7 @@ impl<DB: Database> RatificationStep<DB> {
let ratification =
self::build_ratification_payload(ru, iteration, result);

let msg = Message::new_ratification(ratification);
let msg = Message::from(ratification);

// Publish ratification vote
info!(event = "send_vote", validation_bitset = result.sv().bitset);
Expand Down Expand Up @@ -70,14 +68,11 @@ pub fn build_ratification_payload(
ratification
}

impl<DB: Database> RatificationStep<DB> {
impl RatificationStep {
pub(crate) fn new(
handler: Arc<Mutex<handler::RatificationHandler>>,
) -> Self {
Self {
handler,
marker: PhantomData,
}
Self { handler }
}

pub async fn reinitialize(
Expand Down Expand Up @@ -112,7 +107,7 @@ impl<DB: Database> RatificationStep<DB> {

pub async fn run<T: Operations + 'static>(
&mut self,
mut ctx: ExecutionCtx<'_, DB, T>,
mut ctx: ExecutionCtx<'_, T>,
) -> Result<Message, ConsensusError> {
let committee = ctx
.get_current_committee()
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/step_votes_reg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ impl AttInfoRegistry {

let payload = payload::Quorum { header, att };

Message::new_quorum(payload)
payload.into()
}

pub(crate) fn get_failed_atts(&self, to: u8) -> Vec<Option<IterationInfo>> {
Expand Down
Loading

0 comments on commit 65ca957

Please sign in to comment.