Skip to content

Commit

Permalink
Migrate invoke_no_args to invoke_batch
Browse files Browse the repository at this point in the history
Migrate scalar UDFs from deprecated invoke_no_args to invoke_batch.
  • Loading branch information
findepi committed Nov 4, 2024
1 parent 554ee00 commit 80d3cc7
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,11 @@ impl ScalarUDFImpl for Simple0ArgsScalarUDF {
Ok(self.return_type.clone())
}

fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
not_impl_err!("{} function does not accept arguments", self.name())
}

fn invoke_no_args(&self, _number_rows: usize) -> Result<ColumnarValue> {
fn invoke_batch(
&self,
_args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(100))))
}
}
Expand Down
14 changes: 6 additions & 8 deletions datafusion/expr/src/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -749,14 +749,12 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl {
self.inner.return_type_from_exprs(args, schema, arg_types)
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
#[allow(deprecated)]
self.inner.invoke(args)
}

fn invoke_no_args(&self, number_rows: usize) -> Result<ColumnarValue> {
#[allow(deprecated)]
self.inner.invoke_no_args(number_rows)
fn invoke_batch(
&self,
args: &[ColumnarValue],
number_rows: usize,
) -> Result<ColumnarValue> {
self.inner.invoke_batch(args, number_rows)
}

fn simplify(
Expand Down
10 changes: 5 additions & 5 deletions datafusion/functions-nested/src/make_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,14 @@ impl ScalarUDFImpl for MakeArray {
}
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
make_scalar_function(make_array_inner)(args)
}

fn invoke_no_args(&self, _number_rows: usize) -> Result<ColumnarValue> {
make_scalar_function(make_array_inner)(&[])
}

fn aliases(&self) -> &[String] {
&self.aliases
}
Expand Down
15 changes: 9 additions & 6 deletions datafusion/functions/src/core/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! [`VersionFunc`]: Implementation of the `version` function.

use arrow::datatypes::DataType;
use datafusion_common::{not_impl_err, plan_err, Result, ScalarValue};
use datafusion_common::{internal_err, plan_err, Result, ScalarValue};
use datafusion_expr::scalar_doc_sections::DOC_SECTION_OTHER;
use datafusion_expr::{
ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
Expand Down Expand Up @@ -66,11 +66,14 @@ impl ScalarUDFImpl for VersionFunc {
}
}

fn invoke(&self, _: &[ColumnarValue]) -> Result<ColumnarValue> {
not_impl_err!("version does not take any arguments")
}

fn invoke_no_args(&self, _: usize) -> Result<ColumnarValue> {
fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
if !args.is_empty() {
return internal_err!("{} function does not accept arguments", self.name());
}
// TODO it would be great to add rust version and arrow version,
// but that requires a `build.rs` script and/or adding a version const to arrow-rs
let version = format!(
Expand Down
11 changes: 9 additions & 2 deletions datafusion/functions/src/math/pi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::OnceLock;

use arrow::datatypes::DataType;
use arrow::datatypes::DataType::Float64;
use datafusion_common::{not_impl_err, Result, ScalarValue};
use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue};
use datafusion_expr::scalar_doc_sections::DOC_SECTION_MATH;
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_expr::{
Expand Down Expand Up @@ -67,7 +67,14 @@ impl ScalarUDFImpl for PiFunc {
not_impl_err!("{} function does not accept arguments", self.name())
}

fn invoke_no_args(&self, _number_rows: usize) -> Result<ColumnarValue> {
fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
if !args.is_empty() {
return internal_err!("{} function does not accept arguments", self.name());
}
Ok(ColumnarValue::Scalar(ScalarValue::Float64(Some(
std::f64::consts::PI,
))))
Expand Down
15 changes: 9 additions & 6 deletions datafusion/functions/src/math/random.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use arrow::datatypes::DataType;
use arrow::datatypes::DataType::Float64;
use rand::{thread_rng, Rng};

use datafusion_common::{not_impl_err, Result};
use datafusion_common::{internal_err, Result};
use datafusion_expr::scalar_doc_sections::DOC_SECTION_MATH;
use datafusion_expr::ColumnarValue;
use datafusion_expr::{Documentation, ScalarUDFImpl, Signature, Volatility};
Expand Down Expand Up @@ -64,11 +64,14 @@ impl ScalarUDFImpl for RandomFunc {
Ok(Float64)
}

fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
not_impl_err!("{} function does not accept arguments", self.name())
}

fn invoke_no_args(&self, num_rows: usize) -> Result<ColumnarValue> {
fn invoke_batch(
&self,
args: &[ColumnarValue],
num_rows: usize,
) -> Result<ColumnarValue> {
if !args.is_empty() {
return internal_err!("{} function does not accept arguments", self.name());
}
let mut rng = thread_rng();
let mut values = vec![0.0; num_rows];
// Equivalent to set each element with rng.gen_range(0.0..1.0), but more efficient
Expand Down
15 changes: 9 additions & 6 deletions datafusion/functions/src/string/uuid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use arrow::datatypes::DataType;
use arrow::datatypes::DataType::Utf8;
use uuid::Uuid;

use datafusion_common::{not_impl_err, Result};
use datafusion_common::{internal_err, Result};
use datafusion_expr::scalar_doc_sections::DOC_SECTION_STRING;
use datafusion_expr::{ColumnarValue, Documentation, Volatility};
use datafusion_expr::{ScalarUDFImpl, Signature};
Expand Down Expand Up @@ -64,13 +64,16 @@ impl ScalarUDFImpl for UuidFunc {
Ok(Utf8)
}

fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
not_impl_err!("{} function does not accept arguments", self.name())
}

/// Prints random (v4) uuid values per row
/// uuid() = 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'
fn invoke_no_args(&self, num_rows: usize) -> Result<ColumnarValue> {
fn invoke_batch(
&self,
args: &[ColumnarValue],
num_rows: usize,
) -> Result<ColumnarValue> {
if !args.is_empty() {
return internal_err!("{} function does not accept arguments", self.name());
}
let values = std::iter::repeat_with(|| Uuid::new_v4().to_string()).take(num_rows);
let array = GenericStringArray::<i32>::from_iter_values(values);
Ok(ColumnarValue::Array(Arc::new(array)))
Expand Down

0 comments on commit 80d3cc7

Please sign in to comment.