From 586c35f6beecbbec04b7e43f3a642a0472dbc05b Mon Sep 17 00:00:00 2001 From: imDema Date: Thu, 25 Jul 2024 14:55:19 +0200 Subject: [PATCH] Minor async fix --- src/network/tokio/demultiplexer.rs | 5 ++--- src/operator/map_async.rs | 2 +- src/operator/source/channel.rs | 19 +++++++++++-------- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/src/network/tokio/demultiplexer.rs b/src/network/tokio/demultiplexer.rs index 515131a4..0cbfd94b 100644 --- a/src/network/tokio/demultiplexer.rs +++ b/src/network/tokio/demultiplexer.rs @@ -5,7 +5,6 @@ use tokio::net::{TcpListener, TcpStream}; #[cfg(feature = "tokio")] use tokio::task::JoinHandle; -use anyhow::anyhow; use std::collections::HashMap; use std::net::ToSocketAddrs; @@ -82,12 +81,12 @@ async fn bind_remotes( let listener = TcpListener::bind(&*address) .await .map_err(|e| { - anyhow!( + panic!( "Failed to bind socket for {} at {:?}: {:?}", coord, address, e - ) + ) // TODO }) .unwrap(); let address = listener diff --git a/src/operator/map_async.rs b/src/operator/map_async.rs index e4089b44..69cb1fea 100644 --- a/src/operator/map_async.rs +++ b/src/operator/map_async.rs @@ -213,7 +213,7 @@ where } let el = self.prev.next(); - let kind = el.take(); + let kind = el.variant(); if let Some(b) = self.batcher.enqueue(el) { self.schedule_batch(b); diff --git a/src/operator/source/channel.rs b/src/operator/source/channel.rs index 5c0c3374..2a8065b5 100644 --- a/src/operator/source/channel.rs +++ b/src/operator/source/channel.rs @@ -4,10 +4,10 @@ use flume::{bounded, Receiver, RecvError, Sender, TryRecvError}; use crate::block::{BlockStructure, OperatorKind, OperatorStructure, Replication}; use crate::operator::source::Source; -use crate::operator::{Operator, StreamElement}; +use crate::operator::{replication, Operator, StreamElement}; use crate::scheduler::ExecutionMetadata; -const MAX_RETRY: u8 = 8; +const MAX_RETRY: u8 = 16; /// Source that consumes an iterator and emits all its elements into the stream. /// @@ -20,6 +20,7 @@ pub struct ChannelSource { rx: Receiver, terminated: bool, retry_count: u8, + replication: Replication, } impl Display for ChannelSource { @@ -31,10 +32,11 @@ impl Display for ChannelSource { impl ChannelSource { /// Create a new source that reads the items from the iterator provided as input. /// - /// **Note**: this source is **not parallel**, the iterator will be consumed only on a single - /// replica, on all the others no item will be read from the iterator. If you want to achieve - /// parallelism you need to add an operator that shuffles the data (e.g. - /// [`Stream::shuffle`](crate::Stream::shuffle)). + /// **Note**: the replication of this source is determined by the `replication` parameter + /// The channel is an MPMC channel so items will be captured by one and only one of the replicas + /// with no specified order. Developers must take into account this replication when sending items + /// to the channel in order for messages to be delivered to renoir (eg. if replication is One, + /// only the host with id 0 should send messages to the channel) /// /// ## Example /// @@ -47,12 +49,13 @@ impl ChannelSource { /// tx_channel.send(1); /// tx_channel.send(2); /// ``` - pub fn new(channel_size: usize) -> (Sender, Self) { + pub fn new(channel_size: usize, replication: Replication) -> (Sender, Self) { let (tx, rx) = bounded(channel_size); let s = Self { rx, terminated: false, retry_count: 0, + replication, }; (tx, s) @@ -61,7 +64,7 @@ impl ChannelSource { // TODO: remove Debug requirement impl Source for ChannelSource { fn replication(&self) -> Replication { - Replication::One + self.replication } }