Skip to content

Commit

Permalink
Add writer sink and csv sink
Browse files Browse the repository at this point in the history
  • Loading branch information
imDema committed Jun 12, 2024
1 parent 161dfc0 commit c863fdd
Show file tree
Hide file tree
Showing 14 changed files with 249 additions and 12 deletions.
36 changes: 36 additions & 0 deletions examples/csv_write_read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use renoir::prelude::*;

fn main() {
tracing_subscriber::fmt::fmt()
.with_max_level(tracing::Level::DEBUG)
.init();
let conf = RuntimeConfig::local(4).unwrap();

let ctx = StreamContext::new(conf.clone());

let dir = tempfile::tempdir().unwrap();
let dir_path = dir.path().to_path_buf();
eprintln!("Writing to {}", dir_path.display());

// Write to multiple files in parallel
let mut path = dir_path.clone();
ctx.stream_par_iter(0..100)
.map(|i| (i, format!("{i:08x}")))
.write_csv(
move |i| {
path.push(format!("{i:03}.csv"));
path
},
false,
);

ctx.execute_blocking();

let ctx = StreamContext::new(conf);
let mut path = dir_path;
path.push("001.csv");
ctx.stream_csv::<(i32, String)>(path)
.for_each(|t| println!("{t:?}"));

ctx.execute_blocking();
}
5 changes: 4 additions & 1 deletion src/operator/add_timestamps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ where
StreamElement::FlushAndRestart
| StreamElement::FlushBatch
| StreamElement::Terminate => elem,
_ => panic!("AddTimestamp received invalid variant: {}", elem.variant()),
_ => panic!(
"AddTimestamp received invalid variant: {}",
elem.variant_str()
),
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ where

fn next(&mut self) -> StreamElement<()> {
let message = self.prev.next();
let to_return = message.take();
let to_return = message.variant();
match &message {
// Broadcast messages
StreamElement::Watermark(_)
Expand Down
2 changes: 1 addition & 1 deletion src/operator/iteration/iterate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ impl<Out: ExchangeData, State: ExchangeData> Iterate<Out, State> {
StreamElement::FlushAndRestart => {}
m => unreachable!(
"Iterate received invalid message from IterationLeader: {}",
m.variant()
m.variant_str()
),
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/operator/iteration/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ where
StreamElement::FlushAndRestart | StreamElement::FlushBatch => {}
update => unreachable!(
"IterationLeader received an invalid message: {}",
update.variant()
update.variant_str()
),
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/operator/iteration/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ where
StreamElement::FlushAndRestart => {}
m => unreachable!(
"Iterate received invalid message from IterationLeader: {}",
m.variant()
m.variant_str()
),
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/operator/key_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ mod tests {
assert_eq!(a, i);
assert_eq!(b, i);
}
item => panic!("Expected StreamElement::Item, got {}", item.variant()),
item => panic!("Expected StreamElement::Item, got {}", item.variant_str()),
}
}
assert_eq!(key_by.next(), StreamElement::Terminate);
Expand Down
4 changes: 2 additions & 2 deletions src/operator/keyed_fold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ mod tests {
let item = keyed_fold.next();
match item {
StreamElement::Item(x) => res.push(x),
other => panic!("Expecting StreamElement::Item, got {}", other.variant()),
other => panic!("Expecting StreamElement::Item, got {}", other.variant_str()),
}
}

Expand Down Expand Up @@ -235,7 +235,7 @@ mod tests {
StreamElement::Timestamped(x, ts) => res.push((x, ts)),
other => panic!(
"Expecting StreamElement::Timestamped, got {}",
other.variant()
other.variant_str()
),
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/operator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ pub trait Operator: Clone + Send + Display {
impl<Out> StreamElement<Out> {
/// Create a new `StreamElement` with an `Item(())` if `self` contains an item, otherwise it
/// returns the same variant of `self`.
pub fn take(&self) -> StreamElement<()> {
pub fn variant(&self) -> StreamElement<()> {
match self {
StreamElement::Item(_) => StreamElement::Item(()),
StreamElement::Timestamped(_, _) => StreamElement::Item(()),
Expand Down Expand Up @@ -219,7 +219,7 @@ impl<Out> StreamElement<Out> {
}

/// A string representation of the variant of this `StreamElement`.
pub fn variant(&self) -> &'static str {
pub fn variant_str(&self) -> &'static str {
match self {
StreamElement::Item(_) => "Item",
StreamElement::Timestamped(_, _) => "Timestamped",
Expand Down
2 changes: 1 addition & 1 deletion src/operator/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ where
"RoutingEnd still has routes to be setup!"
);
let message = self.prev.next();
let to_return = message.take();
let to_return = message.variant();
match &message {
// Broadcast messages
StreamElement::Watermark(_)
Expand Down
127 changes: 127 additions & 0 deletions src/operator/sink/csv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
use serde::Serialize;
use std::fs::File;
use std::io::BufWriter;
use std::marker::PhantomData;
use std::path::PathBuf;

use crate::block::NextStrategy;
use crate::operator::{ExchangeData, Operator};
use crate::scheduler::ExecutionMetadata;
use crate::{CoordUInt, Replication, Stream};

use super::writer::{WriteOperator, WriterOperator};

// #[derive(Debug)]
pub struct CsvWriteOp<T, N> {
_t: PhantomData<T>,
make_path: Option<N>,
append: bool,
path: Option<PathBuf>,
/// Reader used to parse the CSV file.
writer: Option<csv::Writer<BufWriter<File>>>,
}

impl<T, N> CsvWriteOp<T, N>
where
T: Serialize + Send,
N: FnOnce(CoordUInt) -> PathBuf + Clone + Send + 'static,
{
pub fn new(make_path: N, append: bool) -> Self {
Self {
_t: PhantomData,
make_path: Some(make_path),
append,
path: None,
writer: None,
}
}
}

impl<T, N> WriteOperator<T> for CsvWriteOp<T, N>
where
T: Serialize + Send,
N: FnOnce(CoordUInt) -> PathBuf + Clone + Send + 'static,
{
fn setup(&mut self, metadata: &ExecutionMetadata) {
let id = metadata.global_id;
self.path = Some(self.make_path.take().unwrap()(id));

tracing::debug!("Write csv to path {:?}", self.path.as_ref().unwrap());
let file = File::options()
.read(true)
.write(true)
.create(true)
.truncate(!self.append)
.append(self.append)
.open(self.path.as_ref().unwrap())
.unwrap_or_else(|err| {
panic!(
"CsvSource: error while opening file {:?}: {:?}",
self.path, err
)
});
let file_len = file.metadata().unwrap().len();

let buf_writer = BufWriter::new(file);
let csv_writer = csv::WriterBuilder::default()
.has_headers(file_len == 0)
.from_writer(buf_writer);

self.writer = Some(csv_writer);
}

fn write(&mut self, item: T) {
self.writer.as_mut().unwrap().serialize(item).unwrap();
}

fn flush(&mut self) {
self.writer.as_mut().unwrap().flush().ok();
}

fn finalize(&mut self) {
self.writer.take();
}
}

impl<T, N> Clone for CsvWriteOp<T, N>
where
N: Clone,
{
fn clone(&self) -> Self {
Self {
_t: PhantomData,
make_path: self.make_path.clone(),
append: self.append,
path: None,
writer: None,
}
}
}

impl<Op: Operator> Stream<Op>
where
Op: 'static,
Op::Out: Serialize + ExchangeData,
{
pub fn write_csv<F: FnOnce(CoordUInt) -> PathBuf + Clone + Send + 'static>(
self,
make_path: F,
append: bool,
) {
self.add_operator(|prev| {
let writer = CsvWriteOp::new(make_path, append);
WriterOperator { prev, writer }
})
.finalize_block();
}

pub fn write_csv_one<P: Into<PathBuf>>(self, path: P, append: bool) {
let path = path.into();
self.repartition(Replication::One, NextStrategy::only_one())
.add_operator(|prev| {
let writer = CsvWriteOp::new(move |_| path, append);
WriterOperator { prev, writer }
})
.finalize_block();
}
}
2 changes: 2 additions & 0 deletions src/operator/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ pub(super) mod collect;
pub(super) mod collect_channel;
pub(super) mod collect_count;
pub(super) mod collect_vec;
pub(super) mod csv;
pub(super) mod for_each;
pub(super) mod writer;

pub(crate) type StreamOutputRef<Out> = Arc<Mutex<Option<Out>>>;

Expand Down
70 changes: 70 additions & 0 deletions src/operator/sink/writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use serde::Serialize;
use std::fmt::Display;

use crate::{
operator::{Operator, StreamElement},
structure::{OperatorKind, OperatorStructure},
ExecutionMetadata,
};

pub trait WriteOperator<T: Serialize>: Clone + Send {
fn setup(&mut self, metadata: &ExecutionMetadata);
fn write(&mut self, item: T);
fn flush(&mut self);
fn finalize(&mut self);
}

#[derive(Debug, Clone)]
pub struct WriterOperator<W, Op> {
pub(super) prev: Op,
pub(super) writer: W,
}

impl<W, Op: Operator> Display for WriterOperator<W, Op> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{} -> WriterSink<{}, {}>",
self.prev,
std::any::type_name::<W>(),
std::any::type_name::<Op::Out>()
)
}
}

impl<W, Op> Operator for WriterOperator<W, Op>
where
Op: Operator,
Op::Out: Serialize,
W: WriteOperator<Op::Out>,
{
type Out = ();

fn setup(&mut self, metadata: &mut ExecutionMetadata) {
self.prev.setup(metadata);
self.writer.setup(metadata);
}

fn next(&mut self) -> StreamElement<()> {
let el = self.prev.next();
let ret = el.variant();
match el {
StreamElement::Item(item) | StreamElement::Timestamped(item, _) => {
self.writer.write(item);
}
StreamElement::Watermark(_) => {}
StreamElement::FlushBatch | StreamElement::FlushAndRestart => self.writer.flush(),
StreamElement::Terminate => self.writer.finalize(),
}
ret
}

fn structure(&self) -> crate::structure::BlockStructure {
let mut operator = OperatorStructure::new::<Op::Out, _>(format!(
"WriterSink<{}>",
std::any::type_name::<W>()
));
operator.kind = OperatorKind::Sink;
self.prev.structure().add_operator(operator)
}
}
1 change: 0 additions & 1 deletion src/operator/window/aggr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ mod fold;
pub(super) use fold::{Fold, FoldFirst};

mod collect_vec;
// mod collect;
mod count;
mod join;
mod max;
Expand Down

0 comments on commit c863fdd

Please sign in to comment.