Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support filter pushdown for datafusion #203

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 164 additions & 0 deletions crates/core/src/exprs/filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

use crate::exprs::HudiOperator;

use anyhow::{Context, Result};
use arrow_array::{ArrayRef, Scalar, StringArray};
use arrow_cast::{cast_with_options, CastOptions};
use arrow_schema::{DataType, Field, Schema};
use std::str::FromStr;

/// A partition filter that represents a filter expression for partition pruning.
#[derive(Debug, Clone)]
pub struct PartitionFilter {
jonathanc-n marked this conversation as resolved.
Show resolved Hide resolved
pub field: Field,
pub operator: HudiOperator,
pub value: Scalar<ArrayRef>,
}

impl TryFrom<((&str, &str, &str), &Schema)> for PartitionFilter {
type Error = anyhow::Error;

fn try_from((filter, partition_schema): ((&str, &str, &str), &Schema)) -> Result<Self> {
let (field_name, operator_str, value_str) = filter;

let field: &Field = partition_schema
.field_with_name(field_name)
.with_context(|| format!("Field '{}' not found in partition schema", field_name))?;

let operator = HudiOperator::from_str(operator_str)
.with_context(|| format!("Unsupported operator: {}", operator_str))?;

let value = &[value_str];
let value = Self::cast_value(value, field.data_type())
.with_context(|| format!("Unable to cast {:?} as {:?}", value, field.data_type()))?;

let field = field.clone();
Ok(PartitionFilter {
field,
operator,
value,
})
}
}

impl PartitionFilter {
pub fn cast_value(value: &[&str; 1], data_type: &DataType) -> Result<Scalar<ArrayRef>> {
let cast_options = CastOptions {
safe: false,
format_options: Default::default(),
};

let value = StringArray::from(Vec::from(value));

Ok(Scalar::new(cast_with_options(
&value,
data_type,
&cast_options,
)?))
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::exprs::HudiOperator;
use arrow::datatypes::{DataType, Field, Schema};
use arrow_array::Datum;
use std::str::FromStr;

fn create_test_schema() -> Schema {
Schema::new(vec![
Field::new("date", DataType::Date32, false),
Field::new("category", DataType::Utf8, false),
Field::new("count", DataType::Int32, false),
])
}

#[test]
fn test_partition_filter_try_from_valid() {
let schema = create_test_schema();
let filter_tuple = ("date", "=", "2023-01-01");
let filter = PartitionFilter::try_from((filter_tuple, &schema));
assert!(filter.is_ok());
let filter = filter.unwrap();
assert_eq!(filter.field.name(), "date");
assert_eq!(filter.operator, HudiOperator::Eq);
assert_eq!(filter.value.get().0.len(), 1);

let filter_tuple = ("category", "!=", "foo");
let filter = PartitionFilter::try_from((filter_tuple, &schema));
assert!(filter.is_ok());
let filter = filter.unwrap();
assert_eq!(filter.field.name(), "category");
assert_eq!(filter.operator, HudiOperator::Ne);
assert_eq!(filter.value.get().0.len(), 1);
assert_eq!(
StringArray::from(filter.value.into_inner().to_data()).value(0),
"foo"
)
}

#[test]
fn test_partition_filter_try_from_invalid_field() {
let schema = create_test_schema();
let filter_tuple = ("invalid_field", "=", "2023-01-01");
let filter = PartitionFilter::try_from((filter_tuple, &schema));
assert!(filter.is_err());
assert!(filter
.unwrap_err()
.to_string()
.contains("not found in partition schema"));
}

#[test]
fn test_partition_filter_try_from_invalid_operator() {
let schema = create_test_schema();
let filter_tuple = ("date", "??", "2023-01-01");
let filter = PartitionFilter::try_from((filter_tuple, &schema));
assert!(filter.is_err());
assert!(filter
.unwrap_err()
.to_string()
.contains("Unsupported operator: ??"));
}

#[test]
fn test_partition_filter_try_from_invalid_value() {
let schema = create_test_schema();
let filter_tuple = ("count", "=", "not_a_number");
let filter = PartitionFilter::try_from((filter_tuple, &schema));
assert!(filter.is_err());
assert!(filter.unwrap_err().to_string().contains("Unable to cast"));
}

#[test]
fn test_partition_filter_try_from_all_operators() {
let schema = create_test_schema();
for (op, _) in HudiOperator::TOKEN_OP_PAIRS {
let filter_tuple = ("count", op, "10");
let filter = PartitionFilter::try_from((filter_tuple, &schema));
assert!(filter.is_ok(), "Failed for operator: {}", op);
let filter = filter.unwrap();
assert_eq!(filter.field.name(), "count");
assert_eq!(filter.operator, HudiOperator::from_str(op).unwrap());
}
}
}
107 changes: 107 additions & 0 deletions crates/core/src/exprs/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

pub mod filter;

use anyhow::{anyhow, Error};
use std::cmp::PartialEq;
use std::fmt::{Display, Formatter, Result as FmtResult};
use std::str::FromStr;

pub use filter::*;

/// An operator that represents a comparison operation used in a partition filter expression.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum HudiOperator {
jonathanc-n marked this conversation as resolved.
Show resolved Hide resolved
Eq,
Ne,
Lt,
Lte,
Gt,
Gte,
}

impl Display for HudiOperator {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
match self {
// Binary Operators
HudiOperator::Eq => write!(f, "="),
HudiOperator::Ne => write!(f, "!="),
HudiOperator::Lt => write!(f, "<"),
HudiOperator::Lte => write!(f, "<="),
HudiOperator::Gt => write!(f, ">"),
HudiOperator::Gte => write!(f, ">="),
}
}
}

// TODO: Add more operators
impl HudiOperator {
pub const TOKEN_OP_PAIRS: [(&'static str, HudiOperator); 6] = [
("=", HudiOperator::Eq),
("!=", HudiOperator::Ne),
("<", HudiOperator::Lt),
("<=", HudiOperator::Lte),
(">", HudiOperator::Gt),
(">=", HudiOperator::Gte),
];
}

impl FromStr for HudiOperator {
type Err = Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
HudiOperator::TOKEN_OP_PAIRS
.iter()
.find_map(|&(token, op)| {
if token.eq_ignore_ascii_case(s) {
Some(op)
} else {
None
}
})
.ok_or_else(|| anyhow!("Unsupported operator: {}", s))
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_operator_from_str() {
assert_eq!(HudiOperator::from_str("=").unwrap(), HudiOperator::Eq);
assert_eq!(HudiOperator::from_str("!=").unwrap(), HudiOperator::Ne);
assert_eq!(HudiOperator::from_str("<").unwrap(), HudiOperator::Lt);
assert_eq!(HudiOperator::from_str("<=").unwrap(), HudiOperator::Lte);
assert_eq!(HudiOperator::from_str(">").unwrap(), HudiOperator::Gt);
assert_eq!(HudiOperator::from_str(">=").unwrap(), HudiOperator::Gte);
assert!(HudiOperator::from_str("??").is_err());
}

#[test]
fn test_operator_display() {
assert_eq!(HudiOperator::Eq.to_string(), "=");
assert_eq!(HudiOperator::Ne.to_string(), "!=");
assert_eq!(HudiOperator::Lt.to_string(), "<");
assert_eq!(HudiOperator::Lte.to_string(), "<=");
assert_eq!(HudiOperator::Gt.to_string(), ">");
assert_eq!(HudiOperator::Gte.to_string(), ">=");
}
}
1 change: 1 addition & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
//! ```

pub mod config;
pub mod exprs;
pub mod file_group;
pub mod storage;
pub mod table;
Expand Down
22 changes: 20 additions & 2 deletions crates/core/src/table/fs_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,22 @@ mod tests {
use crate::storage::Storage;
use crate::table::fs_view::FileSystemView;
use crate::table::partition::PartitionPruner;
use crate::table::Table;
use crate::table::{PartitionFilter, Table};

use anyhow::anyhow;
use arrow::datatypes::{DataType, Field, Schema};
use hudi_tests::TestTable;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use url::Url;

fn create_test_schema() -> Schema {
Schema::new(vec![
Field::new("byteField", DataType::Int32, false),
Field::new("shortField", DataType::Int32, false),
])
}

async fn create_test_fs_view(base_url: Url) -> FileSystemView {
FileSystemView::new(
Arc::new(HudiConfigs::new([(HudiTableConfig::BasePath, base_url)])),
Expand Down Expand Up @@ -296,8 +306,16 @@ mod tests {
.await
.unwrap();
let partition_schema = hudi_table.get_partition_schema().await.unwrap();

let schema = create_test_schema();
let filter_lt_20 = PartitionFilter::try_from((("byteField", "<", "20"), &schema))
.map_err(|e| anyhow!("Failed to create PartitionFilter: {}", e))
.unwrap();
let filter_eq_300 = PartitionFilter::try_from((("shortField", "=", "300"), &schema))
.map_err(|e| anyhow!("Failed to create PartitionFilter: {}", e))
jonathanc-n marked this conversation as resolved.
Show resolved Hide resolved
.unwrap();
let partition_pruner = PartitionPruner::new(
&[("byteField", "<", "20"), ("shortField", "=", "300")],
&[filter_lt_20, filter_eq_300],
&partition_schema,
hudi_table.hudi_configs.as_ref(),
)
Expand Down
Loading
Loading