Skip to content

Commit

Permalink
Add maybe_from function to help downcast ArrayData into a specific …
Browse files Browse the repository at this point in the history
…encoded array without potentially capturing a backtrace (#1560)

closes #1377
  • Loading branch information
AdamGS authored Dec 4, 2024
1 parent ba5e7b4 commit 021cbe8
Show file tree
Hide file tree
Showing 14 changed files with 25 additions and 24 deletions.
2 changes: 1 addition & 1 deletion bench-vortex/benches/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ fn parquet_decompress_read(buf: bytes::Bytes) -> usize {
}

/// Runs `array` through the Parquet write path with the given `compression` and returns
/// the `usize` produced by `parquet_compress_write`.
///
/// The array is cloned and converted to a `ChunkedArray`, turned into record batches plus
/// a schema via `chunked_to_vec_record_batch`, and written into a throwaway `Vec`.
///
/// # Panics
///
/// Panics if `array` cannot be converted into a `ChunkedArray` (the conversion is unwrapped).
fn parquet_compressed_written_size(array: &ArrayData, compression: Compression) -> usize {
let chunked = ChunkedArray::try_from(array.clone()).unwrap();
let chunked = ChunkedArray::maybe_from(array.clone()).unwrap();
let (batches, schema) = chunked_to_vec_record_batch(chunked);
// The output buffer is a temporary: only the value returned by the writer is kept.
parquet_compress_write(batches, schema, compression, &mut Vec::new())
}
Expand Down
2 changes: 1 addition & 1 deletion encodings/alp/src/alp/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl ALPArray {
}

pub fn encode(array: ArrayData) -> VortexResult<ArrayData> {
if let Ok(parray) = PrimitiveArray::try_from(array) {
if let Some(parray) = PrimitiveArray::maybe_from(array) {
Ok(alp_encode(&parray)?.into_array())
} else {
vortex_bail!("ALP can only encode primitive arrays");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl<'a, T: BitPacking + NativePType> BitPackedSearch<'a, T> {
let min_patch_offset = array
.patches()
.and_then(|p| {
SparseArray::try_from(p)
SparseArray::maybe_from(p)
.vortex_expect("Only sparse patches are supported")
.min_index()
})
Expand Down
2 changes: 1 addition & 1 deletion encodings/fastlanes/src/bitpacking/compute/slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl SliceFn<BitPackedArray> for BitPackedEncoding {
.filter(|a| {
// If the sliced patch_indices is empty, we should not propagate the patches.
// There may be other logic that depends on Some(patches) indicating non-empty.
!SparseArray::try_from(a.clone())
!SparseArray::maybe_from(a.clone())
.vortex_expect("BitPackedArray must encode patches as SparseArray")
.indices()
.is_empty()
Expand Down
4 changes: 1 addition & 3 deletions vortex-array/src/array/extension/compute/compare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use vortex_error::VortexResult;

use crate::array::{ConstantArray, ExtensionArray, ExtensionEncoding};
use crate::compute::{compare, CompareFn, Operator};
use crate::encoding::EncodingVTable;
use crate::{ArrayData, ArrayLen};

impl CompareFn<ExtensionArray> for ExtensionEncoding {
Expand All @@ -24,8 +23,7 @@ impl CompareFn<ExtensionArray> for ExtensionEncoding {
}

// If the RHS is an extension array matching ours, we can extract the storage.
if rhs.is_encoding(ExtensionEncoding.id()) {
let rhs_ext = ExtensionArray::try_from(rhs.clone())?;
if let Some(rhs_ext) = ExtensionArray::maybe_from(rhs.clone()) {
return compare(lhs.storage(), rhs_ext.storage(), operator).map(Some);
}

Expand Down
8 changes: 8 additions & 0 deletions vortex-array/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ macro_rules! impl_encoding {
stats
)?)
}

/// Optionally downcast an [`ArrayData`](crate::ArrayData) instance to a specific encoding.
///
/// Returns `Some` wrapping `data` when the ID of `data`'s encoding equals this encoding's
/// `ID`, and `None` otherwise.
///
/// Preferred in cases where a backtrace isn't needed, like when trying multiple encodings to
/// go down different code paths.
pub fn maybe_from(data: $crate::ArrayData) -> Option<Self> {
(data.encoding().id() == <[<$Name Encoding>] as $crate::encoding::Encoding>::ID).then_some(Self(data))
}
}

impl TryFrom<$crate::ArrayData> for [<$Name Array>] {
Expand Down
8 changes: 3 additions & 5 deletions vortex-ipc/src/stream_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ use std::fmt::{Display, Formatter};
use std::ops::Range;

use futures_util::{Stream, TryStreamExt};
use vortex_array::array::{ChunkedArray, ChunkedEncoding};
use vortex_array::encoding::EncodingVTable;
use vortex_array::array::ChunkedArray;
use vortex_array::stream::ArrayStream;
use vortex_array::ArrayData;
use vortex_buffer::Buffer;
Expand Down Expand Up @@ -83,9 +82,8 @@ impl<W: VortexWrite> StreamArrayWriter<W> {
}

pub async fn write_array(self, array: ArrayData) -> VortexResult<Self> {
if array.is_encoding(ChunkedEncoding.id()) {
self.write_array_stream(ChunkedArray::try_from(array)?.array_stream())
.await
if let Some(chunked_array) = ChunkedArray::maybe_from(array.clone()) {
self.write_array_stream(chunked_array.array_stream()).await
} else {
self.write_array_stream(array.into_array_stream()).await
}
Expand Down
2 changes: 1 addition & 1 deletion vortex-sampling-compressor/src/compressors/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl EncodingCompressor for ALPCompressor {

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
// Only support primitive arrays
let parray = PrimitiveArray::try_from(array.clone()).ok()?;
let parray = PrimitiveArray::maybe_from(array.clone())?;

// Only supports f32 and f64
if !matches!(parray.ptype(), PType::F32 | PType::F64) {
Expand Down
2 changes: 1 addition & 1 deletion vortex-sampling-compressor/src/compressors/alp_rd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl EncodingCompressor for ALPRDCompressor {

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
// Only support primitive arrays
let parray = PrimitiveArray::try_from(array.clone()).ok()?;
let parray = PrimitiveArray::maybe_from(array.clone())?;

// Only supports f32 and f64
if !matches!(parray.ptype(), PType::F32 | PType::F64) {
Expand Down
2 changes: 1 addition & 1 deletion vortex-sampling-compressor/src/compressors/bitpacked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl EncodingCompressor for BitPackedCompressor {

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
// Only support primitive arrays
let parray = PrimitiveArray::try_from(array.clone()).ok()?;
let parray = PrimitiveArray::maybe_from(array.clone())?;

// Only supports unsigned ints
if !parray.ptype().is_unsigned_int() {
Expand Down
2 changes: 1 addition & 1 deletion vortex-sampling-compressor/src/compressors/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl EncodingCompressor for DeltaCompressor {

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
// Only support primitive arrays
let parray = PrimitiveArray::try_from(array.clone()).ok()?;
let parray = PrimitiveArray::maybe_from(array.clone())?;

// Only supports ints
if !parray.ptype().is_unsigned_int() {
Expand Down
9 changes: 3 additions & 6 deletions vortex-sampling-compressor/src/compressors/dict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,13 @@ impl EncodingCompressor for DictCompressor {
like: Option<CompressionTree<'a>>,
ctx: SamplingCompressor<'a>,
) -> VortexResult<CompressedArray<'a>> {
let (codes, values) = if array.is_encoding(PrimitiveEncoding::ID) {
let p = PrimitiveArray::try_from(array.clone())?;
let (codes, values) = if let Some(p) = PrimitiveArray::maybe_from(array.clone()) {
let (codes, values) = dict_encode_primitive(&p);
(codes.into_array(), values.into_array())
} else if array.is_encoding(VarBinEncoding::ID) {
let vb = VarBinArray::try_from(array.clone())?;
} else if let Some(vb) = VarBinArray::maybe_from(array.clone()) {
let (codes, values) = dict_encode_varbin(&vb);
(codes.into_array(), values.into_array())
} else if array.is_encoding(VarBinViewEncoding::ID) {
let vb = VarBinViewArray::try_from(array.clone())?;
} else if let Some(vb) = VarBinViewArray::maybe_from(array.clone()) {
let (codes, values) = dict_encode_varbinview(&vb);
(codes.into_array(), values.into_array())
} else {
Expand Down
2 changes: 1 addition & 1 deletion vortex-sampling-compressor/src/compressors/for.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl EncodingCompressor for FoRCompressor {

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
// Only support primitive arrays
let parray = PrimitiveArray::try_from(array.clone()).ok()?;
let parray = PrimitiveArray::maybe_from(array.clone())?;

// Only supports integers
if !parray.ptype().is_int() {
Expand Down
2 changes: 1 addition & 1 deletion vortex-sampling-compressor/src/compressors/zigzag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl EncodingCompressor for ZigZagCompressor {

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
// Only support primitive arrays
let parray = PrimitiveArray::try_from(array.clone()).ok()?;
let parray = PrimitiveArray::maybe_from(array.clone())?;

// Only supports signed integers
if !parray.ptype().is_signed_int() {
Expand Down

0 comments on commit 021cbe8

Please sign in to comment.