From 3497c5edbe7f06e4fcb1772ea870062dad7ce841 Mon Sep 17 00:00:00 2001 From: imDema Date: Fri, 24 May 2024 12:19:26 +0200 Subject: [PATCH] Remove unneeded Sink trait --- src/operator/sink/collect.rs | 9 +-------- src/operator/sink/collect_channel.rs | 6 ------ src/operator/sink/collect_count.rs | 7 +------ src/operator/sink/collect_vec.rs | 7 +------ src/operator/sink/mod.rs | 16 +++------------- src/operator/start/binary.rs | 20 ++++++++++---------- src/operator/start/mod.rs | 6 +++--- src/operator/start/simple.rs | 4 ---- src/operator/window/aggr/collect_vec.rs | 9 +++++++++ 9 files changed, 28 insertions(+), 56 deletions(-) diff --git a/src/operator/sink/collect.rs b/src/operator/sink/collect.rs index cd1f3631..b7c01239 100644 --- a/src/operator/sink/collect.rs +++ b/src/operator/sink/collect.rs @@ -2,7 +2,7 @@ use std::fmt::Display; use std::marker::PhantomData; use crate::block::{BlockStructure, OperatorKind, OperatorStructure}; -use crate::operator::sink::{Sink, StreamOutputRef}; +use crate::operator::sink::StreamOutputRef; use crate::operator::{ExchangeData, Operator, StreamElement}; use crate::scheduler::ExecutionMetadata; @@ -77,13 +77,6 @@ where } } -impl + Send, PreviousOperators> Sink - for Collect -where - PreviousOperators: Operator, -{ -} - impl + Send, PreviousOperators> Clone for Collect where diff --git a/src/operator/sink/collect_channel.rs b/src/operator/sink/collect_channel.rs index 5975cd52..24beee2d 100644 --- a/src/operator/sink/collect_channel.rs +++ b/src/operator/sink/collect_channel.rs @@ -1,7 +1,6 @@ use std::fmt::Display; use crate::block::{BlockStructure, OperatorKind, OperatorStructure}; -use crate::operator::sink::Sink; use crate::operator::{ExchangeData, Operator, StreamElement}; use crate::scheduler::ExecutionMetadata; @@ -67,11 +66,6 @@ where } } -impl Sink for CollectChannelSink where - PreviousOperators: Operator -{ -} - #[cfg(test)] mod tests { use itertools::Itertools; diff --git a/src/operator/sink/collect_count.rs b/src/operator/sink/collect_count.rs index d102ec79..924b2a9d 100644 --- a/src/operator/sink/collect_count.rs +++ b/src/operator/sink/collect_count.rs @@ -2,7 +2,7 @@ use std::fmt::Display; use crate::block::{BlockStructure, OperatorKind, OperatorStructure}; -use crate::operator::sink::{Sink, StreamOutputRef}; +use crate::operator::sink::StreamOutputRef; use crate::operator::{Operator, StreamElement}; use crate::scheduler::ExecutionMetadata; @@ -71,11 +71,6 @@ where } } -impl Sink for CollectCountSink where - PreviousOperators: Operator -{ -} - impl Clone for CollectCountSink where PreviousOperators: Operator, diff --git a/src/operator/sink/collect_vec.rs b/src/operator/sink/collect_vec.rs index 57249969..43139b67 100644 --- a/src/operator/sink/collect_vec.rs +++ b/src/operator/sink/collect_vec.rs @@ -1,7 +1,7 @@ use std::fmt::Display; use crate::block::{BlockStructure, OperatorKind, OperatorStructure}; -use crate::operator::sink::{Sink, StreamOutputRef}; +use crate::operator::sink::StreamOutputRef; use crate::operator::{ExchangeData, Operator, StreamElement}; use crate::scheduler::ExecutionMetadata; @@ -75,11 +75,6 @@ where } } -impl Sink for CollectVecSink where - PreviousOperators: Operator -{ -} - impl Clone for CollectVecSink where PreviousOperators: Operator, diff --git a/src/operator/sink/mod.rs b/src/operator/sink/mod.rs index 17da4763..ad469db0 100644 --- a/src/operator/sink/mod.rs +++ b/src/operator/sink/mod.rs @@ -5,30 +5,23 @@ use std::sync::{Arc, Mutex}; -use crate::operator::Operator; - pub(super) mod collect; pub(super) mod collect_channel; pub(super) mod collect_count; pub(super) mod collect_vec; pub(super) mod for_each; -/// This trait marks all the operators that can be used as sinks. -pub(crate) trait Sink: Operator {} - pub(crate) type StreamOutputRef = Arc>>; /// The result of a stream after the execution. /// /// This will eventually hold the value _after_ the environment has been fully executed. To access /// the content of the output you have to call [`StreamOutput::get`]. -pub struct StreamOutput { - result: StreamOutputRef, -} +pub struct StreamOutput(StreamOutputRef); impl From> for StreamOutput { fn from(value: StreamOutputRef) -> Self { - Self { result: value } + Self(value) } } @@ -38,9 +31,6 @@ impl StreamOutput { /// This will consume the result and return the owned content. If the content has already been /// extracted, or if the content is not ready yet, this will return `None`. pub fn get(self) -> Option { - self.result - .try_lock() - .expect("Cannot lock output result") - .take() + self.0.try_lock().expect("Cannot lock output result").take() } } diff --git a/src/operator/start/binary.rs b/src/operator/start/binary.rs index 39f2e9c4..bc47452e 100644 --- a/src/operator/start/binary.rs +++ b/src/operator/start/binary.rs @@ -324,16 +324,16 @@ impl StartReceiver for BinaryStartReceiv previous } - fn cached_replicas(&self) -> usize { - let mut cached = 0; - if self.left.cached { - cached += self.left.instances - } - if self.right.cached { - cached += self.right.instances - } - cached - } + // fn cached_replicas(&self) -> usize { + // let mut cached = 0; + // if self.left.cached { + // cached += self.left.instances + // } + // if self.right.cached { + // cached += self.right.instances + // } + // cached + // } fn recv_timeout( &mut self, diff --git a/src/operator/start/mod.rs b/src/operator/start/mod.rs index 67a2d7ab..8f0ee72b 100644 --- a/src/operator/start/mod.rs +++ b/src/operator/start/mod.rs @@ -31,9 +31,9 @@ pub(crate) trait StartReceiver: Clone { /// This list should contain all the replicas this receiver will receive data from. fn prev_replicas(&self) -> Vec; - /// The number of those replicas which are behind a cache, and therefore never will emit a - /// `StreamElement::Terminate` message. - fn cached_replicas(&self) -> usize; + // /// The number of those replicas which are behind a cache, and therefore never will emit a + // /// `StreamElement::Terminate` message. + // fn cached_replicas(&self) -> usize; /// Try to receive a batch from the previous blocks, or fail with an error if the timeout /// expires. diff --git a/src/operator/start/simple.rs b/src/operator/start/simple.rs index 0665870b..f4efa041 100644 --- a/src/operator/start/simple.rs +++ b/src/operator/start/simple.rs @@ -51,10 +51,6 @@ impl StartReceiver for SimpleStartReceiver { self.previous_replicas.clone() } - fn cached_replicas(&self) -> usize { - 0 - } - fn recv_timeout(&mut self, timeout: Duration) -> Result, RecvTimeoutError> { let receiver = self.receiver.as_mut().unwrap(); receiver.recv_timeout(timeout) diff --git a/src/operator/window/aggr/collect_vec.rs b/src/operator/window/aggr/collect_vec.rs index 8f3d7214..274aabf4 100644 --- a/src/operator/window/aggr/collect_vec.rs +++ b/src/operator/window/aggr/collect_vec.rs @@ -52,4 +52,13 @@ where }; self.add_window_operator("WindowMap", acc) } + + pub fn to_vec(self) -> KeyedStream)>> { + let acc = CollectVec::, _> { + vec: Default::default(), + f: |v| v, + _o: PhantomData, + }; + self.add_window_operator("WindowMap", acc) + } }