From 90685819382a855721bd7e9fc60ecedb495a2763 Mon Sep 17 00:00:00 2001 From: imDema Date: Wed, 3 Jul 2024 18:04:26 +0200 Subject: [PATCH] Add cache docs --- src/operator/cache.rs | 31 ++++++++++++------------------- src/operator/mod.rs | 27 +++++++++++++++------------ 2 files changed, 27 insertions(+), 31 deletions(-) diff --git a/src/operator/cache.rs b/src/operator/cache.rs index 190da11..5dff436 100644 --- a/src/operator/cache.rs +++ b/src/operator/cache.rs @@ -123,35 +123,28 @@ impl StreamCache { } } + /// Returns a copy of the [RuntimeConfig] of the [StreamContext] this + /// cache was created in. pub fn config(&self) -> Arc { self.config.clone() } + /// Returns the data cached on this node by cloning the inner cache. pub fn inner_cloned(&self) -> HashMap> { let cache = self.data.lock(); assert!(cache.is_complete(), "Reading cache before it was complete. execution from a cached stream must start after the previous StreamContext has completed!"); cache.data.clone() } - // // for debuggin purposes - // pub fn read(&self) -> Vec { - // let cache_lock = self.cache_reference.lock(); - // assert!( - // cache_lock.is_some(), - // "Can't read the cache before the execution" - // ); - // cache_lock - // .clone() - // .unwrap() - // .into_iter() - // .sorted_by(|a, b| Ord::cmp(&a.0, &b.0)) - // .flat_map(|(_, values)| values.into_iter()) - // .fold(Vec::new(), |mut acc, values| { - // acc.push(values); - // acc - // }) - // } - + /// Consume the cache creating a new [Stream] in a [StreamContext]. + /// + /// The [StreamCache] will behave as a source with the same parallelism (and distribution of data) + /// as the original [Stream] it was cached from. + /// + /// + **ATTENTION** ⚠️: The new [StreamContext] must have the same [RuntimeConfig] as the + /// one in which the cache was created. + /// + **ATTENTION** ⚠️: The cache can be resumed **only after** the execution of its origin + /// `StreamContext` has terminated. pub fn stream_in(self, ctx: &StreamContext) -> Stream> { assert_eq!( self.config, diff --git a/src/operator/mod.rs b/src/operator/mod.rs index d8fef74..1693cee 100644 --- a/src/operator/mod.rs +++ b/src/operator/mod.rs @@ -2077,14 +2077,14 @@ where StreamOutput::from(output) } - /// Closes the stream, preserving all resulting items in a [`StreamCache`]. This enables the - /// stream to be resumed from the last processed element. + /// Collect the output of the stream to a [StreamCache] that can later be resumed to + /// create a [Stream] with its content. Returns the cache and consumes the stream. /// - /// **Note**: To resume the stream later, ensure that the stream is executed using - /// [`StreamEnvironment::interactive_execute_blocking`]. Failing to do so will consume the environment. - /// - /// **Note**: Calling [`StreamCache::read`] on the returned cache before the execution of the stream - /// will result in a panic. + /// To resume the cache, create a new [StreamContext](crate::StreamContext) with the **same** + /// [RuntimeConfig](crate::RuntimeConfig) and call the [StreamCache::stream_in] method. + /// + /// **Warning**: [StreamCache] methods must only be called **after** the original `StreamContext` + /// has finished executing. Calling `stream_in` or `inner_cloned` on an incomplete cache will panic! /// /// ## Example /// @@ -2117,12 +2117,15 @@ where StreamCache::new(config, replication, output) } - /// Create a checkpoint in a stream by caching all the elements in a [`StreamCache`]. - /// The function returns a tuple containing the cache and the stream on which operations - /// following the checkpoint can be applied. - /// - /// **Note**: This operator will split the current block. + /// Collect the output of the stream to a [StreamCache] that can later be resumed to + /// create a [Stream] with its content. Returns the cache and a copy of the current stream. /// + /// To resume the cache, create a new [StreamContext](crate::StreamContext) with the **same** + /// [RuntimeConfig](crate::RuntimeConfig) and call the [StreamCache::stream_in] method. + /// + /// **Warning**: [StreamCache] methods must only be called **after** the original `StreamContext` + /// has finished executing. Calling `stream_in` or `inner_cloned` on an incomplete cache will panic! + /// ## Example /// ``` /// # use renoir::prelude::*;