Skip to content

Commit

Permalink
Merge pull request #1167 from dusk-network/node_clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
herr-seppia authored Dec 4, 2023
2 parents 02ae1e9 + 246bac3 commit 9345cf4
Show file tree
Hide file tree
Showing 12 changed files with 205 additions and 261 deletions.
39 changes: 15 additions & 24 deletions node/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,24 @@ mod fallback;
mod fsm;
mod genesis;

use crate::database::{Candidate, Ledger, Mempool};
use crate::database::Ledger;
use crate::{database, vm, Network};
use crate::{LongLivedService, Message};
use anyhow::{anyhow, bail, Result};
use anyhow::Result;

use dusk_consensus::user::committee::CommitteeSet;
use std::rc::Rc;
use std::sync::Arc;
use tracing::{debug, error, info, warn};
use tracing::{error, info, warn};

use async_trait::async_trait;
use dusk_consensus::commons::{ConsensusError, Database, RoundUpdate};
use dusk_consensus::consensus::Consensus;
use dusk_consensus::contract_state::{
CallParams, Error, Operations, Output, StateRoot,
};
use dusk_consensus::user::provisioners::Provisioners;
use node_data::ledger::{self, to_str, Block, Hash, Header};
use node_data::ledger::{to_str, Block};
use node_data::message::AsyncQueue;
use node_data::message::{Payload, Topics};
use node_data::Serializable;
use tokio::sync::{oneshot, Mutex, RwLock};
use tokio::task::JoinHandle;
use tokio::sync::RwLock;
use tokio::time::{sleep_until, Instant};

use node_data::message::payload::GetBlocks;
use std::any;
use std::time::Duration;

use self::acceptor::{Acceptor, RevertTarget};
use self::consensus::Task;
use self::fsm::SimpleFSM;

pub use acceptor::verify_block_cert;
Expand Down Expand Up @@ -102,7 +89,7 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>

// NB. After restart, state_root returned by VM is always the last
// finalized one.
let mut state_root = vm.read().await.get_state_root()?;
let state_root = vm.read().await.get_state_root()?;

info!(
event = "VM state loaded",
Expand Down Expand Up @@ -181,15 +168,19 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
Payload::NewBlock(_)
| Payload::Reduction(_)
| Payload::Agreement(_) => {
acc.read().await.reroute_msg(msg).await;
if let Err(e) = acc.read().await.reroute_msg(msg).await {
warn!("Unable to reroute_msg to the acceptor: {e}");
}
}
_ => warn!("invalid inbound message"),
}
},
// Re-routes messages originated from Consensus (upper) layer to the network layer.
recv = &mut outbound_chan.recv() => {
let msg = recv?;
network.read().await.broadcast(&msg).await;
if let Err(e) = network.read().await.broadcast(&msg).await {
warn!("Unable to re-route message {e}");
}
},
// Handles accept_block_timeout event
_ = sleep_until(timeout) => {
Expand Down Expand Up @@ -233,8 +224,8 @@ impl ChainSrv {
// either malformed or empty.
let genesis_blk = genesis::generate_state();

/// Persist genesis block
t.store_block(genesis_blk.header(), &[]);
// Persist genesis block
t.store_block(genesis_blk.header(), &[])?;
genesis_blk
}
};
Expand All @@ -259,7 +250,7 @@ impl ChainSrv {
}

Ok(())
});
})?;

tracing::info!(
event = "Ledger block loaded",
Expand Down
62 changes: 22 additions & 40 deletions node/src/chain/acceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,44 +4,28 @@
//
// Copyright (c) DUSK NETWORK. All rights reserved.

use crate::database::{Candidate, Ledger, Mempool};
use crate::{database, vm, Network};
use crate::{LongLivedService, Message};
use anyhow::{anyhow, bail, Result};
use async_trait::async_trait;
use dusk_bls12_381_sign::PublicKey;
use dusk_consensus::commons::{
ConsensusError, Database, IterCounter, RoundUpdate, StepName,
};
use dusk_consensus::consensus::{self, Consensus};
use dusk_consensus::contract_state::{
CallParams, Error, Operations, Output, StateRoot,
};
use crate::database::{self, Ledger, Mempool};
use crate::{vm, Message, Network};
use anyhow::{anyhow, Result};
use dusk_consensus::commons::{ConsensusError, IterCounter, StepName};
use dusk_consensus::user::committee::{Committee, CommitteeSet};
use dusk_consensus::user::provisioners::Provisioners;
use dusk_consensus::user::sortition;
use hex::ToHex;
use node_data::ledger::{
self, to_str, Block, Hash, Header, Seed, Signature, SpentTransaction,
self, to_str, Block, Seed, Signature, SpentTransaction,
};
use node_data::message::AsyncQueue;
use node_data::message::{Payload, Topics};
use node_data::Serializable;
use std::cell::RefCell;
use std::rc::Rc;
use node_data::message::Payload;
use std::sync::Arc;
use tokio::sync::{oneshot, Mutex, RwLock};
use tokio::task::JoinHandle;
use tokio::sync::{Mutex, RwLock};
use tracing::{error, info, warn};

use dusk_consensus::agreement::verifiers;
use dusk_consensus::config::{self, SELECTION_COMMITTEE_SIZE};
use std::any;
use std::collections::HashMap;

use super::consensus::Task;
use super::genesis;

#[allow(dead_code)]
pub(crate) enum RevertTarget {
LastFinalizedState = 0,
LastEpoch = 1,
Expand Down Expand Up @@ -86,7 +70,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
network: Arc<RwLock<N>>,
vm: Arc<RwLock<VM>>,
) -> Self {
let mut acc = Self {
let acc = Self {
mrb: RwLock::new(mrb.clone()),
provisioners_list: RwLock::new(provisioners_list.clone()),
db: db.clone(),
Expand All @@ -107,16 +91,20 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
acc
}
// Re-route message to consensus task
pub(crate) async fn reroute_msg(&self, msg: Message) {
pub(crate) async fn reroute_msg(
&self,
msg: Message,
) -> Result<(), async_channel::SendError<Message>> {
match &msg.payload {
Payload::NewBlock(_) | Payload::Reduction(_) => {
self.task.read().await.main_inbound.send(msg).await;
self.task.read().await.main_inbound.send(msg).await?;
}
Payload::Agreement(_) => {
self.task.read().await.agreement_inbound.send(msg).await;
self.task.read().await.agreement_inbound.send(msg).await?;
}
_ => warn!("invalid inbound message"),
}
Ok(())
}

pub fn needs_update(blk: &Block, txs: &[SpentTransaction]) -> bool {
Expand Down Expand Up @@ -144,7 +132,6 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
blk: &Block,
) -> anyhow::Result<()> {
let mut task = self.task.write().await;
let (_, public_key) = task.keys.clone();

let mut mrb = self.mrb.write().await;
let mut provisioners_list = self.provisioners_list.write().await;
Expand Down Expand Up @@ -223,7 +210,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
}

pub(crate) async fn try_accept_block(
&self,
&mut self,
blk: &Block,
enable_consensus: bool,
) -> anyhow::Result<()> {
Expand Down Expand Up @@ -297,7 +284,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
// Delete from mempool any transaction already included in the block
self.db.read().await.update(|update| {
for tx in blk.txs().iter() {
database::Mempool::delete_tx(update, tx.hash());
database::Mempool::delete_tx(update, tx.hash())?;
}
Ok(())
})?;
Expand Down Expand Up @@ -339,7 +326,6 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
/// This incorporates both VM state revert and Ledger state revert.
pub async fn try_revert(&self, target: RevertTarget) -> Result<()> {
let curr_height = self.get_curr_height().await;
let curr_iteration = self.get_curr_iteration().await;

let target_state_hash = match target {
RevertTarget::LastFinalizedState => {
Expand All @@ -353,7 +339,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {

anyhow::Ok(state_hash)
}
RevertTarget::LastEpoch => panic!("not implemented"),
_ => unimplemented!(),
}?;

// Delete any block until we reach the target_state_hash, the
Expand Down Expand Up @@ -386,9 +372,9 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
// Attempt to resubmit transactions back to mempool.
// An error here is not considered critical.
for tx in blk.txs().iter() {
Mempool::add_tx(t, tx).map_err(|err| {
error!("failed to resubmit transactions")
});
if let Err(e) = Mempool::add_tx(t, tx) {
warn!("failed to resubmit transactions: {e}")
};
}

height -= 1;
Expand Down Expand Up @@ -424,10 +410,6 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
self.last_finalized.read().await.clone()
}

pub(crate) async fn get_curr_timestamp(&self) -> i64 {
self.mrb.read().await.header().timestamp
}

pub(crate) async fn get_curr_iteration(&self) -> u8 {
self.mrb.read().await.header().iteration
}
Expand Down
61 changes: 39 additions & 22 deletions node/src/chain/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,24 @@
//
// Copyright (c) DUSK NETWORK. All rights reserved.

use crate::database::{Candidate, Ledger, Mempool};
use crate::{database, vm, Network};
use crate::{LongLivedService, Message};
use anyhow::bail;
use crate::database::{self, Candidate, Mempool};
use crate::{vm, Message, Network};
use async_trait::async_trait;
use dusk_consensus::commons::{ConsensusError, Database, RoundUpdate};
use dusk_consensus::commons::{ConsensusError, RoundUpdate};
use dusk_consensus::consensus::Consensus;
use dusk_consensus::contract_state::{
CallParams, Error, Operations, Output, StateRoot, VerificationOutput,
CallParams, Error, Operations, Output, VerificationOutput,
};
use dusk_consensus::user::provisioners::Provisioners;
use node_data::ledger::{Block, Hash, Transaction};
use node_data::message::payload::{self, GetCandidate};
use node_data::message::payload::GetCandidate;
use node_data::message::AsyncQueue;
use node_data::message::{Payload, Topics};
use node_data::Serializable;
use tokio::sync::{oneshot, Mutex, RwLock};
use tokio::task::JoinHandle;
use tracing::{error, info, trace};
use tracing::{error, info, trace, warn};

use std::sync::Arc;
use std::{any, vec};

/// Consensus Service Task is responsible for running the consensus layer.
///
Expand Down Expand Up @@ -110,15 +106,16 @@ impl Task {
);

let id = self.task_id;
let mut result_queue = self.result.clone();
let result_queue = self.result.clone();
let provisioners = provisioners.clone();
let (cancel_tx, cancel_rx) = oneshot::channel::<i32>();

self.running_task = Some((
tokio::spawn(async move {
result_queue
.send(c.spin(ru, provisioners, cancel_rx).await)
.await;
let cons_result = c.spin(ru, provisioners, cancel_rx).await;
if let Err(e) = result_queue.send(cons_result).await {
error!("Unable to send consensus result to queue {e}")
}

trace!("terminate consensus task: {}", id);
id
Expand All @@ -130,14 +127,20 @@ impl Task {
/// Aborts the running consensus task and waits for its termination.
pub(crate) async fn abort_with_wait(&mut self) {
if let Some((handle, cancel_chan)) = self.running_task.take() {
cancel_chan.send(0);
handle.await;
if cancel_chan.send(0).is_err() {
warn!("Unable to send cancel for abort_with_wait")
}
if let Err(e) = handle.await {
warn!("Unable to wait for abort {e}")
}
}
}

pub(crate) fn abort(&mut self) {
if let Some((handle, cancel_chan)) = self.running_task.take() {
cancel_chan.send(0);
if let Some((_, cancel_chan)) = self.running_task.take() {
if cancel_chan.send(0).is_err() {
warn!("Unable to send cancel for abort")
};
}
}
}
Expand All @@ -163,8 +166,15 @@ impl<DB: database::DB, N: Network> dusk_consensus::commons::Database
fn store_candidate_block(&mut self, b: Block) {
tracing::trace!("store candidate block: {:?}", b);

if let Ok(db) = self.db.try_read() {
db.update(|t| t.store_candidate_block(b));
match self.db.try_read() {
Ok(db) => {
if let Err(e) = db.update(|t| t.store_candidate_block(b)) {
warn!("Unable to store candidate block: {e}");
};
}
Err(e) => {
warn!("Cannot acquire lock to store candidate block: {e}");
}
}
}

Expand Down Expand Up @@ -224,8 +234,15 @@ impl<DB: database::DB, N: Network> dusk_consensus::commons::Database
}

fn delete_candidate_blocks(&mut self) {
if let Ok(db) = self.db.try_read() {
db.update(|t| t.clear_candidates());
match self.db.try_read() {
Ok(db) => {
if let Err(e) = db.update(|t| t.clear_candidates()) {
warn!("Unable to cleare candidates: {e}");
};
}
Err(e) => {
warn!("Cannot acquire lock to clear_candidate: {e}");
}
}
}
}
Expand Down
14 changes: 4 additions & 10 deletions node/src/chain/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,13 @@
//
// Copyright (c) DUSK NETWORK. All rights reserved.

use std::sync::Arc;

use anyhow::{anyhow, bail, Result};
use node_data::{
bls::PublicKey,
ledger::{self, Block, Hash, Header},
};
use tokio::sync::RwLock;
use tracing::{info, warn};
use anyhow::{anyhow, Result};
use node_data::ledger::Block;
use tracing::info;

use crate::{
chain::acceptor,
database::{self, Ledger, Mempool},
database::{self, Ledger},
vm, Network,
};

Expand Down
Loading

0 comments on commit 9345cf4

Please sign in to comment.