Skip to content

Commit

Permalink
Remove unneeded Sink trait
Browse files Browse the repository at this point in the history
  • Loading branch information
imDema committed May 24, 2024
1 parent d4a0357 commit 3497c5e
Show file tree
Hide file tree
Showing 9 changed files with 28 additions and 56 deletions.
9 changes: 1 addition & 8 deletions src/operator/sink/collect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::fmt::Display;
use std::marker::PhantomData;

use crate::block::{BlockStructure, OperatorKind, OperatorStructure};
use crate::operator::sink::{Sink, StreamOutputRef};
use crate::operator::sink::StreamOutputRef;
use crate::operator::{ExchangeData, Operator, StreamElement};
use crate::scheduler::ExecutionMetadata;

Expand Down Expand Up @@ -77,13 +77,6 @@ where
}
}

impl<Out: ExchangeData, C: FromIterator<Out> + Send, PreviousOperators> Sink
for Collect<Out, C, PreviousOperators>
where
PreviousOperators: Operator<Out = Out>,
{
}

impl<Out: ExchangeData, C: FromIterator<Out> + Send, PreviousOperators> Clone
for Collect<Out, C, PreviousOperators>
where
Expand Down
6 changes: 0 additions & 6 deletions src/operator/sink/collect_channel.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::fmt::Display;

use crate::block::{BlockStructure, OperatorKind, OperatorStructure};
use crate::operator::sink::Sink;
use crate::operator::{ExchangeData, Operator, StreamElement};
use crate::scheduler::ExecutionMetadata;

Expand Down Expand Up @@ -67,11 +66,6 @@ where
}
}

impl<Out: ExchangeData, PreviousOperators> Sink for CollectChannelSink<Out, PreviousOperators> where
PreviousOperators: Operator<Out = Out>
{
}

#[cfg(test)]
mod tests {
use itertools::Itertools;
Expand Down
7 changes: 1 addition & 6 deletions src/operator/sink/collect_count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::fmt::Display;

use crate::block::{BlockStructure, OperatorKind, OperatorStructure};

use crate::operator::sink::{Sink, StreamOutputRef};
use crate::operator::sink::StreamOutputRef;
use crate::operator::{Operator, StreamElement};
use crate::scheduler::ExecutionMetadata;

Expand Down Expand Up @@ -71,11 +71,6 @@ where
}
}

impl<PreviousOperators> Sink for CollectCountSink<PreviousOperators> where
PreviousOperators: Operator<Out = usize>
{
}

impl<PreviousOperators> Clone for CollectCountSink<PreviousOperators>
where
PreviousOperators: Operator<Out = usize>,
Expand Down
7 changes: 1 addition & 6 deletions src/operator/sink/collect_vec.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::fmt::Display;

use crate::block::{BlockStructure, OperatorKind, OperatorStructure};
use crate::operator::sink::{Sink, StreamOutputRef};
use crate::operator::sink::StreamOutputRef;
use crate::operator::{ExchangeData, Operator, StreamElement};
use crate::scheduler::ExecutionMetadata;

Expand Down Expand Up @@ -75,11 +75,6 @@ where
}
}

impl<Out: ExchangeData, PreviousOperators> Sink for CollectVecSink<Out, PreviousOperators> where
PreviousOperators: Operator<Out = Out>
{
}

impl<Out: ExchangeData, PreviousOperators> Clone for CollectVecSink<Out, PreviousOperators>
where
PreviousOperators: Operator<Out = Out>,
Expand Down
16 changes: 3 additions & 13 deletions src/operator/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,23 @@
use std::sync::{Arc, Mutex};

use crate::operator::Operator;

pub(super) mod collect;
pub(super) mod collect_channel;
pub(super) mod collect_count;
pub(super) mod collect_vec;
pub(super) mod for_each;

/// This trait marks all the operators that can be used as sinks.
pub(crate) trait Sink: Operator<Out = ()> {}

pub(crate) type StreamOutputRef<Out> = Arc<Mutex<Option<Out>>>;

/// The result of a stream after the execution.
///
/// This will eventually hold the value _after_ the environment has been fully executed. To access
/// the content of the output you have to call [`StreamOutput::get`].
pub struct StreamOutput<Out> {
result: StreamOutputRef<Out>,
}
pub struct StreamOutput<Out>(StreamOutputRef<Out>);

impl<Out> From<StreamOutputRef<Out>> for StreamOutput<Out> {
fn from(value: StreamOutputRef<Out>) -> Self {
Self { result: value }
Self(value)
}
}

Expand All @@ -38,9 +31,6 @@ impl<Out> StreamOutput<Out> {
/// This will consume the result and return the owned content. If the content has already been
/// extracted, or if the content is not ready yet, this will return `None`.
pub fn get(self) -> Option<Out> {
self.result
.try_lock()
.expect("Cannot lock output result")
.take()
self.0.try_lock().expect("Cannot lock output result").take()
}
}
20 changes: 10 additions & 10 deletions src/operator/start/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,16 +324,16 @@ impl<OutL: ExchangeData, OutR: ExchangeData> StartReceiver for BinaryStartReceiv
previous
}

fn cached_replicas(&self) -> usize {
let mut cached = 0;
if self.left.cached {
cached += self.left.instances
}
if self.right.cached {
cached += self.right.instances
}
cached
}
// fn cached_replicas(&self) -> usize {
// let mut cached = 0;
// if self.left.cached {
// cached += self.left.instances
// }
// if self.right.cached {
// cached += self.right.instances
// }
// cached
// }

fn recv_timeout(
&mut self,
Expand Down
6 changes: 3 additions & 3 deletions src/operator/start/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ pub(crate) trait StartReceiver: Clone {
/// This list should contain all the replicas this receiver will receive data from.
fn prev_replicas(&self) -> Vec<Coord>;

/// The number of those replicas which are behind a cache, and therefore never will emit a
/// `StreamElement::Terminate` message.
fn cached_replicas(&self) -> usize;
// /// The number of those replicas which are behind a cache, and therefore never will emit a
// /// `StreamElement::Terminate` message.
// fn cached_replicas(&self) -> usize;

/// Try to receive a batch from the previous blocks, or fail with an error if the timeout
/// expires.
Expand Down
4 changes: 0 additions & 4 deletions src/operator/start/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ impl<Out: ExchangeData> StartReceiver for SimpleStartReceiver<Out> {
self.previous_replicas.clone()
}

fn cached_replicas(&self) -> usize {
0
}

fn recv_timeout(&mut self, timeout: Duration) -> Result<NetworkMessage<Out>, RecvTimeoutError> {
let receiver = self.receiver.as_mut().unwrap();
receiver.recv_timeout(timeout)
Expand Down
9 changes: 9 additions & 0 deletions src/operator/window/aggr/collect_vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,13 @@ where
};
self.add_window_operator("WindowMap", acc)
}

pub fn to_vec(self) -> KeyedStream<impl Operator<Out = (Key, Vec<Out>)>> {
let acc = CollectVec::<Out, Vec<Out>, _> {
vec: Default::default(),
f: |v| v,
_o: PhantomData,
};
self.add_window_operator("WindowMap", acc)
}
}

0 comments on commit 3497c5e

Please sign in to comment.