From dac788dde61aa11d09bcc133ff8c4c74abc23dcf Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 8 Aug 2024 10:21:09 +0100 Subject: [PATCH] some simplification --- vortex-datafusion/src/lib.rs | 8 -------- vortex-datafusion/src/persistent/execution.rs | 18 ++++-------------- vortex-datafusion/src/persistent/provider.rs | 14 +++++++------- 3 files changed, 11 insertions(+), 29 deletions(-) diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs index d91f62b224..308920f885 100644 --- a/vortex-datafusion/src/lib.rs +++ b/vortex-datafusion/src/lib.rs @@ -80,10 +80,6 @@ pub trait SessionContextExt { options: VortexMemTableOptions, ) -> DFResult; - fn register_disk_vortex>(&self, name: S, url: ObjectStoreUrl) -> DFResult<()> { - self.register_disk_vortex_opts(name, url, VortexTableOptions::default()) - } - fn register_disk_vortex_opts>( &self, name: S, @@ -91,10 +87,6 @@ pub trait SessionContextExt { options: VortexTableOptions, ) -> DFResult<()>; - fn read_disk_vortex(&self, url: ObjectStoreUrl) -> DFResult { - self.read_disk_vortex_opts(url, VortexTableOptions::default()) - } - fn read_disk_vortex_opts( &self, url: ObjectStoreUrl, diff --git a/vortex-datafusion/src/persistent/execution.rs b/vortex-datafusion/src/persistent/execution.rs index 51e2b78225..3c54f0e729 100644 --- a/vortex-datafusion/src/persistent/execution.rs +++ b/vortex-datafusion/src/persistent/execution.rs @@ -2,7 +2,7 @@ use std::fmt; use std::sync::Arc; use datafusion::datasource::physical_plan::{FileScanConfig, FileStream}; -use datafusion_common::Result as DFResult; +use datafusion_common::{project_schema, Result as DFResult}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr}; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; @@ -18,7 +18,6 @@ pub struct VortexExec { metrics: ExecutionPlanMetricsSet, predicate: Option>, plan_properties: PlanProperties, - projection: Option>, } impl VortexExec { @@ -28,26 +27,17 @@ impl VortexExec { projection: Option<&Vec>, predicate: Option>, ) -> DFResult { - let partitioning = Partitioning::UnknownPartitioning(1); - - let projected_schema = if let Some(projection) = projection { - Arc::new(file_scan_config.file_schema.clone().project(projection)?) - } else { - file_scan_config.file_schema.clone() - }; - + let projected_schema = project_schema(&file_scan_config.file_schema, projection)?; let plan_properties = PlanProperties::new( EquivalenceProperties::new(projected_schema), - partitioning, + Partitioning::UnknownPartitioning(1), ExecutionMode::Bounded, ); - let projection = projection.cloned(); Ok(Self { file_scan_config, metrics, predicate, - projection, plan_properties, }) } @@ -99,7 +89,7 @@ impl ExecutionPlan for VortexExec { .object_store(&self.file_scan_config.object_store_url)?; let opener = VortexFileOpener { object_store, - projection: self.projection.clone(), + projection: self.file_scan_config.projection.clone(), batch_size: None, predicate: self.predicate.clone(), }; diff --git a/vortex-datafusion/src/persistent/provider.rs b/vortex-datafusion/src/persistent/provider.rs index 5159e01481..3cb9cfdc81 100644 --- a/vortex-datafusion/src/persistent/provider.rs +++ b/vortex-datafusion/src/persistent/provider.rs @@ -6,10 +6,11 @@ use async_trait::async_trait; use datafusion::datasource::physical_plan::FileScanConfig; use datafusion::datasource::TableProvider; use datafusion::execution::context::SessionState; -use datafusion_common::{DFSchema, Result as DFResult, Statistics}; +use datafusion_common::{project_schema, Result as DFResult, Statistics, ToDFSchema}; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_expr::utils::conjunction; use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; +use datafusion_physical_plan::empty::EmptyExec; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::ExecutionPlan; @@ -53,13 +54,12 @@ impl TableProvider for VortexFileTableProvider { filters: &[Expr], _limit: Option, ) -> DFResult> { - let projected_schema = if let Some(projection) = projection { - Arc::new(self.schema().project(projection)?) - } else { - self.schema() - }; + if self.config.data_files.is_empty() { + let projected_schema = project_schema(&self.schema(), projection)?; + return Ok(Arc::new(EmptyExec::new(projected_schema))); + } - let df_schema = DFSchema::try_from(projected_schema.clone())?; + let df_schema = self.schema().to_dfschema()?; let predicate = conjunction(filters.to_vec()); let predicate = predicate .map(|predicate| state.create_physical_expr(predicate, &df_schema))