Skip to content

Commit

Permalink
Narrow indices types during compression (#1558)
Browse files Browse the repository at this point in the history
Fixes #1557
  • Loading branch information
gatesn authored Dec 4, 2024
1 parent b5127b8 commit ba5e7b4
Show file tree
Hide file tree
Showing 19 changed files with 240 additions and 78 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions docs/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ Use :func:`~vortex.encoding.compress` to compress the Vortex array and check the

>>> cvtx = vortex.compress(vtx)
>>> cvtx.nbytes
17780
16835
>>> cvtx.nbytes / vtx.nbytes
0.126...
0.119...

Vortex uses nearly ten times fewer bytes than Arrow. Fewer bytes means more of your data fits in
cache and RAM.
Expand Down
70 changes: 54 additions & 16 deletions encodings/datetime-parts/src/compute/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
mod filter;
mod take;

use itertools::Itertools as _;
use vortex_array::array::{PrimitiveArray, TemporalArray};
use vortex_array::compute::{
scalar_at, slice, ComputeVTable, FilterFn, ScalarAtFn, SliceFn, TakeFn,
scalar_at, slice, try_cast, ComputeVTable, FilterFn, ScalarAtFn, SliceFn, TakeFn,
};
use vortex_array::validity::ArrayValidity;
use vortex_array::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant};
use vortex_datetime_dtype::{TemporalMetadata, TimeUnit};
use vortex_dtype::DType;
use vortex_error::{vortex_bail, VortexResult};
use vortex_scalar::Scalar;
use vortex_dtype::Nullability::NonNullable;
use vortex_dtype::{DType, PType};
use vortex_error::{vortex_bail, VortexExpect, VortexResult};
use vortex_scalar::{PrimitiveScalar, Scalar};

use crate::{DateTimePartsArray, DateTimePartsEncoding};

Expand Down Expand Up @@ -106,17 +106,55 @@ pub fn decode_to_temporal(array: &DateTimePartsArray) -> VortexResult<TemporalAr
TimeUnit::D => vortex_bail!(InvalidArgument: "cannot decode into TimeUnit::D"),
};

let days_buf = array.days().into_primitive()?;
let seconds_buf = array.seconds().into_primitive()?;
let subsecond_buf = array.subsecond().into_primitive()?;

let values = days_buf
.maybe_null_slice::<i64>()
.iter()
.zip_eq(seconds_buf.maybe_null_slice::<i64>().iter())
.zip_eq(subsecond_buf.maybe_null_slice::<i64>().iter())
.map(|((d, s), ss)| d * 86_400 * divisor + s * divisor + ss)
.collect::<Vec<_>>();
let days_buf = try_cast(
array.days(),
&DType::Primitive(PType::I64, array.dtype().nullability()),
)?
.into_primitive()?;
let mut values: Vec<i64> = days_buf
.into_maybe_null_slice::<i64>()
.into_iter()
.map(|d| d * 86_400 * divisor)
.collect();

if let Some(seconds) = array.seconds().as_constant() {
let seconds =
PrimitiveScalar::try_from(&seconds.cast(&DType::Primitive(PType::I64, NonNullable))?)?
.typed_value::<i64>()
.vortex_expect("non-nullable");
for v in values.iter_mut() {
*v += seconds * divisor;
}
} else {
let seconds_buf = try_cast(array.seconds(), &DType::Primitive(PType::U32, NonNullable))?
.into_primitive()?;
for (v, second) in values.iter_mut().zip(seconds_buf.maybe_null_slice::<u32>()) {
*v += (*second as i64) * divisor;
}
}

if let Some(subseconds) = array.subsecond().as_constant() {
let subseconds = PrimitiveScalar::try_from(
&subseconds.cast(&DType::Primitive(PType::I64, NonNullable))?,
)?
.typed_value::<i64>()
.vortex_expect("non-nullable");
for v in values.iter_mut() {
*v += subseconds;
}
} else {
let subsecond_buf = try_cast(
array.subsecond(),
&DType::Primitive(PType::I64, NonNullable),
)?
.into_primitive()?;
for (v, subsecond) in values
.iter_mut()
.zip(subsecond_buf.maybe_null_slice::<i64>())
{
*v += *subsecond;
}
}

Ok(TemporalArray::new_timestamp(
PrimitiveArray::from_vec(values, array.validity()).into_array(),
Expand Down
32 changes: 18 additions & 14 deletions encodings/fsst/src/canonical.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use arrow_array::builder::make_view;
use arrow_buffer::Buffer;
use vortex_array::array::{PrimitiveArray, VarBinArray, VarBinViewArray};
use vortex_array::variants::PrimitiveArrayTrait;
use vortex_array::{
ArrayDType, ArrayData, Canonical, IntoArrayData, IntoArrayVariant, IntoCanonical,
};
use vortex_dtype::match_each_integer_ptype;
use vortex_error::VortexResult;

use crate::FSSTArray;
Expand Down Expand Up @@ -33,24 +35,26 @@ impl IntoCanonical for FSSTArray {
.uncompressed_lengths()
.into_canonical()?
.into_primitive()?;
let uncompressed_lens_slice = uncompressed_lens_array.maybe_null_slice::<i32>();

// Directly create the binary views.
let views: Vec<u128> = uncompressed_lens_slice
.iter()
.scan(0, |offset, len| {
let str_start = *offset;
let str_end = *offset + len;
let views: Vec<u128> = match_each_integer_ptype!(uncompressed_lens_array.ptype(), |$P| {
uncompressed_lens_array.maybe_null_slice::<$P>()
.iter()
.map(|&len| len as usize)
.scan(0, |offset, len| {
let str_start = *offset;
let str_end = *offset + len;

*offset += len;
*offset += len;

Some(make_view(
&uncompressed_bytes[(str_start as usize)..(str_end as usize)],
0u32,
str_start as u32,
))
})
.collect();
Some(make_view(
&uncompressed_bytes[str_start..str_end],
0u32,
str_start as u32,
))
})
.collect()
});

let views_array: ArrayData = Buffer::from(views).into();
let uncompressed_bytes_array = PrimitiveArray::from(uncompressed_bytes).into_array();
Expand Down
18 changes: 7 additions & 11 deletions vortex-array/src/array/sparse/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::fmt::{Debug, Display};

use ::serde::{Deserialize, Serialize};
use vortex_dtype::{match_each_integer_ptype, DType};
use vortex_dtype::Nullability::NonNullable;
use vortex_dtype::{match_each_integer_ptype, DType, PType};
use vortex_error::{vortex_bail, vortex_panic, VortexExpect as _, VortexResult};
use vortex_scalar::{Scalar, ScalarValue};

Expand All @@ -27,8 +28,8 @@ pub struct SparseMetadata {
// Offset value for patch indices as a result of slicing
indices_offset: usize,
indices_len: usize,
indices_ptype: PType,
fill_value: ScalarValue,
u64_indices: bool,
}

impl Display for SparseMetadata {
Expand All @@ -54,9 +55,6 @@ impl SparseArray {
indices_offset: usize,
fill_value: Scalar,
) -> VortexResult<Self> {
if !matches!(indices.dtype(), &DType::IDX | &DType::IDX_32) {
vortex_bail!("Cannot use {} as indices", indices.dtype());
}
if fill_value.dtype() != values.dtype() {
vortex_bail!(
"fill value, {:?}, should be instance of values dtype, {}",
Expand All @@ -80,14 +78,16 @@ impl SparseArray {
}
}

let indices_ptype = PType::try_from(indices.dtype())?;

Self::try_from_parts(
values.dtype().clone(),
len,
SparseMetadata {
indices_offset,
indices_len: indices.len(),
indices_ptype,
fill_value: fill_value.into_value(),
u64_indices: matches!(indices.dtype(), &DType::IDX),
},
[indices, values].into(),
StatsSet::default(),
Expand All @@ -111,11 +111,7 @@ impl SparseArray {
self.as_ref()
.child(
0,
if self.metadata().u64_indices {
&DType::IDX
} else {
&DType::IDX_32
},
&DType::Primitive(self.metadata().indices_ptype, NonNullable),
self.metadata().indices_len,
)
.vortex_expect("Missing indices array in SparseArray")
Expand Down
30 changes: 23 additions & 7 deletions vortex-array/src/compute/cast.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use vortex_dtype::DType;
use vortex_error::{vortex_err, VortexError, VortexResult};
use vortex_error::{vortex_bail, vortex_err, VortexError, VortexResult};

use crate::encoding::Encoding;
use crate::{ArrayDType, ArrayData};
use crate::{ArrayDType, ArrayData, IntoArrayData, IntoCanonical};

pub trait CastFn<Array> {
fn cast(&self, array: &Array, dtype: &DType) -> VortexResult<ArrayData>;
Expand Down Expand Up @@ -34,9 +34,25 @@ pub fn try_cast(array: impl AsRef<ArrayData>, dtype: &DType) -> VortexResult<Arr
}

// TODO(ngates): check for null_count if dtype is non-nullable
array
.encoding()
.cast_fn()
.map(|f| f.cast(array, dtype))
.unwrap_or_else(|| Err(vortex_err!(NotImplemented: "cast", array.encoding().id())))
if let Some(f) = array.encoding().cast_fn() {
return f.cast(array, dtype);
}

// Otherwise, we fall back to the canonical implementations.
log::debug!(
"Falling back to canonical cast for encoding {} and dtype {} to {}",
array.encoding().id(),
array.dtype(),
dtype
);
let canonicalized = array.clone().into_canonical()?.into_array();
if let Some(f) = canonicalized.encoding().cast_fn() {
return f.cast(&canonicalized, dtype);
}

vortex_bail!(
"No compute kernel to cast array from {} to {}",
array.dtype(),
dtype
)
}
6 changes: 0 additions & 6 deletions vortex-dtype/src/dtype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,6 @@ impl DType {
/// The default DType for bytes
pub const BYTES: Self = Primitive(PType::U8, Nullability::NonNullable);

/// The default DType for indices
pub const IDX: Self = Primitive(PType::U64, Nullability::NonNullable);

/// The DType for small indices (primarily created from bitmaps)
pub const IDX_32: Self = Primitive(PType::U32, Nullability::NonNullable);

/// Get the nullability of the DType
pub fn nullability(&self) -> Nullability {
self.is_nullable().into()
Expand Down
1 change: 1 addition & 0 deletions vortex-sampling-compressor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ arbitrary = { workspace = true, optional = true }
fsst-rs = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
num-traits = { workspace = true }
rand = { workspace = true }
vortex-alp = { workspace = true }
vortex-array = { workspace = true }
Expand Down
22 changes: 13 additions & 9 deletions vortex-sampling-compressor/src/compressors/date_time_parts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use vortex_datetime_parts::{
use vortex_error::VortexResult;

use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
use crate::downscale::downscale_integer_array;
use crate::{constants, SamplingCompressor};

#[derive(Debug)]
Expand Down Expand Up @@ -48,15 +49,18 @@ impl EncodingCompressor for DateTimePartsCompressor {
subseconds,
} = split_temporal(TemporalArray::try_from(array.clone())?)?;

let days = ctx
.named("days")
.compress(&days, like.as_ref().and_then(|l| l.child(0)))?;
let seconds = ctx
.named("seconds")
.compress(&seconds, like.as_ref().and_then(|l| l.child(1)))?;
let subsecond = ctx
.named("subsecond")
.compress(&subseconds, like.as_ref().and_then(|l| l.child(2)))?;
let days = ctx.named("days").compress(
&downscale_integer_array(days)?,
like.as_ref().and_then(|l| l.child(0)),
)?;
let seconds = ctx.named("seconds").compress(
&downscale_integer_array(seconds)?,
like.as_ref().and_then(|l| l.child(1)),
)?;
let subsecond = ctx.named("subsecond").compress(
&downscale_integer_array(subseconds)?,
like.as_ref().and_then(|l| l.child(2)),
)?;
Ok(CompressedArray::compressed(
DateTimePartsArray::try_new(
array.dtype().clone(),
Expand Down
8 changes: 5 additions & 3 deletions vortex-sampling-compressor/src/compressors/dict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use vortex_dict::{
use vortex_error::VortexResult;

use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
use crate::downscale::downscale_integer_array;
use crate::{constants, SamplingCompressor};

#[derive(Debug)]
Expand Down Expand Up @@ -70,9 +71,10 @@ impl EncodingCompressor for DictCompressor {
};

let (codes, values) = (
ctx.auxiliary("codes")
.excluding(self)
.compress(&codes, like.as_ref().and_then(|l| l.child(0)))?,
ctx.auxiliary("codes").excluding(self).compress(
&downscale_integer_array(codes)?,
like.as_ref().and_then(|l| l.child(0)),
)?,
ctx.named("values")
.excluding(self)
.compress(&values, like.as_ref().and_then(|l| l.child(1)))?,
Expand Down
3 changes: 2 additions & 1 deletion vortex-sampling-compressor/src/compressors/fsst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use super::delta::DeltaCompressor;
use super::r#for::FoRCompressor;
use super::varbin::VarBinCompressor;
use super::{CompressedArray, CompressionTree, EncoderMetadata, EncodingCompressor};
use crate::downscale::downscale_integer_array;
use crate::{constants, SamplingCompressor};

#[derive(Debug)]
Expand Down Expand Up @@ -109,7 +110,7 @@ impl EncodingCompressor for FSSTCompressor {
.auxiliary("uncompressed_lengths")
.excluding(self)
.compress(
&fsst_array.uncompressed_lengths(),
&downscale_integer_array(fsst_array.uncompressed_lengths())?,
like.as_ref().and_then(|l| l.child(3)),
)?;

Expand Down
3 changes: 2 additions & 1 deletion vortex-sampling-compressor/src/compressors/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use vortex_array::{ArrayData, IntoArrayData};
use vortex_error::VortexResult;

use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
use crate::downscale::downscale_integer_array;
use crate::{constants, SamplingCompressor};

#[derive(Debug)]
Expand Down Expand Up @@ -36,7 +37,7 @@ impl EncodingCompressor for ListCompressor {
like.as_ref().and_then(|l| l.child(0)),
)?;
let compressed_offsets = ctx.auxiliary("offsets").compress(
&list_array.offsets(),
&downscale_integer_array(list_array.offsets())?,
like.as_ref().and_then(|l| l.child(1)),
)?;
Ok(CompressedArray::compressed(
Expand Down
Loading

0 comments on commit ba5e7b4

Please sign in to comment.