diff --git a/encodings/datetime-parts/src/array.rs b/encodings/datetime-parts/src/array.rs index 34722985cc..eb733f4ca5 100644 --- a/encodings/datetime-parts/src/array.rs +++ b/encodings/datetime-parts/src/array.rs @@ -11,7 +11,7 @@ use vortex_error::{vortex_bail, VortexResult}; use crate::compute::decode_to_temporal; -impl_encoding!("vortex.datetimeparts", 22u16, DateTimeParts); +impl_encoding!("vortex.datetimeparts", 23u16, DateTimeParts); #[derive(Clone, Debug, Serialize, Deserialize)] pub struct DateTimePartsMetadata { diff --git a/encodings/datetime-parts/src/compress.rs b/encodings/datetime-parts/src/compress.rs index 541754b0d6..bf313cf8b8 100644 --- a/encodings/datetime-parts/src/compress.rs +++ b/encodings/datetime-parts/src/compress.rs @@ -2,7 +2,7 @@ use vortex::array::datetime::{TemporalArray, TimeUnit}; use vortex::array::primitive::PrimitiveArray; use vortex::compute::unary::try_cast; use vortex::{Array, IntoArray, IntoArrayVariant}; -use vortex_dtype::PType; +use vortex_dtype::{DType, Nullability, PType}; use vortex_error::{vortex_bail, VortexResult}; /// Compress a `TemporalArray` into day, second, and subsecond components. @@ -10,7 +10,11 @@ use vortex_error::{vortex_bail, VortexResult}; /// Splitting the components by granularity creates more small values, which enables better /// cascading compression. pub fn compress_temporal(array: TemporalArray) -> VortexResult<(Array, Array, Array)> { - let timestamps = try_cast(&array.temporal_values(), PType::I64.into())?.into_primitive()?; + let timestamps = try_cast( + &array.temporal_values(), + &DType::Primitive(PType::I64, Nullability::Nullable), + )? + .into_primitive()?; let divisor = match array.temporal_metadata().time_unit() { TimeUnit::Ns => 1_000_000_000, TimeUnit::Us => 1_000_000, @@ -24,6 +28,7 @@ pub fn compress_temporal(array: TemporalArray) -> VortexResult<(Array, Array, Ar let mut seconds = Vec::with_capacity(length); let mut subsecond = Vec::with_capacity(length); + // Store if the array timestamp is valid or not. for &t in timestamps.maybe_null_slice::().iter() { days.push(t / (86_400 * divisor)); seconds.push((t % (86_400 * divisor)) / divisor); diff --git a/vortex-array/src/array/chunked/canonical.rs b/vortex-array/src/array/chunked/canonical.rs index 778a33f61e..789dc10a05 100644 --- a/vortex-array/src/array/chunked/canonical.rs +++ b/vortex-array/src/array/chunked/canonical.rs @@ -28,6 +28,8 @@ pub(crate) fn try_canonicalize_chunks( chunks: Vec, dtype: &DType, ) -> VortexResult { + assert!(!chunks.is_empty(), "chunks must be non-empty"); + let mismatched = chunks .iter() .filter(|chunk| !chunk.dtype().eq(dtype)) @@ -47,9 +49,14 @@ pub(crate) fn try_canonicalize_chunks( // Extension arrays wrap an internal storage array, which can hold a ChunkedArray until // it is safe to unpack them. DType::Extension(ext_dtype, _) => { + let storage_chunks: Vec = chunks + .iter() + .map(|chunk| ExtensionArray::try_from(chunk).unwrap().storage()) + .collect(); + let storage_dtype = storage_chunks.first().unwrap().dtype().clone(); let ext_array = ExtensionArray::new( ext_dtype.clone(), - ChunkedArray::try_new(chunks, dtype.clone())?.into_array(), + ChunkedArray::try_new(storage_chunks, storage_dtype)?.into_array(), ); Ok(Canonical::Extension(ext_array)) diff --git a/vortex-array/src/array/chunked/compute/mod.rs b/vortex-array/src/array/chunked/compute/mod.rs index c3ad05a173..90da1d0171 100644 --- a/vortex-array/src/array/chunked/compute/mod.rs +++ b/vortex-array/src/array/chunked/compute/mod.rs @@ -75,7 +75,7 @@ mod test { // Two levels of chunking, just to be fancy. let root = ChunkedArray::try_new( vec![chunked], - DType::Primitive(PType::U32, Nullability::NonNullable), + DType::Primitive(PType::U32, Nullability::Nullable), ) .unwrap() .into_array(); diff --git a/vortex-array/src/array/extension/mod.rs b/vortex-array/src/array/extension/mod.rs index 8f3f11e0db..b9d9015dbe 100644 --- a/vortex-array/src/array/extension/mod.rs +++ b/vortex-array/src/array/extension/mod.rs @@ -1,12 +1,13 @@ use serde::{Deserialize, Serialize}; + use vortex_dtype::{DType, ExtDType, ExtID}; use vortex_error::VortexResult; +use crate::{Array, ArrayDef, ArrayDType, ArrayTrait, Canonical, impl_encoding, IntoCanonical}; use crate::stats::ArrayStatisticsCompute; use crate::validity::{ArrayValidity, LogicalValidity}; use crate::variants::{ArrayVariants, ExtensionArrayTrait}; use crate::visitor::{AcceptArrayVisitor, ArrayVisitor}; -use crate::{impl_encoding, Array, ArrayDType, ArrayDef, ArrayTrait, Canonical, IntoCanonical}; mod compute; diff --git a/vortex-array/src/array/primitive/compute/cast.rs b/vortex-array/src/array/primitive/compute/cast.rs index 6c879330dd..e375b27bed 100644 --- a/vortex-array/src/array/primitive/compute/cast.rs +++ b/vortex-array/src/array/primitive/compute/cast.rs @@ -21,6 +21,7 @@ impl CastFn for PrimitiveArray { } // FIXME(ngates): #260 - check validity and nullability + // TODO(aduffy): if casting from nullable -> non-nullable, throw if not AllValid. match_each_native_ptype!(ptype, |$T| { Ok(PrimitiveArray::from_vec( cast::<$T>(self)?, diff --git a/vortex-sampling-compressor/src/lib.rs b/vortex-sampling-compressor/src/lib.rs index 7fdd66ca13..b242612f92 100644 --- a/vortex-sampling-compressor/src/lib.rs +++ b/vortex-sampling-compressor/src/lib.rs @@ -4,6 +4,7 @@ use std::fmt::{Debug, Display, Formatter}; use log::{debug, info, warn}; use vortex::array::chunked::{Chunked, ChunkedArray}; use vortex::array::constant::Constant; +use vortex::array::extension::{Extension, ExtensionArray}; use vortex::array::struct_::{Struct, StructArray}; use vortex::compress::{check_dtype_unchanged, check_validity_unchanged, CompressionStrategy}; use vortex::compute::slice; @@ -218,6 +219,27 @@ impl<'a> SamplingCompressor<'a> { .into_array(), )) } + + // For internally-chunked extension arrays, we recursively compress every chunk. + Extension::ID + if ExtensionArray::try_from(arr)?.storage().encoding().id() == Chunked::ID => + { + println!("compressing Extension type with chunked storage"); + let ext = ExtensionArray::try_from(arr).unwrap(); + let ext_dtype = ext.ext_dtype().clone(); + + let chunked_array = ChunkedArray::try_from(ext.storage()).unwrap(); + // Compress each chunk as an ExtensionArray. + let mut compressed_chunks = Vec::with_capacity(chunked_array.nchunks()); + for chunk in chunked_array.chunks() { + let ext_chunk = ExtensionArray::new(ext_dtype.clone(), chunk); + compressed_chunks + .push(self.compress_array(&ext_chunk.into_array())?.into_array()); + } + Ok(CompressedArray::uncompressed( + ChunkedArray::try_new(compressed_chunks, ext.dtype().clone())?.into_array(), + )) + } _ => { // Otherwise, we run sampled compression over pluggable encodings let sampled = sampled_compression(arr, self)?;