Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Into arrow with hint #1730

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions bench-vortex/benches/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use vortex::error::VortexResult;
use vortex::file::{LayoutContext, LayoutDeserializer, VortexFileWriter, VortexReadBuilder};
use vortex::sampling_compressor::compressors::fsst::FSSTCompressor;
use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT};
use vortex::{ArrayDType, ArrayData, IntoArrayData, IntoCanonical};
use vortex::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant};

use crate::tokio_runtime::TOKIO_RUNTIME;

Expand Down Expand Up @@ -146,7 +146,7 @@ fn vortex_decompress_read(runtime: &Runtime, buf: Buffer) -> VortexResult<Vec<Ar
let mut batches = vec![];
let mut stream = builder.build().await?;
while let Some(batch) = stream.next().await {
batches.push(batch?.into_arrow()?);
batches.push(batch?.into_inferred_arrow()?);
}
Ok(batches)
}
Expand Down Expand Up @@ -412,12 +412,7 @@ fn tpc_h_l_comment(c: &mut Criterion) {
"TPC-H l_comment chunked",
);

let comments_canonical = comments
.into_canonical()
.unwrap()
.into_struct()
.unwrap()
.into_array();
let comments_canonical = comments.into_struct().unwrap().into_array();
let dtype = comments_canonical.dtype().clone();
let comments_canonical_chunked =
ChunkedArray::try_new(vec![comments_canonical], dtype).unwrap();
Expand Down
6 changes: 3 additions & 3 deletions bench-vortex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ mod test {
use vortex::arrow::FromArrowArray;
use vortex::compress::CompressionStrategy;
use vortex::sampling_compressor::SamplingCompressor;
use vortex::{ArrayData, IntoCanonical};
use vortex::ArrayData;

use crate::taxi_data::taxi_data_parquet;
use crate::{compress_taxi_data, setup_logger};
Expand All @@ -366,7 +366,7 @@ mod test {
let struct_arrow: ArrowStructArray = record_batch.into();
let arrow_array: ArrowArrayRef = Arc::new(struct_arrow);
let vortex_array = ArrayData::from_arrow(arrow_array.clone(), false);
let vortex_as_arrow = vortex_array.into_arrow().unwrap();
let vortex_as_arrow = vortex_array.into_inferred_arrow().unwrap();
assert_eq!(vortex_as_arrow.deref(), arrow_array.deref());
}
}
Expand All @@ -387,7 +387,7 @@ mod test {
let vortex_array = ArrayData::from_arrow(arrow_array.clone(), false);

let compressed = compressor.compress(&vortex_array).unwrap();
let compressed_as_arrow = compressed.into_arrow().unwrap();
let compressed_as_arrow = compressed.into_inferred_arrow().unwrap();
assert_eq!(compressed_as_arrow.deref(), arrow_array.deref());
}
}
Expand Down
6 changes: 3 additions & 3 deletions pyvortex/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use pyo3::prelude::*;
use pyo3::types::{IntoPyDict, PyInt, PyList};
use vortex::array::ChunkedArray;
use vortex::compute::{compare, fill_forward, scalar_at, slice, take, FilterMask, Operator};
use vortex::{ArrayDType, ArrayData, IntoCanonical};
use vortex::{ArrayDType, ArrayData};

use crate::dtype::PyDType;
use crate::python_repr::PythonRepr;
Expand Down Expand Up @@ -120,7 +120,7 @@ impl PyArray {
if let Ok(chunked_array) = ChunkedArray::try_from(vortex.clone()) {
let chunks: Vec<ArrayRef> = chunked_array
.chunks()
.map(|chunk| -> PyResult<ArrayRef> { Ok(chunk.into_arrow()?) })
.map(|chunk| -> PyResult<ArrayRef> { Ok(chunk.into_inferred_arrow()?) })
.collect::<PyResult<Vec<ArrayRef>>>()?;
if chunks.is_empty() {
return Err(PyValueError::new_err("No chunks in array"));
Expand All @@ -140,7 +140,7 @@ impl PyArray {
} else {
Ok(vortex
.clone()
.into_arrow()?
.into_inferred_arrow()?
.into_data()
.to_pyarrow(py)?
.into_bound(py))
Expand Down
2 changes: 2 additions & 0 deletions vortex-array/src/array/constant/compute/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ mod test {
#[case(BoolArray::from_iter([Some(true), Some(false), Some(true), Some(false)].into_iter()).into_array(),
ConstantArray::new(true, 4).into_array())]
fn test_and(#[case] lhs: ArrayData, #[case] rhs: ArrayData) {
let rr = and(&lhs, &rhs);
println!("{:?}", rr);
let r = and(&lhs, &rhs).unwrap().into_bool().unwrap().into_array();

let v0 = scalar_at(&r, 0).unwrap().as_bool().value();
Expand Down
17 changes: 14 additions & 3 deletions vortex-array/src/array/varbin/canonical.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use arrow_array::ArrayRef;
use arrow_schema::DataType;
use vortex_dtype::DType;
use vortex_error::VortexResult;
use vortex_error::{vortex_bail, VortexResult};

use crate::array::varbin::arrow::varbin_to_arrow;
use crate::array::varbin::VarBinArray;
Expand All @@ -23,10 +23,21 @@ impl IntoCanonical for VarBinArray {
VarBinViewArray::try_from(ArrayData::from_arrow(array, nullable)).map(Canonical::VarBinView)
}

fn into_arrow(self) -> VortexResult<ArrayRef> {
fn into_arrow(self, data_type: &DataType) -> VortexResult<ArrayRef> {
// Specialized implementation of `into_arrow` for VarBin since it has a direct
// Arrow representation.
varbin_to_arrow(&self)
let array_ref = varbin_to_arrow(&self)?;

// Note, arrow::cast clones the array
Ok(match data_type {
DataType::Binary | DataType::LargeBinary | DataType::Utf8 | DataType::LargeUtf8 => {
array_ref
}
DataType::Utf8View | DataType::BinaryView => {
arrow_cast::cast(array_ref.as_ref(), data_type)?
}
_ => vortex_bail!("Unsupported data type: {:?}", data_type),
})
}
}

Expand Down
8 changes: 5 additions & 3 deletions vortex-array/src/arrow/datum.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use arrow_array::{Array, ArrayRef, Datum as ArrowDatum};
use vortex_error::VortexError;

use crate::arrow::infer_data_type;
use crate::compute::slice;
use crate::stats::{ArrayStatistics, Stat};
use crate::{ArrayData, IntoCanonical};
use crate::{ArrayDType, ArrayData, IntoCanonical};

/// A wrapper around a generic Arrow array that can be used as a Datum in Arrow compute.
pub struct Datum {
Expand All @@ -15,18 +16,19 @@ impl TryFrom<ArrayData> for Datum {
type Error = VortexError;

fn try_from(array: ArrayData) -> Result<Self, Self::Error> {
let data_type = infer_data_type(array.dtype())?;
if array
.statistics()
.get_as::<bool>(Stat::IsConstant)
.unwrap_or_default()
{
Ok(Self {
array: slice(array, 0, 1)?.into_arrow()?,
array: slice(array, 0, 1)?.into_arrow(&data_type)?,
is_scalar: true,
})
} else {
Ok(Self {
array: array.into_arrow()?,
array: array.into_arrow(&data_type)?,
is_scalar: false,
})
}
Expand Down
9 changes: 9 additions & 0 deletions vortex-array/src/arrow/dtype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

use std::sync::Arc;

use arrow_array::ArrayRef;
use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, SchemaRef};
use itertools::Itertools;
use vortex_datetime_dtype::arrow::{make_arrow_temporal_dtype, make_temporal_ext_dtype};
Expand All @@ -20,6 +21,7 @@ use vortex_dtype::{DType, Nullability, PType, StructDType};
use vortex_error::{vortex_bail, vortex_err, VortexResult};

use crate::arrow::{FromArrowType, TryFromArrowType};
use crate::{ArrayDType, ArrayData, IntoCanonical};

impl TryFromArrowType<&DataType> for PType {
fn try_from_arrow(value: &DataType) -> VortexResult<Self> {
Expand Down Expand Up @@ -184,6 +186,13 @@ pub fn infer_data_type(dtype: &DType) -> VortexResult<DataType> {
})
}

impl ArrayData {
pub fn into_inferred_arrow(self) -> VortexResult<ArrayRef> {
let data_type = infer_data_type(self.dtype())?;
self.into_arrow(&data_type)
}
}

#[cfg(test)]
mod test {
use arrow_schema::{DataType, Field, FieldRef, Fields, Schema};
Expand Down
23 changes: 17 additions & 6 deletions vortex-array/src/arrow/record_batch.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use arrow_array::cast::as_struct_array;
use arrow_array::RecordBatch;
use arrow_schema::Schema;
use itertools::Itertools;
use vortex_error::{vortex_err, VortexError, VortexResult};

use crate::array::StructArray;
use crate::arrow::FromArrowArray;
use crate::validity::Validity;
use crate::{ArrayData, IntoArrayData, IntoArrayVariant, IntoCanonical};
use crate::{ArrayData, IntoArrayData, IntoArrayVariant};

impl TryFrom<RecordBatch> for ArrayData {
type Error = VortexError;
Expand Down Expand Up @@ -45,12 +45,23 @@ impl TryFrom<ArrayData> for RecordBatch {
}
}

#[allow(dead_code)]
impl StructArray {
fn into_record_batch(self) -> RecordBatch {
todo!()
// RecordBatch::from(self)
}

fn into_record_batch_with_schema(self, _schema: &Schema) -> RecordBatch {
todo!()
}
}

// TODO: remove this
impl TryFrom<StructArray> for RecordBatch {
type Error = VortexError;

fn try_from(value: StructArray) -> VortexResult<Self> {
let array_ref = value.into_canonical()?.into_arrow()?;
let struct_array = as_struct_array(array_ref.as_ref());
Ok(Self::from(struct_array))
fn try_from(_value: StructArray) -> VortexResult<Self> {
todo!()
}
}
Loading
Loading