Commit 6c5e1f2: save
a10y committed Jul 22, 2024
1 parent 87221d3 commit 6c5e1f2
Showing 7 changed files with 42 additions and 6 deletions.
2 changes: 1 addition & 1 deletion encodings/datetime-parts/src/array.rs
@@ -11,7 +11,7 @@ use vortex_error::{vortex_bail, VortexResult};
 
 use crate::compute::decode_to_temporal;
 
-impl_encoding!("vortex.datetimeparts", 22u16, DateTimeParts);
+impl_encoding!("vortex.datetimeparts", 23u16, DateTimeParts);
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct DateTimePartsMetadata {
9 changes: 7 additions & 2 deletions encodings/datetime-parts/src/compress.rs
@@ -2,15 +2,19 @@ use vortex::array::datetime::{TemporalArray, TimeUnit};
 use vortex::array::primitive::PrimitiveArray;
 use vortex::compute::unary::try_cast;
 use vortex::{Array, IntoArray, IntoArrayVariant};
-use vortex_dtype::PType;
+use vortex_dtype::{DType, Nullability, PType};
 use vortex_error::{vortex_bail, VortexResult};
 
 /// Compress a `TemporalArray` into day, second, and subsecond components.
 ///
 /// Splitting the components by granularity creates more small values, which enables better
 /// cascading compression.
 pub fn compress_temporal(array: TemporalArray) -> VortexResult<(Array, Array, Array)> {
-    let timestamps = try_cast(&array.temporal_values(), PType::I64.into())?.into_primitive()?;
+    let timestamps = try_cast(
+        &array.temporal_values(),
+        &DType::Primitive(PType::I64, Nullability::Nullable),
+    )?
+    .into_primitive()?;
     let divisor = match array.temporal_metadata().time_unit() {
         TimeUnit::Ns => 1_000_000_000,
         TimeUnit::Us => 1_000_000,
@@ -24,6 +28,7 @@
     let mut seconds = Vec::with_capacity(length);
     let mut subsecond = Vec::with_capacity(length);
 
+    // Store if the array timestamp is valid or not.
     for &t in timestamps.maybe_null_slice::<i64>().iter() {
         days.push(t / (86_400 * divisor));
         seconds.push((t % (86_400 * divisor)) / divisor);
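
The day/second/subsecond split above is plain integer arithmetic. As a worked example, here is a minimal standalone sketch of the decomposition for one nanosecond-precision timestamp; the `split_ns` helper is hypothetical and not part of this commit, but its constants mirror the divisor logic in `compress_temporal`:

// Hypothetical helper mirroring compress_temporal for TimeUnit::Ns.
fn split_ns(t: i64) -> (i64, i64, i64) {
    let divisor: i64 = 1_000_000_000; // TimeUnit::Ns
    let days = t / (86_400 * divisor); // whole days since the epoch
    let seconds = (t % (86_400 * divisor)) / divisor; // seconds within the day
    let subsecond = t % divisor; // remaining nanoseconds
    (days, seconds, subsecond)
}

fn main() {
    // 2021-01-01T00:00:01.5Z expressed as nanoseconds since the Unix epoch.
    let (days, seconds, subsecond) = split_ns(1_609_459_201_500_000_000);
    assert_eq!((days, seconds, subsecond), (18_628, 1, 500_000_000));
}

Each component has a much smaller range than the raw i64 timestamp, which is what makes the subsequent cascading compression effective.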
9 changes: 8 additions & 1 deletion vortex-array/src/array/chunked/canonical.rs
@@ -28,6 +28,8 @@ pub(crate) fn try_canonicalize_chunks(
     chunks: Vec<Array>,
     dtype: &DType,
 ) -> VortexResult<Canonical> {
+    assert!(!chunks.is_empty(), "chunks must be non-empty");
+
     let mismatched = chunks
         .iter()
         .filter(|chunk| !chunk.dtype().eq(dtype))
@@ -47,9 +49,14 @@
         // Extension arrays wrap an internal storage array, which can hold a ChunkedArray until
         // it is safe to unpack them.
         DType::Extension(ext_dtype, _) => {
+            let storage_chunks: Vec<Array> = chunks
+                .iter()
+                .map(|chunk| ExtensionArray::try_from(chunk).unwrap().storage())
+                .collect();
+            let storage_dtype = storage_chunks.first().unwrap().dtype().clone();
             let ext_array = ExtensionArray::new(
                 ext_dtype.clone(),
-                ChunkedArray::try_new(chunks, dtype.clone())?.into_array(),
+                ChunkedArray::try_new(storage_chunks, storage_dtype)?.into_array(),
             );
 
             Ok(Canonical::Extension(ext_array))
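
The shape change in this hunk is the point: the canonical form wraps a single extension layer around chunked storage, rather than chunking chunks that are still extension-wrapped. A toy sketch of that invariant, using simplified stand-in types rather than the real vortex API:

// Toy stand-ins for illustration only; not the vortex types.
enum Toy {
    Chunked(Vec<Toy>),
    Extension { ext_id: String, storage: Box<Toy> },
    Storage(Vec<i64>),
}

// Unwrap each extension chunk down to its storage array, then wrap the
// chunked storage in one top-level extension array.
fn canonicalize_ext(ext_id: String, chunks: Vec<Toy>) -> Toy {
    let storage_chunks: Vec<Toy> = chunks
        .into_iter()
        .map(|chunk| match chunk {
            Toy::Extension { storage, .. } => *storage,
            other => other,
        })
        .collect();
    Toy::Extension {
        ext_id,
        storage: Box::new(Toy::Chunked(storage_chunks)),
    }
}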
2 changes: 1 addition & 1 deletion vortex-array/src/array/chunked/compute/mod.rs
@@ -75,7 +75,7 @@ mod test {
         // Two levels of chunking, just to be fancy.
         let root = ChunkedArray::try_new(
             vec![chunked],
-            DType::Primitive(PType::U32, Nullability::NonNullable),
+            DType::Primitive(PType::U32, Nullability::Nullable),
         )
         .unwrap()
         .into_array();
3 changes: 2 additions & 1 deletion vortex-array/src/array/extension/mod.rs
@@ -1,12 +1,13 @@
 use serde::{Deserialize, Serialize};
+
 use vortex_dtype::{DType, ExtDType, ExtID};
 use vortex_error::VortexResult;
 
-use crate::{Array, ArrayDef, ArrayDType, ArrayTrait, Canonical, impl_encoding, IntoCanonical};
 use crate::stats::ArrayStatisticsCompute;
 use crate::validity::{ArrayValidity, LogicalValidity};
 use crate::variants::{ArrayVariants, ExtensionArrayTrait};
 use crate::visitor::{AcceptArrayVisitor, ArrayVisitor};
+use crate::{impl_encoding, Array, ArrayDType, ArrayDef, ArrayTrait, Canonical, IntoCanonical};
 
 mod compute;
 
1 change: 1 addition & 0 deletions vortex-array/src/array/primitive/compute/cast.rs
@@ -21,6 +21,7 @@ impl CastFn for PrimitiveArray {
     }
 
     // FIXME(ngates): #260 - check validity and nullability
+    // TODO(aduffy): if casting from nullable -> non-nullable, throw if not AllValid.
     match_each_native_ptype!(ptype, |$T| {
         Ok(PrimitiveArray::from_vec(
             cast::<$T>(self)?,
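
The TODO describes a rule rather than code: a cast may only drop nullability when every element is actually valid. A hedged sketch of that check, with hypothetical stand-in types rather than the real vortex validity API:

// Stand-in types for illustration; not the vortex API.
enum Validity { AllValid, AllInvalid, Mixed }
enum Nullability { Nullable, NonNullable }

fn check_cast_nullability(validity: Validity, target: Nullability) -> Result<(), String> {
    match (target, validity) {
        // Dropping nullability requires that no nulls exist.
        (Nullability::NonNullable, Validity::AllValid) => Ok(()),
        (Nullability::NonNullable, _) => {
            Err("cast to non-nullable dtype, but array contains nulls".to_string())
        }
        // A nullable target accepts any validity.
        (Nullability::Nullable, _) => Ok(()),
    }
}

fn main() {
    assert!(check_cast_nullability(Validity::AllValid, Nullability::NonNullable).is_ok());
    assert!(check_cast_nullability(Validity::Mixed, Nullability::NonNullable).is_err());
}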
22 changes: 22 additions & 0 deletions vortex-sampling-compressor/src/lib.rs
@@ -4,6 +4,7 @@ use std::fmt::{Debug, Display, Formatter};
 use log::{debug, info, warn};
 use vortex::array::chunked::{Chunked, ChunkedArray};
 use vortex::array::constant::Constant;
+use vortex::array::extension::{Extension, ExtensionArray};
 use vortex::array::struct_::{Struct, StructArray};
 use vortex::compress::{check_dtype_unchanged, check_validity_unchanged, CompressionStrategy};
 use vortex::compute::slice;
@@ -218,6 +219,27 @@ impl<'a> SamplingCompressor<'a> {
                     .into_array(),
                 ))
             }
+
+            // For internally-chunked extension arrays, we recursively compress every chunk.
+            Extension::ID
+                if ExtensionArray::try_from(arr)?.storage().encoding().id() == Chunked::ID =>
+            {
+                println!("compressing Extension type with chunked storage");
+                let ext = ExtensionArray::try_from(arr).unwrap();
+                let ext_dtype = ext.ext_dtype().clone();
+
+                let chunked_array = ChunkedArray::try_from(ext.storage()).unwrap();
+                // Compress each chunk as an ExtensionArray.
+                let mut compressed_chunks = Vec::with_capacity(chunked_array.nchunks());
+                for chunk in chunked_array.chunks() {
+                    let ext_chunk = ExtensionArray::new(ext_dtype.clone(), chunk);
+                    compressed_chunks
+                        .push(self.compress_array(&ext_chunk.into_array())?.into_array());
+                }
+                Ok(CompressedArray::uncompressed(
+                    ChunkedArray::try_new(compressed_chunks, ext.dtype().clone())?.into_array(),
+                ))
+            }
             _ => {
                 // Otherwise, we run sampled compression over pluggable encodings
                 let sampled = sampled_compression(arr, self)?;
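
The new match arm follows a general pattern: re-wrap each storage chunk in the extension dtype so the compressor sees the extension semantics (e.g. a temporal type) rather than opaque storage, compress each chunk through the full pipeline, and re-chunk the results. A minimal sketch of that pattern with hypothetical stand-ins, not the SamplingCompressor API:

// Hypothetical stand-in for an extension-wrapped chunk.
struct ExtChunk {
    ext_dtype: String,
    storage: Vec<u8>,
}

// `compress` stands in for the recursive call back into the compressor.
fn compress_chunked_extension(
    ext_dtype: &str,
    storage_chunks: Vec<Vec<u8>>,
    compress: impl Fn(ExtChunk) -> Vec<u8>,
) -> Vec<Vec<u8>> {
    storage_chunks
        .into_iter()
        // Wrap first, then compress, so each chunk is compressed as an
        // extension array rather than as raw storage.
        .map(|storage| {
            compress(ExtChunk {
                ext_dtype: ext_dtype.to_string(),
                storage,
            })
        })
        .collect()
}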
