From 6709da84a9fe612c110a591dd808f31bfe42e4ad Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 8 Aug 2024 10:21:09 +0100 Subject: [PATCH] some simplification --- bench-vortex/src/tpch/mod.rs | 12 +++++------- vortex-datafusion/src/lib.rs | 8 -------- vortex-datafusion/src/persistent/execution.rs | 18 ++++-------------- vortex-datafusion/src/persistent/provider.rs | 14 +++++++------- 4 files changed, 16 insertions(+), 36 deletions(-) diff --git a/bench-vortex/src/tpch/mod.rs b/bench-vortex/src/tpch/mod.rs index a6f7d0a650..7d25ac147f 100644 --- a/bench-vortex/src/tpch/mod.rs +++ b/bench-vortex/src/tpch/mod.rs @@ -3,14 +3,14 @@ use std::fs; use std::path::Path; use std::sync::Arc; -use arrow_array::StructArray; +use arrow_array::StructArray as ArrowStructArray; use arrow_schema::Schema; use datafusion::dataframe::DataFrameWriteOptions; use datafusion::datasource::MemTable; use datafusion::execution::object_store::ObjectStoreUrl; use datafusion::prelude::{CsvReadOptions, ParquetReadOptions, SessionContext}; use tokio::fs::OpenOptions; -use vortex::array::ChunkedArray; +use vortex::array::{ChunkedArray, StructArray}; use vortex::arrow::FromArrowArray; use vortex::variants::StructArrayTrait; use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant}; @@ -249,7 +249,7 @@ async fn register_vortex_file( }) .collect::>(); - let data = vortex::array::StructArray::from_fields(&fields).into_array(); + let data = StructArray::from_fields(&fields).into_array(); let data = if enable_compression { let compressor = SamplingCompressor::default(); @@ -266,9 +266,7 @@ async fn register_vortex_file( .await?; let mut writer = LayoutWriter::new(f); - writer = writer.write_array_columns(data).await?; - writer.finalize().await?; anyhow::Ok(()) @@ -276,7 +274,7 @@ async fn register_vortex_file( ) .await?; - let f = tokio::fs::File::options() + let f = OpenOptions::new() .read(true) .write(true) .open(&vtx_file) @@ -324,7 +322,7 @@ async fn register_vortex( let chunks: Vec = record_batches .iter() .cloned() - .map(StructArray::from) + .map(ArrowStructArray::from) .map(|struct_array| Array::from_arrow(&struct_array, false)) .collect(); diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs index d91f62b224..308920f885 100644 --- a/vortex-datafusion/src/lib.rs +++ b/vortex-datafusion/src/lib.rs @@ -80,10 +80,6 @@ pub trait SessionContextExt { options: VortexMemTableOptions, ) -> DFResult; - fn register_disk_vortex>(&self, name: S, url: ObjectStoreUrl) -> DFResult<()> { - self.register_disk_vortex_opts(name, url, VortexTableOptions::default()) - } - fn register_disk_vortex_opts>( &self, name: S, @@ -91,10 +87,6 @@ pub trait SessionContextExt { options: VortexTableOptions, ) -> DFResult<()>; - fn read_disk_vortex(&self, url: ObjectStoreUrl) -> DFResult { - self.read_disk_vortex_opts(url, VortexTableOptions::default()) - } - fn read_disk_vortex_opts( &self, url: ObjectStoreUrl, diff --git a/vortex-datafusion/src/persistent/execution.rs b/vortex-datafusion/src/persistent/execution.rs index 51e2b78225..3c54f0e729 100644 --- a/vortex-datafusion/src/persistent/execution.rs +++ b/vortex-datafusion/src/persistent/execution.rs @@ -2,7 +2,7 @@ use std::fmt; use std::sync::Arc; use datafusion::datasource::physical_plan::{FileScanConfig, FileStream}; -use datafusion_common::Result as DFResult; +use datafusion_common::{project_schema, Result as DFResult}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr}; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; @@ -18,7 +18,6 @@ pub struct VortexExec { metrics: ExecutionPlanMetricsSet, predicate: Option>, plan_properties: PlanProperties, - projection: Option>, } impl VortexExec { @@ -28,26 +27,17 @@ impl VortexExec { projection: Option<&Vec>, predicate: Option>, ) -> DFResult { - let partitioning = Partitioning::UnknownPartitioning(1); - - let projected_schema = if let Some(projection) = projection { - Arc::new(file_scan_config.file_schema.clone().project(projection)?) - } else { - file_scan_config.file_schema.clone() - }; - + let projected_schema = project_schema(&file_scan_config.file_schema, projection)?; let plan_properties = PlanProperties::new( EquivalenceProperties::new(projected_schema), - partitioning, + Partitioning::UnknownPartitioning(1), ExecutionMode::Bounded, ); - let projection = projection.cloned(); Ok(Self { file_scan_config, metrics, predicate, - projection, plan_properties, }) } @@ -99,7 +89,7 @@ impl ExecutionPlan for VortexExec { .object_store(&self.file_scan_config.object_store_url)?; let opener = VortexFileOpener { object_store, - projection: self.projection.clone(), + projection: self.file_scan_config.projection.clone(), batch_size: None, predicate: self.predicate.clone(), }; diff --git a/vortex-datafusion/src/persistent/provider.rs b/vortex-datafusion/src/persistent/provider.rs index 5159e01481..3cb9cfdc81 100644 --- a/vortex-datafusion/src/persistent/provider.rs +++ b/vortex-datafusion/src/persistent/provider.rs @@ -6,10 +6,11 @@ use async_trait::async_trait; use datafusion::datasource::physical_plan::FileScanConfig; use datafusion::datasource::TableProvider; use datafusion::execution::context::SessionState; -use datafusion_common::{DFSchema, Result as DFResult, Statistics}; +use datafusion_common::{project_schema, Result as DFResult, Statistics, ToDFSchema}; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_expr::utils::conjunction; use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; +use datafusion_physical_plan::empty::EmptyExec; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::ExecutionPlan; @@ -53,13 +54,12 @@ impl TableProvider for VortexFileTableProvider { filters: &[Expr], _limit: Option, ) -> DFResult> { - let projected_schema = if let Some(projection) = projection { - Arc::new(self.schema().project(projection)?) - } else { - self.schema() - }; + if self.config.data_files.is_empty() { + let projected_schema = project_schema(&self.schema(), projection)?; + return Ok(Arc::new(EmptyExec::new(projected_schema))); + } - let df_schema = DFSchema::try_from(projected_schema.clone())?; + let df_schema = self.schema().to_dfschema()?; let predicate = conjunction(filters.to_vec()); let predicate = predicate .map(|predicate| state.create_physical_expr(predicate, &df_schema))