Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds `WindowUDFImpl::reverse_expr` trait method + Support for IGNORE NULLS #12662

Merged
merged 5 commits into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ pub use sqlparser;
pub use table_source::{TableProviderFilterPushDown, TableSource, TableType};
pub use udaf::{AggregateUDF, AggregateUDFImpl, ReversedUDAF};
pub use udf::{ScalarUDF, ScalarUDFImpl};
pub use udwf::{WindowUDF, WindowUDFImpl};
pub use udwf::{ReversedUDWF, WindowUDF, WindowUDFImpl};
pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};

#[cfg(test)]
Expand Down
26 changes: 26 additions & 0 deletions datafusion/expr/src/udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,14 @@ impl WindowUDF {
pub fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
self.inner.coerce_types(arg_types)
}

/// Returns how this user-defined window function behaves when the
/// order of evaluation is reversed.
///
/// The result is a [`ReversedUDWF`]: `Identical`, `NotSupported`, or
/// `Reversed` carrying the window function to use instead. This method
/// only forwards the call to the wrapped implementation.
///
/// See [`WindowUDFImpl::reverse_expr`] for more details.
pub fn reverse_expr(&self) -> ReversedUDWF {
self.inner.reverse_expr()
}
}

impl<F> From<F> for WindowUDF
Expand Down Expand Up @@ -351,6 +359,24 @@ pub trait WindowUDFImpl: Debug + Send + Sync {
fn coerce_types(&self, _arg_types: &[DataType]) -> Result<Vec<DataType>> {
not_impl_err!("Function {} does not implement coerce_types", self.name())
}

/// Allows customizing the behavior of the user-defined window
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 -- this is a very nice API

/// function when it is evaluated in reverse order.
fn reverse_expr(&self) -> ReversedUDWF {
ReversedUDWF::NotSupported
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given the newly added trait method returns a default value, I don't think this is a breaking API change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it 👍

}
}

/// Describes how a user-defined window function can be evaluated when
/// the order of evaluation is reversed.
///
/// This is the value returned by `WindowUDFImpl::reverse_expr` and
/// `WindowUDF::reverse_expr`.
pub enum ReversedUDWF {
/// The result of evaluating the user-defined window function
/// remains identical when reversed.
Identical,
/// A window function which does not support evaluating the result
/// in reverse order.
NotSupported,
/// Customize the user-defined window function for evaluating the
/// result in reverse order.
///
/// Carries the window function to use in place of the original one.
Reversed(Arc<WindowUDF>),
}

impl PartialEq for dyn WindowUDFImpl {
Expand Down
28 changes: 24 additions & 4 deletions datafusion/physical-plan/src/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ use datafusion_common::{
exec_datafusion_err, exec_err, DataFusionError, Result, ScalarValue,
};
use datafusion_expr::{
BuiltInWindowFunction, PartitionEvaluator, WindowFrame, WindowFunctionDefinition,
WindowUDF,
BuiltInWindowFunction, PartitionEvaluator, ReversedUDWF, WindowFrame,
WindowFunctionDefinition, WindowUDF,
};
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::equivalence::collapse_lex_req;
Expand Down Expand Up @@ -130,7 +130,7 @@ pub fn create_window_expr(
}
// TODO: Ordering not supported for Window UDFs yet
WindowFunctionDefinition::WindowUDF(fun) => Arc::new(BuiltInWindowExpr::new(
create_udwf_window_expr(fun, args, input_schema, name)?,
create_udwf_window_expr(fun, args, input_schema, name, ignore_nulls)?,
partition_by,
order_by,
window_frame,
Expand Down Expand Up @@ -329,6 +329,7 @@ fn create_udwf_window_expr(
args: &[Arc<dyn PhysicalExpr>],
input_schema: &Schema,
name: String,
ignore_nulls: bool,
) -> Result<Arc<dyn BuiltInWindowFunctionExpr>> {
// need to get the types into an owned vec for some reason
let input_types: Vec<_> = args
Expand All @@ -341,6 +342,8 @@ fn create_udwf_window_expr(
args: args.to_vec(),
input_types,
name,
is_reversed: false,
ignore_nulls,
}))
}

Expand All @@ -353,6 +356,12 @@ struct WindowUDFExpr {
name: String,
/// Types of input expressions
input_types: Vec<DataType>,
/// This is set to `true` only if the user-defined window function
/// expression supports evaluation in reverse order, and the
/// evaluation order is reversed.
is_reversed: bool,
/// Set to `true` if `IGNORE NULLS` is defined, `false` otherwise.
ignore_nulls: bool,
}

impl BuiltInWindowFunctionExpr for WindowUDFExpr {
Expand All @@ -378,7 +387,18 @@ impl BuiltInWindowFunctionExpr for WindowUDFExpr {
}

// Builds the expression to use when the evaluation order is reversed,
// based on what the wrapped window UDF reports from `reverse_expr()`.
// Yields `None` when the function reports that reversal is not supported.
fn reverse_expr(&self) -> Option<Arc<dyn BuiltInWindowFunctionExpr>> {
// NOTE(review): this bare `None` looks like the pre-change body that the
// diff view kept next to the new `match` — confirm before relying on it.
None
match self.fun.reverse_expr() {
// Same result in either direction: hand back a copy of this expression.
ReversedUDWF::Identical => Some(Arc::new(self.clone())),
// The function cannot be evaluated in reverse order.
ReversedUDWF::NotSupported => None,
// Swap in the reversed function. Arguments, name, input types and
// `ignore_nulls` carry over unchanged; the reversed flag is toggled.
ReversedUDWF::Reversed(fun) => Some(Arc::new(WindowUDFExpr {
fun,
args: self.args.clone(),
name: self.name.clone(),
input_types: self.input_types.clone(),
is_reversed: !self.is_reversed,
ignore_nulls: self.ignore_nulls,
})),
}
}

fn get_result_ordering(&self, schema: &SchemaRef) -> Option<PhysicalSortExpr> {
Expand Down