From abc637eead9a5fd4550ca8567d4689faef149acf Mon Sep 17 00:00:00 2001
From: Matt Green
Date: Mon, 12 Aug 2024 14:41:32 -0700
Subject: [PATCH] update examples

---
 .../core/src/datasource/kafka/kafka_config.rs |   2 +-
 crates/core/src/datastream.rs                 | 114 ++++++++++-
 examples/examples/emit_measurements.rs        |  41 ++--
 examples/examples/simple_aggregation.rs       |  40 ++--
 examples/examples/stream_join.rs              | 181 +++++++-----------
 examples/src/lib.rs                           |  17 ++
 6 files changed, 234 insertions(+), 161 deletions(-)
 create mode 100644 examples/src/lib.rs

diff --git a/crates/core/src/datasource/kafka/kafka_config.rs b/crates/core/src/datasource/kafka/kafka_config.rs
index bd9f728..1da2633 100644
--- a/crates/core/src/datasource/kafka/kafka_config.rs
+++ b/crates/core/src/datasource/kafka/kafka_config.rs
@@ -85,7 +85,7 @@ impl KafkaWriteConfig {
     }
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct KafkaTopicBuilder {
     bootstrap_servers: String,
     topic: Option<String>,
diff --git a/crates/core/src/datastream.rs b/crates/core/src/datastream.rs
index 8c31342..fe2e2ba 100644
--- a/crates/core/src/datastream.rs
+++ b/crates/core/src/datastream.rs
@@ -1,3 +1,4 @@
+use datafusion::logical_expr::LogicalPlan;
 use futures::StreamExt;
 use std::{sync::Arc, time::Duration};
 
@@ -5,7 +6,9 @@ use datafusion::common::{DFSchema, DataFusionError, Result};
 pub use datafusion::dataframe::DataFrame;
 use datafusion::dataframe::DataFrameWriteOptions;
 use datafusion::execution::SendableRecordBatchStream;
-use datafusion::logical_expr::{logical_plan::LogicalPlanBuilder, Expr};
+use datafusion::logical_expr::{
+    logical_plan::LogicalPlanBuilder, utils::find_window_exprs, Expr, JoinType,
+};
 
 use crate::context::Context;
 use crate::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder};
@@ -18,12 +21,60 @@ pub struct DataStream {
     pub(crate) context: Arc<Context>,
 }
 
+pub trait Joinable {
+    fn get_plan(self) -> LogicalPlan;
+}
+impl Joinable for DataFrame {
+    fn get_plan(self) -> LogicalPlan {
+        let (_, plan) = self.into_parts();
+        plan
+    }
+}
+impl Joinable for DataStream {
+    fn get_plan(self) -> LogicalPlan {
+        let (_, plan) = self.df.as_ref().clone().into_parts();
+        plan
+    }
+}
+
 impl DataStream {
+    /// Return the schema of DataFrame that backs the DataStream
     pub fn schema(&self) -> &DFSchema {
         self.df.schema()
     }
 
-    pub fn filter(&self, predicate: Expr) -> Result<Self> {
+    /// Prints the schema of the underlying dataframe
+    /// Useful for debugging chained method calls.
+    pub fn print_schema(self) -> Result<Self> {
+        println!("{}", self.df.schema());
+        Ok(self)
+    }
+
+    /// Prints the underlying logical_plan.
+    /// Useful for debugging chained method calls.
+    pub fn print_plan(self) -> Result<Self> {
+        println!("{}", self.df.logical_plan().display_indent());
+        Ok(self)
+    }
+
+    pub fn select(self, expr_list: Vec<Expr>) -> Result<Self> {
+        let (session_state, plan) = self.df.as_ref().clone().into_parts();
+
+        let window_func_exprs = find_window_exprs(&expr_list);
+        let plan = if window_func_exprs.is_empty() {
+            plan
+        } else {
+            LogicalPlanBuilder::window_plan(plan, window_func_exprs)?
+        };
+        let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;
+
+        Ok(Self {
+            df: Arc::new(DataFrame::new(session_state, project_plan)),
+            context: self.context.clone(),
+        })
+    }
+
+    pub fn filter(self, predicate: Expr) -> Result<Self> {
         let (session_state, plan) = self.df.as_ref().clone().into_parts();
 
         let plan = LogicalPlanBuilder::from(plan).filter(predicate)?.build()?;
@@ -36,9 +87,53 @@ impl DataStream {
 
     // drop_columns, sync, columns: &[&str]
     // count
 
+    pub fn join_on(
+        self,
+        right: impl Joinable,
+        join_type: JoinType,
+        on_exprs: impl IntoIterator<Item = Expr>,
+    ) -> Result<Self> {
+        let (session_state, plan) = self.df.as_ref().clone().into_parts();
+        let right_plan = right.get_plan();
+
+        let plan = LogicalPlanBuilder::from(plan)
+            .join_on(right_plan, join_type, on_exprs)?
+            .build()?;
+
+        Ok(Self {
+            df: Arc::new(DataFrame::new(session_state, plan)),
+            context: self.context.clone(),
+        })
+    }
+
+    pub fn join(
+        self,
+        right: impl Joinable,
+        join_type: JoinType,
+        left_cols: &[&str],
+        right_cols: &[&str],
+        filter: Option<Expr>,
+    ) -> Result<Self> {
+        let (session_state, plan) = self.df.as_ref().clone().into_parts();
+        let right_plan = right.get_plan();
+
+        let plan = LogicalPlanBuilder::from(plan)
+            .join(
+                right_plan,
+                join_type,
+                (left_cols.to_vec(), right_cols.to_vec()),
+                filter,
+            )?
+            .build()?;
+
+        Ok(Self {
+            df: Arc::new(DataFrame::new(session_state, plan)),
+            context: self.context.clone(),
+        })
+    }
     pub fn window(
-        &self,
+        self,
         group_expr: Vec<Expr>,
         aggr_expr: Vec<Expr>,
         window_length: Duration,
@@ -55,19 +150,18 @@ impl DataStream {
         })
     }
 
-    pub async fn print_stream(&self) -> Result<(), DataFusionError> {
+    pub async fn print_stream(self) -> Result<(), DataFusionError> {
         let mut stream: SendableRecordBatchStream =
             self.df.as_ref().clone().execute_stream().await?;
         loop {
             match stream.next().await.transpose() {
                 Ok(Some(batch)) => {
-                    if batch.num_rows() > 0 {
+                    for i in 0..batch.num_rows() {
+                        let row = batch.slice(i, 1);
                         println!(
                             "{}",
-                            datafusion::common::arrow::util::pretty::pretty_format_batches(&[
-                                batch
-                            ])
-                            .unwrap()
+                            datafusion::common::arrow::util::pretty::pretty_format_batches(&[row])
+                                .unwrap()
                         );
                     }
                 }
@@ -83,7 +177,7 @@ impl DataStream {
     }
 
     pub async fn write_table(
-        &self,
+        self,
         bootstrap_servers: String,
         topic: String,
     ) -> Result<(), DataFusionError> {
diff --git a/examples/examples/emit_measurements.rs b/examples/examples/emit_measurements.rs
index 311bbcd..4718da2 100644
--- a/examples/examples/emit_measurements.rs
+++ b/examples/examples/emit_measurements.rs
@@ -1,35 +1,52 @@
 use datafusion::error::Result;
+use rand::seq::SliceRandom;
 use rdkafka::producer::FutureProducer;
-use serde::{Deserialize, Serialize};
 use std::time::{SystemTime, UNIX_EPOCH};
 
 use rdkafka::config::ClientConfig;
 use rdkafka::producer::FutureRecord;
 use rdkafka::util::Timeout;
 
-#[derive(Serialize, Deserialize)]
-pub struct Measurment {
-    occurred_at_ms: u64,
-    temperature: f64,
-}
+use df_streams_examples::Measurment;
 
 /// docker run -p 9092:9092 --name kafka apache/kafka
 #[tokio::main]
 async fn main() -> Result<()> {
+    let mut rng = rand::thread_rng();
+
     let producer: FutureProducer = ClientConfig::new()
         .set("bootstrap.servers", String::from("localhost:9092"))
         .set("message.timeout.ms", "60000")
         .create()
         .expect("Producer creation error");
 
-    let topic = "temperature".to_string();
+    let sensors = ["sensor_0", "sensor_1", "sensor_2", "sensor_3", "sensor_4"];
 
     loop {
-        let msg = serde_json::to_vec(&Measurment {
-            occurred_at_ms: get_timestamp_ms(),
-            temperature: rand::random::<f64>() * 115.0,
-        })
-        .unwrap();
+        let sensor_name = sensors.choose(&mut rng).unwrap().to_string();
+
+        // Alternate between sending random temperature and humidity readings
+        let (topic, msg) = if rand::random::<f64>() < 0.4 {
+            (
+                "temperature".to_string(),
+                serde_json::to_vec(&Measurment {
+                    occurred_at_ms: get_timestamp_ms(),
+                    sensor_name,
+                    reading: rand::random::<f64>() * 115.0,
+                })
+                .unwrap(),
+            )
+        } else {
+            (
+                "humidity".to_string(),
+                serde_json::to_vec(&Measurment {
+                    occurred_at_ms: get_timestamp_ms(),
+                    sensor_name,
+                    reading: rand::random::<f64>(),
+                })
+                .unwrap(),
+            )
+        };
 
         producer
             .send(
diff --git a/examples/examples/simple_aggregation.rs b/examples/examples/simple_aggregation.rs
index 0ba7f33..82d9ecf 100644
--- a/examples/examples/simple_aggregation.rs
+++ b/examples/examples/simple_aggregation.rs
@@ -2,50 +2,48 @@ use std::time::Duration;
 
 use datafusion::error::Result;
 use datafusion::functions_aggregate::average::avg;
-use datafusion::logical_expr::lit;
+use datafusion::functions_aggregate::count::count;
+// use datafusion::logical_expr::lit;
 use datafusion::logical_expr::{col, max, min};
 
 use df_streams_core::context::Context;
 use df_streams_core::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder};
 use df_streams_core::physical_plan::utils::time::TimestampUnit;
 
+use df_streams_examples::get_sample_json;
+
 #[tokio::main]
 async fn main() -> Result<()> {
-    let sample_event = r#"{"occurred_at_ms": 1715201766763, "temperature": 87.2}"#;
+    let sample_event = get_sample_json();
 
     let bootstrap_servers = String::from("localhost:9092");
 
     let ctx = Context::new()?;
-
     let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone());
 
     let source_topic = topic_builder
         .with_timestamp(String::from("occurred_at_ms"), TimestampUnit::Int64Millis)
         .with_encoding("json")?
         .with_topic(String::from("temperature"))
-        .infer_schema_from_json(sample_event)?
+        .infer_schema_from_json(sample_event.as_str())?
         .build_reader(ConnectionOpts::from([
-            ("auto.offset.reset".to_string(), "earliest".to_string()),
+            ("auto.offset.reset".to_string(), "latest".to_string()),
             ("group.id".to_string(), "sample_pipeline".to_string()),
         ]))
         .await?;
 
-    let ds = ctx
-        .from_topic(source_topic)
-        .await?
-        .window(
-            vec![],
-            vec![
-                min(col("temperature")).alias("min"),
-                max(col("temperature")).alias("max"),
-                avg(col("temperature")).alias("average"),
-            ],
-            Duration::from_millis(1_000), // 5 second window
-            None,
-        )?
-        .filter(col("max").gt(lit(114)))?;
-
-    println!("{}", ds.df.logical_plan().display_indent());
+    let ds = ctx.from_topic(source_topic).await?.window(
+        vec![],
+        vec![
+            count(col("reading")).alias("count"),
+            min(col("reading")).alias("min"),
+            max(col("reading")).alias("max"),
+            avg(col("reading")).alias("average"),
+        ],
+        Duration::from_millis(1_000),
+        None,
+    )?;
+    // .filter(col("max").gt(lit(114)))?;
 
     ds.clone().print_stream().await?;
 
diff --git a/examples/examples/stream_join.rs b/examples/examples/stream_join.rs
index 223f26c..d96715e 100644
--- a/examples/examples/stream_join.rs
+++ b/examples/examples/stream_join.rs
@@ -1,126 +1,73 @@
-#![allow(dead_code)]
-#![allow(unused_variables)]
-#![allow(unused_imports)]
+use std::time::Duration;
+
 use datafusion::common::JoinType;
 use datafusion::error::Result;
-use datafusion::functions::core::expr_ext::FieldAccessor;
-use datafusion::functions_aggregate::count::count;
-use datafusion::logical_expr::{col, max, min};
+use datafusion::functions_aggregate::average::avg;
+use datafusion::logical_expr::col;
 
 use df_streams_core::context::Context;
 use df_streams_core::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder};
 use df_streams_core::physical_plan::utils::time::TimestampUnit;
 
-use std::time::Duration;
-use tracing_subscriber::{fmt::format::FmtSpan, FmtSubscriber};
+use df_streams_examples::get_sample_json;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let sample_event = get_sample_json();
+
+    let bootstrap_servers = String::from("localhost:9092");
+
+    let ctx = Context::new()?;
+    let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone());
+
+    let source_topic_builder = topic_builder
+        .with_timestamp(String::from("occurred_at_ms"), TimestampUnit::Int64Millis)
+        .with_encoding("json")?
+        .infer_schema_from_json(sample_event.as_str())?;
+
+    let temperature_topic = source_topic_builder
+        .clone()
+        .with_topic(String::from("temperature"))
+        .build_reader(ConnectionOpts::from([
+            ("auto.offset.reset".to_string(), "earliest".to_string()),
+            ("group.id".to_string(), "sample_pipeline".to_string()),
+        ]))
+        .await?;
+
+    let humidity_ds = ctx
+        .from_topic(
+            source_topic_builder
+                .clone()
+                .with_topic(String::from("humidity"))
+                .build_reader(ConnectionOpts::from([
+                    ("auto.offset.reset".to_string(), "earliest".to_string()),
+                    ("group.id".to_string(), "sample_pipeline".to_string()),
+                ]))
+                .await?,
+        )
+        .await?;
+
+    let joined_ds = ctx
+        .from_topic(temperature_topic)
+        .await?
+        .join(
+            humidity_ds,
+            JoinType::Inner,
+            &["sensor_name"],
+            &["sensor_name"],
+            None,
+        )?
+        .window(
+            vec![],
+            vec![
+                avg(col("temperature.reading")).alias("avg_temperature"),
+                avg(col("humidity.reading")).alias("avg_humidity"),
+            ],
+            Duration::from_millis(1_000),
+            None,
+        )?;
+
+    joined_ds.clone().print_stream().await?;
 
-#[tokio::main(flavor = "multi_thread")]
-async fn main() {
-    // tracing_log::LogTracer::init().expect("Failed to set up log tracer");
-    //
-    // let subscriber = FmtSubscriber::builder()
-    //     .with_max_level(tracing::Level::INFO)
-    //     .with_span_events(FmtSpan::CLOSE | FmtSpan::ENTER)
-    //     .finish();
-    // tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
-    //
-    // let bootstrap_servers = String::from("localhost:19092,localhost:29092,localhost:39092");
-    //
-    // // Configure Kafka source for IMU data
-    // let imu_stream = create_kafka_source(
-    //     &r#"{
-    //         "driver_id": "690c119e-63c9-479b-b822-872ee7d89165",
-    //         "occurred_at_ms": 1715201766763,
-    //         "imu_measurement": {
-    //             "timestamp": "2024-05-08T20:56:06.763260Z",
-    //             "accelerometer": {
-    //                 "x": 1.4187794,
-    //                 "y": -0.13967037,
-    //                 "z": 0.5483732
-    //             },
-    //             "gyroscope": {
-    //                 "x": 0.005840948,
-    //                 "y": 0.0035944171,
-    //                 "z": 0.0041645765
-    //             },
-    //             "gps": {
-    //                 "latitude": 72.3492587464122,
-    //                 "longitude": 144.85596244550095,
-    //                 "altitude": 2.9088259,
-    //                 "speed": 57.96137
-    //             }
-    //         },
-    //         "meta": {
-    //             "nonsense": "MMMMMMMMMM"
-    //         }
-    //     }"#
-    //     .to_string(),
-    //     bootstrap_servers.clone(),
-    //     "driver-imu-data".to_string(),
-    //     "kafka_rideshare".to_string(),
-    // );
-    //
-    // // Configure Kafka source for Trip data
-    // let trip_stream = create_kafka_source(
-    //     &r#"{
-    //         "event_name": "TRIP_START",
-    //         "trip_id": "b005922a-4ba5-4678-b0e6-bcb5ca2abe3e",
-    //         "driver_id": "788fb395-96d0-4bc8-8ed9-bcf4e11e7543",
-    //         "occurred_at_ms": 1718752555452,
-    //         "meta": {
-    //             "nonsense": "MMMMMMMMMM"
-    //         }
-    //     }"#
-    //     .to_string(),
-    //     bootstrap_servers.clone(),
-    //     "trips".to_string(),
-    //     "kafka_rideshare".to_string(),
-    // );
-    //
-    // let mut config = ConfigOptions::default();
-    // let _ = config.set("datafusion.execution.batch_size", "32");
-    //
-    // // Create the context object with a source from kafka
-    // let ctx = SessionContext::new_with_config(config.into());
-    //
-    // let imu_stream_plan = LogicalPlanBuilder::scan_with_filters(
-    //     "imu_data",
-    //     provider_as_source(Arc::new(imu_stream)),
-    //     None,
-    //     vec![],
-    // )
-    // .unwrap()
-    // .build()
-    // .unwrap();
-    //
-    // let logical_plan = LogicalPlanBuilder::scan_with_filters(
-    //     "trips",
-    //     provider_as_source(Arc::new(trip_stream)),
-    //     None,
-    //     vec![],
-    // )
-    // .unwrap()
-    // .join_on(
-    //     imu_stream_plan,
-    //     JoinType::Left,
-    //     vec![col("trips.driver_id").eq(col("imu_data.driver_id"))],
-    // )
-    // .unwrap()
-    // .build()
-    // .unwrap();
-    //
-    // let df = DataFrame::new(ctx.state(), logical_plan);
-    // let windowed_df = df
-    //     .clone()
-    //     .select(vec![
-    //         col("trips.trip_id"),
-    //         col("trips.driver_id"),
-    //         col("trips.event_name"),
-    //         col("imu_measurement").field("gps").field("speed"),
-    //     ])
-    //     .unwrap();
-    //
-    // let writer = PrettyPrinter::new().unwrap();
-    // let sink = Box::new(writer) as Box;
-    // //let _ = windowed_df.sink(sink).await;
+    Ok(())
 }
diff --git a/examples/src/lib.rs b/examples/src/lib.rs
new file mode 100644
index 0000000..430b5d2
--- /dev/null
+++ b/examples/src/lib.rs
@@ -0,0 +1,17 @@
+use serde::{Deserialize, Serialize};
+
+#[derive(Serialize, Deserialize)]
+pub struct Measurment {
+    pub occurred_at_ms: u64,
+    pub sensor_name: String,
+    pub reading: f64,
+}
+
+pub fn get_sample_json() -> String {
+    serde_json::to_string(&Measurment {
+        occurred_at_ms: 100,
+        sensor_name: "foo".to_string(),
+        reading: 0.,
+    })
+    .unwrap()
+}
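
A minimal usage sketch (not part of the patch) of how the new DataStream helpers added above — select, print_schema, print_plan, and print_stream — chain together. It mirrors the setup in examples/examples/simple_aggregation.rs; the group.id value "select_debug_example" is made up, and the column names come from the Measurment struct in examples/src/lib.rs.

    use datafusion::error::Result;
    use datafusion::logical_expr::col;

    use df_streams_core::context::Context;
    use df_streams_core::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder};
    use df_streams_core::physical_plan::utils::time::TimestampUnit;

    use df_streams_examples::get_sample_json;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = Context::new()?;
        let mut topic_builder = KafkaTopicBuilder::new(String::from("localhost:9092"));

        // Same reader setup as simple_aggregation.rs, reading the "temperature" topic.
        let source_topic = topic_builder
            .with_timestamp(String::from("occurred_at_ms"), TimestampUnit::Int64Millis)
            .with_encoding("json")?
            .with_topic(String::from("temperature"))
            .infer_schema_from_json(get_sample_json().as_str())?
            .build_reader(ConnectionOpts::from([
                ("auto.offset.reset".to_string(), "latest".to_string()),
                ("group.id".to_string(), "select_debug_example".to_string()),
            ]))
            .await?;

        // Project two columns, then dump the schema and logical plan before
        // streaming rows to stdout.
        ctx.from_topic(source_topic)
            .await?
            .select(vec![col("sensor_name"), col("reading")])?
            .print_schema()?
            .print_plan()?
            .print_stream()
            .await?;

        Ok(())
    }

Because print_schema and print_plan return Result<Self>, they can be dropped into the middle of a pipeline for debugging, as their doc comments in crates/core/src/datastream.rs suggest.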