disable coalesce_batches
emgeee committed Aug 14, 2024
commit 7c74ca5 (1 parent: c6f3d9a)
Showing 4 changed files with 32 additions and 9 deletions.
crates/core/src/context.rs (13 additions, 5 deletions)
@@ -16,15 +16,23 @@ use crate::utils::get_default_optimizer_rules;
 
 #[derive(Clone)]
 pub struct Context {
-    session_conext: Arc<RwLock<SessionContext>>,
+    pub session_conext: Arc<RwLock<SessionContext>>,
 }
 
 impl Context {
     pub fn new() -> Result<Self, DataFusionError> {
-        let config = SessionConfig::new().set(
-            "datafusion.execution.batch_size",
-            datafusion::common::ScalarValue::UInt64(Some(32)),
-        );
+        let config = SessionConfig::new()
+            .set(
+                "datafusion.execution.batch_size",
+                datafusion::common::ScalarValue::UInt64(Some(32)),
+            )
+            // coalesce_batches slows down the pipeline and increases latency as it tries to concat
+            // small batches together so we disable it.
+            .set(
+                "datafusion.execution.coalesce_batches",
+                datafusion::common::ScalarValue::Boolean(Some(false)),
+            );
 
         let runtime = Arc::new(RuntimeEnv::default());
 
         let state = SessionStateBuilder::new()
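
The same two settings can also be written with SessionConfig's builder helpers. A minimal sketch (not part of this commit), assuming the standard DataFusion SessionConfig API; helper names may differ slightly across DataFusion versions:

// Sketch: batch_size and coalesce_batches set via builder methods,
// then read back from the resulting ConfigOptions to confirm the values.
use datafusion::prelude::SessionConfig;

fn main() {
    let config = SessionConfig::new()
        .with_batch_size(32)
        .with_coalesce_batches(false);

    assert_eq!(config.options().execution.batch_size, 32);
    assert!(!config.options().execution.coalesce_batches);
}
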
crates/core/src/datastream.rs (17 additions, 1 deletion)
@@ -9,6 +9,7 @@ use datafusion::execution::SendableRecordBatchStream;
 use datafusion::logical_expr::{
     logical_plan::LogicalPlanBuilder, utils::find_window_exprs, Expr, JoinType,
 };
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
 
 use crate::context::Context;
 use crate::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder};

@@ -163,13 +164,28 @@ impl DataStream {
         Ok(self)
     }
 
-    /// Prints the underlying logical_plan.
+    /// Prints the underlying logical plan.
     /// Useful for debugging chained method calls.
     pub fn print_plan(self) -> Result<Self, DataFusionError> {
         println!("{}", self.df.logical_plan().display_indent());
         Ok(self)
     }
 
+    /// Prints the underlying physical plan.
+    /// Useful for debugging and development
+    pub async fn print_physical_plan(self) -> Result<Self, DataFusionError> {
+        let (session_state, plan) = self.df.as_ref().clone().into_parts();
+        let physical_plan = self.df.as_ref().clone().create_physical_plan().await?;
+        let displayable_plan = DisplayableExecutionPlan::new(physical_plan.as_ref());
+
+        println!("{}", displayable_plan.indent(true));
+
+        Ok(Self {
+            df: Arc::new(DataFrame::new(session_state, plan)),
+            context: self.context.clone(),
+        })
+    }
+
     /// execute the stream and write the results to a give kafka topic
     pub async fn sink_kafka(
         self,
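
A hypothetical usage sketch of the new method, chained the same way print_plan() is used in this repo's examples; ctx, the topic handle, and the "reading" column are illustrative assumptions, not code from this commit:

// Sketch: print the logical plan, then the physical plan, mid-chain.
// print_plan() is synchronous; print_physical_plan() is async and must be awaited.
let ds = ctx
    .from_topic(source_topic)
    .await?
    .filter(col("reading").gt(lit(70)))?
    .print_plan()?
    .print_physical_plan()
    .await?;
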
examples/examples/emit_measurements.rs (1 addition, 1 deletion)
@@ -60,7 +60,7 @@ async fn main() -> Result<()> {
             .await
             .unwrap();
 
-        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
+        tokio::time::sleep(tokio::time::Duration::from_micros(1)).await;
     }
 }

examples/examples/simple_aggregation.rs (1 addition, 2 deletions)
@@ -41,7 +41,6 @@ async fn main() -> Result<()> {
     let ds = ctx
         .from_topic(source_topic)
         .await?
-        // .filter(col("reading").gt(lit(70)))?
         .window(
             vec![],
             vec![

@@ -53,7 +52,7 @@
             Duration::from_millis(1_000),
             None,
         )?
-        .filter(col("max").lt(lit(113)))?;
+        .filter(col("max").gt(lit(113)))?;
 
     ds.clone().print_stream().await?;
 
