diff --git a/Cargo.lock b/Cargo.lock index 863f0f8..6002209 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "adler32" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" + [[package]] name = "ahash" version = "0.8.11" @@ -39,6 +45,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" + [[package]] name = "anes" version = "0.1.6" @@ -93,6 +105,43 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "apache-avro" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ceb7c683b2f8f40970b70e39ff8be514c95b96fcb9c4af87e1ed2cb2e10801a0" +dependencies = [ + "apache-avro-derive", + "digest", + "lazy_static", + "libflate", + "log", + "num-bigint", + "quad-rand", + "rand", + "regex-lite", + "serde", + "serde_json", + "strum", + "strum_macros", + "thiserror", + "typed-builder", + "uuid", +] + +[[package]] +name = "apache-avro-derive" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e317e411016923787d14f6deb741c1a1d036e64c2785b079747c852f7fae5ca4" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "serde_json", + "syn 2.0.59", +] + [[package]] name = "autocfg" version = "1.2.0" @@ -241,7 +290,7 @@ version = "4.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "528131438037fd55894f62d6e9f068b8f45ac57ffa77517819645d10aed04f64" dependencies = [ - "heck", + "heck 0.5.0", "proc-macro2", "quote", "syn 2.0.59", @@ -270,6 +319,15 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "core2" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b49ba7ef1ad6107f8824dbe97de947cbaac53c44e7f9756a1fba0d37c1eec505" +dependencies = [ + "memchr", +] + [[package]] name = "cpufeatures" version = "0.2.12" @@ -279,6 +337,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crc32fast" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3" +dependencies = [ + "cfg-if", +] + [[package]] name = "criterion" version = "0.5.1" @@ -377,6 +444,46 @@ dependencies = [ "memchr", ] +[[package]] +name = "darling" +version = "0.20.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54e36fcd13ed84ffdfda6f5be89b31287cbb80c439841fe69e04841435464391" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c2cf1c23a687a1feeb728783b993c4e1ad83d99f351801977dd809b48d0a70f" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "syn 2.0.59", +] + +[[package]] +name = "darling_macro" +version = "0.20.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a668eda54683121533a393014d8692171709ff57a7d61f187b6e782719f8933f" +dependencies = [ + "darling_core", + "quote", + "syn 2.0.59", +] + +[[package]] +name = "dary_heap" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7762d17f1241643615821a8455a0b2c3e803784b058693d990b11f2dce25a0ca" + [[package]] name = "dashmap" version = "5.5.3" @@ -417,6 +524,12 @@ dependencies = [ "crypto-common", ] +[[package]] +name = "dyn-clone" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125" + [[package]] name = "either" version = "1.11.0" @@ -490,6 +603,12 @@ dependencies = [ "spin", ] +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "futures" version = "0.3.30" @@ -638,6 +757,16 @@ name = "hashbrown" version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +dependencies = [ + "ahash", + "allocator-api2", +] + +[[package]] +name = "heck" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" [[package]] name = "heck" @@ -657,6 +786,12 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "indexmap" version = "2.2.6" @@ -748,6 +883,30 @@ version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" +[[package]] +name = "libflate" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45d9dfdc14ea4ef0900c1cddbc8dcd553fbaacd8a4a282cf4018ae9dd04fb21e" +dependencies = [ + "adler32", + "core2", + "crc32fast", + "dary_heap", + "libflate_lz77", +] + +[[package]] +name = "libflate_lz77" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6e0d73b369f386f1c44abd9c570d5318f55ccde816ff4b562fa452e5182863d" +dependencies = [ + "core2", + "hashbrown", + "rle-decode-fast", +] + [[package]] name = "libmimalloc-sys" version = "0.1.35" @@ -882,6 +1041,25 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-bigint" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c165a9ab64cf766f73521c0dd2cfdff64f488b8f0b3e621face3462d3db536d7" +dependencies = [ + "num-integer", + "num-traits", +] + +[[package]] +name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.18" @@ -1049,6 +1227,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "quad-rand" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "658fa1faf7a4cc5f057c9ee5ef560f717ad9d8dc66d975267f709624d6e1ab88" + [[package]] name = "quick_cache" version = "0.5.1" @@ -1161,6 +1345,12 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "regex-lite" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b661b2f27137bdbc16f00eda72866a92bb28af1753ffbd56744fb6e2e9cd8e" + [[package]] name = "regex-syntax" version = "0.8.3" @@ -1171,6 +1361,7 @@ checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56" name = "renoir" version = "0.2.0" dependencies = [ + "apache-avro", "base64", "bincode", "clap", @@ -1179,6 +1370,7 @@ dependencies = [ "csv", "dashmap", "derivative", + "dyn-clone", "env_logger", "fake", "flume", @@ -1215,6 +1407,12 @@ dependencies = [ "wyhash", ] +[[package]] +name = "rle-decode-fast" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422" + [[package]] name = "rustc-demangle" version = "0.1.23" @@ -1234,6 +1432,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustversion" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6" + [[package]] name = "ryu" version = "1.0.17" @@ -1379,6 +1583,25 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "strum" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" + +[[package]] +name = "strum_macros" +version = "0.25.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23dc1fa9ac9c169a78ba62f0b841814b7abae11bdd047b9c58f893439e309ea0" +dependencies = [ + "heck 0.4.1", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.59", +] + [[package]] name = "syn" version = "1.0.109" @@ -1573,6 +1796,26 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "typed-builder" +version = "0.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34085c17941e36627a879208083e25d357243812c30e7d7387c3b954f30ade16" +dependencies = [ + "typed-builder-macro", +] + +[[package]] +name = "typed-builder-macro" +version = "0.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f03ca4cb38206e2bef0700092660bb74d696f808514dae47fa1467cbfe26e96e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.59", +] + [[package]] name = "typemap_rev" version = "0.3.0" @@ -1597,6 +1840,15 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "uuid" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" +dependencies = [ + "serde", +] + [[package]] name = "valuable" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 9656dd1..7deb8ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,6 +78,8 @@ indexmap = "2.2.6" tracing = { version = "0.1.40", features = ["log"] } quick_cache = "0.5.1" dashmap = "5.5.3" +apache-avro = { version = "0.16.0", features = ["derive"] } +dyn-clone = "1.0.17" [dev-dependencies] # for the tests diff --git a/examples/avro_rw.rs b/examples/avro_rw.rs new file mode 100644 index 0000000..6add0db --- /dev/null +++ b/examples/avro_rw.rs @@ -0,0 +1,41 @@ +use std::path::PathBuf; + +use apache_avro::AvroSchema; +use clap::Parser; +use renoir::prelude::*; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Parser)] +struct Options { + #[clap(short,long)] + input: Option, + + #[clap(short,long)] + output: PathBuf, +} + +#[derive(Serialize, Deserialize, AvroSchema, Clone, Debug)] +struct InputType { + s: String, + num: u32, +} + +fn main() { + let (conf, args) = RuntimeConfig::from_args(); + let opts = Options::parse_from(args); + conf.spawn_remote_workers(); + + let ctx = StreamContext::new(conf.clone()); + + let source = if let Some(input) = opts.input { + ctx.stream_avro(input).into_boxed() + } else { + ctx.stream_iter((0..100).map(|i| InputType{ s: format!("{i:o}"), num: i })).into_boxed() + }; + + source.inspect(|e| eprintln!("{e:?}")) + .map(|mut e| { e.num *= 2; e }) + .write_avro(opts.output); + + ctx.execute_blocking(); +} \ No newline at end of file diff --git a/src/operator/boxed.rs b/src/operator/boxed.rs new file mode 100644 index 0000000..d7e46e7 --- /dev/null +++ b/src/operator/boxed.rs @@ -0,0 +1,103 @@ +use std::fmt::Display; + +use dyn_clone::DynClone; + +use crate::{ + block::BlockStructure, + operator::{Data, Operator, StreamElement}, + ExecutionMetadata, Stream, +}; + +pub(crate) trait DynOperator: DynClone { + type Out: Data; + /// Setup the operator chain. This is called before any call to `next` and it's used to + /// initialize the operator. When it's called the operator has already been cloned and it will + /// never be cloned again. Therefore it's safe to store replica-specific metadata inside of it. + /// + /// It's important that each operator (except the start of a chain) calls `.setup()` recursively + /// on the previous operators. + fn setup(&mut self, metadata: &mut ExecutionMetadata); + + /// Take a value from the previous operator, process it and return it. + fn next(&mut self) -> StreamElement; + + /// A more refined representation of the operator and its predecessors. + fn structure(&self) -> BlockStructure; +} + +dyn_clone::clone_trait_object!( DynOperator); + +impl DynOperator for Op +where + Op: Operator, + ::Out: Clone + Send + 'static, +{ + type Out = Op::Out; + + fn setup(&mut self, metadata: &mut ExecutionMetadata) { + self.setup(metadata) + } + + fn next(&mut self) -> StreamElement { + self.next() + } + + fn structure(&self) -> BlockStructure { + self.structure() + } +} + +pub struct BoxedOperator { + pub(crate) op: Box + 'static + Send>, +} + +impl Clone for BoxedOperator { + fn clone(&self) -> Self { + Self { + op: self.op.clone(), + } + } +} + +impl Display for BoxedOperator { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "BoxedOperator") + } +} + +impl BoxedOperator { + pub fn new + 'static>(op: Op) -> Self { + Self { + op: Box::new(op), + } + } +} + +impl Operator for BoxedOperator { + type Out = O; + + fn next(&mut self) -> StreamElement { + self.op.next() + } + + fn setup(&mut self, metadata: &mut ExecutionMetadata) { + self.op.setup(metadata) + } + + fn structure(&self) -> BlockStructure { + self.op.structure() + } +} + +impl Stream +where + Op: Operator + 'static, + Op::Out: Clone + Send + 'static, +{ + /// Erase operator type using dynamic dispatching. + /// + /// Use only when strictly necessary as it is decrimental for performance. + pub fn into_boxed(self) -> Stream> { + self.add_operator(|prev| BoxedOperator::new(prev)) + } +} diff --git a/src/operator/mod.rs b/src/operator/mod.rs index 88f0500..50296d0 100644 --- a/src/operator/mod.rs +++ b/src/operator/mod.rs @@ -6,6 +6,7 @@ use std::fmt::Display; use std::hash::Hash; use std::ops::{AddAssign, Div}; +use std::path::PathBuf; use flume::{unbounded, Receiver}; #[cfg(feature = "tokio")] @@ -25,6 +26,7 @@ use crate::{BatchMode, KeyedStream, Stream}; #[cfg(feature = "tokio")] use self::map_async::MapAsync; use self::map_memo::MapMemo; +use self::sink::avro::AvroSink; use self::sink::collect::Collect; use self::sink::collect_channel::CollectChannelSink; use self::sink::collect_count::CollectCountSink; @@ -86,6 +88,7 @@ pub mod source; mod start; pub mod window; mod zip; +mod boxed; /// Marker trait that all the types inside a stream should implement. pub trait Data: Clone + Send + 'static {} diff --git a/src/operator/sink/avro.rs b/src/operator/sink/avro.rs new file mode 100644 index 0000000..91350d3 --- /dev/null +++ b/src/operator/sink/avro.rs @@ -0,0 +1,166 @@ +use apache_avro::{AvroSchema, Schema, Writer}; +use serde::Serialize; +use std::fmt::Display; +use std::fs::File; +use std::io::BufWriter; +use std::marker::PhantomData; +use std::path::PathBuf; + +use crate::block::{BlockStructure, OperatorKind, OperatorStructure}; +use crate::operator::sink::StreamOutputRef; +use crate::operator::{ExchangeData, Operator, StreamElement}; +use crate::scheduler::ExecutionMetadata; +use crate::Stream; + +// #[derive(Debug)] +pub struct AvroSink +where + Op: Operator, +{ + prev: Op, + path: PathBuf, + /// Reader used to parse the CSV file. + writer: Option>, + schema: Schema, +} + +impl AvroSink +where + Op: Operator, + Op::Out: AvroSchema, +{ + pub fn new>(prev: Op, path: P) -> Self { + Self { + path: path.into(), + prev, + writer: None, + schema: Op::Out::get_schema(), + } + } +} + +impl Display for AvroSink +where + Op: Operator, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{} -> AvroSink<{}>", + self.prev, + std::any::type_name::() + ) + } +} + +impl Operator for AvroSink +where + Op: Operator, + Op::Out: AvroSchema + Serialize, +{ + type Out = (); + + fn setup(&mut self, metadata: &mut ExecutionMetadata) { + self.prev.setup(metadata); + + let file = File::options() + .read(true) + .write(true) + .create(true) + .open(&self.path) + .unwrap_or_else(|err| { + panic!( + "AvroSource: error while opening file {:?}: {:?}", + self.path, err + ) + }); + + + let buf_writer = BufWriter::new(file); + self.writer = Some(buf_writer); + } + + fn next(&mut self) -> StreamElement<()> { + let writer = self.writer.as_mut().unwrap(); + let mut w = Writer::new(&self.schema, writer); + loop { + match self.prev.next() { + StreamElement::Item(t) | StreamElement::Timestamped(t, _) => { + // w.extend_ser(values) + w.append_ser(t).expect("failed to write to avro"); + } + el => { + w.flush().unwrap(); + return el.map(|_| ()); + } + } + } + } + + fn structure(&self) -> BlockStructure { + let mut operator = OperatorStructure::new::("AvroSink"); + operator.kind = OperatorKind::Sink; + self.prev.structure().add_operator(operator) + } +} + +impl Clone for AvroSink +where + Op: Operator, +{ + fn clone(&self) -> Self { + panic!("AvroSink cannot be cloned, replication should be 1"); + } +} + +impl Stream where + Op: 'static, + Op::Out: AvroSchema + Serialize +{ + + /// Apply the given function to all the elements of the stream, consuming the stream. + /// + /// ## Example + /// + /// ``` + /// # use renoir::{StreamContext, RuntimeConfig}; + /// # use renoir::operator::source::IteratorSource; + /// # let mut env = StreamContext::new_local(); + /// let s = env.stream_iter(0..5).group_by(|&n| n % 2); + /// s.for_each(|(key, n)| println!("Item: {} has key {}", n, key)); + /// + /// env.execute_blocking(); + /// ``` + pub fn write_avro>(self, path: P) + { + self.add_operator(|prev| AvroSink::new(prev, path)) + .finalize_block(); + } +} + +// #[cfg(test)] +// mod qtests { +// use std::AvroSinkions::HashSet; + +// use crate::config::RuntimeConfig; +// use crate::environment::StreamContext; +// use crate::operator::source; + +// #[test] +// fn AvroSink_vec() { +// let env = StreamContext::new(RuntimeConfig::local(4).unwrap()); +// let source = source::IteratorSource::new(0..10u8); +// let res = env.stream(source).AvroSink::>(); +// env.execute_blocking(); +// assert_eq!(res.get().unwrap(), (0..10).AvroSink::>()); +// } + +// #[test] +// fn AvroSink_set() { +// let env = StreamContext::new(RuntimeConfig::local(4).unwrap()); +// let source = source::IteratorSource::new(0..10u8); +// let res = env.stream(source).AvroSink::>(); +// env.execute_blocking(); +// assert_eq!(res.get().unwrap(), (0..10).AvroSink::>()); +// } +// } diff --git a/src/operator/sink/mod.rs b/src/operator/sink/mod.rs index ad469db..2db1c9d 100644 --- a/src/operator/sink/mod.rs +++ b/src/operator/sink/mod.rs @@ -10,6 +10,7 @@ pub(super) mod collect_channel; pub(super) mod collect_count; pub(super) mod collect_vec; pub(super) mod for_each; +pub(super) mod avro; pub(crate) type StreamOutputRef = Arc>>; diff --git a/src/operator/source/avro.rs b/src/operator/source/avro.rs new file mode 100644 index 0000000..3b87dd6 --- /dev/null +++ b/src/operator/source/avro.rs @@ -0,0 +1,229 @@ +use std::fmt::Display; +use std::fs::File; +use std::io; +use std::io::{BufRead, BufReader, Read, Seek, SeekFrom}; +use std::marker::PhantomData; +use std::path::PathBuf; + +use apache_avro::Reader; +use serde::Deserialize; + +use crate::block::{BlockStructure, OperatorKind, OperatorStructure, Replication}; +use crate::operator::source::Source; +use crate::operator::{Data, Operator, StreamElement}; +use crate::scheduler::ExecutionMetadata; +use crate::Stream; + +/// Source that reads and parses a CSV file. +/// +/// The file is divided in chunks and is read concurrently by multiple replicas. +pub struct AvroSource Deserialize<'a>> { + /// Path of the file. + path: PathBuf, + /// Reader used to parse the CSV file. + reader: Option>>, + /// Whether the reader has terminated its job. + terminated: bool, + _out: PhantomData, +} + +impl Deserialize<'a>> Display for AvroSource { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "AvroSource<{}>", std::any::type_name::()) + } +} + +impl Deserialize<'a>> AvroSource { + /// Create a new source that reads and parse the lines of a CSV file. + /// + /// The file is partitioned into as many chunks as replicas, each replica has to have the + /// **same** file in the same path. It is guaranteed that each line of the file is emitted by + /// exactly one replica. + /// + /// After creating the source it's possible to customize its behaviour using one of the + /// available methods. By default it is assumed that the delimiter is `,` and the CSV has + /// headers. + /// + /// Each line will be deserialized into the type `Out`, so the structure of the CSV must be + /// valid for that deserialization. The [`csv`](https://crates.io/crates/csv) crate is used for + /// the parsing. + /// + /// **Note**: the file must be readable and its size must be available. This means that only + /// regular files can be read. + /// + /// ## Example + /// + /// ``` + /// # use renoir::{StreamContext, RuntimeConfig}; + /// # use renoir::operator::source::AvroSource; + /// # use serde::{Deserialize, Serialize}; + /// # let mut env = StreamContext::new_local(); + /// #[derive(Clone, Deserialize, Serialize)] + /// struct Thing { + /// what: String, + /// count: u64, + /// } + /// let source = AvroSource::::new("/datasets/huge.csv"); + /// let s = env.stream(source); + /// ``` + pub fn new>(path: P) -> Self { + Self { + path: path.into(), + reader: None, + terminated: false, + _out: PhantomData, + } + } +} + +impl Deserialize<'a>> Source for AvroSource { + fn replication(&self) -> Replication { + Replication::One + } +} + +impl Deserialize<'a>> Operator for AvroSource { + type Out = Out; + + fn setup(&mut self, metadata: &mut ExecutionMetadata) { + // let global_id = metadata.global_id; + // let instances = metadata.replicas.len(); + + let file = File::options() + .read(true) + .write(false) + .open(&self.path) + .unwrap_or_else(|err| { + panic!( + "AvroSource: error while opening file {:?}: {:?}", + self.path, err + ) + }); + + let buf_reader = BufReader::new(file); + let reader = Reader::new(buf_reader).expect("failed to create avro reader"); + + self.reader = Some(reader); + } + + fn next(&mut self) -> StreamElement { + if self.terminated { + return StreamElement::Terminate; + } + let reader = self + .reader + .as_mut() + .expect("AvroSource was not initialized"); + + match reader.next() { + Some(Ok(el)) => { + tracing::trace!("avro Value: {el:?}"); + StreamElement::Item(apache_avro::from_value(&el).expect("could not deserialize from avro Value to specified type")) + } + Some(Err(e)) => panic!("Error while reading Aveo file: {:?}", e), + None => { + self.terminated = true; + StreamElement::FlushAndRestart + } + } + } + + fn structure(&self) -> BlockStructure { + let mut operator = OperatorStructure::new::("AvroSource"); + operator.kind = OperatorKind::Source; + BlockStructure::default().add_operator(operator) + } +} + +impl Deserialize<'a>> Clone for AvroSource { + fn clone(&self) -> Self { + assert!( + self.reader.is_none(), + "AvroSource must be cloned before calling setup" + ); + Self { + path: self.path.clone(), + reader: None, + terminated: false, + _out: PhantomData, + } + } +} + +impl crate::StreamContext { + /// Convenience method, creates a `AvroSource` and makes a stream using `StreamContext::stream` + pub fn stream_avro Deserialize<'a>>( + &self, + path: impl Into, + ) -> Stream> { + let source = AvroSource::new(path); + self.stream(source) + } +} + +#[cfg(test)] +mod tests { + use std::io::Write; + + use itertools::Itertools; + use serde::{Deserialize, Serialize}; + use tempfile::NamedTempFile; + + use crate::config::RuntimeConfig; + use crate::environment::StreamContext; + // use crate::operator::source::AvroSource; + + // #[test] + // fn csv_without_headers() { + // for num_records in 0..100 { + // for terminator in &["\n", "\r\n"] { + // let file = NamedTempFile::new().unwrap(); + // for i in 0..num_records { + // write!(file.as_file(), "{},{}{}", i, i + 1, terminator).unwrap(); + // } + + // let env = StreamContext::new(RuntimeConfig::local(4).unwrap()); + // let source = AvroSource::<(i32, i32)>::new(file.path()).has_headers(false); + // let res = env.stream(source).shuffle().collect_vec(); + // env.execute_blocking(); + + // let mut res = res.get().unwrap(); + // res.sort_unstable(); + // assert_eq!(res, (0..num_records).map(|x| (x, x + 1)).collect_vec()); + // } + // } + // } + + // #[test] + // fn csv_with_headers() { + // #[derive(Clone, Serialize, Deserialize)] + // struct T { + // a: i32, + // b: i32, + // } + + // for num_records in 0..100 { + // for terminator in &["\n", "\r\n"] { + // let file = NamedTempFile::new().unwrap(); + // write!(file.as_file(), "a,b{terminator}").unwrap(); + // for i in 0..num_records { + // write!(file.as_file(), "{},{}{}", i, i + 1, terminator).unwrap(); + // } + + // let env = StreamContext::new(RuntimeConfig::local(4).unwrap()); + // let source = AvroSource::::new(file.path()); + // let res = env.stream(source).shuffle().collect_vec(); + // env.execute_blocking(); + + // let res = res + // .get() + // .unwrap() + // .into_iter() + // .map(|x| (x.a, x.b)) + // .sorted() + // .collect_vec(); + // assert_eq!(res, (0..num_records).map(|x| (x, x + 1)).collect_vec()); + // } + // } + // } +} diff --git a/src/operator/source/mod.rs b/src/operator/source/mod.rs index ff95b2f..32e12e9 100644 --- a/src/operator/source/mod.rs +++ b/src/operator/source/mod.rs @@ -7,6 +7,7 @@ pub use channel::*; pub use file::*; pub use iterator::*; pub use parallel_iterator::*; +pub use avro::*; use crate::{block::Replication, operator::Operator}; @@ -17,6 +18,7 @@ mod csv; mod file; mod iterator; mod parallel_iterator; +mod avro; /// This trait marks all the operators that can be used as sinks. pub trait Source: Operator { diff --git a/src/operator/window/aggr/columnar.rs b/src/operator/window/aggr/columnar.rs new file mode 100644 index 0000000..e4a58e2 --- /dev/null +++ b/src/operator/window/aggr/columnar.rs @@ -0,0 +1,60 @@ +use arrow::array::ArrayBuilder; + +/// TODO +use super::super::*; +use crate::operator::{Data, DataKey, Operator}; +use crate::stream::{KeyedStream, WindowedStream}; + +#[derive(Clone)] +struct Columnar +where + O: Extend + Default, +{ + array_builder: ArrayBuilder, + _i: PhantomData, +} + + +impl WindowAccumulator for Columnar +where + O:, + I: Clone + Send + 'static, + O: Extend + Default + Clone+ Send + 'static, +{ + type In = I; + + type Out = O; + + #[inline] + fn process(&mut self, el: Self::In) { + self.collection.extend(std::iter::once(el)); + } + + #[inline] + fn output(self) -> Self::Out { + self.collection + } + + #[inline] + fn size_hint(&mut self, size: usize) { + + } +} + +impl WindowedStream +where + WindowDescr: WindowDescription, + OperatorChain: Operator + 'static, + Key: DataKey, + Out: Data + Ord, +{ + pub fn to_arrow + Default + Clone + Send + 'static>( + self, + ) -> KeyedStream> { + let acc = Columnar:: { + collection: Default::default(), + _i: PhantomData, + }; + self.add_window_operator("WindowCollect", acc) + } +} diff --git a/src/operator/window/aggr/mod.rs b/src/operator/window/aggr/mod.rs index 1793745..99f6a4a 100644 --- a/src/operator/window/aggr/mod.rs +++ b/src/operator/window/aggr/mod.rs @@ -1,7 +1,9 @@ mod fold; +// mod columnar; pub(super) use fold::{Fold, FoldFirst}; mod collect_vec; +// mod collect; mod count; mod join; mod max;