diff --git a/Cargo.lock b/Cargo.lock index 6aac1f0..9720ae0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -82,6 +82,55 @@ dependencies = [ "libc", ] +[[package]] +name = "anstream" +version = "0.6.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64e15c1ab1f89faffbf04a634d5e1962e9074f2741eef6d97f3c4e322426d526" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bec1de6f59aedf83baf9ff929c98f2ad654b97c9510f4e70cf6f661d49fd5b1" + +[[package]] +name = "anstyle-parse" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb47de1e80c2b463c735db5b217a0ddc39d612e7ac9e2e96a5aed1f57616c1cb" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d36fc52c7f6c869915e99412912f22093507da8d9e942ceaf66fe4b7c14422a" +dependencies = [ + "windows-sys", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bf74e1b6e971609db8ca7a9ce79fd5768ab6ae46441c572e46cf596f59e57f8" +dependencies = [ + "anstyle", + "windows-sys", +] + [[package]] name = "apache-avro" version = "0.16.0" @@ -606,6 +655,12 @@ dependencies = [ "libloading", ] +[[package]] +name = "colorchoice" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0" + [[package]] name = "comfy-table" version = "7.1.1" @@ -1111,7 +1166,9 @@ dependencies = [ "arrow-schema", "datafusion", "df-streams-core", + "env_logger", "futures", + "log", "rand", "rdkafka", "serde", @@ -1147,6 +1204,29 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" +[[package]] +name = "env_filter" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f2c92ceda6ceec50f43169f9ee8424fe2db276791afde7b2cd8bc084cb376ab" +dependencies = [ + "log", + "regex", +] + +[[package]] +name = "env_logger" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13fa619b91fb2381732789fc5de83b45675e882f66623b7d8cb4f643017018d" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "humantime", + "log", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -1453,6 +1533,12 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "is_terminal_polyfill" +version = "1.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" + [[package]] name = "itertools" version = "0.12.1" @@ -2866,6 +2952,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "uuid" version = "1.10.0" diff --git a/crates/core/src/datastream.rs b/crates/core/src/datastream.rs index fe2e2ba..5e2159b 100644 --- a/crates/core/src/datastream.rs +++ b/crates/core/src/datastream.rs @@ -15,48 +15,18 @@ use crate::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder}; use crate::logical_plan::StreamingLogicalPlanBuilder; use crate::physical_plan::utils::time::TimestampUnit; +/// The primary interface for building a streaming job +/// +/// Wraps the DataFusion DataFrame and context objects and provides methods +/// for constructing and executing streaming pipelines. #[derive(Clone)] pub struct DataStream { pub df: Arc, pub(crate) context: Arc, } -pub trait Joinable { - fn get_plan(self) -> LogicalPlan; -} -impl Joinable for DataFrame { - fn get_plan(self) -> LogicalPlan { - let (_, plan) = self.into_parts(); - plan - } -} -impl Joinable for DataStream { - fn get_plan(self) -> LogicalPlan { - let (_, plan) = self.df.as_ref().clone().into_parts(); - plan - } -} - impl DataStream { - /// Return the schema of DataFrame that backs the DataStream - pub fn schema(&self) -> &DFSchema { - self.df.schema() - } - - /// Prints the schema of the underlying dataframe - /// Useful for debugging chained method calls. - pub fn print_schema(self) -> Result { - println!("{}", self.df.schema()); - Ok(self) - } - - /// Prints the underlying logical_plan. - /// Useful for debugging chained method calls. - pub fn print_plan(self) -> Result { - println!("{}", self.df.logical_plan().display_indent()); - Ok(self) - } - + // Select columns in the output stream pub fn select(self, expr_list: Vec) -> Result { let (session_state, plan) = self.df.as_ref().clone().into_parts(); @@ -74,6 +44,7 @@ impl DataStream { }) } + // Apply a filter pub fn filter(self, predicate: Expr) -> Result { let (session_state, plan) = self.df.as_ref().clone().into_parts(); @@ -85,8 +56,7 @@ impl DataStream { }) } - // drop_columns, sync, columns: &[&str] - // count + // Join two streams using the specified expression pub fn join_on( self, right: impl Joinable, @@ -106,6 +76,8 @@ impl DataStream { }) } + // Join two streams together using explicitly specified columns + // Also supports joining a DataStream with a DataFrame object pub fn join( self, right: impl Joinable, @@ -132,6 +104,7 @@ impl DataStream { }) } + /// create a streaming window pub fn window( self, group_expr: Vec, @@ -150,6 +123,8 @@ impl DataStream { }) } + /// execute the stream and print the results to stdout. + /// Mainly used for development and debugging pub async fn print_stream(self) -> Result<(), DataFusionError> { let mut stream: SendableRecordBatchStream = self.df.as_ref().clone().execute_stream().await?; @@ -176,7 +151,27 @@ impl DataStream { } } - pub async fn write_table( + /// Return the schema of DataFrame that backs the DataStream + pub fn schema(&self) -> &DFSchema { + self.df.schema() + } + + /// Prints the schema of the underlying dataframe + /// Useful for debugging chained method calls. + pub fn print_schema(self) -> Result { + println!("{}", self.df.schema()); + Ok(self) + } + + /// Prints the underlying logical_plan. + /// Useful for debugging chained method calls. + pub fn print_plan(self) -> Result { + println!("{}", self.df.logical_plan().display_indent()); + Ok(self) + } + + /// execute the stream and write the results to a give kafka topic + pub async fn sink_kafka( self, bootstrap_servers: String, topic: String, @@ -206,3 +201,21 @@ impl DataStream { Ok(()) } } + +/// Trait that allows both DataStream and DataFrame objects to be joined to +/// the current DataStream +pub trait Joinable { + fn get_plan(self) -> LogicalPlan; +} +impl Joinable for DataFrame { + fn get_plan(self) -> LogicalPlan { + let (_, plan) = self.into_parts(); + plan + } +} +impl Joinable for DataStream { + fn get_plan(self) -> LogicalPlan { + let (_, plan) = self.df.as_ref().clone().into_parts(); + plan + } +} diff --git a/crates/core/src/logical_plan/mod.rs b/crates/core/src/logical_plan/mod.rs index 1325e80..234192a 100644 --- a/crates/core/src/logical_plan/mod.rs +++ b/crates/core/src/logical_plan/mod.rs @@ -12,6 +12,7 @@ use datafusion::logical_expr::{Aggregate, Expr}; pub mod streaming_window; use streaming_window::{StreamingWindowPlanNode, StreamingWindowSchema, StreamingWindowType}; +/// Extend the DataFusion logical plan builder with streaming specific functionality pub trait StreamingLogicalPlanBuilder { fn streaming_window( self, @@ -24,7 +25,7 @@ pub trait StreamingLogicalPlanBuilder { // Extend the LogicalPlanBuilder with functions to add streaming operators to the plan impl StreamingLogicalPlanBuilder for LogicalPlanBuilder { - /// Apply franz window functions to extend the schema + /// Apply a streaming window functions to extend the schema fn streaming_window( self, group_expr: impl IntoIterator>, diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 2ed7ae9..551a0c2 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -22,3 +22,5 @@ tempfile = { version = "3" } rdkafka = { workspace = true } rand = "0.8.5" tokio-postgres = { version = "0.7.11", features = ["with-serde_json-1"] } +log = { workspace = true } +env_logger = "0.11.5" diff --git a/examples/examples/kafka_rideshare.rs b/examples/examples/kafka_rideshare.rs index e5e1bba..35310d9 100644 --- a/examples/examples/kafka_rideshare.rs +++ b/examples/examples/kafka_rideshare.rs @@ -82,7 +82,7 @@ async fn main() -> Result<()> { // ds.clone().print_stream().await?; - ds.write_table(bootstrap_servers.clone(), String::from("out_topic")) + ds.sink_kafka(bootstrap_servers.clone(), String::from("out_topic")) .await?; Ok(()) diff --git a/examples/examples/simple_aggregation.rs b/examples/examples/simple_aggregation.rs index 82d9ecf..cf14fc3 100644 --- a/examples/examples/simple_aggregation.rs +++ b/examples/examples/simple_aggregation.rs @@ -3,7 +3,7 @@ use std::time::Duration; use datafusion::error::Result; use datafusion::functions_aggregate::average::avg; use datafusion::functions_aggregate::count::count; -// use datafusion::logical_expr::lit; +use datafusion::logical_expr::lit; use datafusion::logical_expr::{col, max, min}; use df_streams_core::context::Context; @@ -14,6 +14,10 @@ use df_streams_examples::get_sample_json; #[tokio::main] async fn main() -> Result<()> { + env_logger::builder() + .filter_level(log::LevelFilter::Warn) + .init(); + let sample_event = get_sample_json(); let bootstrap_servers = String::from("localhost:9092"); @@ -32,18 +36,22 @@ async fn main() -> Result<()> { ])) .await?; - let ds = ctx.from_topic(source_topic).await?.window( - vec![], - vec![ - count(col("reading")).alias("count"), - min(col("reading")).alias("min"), - max(col("reading")).alias("max"), - avg(col("reading")).alias("average"), - ], - Duration::from_millis(1_000), - None, - )?; - // .filter(col("max").gt(lit(114)))?; + let ds = ctx + .from_topic(source_topic) + .await? + // .filter(col("reading").gt(lit(70)))? + .window( + vec![], + vec![ + count(col("reading")).alias("count"), + min(col("reading")).alias("min"), + max(col("reading")).alias("max"), + avg(col("reading")).alias("average"), + ], + Duration::from_millis(1_000), + None, + )? + .filter(col("max").lt(lit(113)))?; ds.clone().print_stream().await?;