Skip to content

Commit

Permalink
chore: Remove flume crate (#86)
Browse files Browse the repository at this point in the history
  • Loading branch information
oblique authored Sep 29, 2023
1 parent 3a172fd commit e0d90a3
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 32 deletions.
23 changes: 1 addition & 22 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ tendermint-proto = { workspace = true }

async-trait = "0.1.73"
dashmap = "5.5.3"
flume = { version = "0.11.0", default-features = false, features = ["async"] }
futures = "0.3.28"
libp2p = { version = "0.52.3", features = [
"autonat",
Expand Down
16 changes: 7 additions & 9 deletions node/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use libp2p::{
};
use tendermint_proto::Protobuf;
use tokio::select;
use tokio::sync::oneshot;
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, info, instrument, trace, warn};

use crate::exchange::{ExchangeBehaviour, ExchangeConfig};
Expand Down Expand Up @@ -75,7 +75,7 @@ impl From<oneshot::error::RecvError> for P2pError {

#[derive(Debug)]
pub struct P2p<S> {
cmd_tx: flume::Sender<P2pCmd>,
cmd_tx: mpsc::Sender<P2pCmd>,
_store: PhantomData<S>,
}

Expand Down Expand Up @@ -119,7 +119,7 @@ where
type Error = P2pError;

async fn start(args: P2pArgs<S>) -> Result<Self, P2pError> {
let (cmd_tx, cmd_rx) = flume::bounded(16);
let (cmd_tx, cmd_rx) = mpsc::channel(16);
let mut worker = Worker::new(args, cmd_rx)?;

spawn(async move {
Expand All @@ -139,7 +139,7 @@ where

async fn send_command(&self, cmd: P2pCmd) -> Result<()> {
self.cmd_tx
.send_async(cmd)
.send(cmd)
.await
.map_err(|_| P2pError::WorkerDied)
}
Expand Down Expand Up @@ -286,7 +286,7 @@ where
{
swarm: Swarm<Behaviour<S>>,
header_sub_topic_hash: TopicHash,
cmd_rx: flume::Receiver<P2pCmd>,
cmd_rx: mpsc::Receiver<P2pCmd>,
peer_tracker: Arc<PeerTracker>,
wait_connected_tx: Option<Vec<oneshot::Sender<()>>>,
}
Expand All @@ -295,7 +295,7 @@ impl<S> Worker<S>
where
S: Store + 'static,
{
fn new(args: P2pArgs<S>, cmd_rx: flume::Receiver<P2pCmd>) -> Result<Self, P2pError> {
fn new(args: P2pArgs<S>, cmd_rx: mpsc::Receiver<P2pCmd>) -> Result<Self, P2pError> {
let peer_tracker = Arc::new(PeerTracker::new());
let local_peer_id = PeerId::from(args.local_keypair.public());

Expand Down Expand Up @@ -350,16 +350,14 @@ where
}

async fn run(&mut self) {
let mut cmd_stream = self.cmd_rx.clone().into_stream().fuse();

loop {
select! {
ev = self.swarm.select_next_some() => {
if let Err(e) = self.on_swarm_event(ev).await {
warn!("Failure while handling swarm event: {e}");
}
},
Some(cmd) = cmd_stream.next() => {
Some(cmd) = self.cmd_rx.recv() => {
if let Err(e) = self.on_cmd(cmd).await {
warn!("Failure while handling command. (error: {e})");
}
Expand Down

0 comments on commit e0d90a3

Please sign in to comment.