From 0fbc4115e740e5e2702a98d5a07e1bf027f068e3 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Mon, 9 Dec 2024 07:27:58 -0500 Subject: [PATCH] Patches Utility (#1601) --- .github/workflows/bench-pr.yml | 4 - docs/quickstart.rst | 4 +- encodings/alp/src/alp/array.rs | 57 ++-- encodings/alp/src/alp/compress.rs | 36 +-- encodings/alp/src/alp/compute/mod.rs | 22 +- encodings/alp/src/alp_rd/array.rs | 2 +- .../fastlanes/benches/bitpacking_take.rs | 7 +- .../fastlanes/src/bitpacking/compress.rs | 43 +-- .../src/bitpacking/compute/filter.rs | 16 +- .../src/bitpacking/compute/scalar_at.rs | 50 +-- .../src/bitpacking/compute/search_sorted.rs | 19 +- .../fastlanes/src/bitpacking/compute/slice.rs | 22 +- .../fastlanes/src/bitpacking/compute/take.rs | 182 +++-------- encodings/fastlanes/src/bitpacking/mod.rs | 67 ++-- vortex-array/src/array/bool/mod.rs | 65 +--- vortex-array/src/array/bool/patch.rs | 72 +++++ vortex-array/src/array/null/compute.rs | 5 +- .../primitive/compute/subtract_scalar.rs | 8 - vortex-array/src/array/primitive/mod.rs | 61 +--- vortex-array/src/array/primitive/patch.rs | 51 ++++ vortex-array/src/array/sparse/canonical.rs | 55 ++-- vortex-array/src/array/sparse/compute/take.rs | 15 +- vortex-array/src/array/sparse/mod.rs | 13 +- vortex-array/src/compute/filter.rs | 8 +- vortex-array/src/compute/scalar_subtract.rs | 5 +- vortex-array/src/compute/slice.rs | 7 +- vortex-array/src/lib.rs | 1 + vortex-array/src/patches.rs | 286 ++++++++++++++++++ vortex-array/src/validity.rs | 71 ++--- vortex-array/src/visitor.rs | 7 + .../src/compressors/alp.rs | 18 +- .../src/compressors/bitpacked.rs | 11 +- .../src/compressors/chunked.rs | 2 +- .../src/sampling_compressor.rs | 13 +- 34 files changed, 723 insertions(+), 582 deletions(-) create mode 100644 vortex-array/src/array/bool/patch.rs create mode 100644 vortex-array/src/array/primitive/patch.rs create mode 100644 vortex-array/src/patches.rs diff --git a/.github/workflows/bench-pr.yml b/.github/workflows/bench-pr.yml index 5a215a2aa9..3f5fbbb7f3 100644 --- a/.github/workflows/bench-pr.yml +++ b/.github/workflows/bench-pr.yml @@ -1,9 +1,5 @@ name: PR Benchmarks -concurrency: - group: ${{ github.workflow }}-${{ github.ref }} - cancel-in-progress: ${{ github.ref != 'refs/heads/develop' }} - on: pull_request: types: [ labeled, synchronize ] diff --git a/docs/quickstart.rst b/docs/quickstart.rst index b79fc6c5b0..2f57d4b7f3 100644 --- a/docs/quickstart.rst +++ b/docs/quickstart.rst @@ -46,9 +46,9 @@ Use :func:`~vortex.encoding.compress` to compress the Vortex array and check the >>> cvtx = vortex.compress(vtx) >>> cvtx.nbytes - 16835 + 16756 >>> cvtx.nbytes / vtx.nbytes - 0.119... + 0.118... Vortex uses nearly ten times fewer bytes than Arrow. Fewer bytes means more of your data fits in cache and RAM. 
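At a high level, this change replaces the ad-hoc SparseArray-based exception arrays used by ALP and bit-packing with a shared `Patches` utility (the length of the array being patched, plus a list of patch positions and their replacement values), and teaches `PrimitiveArray` and `BoolArray` to apply them via a `patch` method. Below is a minimal sketch of the intended usage, assuming the `Patches::new` and `PrimitiveArray::patch` signatures introduced further down in this patch; the concrete numbers are illustrative only.

use vortex_array::array::PrimitiveArray;
use vortex_array::patches::Patches;
use vortex_array::validity::Validity;
use vortex_array::IntoArrayData;
use vortex_error::VortexResult;

fn main() -> VortexResult<()> {
    // Ten zeros, with positions 2 and 7 overwritten by patch values (illustrative data).
    let base = PrimitiveArray::from_vec(vec![0u32; 10], Validity::NonNullable);
    let patches = Patches::new(
        10,
        PrimitiveArray::from(vec![2u64, 7]).into_array(),
        PrimitiveArray::from(vec![42u32, 99]).into_array(),
    );
    let patched = base.patch(patches)?;
    assert_eq!(patched.maybe_null_slice::<u32>(), &[0, 0, 42, 0, 0, 0, 0, 99, 0, 0]);
    Ok(())
}

The same `Patches` handle also backs the `filter`, `slice`, and `take` compute paths rewritten in the diffs below.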
diff --git a/encodings/alp/src/alp/array.rs b/encodings/alp/src/alp/array.rs index eae2d6e3b3..4e723f87f3 100644 --- a/encodings/alp/src/alp/array.rs +++ b/encodings/alp/src/alp/array.rs @@ -3,6 +3,7 @@ use std::fmt::{Debug, Display}; use serde::{Deserialize, Serialize}; use vortex_array::array::PrimitiveArray; use vortex_array::encoding::ids; +use vortex_array::patches::{Patches, PatchesMetadata}; use vortex_array::stats::StatisticsVTable; use vortex_array::validity::{ArrayValidity, LogicalValidity, ValidityVTable}; use vortex_array::variants::{PrimitiveArrayTrait, VariantsVTable}; @@ -21,6 +22,7 @@ impl_encoding!("vortex.alp", ids::ALP, ALP); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ALPMetadata { exponents: Exponents, + patches: Option, } impl Display for ALPMetadata { @@ -33,7 +35,7 @@ impl ALPArray { pub fn try_new( encoded: ArrayData, exponents: Exponents, - patches: Option, + patches: Option, ) -> VortexResult { let dtype = match encoded.dtype() { DType::Primitive(PType::I32, nullability) => DType::Primitive(PType::F32, *nullability), @@ -42,35 +44,23 @@ impl ALPArray { }; let length = encoded.len(); - if let Some(parray) = patches.as_ref() { - if parray.len() != length { - vortex_bail!( - "Mismatched length in ALPArray between encoded({}) {} and it's patches({}) {}", - encoded.encoding().id(), - encoded.len(), - parray.encoding().id(), - parray.len() - ) - } - } let mut children = Vec::with_capacity(2); children.push(encoded); - if let Some(patch) = patches { - if !patch.dtype().eq_ignore_nullability(&dtype) || !patch.dtype().is_nullable() { - vortex_bail!( - "ALP patches dtype, {}, must be nullable version of array dtype, {}", - patch.dtype(), - dtype, - ); - } - children.push(patch); + if let Some(patches) = &patches { + children.push(patches.indices().clone()); + children.push(patches.values().clone()); } + let patches = patches + .as_ref() + .map(|p| p.to_metadata(length, &dtype)) + .transpose()?; + Self::try_from_parts( dtype, length, - ALPMetadata { exponents }, + ALPMetadata { exponents, patches }, children.into(), Default::default(), ) @@ -95,11 +85,17 @@ impl ALPArray { self.metadata().exponents } - pub fn patches(&self) -> Option { - (self.as_ref().nchildren() > 1).then(|| { - self.as_ref() - .child(1, &self.patches_dtype(), self.len()) - .vortex_expect("Missing patches child in ALPArray") + pub fn patches(&self) -> Option { + self.metadata().patches.as_ref().map(|p| { + Patches::new( + self.len(), + self.as_ref() + .child(1, &p.indices_dtype(), p.len()) + .vortex_expect("ALPArray: patch indices"), + self.as_ref() + .child(2, self.dtype(), p.len()) + .vortex_expect("ALPArray: patch values"), + ) }) } @@ -115,11 +111,6 @@ impl ALPArray { d => vortex_panic!(MismatchedTypes: "f32 or f64", d), } } - - #[inline] - fn patches_dtype(&self) -> DType { - self.dtype().as_nullable() - } } impl ArrayTrait for ALPArray {} @@ -152,7 +143,7 @@ impl VisitorVTable for ALPEncoding { fn accept(&self, array: &ALPArray, visitor: &mut dyn ArrayVisitor) -> VortexResult<()> { visitor.visit_child("encoded", &array.encoded())?; if let Some(patches) = array.patches().as_ref() { - visitor.visit_child("patches", patches)?; + visitor.visit_patches(patches)?; } Ok(()) } diff --git a/encodings/alp/src/alp/compress.rs b/encodings/alp/src/alp/compress.rs index c1b3ea75fe..521d2cf01f 100644 --- a/encodings/alp/src/alp/compress.rs +++ b/encodings/alp/src/alp/compress.rs @@ -1,10 +1,11 @@ -use vortex_array::array::{PrimitiveArray, SparseArray}; +use vortex_array::array::PrimitiveArray; +use 
vortex_array::patches::Patches; use vortex_array::validity::Validity; use vortex_array::variants::PrimitiveArrayTrait; use vortex_array::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant}; use vortex_dtype::{NativePType, PType}; -use vortex_error::{vortex_bail, VortexExpect as _, VortexResult}; -use vortex_scalar::{Scalar, ScalarType}; +use vortex_error::{vortex_bail, VortexResult}; +use vortex_scalar::ScalarType; use crate::alp::{ALPArray, ALPFloat}; use crate::Exponents; @@ -27,26 +28,27 @@ macro_rules! match_each_alp_float_ptype { pub fn alp_encode_components( values: &PrimitiveArray, exponents: Option, -) -> (Exponents, ArrayData, Option) +) -> (Exponents, ArrayData, Option) where T: ALPFloat + NativePType, T::ALPInt: NativePType, T: ScalarType, { + let patch_validity = match values.validity() { + Validity::NonNullable => Validity::NonNullable, + _ => Validity::AllValid, + }; let (exponents, encoded, exc_pos, exc) = T::encode(values.maybe_null_slice::(), exponents); let len = encoded.len(); ( exponents, PrimitiveArray::from_vec(encoded, values.validity()).into_array(), (!exc.is_empty()).then(|| { - SparseArray::try_new( - PrimitiveArray::from(exc_pos).into_array(), - PrimitiveArray::from_vec(exc, Validity::AllValid).into_array(), + Patches::new( len, - Scalar::null_typed::(), + PrimitiveArray::from(exc_pos).into_array(), + PrimitiveArray::from_vec(exc, patch_validity).into_array(), ) - .vortex_expect("Failed to create SparseArray for ALP patches") - .into_array() }), ) } @@ -73,24 +75,12 @@ pub fn decompress(array: ALPArray) -> VortexResult { }); if let Some(patches) = array.patches() { - patch_decoded(decoded, &patches) + decoded.patch(patches) } else { Ok(decoded) } } -fn patch_decoded(array: PrimitiveArray, patches: &ArrayData) -> VortexResult { - let typed_patches = SparseArray::try_from(patches.clone())?; - - match_each_alp_float_ptype!(array.ptype(), |$T| { - let primitive_values = typed_patches.values().into_primitive()?; - array.patch( - &typed_patches.resolved_indices(), - primitive_values.maybe_null_slice::<$T>(), - primitive_values.validity()) - }) -} - #[cfg(test)] mod tests { use core::f64; diff --git a/encodings/alp/src/alp/compute/mod.rs b/encodings/alp/src/alp/compute/mod.rs index ceb820acf5..a6c0bde235 100644 --- a/encodings/alp/src/alp/compute/mod.rs +++ b/encodings/alp/src/alp/compute/mod.rs @@ -2,7 +2,6 @@ use vortex_array::compute::{ filter, scalar_at, slice, take, ComputeVTable, FilterFn, FilterMask, ScalarAtFn, SliceFn, TakeFn, TakeOptions, }; -use vortex_array::validity::ArrayValidity; use vortex_array::variants::PrimitiveArrayTrait; use vortex_array::{ArrayDType, ArrayData, IntoArrayData}; use vortex_error::VortexResult; @@ -31,9 +30,8 @@ impl ComputeVTable for ALPEncoding { impl ScalarAtFn for ALPEncoding { fn scalar_at(&self, array: &ALPArray, index: usize) -> VortexResult { if let Some(patches) = array.patches() { - if patches.is_valid(index) { - // We need to make sure the value is actually in the patches array - return scalar_at(&patches, index); + if let Some(patch) = patches.get_patched(index)? { + return Ok(patch); } } @@ -62,8 +60,9 @@ impl TakeFn for ALPEncoding { array.exponents(), array .patches() - .map(|p| take(&p, indices, options)) - .transpose()?, + .map(|p| p.take(indices)) + .transpose()? + .flatten(), )? 
.into_array()) } @@ -74,7 +73,11 @@ impl SliceFn for ALPEncoding { Ok(ALPArray::try_new( slice(array.encoded(), start, end)?, array.exponents(), - array.patches().map(|p| slice(&p, start, end)).transpose()?, + array + .patches() + .map(|p| p.slice(start, end)) + .transpose()? + .flatten(), )? .into_array()) } @@ -84,8 +87,9 @@ impl FilterFn for ALPEncoding { fn filter(&self, array: &ALPArray, mask: FilterMask) -> VortexResult { let patches = array .patches() - .map(|p| filter(&p, mask.clone())) - .transpose()?; + .map(|p| p.filter(mask.clone())) + .transpose()? + .flatten(); Ok( ALPArray::try_new(filter(&array.encoded(), mask)?, array.exponents(), patches)? diff --git a/encodings/alp/src/alp_rd/array.rs b/encodings/alp/src/alp_rd/array.rs index 66359b50a4..88a1dc4389 100644 --- a/encodings/alp/src/alp_rd/array.rs +++ b/encodings/alp/src/alp_rd/array.rs @@ -193,7 +193,7 @@ impl IntoCanonical for ALPRDArray { let left_parts_exceptions = SparseArray::try_from(left_parts_exceptions) .vortex_expect("ALPRDArray: exceptions must be SparseArray encoded"); exc_pos = left_parts_exceptions - .resolved_indices() + .resolved_indices_usize() .into_iter() .map(|v| v as _) .collect(); diff --git a/encodings/fastlanes/benches/bitpacking_take.rs b/encodings/fastlanes/benches/bitpacking_take.rs index 6613b91675..6eb215042d 100644 --- a/encodings/fastlanes/benches/bitpacking_take.rs +++ b/encodings/fastlanes/benches/bitpacking_take.rs @@ -5,7 +5,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; use itertools::Itertools; use rand::distributions::Uniform; use rand::{thread_rng, Rng}; -use vortex_array::array::{PrimitiveArray, SparseArray}; +use vortex_array::array::PrimitiveArray; use vortex_array::compute::{take, TakeOptions}; use vortex_fastlanes::{find_best_bit_width, BitPackedArray}; @@ -136,10 +136,7 @@ fn bench_patched_take(c: &mut Criterion) { .unwrap(); assert!(packed.patches().is_some()); assert_eq!( - SparseArray::try_from(packed.patches().unwrap()) - .unwrap() - .values() - .len(), + packed.patches().unwrap().num_patches(), num_exceptions as usize ); diff --git a/encodings/fastlanes/src/bitpacking/compress.rs b/encodings/fastlanes/src/bitpacking/compress.rs index 8abd845be1..ad0a81bd26 100644 --- a/encodings/fastlanes/src/bitpacking/compress.rs +++ b/encodings/fastlanes/src/bitpacking/compress.rs @@ -1,15 +1,16 @@ use arrow_buffer::ArrowNativeType; use fastlanes::BitPacking; -use vortex_array::array::{PrimitiveArray, SparseArray}; +use vortex_array::array::PrimitiveArray; +use vortex_array::patches::Patches; use vortex_array::stats::ArrayStatistics; use vortex_array::validity::{ArrayValidity, Validity}; use vortex_array::variants::PrimitiveArrayTrait; -use vortex_array::{ArrayDType, ArrayData, ArrayLen, IntoArrayData, IntoArrayVariant}; +use vortex_array::{ArrayDType, ArrayLen, IntoArrayData}; use vortex_buffer::Buffer; use vortex_dtype::{ match_each_integer_ptype, match_each_unsigned_integer_ptype, NativePType, PType, }; -use vortex_error::{vortex_bail, vortex_err, VortexResult, VortexUnwrap}; +use vortex_error::{vortex_bail, vortex_err, VortexResult}; use vortex_scalar::Scalar; use crate::BitPackedArray; @@ -141,7 +142,11 @@ pub fn gather_patches( parray: &PrimitiveArray, bit_width: u8, num_exceptions_hint: usize, -) -> Option { +) -> Option { + let patch_validity = match parray.validity() { + Validity::NonNullable => Validity::NonNullable, + _ => Validity::AllValid, + }; match_each_integer_ptype!(parray.ptype(), |$T| { let mut indices: Vec = 
Vec::with_capacity(num_exceptions_hint); let mut values: Vec<$T> = Vec::with_capacity(num_exceptions_hint); @@ -151,17 +156,11 @@ pub fn gather_patches( values.push(*v); } } - - (!indices.is_empty()).then(|| { - SparseArray::try_new( - indices.into_array(), - PrimitiveArray::from_vec(values, Validity::AllValid).into_array(), - parray.len(), - Scalar::null(parray.dtype().as_nullable()), - ) - .vortex_unwrap() - .into_array() - }) + (!indices.is_empty()).then(|| Patches::new( + parray.len(), + indices.into_array(), + PrimitiveArray::from_vec(values, patch_validity).into_array(), + )) }) } @@ -183,24 +182,12 @@ pub fn unpack(array: BitPackedArray) -> VortexResult { } if let Some(patches) = array.patches() { - patch_unpacked(unpacked, &patches) + unpacked.patch(patches) } else { Ok(unpacked) } } -fn patch_unpacked(array: PrimitiveArray, patches: &ArrayData) -> VortexResult { - let typed_patches = SparseArray::try_from(patches.clone())?; - - match_each_integer_ptype!(array.ptype(), |$T| { - let primitive_values = typed_patches.values().into_primitive()?; - array.patch( - &typed_patches.resolved_indices(), - primitive_values.maybe_null_slice::<$T>(), - primitive_values.validity()) - }) -} - pub fn unpack_primitive( packed: &[T], bit_width: usize, diff --git a/encodings/fastlanes/src/bitpacking/compute/filter.rs b/encodings/fastlanes/src/bitpacking/compute/filter.rs index a858339e41..a0944683d1 100644 --- a/encodings/fastlanes/src/bitpacking/compute/filter.rs +++ b/encodings/fastlanes/src/bitpacking/compute/filter.rs @@ -1,7 +1,7 @@ use arrow_buffer::ArrowNativeType; use fastlanes::BitPacking; use itertools::Itertools; -use vortex_array::array::{PrimitiveArray, SparseArray}; +use vortex_array::array::PrimitiveArray; use vortex_array::compute::{filter, FilterFn, FilterIter, FilterMask}; use vortex_array::variants::PrimitiveArrayTrait; use vortex_array::{ArrayData, IntoArrayData, IntoArrayVariant}; @@ -28,10 +28,9 @@ fn filter_primitive( let patches = array .patches() - .map(|patches| filter(&patches, mask.clone())) + .map(|patches| patches.filter(mask.clone())) .transpose()? - .map(SparseArray::try_from) - .transpose()?; + .flatten(); // Short-circuit if the selectivity is high enough. 
if mask.selectivity() > 0.8 { @@ -51,16 +50,9 @@ fn filter_primitive( }; let mut values = PrimitiveArray::from_vec(values, validity); - if let Some(patches) = patches { - let patch_values = patches.values().into_primitive()?; - values = values.patch( - &patches.resolved_indices(), - patch_values.maybe_null_slice::(), - patch_values.validity(), - )?; + values = values.patch(patches)?; } - Ok(values) } diff --git a/encodings/fastlanes/src/bitpacking/compute/scalar_at.rs b/encodings/fastlanes/src/bitpacking/compute/scalar_at.rs index 93695a96c4..91d0229464 100644 --- a/encodings/fastlanes/src/bitpacking/compute/scalar_at.rs +++ b/encodings/fastlanes/src/bitpacking/compute/scalar_at.rs @@ -1,5 +1,4 @@ -use vortex_array::compute::{scalar_at, ScalarAtFn}; -use vortex_array::validity::ArrayValidity; +use vortex_array::compute::ScalarAtFn; use vortex_array::ArrayDType; use vortex_error::VortexResult; use vortex_scalar::Scalar; @@ -9,21 +8,22 @@ use crate::{unpack_single, BitPackedArray, BitPackedEncoding}; impl ScalarAtFn for BitPackedEncoding { fn scalar_at(&self, array: &BitPackedArray, index: usize) -> VortexResult { if let Some(patches) = array.patches() { - // NB: All non-null values are considered patches - if patches.is_valid(index) { - return scalar_at(&patches, index)?.cast(array.dtype()); + if let Some(patch) = patches.get_patched(index)? { + return Ok(patch); } } - unpack_single(array, index)?.cast(array.dtype()) } } #[cfg(test)] mod test { - use vortex_array::array::{PrimitiveArray, SparseArray}; + use itertools::Itertools; + use vortex_array::array::PrimitiveArray; use vortex_array::compute::scalar_at; + use vortex_array::patches::Patches; use vortex_array::validity::Validity; + use vortex_array::validity::Validity::NonNullable; use vortex_array::IntoArrayData; use vortex_buffer::Buffer; use vortex_dtype::{DType, Nullability, PType}; @@ -37,16 +37,11 @@ mod test { Buffer::from(vec![0u8; 128]), PType::U32, Validity::AllInvalid, - Some( - SparseArray::try_new( - PrimitiveArray::from(vec![1u64]).into_array(), - PrimitiveArray::from_vec(vec![999u32], Validity::AllValid).into_array(), - 8, - Scalar::null_typed::(), - ) - .unwrap() - .into_array(), - ), + Some(Patches::new( + 8, + PrimitiveArray::from_vec(vec![1u32], NonNullable).into_array(), + PrimitiveArray::from_vec(vec![999u32], Validity::AllValid).into_array(), + )), 1, 8, ) @@ -57,4 +52,25 @@ mod test { Scalar::null(DType::Primitive(PType::U32, Nullability::Nullable)) ); } + + #[test] + fn test_scalar_at() { + let values = (0u32..257).collect_vec(); + let uncompressed = PrimitiveArray::from(values.clone()).into_array(); + let packed = BitPackedArray::encode(&uncompressed, 8).unwrap(); + assert!(packed.patches().is_some()); + + let patches = packed.patches().unwrap().indices().clone(); + assert_eq!( + usize::try_from(&scalar_at(patches, 0).unwrap()).unwrap(), + 256 + ); + + values.iter().enumerate().for_each(|(i, v)| { + assert_eq!( + u32::try_from(scalar_at(packed.as_ref(), i).unwrap().as_ref()).unwrap(), + *v + ); + }); + } } diff --git a/encodings/fastlanes/src/bitpacking/compute/search_sorted.rs b/encodings/fastlanes/src/bitpacking/compute/search_sorted.rs index 6ffa46f78e..03078c25fb 100644 --- a/encodings/fastlanes/src/bitpacking/compute/search_sorted.rs +++ b/encodings/fastlanes/src/bitpacking/compute/search_sorted.rs @@ -5,10 +5,9 @@ use std::cmp::Ordering::Greater; use fastlanes::BitPacking; use itertools::Itertools; use num_traits::AsPrimitive; -use vortex_array::array::SparseArray; use vortex_array::compute::{ - 
search_sorted_usize, IndexOrd, Len, SearchResult, SearchSorted, SearchSortedFn, - SearchSortedSide, SearchSortedUsizeFn, + IndexOrd, Len, SearchResult, SearchSorted, SearchSortedFn, SearchSortedSide, + SearchSortedUsizeFn, }; use vortex_array::stats::ArrayStatistics; use vortex_array::validity::Validity; @@ -118,14 +117,12 @@ fn search_sorted_native( where T: NativePType + BitPacking + AsPrimitive + AsPrimitive, { - if let Some(patches_array) = array.patches() { + if let Some(patches) = array.patches() { // If patches exist they must be the last elements in the array, if the value we're looking for is greater than // max packed value just search the patches let usize_value: usize = value.as_(); if usize_value > array.max_packed_value() { - // FIXME(ngates): this is broken. Patches _aren't_ sorted because they're sparse and - // interspersed with nulls... - search_sorted_usize(&patches_array, usize_value, side) + patches.search_sorted(usize_value, side) } else { Ok(BitPackedSearch::<'_, T>::new(array).search_sorted(&value, side)) } @@ -150,11 +147,9 @@ impl<'a, T: BitPacking + NativePType> BitPackedSearch<'a, T> { pub fn new(array: &'a BitPackedArray) -> Self { let min_patch_offset = array .patches() - .and_then(|p| { - SparseArray::maybe_from(p) - .vortex_expect("Only sparse patches are supported") - .min_index() - }) + .map(|p| p.min_index()) + .transpose() + .vortex_expect("Failed to get min patch index") .unwrap_or_else(|| array.len()); let first_null_idx = match array.validity() { Validity::NonNullable | Validity::AllValid => array.len(), diff --git a/encodings/fastlanes/src/bitpacking/compute/slice.rs b/encodings/fastlanes/src/bitpacking/compute/slice.rs index 8fef04b8d4..d7b3728530 100644 --- a/encodings/fastlanes/src/bitpacking/compute/slice.rs +++ b/encodings/fastlanes/src/bitpacking/compute/slice.rs @@ -1,10 +1,9 @@ use std::cmp::max; -use vortex_array::array::SparseArray; -use vortex_array::compute::{slice, SliceFn}; +use vortex_array::compute::SliceFn; use vortex_array::variants::PrimitiveArrayTrait; use vortex_array::{ArrayData, IntoArrayData}; -use vortex_error::{VortexExpect, VortexResult}; +use vortex_error::VortexResult; use crate::{BitPackedArray, BitPackedEncoding}; @@ -25,16 +24,9 @@ impl SliceFn for BitPackedEncoding { array.validity().slice(start, stop)?, array .patches() - .map(|p| slice(&p, start, stop)) + .map(|p| p.slice(start, stop)) .transpose()? - .filter(|a| { - // If the sliced patch_indices is empty, we should not propagate the patches. - // There may be other logic that depends on Some(patches) indicating non-empty. - !SparseArray::maybe_from(a.clone()) - .vortex_expect("BitPackedArray must encode patches as SparseArray") - .indices() - .is_empty() - }), + .flatten(), array.bit_width(), stop - start, offset as u16, @@ -46,7 +38,7 @@ impl SliceFn for BitPackedEncoding { #[cfg(test)] mod test { use itertools::Itertools; - use vortex_array::array::{PrimitiveArray, SparseArray}; + use vortex_array::array::PrimitiveArray; use vortex_array::compute::{scalar_at, slice, take, TakeOptions}; use vortex_array::{ArrayLen, IntoArrayData}; @@ -168,9 +160,7 @@ mod test { assert!(array.patches().is_some()); - let patch_indices = SparseArray::try_from(array.patches().unwrap()) - .unwrap() - .indices(); + let patch_indices = array.patches().unwrap().indices().clone(); assert_eq!(patch_indices.len(), 1); // Slicing drops the empty patches array. 
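The test above relies on the new slicing semantics: `Patches::slice` (added in vortex-array/src/patches.rs later in this patch) re-bases surviving indices against the slice start and returns `None` when no patch index falls inside the range, which is why the sliced bit-packed array no longer reports patches. A rough sketch of that contract, with illustrative values:

use vortex_array::array::PrimitiveArray;
use vortex_array::patches::Patches;
use vortex_array::IntoArrayData;
use vortex_error::VortexResult;

fn main() -> VortexResult<()> {
    // One patch at position 7 of a 10-element array (illustrative data).
    let patches = Patches::new(
        10,
        PrimitiveArray::from(vec![7u64]).into_array(),
        PrimitiveArray::from(vec![99u32]).into_array(),
    );
    // No patch index falls in [0, 5), so the slice carries no patches at all.
    assert!(patches.slice(0, 5)?.is_none());
    // Index 7 survives a [5, 10) slice and is re-based to 7 - 5 = 2.
    let tail = patches.slice(5, 10)?.expect("one patch should survive");
    assert_eq!(tail.num_patches(), 1);
    Ok(())
}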
diff --git a/encodings/fastlanes/src/bitpacking/compute/take.rs b/encodings/fastlanes/src/bitpacking/compute/take.rs index 6942ea6e26..b1af0599df 100644 --- a/encodings/fastlanes/src/bitpacking/compute/take.rs +++ b/encodings/fastlanes/src/bitpacking/compute/take.rs @@ -1,15 +1,14 @@ -use std::cmp::min; - use fastlanes::BitPacking; use itertools::Itertools; -use vortex_array::array::{PrimitiveArray, SparseArray}; -use vortex_array::compute::{slice, take, TakeFn, TakeOptions}; +use vortex_array::array::PrimitiveArray; +use vortex_array::compute::{take, try_cast, TakeFn, TakeOptions}; use vortex_array::variants::PrimitiveArrayTrait; use vortex_array::{ - ArrayDType, ArrayData, ArrayLen, IntoArrayData, IntoArrayVariant, IntoCanonical, + ArrayDType, ArrayData, ArrayLen, IntoArrayData, IntoArrayVariant, IntoCanonical, ToArrayData, }; use vortex_dtype::{ - match_each_integer_ptype, match_each_unsigned_integer_ptype, NativePType, PType, + match_each_integer_ptype, match_each_unsigned_integer_ptype, DType, NativePType, Nullability, + PType, }; use vortex_error::{VortexExpect as _, VortexResult}; @@ -19,7 +18,6 @@ use crate::{unpack_single_primitive, BitPackedArray, BitPackedEncoding}; // all 1024 elements takes ~8.8x as long as unpacking a single element on an M2 Macbook Air. // see https://github.com/spiraldb/vortex/pull/190#issue-2223752833 pub(super) const UNPACK_CHUNK_THRESHOLD: usize = 8; -const BULK_PATCH_THRESHOLD: usize = 64; impl TakeFn for BitPackedEncoding { fn take( @@ -54,7 +52,7 @@ impl TakeFn for BitPackedEncoding { fn take_primitive( array: &BitPackedArray, indices: &PrimitiveArray, - options: TakeOptions, + _options: TakeOptions, ) -> VortexResult> { if indices.is_empty() { return Ok(vec![]); @@ -64,7 +62,6 @@ fn take_primitive( let bit_width = array.bit_width() as usize; let packed = array.packed_slice::(); - let patches = array.patches().map(SparseArray::try_from).transpose()?; // Group indices by 1024-element chunk, *without* allocating on the heap let chunked_indices = &indices @@ -80,9 +77,7 @@ fn take_primitive( let mut output = Vec::with_capacity(indices.len()); let mut unpacked = [T::zero(); 1024]; - let mut batch_count = 0_usize; for (chunk, offsets) in chunked_indices { - batch_count += 1; let chunk_size = 128 * bit_width / size_of::(); let packed_chunk = &packed[chunk * chunk_size..][..chunk_size]; @@ -126,113 +121,29 @@ fn take_primitive( } } - if let Some(ref patches) = patches { - patch_for_take_primitive::( - patches, - indices, - offset, - batch_count, - &mut output, - options, - )?; - } - - Ok(output) -} - -fn patch_for_take_primitive( - patches: &SparseArray, - indices: &PrimitiveArray, - offset: usize, - batch_count: usize, - output: &mut [T], - options: TakeOptions, -) -> VortexResult<()> { - #[inline] - fn inner_patch( - patches: &SparseArray, - indices: &PrimitiveArray, - output: &mut [T], - options: TakeOptions, - ) -> VortexResult<()> { - let taken_patches = take(patches.as_ref(), indices.as_ref(), options)?; - let taken_patches = SparseArray::try_from(taken_patches)?; - - let base_index = output.len() - indices.len(); - let output_patches = taken_patches - .values() - .into_primitive()? 
- .reinterpret_cast(T::PTYPE); - taken_patches - .resolved_indices() - .iter() - .map(|idx| base_index + *idx) - .zip_eq(output_patches.maybe_null_slice::()) - .for_each(|(idx, val)| { - output[idx] = *val; - }); - - Ok(()) - } - - // if we have a small number of relatively large batches, we gain by slicing and then patching inside the loop - // if we have a large number of relatively small batches, the overhead isn't worth it, and we're better off with a bulk patch - // roughly, if we have an average of less than 64 elements per batch, we prefer bulk patching - let prefer_bulk_patch = batch_count * BULK_PATCH_THRESHOLD > indices.len(); - if prefer_bulk_patch { - return inner_patch(patches, indices, output, options); - } - - let min_index = patches.min_index().unwrap_or_default(); - let max_index = patches.max_index().unwrap_or_default(); - - // Group indices into 1024-element chunks and relativise them to the beginning of each chunk - let chunked_indices = &indices - .maybe_null_slice::() - .iter() - .map(|i| { - i.to_usize() - .vortex_expect("index must be expressible as usize") - + offset - }) - .filter(|i| *i >= min_index && *i <= max_index) // short-circuit - .chunk_by(|idx| idx / 1024); - - for (chunk, offsets) in chunked_indices { - // NOTE: we need to subtract the array offset before slicing into the patches. - // This is because BitPackedArray is rounded to block boundaries, but patches - // is sliced exactly. - let patches_start = if chunk == 0 { - 0 - } else { - (chunk * 1024) - offset - }; - let patches_end = min((chunk + 1) * 1024 - offset, patches.len()); - let patches_slice = slice(patches.as_ref(), patches_start, patches_end)?; - let patches_slice = SparseArray::try_from(patches_slice)?; - if patches_slice.is_empty() { - continue; - } - - let min_index = patches_slice.min_index().unwrap_or_default(); - let max_index = patches_slice.max_index().unwrap_or_default(); - let offsets = offsets - .map(|i| (i % 1024) as u16) - .filter(|i| *i as usize >= min_index && *i as usize <= max_index) - .collect_vec(); - if offsets.is_empty() { - continue; + if let Some(patches) = array + .patches() + .map(|p| p.take(&indices.to_array())) + .transpose()? + .flatten() + { + let indices = try_cast( + patches.indices(), + &DType::Primitive(PType::U64, Nullability::NonNullable), + )? + .into_primitive()?; + + // TODO(ngates): can patch values themselves have nulls, or do we ensure they're in our + // validity bitmap? 
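+        // Scatter the patch values into the unpacked output, overwriting the truncated
+        // bit-packed values at the positions selected by the taken patch indices.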
+ let values = patches.values().clone().into_primitive()?; + let values_slice = values.maybe_null_slice::(); + + for (idx, v) in indices.maybe_null_slice::().iter().zip(values_slice) { + output[*idx as usize] = *v; } - - inner_patch( - &patches_slice, - &PrimitiveArray::from(offsets), - output, - options, - )?; } - Ok(()) + Ok(output) } #[cfg(test)] @@ -241,7 +152,7 @@ mod test { use itertools::Itertools; use rand::distributions::Uniform; use rand::{thread_rng, Rng}; - use vortex_array::array::{PrimitiveArray, SparseArray}; + use vortex_array::array::PrimitiveArray; use vortex_array::compute::{scalar_at, slice, take, TakeOptions}; use vortex_array::{IntoArrayData, IntoArrayVariant}; @@ -263,6 +174,21 @@ mod test { assert_eq!(res_bytes, &[0, 62, 31, 33, 9, 18]); } + #[test] + fn take_with_patches() { + let unpacked = PrimitiveArray::from((0u32..100_000).collect_vec()).into_array(); + let bitpacked = BitPackedArray::encode(unpacked.as_ref(), 2).unwrap(); + + let indices = PrimitiveArray::from(vec![0, 2, 4, 6]); + + let primitive_result = take(bitpacked.as_ref(), &indices, TakeOptions::default()) + .unwrap() + .into_primitive() + .unwrap(); + let res_bytes = primitive_result.maybe_null_slice::(); + assert_eq!(res_bytes, &[0, 2, 4, 6]); + } + #[test] fn take_sliced_indices() { let indices = PrimitiveArray::from(vec![1919, 1921]).into_array(); @@ -289,12 +215,6 @@ mod test { let packed = BitPackedArray::encode(uncompressed.as_ref(), 16).unwrap(); assert!(packed.patches().is_some()); - let patches = SparseArray::try_from(packed.patches().unwrap()).unwrap(); - assert_eq!( - patches.resolved_indices(), - ((values.len() + 1 - num_patches)..values.len()).collect_vec() - ); - let rng = thread_rng(); let range = Uniform::new(0, values.len()); let random_indices: PrimitiveArray = rng @@ -327,22 +247,4 @@ mod test { ); }); } - - #[test] - fn test_scalar_at() { - let values = (0u32..257).collect_vec(); - let uncompressed = PrimitiveArray::from(values.clone()).into_array(); - let packed = BitPackedArray::encode(&uncompressed, 8).unwrap(); - assert!(packed.patches().is_some()); - - let patches = SparseArray::try_from(packed.patches().unwrap()).unwrap(); - assert_eq!(patches.resolved_indices(), vec![256]); - - values.iter().enumerate().for_each(|(i, v)| { - assert_eq!( - u32::try_from(scalar_at(packed.as_ref(), i).unwrap().as_ref()).unwrap(), - *v - ); - }); - } } diff --git a/encodings/fastlanes/src/bitpacking/mod.rs b/encodings/fastlanes/src/bitpacking/mod.rs index 3cd8171332..04e7bd47a2 100644 --- a/encodings/fastlanes/src/bitpacking/mod.rs +++ b/encodings/fastlanes/src/bitpacking/mod.rs @@ -4,8 +4,9 @@ use std::sync::Arc; use ::serde::{Deserialize, Serialize}; pub use compress::*; use fastlanes::BitPacking; -use vortex_array::array::{PrimitiveArray, SparseArray}; +use vortex_array::array::PrimitiveArray; use vortex_array::encoding::ids; +use vortex_array::patches::{Patches, PatchesMetadata}; use vortex_array::stats::{StatisticsVTable, StatsSet}; use vortex_array::validity::{LogicalValidity, Validity, ValidityMetadata, ValidityVTable}; use vortex_array::variants::{PrimitiveArrayTrait, VariantsVTable}; @@ -14,7 +15,7 @@ use vortex_array::{ impl_encoding, ArrayDType, ArrayData, ArrayLen, ArrayTrait, Canonical, IntoCanonical, }; use vortex_buffer::Buffer; -use vortex_dtype::{DType, NativePType, Nullability, PType}; +use vortex_dtype::{DType, NativePType, PType}; use vortex_error::{vortex_bail, vortex_err, VortexExpect as _, VortexResult}; mod compress; @@ -27,7 +28,7 @@ pub struct BitPackedMetadata { 
validity: ValidityMetadata, bit_width: u8, offset: u16, // must be <1024 - has_patches: bool, + patches: Option, } impl Display for BitPackedMetadata { @@ -45,7 +46,7 @@ impl BitPackedArray { packed: Buffer, ptype: PType, validity: Validity, - patches: Option, + patches: Option, bit_width: u8, len: usize, ) -> VortexResult { @@ -56,7 +57,7 @@ impl BitPackedArray { packed: Buffer, ptype: PType, validity: Validity, - patches: Option, + patches: Option, bit_width: u8, length: usize, offset: u16, @@ -88,31 +89,20 @@ impl BitPackedArray { )); } - if let Some(parray) = patches.as_ref() { - if parray.len() != length { - vortex_bail!( - "Mismatched length in BitPackedArray between encoded {} and it's patches({}) {}", - length, - parray.encoding().id(), - parray.len() - ) - } - - if SparseArray::try_from(parray.clone())?.indices().is_empty() { - vortex_bail!("cannot construct BitPackedArray using patches without indices"); - } - } - let metadata = BitPackedMetadata { validity: validity.to_metadata(length)?, offset, bit_width, - has_patches: patches.is_some(), + patches: patches + .as_ref() + .map(|p| p.to_metadata(length, &dtype)) + .transpose()?, }; - let mut children = Vec::with_capacity(2); - if let Some(p) = patches { - children.push(p); + let mut children = Vec::with_capacity(3); + if let Some(p) = patches.as_ref() { + children.push(p.indices().clone()); + children.push(p.values().clone()); } if let Some(a) = validity.into_array() { children.push(a) @@ -161,15 +151,17 @@ impl BitPackedArray { /// If present, patches MUST be a `SparseArray` with equal-length to this array, and whose /// indices indicate the locations of patches. The indices must have non-zero length. #[inline] - pub fn patches(&self) -> Option { - self.metadata().has_patches.then(|| { - self.as_ref() - .child( - 0, - &self.dtype().with_nullability(Nullability::Nullable), - self.len(), - ) - .vortex_expect("BitPackedArray: patches child") + pub fn patches(&self) -> Option { + self.metadata().patches.as_ref().map(|patches| { + Patches::new( + self.len(), + self.as_ref() + .child(0, &patches.indices_dtype(), patches.len()) + .vortex_expect("BitPackedArray: patch indices"), + self.as_ref() + .child(1, self.dtype(), patches.len()) + .vortex_expect("BitPackedArray: patch values"), + ) }) } @@ -179,8 +171,11 @@ impl BitPackedArray { } pub fn validity(&self) -> Validity { - let validity_child_idx = if self.metadata().has_patches { 1 } else { 0 }; - + let validity_child_idx = if self.metadata().patches.is_some() { + 2 + } else { + 0 + }; self.metadata().validity.to_validity(|| { self.as_ref() .child(validity_child_idx, &Validity::DTYPE, self.len()) @@ -222,7 +217,7 @@ impl VisitorVTable for BitPackedEncoding { fn accept(&self, array: &BitPackedArray, visitor: &mut dyn ArrayVisitor) -> VortexResult<()> { visitor.visit_buffer(array.packed())?; if let Some(patches) = array.patches().as_ref() { - visitor.visit_child("patches", patches)?; + visitor.visit_patches(patches)?; } visitor.visit_validity(&array.validity()) } diff --git a/vortex-array/src/array/bool/mod.rs b/vortex-array/src/array/bool/mod.rs index 910be55dd6..2113adebe3 100644 --- a/vortex-array/src/array/bool/mod.rs +++ b/vortex-array/src/array/bool/mod.rs @@ -4,11 +4,10 @@ use std::sync::Arc; use arrow_array::BooleanArray; use arrow_buffer::{BooleanBufferBuilder, MutableBuffer}; use itertools::Itertools; -use num_traits::AsPrimitive; use serde::{Deserialize, Serialize}; use vortex_buffer::Buffer; use vortex_dtype::{DType, Nullability}; -use vortex_error::{vortex_bail, 
VortexExpect as _, VortexResult}; +use vortex_error::{VortexExpect as _, VortexResult}; use crate::encoding::ids; use crate::stats::StatsSet; @@ -20,6 +19,7 @@ use crate::{ }; pub mod compute; +mod patch; mod stats; // Re-export the BooleanBuffer type on our API surface. @@ -135,34 +135,6 @@ impl BoolArray { .try_into() } - pub fn patch>( - self, - positions: &[P], - values: BoolArray, - ) -> VortexResult { - if positions.len() != values.len() { - vortex_bail!( - "Positions and values passed to patch had different lengths {} and {}", - positions.len(), - values.len() - ); - } - if let Some(last_pos) = positions.last() { - if last_pos.as_() >= self.len() { - vortex_bail!(OutOfBounds: last_pos.as_(), 0, self.len()) - } - } - - let len = self.len(); - let result_validity = self.validity().patch(len, positions, values.validity())?; - let (mut own_values, bit_offset) = self.into_boolean_builder(); - for (idx, value) in positions.iter().zip_eq(values.boolean_buffer().iter()) { - own_values.set_bit(idx.as_() + bit_offset, value); - } - - Self::try_new(own_values.finish().slice(bit_offset, len), result_validity) - } - /// Create a new BoolArray from a set of indices and a length. /// All indices must be less than the length. pub fn from_indices>(length: usize, indices: I) -> Self { @@ -235,12 +207,10 @@ impl VisitorVTable for BoolEncoding { #[cfg(test)] mod tests { - use arrow_buffer::BooleanBuffer; - use crate::array::BoolArray; - use crate::compute::{scalar_at, slice}; + use crate::compute::scalar_at; use crate::validity::Validity; - use crate::{IntoArrayData, IntoArrayVariant}; + use crate::IntoArrayData; #[test] fn bool_array() { @@ -283,31 +253,4 @@ mod tests { let scalar = scalar_at(&arr, 4).unwrap(); assert!(scalar.is_null()); } - - #[test] - fn patch_sliced_bools() { - let arr = BoolArray::from(BooleanBuffer::new_set(12)); - let sliced = slice(arr, 4, 12).unwrap(); - let (values, offset) = sliced.into_bool().unwrap().into_boolean_builder(); - assert_eq!(offset, 4); - assert_eq!(values.as_slice(), &[255, 15]); - } - - #[test] - fn patch_sliced_bools_offset() { - let arr = BoolArray::from(BooleanBuffer::new_set(15)); - let sliced = slice(arr, 4, 15).unwrap(); - let (values, offset) = sliced.into_bool().unwrap().into_boolean_builder(); - assert_eq!(offset, 4); - assert_eq!(values.as_slice(), &[255, 127]); - } - - #[test] - fn patch_sliced_bools_even() { - let arr = BoolArray::from(BooleanBuffer::new_set(31)); - let sliced = slice(arr, 8, 24).unwrap(); - let (values, offset) = sliced.into_bool().unwrap().into_boolean_builder(); - assert_eq!(offset, 0); - assert_eq!(values.as_slice(), &[255, 255]); - } } diff --git a/vortex-array/src/array/bool/patch.rs b/vortex-array/src/array/bool/patch.rs new file mode 100644 index 0000000000..8c2e1a4dfb --- /dev/null +++ b/vortex-array/src/array/bool/patch.rs @@ -0,0 +1,72 @@ +use itertools::Itertools; +use vortex_dtype::match_each_integer_ptype; +use vortex_error::VortexResult; + +use crate::array::BoolArray; +use crate::patches::Patches; +use crate::variants::PrimitiveArrayTrait; +use crate::{ArrayLen, IntoArrayVariant, ToArrayData}; + +impl BoolArray { + pub fn patch(self, patches: Patches) -> VortexResult { + let length = self.len(); + let indices = patches.indices().clone().into_primitive()?; + let values = patches.values().clone().into_bool()?; + + let patched_validity = + self.validity() + .patch(self.len(), &indices.to_array(), values.validity())?; + + let (mut own_values, bit_offset) = self.into_boolean_builder(); + 
match_each_integer_ptype!(indices.ptype(), |$I| { + for (idx, value) in indices + .into_maybe_null_slice::<$I>() + .into_iter() + .zip_eq(values.boolean_buffer().iter()) + { + own_values.set_bit(idx as usize + bit_offset, value); + } + }); + + Self::try_new( + own_values.finish().slice(bit_offset, length), + patched_validity, + ) + } +} + +#[cfg(test)] +mod tests { + use arrow_buffer::BooleanBuffer; + + use crate::array::BoolArray; + use crate::compute::slice; + use crate::IntoArrayVariant; + + #[test] + fn patch_sliced_bools() { + let arr = BoolArray::from(BooleanBuffer::new_set(12)); + let sliced = slice(arr, 4, 12).unwrap(); + let (values, offset) = sliced.into_bool().unwrap().into_boolean_builder(); + assert_eq!(offset, 4); + assert_eq!(values.as_slice(), &[255, 15]); + } + + #[test] + fn patch_sliced_bools_offset() { + let arr = BoolArray::from(BooleanBuffer::new_set(15)); + let sliced = slice(arr, 4, 15).unwrap(); + let (values, offset) = sliced.into_bool().unwrap().into_boolean_builder(); + assert_eq!(offset, 4); + assert_eq!(values.as_slice(), &[255, 127]); + } + + #[test] + fn patch_sliced_bools_even() { + let arr = BoolArray::from(BooleanBuffer::new_set(31)); + let sliced = slice(arr, 8, 24).unwrap(); + let (values, offset) = sliced.into_bool().unwrap().into_boolean_builder(); + assert_eq!(offset, 0); + assert_eq!(values.as_slice(), &[255, 255]); + } +} diff --git a/vortex-array/src/array/null/compute.rs b/vortex-array/src/array/null/compute.rs index 3bd98d5db7..eaa15f9943 100644 --- a/vortex-array/src/array/null/compute.rs +++ b/vortex-array/src/array/null/compute.rs @@ -63,7 +63,6 @@ mod test { use vortex_dtype::DType; use crate::array::null::NullArray; - use crate::array::ConstantArray; use crate::compute::{scalar_at, slice, take, TakeOptions}; use crate::validity::{ArrayValidity, LogicalValidity}; use crate::{ArrayLen, IntoArrayData}; @@ -72,9 +71,7 @@ mod test { fn test_slice_nulls() { let nulls = NullArray::new(10); - // Turns out, the slice function has a short-cut for constant arrays. - // Sooo... we get back a constant! 
- let sliced = ConstantArray::try_from(slice(nulls.into_array(), 0, 4).unwrap()).unwrap(); + let sliced = NullArray::try_from(slice(nulls.into_array(), 0, 4).unwrap()).unwrap(); assert_eq!(sliced.len(), 4); assert!(matches!( diff --git a/vortex-array/src/array/primitive/compute/subtract_scalar.rs b/vortex-array/src/array/primitive/compute/subtract_scalar.rs index 74120a75a1..6b71b59a83 100644 --- a/vortex-array/src/array/primitive/compute/subtract_scalar.rs +++ b/vortex-array/src/array/primitive/compute/subtract_scalar.rs @@ -153,12 +153,4 @@ mod test { let _results = subtract_scalar(&values, &1.0f32.into()).unwrap(); let _results = subtract_scalar(&values, &f32::MAX.into()).unwrap(); } - - #[test] - fn test_scalar_subtract_type_mismatch_fails() { - let values = vec![1u64, 2, 3].into_array(); - // Subtracting incompatible dtypes should fail - let _results = - subtract_scalar(&values, &1.5f64.into()).expect_err("Expected type mismatch error"); - } } diff --git a/vortex-array/src/array/primitive/mod.rs b/vortex-array/src/array/primitive/mod.rs index 2497bc1858..9e011cbd9a 100644 --- a/vortex-array/src/array/primitive/mod.rs +++ b/vortex-array/src/array/primitive/mod.rs @@ -6,11 +6,10 @@ mod accessor; use arrow_buffer::{ArrowNativeType, Buffer as ArrowBuffer, MutableBuffer}; use bytes::Bytes; use itertools::Itertools; -use num_traits::AsPrimitive; use serde::{Deserialize, Serialize}; use vortex_buffer::Buffer; use vortex_dtype::{match_each_native_ptype, DType, NativePType, PType}; -use vortex_error::{vortex_bail, VortexExpect as _, VortexResult}; +use vortex_error::{VortexExpect as _, VortexResult}; use crate::encoding::ids; use crate::iter::Accessor; @@ -19,11 +18,11 @@ use crate::validity::{ArrayValidity, LogicalValidity, Validity, ValidityMetadata use crate::variants::{PrimitiveArrayTrait, VariantsVTable}; use crate::visitor::{ArrayVisitor, VisitorVTable}; use crate::{ - impl_encoding, ArrayDType, ArrayData, ArrayLen, ArrayTrait, Canonical, IntoArrayData, - IntoCanonical, + impl_encoding, ArrayData, ArrayLen, ArrayTrait, Canonical, IntoArrayData, IntoCanonical, }; mod compute; +mod patch; mod stats; impl_encoding!("vortex.primitive", ids::PRIMITIVE, Primitive); @@ -157,40 +156,6 @@ impl PrimitiveArray { PrimitiveArray::new(self.buffer().clone(), ptype, self.validity()) } - pub fn patch, T: NativePType + ArrowNativeType>( - self, - positions: &[P], - values: &[T], - values_validity: Validity, - ) -> VortexResult { - if positions.len() != values.len() { - vortex_bail!( - "Positions and values passed to patch had different lengths {} and {}", - positions.len(), - values.len() - ); - } - if let Some(last_pos) = positions.last() { - if last_pos.as_() >= self.len() { - vortex_bail!(OutOfBounds: last_pos.as_(), 0, self.len()) - } - } - - if self.ptype() != T::PTYPE { - vortex_bail!(MismatchedTypes: self.dtype(), T::PTYPE) - } - - let result_validity = self - .validity() - .patch(self.len(), positions, values_validity)?; - let mut own_values = self.into_maybe_null_slice::(); - for (idx, value) in positions.iter().zip_eq(values) { - own_values[idx.as_()] = *value; - } - - Ok(Self::from_vec(own_values, result_validity)) - } - pub fn into_buffer(self) -> Buffer { self.into_array() .into_buffer() @@ -282,23 +247,3 @@ impl VisitorVTable for PrimitiveEncoding { visitor.visit_validity(&array.validity()) } } - -#[cfg(test)] -mod tests { - use super::*; - use crate::compute::slice; - use crate::IntoArrayVariant; - - #[test] - fn patch_sliced() { - let input = PrimitiveArray::from_vec(vec![2u32; 10], 
Validity::AllValid); - let sliced = slice(input, 2, 8).unwrap(); - assert_eq!( - sliced - .into_primitive() - .unwrap() - .into_maybe_null_slice::(), - vec![2u32; 6] - ); - } -} diff --git a/vortex-array/src/array/primitive/patch.rs b/vortex-array/src/array/primitive/patch.rs new file mode 100644 index 0000000000..4620683d42 --- /dev/null +++ b/vortex-array/src/array/primitive/patch.rs @@ -0,0 +1,51 @@ +use itertools::Itertools; +use vortex_dtype::{match_each_integer_ptype, match_each_native_ptype}; +use vortex_error::VortexResult; + +use crate::array::PrimitiveArray; +use crate::patches::Patches; +use crate::variants::PrimitiveArrayTrait; +use crate::{ArrayLen, IntoArrayVariant}; + +impl PrimitiveArray { + #[allow(clippy::cognitive_complexity)] + pub fn patch(self, patches: Patches) -> VortexResult { + let indices = patches.indices().clone().into_primitive()?; + let values = patches.values().clone().into_primitive()?; + + let patched_validity = + self.validity() + .patch(self.len(), indices.as_ref(), values.validity())?; + + match_each_integer_ptype!(indices.ptype(), |$I| { + match_each_native_ptype!(self.ptype(), |$T| { + let mut own_values = self.into_maybe_null_slice::<$T>(); + for (idx, value) in indices.into_maybe_null_slice::<$I>().into_iter().zip_eq(values.into_maybe_null_slice::<$T>().into_iter()) { + own_values[idx as usize] = value; + } + Ok(Self::from_vec(own_values, patched_validity)) + }) + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::compute::slice; + use crate::validity::Validity; + use crate::IntoArrayVariant; + + #[test] + fn patch_sliced() { + let input = PrimitiveArray::from_vec(vec![2u32; 10], Validity::AllValid); + let sliced = slice(input, 2, 8).unwrap(); + assert_eq!( + sliced + .into_primitive() + .unwrap() + .into_maybe_null_slice::(), + vec![2u32; 6] + ); + } +} diff --git a/vortex-array/src/array/sparse/canonical.rs b/vortex-array/src/array/sparse/canonical.rs index e3e690669b..ef01cebf9d 100644 --- a/vortex-array/src/array/sparse/canonical.rs +++ b/vortex-array/src/array/sparse/canonical.rs @@ -1,30 +1,29 @@ use arrow_buffer::{ArrowNativeType, BooleanBuffer}; -use vortex_dtype::{match_each_native_ptype, DType, NativePType, Nullability}; +use vortex_dtype::{match_each_native_ptype, DType, NativePType, Nullability, PType}; use vortex_error::{VortexError, VortexResult}; use vortex_scalar::Scalar; use crate::array::primitive::PrimitiveArray; use crate::array::sparse::SparseArray; -use crate::array::BoolArray; +use crate::array::{BoolArray, ConstantArray}; +use crate::patches::Patches; use crate::validity::Validity; -use crate::variants::PrimitiveArrayTrait; -use crate::{ArrayDType, ArrayLen, Canonical, IntoArrayVariant, IntoCanonical}; +use crate::{ArrayDType, ArrayLen, Canonical, IntoCanonical}; impl IntoCanonical for SparseArray { fn into_canonical(self) -> VortexResult { - // Resolve our indices into a vector of usize applying the offset - let indices = self.resolved_indices(); + if self.indices().is_empty() { + return ConstantArray::new(self.fill_scalar(), self.len()).into_canonical(); + } + let patches = Patches::new(self.len(), self.resolved_indices()?, self.values()); if matches!(self.dtype(), DType::Bool(_)) { - let values = self.values().into_bool()?; - canonicalize_sparse_bools(values, &indices, self.len(), &self.fill_scalar()) + canonicalize_sparse_bools(patches, &self.fill_scalar()) } else { - let values = self.values().into_primitive()?; - match_each_native_ptype!(values.ptype(), |$P| { + let ptype = 
PType::try_from(self.values().dtype())?; + match_each_native_ptype!(ptype, |$P| { canonicalize_sparse_primitives::<$P>( - values, - &indices, - self.len(), + patches, &self.fill_scalar(), ) }) @@ -32,18 +31,13 @@ impl IntoCanonical for SparseArray { } } -fn canonicalize_sparse_bools( - values: BoolArray, - indices: &[usize], - len: usize, - fill_value: &Scalar, -) -> VortexResult { +fn canonicalize_sparse_bools(patches: Patches, fill_value: &Scalar) -> VortexResult { let (fill_bool, validity) = if fill_value.is_null() { (false, Validity::AllInvalid) } else { ( fill_value.try_into()?, - if values.dtype().nullability() == Nullability::NonNullable { + if patches.dtype().nullability() == Nullability::NonNullable { Validity::NonNullable } else { Validity::AllValid @@ -53,31 +47,28 @@ fn canonicalize_sparse_bools( let bools = BoolArray::try_new( if fill_bool { - BooleanBuffer::new_set(len) + BooleanBuffer::new_set(patches.array_len()) } else { - BooleanBuffer::new_unset(len) + BooleanBuffer::new_unset(patches.array_len()) }, validity, )?; - let patched = bools.patch(indices, values)?; - Ok(Canonical::Bool(patched)) + + bools.patch(patches).map(Canonical::Bool) } fn canonicalize_sparse_primitives< T: NativePType + for<'a> TryFrom<&'a Scalar, Error = VortexError> + ArrowNativeType, >( - values: PrimitiveArray, - indices: &[usize], - len: usize, + patches: Patches, fill_value: &Scalar, ) -> VortexResult { - let values_validity = values.validity(); let (primitive_fill, validity) = if fill_value.is_null() { (T::default(), Validity::AllInvalid) } else { ( fill_value.try_into()?, - if values_validity == Validity::NonNullable { + if patches.dtype().nullability() == Nullability::NonNullable { Validity::NonNullable } else { Validity::AllValid @@ -85,9 +76,9 @@ fn canonicalize_sparse_primitives< ) }; - let parray = PrimitiveArray::from_vec(vec![primitive_fill; len], validity); - let patched = parray.patch(indices, values.maybe_null_slice::(), values_validity)?; - Ok(Canonical::Primitive(patched)) + let parray = PrimitiveArray::from_vec(vec![primitive_fill; patches.array_len()], validity); + + parray.patch(patches).map(Canonical::Primitive) } #[cfg(test)] diff --git a/vortex-array/src/array/sparse/compute/take.rs b/vortex-array/src/array/sparse/compute/take.rs index 88d0749bf0..4af03fa723 100644 --- a/vortex-array/src/array/sparse/compute/take.rs +++ b/vortex-array/src/array/sparse/compute/take.rs @@ -1,14 +1,14 @@ use std::convert::identity; use itertools::Itertools; -use vortex_dtype::match_each_integer_ptype; +use vortex_dtype::{match_each_integer_ptype, DType, Nullability, PType}; use vortex_error::VortexResult; use crate::aliases::hash_map::HashMap; use crate::array::primitive::PrimitiveArray; use crate::array::sparse::SparseArray; use crate::array::SparseEncoding; -use crate::compute::{take, TakeFn, TakeOptions}; +use crate::compute::{take, try_cast, TakeFn, TakeOptions}; use crate::variants::PrimitiveArrayTrait; use crate::{ArrayData, IntoArrayData, IntoArrayVariant}; @@ -43,11 +43,16 @@ fn take_map( array: &SparseArray, indices: &PrimitiveArray, ) -> VortexResult<(PrimitiveArray, PrimitiveArray)> { - let indices_map: HashMap = array - .resolved_indices() + let resolved_indices = try_cast( + array.resolved_indices()?, + &DType::Primitive(PType::U64, Nullability::NonNullable), + )? 
+ .into_primitive()?; + let indices_map: HashMap = resolved_indices + .maybe_null_slice::() .iter() .enumerate() - .map(|(i, r)| (*r as u64, i as u64)) + .map(|(i, r)| (*r, i as u64)) .collect(); let min_index = array.min_index().unwrap_or_default() as u64; let max_index = array.max_index().unwrap_or_default() as u64; diff --git a/vortex-array/src/array/sparse/mod.rs b/vortex-array/src/array/sparse/mod.rs index 3a758edfa1..21fbce00ef 100644 --- a/vortex-array/src/array/sparse/mod.rs +++ b/vortex-array/src/array/sparse/mod.rs @@ -7,7 +7,9 @@ use vortex_error::{vortex_bail, vortex_panic, VortexExpect as _, VortexResult}; use vortex_scalar::{Scalar, ScalarValue}; use crate::array::constant::ConstantArray; -use crate::compute::{scalar_at, search_sorted_usize, SearchResult, SearchSortedSide}; +use crate::compute::{ + scalar_at, search_sorted_usize, subtract_scalar, SearchResult, SearchSortedSide, +}; use crate::encoding::ids; use crate::stats::{ArrayStatistics, Stat, StatisticsVTable, StatsSet}; use crate::validity::{ArrayValidity, LogicalValidity, ValidityVTable}; @@ -131,8 +133,13 @@ impl SparseArray { ) } - /// Return indices as a vector of usize with the indices_offset applied. - pub fn resolved_indices(&self) -> Vec { + /// Return indices with the indices_offset applied. + pub fn resolved_indices(&self) -> VortexResult { + subtract_scalar(self.indices(), &Scalar::from(self.indices_offset())) + } + + /// Return the resolved indices as a `Vec`. + pub fn resolved_indices_usize(&self) -> Vec { let flat_indices = self .indices() .into_primitive() diff --git a/vortex-array/src/compute/filter.rs b/vortex-array/src/compute/filter.rs index 5c951ccba6..baf6c91768 100644 --- a/vortex-array/src/compute/filter.rs +++ b/vortex-array/src/compute/filter.rs @@ -3,6 +3,7 @@ use std::sync::OnceLock; use arrow_array::BooleanArray; use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, MutableBuffer}; +use num_traits::AsPrimitive; use vortex_dtype::{DType, Nullability}; use vortex_error::{vortex_bail, vortex_err, VortexError, VortexExpect, VortexResult}; @@ -197,11 +198,14 @@ pub enum FilterIter<'a> { impl FilterMask { /// Create a new FilterMask where the given indices are set. - pub fn from_indices>(length: usize, indices: I) -> Self { + pub fn from_indices, I: IntoIterator>( + length: usize, + indices: I, + ) -> Self { let mut buffer = MutableBuffer::new_null(length); indices .into_iter() - .for_each(|idx| arrow_buffer::bit_util::set_bit(&mut buffer, idx)); + .for_each(|idx| arrow_buffer::bit_util::set_bit(&mut buffer, idx.as_())); Self::from(BooleanBufferBuilder::new_from_buffer(buffer, length).finish()) } diff --git a/vortex-array/src/compute/scalar_subtract.rs b/vortex-array/src/compute/scalar_subtract.rs index 014b14bd33..ba505107c1 100644 --- a/vortex-array/src/compute/scalar_subtract.rs +++ b/vortex-array/src/compute/scalar_subtract.rs @@ -30,15 +30,16 @@ pub fn subtract_scalar( to_subtract: &Scalar, ) -> VortexResult { let array = array.as_ref(); + let to_subtract = to_subtract.cast(array.dtype())?; if let Some(f) = array.encoding().subtract_scalar_fn() { - return f.subtract_scalar(array, to_subtract); + return f.subtract_scalar(array, &to_subtract); } // if subtraction is not implemented for the given array type, but the array has a numeric // DType, we can flatten the array and apply subtraction to the flattened primitive array match array.dtype() { - DType::Primitive(..) => subtract_scalar(array.clone().into_primitive()?, to_subtract), + DType::Primitive(..) 
=> subtract_scalar(array.clone().into_primitive()?, &to_subtract), _ => Err(vortex_err!( NotImplemented: "scalar_subtract", array.encoding().id() diff --git a/vortex-array/src/compute/slice.rs b/vortex-array/src/compute/slice.rs index 4aecbb3367..4abacb0b2f 100644 --- a/vortex-array/src/compute/slice.rs +++ b/vortex-array/src/compute/slice.rs @@ -1,8 +1,7 @@ use vortex_error::{vortex_bail, vortex_err, VortexError, VortexResult}; -use crate::array::ConstantArray; use crate::encoding::Encoding; -use crate::{ArrayData, IntoArrayData}; +use crate::ArrayData; /// Limit array to start...stop range pub trait SliceFn { @@ -41,10 +40,6 @@ pub fn slice(array: impl AsRef, start: usize, stop: usize) -> VortexR let array = array.as_ref(); check_slice_bounds(array, start, stop)?; - if let Some(const_scalar) = array.as_constant() { - return Ok(ConstantArray::new(const_scalar, stop - start).into_array()); - } - array .encoding() .slice_fn() diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs index 6c5aa1589a..ac4b53476a 100644 --- a/vortex-array/src/lib.rs +++ b/vortex-array/src/lib.rs @@ -38,6 +38,7 @@ pub mod iter; mod macros; mod metadata; pub mod nbytes; +pub mod patches; pub mod stats; pub mod stream; pub mod tree; diff --git a/vortex-array/src/patches.rs b/vortex-array/src/patches.rs new file mode 100644 index 0000000000..5581d90916 --- /dev/null +++ b/vortex-array/src/patches.rs @@ -0,0 +1,286 @@ +use std::fmt::Debug; + +use serde::{Deserialize, Serialize}; +use vortex_dtype::Nullability::NonNullable; +use vortex_dtype::{match_each_integer_ptype, DType, PType}; +use vortex_error::{vortex_bail, VortexExpect, VortexResult}; +use vortex_scalar::Scalar; + +use crate::array::PrimitiveArray; +use crate::compute::{ + scalar_at, search_sorted, search_sorted_many, search_sorted_usize, slice, subtract_scalar, + take, try_cast, FilterMask, SearchResult, SearchSortedSide, TakeOptions, +}; +use crate::stats::{ArrayStatistics, Stat}; +use crate::validity::Validity; +use crate::variants::PrimitiveArrayTrait; +use crate::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PatchesMetadata { + len: usize, + indices_ptype: PType, +} + +impl PatchesMetadata { + #[inline] + pub fn len(&self) -> usize { + self.len + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.len == 0 + } + + #[inline] + pub fn indices_dtype(&self) -> DType { + DType::Primitive(self.indices_ptype, NonNullable) + } +} + +/// A helper for working with patched arrays. 
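+///
+/// `Patches` couples the length of the array being patched with a list of patch positions
+/// (`indices`, expected to be sorted ascending, as the search-based accessors below assume)
+/// and the replacement `values` for those positions. `Patches::new` asserts that indices and
+/// values have equal length, that the indices are integers, and, when statistics are
+/// available, that the maximum index is within bounds of the array length.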
+/// A helper for working with patched arrays.
+#[derive(Debug, Clone)]
+pub struct Patches {
+    array_len: usize,
+    indices: ArrayData,
+    values: ArrayData,
+}
+
+impl Patches {
+    pub fn new(array_len: usize, indices: ArrayData, values: ArrayData) -> Self {
+        assert_eq!(
+            indices.len(),
+            values.len(),
+            "Patch indices and values must have the same length"
+        );
+        assert!(indices.dtype().is_int(), "Patch indices must be integers");
+        assert!(
+            indices.len() <= array_len,
+            "Patch indices must be shorter than the array length"
+        );
+        assert!(!indices.is_empty(), "Patch indices must not be empty");
+        if let Some(max) = indices.statistics().get_as_cast::<u64>(Stat::Max) {
+            assert!(
+                max < array_len as u64,
+                "Patch indices {} are longer than the array length {}",
+                max,
+                array_len
+            );
+        }
+        Self {
+            array_len,
+            indices,
+            values,
+        }
+    }
+
+    pub fn array_len(&self) -> usize {
+        self.array_len
+    }
+
+    pub fn num_patches(&self) -> usize {
+        self.indices.len()
+    }
+
+    pub fn dtype(&self) -> &DType {
+        self.values.dtype()
+    }
+
+    pub fn indices(&self) -> &ArrayData {
+        &self.indices
+    }
+
+    pub fn values(&self) -> &ArrayData {
+        &self.values
+    }
+
+    pub fn indices_ptype(&self) -> PType {
+        PType::try_from(self.indices.dtype()).vortex_expect("primitive indices")
+    }
+
+    pub fn to_metadata(&self, len: usize, dtype: &DType) -> VortexResult<PatchesMetadata> {
+        if self.indices.len() > len {
+            vortex_bail!(
+                "Patch indices {} are longer than the array length {}",
+                self.indices.len(),
+                len
+            );
+        }
+        if self.values.dtype() != dtype {
+            vortex_bail!(
+                "Patch values dtype {} does not match array dtype {}",
+                self.values.dtype(),
+                dtype
+            );
+        }
+        Ok(PatchesMetadata {
+            len: self.indices.len(),
+            indices_ptype: PType::try_from(self.indices.dtype()).vortex_expect("primitive indices"),
+        })
+    }
+
+    /// Get the patched value at a given index if it exists.
+    pub fn get_patched(&self, index: usize) -> VortexResult<Option<Scalar>> {
+        if let Some(patch_idx) =
+            search_sorted_usize(self.indices(), index, SearchSortedSide::Left)?.to_found()
+        {
+            scalar_at(self.values(), patch_idx).map(Some)
+        } else {
+            Ok(None)
+        }
+    }
+
+    /// Return the search_sorted result for the given target re-mapped into the original indices.
+    pub fn search_sorted<T: Into<Scalar>>(
+        &self,
+        target: T,
+        side: SearchSortedSide,
+    ) -> VortexResult<SearchResult> {
+        Ok(match search_sorted(self.values(), target.into(), side)? {
+            SearchResult::Found(idx) => SearchResult::Found(if idx == self.indices().len() {
+                self.array_len()
+            } else {
+                usize::try_from(&scalar_at(self.indices(), idx)?)?
+            }),
+            SearchResult::NotFound(idx) => SearchResult::NotFound(if idx == self.indices().len() {
+                self.array_len()
+            } else {
+                usize::try_from(&scalar_at(self.indices(), idx)?)?
+            }),
+        })
+    }
+
+    /// Returns the minimum patch index
+    pub fn min_index(&self) -> VortexResult<usize> {
+        usize::try_from(&scalar_at(self.indices(), 0)?)
+    }
+
+    /// Returns the maximum patch index
+    pub fn max_index(&self) -> VortexResult<usize> {
+        usize::try_from(&scalar_at(self.indices(), self.indices().len() - 1)?)
+    }
+
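Taken together, the constructor and accessors above give a sparse "exceptions" view over a longer array: indices are sorted positions into an array of array_len elements, and values holds one replacement per index. A minimal usage sketch (the concrete arrays are illustrative and errors are just propagated):

use vortex_array::array::PrimitiveArray;
use vortex_array::patches::Patches;
use vortex_array::IntoArrayData;

fn example() -> vortex_error::VortexResult<()> {
    // Three patched positions in a 100-element array.
    let patches = Patches::new(
        100,
        PrimitiveArray::from(vec![10u32, 11, 20]).into_array(),
        PrimitiveArray::from(vec![100i32, 110, 200]).into_array(),
    );

    assert_eq!(patches.num_patches(), 3);
    assert_eq!(patches.min_index()?, 10);
    assert_eq!(patches.max_index()?, 20);

    // Position 11 is patched, position 12 is not.
    assert!(patches.get_patched(11)?.is_some());
    assert!(patches.get_patched(12)?.is_none());
    Ok(())
}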
+    /// Filter the patches by a mask, resulting in new patches for the filtered array.
+    pub fn filter(&self, mask: FilterMask) -> VortexResult<Option<Self>> {
+        if mask.is_empty() {
+            return Ok(None);
+        }
+
+        let buffer = mask.to_boolean_buffer()?;
+        let mut coordinate_indices: Vec<u64> = Vec::new();
+        let mut value_indices = Vec::new();
+        let mut last_inserted_index: usize = 0;
+
+        let flat_indices = self.indices().clone().into_primitive()?;
+        match_each_integer_ptype!(flat_indices.ptype(), |$I| {
+            for (value_idx, coordinate) in flat_indices.into_maybe_null_slice::<$I>().into_iter().enumerate() {
+                if buffer.value(coordinate as usize) {
+                    // We count the number of truthy values between this coordinate and the previous truthy one
+                    let adjusted_coordinate = buffer.slice(last_inserted_index, (coordinate as usize) - last_inserted_index).count_set_bits() as u64;
+                    coordinate_indices.push(adjusted_coordinate + coordinate_indices.last().copied().unwrap_or_default());
+                    last_inserted_index = coordinate as usize;
+                    value_indices.push(value_idx as u64);
+                }
+            }
+        });
+
+        if coordinate_indices.is_empty() {
+            return Ok(None);
+        }
+
+        let indices = PrimitiveArray::from(coordinate_indices).into_array();
+        let values = take(
+            self.values(),
+            PrimitiveArray::from(value_indices),
+            TakeOptions::default(),
+        )?;
+
+        Ok(Some(Self::new(mask.len(), indices, values)))
+    }
+
+    /// Slice the patches by a range of the patched array.
+    pub fn slice(&self, start: usize, stop: usize) -> VortexResult<Option<Self>> {
+        let patch_start =
+            search_sorted_usize(self.indices(), start, SearchSortedSide::Left)?.to_index();
+        let patch_stop =
+            search_sorted_usize(self.indices(), stop, SearchSortedSide::Left)?.to_index();
+
+        if patch_start == patch_stop {
+            return Ok(None);
+        }
+
+        // Slice out the values
+        let values = slice(self.values(), patch_start, patch_stop)?;
+
+        // Subtract the start value from the indices
+        let indices = slice(self.indices(), patch_start, patch_stop)?;
+        let indices = subtract_scalar(&indices, &Scalar::from(start).cast(indices.dtype())?)?;
+
+        Ok(Some(Self::new(stop - start, indices, values)))
+    }
+
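Both transforms return Ok(None) when no patches survive, so callers can drop the patches child entirely, and slice additionally rebases the surviving indices onto the new origin. A sketch of the expected behaviour for the same illustrative patches as above (indices [10, 11, 20] over a 100-element array):

use vortex_array::patches::Patches;

fn example_slice(patches: &Patches) -> vortex_error::VortexResult<()> {
    // Slice rows 5..15 of the patched array: only indices 10 and 11 fall in
    // range, and they are rebased to 5 and 6 in the 10-row result.
    let sliced = patches.slice(5, 15)?.expect("two patches survive");
    assert_eq!(sliced.array_len(), 10);
    assert_eq!(sliced.min_index()?, 5);
    assert_eq!(sliced.max_index()?, 6);

    // A slice that touches no patched position yields None.
    assert!(patches.slice(30, 40)?.is_none());
    Ok(())
}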
+    /// Take the indices from the patches.
+    pub fn take(&self, indices: &ArrayData) -> VortexResult<Option<Self>> {
+        if indices.is_empty() {
+            return Ok(None);
+        }
+
+        // TODO(ngates): plenty of optimisations to be made here
+        let take_indices =
+            try_cast(indices, &DType::Primitive(PType::U64, NonNullable))?.into_primitive()?;
+
+        let (values_indices, new_indices): (Vec<u64>, Vec<u64>) = search_sorted_many(
+            self.indices(),
+            take_indices.maybe_null_slice::<u64>(),
+            SearchSortedSide::Left,
+        )?
+        .iter()
+        .enumerate()
+        .filter_map(|(idx_in_take, search_result)| {
+            search_result
+                .to_found()
+                .map(|patch_idx| (patch_idx as u64, idx_in_take as u64))
+        })
+        .unzip();
+
+        if new_indices.is_empty() {
+            return Ok(None);
+        }
+
+        let new_indices = PrimitiveArray::from_vec(new_indices, Validity::NonNullable).into_array();
+
+        let values_indices =
+            PrimitiveArray::from_vec(values_indices, Validity::NonNullable).into_array();
+        let new_values = take(self.values(), values_indices, TakeOptions::default())?;
+
+        Ok(Some(Self::new(indices.len(), new_indices, new_values)))
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use crate::array::PrimitiveArray;
+    use crate::compute::FilterMask;
+    use crate::patches::Patches;
+    use crate::{IntoArrayData, IntoArrayVariant};
+
+    #[test]
+    fn test_filter() {
+        let patches = Patches::new(
+            100,
+            PrimitiveArray::from(vec![10u32, 11, 20]).into_array(),
+            PrimitiveArray::from(vec![100, 110, 200]).into_array(),
+        );
+
+        let filtered = patches
+            .filter(FilterMask::from_indices(100, [10u32, 20, 30]))
+            .unwrap()
+            .unwrap();
+
+        let indices = filtered.indices().clone().into_primitive().unwrap();
+        let values = filtered.values().clone().into_primitive().unwrap();
+        assert_eq!(indices.maybe_null_slice::<u64>(), &[0, 1]);
+        assert_eq!(values.maybe_null_slice::<i32>(), &[100, 200]);
+    }
+}
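take re-expresses the patches in the coordinate space of the take indices: gathered positions that hit a patch keep their new position, everything else is dropped. A sketch of the expected behaviour for the same illustrative patches (indices [10, 11, 20]); the helper name is ours:

use vortex_array::array::PrimitiveArray;
use vortex_array::patches::Patches;
use vortex_array::{IntoArrayData, IntoArrayVariant};

fn example_take(patches: &Patches) -> vortex_error::VortexResult<()> {
    // Gather rows [0, 11, 50, 20]: rows 11 and 20 are patched, and they land
    // at positions 1 and 3 of the gathered array.
    let taken = patches
        .take(&PrimitiveArray::from(vec![0u64, 11, 50, 20]).into_array())?
        .expect("two of the taken rows are patched");

    assert_eq!(taken.array_len(), 4);
    let new_indices = taken.indices().clone().into_primitive()?;
    assert_eq!(new_indices.maybe_null_slice::<u64>(), &[1, 3]);
    Ok(())
}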
diff --git a/vortex-array/src/validity.rs b/vortex-array/src/validity.rs
index f26e96ec8a..8491d39ab1 100644
--- a/vortex-array/src/validity.rs
+++ b/vortex-array/src/validity.rs
@@ -4,8 +4,8 @@ use std::fmt::{Debug, Display};
 use std::ops::BitAnd;
 
 use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, NullBuffer};
-use num_traits::AsPrimitive;
 use serde::{Deserialize, Serialize};
+use vortex_dtype::Nullability::NonNullable;
 use vortex_dtype::{DType, Nullability};
 use vortex_error::{
     vortex_bail, vortex_err, vortex_panic, VortexError, VortexExpect as _, VortexResult,
@@ -14,6 +14,7 @@ use vortex_error::{
 use crate::array::{BoolArray, ConstantArray};
 use crate::compute::{filter, scalar_at, slice, take, FilterMask, TakeOptions};
 use crate::encoding::Encoding;
+use crate::patches::Patches;
 use crate::stats::ArrayStatistics;
 use crate::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant};
 
@@ -99,7 +100,7 @@ pub enum Validity {
 
 impl Validity {
     /// The [`DType`] of the underlying validity array (if it exists).
-    pub const DTYPE: DType = DType::Bool(Nullability::NonNullable);
+    pub const DTYPE: DType = DType::Bool(NonNullable);
 
     pub fn to_metadata(&self, length: usize) -> VortexResult<ValidityMetadata> {
         match self {
@@ -160,7 +161,7 @@ impl Validity {
 
     pub fn nullability(&self) -> Nullability {
         match self {
-            Self::NonNullable => Nullability::NonNullable,
+            Self::NonNullable => NonNullable,
             _ => Nullability::Nullable,
         }
     }
@@ -269,30 +270,19 @@ impl Validity {
         Ok(validity)
     }
 
-    pub fn patch<P: AsPrimitive<usize>>(
-        self,
-        len: usize,
-        positions: &[P],
-        patches: Validity,
-    ) -> VortexResult<Self> {
-        if let Some(last_pos) = positions.last() {
-            if last_pos.as_() >= len {
-                vortex_bail!(OutOfBounds: last_pos.as_(), 0, len)
+    pub fn patch(self, len: usize, indices: &ArrayData, patches: Validity) -> VortexResult<Self> {
+        match (&self, &patches) {
+            (Validity::NonNullable, Validity::NonNullable) => return Ok(Validity::NonNullable),
+            (Validity::NonNullable, _) => {
+                vortex_bail!("Can't patch a non-nullable validity with nullable validity")
             }
-        }
-
-        if matches!(self, Validity::NonNullable) {
-            if patches.null_count(positions.len())? > 0 {
-                vortex_bail!("Can't patch a non-nullable validity with null values")
+            (_, Validity::NonNullable) => {
+                vortex_bail!("Can't patch a nullable validity with non-nullable validity")
             }
-            return Ok(self);
-        }
-
-        if matches!(self, Validity::AllValid) && matches!(patches, Validity::AllValid)
-            || self == patches
-        {
-            return Ok(self);
-        }
+            (Validity::AllValid, Validity::AllValid) => return Ok(Validity::AllValid),
+            (Validity::AllInvalid, Validity::AllInvalid) => return Ok(Validity::AllInvalid),
+            _ => {}
+        };
 
         let source = match self {
             Validity::NonNullable => BoolArray::from(BooleanBuffer::new_set(len)),
@@ -302,13 +292,15 @@
         };
 
         let patch_values = match patches {
-            Validity::NonNullable => BoolArray::from(BooleanBuffer::new_set(positions.len())),
-            Validity::AllValid => BoolArray::from(BooleanBuffer::new_set(positions.len())),
-            Validity::AllInvalid => BoolArray::from(BooleanBuffer::new_unset(positions.len())),
+            Validity::NonNullable => BoolArray::from(BooleanBuffer::new_set(indices.len())),
+            Validity::AllValid => BoolArray::from(BooleanBuffer::new_set(indices.len())),
+            Validity::AllInvalid => BoolArray::from(BooleanBuffer::new_unset(indices.len())),
             Validity::Array(a) => a.into_bool()?,
         };
 
-        Validity::try_from(source.patch(positions, patch_values)?.into_array())
+        let patches = Patches::new(len, indices.clone(), patch_values.into_array());
+
+        Validity::try_from(source.patch(patches)?.into_array())
     }
 
     /// Convert into a nullable variant
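Validity::patch now takes the patch positions as an integer ArrayData rather than a &[P] slice, and the nullability pairing is checked up front by the match. A small sketch, mirroring the updated test cases below (the position values are illustrative):

use vortex_array::array::PrimitiveArray;
use vortex_array::validity::Validity;
use vortex_array::IntoArrayData;

fn example_patch() -> vortex_error::VortexResult<Validity> {
    // Mark positions 2 and 4 of a 5-element AllValid validity as invalid;
    // the result is Validity::Array([true, true, false, true, false]).
    let indices = PrimitiveArray::from(vec![2u64, 4]).into_array();
    Validity::AllValid.patch(5, &indices, Validity::AllInvalid)
}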
@@ -515,28 +507,21 @@ mod tests {
     use rstest::rstest;
 
-    use crate::array::BoolArray;
+    use crate::array::{BoolArray, PrimitiveArray};
     use crate::validity::Validity;
     use crate::IntoArrayData;
 
     #[rstest]
-    #[case(Validity::NonNullable, 5, &[2, 4], Validity::NonNullable, Validity::NonNullable)]
-    #[case(Validity::NonNullable, 5, &[2, 4], Validity::AllValid, Validity::NonNullable)]
-    #[case(Validity::AllValid, 5, &[2, 4], Validity::NonNullable, Validity::AllValid)]
     #[case(Validity::AllValid, 5, &[2, 4], Validity::AllValid, Validity::AllValid)]
     #[case(Validity::AllValid, 5, &[2, 4], Validity::AllInvalid, Validity::Array(BoolArray::from_iter([true, true, false, true, false]).into_array())
     )]
     #[case(Validity::AllValid, 5, &[2, 4], Validity::Array(BoolArray::from_iter([true, false]).into_array()), Validity::Array(BoolArray::from_iter([true, true, true, true, false]).into_array())
     )]
-    #[case(Validity::AllInvalid, 5, &[2, 4], Validity::NonNullable, Validity::Array(BoolArray::from_iter([false, false, true, false, true]).into_array())
-    )]
     #[case(Validity::AllInvalid, 5, &[2, 4], Validity::AllValid, Validity::Array(BoolArray::from_iter([false, false, true, false, true]).into_array())
     )]
     #[case(Validity::AllInvalid, 5, &[2, 4], Validity::AllInvalid, Validity::AllInvalid)]
     #[case(Validity::AllInvalid, 5, &[2, 4], Validity::Array(BoolArray::from_iter([true, false]).into_array()), Validity::Array(BoolArray::from_iter([false, false, true, false, false]).into_array())
     )]
-    #[case(Validity::Array(BoolArray::from_iter([false, true, false, true, false]).into_array()), 5, &[2, 4], Validity::NonNullable, Validity::Array(BoolArray::from_iter([false, true, true, true, true]).into_array())
-    )]
     #[case(Validity::Array(BoolArray::from_iter([false, true, false, true, false]).into_array()), 5, &[2, 4], Validity::AllValid, Validity::Array(BoolArray::from_iter([false, true, true, true, true]).into_array())
     )]
     #[case(Validity::Array(BoolArray::from_iter([false, true, false, true, false]).into_array()), 5, &[2, 4],
     Validity::AllInvalid, Validity::Array(BoolArray::from_iter([false, true, false, true, false]).into_array())
@@ -546,18 +531,24 @@
     fn patch_validity(
         #[case] validity: Validity,
         #[case] len: usize,
-        #[case] positions: &[usize],
+        #[case] positions: &[u64],
         #[case] patches: Validity,
         #[case] expected: Validity,
     ) {
-        assert_eq!(validity.patch(len, positions, patches).unwrap(), expected);
+        let indices =
+            PrimitiveArray::from_vec(positions.to_vec(), Validity::NonNullable).into_array();
+        assert_eq!(validity.patch(len, &indices, patches).unwrap(), expected);
     }
 
     #[test]
     #[should_panic]
     fn out_of_bounds_patch() {
         Validity::NonNullable
-            .patch(2, &[4], Validity::AllInvalid)
+            .patch(
+                2,
+                &PrimitiveArray::from_vec(vec![4], Validity::NonNullable).into_array(),
+                Validity::AllInvalid,
+            )
             .unwrap();
     }
 }
diff --git a/vortex-array/src/visitor.rs b/vortex-array/src/visitor.rs
index 8293bb4562..16bc1f8248 100644
--- a/vortex-array/src/visitor.rs
+++ b/vortex-array/src/visitor.rs
@@ -2,6 +2,7 @@ use vortex_buffer::Buffer;
 use vortex_error::{vortex_err, VortexError, VortexResult};
 
 use crate::encoding::Encoding;
+use crate::patches::Patches;
 use crate::validity::Validity;
 use crate::ArrayData;
 
@@ -40,6 +41,12 @@ pub trait ArrayVisitor {
         }
     }
 
+    /// Utility for visiting Array patches.
+    fn visit_patches(&mut self, patches: &Patches) -> VortexResult<()> {
+        self.visit_child("patch_indices", patches.indices())?;
+        self.visit_child("patch_values", patches.values())
+    }
+
     fn visit_buffer(&mut self, _buffer: &Buffer) -> VortexResult<()> {
         Ok(())
     }
diff --git a/vortex-sampling-compressor/src/compressors/alp.rs b/vortex-sampling-compressor/src/compressors/alp.rs
index d0495897db..0bf6954bdb 100644
--- a/vortex-sampling-compressor/src/compressors/alp.rs
+++ b/vortex-sampling-compressor/src/compressors/alp.rs
@@ -59,24 +59,14 @@ impl EncodingCompressor for ALPCompressor {
                 ctx.auxiliary("patches")
                     .excluding(self)
                     .including(&ALPRDCompressor)
-                    .compress(&p, like.as_ref().and_then(|l| l.child(1)))
+                    .compress_patches(p)
             })
             .transpose()?;
 
         Ok(CompressedArray::compressed(
-            ALPArray::try_new(
-                compressed_encoded.array,
-                exponents,
-                compressed_patches.as_ref().map(|p| p.array.clone()),
-            )?
-            .into_array(),
-            Some(CompressionTree::new(
-                self,
-                vec![
-                    compressed_encoded.path,
-                    compressed_patches.and_then(|p| p.path),
-                ],
-            )),
+            ALPArray::try_new(compressed_encoded.array, exponents, compressed_patches)?
+                .into_array(),
+            Some(CompressionTree::new(self, vec![compressed_encoded.path])),
             array,
         ))
     }
diff --git a/vortex-sampling-compressor/src/compressors/bitpacked.rs b/vortex-sampling-compressor/src/compressors/bitpacked.rs
index 597c2533cc..a1baa92e53 100644
--- a/vortex-sampling-compressor/src/compressors/bitpacked.rs
+++ b/vortex-sampling-compressor/src/compressors/bitpacked.rs
@@ -75,7 +75,7 @@ impl EncodingCompressor for BitPackedCompressor {
     fn compress<'a>(
         &'a self,
         array: &ArrayData,
-        like: Option<CompressionTree<'a>>,
+        _like: Option<CompressionTree<'a>>,
         ctx: SamplingCompressor<'a>,
     ) -> VortexResult<CompressedArray<'a>> {
         let parray = array.clone().into_primitive()?;
@@ -107,7 +107,7 @@ impl EncodingCompressor for BitPackedCompressor {
                 ctx.auxiliary("patches")
                     .excluding(&BITPACK_WITH_PATCHES)
                     .including(&BITPACK_NO_PATCHES)
-                    .compress(&p, like.as_ref().and_then(|l| l.child(0)))
+                    .compress_patches(p)
             })
         })
         .flatten()
@@ -118,15 +118,12 @@
             packed_buffer,
             parray.ptype(),
             validity,
-            patches.as_ref().map(|p| p.array.clone()),
+            patches,
             bit_width,
             parray.len(),
         )?
         .into_array(),
-            Some(CompressionTree::new(
-                self,
-                vec![patches.and_then(|p| p.path)],
-            )),
+            Some(CompressionTree::new(self, vec![])),
             array,
         ))
     }
diff --git a/vortex-sampling-compressor/src/compressors/chunked.rs b/vortex-sampling-compressor/src/compressors/chunked.rs
index 9d713e7ec6..93a173cc4b 100644
--- a/vortex-sampling-compressor/src/compressors/chunked.rs
+++ b/vortex-sampling-compressor/src/compressors/chunked.rs
@@ -107,7 +107,7 @@ impl ChunkedCompressor {
             .unwrap_or(false);
 
         if ratio > 1.0 || exceeded_target_ratio {
-            log::info!("unsatisfactory ratio {}, previous: {:?}", ratio, previous);
+            log::debug!("unsatisfactory ratio {}, previous: {:?}", ratio, previous);
             let (compressed_chunk, tree) = ctx.compress_array(&chunk)?.into_parts();
             let new_ratio = (compressed_chunk.nbytes() as f32) / (chunk.nbytes() as f32);
 
diff --git a/vortex-sampling-compressor/src/sampling_compressor.rs b/vortex-sampling-compressor/src/sampling_compressor.rs
index 84fbc0fa50..c69e3d9950 100644
--- a/vortex-sampling-compressor/src/sampling_compressor.rs
+++ b/vortex-sampling-compressor/src/sampling_compressor.rs
@@ -11,6 +11,7 @@ use vortex_array::compress::{
 };
 use vortex_array::compute::slice;
 use vortex_array::encoding::{Encoding, EncodingRef};
+use vortex_array::patches::Patches;
 use vortex_array::validity::Validity;
 use vortex_array::{ArrayDType, ArrayData, IntoCanonical};
 use vortex_error::{VortexExpect as _, VortexResult};
@@ -20,6 +21,7 @@ use super::compressors::struct_::StructCompressor;
 use super::{CompressConfig, Objective, DEFAULT_COMPRESSORS};
 use crate::compressors::constant::ConstantCompressor;
 use crate::compressors::{CompressedArray, CompressionTree, CompressorRef, EncodingCompressor};
+use crate::downscale::downscale_integer_array;
 use crate::sampling::stratified_slices;
 
 #[derive(Debug, Clone)]
@@ -146,7 +148,7 @@ impl<'a> SamplingCompressor<'a> {
                 check_statistics_unchanged(arr, compressed.as_ref());
                 return Ok(compressed);
             } else {
-                log::warn!("{} cannot compress {} like {}", self, arr, l);
+                log::info!("{} cannot compress {} like {}", self, arr, l);
             }
         }
 
@@ -166,6 +168,15 @@ impl<'a> SamplingCompressor<'a> {
         }
     }
 
+    pub fn compress_patches(&self, patches: Patches) -> VortexResult<Patches> {
+        Ok(Patches::new(
+            patches.array_len(),
+            self.compress(&downscale_integer_array(patches.indices().clone())?, None)?
+                .into_array(),
+            self.compress(patches.values(), None)?.into_array(),
+        ))
+    }
+
     pub(crate) fn compress_array(&self, array: &ArrayData) -> VortexResult<CompressedArray<'a>> {
         let mut rng = StdRng::seed_from_u64(self.options.rng_seed);