diff --git a/examples/avro_rw.rs b/examples/avro_rw.rs index f63fe7c..3a4cb92 100644 --- a/examples/avro_rw.rs +++ b/examples/avro_rw.rs @@ -41,11 +41,12 @@ fn main() { source .inspect(|e| eprintln!("{e:?}")) + .repartition_by(Replication::Unlimited, |x| (x.num % 5).into()) .map(|mut e| { e.num *= 2; e }) - .write_avro(opts.output); + .write_avro_seq(opts.output); ctx.execute_blocking(); } diff --git a/examples/csv_write_read.rs b/examples/csv_write_read.rs index 5f5beb6..fc0938d 100644 --- a/examples/csv_write_read.rs +++ b/examples/csv_write_read.rs @@ -1,9 +1,6 @@ use renoir::prelude::*; fn main() { - tracing_subscriber::fmt::fmt() - .with_max_level(tracing::Level::DEBUG) - .init(); let conf = RuntimeConfig::local(4).unwrap(); let ctx = StreamContext::new(conf.clone()); @@ -13,22 +10,16 @@ fn main() { eprintln!("Writing to {}", dir_path.display()); // Write to multiple files in parallel - let mut path = dir_path.clone(); + let path = dir_path.clone(); ctx.stream_par_iter(0..100) .map(|i| (i, format!("{i:08x}"))) - .write_csv( - move |i| { - path.push(format!("{i:03}.csv")); - path - }, - false, - ); + .write_csv_seq(path, false); ctx.execute_blocking(); let ctx = StreamContext::new(conf); let mut path = dir_path; - path.push("001.csv"); + path.push("0001.csv"); ctx.stream_csv::<(i32, String)>(path) .for_each(|t| println!("{t:?}")); diff --git a/src/operator/sink/avro.rs b/src/operator/sink/avro.rs index edb68f7..1e76d87 100644 --- a/src/operator/sink/avro.rs +++ b/src/operator/sink/avro.rs @@ -1,113 +1,87 @@ use apache_avro::{AvroSchema, Schema, Writer}; use serde::Serialize; -use std::fmt::Display; use std::fs::File; -use std::io::BufWriter; +use std::io::{BufWriter, Write}; +use std::marker::PhantomData; use std::path::PathBuf; -use crate::block::{BlockStructure, OperatorKind, OperatorStructure}; -use crate::operator::{Operator, StreamElement}; +use crate::block::NextStrategy; +use crate::operator::{ExchangeData, Operator}; use crate::scheduler::ExecutionMetadata; -use crate::Stream; +use crate::{CoordUInt, Replication, Stream}; + +use super::writer::{sequential_path, WriteOperator, WriterOperator}; // #[derive(Debug)] -pub struct AvroSink -where - Op: Operator, -{ - prev: Op, - path: PathBuf, - /// Reader used to parse the CSV file. +pub struct AvroSink { + _t: PhantomData, writer: Option>, schema: Schema, } -impl AvroSink +impl AvroSink where - Op: Operator, - Op::Out: AvroSchema, + T: AvroSchema + Serialize, { - pub fn new>(prev: Op, path: P) -> Self { + pub fn new() -> Self { Self { - path: path.into(), - prev, + _t: PhantomData, writer: None, - schema: Op::Out::get_schema(), + schema: T::get_schema(), } } } -impl Display for AvroSink +impl WriteOperator for AvroSink where - Op: Operator, + T: AvroSchema + Serialize + Send, { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "{} -> AvroSink<{}>", - self.prev, - std::any::type_name::() - ) + type Destination = PathBuf; + + fn write(&mut self, items: &mut impl Iterator) { + let w = self.writer.as_mut().unwrap(); + let mut w = Writer::new(&self.schema, w); + for item in items { + w.append_ser(item).expect("failed to write to avro"); + } + w.flush().unwrap(); } -} -impl Operator for AvroSink -where - Op: Operator, - Op::Out: AvroSchema + Serialize, -{ - type Out = (); + fn flush(&mut self) { /* already flushes in write */ + } - fn setup(&mut self, metadata: &mut ExecutionMetadata) { - self.prev.setup(metadata); + fn finalize(&mut self) { + if let Some(mut w) = self.writer.take() { + w.flush().unwrap(); + } + } + fn setup(&mut self, destination: Self::Destination) { let file = File::options() .read(true) .write(true) .create(true) .truncate(true) - .open(&self.path) + .open(&destination) .unwrap_or_else(|err| { panic!( "AvroSource: error while opening file {:?}: {:?}", - self.path, err + destination, err ) }); let buf_writer = BufWriter::new(file); self.writer = Some(buf_writer); } - - fn next(&mut self) -> StreamElement<()> { - let writer = self.writer.as_mut().unwrap(); - let mut w = Writer::new(&self.schema, writer); - loop { - match self.prev.next() { - StreamElement::Item(t) | StreamElement::Timestamped(t, _) => { - // w.extend_ser(values) - w.append_ser(t).expect("failed to write to avro"); - } - el => { - w.flush().unwrap(); - return el.map(|_| ()); - } - } - } - } - - fn structure(&self) -> BlockStructure { - let mut operator = OperatorStructure::new::("AvroSink"); - operator.kind = OperatorKind::Sink; - self.prev.structure().add_operator(operator) - } } -impl Clone for AvroSink -where - Op: Operator, -{ +impl Clone for AvroSink { fn clone(&self) -> Self { - panic!("AvroSink cannot be cloned, replication should be 1"); + Self { + _t: PhantomData, + writer: None, + schema: self.schema.clone(), + } } } @@ -116,21 +90,51 @@ where Op: 'static, Op::Out: AvroSchema + Serialize, { - /// Apply the given function to all the elements of the stream, consuming the stream. + pub fn write_avro PathBuf + Clone + Send + 'static>( + self, + make_path: F, + ) { + let make_destination = |metadata: &ExecutionMetadata| (make_path)(metadata.global_id); + + self.add_operator(|prev| { + let writer = AvroSink::new(); + WriterOperator::new(prev, writer, make_destination) + }) + .finalize_block(); + } + + /// Write output to avro files. A avro is created for each replica of the current block. + /// A file with a numerical suffix is created according to the path passed as parameter. /// - /// ## Example + /// + If the input is a directory numbered files will be created as output. + /// + If the input is a file name the basename will be kept as prefix and numbers will + /// be added as suffix while keeping the same extension for the output. /// - /// ``` - /// # use renoir::{StreamContext, RuntimeConfig}; - /// # use renoir::operator::source::IteratorSource; - /// # let mut env = StreamContext::new_local(); - /// let s = env.stream_iter(0..5).group_by(|&n| n % 2); - /// s.for_each(|(key, n)| println!("Item: {} has key {}", n, key)); + /// ## Example /// - /// env.execute_blocking(); - /// ``` - pub fn write_avro>(self, path: P) { - self.add_operator(|prev| AvroSink::new(prev, path)) + /// + `template_path`: `/data/renoir/output.avro` -> `/data/renoir/output0000.avro`, /data/renoir/output0001.avro` ... + /// + `template_path`: `/data/renoir/` -> `/data/renoir/0000.avro`, /data/renoir/0001.avro` ... + pub fn write_avro_seq(self, template_path: PathBuf) { + self.add_operator(|prev| { + let writer = AvroSink::new(); + WriterOperator::new(prev, writer, |m| sequential_path(template_path, m)) + }) + .finalize_block(); + } +} + +impl Stream +where + Op: 'static, + Op::Out: AvroSchema + ExchangeData, +{ + pub fn write_avro_one>(self, path: P) { + let path = path.into(); + self.repartition(Replication::One, NextStrategy::only_one()) + .add_operator(|prev| { + let writer = AvroSink::new(); + WriterOperator::new(prev, writer, move |_| path) + }) .finalize_block(); } } diff --git a/src/operator/sink/csv.rs b/src/operator/sink/csv.rs index 59532db..8e3f477 100644 --- a/src/operator/sink/csv.rs +++ b/src/operator/sink/csv.rs @@ -9,27 +9,24 @@ use crate::operator::{ExchangeData, Operator}; use crate::scheduler::ExecutionMetadata; use crate::{CoordUInt, Replication, Stream}; -use super::writer::{WriteOperator, WriterOperator}; +use super::writer::{sequential_path, WriteOperator, WriterOperator}; // #[derive(Debug)] -pub struct CsvWriteOp { +pub struct CsvWriteOp { _t: PhantomData, - make_path: Option, append: bool, path: Option, /// Reader used to parse the CSV file. writer: Option>>, } -impl CsvWriteOp +impl CsvWriteOp where T: Serialize + Send, - N: FnOnce(CoordUInt) -> PathBuf + Clone + Send + 'static, { - pub fn new(make_path: N, append: bool) -> Self { + pub fn new(append: bool) -> Self { Self { _t: PhantomData, - make_path: Some(make_path), append, path: None, writer: None, @@ -37,14 +34,14 @@ where } } -impl WriteOperator for CsvWriteOp +impl WriteOperator for CsvWriteOp where T: Serialize + Send, - N: FnOnce(CoordUInt) -> PathBuf + Clone + Send + 'static, { - fn setup(&mut self, metadata: &ExecutionMetadata) { - let id = metadata.global_id; - self.path = Some(self.make_path.take().unwrap()(id)); + type Destination = PathBuf; + + fn setup(&mut self, destination: PathBuf) { + self.path = Some(destination); tracing::debug!("Write csv to path {:?}", self.path.as_ref().unwrap()); let file = File::options() @@ -70,8 +67,10 @@ where self.writer = Some(csv_writer); } - fn write(&mut self, item: T) { - self.writer.as_mut().unwrap().serialize(item).unwrap(); + fn write(&mut self, items: &mut impl Iterator) { + for item in items { + self.writer.as_mut().unwrap().serialize(item).unwrap(); + } } fn flush(&mut self) { @@ -83,14 +82,10 @@ where } } -impl Clone for CsvWriteOp -where - N: Clone, -{ +impl Clone for CsvWriteOp { fn clone(&self) -> Self { Self { _t: PhantomData, - make_path: self.make_path.clone(), append: self.append, path: None, writer: None, @@ -101,26 +96,53 @@ where impl Stream where Op: 'static, - Op::Out: Serialize + ExchangeData, + Op::Out: Serialize, { pub fn write_csv PathBuf + Clone + Send + 'static>( self, make_path: F, append: bool, ) { + let make_destination = |metadata: &ExecutionMetadata| (make_path)(metadata.global_id); + self.add_operator(|prev| { - let writer = CsvWriteOp::new(make_path, append); - WriterOperator { prev, writer } + let writer = CsvWriteOp::new(append); + WriterOperator::new(prev, writer, make_destination) }) .finalize_block(); } + /// Write output to CSV files. A CSV is created for each replica of the current block. + /// A file with a numerical suffix is created according to the path passed as parameter. + /// + /// + If the input is a directory numbered files will be created as output. + /// + If the input is a file name the basename will be kept as prefix and numbers will + /// be added as suffix while keeping the same extension for the output. + /// + /// ## Example + /// + /// + `template_path`: `/data/renoir/output.csv` -> `/data/renoir/output0000.csv`, /data/renoir/output0001.csv` ... + /// + `template_path`: `/data/renoir/` -> `/data/renoir/0000.csv`, /data/renoir/0001.csv` ... + pub fn write_csv_seq(self, template_path: PathBuf, append: bool) { + self.add_operator(|prev| { + let writer = CsvWriteOp::new(append); + WriterOperator::new(prev, writer, |m| sequential_path(template_path, m)) + }) + .finalize_block(); + } +} + +impl Stream +where + Op: 'static, + Op::Out: ExchangeData, +{ pub fn write_csv_one>(self, path: P, append: bool) { let path = path.into(); self.repartition(Replication::One, NextStrategy::only_one()) .add_operator(|prev| { - let writer = CsvWriteOp::new(move |_| path, append); - WriterOperator { prev, writer } + let writer = CsvWriteOp::new(append); + WriterOperator::new(prev, writer, move |_| path) }) .finalize_block(); } diff --git a/src/operator/sink/writer.rs b/src/operator/sink/writer.rs index 736a988..22d80a9 100644 --- a/src/operator/sink/writer.rs +++ b/src/operator/sink/writer.rs @@ -1,5 +1,5 @@ use serde::Serialize; -use std::fmt::Display; +use std::{ffi::OsString, fmt::Display, path::PathBuf}; use crate::{ operator::{Operator, StreamElement}, @@ -8,19 +8,37 @@ use crate::{ }; pub trait WriteOperator: Clone + Send { - fn setup(&mut self, metadata: &ExecutionMetadata); - fn write(&mut self, item: T); + type Destination; + fn setup(&mut self, destination: Self::Destination); + fn write(&mut self, items: &mut impl Iterator); fn flush(&mut self); fn finalize(&mut self); } #[derive(Debug, Clone)] -pub struct WriterOperator { - pub(super) prev: Op, - pub(super) writer: W, +pub struct WriterOperator { + prev: Op, + writer: W, + make_destination: Option, } -impl Display for WriterOperator { +impl WriterOperator +where + Op: Operator, + Op::Out: Serialize, + W: WriteOperator, + D: FnOnce(&ExecutionMetadata) -> W::Destination + Send + Clone, +{ + pub fn new(prev: Op, writer: W, make_destination: D) -> Self { + Self { + prev, + writer, + make_destination: Some(make_destination), + } + } +} + +impl Display for WriterOperator { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, @@ -32,31 +50,53 @@ impl Display for WriterOperator { } } -impl Operator for WriterOperator +impl Operator for WriterOperator where Op: Operator, Op::Out: Serialize, W: WriteOperator, + D: FnOnce(&ExecutionMetadata) -> W::Destination + Send + Clone, { type Out = (); fn setup(&mut self, metadata: &mut ExecutionMetadata) { self.prev.setup(metadata); - self.writer.setup(metadata); + self.writer + .setup((self.make_destination.take().unwrap())(metadata)); } fn next(&mut self) -> StreamElement<()> { - let el = self.prev.next(); - let ret = el.variant(); - match el { - StreamElement::Item(item) | StreamElement::Timestamped(item, _) => { - self.writer.write(item); + struct SequenceIterator<'a, Op: Operator> { + prev: &'a mut Op, + last: Option>, + } + + impl Iterator for SequenceIterator<'_, Op> { + type Item = Op::Out; + + fn next(&mut self) -> Option { + match self.prev.next() { + StreamElement::Item(item) | StreamElement::Timestamped(item, _) => Some(item), + other => { + self.last = Some(other); + None + } + } } - StreamElement::Watermark(_) => {} + } + + let mut s = SequenceIterator { + prev: &mut self.prev, + last: None, + }; + self.writer.write(&mut s); + let result = s.last.take().unwrap().variant(); + match &result { StreamElement::FlushBatch | StreamElement::FlushAndRestart => self.writer.flush(), StreamElement::Terminate => self.writer.finalize(), + _ => {} } - ret + result } fn structure(&self) -> crate::structure::BlockStructure { @@ -68,3 +108,24 @@ where self.prev.structure().add_operator(operator) } } + +pub fn sequential_path(base: PathBuf, metadata: &ExecutionMetadata) -> PathBuf { + let mut path = base; + let id = metadata.global_id; + if path.is_dir() { + path.push(format!("{id:04}")); + } else { + let file_name = path.file_stem().unwrap_or_default(); + let ext = path.extension(); + let mut name = OsString::new(); + name.push(file_name); + name.push(format!("{id:04}")); + if let Some(ext) = ext { + name.push("."); + name.push(ext); + } + path.pop(); + path.push(name); + } + path +} diff --git a/tests/utils.rs b/tests/utils.rs index 24ce941..3bc88cd 100644 --- a/tests/utils.rs +++ b/tests/utils.rs @@ -10,6 +10,7 @@ use std::time::Duration; use itertools::{process_results, Itertools}; +use rand::{thread_rng, Rng}; use renoir::config::{ConfigBuilder, HostConfig, RuntimeConfig}; use renoir::operator::{Data, Operator, StreamElement, Timestamp}; use renoir::structure::BlockStructure; @@ -150,7 +151,8 @@ impl TestHelper { Self::setup(); let mut hosts = vec![]; for host_id in 0..num_hosts { - let test_id = TEST_INDEX.fetch_add(1, Ordering::SeqCst) + 1; + let test_id: u16 = thread_rng().gen(); //TEST_INDEX.fetch_add(1, Ordering::SeqCst) + 1; + let high_part = (test_id & 0xff00) >> 8; let low_part = test_id & 0xff; let address = format!("127.{high_part}.{low_part}.{host_id}");