From 26801e994d19e44690bc77d91e6b6dc597914f8e Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Thu, 19 Dec 2024 14:59:13 +0000 Subject: [PATCH] fix up rest --- bench-vortex/benches/compress.rs | 11 ++---- bench-vortex/src/lib.rs | 6 +-- pyvortex/src/array.rs | 6 +-- vortex-array/src/array/varbin/arrow.rs | 6 +-- vortex-array/src/array/varbin/canonical.rs | 19 ++++++--- vortex-array/src/arrow/dtype.rs | 9 +++++ vortex-array/src/canonical.rs | 39 ++++++++++++------- vortex-datafusion/src/memory/plans.rs | 6 +-- .../src/persistent/statistics.rs | 9 ++--- 9 files changed, 66 insertions(+), 45 deletions(-) diff --git a/bench-vortex/benches/compress.rs b/bench-vortex/benches/compress.rs index 91bd8c0f2d..bb764e133b 100644 --- a/bench-vortex/benches/compress.rs +++ b/bench-vortex/benches/compress.rs @@ -34,7 +34,7 @@ use vortex::error::VortexResult; use vortex::file::{LayoutContext, LayoutDeserializer, VortexFileWriter, VortexReadBuilder}; use vortex::sampling_compressor::compressors::fsst::FSSTCompressor; use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT}; -use vortex::{ArrayDType, ArrayData, IntoArrayData, IntoCanonical}; +use vortex::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant}; use crate::tokio_runtime::TOKIO_RUNTIME; @@ -146,7 +146,7 @@ fn vortex_decompress_read(runtime: &Runtime, buf: Buffer) -> VortexResult = chunked_array .chunks() - .map(|chunk| -> PyResult { Ok(chunk.into_arrow()?) }) + .map(|chunk| -> PyResult { Ok(chunk.into_inferred_arrow()?) }) .collect::>>()?; if chunks.is_empty() { return Err(PyValueError::new_err("No chunks in array")); @@ -140,7 +140,7 @@ impl PyArray { } else { Ok(vortex .clone() - .into_arrow()? + .into_inferred_arrow()? .into_data() .to_pyarrow(py)? .into_bound(py)) diff --git a/vortex-array/src/array/varbin/arrow.rs b/vortex-array/src/array/varbin/arrow.rs index a3579cb2c9..270444bb1a 100644 --- a/vortex-array/src/array/varbin/arrow.rs +++ b/vortex-array/src/array/varbin/arrow.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use arrow_array::{ArrayRef, BinaryArray, LargeBinaryArray, LargeStringArray, StringArray}; -use arrow_schema::DataType; use vortex_dtype::{DType, PType}; use vortex_error::{vortex_bail, VortexResult}; @@ -13,10 +12,7 @@ use crate::variants::PrimitiveArrayTrait; use crate::{ArrayDType, IntoArrayVariant, ToArrayData}; /// Convert the array to Arrow variable length binary array type. -pub(crate) fn varbin_to_arrow( - varbin_array: &VarBinArray, - _data_type: &DataType, -) -> VortexResult { +pub(crate) fn varbin_to_arrow(varbin_array: &VarBinArray) -> VortexResult { let offsets = varbin_array .offsets() .into_primitive() diff --git a/vortex-array/src/array/varbin/canonical.rs b/vortex-array/src/array/varbin/canonical.rs index 8d1e94d833..a9fb9f1e81 100644 --- a/vortex-array/src/array/varbin/canonical.rs +++ b/vortex-array/src/array/varbin/canonical.rs @@ -1,7 +1,7 @@ use arrow_array::ArrayRef; use arrow_schema::DataType; use vortex_dtype::DType; -use vortex_error::VortexResult; +use vortex_error::{vortex_bail, VortexResult}; use crate::array::varbin::arrow::varbin_to_arrow; use crate::array::varbin::VarBinArray; @@ -12,9 +12,7 @@ use crate::{ArrayDType, ArrayData, Canonical, IntoCanonical}; impl IntoCanonical for VarBinArray { fn into_canonical(self) -> VortexResult { let nullable = self.dtype().is_nullable(); - // TODO: can we avoid this conversion since it might convert to a utf8_view and then to - // utf8. - let array_ref = varbin_to_arrow(&self, &DataType::Utf8View)?; + let array_ref = varbin_to_arrow(&self)?; let array = match self.dtype() { DType::Utf8(_) => arrow_cast::cast(array_ref.as_ref(), &DataType::Utf8View)?, DType::Binary(_) => arrow_cast::cast(array_ref.as_ref(), &DataType::BinaryView)?, @@ -28,7 +26,18 @@ impl IntoCanonical for VarBinArray { fn into_arrow(self, data_type: &DataType) -> VortexResult { // Specialized implementation of `into_arrow` for VarBin since it has a direct // Arrow representation. - varbin_to_arrow(&self, data_type) + let array_ref = varbin_to_arrow(&self)?; + + // Note, arrow::cast clones the array + Ok(match data_type { + DataType::Binary | DataType::LargeBinary | DataType::Utf8 | DataType::LargeUtf8 => { + array_ref + } + DataType::Utf8View | DataType::BinaryView => { + arrow_cast::cast(array_ref.as_ref(), data_type)? + } + _ => vortex_bail!("Unsupported data type: {:?}", data_type), + }) } } diff --git a/vortex-array/src/arrow/dtype.rs b/vortex-array/src/arrow/dtype.rs index d5582c4890..e3a5bce1be 100644 --- a/vortex-array/src/arrow/dtype.rs +++ b/vortex-array/src/arrow/dtype.rs @@ -12,6 +12,7 @@ use std::sync::Arc; +use arrow_array::ArrayRef; use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, SchemaRef}; use itertools::Itertools; use vortex_datetime_dtype::arrow::{make_arrow_temporal_dtype, make_temporal_ext_dtype}; @@ -20,6 +21,7 @@ use vortex_dtype::{DType, Nullability, PType, StructDType}; use vortex_error::{vortex_bail, vortex_err, VortexResult}; use crate::arrow::{FromArrowType, TryFromArrowType}; +use crate::{ArrayDType, ArrayData, IntoCanonical}; impl TryFromArrowType<&DataType> for PType { fn try_from_arrow(value: &DataType) -> VortexResult { @@ -184,6 +186,13 @@ pub fn infer_data_type(dtype: &DType) -> VortexResult { }) } +impl ArrayData { + pub fn into_inferred_arrow(self) -> VortexResult { + let data_type = infer_data_type(self.dtype())?; + self.into_arrow(&data_type) + } +} + #[cfg(test)] mod test { use arrow_schema::{DataType, Field, FieldRef, Fields, Schema}; diff --git a/vortex-array/src/canonical.rs b/vortex-array/src/canonical.rs index 10317c30db..ed3f9ae77d 100644 --- a/vortex-array/src/canonical.rs +++ b/vortex-array/src/canonical.rs @@ -72,24 +72,24 @@ impl Canonical { /// zero copies, while more complex variants such as Struct may require allocations if its child /// arrays require decompression. pub fn into_arrow(self, data_type: &DataType) -> VortexResult { - Ok(match self { - Canonical::Null(a) => null_to_arrow(a, data_type)?, - Canonical::Bool(a) => bool_to_arrow(a, data_type)?, - Canonical::Primitive(a) => primitive_to_arrow(a, data_type)?, - Canonical::Struct(a) => struct_to_arrow(a, data_type)?, - Canonical::List(a) => list_to_arrow(a, data_type)?, - Canonical::VarBinView(a) => varbinview_as_arrow(&a), + match self { + Canonical::Null(a) => null_to_arrow(a, data_type), + Canonical::Bool(a) => bool_to_arrow(a, data_type), + Canonical::Primitive(a) => primitive_to_arrow(a, data_type), + Canonical::Struct(a) => struct_to_arrow(a, data_type), + Canonical::List(a) => list_to_arrow(a, data_type), + Canonical::VarBinView(a) => varbinview_to_arrow(&a, data_type), Canonical::Extension(a) => { if is_temporal_ext_type(a.id()) { - temporal_to_arrow(TemporalArray::try_from(a.into_array())?)? + temporal_to_arrow(TemporalArray::try_from(a.into_array())?) } else { // Convert storage array directly into arrow, losing type information // that will let us round-trip. // TODO(aduffy): https://github.com/spiraldb/vortex/issues/1167 - a.storage().into_arrow(data_type)? + a.storage().into_arrow(data_type) } } - }) + } } } @@ -252,6 +252,21 @@ fn struct_to_arrow(struct_array: StructArray, data_type: &DataType) -> VortexRes } } +pub(crate) fn varbinview_to_arrow( + var_bin_view: &VarBinViewArray, + data_type: &DataType, +) -> VortexResult { + let arrow_arr = varbinview_as_arrow(var_bin_view); + // Note, arrow cast clones the array + Ok(match data_type { + DataType::Binary | DataType::LargeBinary | DataType::Utf8 | DataType::LargeUtf8 => { + arrow_cast::cast(arrow_arr.as_ref(), data_type)? + } + DataType::Utf8View | DataType::BinaryView => arrow_arr, + _ => vortex_bail!("Unsupported data type: {:?}", data_type), + }) +} + // TODO(joe): unify with varbin fn list_to_arrow(list: ListArray, data_type: &DataType) -> VortexResult { let offsets = list @@ -387,7 +402,6 @@ pub trait IntoCanonical { where Self: Sized, { - // TODO: FIXME self.into_canonical()?.into_arrow(data_type) } } @@ -545,11 +559,10 @@ mod test { use arrow_array::cast::AsArray; use arrow_array::types::{Int32Type, Int64Type, UInt64Type}; use arrow_array::{ - ArrayRef, ListArray as ArrowListArray, PrimitiveArray as ArrowPrimitiveArray, StringArray, + ListArray as ArrowListArray, PrimitiveArray as ArrowPrimitiveArray, StringArray, StringViewArray, StructArray as ArrowStructArray, }; use arrow_buffer::{NullBufferBuilder, OffsetBuffer}; - use arrow_cast::cast; use arrow_schema::{DataType, Field}; use crate::array::{PrimitiveArray, SparseArray, StructArray}; diff --git a/vortex-datafusion/src/memory/plans.rs b/vortex-datafusion/src/memory/plans.rs index a416fc6ff1..b86f97a77a 100644 --- a/vortex-datafusion/src/memory/plans.rs +++ b/vortex-datafusion/src/memory/plans.rs @@ -21,7 +21,7 @@ use pin_project::pin_project; use vortex_array::array::ChunkedArray; use vortex_array::arrow::FromArrowArray; use vortex_array::compute::take; -use vortex_array::{ArrayData, IntoArrayVariant, IntoCanonical}; +use vortex_array::{ArrayData, IntoArrayVariant}; use vortex_dtype::field::Field; use vortex_error::{vortex_err, vortex_panic, VortexError}; use vortex_expr::ExprRef; @@ -160,7 +160,7 @@ impl Stream for RowIndicesStream { .conjunction_expr .evaluate(vortex_struct.as_ref()) .map_err(|e| DataFusionError::External(e.into()))? - .into_arrow()?; + .into_inferred_arrow()?; // Convert the `selection` BooleanArray into a UInt64Array of indices. let selection_indices = selection @@ -348,7 +348,7 @@ where // We should find a way to avoid decoding the filter columns and only decode the other // columns, then stitch the StructArray back together from those. let projected_for_output = chunk.project(this.output_projection)?; - let decoded = take(projected_for_output, &row_indices)?.into_arrow()?; + let decoded = take(projected_for_output, &row_indices)?.into_inferred_arrow()?; // Send back a single record batch of the decoded data. let output_batch = RecordBatch::from(decoded.as_struct()); diff --git a/vortex-datafusion/src/persistent/statistics.rs b/vortex-datafusion/src/persistent/statistics.rs index 3113a755be..10ebf1d864 100644 --- a/vortex-datafusion/src/persistent/statistics.rs +++ b/vortex-datafusion/src/persistent/statistics.rs @@ -7,14 +7,13 @@ use datafusion_expr::Accumulator; use vortex_array::array::StructArray; use vortex_array::stats::Stat; use vortex_array::variants::StructArrayTrait as _; -use vortex_array::IntoCanonical; use vortex_error::VortexResult; pub fn array_to_col_statistics(array: &StructArray) -> VortexResult { let mut stats = ColumnStatistics::new_unknown(); if let Some(null_count_array) = array.field_by_name(Stat::NullCount.name()) { - let array = null_count_array.into_arrow()?; + let array = null_count_array.into_inferred_arrow()?; let array = array.as_primitive::(); let null_count = array.iter().map(|v| v.unwrap_or_default()).sum::(); @@ -22,7 +21,7 @@ pub fn array_to_col_statistics(array: &StructArray) -> VortexResult VortexResult VortexResult> { match array.field_by_name(Stat::UncompressedSizeInBytes.name()) { None => Ok(None), Some(array) => { - let array = array.into_arrow()?; + let array = array.into_inferred_arrow()?; let array = array.as_primitive::(); let uncompressed_size = array.iter().map(|v| v.unwrap_or_default()).sum::();