From e0d90a3621a50cac6606f1361eccbb0cd353870d Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Fri, 29 Sep 2023 12:20:32 +0300 Subject: [PATCH] chore: Remove flume crate (#86) --- Cargo.lock | 23 +---------------------- node/Cargo.toml | 1 - node/src/p2p.rs | 16 +++++++--------- 3 files changed, 8 insertions(+), 32 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4a044fc1..4877fa8d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -575,7 +575,6 @@ dependencies = [ "celestia-types", "dashmap", "dotenvy", - "flume", "futures", "getrandom 0.2.10", "libp2p", @@ -1204,17 +1203,6 @@ dependencies = [ "paste", ] -[[package]] -name = "flume" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" -dependencies = [ - "futures-core", - "futures-sink", - "spin 0.9.8", -] - [[package]] name = "fnv" version = "1.0.7" @@ -3275,7 +3263,7 @@ dependencies = [ "cc", "libc", "once_cell", - "spin 0.5.2", + "spin", "untrusted", "web-sys", "winapi", @@ -3736,15 +3724,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" -[[package]] -name = "spin" -version = "0.9.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" -dependencies = [ - "lock_api", -] - [[package]] name = "spki" version = "0.7.2" diff --git a/node/Cargo.toml b/node/Cargo.toml index 49ca32a3..d7d9dd8f 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -11,7 +11,6 @@ tendermint-proto = { workspace = true } async-trait = "0.1.73" dashmap = "5.5.3" -flume = { version = "0.11.0", default-features = false, features = ["async"] } futures = "0.3.28" libp2p = { version = "0.52.3", features = [ "autonat", diff --git a/node/src/p2p.rs b/node/src/p2p.rs index fb08b602..6236a18b 100644 --- a/node/src/p2p.rs +++ b/node/src/p2p.rs @@ -23,7 +23,7 @@ use libp2p::{ }; use tendermint_proto::Protobuf; use tokio::select; -use tokio::sync::oneshot; +use tokio::sync::{mpsc, oneshot}; use tracing::{debug, info, instrument, trace, warn}; use crate::exchange::{ExchangeBehaviour, ExchangeConfig}; @@ -75,7 +75,7 @@ impl From for P2pError { #[derive(Debug)] pub struct P2p { - cmd_tx: flume::Sender, + cmd_tx: mpsc::Sender, _store: PhantomData, } @@ -119,7 +119,7 @@ where type Error = P2pError; async fn start(args: P2pArgs) -> Result { - let (cmd_tx, cmd_rx) = flume::bounded(16); + let (cmd_tx, cmd_rx) = mpsc::channel(16); let mut worker = Worker::new(args, cmd_rx)?; spawn(async move { @@ -139,7 +139,7 @@ where async fn send_command(&self, cmd: P2pCmd) -> Result<()> { self.cmd_tx - .send_async(cmd) + .send(cmd) .await .map_err(|_| P2pError::WorkerDied) } @@ -286,7 +286,7 @@ where { swarm: Swarm>, header_sub_topic_hash: TopicHash, - cmd_rx: flume::Receiver, + cmd_rx: mpsc::Receiver, peer_tracker: Arc, wait_connected_tx: Option>>, } @@ -295,7 +295,7 @@ impl Worker where S: Store + 'static, { - fn new(args: P2pArgs, cmd_rx: flume::Receiver) -> Result { + fn new(args: P2pArgs, cmd_rx: mpsc::Receiver) -> Result { let peer_tracker = Arc::new(PeerTracker::new()); let local_peer_id = PeerId::from(args.local_keypair.public()); @@ -350,8 +350,6 @@ where } async fn run(&mut self) { - let mut cmd_stream = self.cmd_rx.clone().into_stream().fuse(); - loop { select! { ev = self.swarm.select_next_some() => { @@ -359,7 +357,7 @@ where warn!("Failure while handling swarm event: {e}"); } }, - Some(cmd) = cmd_stream.next() => { + Some(cmd) = self.cmd_rx.recv() => { if let Err(e) = self.on_cmd(cmd).await { warn!("Failure while handling command. (error: {e})"); }