From 7c74ca5ff0abbd023fd3007d5a0c876d397a5bac Mon Sep 17 00:00:00 2001 From: Matt Green Date: Tue, 13 Aug 2024 22:29:02 -0700 Subject: [PATCH] disable coalesce_batches --- crates/core/src/context.rs | 18 +++++++++++++----- crates/core/src/datastream.rs | 18 +++++++++++++++++- examples/examples/emit_measurements.rs | 2 +- examples/examples/simple_aggregation.rs | 3 +-- 4 files changed, 32 insertions(+), 9 deletions(-) diff --git a/crates/core/src/context.rs b/crates/core/src/context.rs index a261ea7..d48df98 100644 --- a/crates/core/src/context.rs +++ b/crates/core/src/context.rs @@ -16,15 +16,23 @@ use crate::utils::get_default_optimizer_rules; #[derive(Clone)] pub struct Context { - session_conext: Arc>, + pub session_conext: Arc>, } impl Context { pub fn new() -> Result { - let config = SessionConfig::new().set( - "datafusion.execution.batch_size", - datafusion::common::ScalarValue::UInt64(Some(32)), - ); + let config = SessionConfig::new() + .set( + "datafusion.execution.batch_size", + datafusion::common::ScalarValue::UInt64(Some(32)), + ) + // coalesce_batches slows down the pipeline and increases latency as it tries to concat + // small batches together so we disable it. + .set( + "datafusion.execution.coalesce_batches", + datafusion::common::ScalarValue::Boolean(Some(false)), + ); + let runtime = Arc::new(RuntimeEnv::default()); let state = SessionStateBuilder::new() diff --git a/crates/core/src/datastream.rs b/crates/core/src/datastream.rs index 5e2159b..af877ea 100644 --- a/crates/core/src/datastream.rs +++ b/crates/core/src/datastream.rs @@ -9,6 +9,7 @@ use datafusion::execution::SendableRecordBatchStream; use datafusion::logical_expr::{ logical_plan::LogicalPlanBuilder, utils::find_window_exprs, Expr, JoinType, }; +use datafusion::physical_plan::display::DisplayableExecutionPlan; use crate::context::Context; use crate::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder}; @@ -163,13 +164,28 @@ impl DataStream { Ok(self) } - /// Prints the underlying logical_plan. + /// Prints the underlying logical plan. /// Useful for debugging chained method calls. pub fn print_plan(self) -> Result { println!("{}", self.df.logical_plan().display_indent()); Ok(self) } + /// Prints the underlying physical plan. + /// Useful for debugging and development + pub async fn print_physical_plan(self) -> Result { + let (session_state, plan) = self.df.as_ref().clone().into_parts(); + let physical_plan = self.df.as_ref().clone().create_physical_plan().await?; + let displayable_plan = DisplayableExecutionPlan::new(physical_plan.as_ref()); + + println!("{}", displayable_plan.indent(true)); + + Ok(Self { + df: Arc::new(DataFrame::new(session_state, plan)), + context: self.context.clone(), + }) + } + /// execute the stream and write the results to a give kafka topic pub async fn sink_kafka( self, diff --git a/examples/examples/emit_measurements.rs b/examples/examples/emit_measurements.rs index 65a7198..736b81e 100644 --- a/examples/examples/emit_measurements.rs +++ b/examples/examples/emit_measurements.rs @@ -60,7 +60,7 @@ async fn main() -> Result<()> { .await .unwrap(); - tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; + tokio::time::sleep(tokio::time::Duration::from_micros(1)).await; } } diff --git a/examples/examples/simple_aggregation.rs b/examples/examples/simple_aggregation.rs index 4097ea5..4682819 100644 --- a/examples/examples/simple_aggregation.rs +++ b/examples/examples/simple_aggregation.rs @@ -41,7 +41,6 @@ async fn main() -> Result<()> { let ds = ctx .from_topic(source_topic) .await? - // .filter(col("reading").gt(lit(70)))? .window( vec![], vec![ @@ -53,7 +52,7 @@ async fn main() -> Result<()> { Duration::from_millis(1_000), None, )? - .filter(col("max").lt(lit(113)))?; + .filter(col("max").gt(lit(113)))?; ds.clone().print_stream().await?;