Merge remote-tracking branch 'apache/main' into alamb/port_release_notes
alamb committed Oct 18, 2024
2 parents ec996d2 + e9435a9 commit 5f405c3
Showing 86 changed files with 4,129 additions and 1,651 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/rust.yml
@@ -574,9 +574,9 @@ jobs:
#
# To reproduce:
# 1. Install the version of Rust that is failing. Example:
# rustup install 1.78.0
# rustup install 1.79.0
# 2. Run the command that failed with that version. Example:
# cargo +1.78.0 check -p datafusion
# cargo +1.79.0 check -p datafusion
#
# To resolve, either:
# 1. Change your code to use older Rust features,
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -58,7 +58,7 @@ homepage = "https://datafusion.apache.org"
license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/apache/datafusion"
rust-version = "1.78"
rust-version = "1.79"
version = "42.1.0"

[workspace.dependencies]
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
@@ -26,7 +26,7 @@ license = "Apache-2.0"
homepage = "https://datafusion.apache.org"
repository = "https://github.com/apache/datafusion"
# Specify MSRV here as `cargo msrv` doesn't support workspace version
rust-version = "1.78"
rust-version = "1.79"
readme = "README.md"

[dependencies]
2 changes: 1 addition & 1 deletion datafusion-cli/Dockerfile
@@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.

FROM rust:1.78-bookworm AS builder
FROM rust:1.79-bookworm AS builder

COPY . /usr/src/datafusion
COPY ./datafusion /usr/src/datafusion/datafusion
26 changes: 17 additions & 9 deletions datafusion/common/src/config.rs
@@ -338,6 +338,12 @@ config_namespace! {
/// if the source of statistics is accurate.
/// We plan to make this the default in the future.
pub use_row_number_estimates_to_optimize_partitioning: bool, default = false

/// Should DataFusion enforce batch size in joins or not. By default,
/// DataFusion will not enforce batch size in joins. Enforcing batch size
/// in joins can reduce memory usage when joining large
/// tables with a highly-selective join filter, but is also slightly slower.
pub enforce_batch_size_in_joins: bool, default = false
}
}
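For context, a minimal sketch of turning the new knob on from user code, assuming the option lands under the `datafusion.execution` namespace like its neighbor `use_row_number_estimates_to_optimize_partitioning` (the exact key is inferred from this hunk, not confirmed by it):

```rust
// Sketch: enable the new option on a session. The config key below is an
// assumption based on the surrounding `config_namespace!` block.
use datafusion::prelude::*;

fn main() {
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.enforce_batch_size_in_joins", true);
    // Joins planned through this context should now respect `batch_size`,
    // trading a little speed for bounded memory on selective join filters.
    let _ctx = SessionContext::new_with_config(config);
}
```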

@@ -1222,16 +1228,18 @@ impl ConfigField for TableOptions {
fn set(&mut self, key: &str, value: &str) -> Result<()> {
// Extensions are handled in the public `ConfigOptions::set`
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
let Some(format) = &self.current_format else {
return _config_err!("Specify a format for TableOptions");
};
match key {
"format" => match format {
#[cfg(feature = "parquet")]
ConfigFileType::PARQUET => self.parquet.set(rem, value),
ConfigFileType::CSV => self.csv.set(rem, value),
ConfigFileType::JSON => self.json.set(rem, value),
},
"format" => {
let Some(format) = &self.current_format else {
return _config_err!("Specify a format for TableOptions");
};
match format {
#[cfg(feature = "parquet")]
ConfigFileType::PARQUET => self.parquet.set(rem, value),
ConfigFileType::CSV => self.csv.set(rem, value),
ConfigFileType::JSON => self.json.set(rem, value),
}
}
_ => _config_err!("Config value \"{key}\" not found on TableOptions"),
}
}
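The visible effect of this refactor: the `current_format` requirement now applies only to `format.*` keys rather than to every `set` call. A hedged sketch of the resulting behavior, assuming the public `TableOptions::set` and `set_config_format` APIs work as their names suggest:

```rust
use datafusion_common::config::{ConfigFileType, TableOptions};
use datafusion_common::Result;

fn main() -> Result<()> {
    let mut opts = TableOptions::default();

    // A format-scoped key still errors until a format is chosen...
    assert!(opts.set("format.delimiter", "|").is_err());

    // ...but after this change only the "format" arm performs that check,
    // so choosing a format and retrying succeeds.
    opts.set_config_format(ConfigFileType::CSV);
    opts.set("format.delimiter", "|")?;
    Ok(())
}
```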
73 changes: 1 addition & 72 deletions datafusion/common/src/functional_dependencies.rs
@@ -23,11 +23,8 @@ use std::fmt::{Display, Formatter};
use std::ops::Deref;
use std::vec::IntoIter;

use crate::error::_plan_err;
use crate::utils::{merge_and_order_indices, set_difference};
use crate::{DFSchema, DFSchemaRef, DataFusionError, JoinType, Result};

use sqlparser::ast::TableConstraint;
use crate::{DFSchema, JoinType};

/// This object defines a constraint on a table.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
@@ -60,74 +57,6 @@ impl Constraints {
Self { inner: constraints }
}

/// Convert each `TableConstraint` to corresponding `Constraint`
pub fn new_from_table_constraints(
constraints: &[TableConstraint],
df_schema: &DFSchemaRef,
) -> Result<Self> {
let constraints = constraints
.iter()
.map(|c: &TableConstraint| match c {
TableConstraint::Unique { name, columns, .. } => {
let field_names = df_schema.field_names();
// Get unique constraint indices in the schema:
let indices = columns
.iter()
.map(|u| {
let idx = field_names
.iter()
.position(|item| *item == u.value)
.ok_or_else(|| {
let name = name
.as_ref()
.map(|name| format!("with name '{name}' "))
.unwrap_or("".to_string());
DataFusionError::Execution(
format!("Column for unique constraint {}not found in schema: {}", name,u.value)
)
})?;
Ok(idx)
})
.collect::<Result<Vec<_>>>()?;
Ok(Constraint::Unique(indices))
}
TableConstraint::PrimaryKey { columns, .. } => {
let field_names = df_schema.field_names();
// Get primary key indices in the schema:
let indices = columns
.iter()
.map(|pk| {
let idx = field_names
.iter()
.position(|item| *item == pk.value)
.ok_or_else(|| {
DataFusionError::Execution(format!(
"Column for primary key not found in schema: {}",
pk.value
))
})?;
Ok(idx)
})
.collect::<Result<Vec<_>>>()?;
Ok(Constraint::PrimaryKey(indices))
}
TableConstraint::ForeignKey { .. } => {
_plan_err!("Foreign key constraints are not currently supported")
}
TableConstraint::Check { .. } => {
_plan_err!("Check constraints are not currently supported")
}
TableConstraint::Index { .. } => {
_plan_err!("Indexes are not currently supported")
}
TableConstraint::FulltextOrSpatial { .. } => {
_plan_err!("Indexes are not currently supported")
}
})
.collect::<Result<Vec<_>>>()?;
Ok(Constraints::new_unverified(constraints))
}

/// Check whether constraints is empty
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
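With the `sqlparser`-dependent conversion removed from `datafusion-common` (note the dropped `sqlparser::ast::TableConstraint` import above), constraints in this crate are constructed directly from field indices. A small sketch of what remains:

```rust
use datafusion_common::{Constraint, Constraints};

fn main() {
    // A primary key on field 0 plus a unique constraint over fields 1 and 2,
    // expressed as indices into the table schema.
    let constraints = Constraints::new_unverified(vec![
        Constraint::PrimaryKey(vec![0]),
        Constraint::Unique(vec![1, 2]),
    ]);
    assert!(!constraints.is_empty());
}
```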
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
@@ -30,7 +30,7 @@ authors = { workspace = true }
# Specify MSRV here as `cargo msrv` doesn't support workspace version and fails with
# "Unable to find key 'package.rust-version' (or 'package.metadata.msrv') in 'arrow-datafusion/Cargo.toml'"
# https://github.com/foresterre/cargo-msrv/issues/590
rust-version = "1.78"
rust-version = "1.79"

[lints]
workspace = true
4 changes: 1 addition & 3 deletions datafusion/core/src/dataframe/mod.rs
@@ -2987,9 +2987,7 @@ mod tests {
JoinType::Inner,
Some(Expr::Literal(ScalarValue::Null)),
)?;
let expected_plan = "CrossJoin:\
\n TableScan: a projection=[c1], full_filters=[Boolean(NULL)]\
\n TableScan: b projection=[c1]";
let expected_plan = "EmptyRelation";
assert_eq!(expected_plan, format!("{}", join.into_optimized_plan()?));

// JOIN ON expression must be boolean type
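The updated expectation reflects an optimizer improvement: a join whose filter is literal `NULL` can never match, so the whole join now folds into `EmptyRelation`. A runnable sketch of the same observation (the table setup is illustrative, and the exact plan rendering could differ):

```rust
use datafusion::common::{JoinType, ScalarValue};
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    ctx.sql("CREATE TABLE a AS VALUES (1), (2)").await?;
    ctx.sql("CREATE TABLE b AS VALUES (1), (2)").await?;

    let left = ctx.table("a").await?;
    let right = ctx.table("b").await?;
    // A literal NULL join filter is never true, so the optimizer can replace
    // the join and both of its inputs with a single EmptyRelation node.
    let join =
        left.join_on(right, JoinType::Inner, Some(Expr::Literal(ScalarValue::Null)))?;
    println!("{}", join.into_optimized_plan()?); // expected: EmptyRelation
    Ok(())
}
```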
36 changes: 18 additions & 18 deletions datafusion/core/src/datasource/listing/helpers.rs
@@ -24,6 +24,7 @@ use std::sync::Arc;
use super::ListingTableUrl;
use super::PartitionedFile;
use crate::execution::context::SessionState;
use datafusion_common::internal_err;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{BinaryExpr, Operator};

@@ -285,25 +286,20 @@ async fn prune_partitions(
let props = ExecutionProps::new();

// Applies `filter` to `batch` returning `None` on error
let do_filter = |filter| -> Option<ArrayRef> {
let expr = create_physical_expr(filter, &df_schema, &props).ok()?;
expr.evaluate(&batch)
.ok()?
.into_array(partitions.len())
.ok()
let do_filter = |filter| -> Result<ArrayRef> {
let expr = create_physical_expr(filter, &df_schema, &props)?;
expr.evaluate(&batch)?.into_array(partitions.len())
};

// Compute the conjunction of the filters, ignoring errors
// Compute the conjunction of the filters
let mask = filters
.iter()
.fold(None, |acc, filter| match (acc, do_filter(filter)) {
(Some(a), Some(b)) => Some(and(&a, b.as_boolean()).unwrap_or(a)),
(None, Some(r)) => Some(r.as_boolean().clone()),
(r, None) => r,
});
.map(|f| do_filter(f).map(|a| a.as_boolean().clone()))
.reduce(|a, b| Ok(and(&a?, &b?)?));

let mask = match mask {
Some(mask) => mask,
Some(Ok(mask)) => mask,
Some(Err(err)) => return Err(err),
None => return Ok(partitions),
};

@@ -401,8 +397,8 @@ fn evaluate_partition_prefix<'a>(

/// Discover the partitions on the given path and prune out files
/// that belong to irrelevant partitions using `filters` expressions.
/// `filters` might contain expressions that can be resolved only at the
/// file level (e.g. Parquet row group pruning).
/// `filters` should only contain expressions that can be evaluated
/// using only the partition columns.
pub async fn pruned_partition_list<'a>(
ctx: &'a SessionState,
store: &'a dyn ObjectStore,
@@ -413,6 +409,12 @@ pub async fn pruned_partition_list<'a>(
) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
// if no partition col => simply list all the files
if partition_cols.is_empty() {
if !filters.is_empty() {
return internal_err!(
"Got partition filters for unpartitioned table {}",
table_path
);
}
return Ok(Box::pin(
table_path
.list_all_files(ctx, store, file_extension)
@@ -631,13 +633,11 @@ mod tests {
]);
let filter1 = Expr::eq(col("part1"), lit("p1v2"));
let filter2 = Expr::eq(col("part2"), lit("p2v1"));
// filter3 cannot be resolved at partition pruning
let filter3 = Expr::eq(col("part2"), col("other"));
let pruned = pruned_partition_list(
&state,
store.as_ref(),
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
&[filter1, filter2, filter3],
&[filter1, filter2],
".parquet",
&[
(String::from("part1"), DataType::Utf8),
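The core of the `prune_partitions` change is replacing an error-swallowing `fold` with a `map`/`reduce` over `Result`, so the first failure propagates instead of being ignored. A standalone sketch of that pattern using `arrow` directly:

```rust
use arrow::array::BooleanArray;
use arrow::compute::and;
use arrow::error::ArrowError;

// Combine per-filter masks with a logical AND, short-circuiting on the first
// error rather than silently keeping the previous mask as the old fold did.
fn conjunction(
    masks: impl IntoIterator<Item = Result<BooleanArray, ArrowError>>,
) -> Option<Result<BooleanArray, ArrowError>> {
    masks.into_iter().reduce(|a, b| Ok(and(&a?, &b?)?))
}

fn main() {
    let a = BooleanArray::from(vec![true, true, false]);
    let b = BooleanArray::from(vec![true, false, true]);
    let mask = conjunction([Ok(a), Ok(b)]).unwrap().unwrap();
    assert_eq!(mask, BooleanArray::from(vec![true, false, false]));
}
```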
69 changes: 37 additions & 32 deletions datafusion/core/src/datasource/listing/table.rs
@@ -782,6 +782,16 @@ impl ListingTable {
}
}

// Expressions can be used for partition pruning if they can be evaluated using
// only the partition columns and there are partition columns.
fn can_be_evaluted_for_partition_pruning(
partition_column_names: &[&str],
expr: &Expr,
) -> bool {
!partition_column_names.is_empty()
&& expr_applicable_for_cols(partition_column_names, expr)
}

#[async_trait]
impl TableProvider for ListingTable {
fn as_any(&self) -> &dyn Any {
@@ -807,10 +817,28 @@ impl TableProvider for ListingTable {
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// extract types of partition columns
let table_partition_cols = self
.options
.table_partition_cols
.iter()
.map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
.collect::<Result<Vec<_>>>()?;

let table_partition_col_names = table_partition_cols
.iter()
.map(|field| field.name().as_str())
.collect::<Vec<_>>();
// If the filters can be resolved using only partition cols, there is no need to
// push them down to the TableScan; otherwise, `unhandled` pruning predicates will be generated
let (partition_filters, filters): (Vec<_>, Vec<_>) =
filters.iter().cloned().partition(|filter| {
can_be_evaluted_for_partition_pruning(&table_partition_col_names, filter)
});
// TODO (https://github.com/apache/datafusion/issues/11600) remove downcast_ref from here?
let session_state = state.as_any().downcast_ref::<SessionState>().unwrap();
let (mut partitioned_file_lists, statistics) = self
.list_files_for_scan(session_state, filters, limit)
.list_files_for_scan(session_state, &partition_filters, limit)
.await?;

// if no files need to be read, return an `EmptyExec`
@@ -846,28 +874,6 @@ impl TableProvider for ListingTable {
None => {} // no ordering required
};

// extract types of partition columns
let table_partition_cols = self
.options
.table_partition_cols
.iter()
.map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
.collect::<Result<Vec<_>>>()?;

// If the filters can be resolved using only partition cols, there is no need to
// pushdown it to TableScan, otherwise, `unhandled` pruning predicates will be generated
let table_partition_col_names = table_partition_cols
.iter()
.map(|field| field.name().as_str())
.collect::<Vec<_>>();
let filters = filters
.iter()
.filter(|filter| {
!expr_applicable_for_cols(&table_partition_col_names, filter)
})
.cloned()
.collect::<Vec<_>>();

let filters = conjunction(filters.to_vec())
.map(|expr| -> Result<_> {
// NOTE: Use the table schema (NOT file schema) here because `expr` may contain references to partition columns.
@@ -908,18 +914,17 @@ impl TableProvider for ListingTable {
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
let partition_column_names = self
.options
.table_partition_cols
.iter()
.map(|col| col.0.as_str())
.collect::<Vec<_>>();
filters
.iter()
.map(|filter| {
if expr_applicable_for_cols(
&self
.options
.table_partition_cols
.iter()
.map(|col| col.0.as_str())
.collect::<Vec<_>>(),
filter,
) {
if can_be_evaluted_for_partition_pruning(&partition_column_names, filter)
{
// if filter can be handled by partition pruning, it is exact
return Ok(TableProviderFilterPushDown::Exact);
}
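Restating the new helper's contract: a filter qualifies for exact partition pruning only when partition columns exist and the filter references nothing else; anything else stays an inexact scan filter, and passing partition filters for an unpartitioned table is now an internal error. A simplified standalone sketch of that predicate (`Expr::column_refs` stands in for the richer `expr_applicable_for_cols` check and is an assumption about the expression API):

```rust
use datafusion_expr::{col, lit, Expr};

// Simplified version of `can_be_evaluted_for_partition_pruning`: prune only
// when partition columns exist and every referenced column is one of them.
fn prunable(expr: &Expr, partition_cols: &[&str]) -> bool {
    !partition_cols.is_empty()
        && expr
            .column_refs()
            .iter()
            .all(|c| partition_cols.contains(&c.name.as_str()))
}

fn main() {
    let on_partition = col("part1").eq(lit("p1v2"));
    let on_data = col("part2").eq(col("other"));

    assert!(prunable(&on_partition, &["part1", "part2"]));
    // Touches a non-partition column, so it goes to the scan as Inexact.
    assert!(!prunable(&on_data, &["part1", "part2"]));
    // No partition columns at all: nothing is prunable.
    assert!(!prunable(&on_partition, &[]));
}
```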