Skip to content

Commit

Permalink
Update datafusion
Browse files Browse the repository at this point in the history
  • Loading branch information
emgeee committed Aug 19, 2024
1 parent 4ac04b4 commit a8895fb
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 39 deletions.
99 changes: 81 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,4 @@ chrono = { version = "0.4.38", default-features = false }
itertools = "0.13"

[patch.crates-io]
datafusion = { git = "https://github.com/probably-nothing-labs/arrow-datafusion", rev="fc67038" }
datafusion = { git = "https://github.com/probably-nothing-labs/arrow-datafusion", rev = "80874c4" }
10 changes: 5 additions & 5 deletions crates/core/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,18 @@ impl StreamingLogicalPlanBuilder for LogicalPlanBuilder {
window_length: Duration,
slide: Option<Duration>,
) -> Result<Self> {
let group_expr = normalize_cols(group_expr, &self.plan)?;
let aggr_expr = normalize_cols(aggr_expr, &self.plan)?;
let group_expr = normalize_cols(group_expr, self.plan())?;
let aggr_expr = normalize_cols(aggr_expr, self.plan())?;

let group_expr = add_group_by_exprs_from_dependencies(group_expr, self.plan.schema())?;
let group_expr = add_group_by_exprs_from_dependencies(group_expr, self.schema())?;
let window: StreamingWindowType = slide.map_or_else(
|| StreamingWindowType::Tumbling(window_length),
|_slide| StreamingWindowType::Sliding(window_length, _slide),
);

let plan = self.plan.clone();
let plan = self.plan().clone();

Aggregate::try_new(Arc::new(self.plan), group_expr, aggr_expr)
Aggregate::try_new(Arc::new(plan.clone()), group_expr, aggr_expr)
.map(|new_aggr| {
LogicalPlan::Extension(Extension {
node: Arc::new(StreamingWindowPlanNode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl GroupedWindowAggStream {
};

let group_by = exec_operator.group_by.clone();
let group_schema = group_schema(&agg_schema, group_by.expr.len());
let group_schema = group_schema(&agg_schema, group_by.expr().len());
Ok(Self {
schema: agg_schema,
input,
Expand Down Expand Up @@ -461,7 +461,7 @@ pub(crate) fn evaluate_group_by(
batch: &RecordBatch,
) -> Result<Vec<Vec<ArrayRef>>> {
let exprs: Vec<ArrayRef> = group_by
.expr
.expr()
.iter()
.map(|(expr, _)| {
let value = expr.evaluate(batch)?;
Expand All @@ -470,7 +470,7 @@ pub(crate) fn evaluate_group_by(
.collect::<Result<Vec<_>>>()?;

let null_exprs: Vec<ArrayRef> = group_by
.null_expr
.null_expr()
.iter()
.map(|(expr, _)| {
let value = expr.evaluate(batch)?;
Expand All @@ -479,7 +479,7 @@ pub(crate) fn evaluate_group_by(
.collect::<Result<Vec<_>>>()?;

Ok(group_by
.groups
.groups()
.iter()
.map(|group| {
group
Expand Down
Loading

0 comments on commit a8895fb

Please sign in to comment.