Skip to content

Commit

Permalink
Patches Utility (#1601)
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn authored Dec 9, 2024
1 parent a93b3f6 commit 0fbc411
Show file tree
Hide file tree
Showing 34 changed files with 723 additions and 582 deletions.
4 changes: 0 additions & 4 deletions .github/workflows/bench-pr.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
name: PR Benchmarks

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: ${{ github.ref != 'refs/heads/develop' }}

on:
pull_request:
types: [ labeled, synchronize ]
Expand Down
4 changes: 2 additions & 2 deletions docs/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ Use :func:`~vortex.encoding.compress` to compress the Vortex array and check the

>>> cvtx = vortex.compress(vtx)
>>> cvtx.nbytes
16835
16756
>>> cvtx.nbytes / vtx.nbytes
0.119...
0.118...

Vortex uses nearly ten times fewer bytes than Arrow. Fewer bytes means more of your data fits in
cache and RAM.
Expand Down
57 changes: 24 additions & 33 deletions encodings/alp/src/alp/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::fmt::{Debug, Display};
use serde::{Deserialize, Serialize};
use vortex_array::array::PrimitiveArray;
use vortex_array::encoding::ids;
use vortex_array::patches::{Patches, PatchesMetadata};
use vortex_array::stats::StatisticsVTable;
use vortex_array::validity::{ArrayValidity, LogicalValidity, ValidityVTable};
use vortex_array::variants::{PrimitiveArrayTrait, VariantsVTable};
Expand All @@ -21,6 +22,7 @@ impl_encoding!("vortex.alp", ids::ALP, ALP);
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ALPMetadata {
exponents: Exponents,
patches: Option<PatchesMetadata>,
}

impl Display for ALPMetadata {
Expand All @@ -33,7 +35,7 @@ impl ALPArray {
pub fn try_new(
encoded: ArrayData,
exponents: Exponents,
patches: Option<ArrayData>,
patches: Option<Patches>,
) -> VortexResult<Self> {
let dtype = match encoded.dtype() {
DType::Primitive(PType::I32, nullability) => DType::Primitive(PType::F32, *nullability),
Expand All @@ -42,35 +44,23 @@ impl ALPArray {
};

let length = encoded.len();
if let Some(parray) = patches.as_ref() {
if parray.len() != length {
vortex_bail!(
"Mismatched length in ALPArray between encoded({}) {} and it's patches({}) {}",
encoded.encoding().id(),
encoded.len(),
parray.encoding().id(),
parray.len()
)
}
}

let mut children = Vec::with_capacity(2);
children.push(encoded);
if let Some(patch) = patches {
if !patch.dtype().eq_ignore_nullability(&dtype) || !patch.dtype().is_nullable() {
vortex_bail!(
"ALP patches dtype, {}, must be nullable version of array dtype, {}",
patch.dtype(),
dtype,
);
}
children.push(patch);
if let Some(patches) = &patches {
children.push(patches.indices().clone());
children.push(patches.values().clone());
}

let patches = patches
.as_ref()
.map(|p| p.to_metadata(length, &dtype))
.transpose()?;

Self::try_from_parts(
dtype,
length,
ALPMetadata { exponents },
ALPMetadata { exponents, patches },
children.into(),
Default::default(),
)
Expand All @@ -95,11 +85,17 @@ impl ALPArray {
self.metadata().exponents
}

pub fn patches(&self) -> Option<ArrayData> {
(self.as_ref().nchildren() > 1).then(|| {
self.as_ref()
.child(1, &self.patches_dtype(), self.len())
.vortex_expect("Missing patches child in ALPArray")
pub fn patches(&self) -> Option<Patches> {
self.metadata().patches.as_ref().map(|p| {
Patches::new(
self.len(),
self.as_ref()
.child(1, &p.indices_dtype(), p.len())
.vortex_expect("ALPArray: patch indices"),
self.as_ref()
.child(2, self.dtype(), p.len())
.vortex_expect("ALPArray: patch values"),
)
})
}

Expand All @@ -115,11 +111,6 @@ impl ALPArray {
d => vortex_panic!(MismatchedTypes: "f32 or f64", d),
}
}

/// Returns this array's dtype converted with `as_nullable()`.
///
/// The `child(1, &self.patches_dtype(), self.len())` call in `patches()` passes this
/// as the dtype of the patches child.
#[inline]
fn patches_dtype(&self) -> DType {
self.dtype().as_nullable()
}
}

impl ArrayTrait for ALPArray {}
Expand Down Expand Up @@ -152,7 +143,7 @@ impl VisitorVTable<ALPArray> for ALPEncoding {
fn accept(&self, array: &ALPArray, visitor: &mut dyn ArrayVisitor) -> VortexResult<()> {
visitor.visit_child("encoded", &array.encoded())?;
if let Some(patches) = array.patches().as_ref() {
visitor.visit_child("patches", patches)?;
visitor.visit_patches(patches)?;
}
Ok(())
}
Expand Down
36 changes: 13 additions & 23 deletions encodings/alp/src/alp/compress.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use vortex_array::array::{PrimitiveArray, SparseArray};
use vortex_array::array::PrimitiveArray;
use vortex_array::patches::Patches;
use vortex_array::validity::Validity;
use vortex_array::variants::PrimitiveArrayTrait;
use vortex_array::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant};
use vortex_dtype::{NativePType, PType};
use vortex_error::{vortex_bail, VortexExpect as _, VortexResult};
use vortex_scalar::{Scalar, ScalarType};
use vortex_error::{vortex_bail, VortexResult};
use vortex_scalar::ScalarType;

use crate::alp::{ALPArray, ALPFloat};
use crate::Exponents;
Expand All @@ -27,26 +28,27 @@ macro_rules! match_each_alp_float_ptype {
pub fn alp_encode_components<T>(
values: &PrimitiveArray,
exponents: Option<Exponents>,
) -> (Exponents, ArrayData, Option<ArrayData>)
) -> (Exponents, ArrayData, Option<Patches>)
where
T: ALPFloat + NativePType,
T::ALPInt: NativePType,
T: ScalarType,
{
let patch_validity = match values.validity() {
Validity::NonNullable => Validity::NonNullable,
_ => Validity::AllValid,
};
let (exponents, encoded, exc_pos, exc) = T::encode(values.maybe_null_slice::<T>(), exponents);
let len = encoded.len();
(
exponents,
PrimitiveArray::from_vec(encoded, values.validity()).into_array(),
(!exc.is_empty()).then(|| {
SparseArray::try_new(
PrimitiveArray::from(exc_pos).into_array(),
PrimitiveArray::from_vec(exc, Validity::AllValid).into_array(),
Patches::new(
len,
Scalar::null_typed::<T>(),
PrimitiveArray::from(exc_pos).into_array(),
PrimitiveArray::from_vec(exc, patch_validity).into_array(),
)
.vortex_expect("Failed to create SparseArray for ALP patches")
.into_array()
}),
)
}
Expand All @@ -73,24 +75,12 @@ pub fn decompress(array: ALPArray) -> VortexResult<PrimitiveArray> {
});

if let Some(patches) = array.patches() {
patch_decoded(decoded, &patches)
decoded.patch(patches)
} else {
Ok(decoded)
}
}

/// Applies the given patches to an already-decoded primitive array.
///
/// `patches` is converted to a `SparseArray`; if that conversion fails the error is
/// propagated via `?`. The sparse array's values are converted with `into_primitive()`,
/// and its resolved indices, typed value slice, and validity are handed to
/// `array.patch`, whose result is returned.
fn patch_decoded(array: PrimitiveArray, patches: &ArrayData) -> VortexResult<PrimitiveArray> {
// `patches` is only borrowed here, so it is cloned for the by-value `try_from`.
let typed_patches = SparseArray::try_from(patches.clone())?;

// NOTE(review): the macro body is not visible here; presumably it binds `$T` to the
// float type matching `array.ptype()` — confirm against the macro definition.
match_each_alp_float_ptype!(array.ptype(), |$T| {
let primitive_values = typed_patches.values().into_primitive()?;
array.patch(
&typed_patches.resolved_indices(),
primitive_values.maybe_null_slice::<$T>(),
primitive_values.validity())
})
}

#[cfg(test)]
mod tests {
use core::f64;
Expand Down
22 changes: 13 additions & 9 deletions encodings/alp/src/alp/compute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use vortex_array::compute::{
filter, scalar_at, slice, take, ComputeVTable, FilterFn, FilterMask, ScalarAtFn, SliceFn,
TakeFn, TakeOptions,
};
use vortex_array::validity::ArrayValidity;
use vortex_array::variants::PrimitiveArrayTrait;
use vortex_array::{ArrayDType, ArrayData, IntoArrayData};
use vortex_error::VortexResult;
Expand Down Expand Up @@ -31,9 +30,8 @@ impl ComputeVTable for ALPEncoding {
impl ScalarAtFn<ALPArray> for ALPEncoding {
fn scalar_at(&self, array: &ALPArray, index: usize) -> VortexResult<Scalar> {
if let Some(patches) = array.patches() {
if patches.is_valid(index) {
// We need to make sure the value is actually in the patches array
return scalar_at(&patches, index);
if let Some(patch) = patches.get_patched(index)? {
return Ok(patch);
}
}

Expand Down Expand Up @@ -62,8 +60,9 @@ impl TakeFn<ALPArray> for ALPEncoding {
array.exponents(),
array
.patches()
.map(|p| take(&p, indices, options))
.transpose()?,
.map(|p| p.take(indices))
.transpose()?
.flatten(),
)?
.into_array())
}
Expand All @@ -74,7 +73,11 @@ impl SliceFn<ALPArray> for ALPEncoding {
Ok(ALPArray::try_new(
slice(array.encoded(), start, end)?,
array.exponents(),
array.patches().map(|p| slice(&p, start, end)).transpose()?,
array
.patches()
.map(|p| p.slice(start, end))
.transpose()?
.flatten(),
)?
.into_array())
}
Expand All @@ -84,8 +87,9 @@ impl FilterFn<ALPArray> for ALPEncoding {
fn filter(&self, array: &ALPArray, mask: FilterMask) -> VortexResult<ArrayData> {
let patches = array
.patches()
.map(|p| filter(&p, mask.clone()))
.transpose()?;
.map(|p| p.filter(mask.clone()))
.transpose()?
.flatten();

Ok(
ALPArray::try_new(filter(&array.encoded(), mask)?, array.exponents(), patches)?
Expand Down
2 changes: 1 addition & 1 deletion encodings/alp/src/alp_rd/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ impl IntoCanonical for ALPRDArray {
let left_parts_exceptions = SparseArray::try_from(left_parts_exceptions)
.vortex_expect("ALPRDArray: exceptions must be SparseArray encoded");
exc_pos = left_parts_exceptions
.resolved_indices()
.resolved_indices_usize()
.into_iter()
.map(|v| v as _)
.collect();
Expand Down
7 changes: 2 additions & 5 deletions encodings/fastlanes/benches/bitpacking_take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion};
use itertools::Itertools;
use rand::distributions::Uniform;
use rand::{thread_rng, Rng};
use vortex_array::array::{PrimitiveArray, SparseArray};
use vortex_array::array::PrimitiveArray;
use vortex_array::compute::{take, TakeOptions};
use vortex_fastlanes::{find_best_bit_width, BitPackedArray};

Expand Down Expand Up @@ -136,10 +136,7 @@ fn bench_patched_take(c: &mut Criterion) {
.unwrap();
assert!(packed.patches().is_some());
assert_eq!(
SparseArray::try_from(packed.patches().unwrap())
.unwrap()
.values()
.len(),
packed.patches().unwrap().num_patches(),
num_exceptions as usize
);

Expand Down
43 changes: 15 additions & 28 deletions encodings/fastlanes/src/bitpacking/compress.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use arrow_buffer::ArrowNativeType;
use fastlanes::BitPacking;
use vortex_array::array::{PrimitiveArray, SparseArray};
use vortex_array::array::PrimitiveArray;
use vortex_array::patches::Patches;
use vortex_array::stats::ArrayStatistics;
use vortex_array::validity::{ArrayValidity, Validity};
use vortex_array::variants::PrimitiveArrayTrait;
use vortex_array::{ArrayDType, ArrayData, ArrayLen, IntoArrayData, IntoArrayVariant};
use vortex_array::{ArrayDType, ArrayLen, IntoArrayData};
use vortex_buffer::Buffer;
use vortex_dtype::{
match_each_integer_ptype, match_each_unsigned_integer_ptype, NativePType, PType,
};
use vortex_error::{vortex_bail, vortex_err, VortexResult, VortexUnwrap};
use vortex_error::{vortex_bail, vortex_err, VortexResult};
use vortex_scalar::Scalar;

use crate::BitPackedArray;
Expand Down Expand Up @@ -141,7 +142,11 @@ pub fn gather_patches(
parray: &PrimitiveArray,
bit_width: u8,
num_exceptions_hint: usize,
) -> Option<ArrayData> {
) -> Option<Patches> {
let patch_validity = match parray.validity() {
Validity::NonNullable => Validity::NonNullable,
_ => Validity::AllValid,
};
match_each_integer_ptype!(parray.ptype(), |$T| {
let mut indices: Vec<u64> = Vec::with_capacity(num_exceptions_hint);
let mut values: Vec<$T> = Vec::with_capacity(num_exceptions_hint);
Expand All @@ -151,17 +156,11 @@ pub fn gather_patches(
values.push(*v);
}
}

(!indices.is_empty()).then(|| {
SparseArray::try_new(
indices.into_array(),
PrimitiveArray::from_vec(values, Validity::AllValid).into_array(),
parray.len(),
Scalar::null(parray.dtype().as_nullable()),
)
.vortex_unwrap()
.into_array()
})
(!indices.is_empty()).then(|| Patches::new(
parray.len(),
indices.into_array(),
PrimitiveArray::from_vec(values, patch_validity).into_array(),
))
})
}

Expand All @@ -183,24 +182,12 @@ pub fn unpack(array: BitPackedArray) -> VortexResult<PrimitiveArray> {
}

if let Some(patches) = array.patches() {
patch_unpacked(unpacked, &patches)
unpacked.patch(patches)
} else {
Ok(unpacked)
}
}

/// Applies the given patches to an already-unpacked primitive array.
///
/// `patches` is converted to a `SparseArray`; if that conversion fails the error is
/// propagated via `?`. The sparse array's values are converted with `into_primitive()`,
/// and its resolved indices, typed value slice, and validity are handed to
/// `array.patch`, whose result is returned.
fn patch_unpacked(array: PrimitiveArray, patches: &ArrayData) -> VortexResult<PrimitiveArray> {
// `patches` is only borrowed here, so it is cloned for the by-value `try_from`.
let typed_patches = SparseArray::try_from(patches.clone())?;

// NOTE(review): presumably the macro binds `$T` to the integer type matching
// `array.ptype()` — confirm against the `match_each_integer_ptype` definition.
match_each_integer_ptype!(array.ptype(), |$T| {
let primitive_values = typed_patches.values().into_primitive()?;
array.patch(
&typed_patches.resolved_indices(),
primitive_values.maybe_null_slice::<$T>(),
primitive_values.validity())
})
}

pub fn unpack_primitive<T: NativePType + BitPacking>(
packed: &[T],
bit_width: usize,
Expand Down
Loading

0 comments on commit 0fbc411

Please sign in to comment.