Move to AggregateFunctionExpr #36

Merged 6 commits on Sep 9, 2024
255 changes: 110 additions & 145 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions Cargo.toml
@@ -22,15 +22,15 @@ denormalized-orchestrator = { path = "crates/orchestrator" }

datafusion = { git = "https://github.com/probably-nothing-labs/arrow-datafusion", branch = "main" }

-arrow = { version = "52.0.0", features = ["prettyprint"] }
-arrow-array = { version = "52.0.0", default-features = false, features = [
+arrow = { version = "53.0.0", features = ["prettyprint"] }
+arrow-array = { version = "53.0.0", default-features = false, features = [
    "chrono-tz",
] }
-arrow-schema = { version = "52.0.0", default-features = false }
-arrow-ipc = { version = "52.0.0", default-features = false, features = ["lz4"] }
-arrow-json = { version = "52.0.0" }
-arrow-string = { version = "52.0.0", default-features = false }
-arrow-ord = { version = "52.0.0", default-features = false }
+arrow-schema = { version = "53.0.0", default-features = false }
+arrow-ipc = { version = "53.0.0", default-features = false, features = ["lz4"] }
+arrow-json = { version = "53.0.0" }
+arrow-string = { version = "53.0.0", default-features = false }
+arrow-ord = { version = "53.0.0", default-features = false }

apache-avro = { version = "0.16", default-features = false, features = [
"bzip",
@@ -56,4 +56,4 @@ serde_json = "1"
base64 = "0.22.1"
chrono = { version = "0.4.38", default-features = false }
itertools = "0.13"
-pyo3 = { version = "0.21.2", features = ["experimental-async"] }
+pyo3 = { version = "0.22.2", features = ["experimental-async"] }
Contributor: did this dep need to be updated? I think datafusion-python currently requires pyo3 0.21

Contributor (author): yep. the latest DataFusion depends on arrow 53.0.0, which in turn requires the pyo3 bump.

4 changes: 2 additions & 2 deletions crates/core/src/context.rs
@@ -24,13 +24,13 @@ impl Context {
    let config = SessionConfig::new()
        .set(
            "datafusion.execution.batch_size",
-            datafusion::common::ScalarValue::UInt64(Some(32)),
+            &datafusion::common::ScalarValue::UInt64(Some(32)),
        )
        // coalesce_batches slows down the pipeline and increases latency as it tries to concat
        // small batches together so we disable it.
        .set(
            "datafusion.execution.coalesce_batches",
-            datafusion::common::ScalarValue::Boolean(Some(false)),
+            &datafusion::common::ScalarValue::Boolean(Some(false)),
        );

let runtime = Arc::new(RuntimeEnv::default());
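For orientation, a minimal self-contained sketch of the updated call shape, assuming only what the diff above shows — that SessionConfig::set now borrows its ScalarValue argument:

use datafusion::common::ScalarValue;
use datafusion::prelude::SessionConfig;

fn main() {
    // Assumption (from the diff above): set() takes &ScalarValue by reference.
    let config = SessionConfig::new()
        .set(
            "datafusion.execution.batch_size",
            &ScalarValue::UInt64(Some(32)),
        )
        .set(
            "datafusion.execution.coalesce_batches",
            &ScalarValue::Boolean(Some(false)),
        );
    // The typed options should reflect both overrides.
    assert_eq!(config.options().execution.batch_size, 32);
    assert!(!config.options().execution.coalesce_batches);
}

Small batches with coalescing disabled trade throughput for latency, which is the point of the inline comment in the hunk above.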
4 changes: 2 additions & 2 deletions crates/core/src/datasource/kafka/kafka_config.rs
@@ -4,7 +4,7 @@ use std::{sync::Arc, time::Duration};

use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit};

-use datafusion::logical_expr::Expr;
+use datafusion::logical_expr::SortExpr;

use crate::physical_plan::utils::time::TimestampUnit;
use crate::utils::arrow_helpers::infer_arrow_schema_from_json_value;
@@ -29,7 +29,7 @@ pub struct KafkaReadConfig {
pub schema: SchemaRef,

pub encoding: StreamEncoding,
-pub order: Vec<Vec<Expr>>,
+pub order: Vec<Vec<SortExpr>>,
pub partition_count: i32,
pub timestamp_column: String,
pub timestamp_unit: TimestampUnit,
45 changes: 21 additions & 24 deletions crates/core/src/datasource/kafka/topic_reader.rs
@@ -5,7 +5,7 @@ use arrow_schema::{Schema, SchemaRef, SortOptions};
use datafusion::catalog::Session;
use datafusion::common::{not_impl_err, plan_err, Result};
use datafusion::datasource::TableProvider;
-use datafusion::logical_expr::{Expr, TableType};
+use datafusion::logical_expr::{Expr, SortExpr, TableType};
use datafusion::physical_expr::{expressions, LexOrdering, PhysicalSortExpr};
use datafusion::physical_plan::ExecutionPlan;

@@ -89,36 +89,33 @@ impl TableProvider for TopicReader {
}
}

-fn create_ordering(schema: &Schema, sort_order: &[Vec<Expr>]) -> Result<Vec<LexOrdering>> {
+fn create_ordering(schema: &Schema, sort_order: &[Vec<SortExpr>]) -> Result<Vec<LexOrdering>> {
    let mut all_sort_orders = vec![];

    for exprs in sort_order {
        // Construct PhysicalSortExpr objects from Expr objects:
        let mut sort_exprs = vec![];
-        for expr in exprs {
-            match expr {
-                Expr::Sort(sort) => match sort.expr.as_ref() {
-                    Expr::Column(col) => match expressions::col(&col.name, schema) {
-                        Ok(expr) => {
-                            sort_exprs.push(PhysicalSortExpr {
-                                expr,
-                                options: SortOptions {
-                                    descending: !sort.asc,
-                                    nulls_first: sort.nulls_first,
-                                },
-                            });
-                        }
-                        // Cannot find expression in the projected_schema, stop iterating
-                        // since rest of the orderings are violated
-                        Err(_) => break,
-                    },
-                    expr => {
-                        return plan_err!(
-                            "Expected single column references in output_ordering, got {expr}"
-                        )
-                    }
-                },
-                expr => return plan_err!("Expected Expr::Sort in output_ordering, but got {expr}"),
-            }
-        }
+        for sort in exprs {
+            match &sort.expr {
+                Expr::Column(col) => match expressions::col(&col.name, schema) {
+                    Ok(expr) => {
+                        sort_exprs.push(PhysicalSortExpr {
+                            expr,
+                            options: SortOptions {
+                                descending: !sort.asc,
+                                nulls_first: sort.nulls_first,
+                            },
+                        });
+                    }
+                    // Cannot find expression in the projected_schema, stop iterating
+                    // since rest of the orderings are violated
+                    Err(_) => break,
+                },
+                expr => {
+                    return plan_err!(
+                        "Expected single column references in output_ordering, got {expr}"
+                    )
+                }
+            }
+        }
if !sort_exprs.is_empty() {
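For reference, a hedged sketch (not code from this PR) of how a sort order is now expressed on the logical side: with Expr::Sort removed from the Expr enum upstream, Expr::sort yields a standalone SortExpr, which is what the Vec<Vec<SortExpr>> field on KafkaReadConfig holds. The column name is a placeholder:

use datafusion::logical_expr::{col, SortExpr};

fn example_order() -> Vec<Vec<SortExpr>> {
    // Ascending on a hypothetical timestamp column, nulls last;
    // create_ordering above lowers each SortExpr to a PhysicalSortExpr
    // against the topic schema.
    vec![vec![col("occurred_at_ms").sort(true, false)]]
}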
crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs

@@ -15,6 +15,7 @@ use arrow::{
use arrow_array::{ArrayRef, PrimitiveArray, RecordBatch, StructArray, TimestampMillisecondArray};
use arrow_ord::cmp;
use arrow_schema::{Schema, SchemaRef};
+use datafusion::physical_expr::aggregate::AggregateFunctionExpr;
use datafusion::{
common::{utils::proxy::VecAllocExt, DataFusionError, Result},
execution::memory_pool::{MemoryConsumer, MemoryReservation},
@@ -31,9 +32,9 @@
AggregateMode,
},
metrics::BaselineMetrics,
-AggregateExpr,
},
};

use futures::{Stream, StreamExt};

use crate::physical_plan::utils::time::RecordBatchWatermark;
@@ -51,7 +52,7 @@ pub struct GroupedWindowAggStream {
pub schema: SchemaRef,
input: SendableRecordBatchStream,
baseline_metrics: BaselineMetrics,
-exec_aggregate_expressions: Vec<Arc<dyn AggregateExpr>>,
+exec_aggregate_expressions: Vec<Arc<AggregateFunctionExpr>>,
aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
latest_watermark: Arc<Mutex<Option<SystemTime>>>,
4 changes: 2 additions & 2 deletions crates/core/src/physical_plan/continuous/mod.rs
@@ -17,13 +17,13 @@ use datafusion::{
pub mod grouped_window_agg_stream;
pub mod streaming_window;

-use datafusion::physical_expr::AggregateExpr;
+use datafusion::physical_expr::aggregate::AggregateFunctionExpr;
use log::debug;

pub(crate) type GroupsAccumulatorItem = Box<dyn GroupsAccumulator>;

pub(crate) fn create_group_accumulator(
-    agg_expr: &Arc<dyn AggregateExpr>,
+    agg_expr: &Arc<AggregateFunctionExpr>,
) -> Result<Box<dyn GroupsAccumulator>> {
if agg_expr.groups_accumulator_supported() {
agg_expr.create_groups_accumulator()
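To show the concrete type in use, a hedged sketch of building an AggregateFunctionExpr and handing it to create_group_accumulator above. AggregateExprBuilder, count_udaf, and the "value" column are assumptions based on the upstream DataFusion release that pairs with arrow 53, not code from this repo:

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion::common::Result;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion::physical_expr::expressions::col;

fn build_count(schema: &Schema) -> Result<Arc<AggregateFunctionExpr>> {
    // COUNT over a hypothetical "value" column.
    let args = vec![col("value", schema)?];
    let agg = AggregateExprBuilder::new(count_udaf(), args)
        .schema(Arc::new(schema.clone()))
        .alias("count_value")
        .build()?;
    Ok(Arc::new(agg))
}

fn demo() -> Result<()> {
    let schema = Schema::new(vec![Field::new("value", DataType::Int64, true)]);
    let agg = build_count(&schema)?;
    // The concrete struct exposes the same accumulator hooks the old trait
    // object did, so the call site above changes only in its type.
    let _acc = create_group_accumulator(&agg)?;
    Ok(())
}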
24 changes: 13 additions & 11 deletions crates/core/src/physical_plan/continuous/streaming_window.rs
@@ -17,10 +17,11 @@ use arrow_ord::cmp;
use arrow_schema::{Field, Schema, SchemaRef};

use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
+use datafusion::physical_expr::aggregate::AggregateFunctionExpr;
use datafusion::physical_expr::{
equivalence::{collapse_lex_req, ProjectionMapping},
expressions::UnKnownColumn,
-    AggregateExpr, Partitioning, PhysicalExpr, PhysicalSortRequirement,
+    Partitioning, PhysicalExpr, PhysicalSortRequirement,
};
use datafusion::physical_plan::{
aggregates::{
@@ -32,6 +33,7 @@ use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
InputOrderMode, PlanProperties,
};

use datafusion::{
common::{internal_err, stats::Precision, DataFusionError, Statistics},
physical_plan::Distribution,
@@ -193,7 +195,7 @@ pub enum PhysicalStreamingWindowType {
#[derive(Debug)]
pub struct StreamingWindowExec {
pub(crate) input: Arc<dyn ExecutionPlan>,
-pub aggregate_expressions: Vec<Arc<dyn AggregateExpr>>,
+pub aggregate_expressions: Vec<Arc<AggregateFunctionExpr>>,
pub filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
/// Schema after the window is run
pub group_by: PhysicalGroupBy,
@@ -214,7 +216,7 @@ impl StreamingWindowExec {
pub fn try_new(
mode: AggregateMode,
group_by: PhysicalGroupBy,
-    aggr_expr: Vec<Arc<dyn AggregateExpr>>,
+    aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
input: Arc<dyn ExecutionPlan>,
input_schema: SchemaRef,
@@ -225,7 +227,7 @@
&input.schema(),
group_by.expr(),
&aggr_expr,
-    group_by.contains_null(),
+    false, //group_by.contains_null(),
Contributor: what's up with this comment?

mode,
)?;

@@ -246,7 +248,7 @@
pub fn try_new_with_schema(
mode: AggregateMode,
group_by: PhysicalGroupBy,
-    mut aggr_expr: Vec<Arc<dyn AggregateExpr>>,
+    mut aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
input: Arc<dyn ExecutionPlan>,
input_schema: SchemaRef,
@@ -363,7 +365,7 @@ impl StreamingWindowExec {
PlanProperties::new(eq_properties, output_partitioning, ExecutionMode::Unbounded)
}
/// Aggregate expressions
-pub fn aggr_expr(&self) -> &[Arc<dyn AggregateExpr>] {
+pub fn aggr_expr(&self) -> &[Arc<AggregateFunctionExpr>] {
&self.aggregate_expressions
}

@@ -608,7 +610,7 @@ pub struct WindowAggStream {
pub schema: SchemaRef,
input: SendableRecordBatchStream,
baseline_metrics: BaselineMetrics,
-exec_aggregate_expressions: Vec<Arc<dyn AggregateExpr>>,
+exec_aggregate_expressions: Vec<Arc<AggregateFunctionExpr>>,
aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
latest_watermark: Arc<Mutex<Option<SystemTime>>>,
@@ -809,7 +811,7 @@ impl FullWindowAggFrame {
pub fn new(
start_time: SystemTime,
end_time: SystemTime,
-    exec_aggregate_expressions: &[Arc<dyn AggregateExpr>],
+    exec_aggregate_expressions: &[Arc<AggregateFunctionExpr>],
aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,

@@ -850,7 +852,7 @@ struct FullWindowAggStream {
pub schema: SchemaRef,
input: SendableRecordBatchStream,
baseline_metrics: BaselineMetrics,
-exec_aggregate_expressions: Vec<Arc<dyn AggregateExpr>>,
+exec_aggregate_expressions: Vec<Arc<AggregateFunctionExpr>>,
aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
cached_frames: BTreeMap<SystemTime, FullWindowAggFrame>,
@@ -1056,7 +1058,7 @@ fn snap_to_window_start(timestamp: SystemTime, window_length: Duration) -> SystemTime
fn create_schema(
input_schema: &Schema,
group_expr: &[(Arc<dyn PhysicalExpr>, String)],
-    aggr_expr: &[Arc<dyn AggregateExpr>],
+    aggr_expr: &[Arc<AggregateFunctionExpr>],
contains_null_expr: bool,
mode: AggregateMode,
) -> Result<Schema> {
@@ -1085,7 +1087,7 @@
| AggregateMode::SinglePartitioned => {
// in final mode, the field with the final result of the accumulator
for expr in aggr_expr {
-                    fields.push(expr.field()?)
+                    fields.push(expr.field())
}
}
}
4 changes: 2 additions & 2 deletions crates/core/src/physical_plan/utils/accumulators.rs
@@ -2,12 +2,12 @@ use std::sync::Arc;

use datafusion::common::Result;
use datafusion::logical_expr::Accumulator;
-use datafusion::physical_expr::AggregateExpr;
+use datafusion::physical_expr::aggregate::AggregateFunctionExpr;

pub(crate) type AccumulatorItem = Box<dyn Accumulator>;

pub(crate) fn create_accumulators(
-    aggr_expr: &[Arc<dyn AggregateExpr>],
+    aggr_expr: &[Arc<AggregateFunctionExpr>],
) -> Result<Vec<AccumulatorItem>> {
aggr_expr
.iter()
2 changes: 1 addition & 1 deletion py-denormalized/Cargo.toml
@@ -10,7 +10,7 @@ name = "denormalized_python"
crate-type = ["cdylib"]

[dependencies]
pyo3 = { version = "0.21.2", features = ["experimental-async"] }
pyo3 = { workspace = true }
Contributor: did we need pyo3 out of the py-denormalized crate?

Contributor (author) @ameyc on Sep 9, 2024: yes we did. cargo update doesn't work otherwise because it goes down a circular resolver path.

denormalized = { workspace = true }
datafusion = { workspace = true, features = [
"pyarrow",
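To make the resolver fix concrete, here is the workspace pattern the two Cargo.toml hunks in this PR land on (comments are editorial): pin pyo3 once at the workspace root and let the member crate inherit it, so resolution can't wedge between a local 0.21 pin and datafusion's 0.22 requirement.

# Root Cargo.toml (first hunk in this PR):
[workspace.dependencies]
pyo3 = { version = "0.22.2", features = ["experimental-async"] }

# py-denormalized/Cargo.toml (this hunk): inherit the workspace pin rather
# than declaring a second, conflicting version.
[dependencies]
pyo3 = { workspace = true }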