Skip to content

Commit

Permalink
Update to use WindowFunction::new to set additional parameters for or…
Browse files Browse the repository at this point in the history
…der_by using ExprFunctionExt
  • Loading branch information
timsaucer committed Jul 24, 2024
1 parent 404f394 commit 1818efd
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 29 deletions.
11 changes: 5 additions & 6 deletions datafusion-examples/examples/advanced_udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,11 @@ async fn main() -> Result<()> {
df.show().await?;

// Now, run the function using the DataFrame API:
let window_expr = smooth_it.call(
vec![col("speed")], // smooth_it(speed)
vec![col("car")], // PARTITION BY car
vec![col("time").sort(true, true)], // ORDER BY time ASC
WindowFrame::new(None),
);
let window_expr = smooth_it.call(vec![col("speed")]) // smooth_it(speed)
.partition_by(vec![col("car")]) // PARTITION BY car
.order_by(vec![col("time").sort(true, true)]) // ORDER BY time ASC
.window_frame(WindowFrame::new(None))
.build()?;
let df = ctx.table("cars").await?.window(vec![window_expr])?;

// print the results
Expand Down
11 changes: 5 additions & 6 deletions datafusion-examples/examples/simple_udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,11 @@ async fn main() -> Result<()> {
df.show().await?;

// Now, run the function using the DataFrame API:
let window_expr = smooth_it.call(
vec![col("speed")], // smooth_it(speed)
vec![col("car")], // PARTITION BY car
vec![col("time").sort(true, true)], // ORDER BY time ASC
WindowFrame::new(None),
);
let window_expr = smooth_it.call(vec![col("speed")]) // smooth_it(speed)
.partition_by(vec![col("car")]) // PARTITION BY car
.order_by(vec![col("time").sort(true, true)]) // ORDER BY time ASC
.window_frame(WindowFrame::new(None))
.build()?;
let df = ctx.table("cars").await?.window(vec![window_expr])?;

// print the results
Expand Down
24 changes: 23 additions & 1 deletion datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,27 @@ impl From<Arc<WindowUDF>> for WindowFunctionDefinition {
}

/// Window function
///
/// Holds the actual actual function to call
/// [`window_function::WindowFunction`] as well as its arguments
/// (`args`) and the contents of the `OVER` clause:
///
/// 1. `PARTITION BY`
/// 2. `ORDER BY`
/// 3. Window frame (e.g. `ROWS 1 PRECEDING AND 1 FOLLOWING`)
///
/// See [`Self::build`] to create an [`Expr`]
///
/// # Example
/// ```/// # use datafusion_expr::expr::WindowFunction;
/// // Create FIRST_VALUE(a) OVER (PARTITION BY b ORDER BY c)
/// let expr: Expr = Expr::WindowFunction(
/// WindowFunction::new(BuiltInWindowFunction::FirstValue, vec![col("a")])
/// )
/// .with_partition_by(vec![col("b")])
/// .with_order_by(vec![col("b")])
/// .build()?;
/// ```
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct WindowFunction {
/// Name of the function
Expand All @@ -811,7 +832,8 @@ pub struct WindowFunction {
}

impl WindowFunction {
/// Create a new Window expression
/// Create a new Window expression with the specified argument an
/// empty `OVER` clause
pub fn new(fun: impl Into<WindowFunctionDefinition>, args: Vec<Expr>) -> Self {
Self {
fun: fun.into(),
Expand Down
23 changes: 7 additions & 16 deletions datafusion/expr/src/udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ use arrow::datatypes::DataType;

use datafusion_common::Result;

use crate::expr::WindowFunction;
use crate::{
function::WindowFunctionSimplification, Expr, PartitionEvaluator,
PartitionEvaluatorFactory, ReturnTypeFunction, Signature, WindowFrame,
PartitionEvaluatorFactory, ReturnTypeFunction, Signature,
};

/// Logical representation of a user-defined window function (UDWF)
Expand Down Expand Up @@ -123,28 +124,18 @@ impl WindowUDF {
Self::new_from_impl(AliasedWindowUDFImpl::new(Arc::clone(&self.inner), aliases))
}

/// creates a [`Expr`] that calls the window function given
/// the `partition_by`, `order_by`, and `window_frame` definition
/// creates a [`Expr`] that calls the window function with default
/// values for order_by, partition_by, window_frame. See [`ExprFunctionExt`]
/// for details on setting these values.
///
/// This utility allows using the UDWF without requiring access to
/// the registry, such as with the DataFrame API.
pub fn call(
&self,
args: Vec<Expr>,
partition_by: Vec<Expr>,
order_by: Vec<Expr>,
window_frame: WindowFrame,
) -> Expr {
args: Vec<Expr>) -> Expr {
let fun = crate::WindowFunctionDefinition::WindowUDF(Arc::new(self.clone()));

Expr::WindowFunction(crate::expr::WindowFunction {
fun,
args,
partition_by,
order_by,
window_frame,
null_treatment: None,
})
Expr::WindowFunction(WindowFunction::new(fun, args))
}

/// Returns this function's name
Expand Down

0 comments on commit 1818efd

Please sign in to comment.