Commit 26801e9: fix up rest

joseph-isaacs committed Dec 19, 2024
1 parent bc1979e

Showing 9 changed files with 66 additions and 45 deletions.
11 changes: 3 additions & 8 deletions bench-vortex/benches/compress.rs
@@ -34,7 +34,7 @@ use vortex::error::VortexResult;
 use vortex::file::{LayoutContext, LayoutDeserializer, VortexFileWriter, VortexReadBuilder};
 use vortex::sampling_compressor::compressors::fsst::FSSTCompressor;
 use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT};
-use vortex::{ArrayDType, ArrayData, IntoArrayData, IntoCanonical};
+use vortex::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant};

 use crate::tokio_runtime::TOKIO_RUNTIME;

@@ -146,7 +146,7 @@ fn vortex_decompress_read(runtime: &Runtime, buf: Buffer) -> VortexResult<Vec<Ar
         let mut batches = vec![];
         let mut stream = builder.build().await?;
         while let Some(batch) = stream.next().await {
-            batches.push(batch?.into_arrow()?);
+            batches.push(batch?.into_inferred_arrow()?);
         }
         Ok(batches)
     }
@@ -412,12 +412,7 @@ fn tpc_h_l_comment(c: &mut Criterion) {
         "TPC-H l_comment chunked",
     );

-    let comments_canonical = comments
-        .into_canonical()
-        .unwrap()
-        .into_struct()
-        .unwrap()
-        .into_array();
+    let comments_canonical = comments.into_struct().unwrap().into_array();
     let dtype = comments_canonical.dtype().clone();
     let comments_canonical_chunked =
         ChunkedArray::try_new(vec![comments_canonical], dtype).unwrap();
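
The one-liner above works because `IntoArrayVariant::into_struct` canonicalizes and downcasts in a single call. A minimal sketch of the pattern, assuming the `vortex` umbrella crate re-exports used elsewhere in this benchmark (the helper itself is illustrative, not part of the diff):

```rust
use vortex::array::StructArray;
use vortex::error::VortexResult;
use vortex::{ArrayData, IntoArrayVariant};

// Canonicalize and downcast in one step, replacing the former
// into_canonical().unwrap().into_struct().unwrap() chain.
fn as_struct(array: ArrayData) -> VortexResult<StructArray> {
    array.into_struct()
}
```
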
6 changes: 3 additions & 3 deletions bench-vortex/src/lib.rs
@@ -343,7 +343,7 @@ mod test {
     use vortex::arrow::FromArrowArray;
     use vortex::compress::CompressionStrategy;
     use vortex::sampling_compressor::SamplingCompressor;
-    use vortex::{ArrayData, IntoCanonical};
+    use vortex::ArrayData;

     use crate::taxi_data::taxi_data_parquet;
     use crate::{compress_taxi_data, setup_logger};
@@ -366,7 +366,7 @@ mod test {
         let struct_arrow: ArrowStructArray = record_batch.into();
         let arrow_array: ArrowArrayRef = Arc::new(struct_arrow);
         let vortex_array = ArrayData::from_arrow(arrow_array.clone(), false);
-        let vortex_as_arrow = vortex_array.into_arrow().unwrap();
+        let vortex_as_arrow = vortex_array.into_inferred_arrow().unwrap();
         assert_eq!(vortex_as_arrow.deref(), arrow_array.deref());
     }
 }
@@ -387,7 +387,7 @@ mod test {
         let vortex_array = ArrayData::from_arrow(arrow_array.clone(), false);

         let compressed = compressor.compress(&vortex_array).unwrap();
-        let compressed_as_arrow = compressed.into_arrow().unwrap();
+        let compressed_as_arrow = compressed.into_inferred_arrow().unwrap();
         assert_eq!(compressed_as_arrow.deref(), arrow_array.deref());
     }
 }
6 changes: 3 additions & 3 deletions pyvortex/src/array.rs
@@ -5,7 +5,7 @@ use pyo3::prelude::*;
 use pyo3::types::{IntoPyDict, PyInt, PyList};
 use vortex::array::ChunkedArray;
 use vortex::compute::{compare, fill_forward, scalar_at, slice, take, FilterMask, Operator};
-use vortex::{ArrayDType, ArrayData, IntoCanonical};
+use vortex::{ArrayDType, ArrayData};

 use crate::dtype::PyDType;
 use crate::python_repr::PythonRepr;
@@ -120,7 +120,7 @@ impl PyArray {
         if let Ok(chunked_array) = ChunkedArray::try_from(vortex.clone()) {
             let chunks: Vec<ArrayRef> = chunked_array
                 .chunks()
-                .map(|chunk| -> PyResult<ArrayRef> { Ok(chunk.into_arrow()?) })
+                .map(|chunk| -> PyResult<ArrayRef> { Ok(chunk.into_inferred_arrow()?) })
                 .collect::<PyResult<Vec<ArrayRef>>>()?;
             if chunks.is_empty() {
                 return Err(PyValueError::new_err("No chunks in array"));
@@ -140,7 +140,7 @@ impl PyArray {
         } else {
             Ok(vortex
                 .clone()
-                .into_arrow()?
+                .into_inferred_arrow()?
                 .into_data()
                 .to_pyarrow(py)?
                 .into_bound(py))
6 changes: 1 addition & 5 deletions vortex-array/src/array/varbin/arrow.rs
@@ -1,7 +1,6 @@
 use std::sync::Arc;

 use arrow_array::{ArrayRef, BinaryArray, LargeBinaryArray, LargeStringArray, StringArray};
-use arrow_schema::DataType;
 use vortex_dtype::{DType, PType};
 use vortex_error::{vortex_bail, VortexResult};

@@ -13,10 +12,7 @@ use crate::variants::PrimitiveArrayTrait;
 use crate::{ArrayDType, IntoArrayVariant, ToArrayData};

 /// Convert the array to Arrow variable length binary array type.
-pub(crate) fn varbin_to_arrow(
-    varbin_array: &VarBinArray,
-    _data_type: &DataType,
-) -> VortexResult<ArrayRef> {
+pub(crate) fn varbin_to_arrow(varbin_array: &VarBinArray) -> VortexResult<ArrayRef> {
     let offsets = varbin_array
         .offsets()
         .into_primitive()
19 changes: 14 additions & 5 deletions vortex-array/src/array/varbin/canonical.rs
@@ -1,7 +1,7 @@
 use arrow_array::ArrayRef;
 use arrow_schema::DataType;
 use vortex_dtype::DType;
-use vortex_error::VortexResult;
+use vortex_error::{vortex_bail, VortexResult};

 use crate::array::varbin::arrow::varbin_to_arrow;
 use crate::array::varbin::VarBinArray;
@@ -12,9 +12,7 @@ use crate::{ArrayDType, ArrayData, Canonical, IntoCanonical};
 impl IntoCanonical for VarBinArray {
     fn into_canonical(self) -> VortexResult<Canonical> {
         let nullable = self.dtype().is_nullable();
-        // TODO: can we avoid this conversion since it might convert to a utf8_view and then to
-        // utf8.
-        let array_ref = varbin_to_arrow(&self, &DataType::Utf8View)?;
+        let array_ref = varbin_to_arrow(&self)?;
         let array = match self.dtype() {
             DType::Utf8(_) => arrow_cast::cast(array_ref.as_ref(), &DataType::Utf8View)?,
             DType::Binary(_) => arrow_cast::cast(array_ref.as_ref(), &DataType::BinaryView)?,
@@ -28,7 +26,18 @@ impl IntoCanonical for VarBinArray {
     fn into_arrow(self, data_type: &DataType) -> VortexResult<ArrayRef> {
         // Specialized implementation of `into_arrow` for VarBin since it has a direct
         // Arrow representation.
-        varbin_to_arrow(&self, data_type)
+        let array_ref = varbin_to_arrow(&self)?;
+
+        // Note, arrow::cast clones the array
+        Ok(match data_type {
+            DataType::Binary | DataType::LargeBinary | DataType::Utf8 | DataType::LargeUtf8 => {
+                array_ref
+            }
+            DataType::Utf8View | DataType::BinaryView => {
+                arrow_cast::cast(array_ref.as_ref(), data_type)?
+            }
+            _ => vortex_bail!("Unsupported data type: {:?}", data_type),
+        })
     }
 }

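With the specialized `into_arrow` above, callers choose the Arrow layout explicitly. A sketch of both branches, assuming Arrow `Utf8` input imports as a VarBin-encoded array and that `ArrayData` picks up the default `IntoCanonical::into_arrow` method (the scaffolding is illustrative, not part of the diff):

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, StringArray};
use arrow_schema::DataType;
use vortex_array::arrow::FromArrowArray;
use vortex_array::{ArrayData, IntoCanonical};
use vortex_error::VortexResult;

fn pick_layout() -> VortexResult<()> {
    let arrow: ArrayRef = Arc::new(StringArray::from(vec!["a", "bb"]));
    let vortex = ArrayData::from_arrow(arrow, false);

    // Utf8 is VarBin's native layout, so this branch returns without a cast...
    let utf8 = vortex.clone().into_arrow(&DataType::Utf8)?;
    assert_eq!(utf8.data_type(), &DataType::Utf8);

    // ...while Utf8View goes through arrow_cast::cast, which clones.
    let view = vortex.into_arrow(&DataType::Utf8View)?;
    assert_eq!(view.data_type(), &DataType::Utf8View);
    Ok(())
}
```
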
9 changes: 9 additions & 0 deletions vortex-array/src/arrow/dtype.rs
@@ -12,6 +12,7 @@
 use std::sync::Arc;

+use arrow_array::ArrayRef;
 use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, SchemaRef};
 use itertools::Itertools;
 use vortex_datetime_dtype::arrow::{make_arrow_temporal_dtype, make_temporal_ext_dtype};
@@ -20,6 +21,7 @@ use vortex_dtype::{DType, Nullability, PType, StructDType};
 use vortex_error::{vortex_bail, vortex_err, VortexResult};

 use crate::arrow::{FromArrowType, TryFromArrowType};
+use crate::{ArrayDType, ArrayData, IntoCanonical};

 impl TryFromArrowType<&DataType> for PType {
     fn try_from_arrow(value: &DataType) -> VortexResult<Self> {
@@ -184,6 +186,13 @@ pub fn infer_data_type(dtype: &DType) -> VortexResult<DataType> {
     })
 }

+impl ArrayData {
+    pub fn into_inferred_arrow(self) -> VortexResult<ArrayRef> {
+        let data_type = infer_data_type(self.dtype())?;
+        self.into_arrow(&data_type)
+    }
+}
+
 #[cfg(test)]
 mod test {
     use arrow_schema::{DataType, Field, FieldRef, Fields, Schema};

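`into_inferred_arrow` recovers the old zero-argument behavior: infer the Arrow type from the Vortex dtype via `infer_data_type`, then delegate to `into_arrow`. A round-trip sketch in the style of the bench-vortex test above (setup illustrative):

```rust
use std::ops::Deref;
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array};
use vortex_array::arrow::FromArrowArray;
use vortex_array::ArrayData;
use vortex_error::VortexResult;

fn roundtrip() -> VortexResult<()> {
    let arrow: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let vortex = ArrayData::from_arrow(arrow.clone(), false);

    // infer_data_type(vortex.dtype()) yields DataType::Int32 here, so the
    // caller no longer names an Arrow type explicitly.
    let back = vortex.into_inferred_arrow()?;
    assert_eq!(back.deref(), arrow.deref());
    Ok(())
}
```
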
39 changes: 26 additions & 13 deletions vortex-array/src/canonical.rs
@@ -72,24 +72,24 @@ impl Canonical {
     /// zero copies, while more complex variants such as Struct may require allocations if its child
     /// arrays require decompression.
     pub fn into_arrow(self, data_type: &DataType) -> VortexResult<ArrayRef> {
-        Ok(match self {
-            Canonical::Null(a) => null_to_arrow(a, data_type)?,
-            Canonical::Bool(a) => bool_to_arrow(a, data_type)?,
-            Canonical::Primitive(a) => primitive_to_arrow(a, data_type)?,
-            Canonical::Struct(a) => struct_to_arrow(a, data_type)?,
-            Canonical::List(a) => list_to_arrow(a, data_type)?,
-            Canonical::VarBinView(a) => varbinview_as_arrow(&a),
+        match self {
+            Canonical::Null(a) => null_to_arrow(a, data_type),
+            Canonical::Bool(a) => bool_to_arrow(a, data_type),
+            Canonical::Primitive(a) => primitive_to_arrow(a, data_type),
+            Canonical::Struct(a) => struct_to_arrow(a, data_type),
+            Canonical::List(a) => list_to_arrow(a, data_type),
+            Canonical::VarBinView(a) => varbinview_to_arrow(&a, data_type),
             Canonical::Extension(a) => {
                 if is_temporal_ext_type(a.id()) {
-                    temporal_to_arrow(TemporalArray::try_from(a.into_array())?)?
+                    temporal_to_arrow(TemporalArray::try_from(a.into_array())?)
                 } else {
                     // Convert storage array directly into arrow, losing type information
                     // that will let us round-trip.
                     // TODO(aduffy): https://github.com/spiraldb/vortex/issues/1167
-                    a.storage().into_arrow(data_type)?
+                    a.storage().into_arrow(data_type)
                 }
             }
-        })
+        }
     }
 }

@@ -252,6 +252,21 @@ fn struct_to_arrow(struct_array: StructArray, data_type: &DataType) -> VortexRes
     }
 }

+pub(crate) fn varbinview_to_arrow(
+    var_bin_view: &VarBinViewArray,
+    data_type: &DataType,
+) -> VortexResult<ArrayRef> {
+    let arrow_arr = varbinview_as_arrow(var_bin_view);
+    // Note, arrow cast clones the array
+    Ok(match data_type {
+        DataType::Binary | DataType::LargeBinary | DataType::Utf8 | DataType::LargeUtf8 => {
+            arrow_cast::cast(arrow_arr.as_ref(), data_type)?
+        }
+        DataType::Utf8View | DataType::BinaryView => arrow_arr,
+        _ => vortex_bail!("Unsupported data type: {:?}", data_type),
+    })
+}
+
 // TODO(joe): unify with varbin
 fn list_to_arrow(list: ListArray, data_type: &DataType) -> VortexResult<ArrayRef> {
     let offsets = list
@@ -387,7 +402,6 @@ pub trait IntoCanonical {
     where
         Self: Sized,
     {
-        // TODO: FIXME
         self.into_canonical()?.into_arrow(data_type)
     }
 }
}
@@ -545,11 +559,10 @@ mod test {
     use arrow_array::cast::AsArray;
     use arrow_array::types::{Int32Type, Int64Type, UInt64Type};
     use arrow_array::{
-        ArrayRef, ListArray as ArrowListArray, PrimitiveArray as ArrowPrimitiveArray, StringArray,
+        ListArray as ArrowListArray, PrimitiveArray as ArrowPrimitiveArray, StringArray,
         StringViewArray, StructArray as ArrowStructArray,
     };
     use arrow_buffer::{NullBufferBuilder, OffsetBuffer};
-    use arrow_cast::cast;
     use arrow_schema::{DataType, Field};

     use crate::array::{PrimitiveArray, SparseArray, StructArray};

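Note that the cast direction in `varbinview_to_arrow` is the mirror image of VarBin's: view types return as-is and offset-based types pay the cast. A sketch, assuming a `VarBinViewArray::from_iter_str` constructor as seen in vortex tests:

```rust
use arrow_schema::DataType;
use vortex_array::array::VarBinViewArray;
use vortex_array::IntoCanonical;
use vortex_error::VortexResult;

fn views() -> VortexResult<()> {
    let arr = VarBinViewArray::from_iter_str(["x", "yy", "zzz"]);

    // Utf8View is the native layout of VarBinView: returned without a cast.
    let view = arr.clone().into_arrow(&DataType::Utf8View)?;
    assert_eq!(view.data_type(), &DataType::Utf8View);

    // Utf8 goes through the arrow_cast::cast branch (clones the data).
    let utf8 = arr.into_arrow(&DataType::Utf8)?;
    assert_eq!(utf8.data_type(), &DataType::Utf8);
    Ok(())
}
```
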
6 changes: 3 additions & 3 deletions vortex-datafusion/src/memory/plans.rs
@@ -21,7 +21,7 @@ use pin_project::pin_project;
 use vortex_array::array::ChunkedArray;
 use vortex_array::arrow::FromArrowArray;
 use vortex_array::compute::take;
-use vortex_array::{ArrayData, IntoArrayVariant, IntoCanonical};
+use vortex_array::{ArrayData, IntoArrayVariant};
 use vortex_dtype::field::Field;
 use vortex_error::{vortex_err, vortex_panic, VortexError};
 use vortex_expr::ExprRef;
@@ -160,7 +160,7 @@ impl Stream for RowIndicesStream {
             .conjunction_expr
             .evaluate(vortex_struct.as_ref())
             .map_err(|e| DataFusionError::External(e.into()))?
-            .into_arrow()?;
+            .into_inferred_arrow()?;

         // Convert the `selection` BooleanArray into a UInt64Array of indices.
         let selection_indices = selection
@@ -348,7 +348,7 @@ where
         // We should find a way to avoid decoding the filter columns and only decode the other
         // columns, then stitch the StructArray back together from those.
         let projected_for_output = chunk.project(this.output_projection)?;
-        let decoded = take(projected_for_output, &row_indices)?.into_arrow()?;
+        let decoded = take(projected_for_output, &row_indices)?.into_inferred_arrow()?;

        // Send back a single record batch of the decoded data.
        let output_batch = RecordBatch::from(decoded.as_struct());
9 changes: 4 additions & 5 deletions vortex-datafusion/src/persistent/statistics.rs
@@ -7,22 +7,21 @@ use datafusion_expr::Accumulator;
 use vortex_array::array::StructArray;
 use vortex_array::stats::Stat;
 use vortex_array::variants::StructArrayTrait as _;
-use vortex_array::IntoCanonical;
 use vortex_error::VortexResult;

 pub fn array_to_col_statistics(array: &StructArray) -> VortexResult<ColumnStatistics> {
     let mut stats = ColumnStatistics::new_unknown();

     if let Some(null_count_array) = array.field_by_name(Stat::NullCount.name()) {
-        let array = null_count_array.into_arrow()?;
+        let array = null_count_array.into_inferred_arrow()?;
         let array = array.as_primitive::<UInt64Type>();

         let null_count = array.iter().map(|v| v.unwrap_or_default()).sum::<u64>();
         stats.null_count = Precision::Exact(null_count as usize);
     }

     if let Some(max_value_array) = array.field_by_name(Stat::Max.name()) {
-        let array = max_value_array.into_arrow()?;
+        let array = max_value_array.into_inferred_arrow()?;
         let mut acc = MaxAccumulator::try_new(array.data_type())?;
         acc.update_batch(&[array])?;

@@ -31,7 +30,7 @@ pub fn array_to_col_statistics(array: &StructArray) -> VortexResult<ColumnStatis
     }

     if let Some(min_value_array) = array.field_by_name(Stat::Min.name()) {
-        let array = min_value_array.into_arrow()?;
+        let array = min_value_array.into_inferred_arrow()?;
         let mut acc = MinAccumulator::try_new(array.data_type())?;
         acc.update_batch(&[array])?;

@@ -46,7 +45,7 @@ pub fn uncompressed_col_size(array: &StructArray) -> VortexResult<Option<u64>> {
     match array.field_by_name(Stat::UncompressedSizeInBytes.name()) {
         None => Ok(None),
         Some(array) => {
-            let array = array.into_arrow()?;
+            let array = array.into_inferred_arrow()?;
             let array = array.as_primitive::<UInt64Type>();

             let uncompressed_size = array.iter().map(|v| v.unwrap_or_default()).sum::<u64>();
