Skip to content

Commit

Permalink
node: fix clippy suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
herr-seppia committed Dec 2, 2023
1 parent 93bbcec commit f84c0ef
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 152 deletions.
16 changes: 10 additions & 6 deletions node/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>

// NB. After restart, state_root returned by VM is always the last
// finalized one.
let mut state_root = vm.read().await.get_state_root()?;
let state_root = vm.read().await.get_state_root()?;

info!(
event = "VM state loaded",
Expand Down Expand Up @@ -168,15 +168,19 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
Payload::NewBlock(_)
| Payload::Reduction(_)
| Payload::Agreement(_) => {
acc.read().await.reroute_msg(msg).await;
if let Err(e) = acc.read().await.reroute_msg(msg).await {
warn!("Unable to reroute_msg to the acceptor: {e}");
}
}
_ => warn!("invalid inbound message"),
}
},
// Re-routes messages originated from Consensus (upper) layer to the network layer.
recv = &mut outbound_chan.recv() => {
let msg = recv?;
network.read().await.broadcast(&msg).await;
if let Err(e) = network.read().await.broadcast(&msg).await {
warn!("Unable to re-route message {e}");
}
},
// Handles accept_block_timeout event
_ = sleep_until(timeout) => {
Expand Down Expand Up @@ -220,8 +224,8 @@ impl ChainSrv {
// either malformed or empty.
let genesis_blk = genesis::generate_state();

/// Persist genesis block
t.store_block(genesis_blk.header(), &[]);
// Persist genesis block
t.store_block(genesis_blk.header(), &[])?;
genesis_blk
}
};
Expand All @@ -246,7 +250,7 @@ impl ChainSrv {
}

Ok(())
});
})?;

tracing::info!(
event = "Ledger block loaded",
Expand Down
31 changes: 15 additions & 16 deletions node/src/chain/acceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use dusk_consensus::config::{self, SELECTION_COMMITTEE_SIZE};

use super::consensus::Task;

#[allow(dead_code)]
pub(crate) enum RevertTarget {
LastFinalizedState = 0,
LastEpoch = 1,
Expand Down Expand Up @@ -69,7 +70,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
network: Arc<RwLock<N>>,
vm: Arc<RwLock<VM>>,
) -> Self {
let mut acc = Self {
let acc = Self {
mrb: RwLock::new(mrb.clone()),
provisioners_list: RwLock::new(provisioners_list.clone()),
db: db.clone(),
Expand All @@ -90,16 +91,20 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
acc
}
// Re-route message to consensus task
pub(crate) async fn reroute_msg(&self, msg: Message) {
pub(crate) async fn reroute_msg(
&self,
msg: Message,
) -> Result<(), async_channel::SendError<Message>> {
match &msg.payload {
Payload::NewBlock(_) | Payload::Reduction(_) => {
self.task.read().await.main_inbound.send(msg).await;
self.task.read().await.main_inbound.send(msg).await?;
}
Payload::Agreement(_) => {
self.task.read().await.agreement_inbound.send(msg).await;
self.task.read().await.agreement_inbound.send(msg).await?;
}
_ => warn!("invalid inbound message"),
}
Ok(())
}

pub fn needs_update(blk: &Block, txs: &[SpentTransaction]) -> bool {
Expand Down Expand Up @@ -127,7 +132,6 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
blk: &Block,
) -> anyhow::Result<()> {
let mut task = self.task.write().await;
let (_, public_key) = task.keys.clone();

let mut mrb = self.mrb.write().await;
let mut provisioners_list = self.provisioners_list.write().await;
Expand Down Expand Up @@ -206,7 +210,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
}

pub(crate) async fn try_accept_block(
&self,
&mut self,
blk: &Block,
enable_consensus: bool,
) -> anyhow::Result<()> {
Expand Down Expand Up @@ -280,7 +284,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
// Delete from mempool any transaction already included in the block
self.db.read().await.update(|update| {
for tx in blk.txs().iter() {
database::Mempool::delete_tx(update, tx.hash());
database::Mempool::delete_tx(update, tx.hash())?;
}
Ok(())
})?;
Expand Down Expand Up @@ -322,7 +326,6 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
/// This incorporates both VM state revert and Ledger state revert.
pub async fn try_revert(&self, target: RevertTarget) -> Result<()> {
let curr_height = self.get_curr_height().await;
let curr_iteration = self.get_curr_iteration().await;

let target_state_hash = match target {
RevertTarget::LastFinalizedState => {
Expand All @@ -336,7 +339,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {

anyhow::Ok(state_hash)
}
RevertTarget::LastEpoch => panic!("not implemented"),
_ => unimplemented!(),
}?;

// Delete any block until we reach the target_state_hash, the
Expand Down Expand Up @@ -369,9 +372,9 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
// Attempt to resubmit transactions back to mempool.
// An error here is not considered critical.
for tx in blk.txs().iter() {
Mempool::add_tx(t, tx).map_err(|err| {
error!("failed to resubmit transactions")
});
if let Err(e) = Mempool::add_tx(t, tx) {
warn!("failed to resubmit transactions: {e}")
};
}

height -= 1;
Expand Down Expand Up @@ -407,10 +410,6 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
self.last_finalized.read().await.clone()
}

pub(crate) async fn get_curr_timestamp(&self) -> i64 {
self.mrb.read().await.header().timestamp
}

pub(crate) async fn get_curr_iteration(&self) -> u8 {
self.mrb.read().await.header().iteration
}
Expand Down
45 changes: 33 additions & 12 deletions node/src/chain/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,16 @@ impl Task {
);

let id = self.task_id;
let mut result_queue = self.result.clone();
let result_queue = self.result.clone();
let provisioners = provisioners.clone();
let (cancel_tx, cancel_rx) = oneshot::channel::<i32>();

self.running_task = Some((
tokio::spawn(async move {
result_queue
.send(c.spin(ru, provisioners, cancel_rx).await)
.await;
let cons_result = c.spin(ru, provisioners, cancel_rx).await;
if let Err(e) = result_queue.send(cons_result).await {
error!("Unable to send consensus result to queue {e}")
}

trace!("terminate consensus task: {}", id);
id
Expand All @@ -126,14 +127,20 @@ impl Task {
/// Aborts the running consensus task and waits for its termination.
pub(crate) async fn abort_with_wait(&mut self) {
if let Some((handle, cancel_chan)) = self.running_task.take() {
cancel_chan.send(0);
handle.await;
if cancel_chan.send(0).is_err() {
warn!("Unable to send cancel for abort_with_wait")
}
if let Err(e) = handle.await {
warn!("Unable to wait for abort {e}")
}
}
}

pub(crate) fn abort(&mut self) {
if let Some((handle, cancel_chan)) = self.running_task.take() {
cancel_chan.send(0);
if let Some((_, cancel_chan)) = self.running_task.take() {
if cancel_chan.send(0).is_err() {
warn!("Unable to send cancel for abort")
};
}
}
}
Expand All @@ -159,8 +166,15 @@ impl<DB: database::DB, N: Network> dusk_consensus::commons::Database
fn store_candidate_block(&mut self, b: Block) {
tracing::trace!("store candidate block: {:?}", b);

if let Ok(db) = self.db.try_read() {
db.update(|t| t.store_candidate_block(b));
match self.db.try_read() {
Ok(db) => {
if let Err(e) = db.update(|t| t.store_candidate_block(b)) {
warn!("Unable to store candidate block: {e}");
};
}
Err(e) => {
warn!("Cannot acquire lock to store candidate block: {e}");
}
}
}

Expand Down Expand Up @@ -220,8 +234,15 @@ impl<DB: database::DB, N: Network> dusk_consensus::commons::Database
}

fn delete_candidate_blocks(&mut self) {
if let Ok(db) = self.db.try_read() {
db.update(|t| t.clear_candidates());
match self.db.try_read() {
Ok(db) => {
if let Err(e) = db.update(|t| t.clear_candidates()) {
warn!("Unable to cleare candidates: {e}");
};
}
Err(e) => {
warn!("Cannot acquire lock to clear_candidate: {e}");
}
}
}
}
Expand Down
59 changes: 34 additions & 25 deletions node/src/chain/fsm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> SimpleFSM<N, DB, VM> {
}
}

pub async fn on_idle(&mut self, timeout: Duration) -> anyhow::Result<()> {
pub async fn on_idle(&mut self, timeout: Duration) {
let acc = self.acc.read().await;
let height = acc.get_curr_height().await;
let iter = acc.get_curr_iteration().await;
Expand All @@ -77,18 +77,18 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> SimpleFSM<N, DB, VM> {
self.blacklisted_blocks.write().await.clear();

// Request missing blocks since my last finalized block
self.network
.write()
let get_blocks = Message::new_get_blocks(GetBlocks {
locator: last_finalized.header().hash,
});
if let Err(e) = self
.network
.read()
.await
.send_to_alive_peers(
&Message::new_get_blocks(GetBlocks {
locator: last_finalized.header().hash,
}),
REDUNDANCY_PEER_FACTOR,
)
.await;

Ok(())
.send_to_alive_peers(&get_blocks, REDUNDANCY_PEER_FACTOR)
.await
{
warn!("Unable to request GetBlocks {e}");
}
}

pub async fn on_event(
Expand Down Expand Up @@ -117,8 +117,8 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> SimpleFSM<N, DB, VM> {
match &mut self.curr {
State::InSync(ref mut curr) => {
if curr.on_event(blk, msg).await? {
/// Transition from InSync to OutOfSync state
curr.on_exiting();
// Transition from InSync to OutOfSync state
curr.on_exiting().await;

// Enter new state
let mut next = OutOfSyncImpl::new(
Expand All @@ -131,16 +131,19 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> SimpleFSM<N, DB, VM> {
}
State::OutOfSync(ref mut curr) => {
if curr.on_event(blk, msg).await? {
/// Transition from OutOfSync to InSync state
curr.on_exiting();
// Transition from OutOfSync to InSync state
curr.on_exiting().await;

// Enter new state
let mut next = InSyncImpl::new(
self.acc.clone(),
self.network.clone(),
self.blacklisted_blocks.clone(),
);
next.on_entering(blk).await;
next.on_entering(blk).await.map_err(|e| {
error!("Unable to enter in_sync state: {e}");
e
})?;
self.curr = State::InSync(next);
}
}
Expand Down Expand Up @@ -171,7 +174,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> InSyncImpl<DB, VM, N> {

/// performed when entering the state
async fn on_entering(&mut self, blk: &Block) -> anyhow::Result<()> {
let acc = self.acc.write().await;
let mut acc = self.acc.write().await;
let curr_h = acc.get_curr_height().await;

if blk.header().height == curr_h + 1 {
Expand All @@ -191,7 +194,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> InSyncImpl<DB, VM, N> {
blk: &Block,
msg: &Message,
) -> anyhow::Result<bool> {
let acc = self.acc.write().await;
let mut acc = self.acc.write().await;
let h = blk.header().height;
let curr_h = acc.get_curr_height().await;
let iter = acc.get_curr_iteration().await;
Expand Down Expand Up @@ -263,7 +266,9 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> InSyncImpl<DB, VM, N> {

// When accepting block from the wire in inSync state, we
// rebroadcast it
self.network.write().await.broadcast(msg).await;
if let Err(e) = self.network.write().await.broadcast(msg).await {
warn!("Unable to broadcast accepted block: {e}");
}

return Ok(false);
}
Expand All @@ -273,7 +278,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> InSyncImpl<DB, VM, N> {
}

fn allow_transition(&self, msg: &Message) -> anyhow::Result<()> {
let recv_peer = msg
let _recv_peer = msg
.metadata
.as_ref()
.map(|m| m.src_addr)
Expand Down Expand Up @@ -334,11 +339,15 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network>
// Request missing blocks from source peer
let gb_msg = Message::new_get_blocks(GetBlocks { locator });

self.network
.write()
if let Err(e) = self
.network
.read()
.await
.send_to_peer(&gb_msg, dest_addr)
.await;
.await
{
warn!("Unable to send GetBlocks: {e}")
};

// add to the pool
let key = blk.header().height;
Expand All @@ -364,7 +373,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network>
blk: &Block,
msg: &Message,
) -> anyhow::Result<bool> {
let acc = self.acc.write().await;
let mut acc = self.acc.write().await;
let h = blk.header().height;

if self
Expand Down
Loading

0 comments on commit f84c0ef

Please sign in to comment.