Inline vortex_arrow::export_array as PyArray.to_arrow (#719)
A bit of a nit, but the indirection confused me more than it helped me navigate the codebase.

Moreover, inlining the code revealed the uses of unwrap, which now return errors to Python instead of panicking.
danking authored Sep 5, 2024
1 parent 36c33bd commit 0ccdaf2
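The commit message's point about unwrap is the general Rust distinction between panicking and propagating errors. Below is a minimal standalone sketch of that pattern, using only the standard library; the function names and error type are illustrative, not part of pyvortex or pyo3.

```rust
// Standalone sketch of the unwrap-vs-propagate pattern; nothing here is
// pyvortex API, the names are made up for illustration.
fn parse_panicking(s: &str) -> i64 {
    // Panics on malformed input, tearing down the calling thread.
    s.parse::<i64>().unwrap()
}

fn parse_propagating(s: &str) -> Result<i64, String> {
    // Maps the parse error into the caller's error type and returns it,
    // analogous to `.map_err(PyVortexError::map_err)?` in the diff below.
    let n = s.parse::<i64>().map_err(|e| e.to_string())?;
    Ok(n)
}

fn main() {
    let _ = parse_panicking("42"); // fine; "forty-two" would panic here
    assert_eq!(parse_propagating("42"), Ok(42));
    assert!(parse_propagating("forty-two").is_err());
}
```

In the pyo3 binding, a returned `PyErr` surfaces in Python as a normal exception, which is generally friendlier than a Rust panic propagating out of the extension call.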
Showing 4 changed files with 55 additions and 54 deletions.
54 changes: 51 additions & 3 deletions pyvortex/src/array.rs
@@ -1,10 +1,14 @@
use arrow::array::{Array as ArrowArray, ArrayRef};
use arrow::pyarrow::ToPyArrow;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::types::{IntoPyDict, PyList};
use vortex::array::ChunkedArray;
use vortex::compute::take;
use vortex::{Array, ArrayDType};
use vortex::{Array, ArrayDType, IntoCanonical};

use crate::dtype::PyDType;
use crate::error::PyVortexError;
use crate::vortex_arrow;

#[pyclass(name = "Array", module = "vortex", sequence, subclass)]
pub struct PyArray {
@@ -24,7 +28,51 @@ impl PyArray
#[pymethods]
impl PyArray {
fn to_arrow(self_: PyRef<'_, Self>) -> PyResult<Bound<PyAny>> {
vortex_arrow::export_array(self_.py(), &self_.inner)
// NOTE(ngates): for struct arrays, we could also return a RecordBatchStreamReader.
// NOTE(robert): Return RecordBatchStreamReader always?
let py = self_.py();
let vortex = &self_.inner;

let chunks: Vec<ArrayRef> = if let Ok(chunked_array) = ChunkedArray::try_from(vortex) {
chunked_array
.chunks()
.map(|chunk| -> PyResult<ArrayRef> {
Ok(chunk
.into_canonical()
.map_err(PyVortexError::map_err)?
.into_arrow())
})
.collect::<PyResult<Vec<ArrayRef>>>()?
} else {
vec![vortex
.clone()
.into_canonical()
.map_err(PyVortexError::map_err)?
.into_arrow()]
};
if chunks.is_empty() {
return Err(PyValueError::new_err("No chunks in array"));
}

// Export the schema once
let data_type = chunks[0].data_type().clone();
let pa_data_type = data_type.to_pyarrow(py)?;

// Iterate each chunk, export it to Arrow FFI, then import as a pyarrow array
let chunks: PyResult<Vec<PyObject>> = chunks
.iter()
.map(|arrow_array| arrow_array.into_data().to_pyarrow(py))
.collect();

// Import pyarrow and its Array class
let mod_pyarrow = PyModule::import_bound(py, "pyarrow")?;

// Combine into a chunked array
mod_pyarrow.call_method(
"chunked_array",
(PyList::new_bound(py, chunks?),),
Some(&[("type", pa_data_type)].into_py_dict_bound(py)),
)
}

fn __len__(&self) -> usize {
6 changes: 4 additions & 2 deletions pyvortex/src/encode.rs
@@ -12,7 +12,6 @@ use vortex_dtype::DType

use crate::array::PyArray;
use crate::error::PyVortexError;
use crate::vortex_arrow::map_arrow_err;

/// The main entry point for creating enc arrays from other Python objects.
///
@@ -54,7 +53,10 @@ pub fn encode<'py>(obj: &Bound<'py, PyAny>) -> PyResult<Bound<'py, PyArray>> {
let dtype = DType::from_arrow(array_stream.schema());
let chunks = array_stream
.into_iter()
.map(|b| b.map(Array::from).map_err(map_arrow_err))
.map(|b| {
b.map(Array::from)
.map_err(|e| PyValueError::new_err(e.to_string()))
})
.collect::<PyResult<Vec<_>>>()?;
Bound::new(
obj.py(),
1 change: 0 additions & 1 deletion pyvortex/src/lib.rs
@@ -9,7 +9,6 @@ mod array;
mod dtype;
mod encode;
mod error;
mod vortex_arrow;

/// A Python module implemented in Rust.
#[pymodule]
48 changes: 0 additions & 48 deletions pyvortex/src/vortex_arrow.rs

This file was deleted.
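The deleted file's contents are not shown in this diff. Purely as a hypothetical reconstruction, inferred from the old call site `vortex_arrow::export_array(self_.py(), &self_.inner)` and the commit message rather than from the actual file, the removed helper presumably centered on something like the single-array path below, with `unwrap()` where the inlined version now uses `map_err`:

```rust
// Hypothetical reconstruction for illustration only; the real deleted
// vortex_arrow.rs is not shown in this diff. The signature is guessed from
// the old call site in to_arrow, and the body mirrors the non-chunked path
// of the inlined code, with unwrap() standing in for the old panicking calls.
use arrow::pyarrow::ToPyArrow;
use pyo3::prelude::*;
use vortex::{Array, IntoCanonical};

pub fn export_array<'py>(py: Python<'py>, array: &Array) -> PyResult<Bound<'py, PyAny>> {
    // The unwrap() here is the kind of call the commit message says would
    // panic instead of raising a Python exception.
    let arrow_array = array.clone().into_canonical().unwrap().into_arrow();
    Ok(arrow_array.into_data().to_pyarrow(py)?.into_bound(py))
}
```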
