From 367a3060a7e7cc7aa3e747cbba4f5a5529f68485 Mon Sep 17 00:00:00 2001 From: Amey Chaugule Date: Tue, 5 Nov 2024 17:42:24 -0800 Subject: [PATCH] Fixing the streaming join example --- examples/examples/stream_join.rs | 41 ++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/examples/examples/stream_join.rs b/examples/examples/stream_join.rs index ac7dae4..5c6c672 100644 --- a/examples/examples/stream_join.rs +++ b/examples/examples/stream_join.rs @@ -17,7 +17,10 @@ async fn main() -> Result<()> { let bootstrap_servers = String::from("localhost:9092"); - let ctx = Context::new()?; + let ctx = Context::new()? + .with_slatedb_backend(String::from("/tmp/checkpoints/stream-join-checkpoint-1")) + .await; + let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone()); let source_topic_builder = topic_builder @@ -29,7 +32,7 @@ async fn main() -> Result<()> { .clone() .with_topic(String::from("temperature")) .build_reader(ConnectionOpts::from([ - ("auto.offset.reset".to_string(), "earliest".to_string()), + ("auto.offset.reset".to_string(), "latest".to_string()), ("group.id".to_string(), "sample_pipeline".to_string()), ])) .await?; @@ -40,30 +43,38 @@ async fn main() -> Result<()> { .clone() .with_topic(String::from("humidity")) .build_reader(ConnectionOpts::from([ - ("auto.offset.reset".to_string(), "earliest".to_string()), + ("auto.offset.reset".to_string(), "latest".to_string()), ("group.id".to_string(), "sample_pipeline".to_string()), ])) .await?, ) - .await?; + .await? + .with_column("humidity_sensor", col("sensor_name"))? + .drop_columns(&["sensor_name"])? + .window( + vec![col("humidity_sensor")], + vec![avg(col("reading")).alias("avg_humidity")], + Duration::from_millis(1_000), + None, + )? + .with_column("humidity_window_start_time", col("window_start_time"))? + .with_column("humidity_window_end_time", col("window_end_time"))? + .drop_columns(&["window_start_time", "window_end_time"])?; let joined_ds = ctx .from_topic(temperature_topic) .await? + .window( + vec![col("sensor_name")], + vec![avg(col("reading")).alias("avg_temperature")], + Duration::from_millis(1_000), + None, + )? .join( humidity_ds, JoinType::Inner, - &["sensor_name"], - &["sensor_name"], - None, - )? - .window( - vec![], - vec![ - avg(col("temperature.reading")).alias("avg_temperature"), - avg(col("humidity.reading")).alias("avg_humidity"), - ], - Duration::from_millis(1_000), + &["sensor_name", "window_start_time"], + &["humidity_sensor", "humidity_window_start_time"], None, )?;