fix: canonicalization of chunked ExtensionArray (#499)
Since #346, we've been canonicalizing chunked ExtensionArrays incorrectly. Rather than unwrapping each chunk to its backing storage array, we've just been building a new ChunkedArray of the ExtensionArray chunks.

Before #480, this was actually causing DateTimePartsCompressor to fail in `can_compress`, so we weren't going down the compression codepath.

## The Fix

Fix `into_canonical` so that when we encounter a
`ChunkedArray(ExtensionArray(storage))`, we yield an
`ExtensionArray(ChunkedArray(storage))` where each chunk is
canonicalized first (e.g. if you have chunks of DateTimePartsArray, you
end up with chunks of ExtensionArray(i64)).

Fix the DateTimePartsCompressor to canonicalize its input, so that it
handles the case where the input may be a ChunkedArray.
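
To make the re-nesting concrete, here is a minimal, self-contained sketch of the transformation. The `Arr` enum and `canonicalize_chunked_extension` function are hypothetical stand-ins for illustration, not the Vortex API:

```rust
// Toy model of the canonical form change: `Arr` is a hypothetical stand-in
// for Array/ExtensionArray/ChunkedArray, not the real Vortex types.
#[derive(Debug, Clone, PartialEq)]
enum Arr {
    Primitive(Vec<i64>), // leaf storage
    Extension(Box<Arr>), // extension array wrapping its storage array
    Chunked(Vec<Arr>),   // chunked array of chunks
}

/// Rewrite ChunkedArray(ExtensionArray(storage)) as
/// ExtensionArray(ChunkedArray(storage)): unwrap each chunk to its backing
/// storage array and push the chunking down into the storage.
fn canonicalize_chunked_extension(array: Arr) -> Option<Arr> {
    match array {
        Arr::Chunked(chunks) => {
            let storage_chunks: Option<Vec<Arr>> = chunks
                .into_iter()
                .map(|chunk| match chunk {
                    Arr::Extension(storage) => Some(*storage),
                    _ => None, // non-extension chunk; out of scope for this sketch
                })
                .collect();
            storage_chunks.map(|s| Arr::Extension(Box::new(Arr::Chunked(s))))
        }
        other => Some(other),
    }
}

fn main() {
    let input = Arr::Chunked(vec![
        Arr::Extension(Box::new(Arr::Primitive(vec![1, 2]))),
        Arr::Extension(Box::new(Arr::Primitive(vec![3]))),
    ]);
    let expected = Arr::Extension(Box::new(Arr::Chunked(vec![
        Arr::Primitive(vec![1, 2]),
        Arr::Primitive(vec![3]),
    ])));
    assert_eq!(canonicalize_chunked_extension(input), Some(expected));
}
```

The real implementation additionally canonicalizes each chunk first, since (per the diff below) an extension-typed chunk may arrive in a compressed encoding rather than as an ExtensionArray.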
a10y authored Jul 24, 2024
1 parent de9ca7a commit 71e446f
Showing 2 changed files with 50 additions and 10 deletions.
encodings/datetime-parts/src/compress.rs (7 additions, 3 deletions)

```diff
@@ -10,7 +10,11 @@ use vortex_error::{vortex_bail, VortexResult};
 /// Splitting the components by granularity creates more small values, which enables better
 /// cascading compression.
 pub fn compress_temporal(array: TemporalArray) -> VortexResult<(Array, Array, Array)> {
-    let timestamps = try_cast(&array.temporal_values(), PType::I64.into())?.into_primitive()?;
+    // After this operation, timestamps will be PrimitiveArray<i64>
+    let timestamps = try_cast(
+        &array.temporal_values().into_primitive()?.into_array(),
+        PType::I64.into(),
+    )?;
     let divisor = match array.temporal_metadata().time_unit() {
         TimeUnit::Ns => 1_000_000_000,
         TimeUnit::Us => 1_000_000,
@@ -24,14 +28,14 @@
     let mut seconds = Vec::with_capacity(length);
     let mut subsecond = Vec::with_capacity(length);
 
-    for &t in timestamps.maybe_null_slice::<i64>().iter() {
+    for &t in timestamps.as_primitive().maybe_null_slice::<i64>().iter() {
         days.push(t / (86_400 * divisor));
         seconds.push((t % (86_400 * divisor)) / divisor);
         subsecond.push((t % (86_400 * divisor)) % divisor);
     }
 
     Ok((
-        PrimitiveArray::from_vec(days, timestamps.validity()).into_array(),
+        PrimitiveArray::from_vec(days, timestamps.as_primitive().validity()).into_array(),
         PrimitiveArray::from(seconds).into_array(),
         PrimitiveArray::from(subsecond).into_array(),
     ))
```
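
For intuition about the split that `compress_temporal` performs, here is a standalone sketch assuming nanosecond granularity; the `decompose` helper is hypothetical and uses plain i64 arithmetic rather than Vortex arrays:

```rust
// Standalone sketch of the day/second/subsecond split performed above,
// using plain i64 arithmetic only (no Vortex types).
fn decompose(t: i64, divisor: i64) -> (i64, i64, i64) {
    let days = t / (86_400 * divisor);
    let seconds = (t % (86_400 * divisor)) / divisor;
    let subsecond = (t % (86_400 * divisor)) % divisor;
    (days, seconds, subsecond)
}

fn main() {
    // 2021-01-01T00:00:01.000123456Z as nanoseconds since the Unix epoch.
    let t: i64 = 1_609_459_201_000_123_456;
    let divisor = 1_000_000_000; // TimeUnit::Ns
    let (days, seconds, subsecond) = decompose(t, divisor);
    assert_eq!(days, 18_628); // whole days since the epoch
    assert_eq!(seconds, 1); // seconds within the day
    assert_eq!(subsecond, 123_456); // nanoseconds within the second
}
```

Each component has a much smaller value range than the raw timestamp, which is what enables the better cascading compression mentioned in the doc comment.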
vortex-array/src/array/chunked/canonical.rs (43 additions, 7 deletions)

```diff
@@ -28,6 +28,10 @@ pub(crate) fn try_canonicalize_chunks(
     chunks: Vec<Array>,
     dtype: &DType,
 ) -> VortexResult<Canonical> {
+    if chunks.is_empty() {
+        vortex_bail!(InvalidArgument: "chunks must be non-empty")
+    }
+
     let mismatched = chunks
         .iter()
         .filter(|chunk| !chunk.dtype().eq(dtype))
@@ -44,15 +48,47 @@
             Ok(Canonical::Struct(struct_array))
         }
 
-        // Extension arrays wrap an internal storage array, which can hold a ChunkedArray until
-        // it is safe to unpack them.
+        // Extension arrays are containers that wrap an inner storage array with some metadata.
+        // We delegate to the canonical format of the internal storage array for every chunk, and
+        // push the chunking down into the inner storage array.
+        //
+        // Input:
+        // ------
+        //
+        //               ChunkedArray
+        //              /            \
+        //             /              \
+        //   ExtensionArray       ExtensionArray
+        //         |                    |
+        //      storage              storage
+        //
+        //
+        // Output:
+        // ------
+        //
+        //        ExtensionArray
+        //              |
+        //         ChunkedArray
+        //         /          \
+        //     storage      storage
+        //
         DType::Extension(ext_dtype, _) => {
-            let ext_array = ExtensionArray::new(
+            // Recursively apply canonicalization and packing to the storage array backing
+            // each chunk of the extension array.
+            let storage_chunks: Vec<Array> = chunks
+                .iter()
+                // Extension-typed arrays can be compressed into something that is not an
+                // ExtensionArray, so we should canonicalize each chunk into ExtensionArray first.
+                .map(|chunk| chunk.clone().into_extension().unwrap().storage())
+                .collect();
+            let storage_dtype = storage_chunks.first().unwrap().dtype().clone();
+            let chunked_storage =
+                ChunkedArray::try_new(storage_chunks, storage_dtype)?.into_array();
+
+            Ok(Canonical::Extension(ExtensionArray::new(
                 ext_dtype.clone(),
-                ChunkedArray::try_new(chunks, dtype.clone())?.into_array(),
-            );
-
-            Ok(Canonical::Extension(ext_array))
+                chunked_storage,
+            )))
        }
 
         // TODO(aduffy): better list support
```
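
The comment about canonicalizing each chunk first can be illustrated with a toy model: a chunk may arrive in a compressed encoding (such as DateTimeParts) whose canonical form is an extension array, so it must be decoded before its storage is taken. Every type and signature below is hypothetical, not the Vortex API:

```rust
// Toy model of why each chunk is canonicalized before its storage is taken.
// The `Chunk` type and `into_extension` signature are hypothetical.
#[derive(Debug, Clone, PartialEq)]
enum Chunk {
    // Canonical form: an extension array over i64 timestamp storage.
    Extension { storage: Vec<i64> },
    // A compressed encoding of the same extension type.
    DateTimeParts {
        days: Vec<i64>,
        seconds: Vec<i64>,
        subsecond: Vec<i64>,
    },
}

impl Chunk {
    // Canonicalize: decode a compressed chunk back into its Extension form.
    fn into_extension(self, divisor: i64) -> Chunk {
        match self {
            ext @ Chunk::Extension { .. } => ext,
            Chunk::DateTimeParts { days, seconds, subsecond } => {
                let storage = days
                    .iter()
                    .zip(&seconds)
                    .zip(&subsecond)
                    .map(|((d, s), ss)| (d * 86_400 + s) * divisor + ss)
                    .collect();
                Chunk::Extension { storage }
            }
        }
    }
}

fn main() {
    let compressed = Chunk::DateTimeParts {
        days: vec![18_628],
        seconds: vec![1],
        subsecond: vec![123_456],
    };
    // Nanosecond granularity: recovers the timestamp from the sketch above.
    assert_eq!(
        compressed.into_extension(1_000_000_000),
        Chunk::Extension { storage: vec![1_609_459_201_000_123_456] }
    );
}
```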
