Skip to content

Commit

Permalink
Fixing the streaming join example
Browse files (browse the repository at this point in the history)
  • Loading branch information
ameyc committed Nov 6, 2024
1 parent 6bc2bbd commit 367a306
Showing 1 changed file with 26 additions and 15 deletions.
41 changes: 26 additions & 15 deletions examples/examples/stream_join.rs
Original file line number | Diff line number | Diff line change
[Expand Up] @@ -17,7 +17,10 @@ async fn main() -> Result<()> {

let bootstrap_servers = String::from("localhost:9092");

let ctx = Context::new()?;
let ctx = Context::new()?
.with_slatedb_backend(String::from("/tmp/checkpoints/stream-join-checkpoint-1"))
.await;

let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone());

let source_topic_builder = topic_builder
[Expand All] @@ -29,7 +32,7 @@ async fn main() -> Result<()> {
.clone()
.with_topic(String::from("temperature"))
.build_reader(ConnectionOpts::from([
("auto.offset.reset".to_string(), "earliest".to_string()),
("auto.offset.reset".to_string(), "latest".to_string()),
("group.id".to_string(), "sample_pipeline".to_string()),
]))
.await?;
[Expand All] @@ -40,30 +43,38 @@ async fn main() -> Result<()> {
.clone()
.with_topic(String::from("humidity"))
.build_reader(ConnectionOpts::from([
("auto.offset.reset".to_string(), "earliest".to_string()),
("auto.offset.reset".to_string(), "latest".to_string()),
("group.id".to_string(), "sample_pipeline".to_string()),
]))
.await?,
)
.await?;
.await?
.with_column("humidity_sensor", col("sensor_name"))?
.drop_columns(&["sensor_name"])?
.window(
vec![col("humidity_sensor")],
vec![avg(col("reading")).alias("avg_humidity")],
Duration::from_millis(1_000),
None,
)?
.with_column("humidity_window_start_time", col("window_start_time"))?
.with_column("humidity_window_end_time", col("window_end_time"))?
.drop_columns(&["window_start_time", "window_end_time"])?;

let joined_ds = ctx
.from_topic(temperature_topic)
.await?
.window(
vec![col("sensor_name")],
vec![avg(col("reading")).alias("avg_temperature")],
Duration::from_millis(1_000),
None,
)?
.join(
humidity_ds,
JoinType::Inner,
&["sensor_name"],
&["sensor_name"],
None,
)?
.window(
vec![],
vec![
avg(col("temperature.reading")).alias("avg_temperature"),
avg(col("humidity.reading")).alias("avg_humidity"),
],
Duration::from_millis(1_000),
&["sensor_name", "window_start_time"],
&["humidity_sensor", "humidity_window_start_time"],
None,
)?;

Expand Down

0 comments on commit 367a306

Please sign in to comment.