Skip to content

Commit

Permalink
Add cache docs
Browse files Browse the repository at this point in the history
  • Loading branch information
imDema committed Jul 3, 2024
1 parent 085dcc5 commit 9068581
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 31 deletions.
31 changes: 12 additions & 19 deletions src/operator/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,35 +123,28 @@ impl<I: Data + Send> StreamCache<I> {
}
}

/// Returns a copy of the [RuntimeConfig] of the [StreamContext] this
/// cache was created in.
pub fn config(&self) -> Arc<RuntimeConfig> {
self.config.clone()
}

/// Returns the data cached on this node by cloning the inner cache.
pub fn inner_cloned(&self) -> HashMap<CoordUInt, Vec<I>> {
let cache = self.data.lock();
assert!(cache.is_complete(), "Reading cache before it was complete. execution from a cached stream must start after the previous StreamContext has completed!");
cache.data.clone()
}

// // for debuggin purposes
// pub fn read(&self) -> Vec<I> {
// let cache_lock = self.cache_reference.lock();
// assert!(
// cache_lock.is_some(),
// "Can't read the cache before the execution"
// );
// cache_lock
// .clone()
// .unwrap()
// .into_iter()
// .sorted_by(|a, b| Ord::cmp(&a.0, &b.0))
// .flat_map(|(_, values)| values.into_iter())
// .fold(Vec::new(), |mut acc, values| {
// acc.push(values);
// acc
// })
// }

/// Consume the cache creating a new [Stream] in a [StreamContext].
///
/// The [StreamCache] will behave as a source with the same parallelism (and distribution of data)
/// as the original [Stream] it was cached from.
///
/// + **ATTENTION** ⚠️: The new [StreamContext] must have the same [RuntimeConfig] as the
/// one in which the cache was created.
/// + **ATTENTION** ⚠️: The cache can be resumed **only after** the execution of its origin
/// `StreamContext` has terminated.
pub fn stream_in(self, ctx: &StreamContext) -> Stream<CacheSource<I>> {
assert_eq!(
self.config,
Expand Down
27 changes: 15 additions & 12 deletions src/operator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2077,14 +2077,14 @@ where
StreamOutput::from(output)
}

/// Closes the stream, preserving all resulting items in a [`StreamCache`]. This enables the
/// stream to be resumed from the last processed element.
/// Collect the output of the stream to a [StreamCache] that can later be resumed to
/// create a [Stream] with its content. Returns the cache and consumes the stream.
///
/// **Note**: To resume the stream later, ensure that the stream is executed using
/// [`StreamEnvironment::interactive_execute_blocking`]. Failing to do so will consume the environment.
///
/// **Note**: Calling [`StreamCache::read`] on the returned cache before the execution of the stream
/// will result in a panic.
/// To resume the cache, create a new [StreamContext](crate::StreamContext) with the **same**
/// [RuntimeConfig](crate::RuntimeConfig) and call the [StreamCache::stream_in] method.
///
/// **Warning**: [StreamCache] methods must only be called **after** the original `StreamContext`
/// has finished executing. Calling `stream_in` or `inner_cloned` on an incomplete cache will panic!
///
/// ## Example
///
Expand Down Expand Up @@ -2117,12 +2117,15 @@ where
StreamCache::new(config, replication, output)
}

/// Create a checkpoint in a stream by caching all the elements in a [`StreamCache`].
/// The function returns a tuple containing the cache and the stream on which operations
/// following the checkpoint can be applied.
///
/// **Note**: This operator will split the current block.
/// Collect the output of the stream to a [StreamCache] that can later be resumed to
/// create a [Stream] with its content. Returns the cache and a copy of the current stream.
///
/// To resume the cache, create a new [StreamContext](crate::StreamContext) with the **same**
/// [RuntimeConfig](crate::RuntimeConfig) and call the [StreamCache::stream_in] method.
///
/// **Warning**: [StreamCache] methods must only be called **after** the original `StreamContext`
/// has finished executing. Calling `stream_in` or `inner_cloned` on an incomplete cache will panic!
/// ## Example
/// ```
/// # use renoir::prelude::*;
Expand Down

0 comments on commit 9068581

Please sign in to comment.