From e1234dd88413278f830290b50fd546356d08bd19 Mon Sep 17 00:00:00 2001 From: Matt Green Date: Mon, 12 Aug 2024 10:40:04 -0700 Subject: [PATCH] change streaming_window -> window --- crates/core/src/datastream.rs | 5 ++++- examples/examples/kafka_rideshare.rs | 2 +- examples/examples/simple_aggregation.rs | 2 +- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/crates/core/src/datastream.rs b/crates/core/src/datastream.rs index 1ff117b..4ce9451 100644 --- a/crates/core/src/datastream.rs +++ b/crates/core/src/datastream.rs @@ -19,6 +19,9 @@ pub struct DataStream { } impl DataStream { + pub fn schema(&self) -> Result { + } + pub fn filter(&self, predicate: Expr) -> Result { let (session_state, plan) = self.df.as_ref().clone().into_parts(); @@ -33,7 +36,7 @@ impl DataStream { // drop_columns, sync, columns: &[&str] // count - pub fn streaming_window( + pub fn window( &self, group_expr: Vec, aggr_expr: Vec, diff --git a/examples/examples/kafka_rideshare.rs b/examples/examples/kafka_rideshare.rs index eb858f0..e5e1bba 100644 --- a/examples/examples/kafka_rideshare.rs +++ b/examples/examples/kafka_rideshare.rs @@ -69,7 +69,7 @@ async fn main() -> Result<()> { ])) .await?; - let ds = ctx.from_topic(source_topic).await?.streaming_window( + let ds = ctx.from_topic(source_topic).await?.window( vec![], vec![ max(col("imu_measurement").field("gps").field("speed")), diff --git a/examples/examples/simple_aggregation.rs b/examples/examples/simple_aggregation.rs index 7d8736a..0ba7f33 100644 --- a/examples/examples/simple_aggregation.rs +++ b/examples/examples/simple_aggregation.rs @@ -33,7 +33,7 @@ async fn main() -> Result<()> { let ds = ctx .from_topic(source_topic) .await? - .streaming_window( + .window( vec![], vec![ min(col("temperature")).alias("min"),