diff --git a/encodings/fastlanes/src/bitpacking/compute/filter.rs b/encodings/fastlanes/src/bitpacking/compute/filter.rs
index a0944683d..d9984dc62 100644
--- a/encodings/fastlanes/src/bitpacking/compute/filter.rs
+++ b/encodings/fastlanes/src/bitpacking/compute/filter.rs
@@ -1,6 +1,5 @@
 use arrow_buffer::ArrowNativeType;
 use fastlanes::BitPacking;
-use itertools::Itertools;
 use vortex_array::array::PrimitiveArray;
 use vortex_array::compute::{filter, FilterFn, FilterIter, FilterMask};
 use vortex_array::variants::PrimitiveArrayTrait;
@@ -8,6 +7,7 @@ use vortex_array::{ArrayData, IntoArrayData, IntoArrayVariant};
 use vortex_dtype::{match_each_unsigned_integer_ptype, NativePType};
 use vortex_error::VortexResult;
 
+use super::chunked_indices;
 use crate::bitpacking::compute::take::UNPACK_CHUNK_THRESHOLD;
 use crate::{BitPackedArray, BitPackedEncoding};
 
@@ -66,19 +66,14 @@ fn filter_indices(
     let mut values = Vec::with_capacity(indices_len);
 
     // Some re-usable memory to store per-chunk indices.
-    let mut indices_within_chunk: Vec<usize> = Vec::with_capacity(1024);
     let mut unpacked = [T::zero(); 1024];
+    let packed_bytes = array.packed_slice::<T>();
 
     // Group the indices by the FastLanes chunk they belong to.
-    let chunked = indices.chunk_by(|&idx| (idx + offset) / 1024);
-    let chunk_len = 128 * bit_width / size_of::<T>();
+    let chunk_size = 128 * bit_width / size_of::<T>();
 
-    chunked.into_iter().for_each(|(chunk_idx, indices)| {
-        let packed = &array.packed_slice::<T>()[chunk_idx * chunk_len..(chunk_idx + 1) * chunk_len];
-
-        // Re-use the indices buffer to store the indices within the current chunk.
-        indices_within_chunk.clear();
-        indices_within_chunk.extend(indices.map(|idx| (idx + offset) % 1024));
+    chunked_indices(indices, offset, |chunk_idx, indices_within_chunk| {
+        let packed = &packed_bytes[chunk_idx * chunk_size..][..chunk_size];
 
         if indices_within_chunk.len() == 1024 {
             // Unpack the entire chunk.
diff --git a/encodings/fastlanes/src/bitpacking/compute/mod.rs b/encodings/fastlanes/src/bitpacking/compute/mod.rs
index 57ba36958..7dcbde73f 100644
--- a/encodings/fastlanes/src/bitpacking/compute/mod.rs
+++ b/encodings/fastlanes/src/bitpacking/compute/mod.rs
@@ -30,3 +30,33 @@ impl ComputeVTable for BitPackedEncoding {
         Some(self)
     }
 }
+
+fn chunked_indices<F: FnMut(usize, &[usize])>(
+    mut indices: impl Iterator<Item = usize>,
+    offset: usize,
+    mut chunk_fn: F,
+) {
+    let mut indices_within_chunk: Vec<usize> = Vec::with_capacity(1024);
+
+    let Some(first_idx) = indices.next() else {
+        return;
+    };
+
+    let mut current_chunk_idx = (first_idx + offset) / 1024;
+    indices_within_chunk.push((first_idx + offset) % 1024);
+    for idx in indices {
+        let new_chunk_idx = (idx + offset) / 1024;
+
+        if new_chunk_idx != current_chunk_idx {
+            chunk_fn(current_chunk_idx, &indices_within_chunk);
+            indices_within_chunk.clear();
+        }
+
+        current_chunk_idx = new_chunk_idx;
+        indices_within_chunk.push((idx + offset) % 1024);
+    }
+
+    if !indices_within_chunk.is_empty() {
+        chunk_fn(current_chunk_idx, &indices_within_chunk);
+    }
+}
diff --git a/encodings/fastlanes/src/bitpacking/compute/take.rs b/encodings/fastlanes/src/bitpacking/compute/take.rs
index c433fd3e5..12911d2d8 100644
--- a/encodings/fastlanes/src/bitpacking/compute/take.rs
+++ b/encodings/fastlanes/src/bitpacking/compute/take.rs
@@ -1,5 +1,4 @@
 use fastlanes::BitPacking;
-use itertools::Itertools;
 use vortex_array::array::PrimitiveArray;
 use vortex_array::compute::{take, try_cast, TakeFn};
 use vortex_array::variants::PrimitiveArrayTrait;
@@ -12,6 +11,7 @@ use vortex_dtype::{
 };
 use vortex_error::{VortexExpect as _, VortexResult};
 
+use super::chunked_indices;
 use crate::{unpack_single_primitive, BitPackedArray, BitPackedEncoding};
 
 // assuming the buffer is already allocated (which will happen at most once) then unpacking
@@ -54,35 +54,30 @@ fn take_primitive(
     let packed = array.packed_slice::<T>();
 
     // Group indices by 1024-element chunk, *without* allocating on the heap
-    let chunked_indices = &indices
-        .maybe_null_slice::<I>()
-        .iter()
-        .map(|i| {
-            i.to_usize()
-                .vortex_expect("index must be expressible as usize")
-                + offset
-        })
-        .chunk_by(|idx| idx / 1024);
+    let indices_iter = indices.maybe_null_slice::<I>().iter().map(|i| {
+        i.to_usize()
+            .vortex_expect("index must be expressible as usize")
+    });
 
     let mut output = Vec::with_capacity(indices.len());
     let mut unpacked = [T::zero(); 1024];
+    let chunk_len = 128 * bit_width / size_of::<T>();
 
-    for (chunk, offsets) in chunked_indices {
-        let chunk_size = 128 * bit_width / size_of::<T>();
-        let packed_chunk = &packed[chunk * chunk_size..][..chunk_size];
+    chunked_indices(indices_iter, offset, |chunk_idx, indices_within_chunk| {
+        let packed = &packed[chunk_idx * chunk_len..][..chunk_len];
 
         // array_chunks produced a fixed size array, doesn't heap allocate
         let mut have_unpacked = false;
-        let mut offset_chunk_iter = offsets
-            // relativize indices to the start of the chunk
-            .map(|i| i % 1024)
+        let mut offset_chunk_iter = indices_within_chunk
+            .iter()
+            .copied()
             .array_chunks::<UNPACK_CHUNK_THRESHOLD>();
 
         // this loop only runs if we have at least UNPACK_CHUNK_THRESHOLD offsets
         for offset_chunk in &mut offset_chunk_iter {
             if !have_unpacked {
                 unsafe {
-                    BitPacking::unchecked_unpack(bit_width, packed_chunk, &mut unpacked);
+                    BitPacking::unchecked_unpack(bit_width, packed, &mut unpacked);
                 }
                 have_unpacked = true;
             }
@@ -103,13 +98,11 @@
             // we had fewer than UNPACK_CHUNK_THRESHOLD offsets in the first place,
             // so we need to unpack each one individually
             for index in remainder {
-                output.push(unsafe {
-                    unpack_single_primitive::<T>(packed_chunk, bit_width, index)
-                });
+                output.push(unsafe { unpack_single_primitive::<T>(packed, bit_width, index) });
             }
         }
     }
-    }
+    });
 
     if let Some(patches) = array
         .patches()
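
Reviewer note: a minimal, self-contained sketch of how the new `chunked_indices` helper groups an ascending index stream by 1024-element FastLanes chunk. The helper body is copied from the mod.rs hunk above; `main` and its sample indices are illustrative only and not part of this change.

```rust
/// Groups a stream of ascending indices by the 1024-element FastLanes
/// chunk they fall into, invoking `chunk_fn` once per chunk with the
/// chunk index and the indices relative to the start of that chunk.
fn chunked_indices<F: FnMut(usize, &[usize])>(
    mut indices: impl Iterator<Item = usize>,
    offset: usize,
    mut chunk_fn: F,
) {
    let mut indices_within_chunk: Vec<usize> = Vec::with_capacity(1024);

    let Some(first_idx) = indices.next() else {
        return;
    };

    let mut current_chunk_idx = (first_idx + offset) / 1024;
    indices_within_chunk.push((first_idx + offset) % 1024);
    for idx in indices {
        let new_chunk_idx = (idx + offset) / 1024;

        // Crossing a chunk boundary: flush the buffered indices and reuse the buffer.
        if new_chunk_idx != current_chunk_idx {
            chunk_fn(current_chunk_idx, &indices_within_chunk);
            indices_within_chunk.clear();
        }

        current_chunk_idx = new_chunk_idx;
        indices_within_chunk.push((idx + offset) % 1024);
    }

    // Flush the final, possibly partial, chunk.
    if !indices_within_chunk.is_empty() {
        chunk_fn(current_chunk_idx, &indices_within_chunk);
    }
}

fn main() {
    // Indices 5 and 100 land in chunk 0; 1030 lands in chunk 1; 2050 in chunk 2.
    chunked_indices([5, 100, 1030, 2050].into_iter(), 0, |chunk_idx, within| {
        println!("chunk {chunk_idx}: {within:?}");
    });
    // Prints:
    // chunk 0: [5, 100]
    // chunk 1: [6]
    // chunk 2: [2]
}
```

Unlike the lazy groups produced by `itertools::chunk_by`, the callback receives a materialized `&[usize]` slice, so call sites know the per-chunk index count up front (e.g. the `indices_within_chunk.len() == 1024` full-chunk fast path in filter.rs), and a single buffer is reused across chunks rather than collecting per group at each call site.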