Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
AdamGS committed Aug 15, 2024
1 parent 3b4703b commit 9ac4ec8
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 11 deletions.
1 change: 0 additions & 1 deletion bench-vortex/src/bin/tpch_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ async fn bench_main(queries: Option<Vec<usize>>, iterations: usize, warmup: bool
for (ctx, format) in ctxs.iter().zip(formats.iter()) {
if warmup {
for i in 0..3 {
// warmup
let row_count: usize = rt.block_on(async {
ctx.sql(&query)
.await
Expand Down
19 changes: 11 additions & 8 deletions vortex-array/src/arrow/recordbatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use itertools::Itertools;
use crate::array::StructArray;
use crate::arrow::FromArrowArray;
use crate::validity::Validity;
use crate::{Array, IntoArray, IntoCanonical};
use crate::{Array, IntoArrayVariant, IntoCanonical};

impl From<RecordBatch> for Array {
fn from(value: RecordBatch) -> Self {
Expand Down Expand Up @@ -33,17 +33,20 @@ impl From<RecordBatch> for Array {

impl From<Array> for RecordBatch {
fn from(value: Array) -> Self {
let array_ref = value
.into_canonical()
.expect("struct arrays must canonicalize")
.into_arrow();
let struct_array = as_struct_array(array_ref.as_ref());
RecordBatch::from(struct_array)
let struct_arr = value
.into_struct()
.expect("RecordBatch can only be constructed from a Vortex StructArray");
Self::from(struct_arr)
}
}

impl From<StructArray> for RecordBatch {
fn from(value: StructArray) -> Self {
RecordBatch::from(value.into_array())
let array_ref = value
.into_canonical()
.expect("Struct arrays must canonicalize")
.into_arrow();
let struct_array = as_struct_array(array_ref.as_ref());
Self::from(struct_array)
}
}
21 changes: 19 additions & 2 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl FileOpener for VortexFileOpener {
.as_any()
.downcast_ref::<datafusion_physical_expr::expressions::Column>()
{
// Check if the idx is not in the original projection AND that there's a matching column in the data
let projections_contains_idx = self
.projection
.as_ref()
Expand All @@ -67,6 +68,22 @@ impl FileOpener for VortexFileOpener {
{
predicate_projection.insert(column.index());
}

match self.arrow_schema.column_with_name(column.name()) {
Some(_) if !projections_contains_idx => {
predicate_projection.insert(column.index());
}
Some(_) => {}
None => {
return Err(DataFusionError::External(
format!(
"Could not find expected column {} in schema",
column.name()
)
.into(),
))
}
}
}
Ok(TreeNodeRecursion::Continue)
})?;
Expand All @@ -84,7 +101,7 @@ impl FileOpener for VortexFileOpener {
builder = builder.with_projection(Projection::new(projection))
}

let og_projection_len = self.projection.clone().map(|v| v.len());
let original_projection_len = self.projection.clone().map(|v| v.len());

Ok(async move {
let reader = builder.build().await?;
Expand All @@ -104,7 +121,7 @@ impl FileOpener for VortexFileOpener {

let rb = RecordBatch::from(array);

if let Some(len) = og_projection_len {
if let Some(len) = original_projection_len {
Ok(rb.project(&(0..len).collect_vec())?)
} else {
Ok(rb)
Expand Down

0 comments on commit 9ac4ec8

Please sign in to comment.