From 9ac4ec8a926e757796c9d6ff7a1fbae08caefd39 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 15 Aug 2024 13:46:46 +0100 Subject: [PATCH] . --- bench-vortex/src/bin/tpch_benchmark.rs | 1 - vortex-array/src/arrow/recordbatch.rs | 19 +++++++++++-------- vortex-datafusion/src/persistent/opener.rs | 21 +++++++++++++++++++-- 3 files changed, 30 insertions(+), 11 deletions(-) diff --git a/bench-vortex/src/bin/tpch_benchmark.rs b/bench-vortex/src/bin/tpch_benchmark.rs index 9ed42e3134..befe6c69e6 100644 --- a/bench-vortex/src/bin/tpch_benchmark.rs +++ b/bench-vortex/src/bin/tpch_benchmark.rs @@ -112,7 +112,6 @@ async fn bench_main(queries: Option>, iterations: usize, warmup: bool for (ctx, format) in ctxs.iter().zip(formats.iter()) { if warmup { for i in 0..3 { - // warmup let row_count: usize = rt.block_on(async { ctx.sql(&query) .await diff --git a/vortex-array/src/arrow/recordbatch.rs b/vortex-array/src/arrow/recordbatch.rs index 7a23d6f913..020988890e 100644 --- a/vortex-array/src/arrow/recordbatch.rs +++ b/vortex-array/src/arrow/recordbatch.rs @@ -5,7 +5,7 @@ use itertools::Itertools; use crate::array::StructArray; use crate::arrow::FromArrowArray; use crate::validity::Validity; -use crate::{Array, IntoArray, IntoCanonical}; +use crate::{Array, IntoArrayVariant, IntoCanonical}; impl From for Array { fn from(value: RecordBatch) -> Self { @@ -33,17 +33,20 @@ impl From for Array { impl From for RecordBatch { fn from(value: Array) -> Self { - let array_ref = value - .into_canonical() - .expect("struct arrays must canonicalize") - .into_arrow(); - let struct_array = as_struct_array(array_ref.as_ref()); - RecordBatch::from(struct_array) + let struct_arr = value + .into_struct() + .expect("RecordBatch can only be constructed from a Vortex StructArray"); + Self::from(struct_arr) } } impl From for RecordBatch { fn from(value: StructArray) -> Self { - RecordBatch::from(value.into_array()) + let array_ref = value + .into_canonical() + .expect("Struct arrays must canonicalize") + .into_arrow(); + let struct_array = as_struct_array(array_ref.as_ref()); + Self::from(struct_array) } } diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 41b376d990..92e87f6c9e 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -57,6 +57,7 @@ impl FileOpener for VortexFileOpener { .as_any() .downcast_ref::() { + // Check if the idx is not in the original projection AND that there's a matching column in the data let projections_contains_idx = self .projection .as_ref() @@ -67,6 +68,22 @@ impl FileOpener for VortexFileOpener { { predicate_projection.insert(column.index()); } + + match self.arrow_schema.column_with_name(column.name()) { + Some(_) if !projections_contains_idx => { + predicate_projection.insert(column.index()); + } + Some(_) => {} + None => { + return Err(DataFusionError::External( + format!( + "Could not find expected column {} in schema", + column.name() + ) + .into(), + )) + } + } } Ok(TreeNodeRecursion::Continue) })?; @@ -84,7 +101,7 @@ impl FileOpener for VortexFileOpener { builder = builder.with_projection(Projection::new(projection)) } - let og_projection_len = self.projection.clone().map(|v| v.len()); + let original_projection_len = self.projection.clone().map(|v| v.len()); Ok(async move { let reader = builder.build().await?; @@ -104,7 +121,7 @@ impl FileOpener for VortexFileOpener { let rb = RecordBatch::from(array); - if let Some(len) = og_projection_len { + if let Some(len) = original_projection_len { Ok(rb.project(&(0..len).collect_vec())?) } else { Ok(rb)