diff --git a/pyvortex/src/array.rs b/pyvortex/src/array.rs
index 3ec4166952..2043c874b1 100644
--- a/pyvortex/src/array.rs
+++ b/pyvortex/src/array.rs
@@ -1,10 +1,14 @@
+use arrow::array::{Array as ArrowArray, ArrayRef};
+use arrow::pyarrow::ToPyArrow;
+use pyo3::exceptions::PyValueError;
 use pyo3::prelude::*;
+use pyo3::types::{IntoPyDict, PyList};
+use vortex::array::ChunkedArray;
 use vortex::compute::take;
-use vortex::{Array, ArrayDType};
+use vortex::{Array, ArrayDType, IntoCanonical};
 
 use crate::dtype::PyDType;
 use crate::error::PyVortexError;
-use crate::vortex_arrow;
 
 #[pyclass(name = "Array", module = "vortex", sequence, subclass)]
 pub struct PyArray {
@@ -24,7 +28,51 @@ impl PyArray {
 #[pymethods]
 impl PyArray {
     fn to_arrow(self_: PyRef<'_, Self>) -> PyResult<Bound<PyAny>> {
-        vortex_arrow::export_array(self_.py(), &self_.inner)
+        // NOTE(ngates): for struct arrays, we could also return a RecordBatchStreamReader.
+        // NOTE(robert): Return RecordBatchStreamReader always?
+        let py = self_.py();
+        let vortex = &self_.inner;
+
+        let chunks: Vec<ArrayRef> = if let Ok(chunked_array) = ChunkedArray::try_from(vortex) {
+            chunked_array
+                .chunks()
+                .map(|chunk| -> PyResult<ArrayRef> {
+                    Ok(chunk
+                        .into_canonical()
+                        .map_err(PyVortexError::map_err)?
+                        .into_arrow())
+                })
+                .collect::<PyResult<Vec<ArrayRef>>>()?
+        } else {
+            vec![vortex
+                .clone()
+                .into_canonical()
+                .map_err(PyVortexError::map_err)?
+                .into_arrow()]
+        };
+        if chunks.is_empty() {
+            return Err(PyValueError::new_err("No chunks in array"));
+        }
+
+        // Export the schema once
+        let data_type = chunks[0].data_type().clone();
+        let pa_data_type = data_type.to_pyarrow(py)?;
+
+        // Iterate each chunk, export it to Arrow FFI, then import as a pyarrow array
+        let chunks: PyResult<Vec<PyObject>> = chunks
+            .iter()
+            .map(|arrow_array| arrow_array.into_data().to_pyarrow(py))
+            .collect();
+
+        // Import pyarrow and its Array class
+        let mod_pyarrow = PyModule::import_bound(py, "pyarrow")?;
+
+        // Combine into a chunked array
+        mod_pyarrow.call_method(
+            "chunked_array",
+            (PyList::new_bound(py, chunks?),),
+            Some(&[("type", pa_data_type)].into_py_dict_bound(py)),
+        )
     }
 
     fn __len__(&self) -> usize {
diff --git a/pyvortex/src/encode.rs b/pyvortex/src/encode.rs
index 2cc6adccff..838ca044aa 100644
--- a/pyvortex/src/encode.rs
+++ b/pyvortex/src/encode.rs
@@ -12,7 +12,6 @@ use vortex_dtype::DType;
 
 use crate::array::PyArray;
 use crate::error::PyVortexError;
-use crate::vortex_arrow::map_arrow_err;
 
 /// The main entry point for creating enc arrays from other Python objects.
 ///
@@ -54,7 +53,10 @@ pub fn encode<'py>(obj: &Bound<'py, PyAny>) -> PyResult<Bound<'py, PyArray>> {
         let dtype = DType::from_arrow(array_stream.schema());
         let chunks = array_stream
             .into_iter()
-            .map(|b| b.map(Array::from).map_err(map_arrow_err))
+            .map(|b| {
+                b.map(Array::from)
+                    .map_err(|e| PyValueError::new_err(e.to_string()))
+            })
             .collect::<PyResult<Vec<Array>>>()?;
         Bound::new(
             obj.py(),
diff --git a/pyvortex/src/lib.rs b/pyvortex/src/lib.rs
index 8da6147c1d..da2fbd79bd 100644
--- a/pyvortex/src/lib.rs
+++ b/pyvortex/src/lib.rs
@@ -9,7 +9,6 @@ mod array;
 mod dtype;
 mod encode;
 mod error;
-mod vortex_arrow;
 
 /// A Python module implemented in Rust.
 #[pymodule]
diff --git a/pyvortex/src/vortex_arrow.rs b/pyvortex/src/vortex_arrow.rs
deleted file mode 100644
index d74f049631..0000000000
--- a/pyvortex/src/vortex_arrow.rs
+++ /dev/null
@@ -1,48 +0,0 @@
-use arrow::array::{Array as ArrowArray, ArrayRef};
-use arrow::error::ArrowError;
-use arrow::pyarrow::ToPyArrow;
-use pyo3::exceptions::PyValueError;
-use pyo3::prelude::*;
-use pyo3::types::{IntoPyDict, PyList};
-use vortex::array::ChunkedArray;
-use vortex::{Array, IntoCanonical};
-
-pub fn map_arrow_err(error: ArrowError) -> PyErr {
-    PyValueError::new_err(error.to_string())
-}
-
-pub fn export_array<'py>(py: Python<'py>, array: &Array) -> PyResult<Bound<'py, PyAny>> {
-    // NOTE(ngates): for struct arrays, we could also return a RecordBatchStreamReader.
-    // NOTE(robert): Return RecordBatchStreamReader always?
-    let chunks: Vec<ArrayRef> = if let Ok(chunked_array) = ChunkedArray::try_from(array) {
-        chunked_array
-            .chunks()
-            .map(|chunk| chunk.into_canonical().unwrap().into_arrow())
-            .collect()
-    } else {
-        vec![array.clone().into_canonical().unwrap().into_arrow()]
-    };
-    if chunks.is_empty() {
-        return Err(PyValueError::new_err("No chunks in array"));
-    }
-
-    // Export the schema once
-    let data_type = chunks[0].data_type().clone();
-    let pa_data_type = data_type.to_pyarrow(py)?;
-
-    // Iterate each chunk, export it to Arrow FFI, then import as a pyarrow array
-    let chunks: PyResult<Vec<PyObject>> = chunks
-        .iter()
-        .map(|arrow_array| arrow_array.into_data().to_pyarrow(py))
-        .collect();
-
-    // Import pyarrow and its Array class
-    let mod_pyarrow = PyModule::import_bound(py, "pyarrow")?;
-
-    // Combine into a chunked array
-    mod_pyarrow.call_method(
-        "chunked_array",
-        (PyList::new_bound(py, chunks?),),
-        Some(&[("type", pa_data_type)].into_py_dict_bound(py)),
-    )
-}
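
Net effect of the diff: vortex_arrow.rs is deleted, its export_array logic is inlined into PyArray::to_arrow (now propagating canonicalization failures via PyVortexError::map_err instead of unwrap), and encode maps Arrow stream errors to PyValueError inline. A minimal Python sketch of the resulting behavior, assuming the compiled extension imports as `vortex`, that the pymodule exposes `encode` under that name, and that `encode` accepts a pyarrow.ChunkedArray (none of which is shown in this diff):

    import pyarrow as pa
    import vortex  # assumed import name for the compiled pyvortex extension

    # Assumed: the #[pymodule] registers `encode` under this name and the
    # chunked-array path in encode.rs handles a pyarrow.ChunkedArray input.
    vtx = vortex.encode(pa.chunked_array([[1, 2, 3], [4, 5]]))

    # Array.to_arrow returns a pyarrow.ChunkedArray: one chunk per Vortex
    # chunk for chunked inputs, or a single chunk otherwise.
    arrow = vtx.to_arrow()
    assert isinstance(arrow, pa.ChunkedArray)
    print(arrow.type, arrow.num_chunks)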