Commit 7873c9d

feat: Faster bitpacked filter & take (#1667)
Instead of using Itertools::chunk_by we do our own iteration that is
aware of the magic 1024-element threshold

---------

Co-authored-by: Andrew Duffy <[email protected]>
robert3005 and a10y authored Dec 12, 2024
1 parent b8d3ff2 commit 7873c9d
Showing 3 changed files with 49 additions and 31 deletions.
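For context on what is being replaced: Itertools::chunk_by lazily groups consecutive items by a key, handing back group iterators that must still be drained into a buffer before they can be used as slices. The sketch below is my own simplified illustration of the old pattern, not the removed code verbatim (it requires the itertools crate):

use itertools::Itertools;

fn main() {
    let indices = [5usize, 100, 1023, 1024, 1030, 4096];
    // Group consecutive indices by their 1024-element FastLanes chunk.
    for (chunk_idx, group) in &indices.iter().chunk_by(|&&idx| idx / 1024) {
        // `group` is a lazy iterator, so the per-chunk indices still have
        // to be collected before they can be handed around as a slice.
        let within: Vec<usize> = group.map(|&idx| idx % 1024).collect();
        println!("chunk {chunk_idx}: {within:?}");
    }
}

The commit swaps this adaptor for a hand-rolled loop (chunked_indices, added in mod.rs below) that reuses one Vec across chunks and lets callers detect the special case of a fully populated 1024-element chunk.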
15 changes: 5 additions & 10 deletions encodings/fastlanes/src/bitpacking/compute/filter.rs
@@ -1,13 +1,13 @@
 use arrow_buffer::ArrowNativeType;
 use fastlanes::BitPacking;
-use itertools::Itertools;
 use vortex_array::array::PrimitiveArray;
 use vortex_array::compute::{filter, FilterFn, FilterIter, FilterMask};
 use vortex_array::variants::PrimitiveArrayTrait;
 use vortex_array::{ArrayData, IntoArrayData, IntoArrayVariant};
 use vortex_dtype::{match_each_unsigned_integer_ptype, NativePType};
 use vortex_error::VortexResult;

+use super::chunked_indices;
 use crate::bitpacking::compute::take::UNPACK_CHUNK_THRESHOLD;
 use crate::{BitPackedArray, BitPackedEncoding};

@@ -66,19 +66,14 @@ fn filter_indices<T: NativePType + BitPacking + ArrowNativeType>(
     let mut values = Vec::with_capacity(indices_len);

     // Some re-usable memory to store per-chunk indices.
-    let mut indices_within_chunk: Vec<usize> = Vec::with_capacity(1024);
     let mut unpacked = [T::zero(); 1024];
+    let packed_bytes = array.packed_slice::<T>();

-    // Group the indices by the FastLanes chunk they belong to.
-    let chunked = indices.chunk_by(|&idx| (idx + offset) / 1024);
-    let chunk_len = 128 * bit_width / size_of::<T>();
+    let chunk_size = 128 * bit_width / size_of::<T>();

-    chunked.into_iter().for_each(|(chunk_idx, indices)| {
-        let packed = &array.packed_slice::<T>()[chunk_idx * chunk_len..(chunk_idx + 1) * chunk_len];

-        // Re-use the indices buffer to store the indices within the current chunk.
-        indices_within_chunk.clear();
-        indices_within_chunk.extend(indices.map(|idx| (idx + offset) % 1024));
+    chunked_indices(indices, offset, |chunk_idx, indices_within_chunk| {
+        let packed = &packed_bytes[chunk_idx * chunk_size..][..chunk_size];

         if indices_within_chunk.len() == 1024 {
             // Unpack the entire chunk.
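The chunk_size arithmetic above deserves a note: a FastLanes chunk always holds 1024 values, so at bit_width bits per value a packed chunk occupies 1024 * bit_width / 8 = 128 * bit_width bytes, and dividing by size_of::<T>() converts that byte count into a number of T words. A worked example with numbers of my own choosing:

use std::mem::size_of;

fn main() {
    // 1024 values at 3 bits each = 3072 bits = 384 bytes = 96 u32 words.
    let bit_width = 3;
    let chunk_size = 128 * bit_width / size_of::<u32>();
    assert_eq!(chunk_size, 96);
    // Chunk i of the packed buffer is then packed[i * chunk_size..][..chunk_size].
    println!("each chunk spans {chunk_size} u32 words");
}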
30 changes: 30 additions & 0 deletions encodings/fastlanes/src/bitpacking/compute/mod.rs
@@ -30,3 +30,33 @@ impl ComputeVTable for BitPackedEncoding {
         Some(self)
     }
 }
+
+fn chunked_indices<F: FnMut(usize, &[usize])>(
+    mut indices: impl Iterator<Item = usize>,
+    offset: usize,
+    mut chunk_fn: F,
+) {
+    let mut indices_within_chunk: Vec<usize> = Vec::with_capacity(1024);
+
+    let Some(first_idx) = indices.next() else {
+        return;
+    };
+
+    let mut current_chunk_idx = (first_idx + offset) / 1024;
+    indices_within_chunk.push((first_idx + offset) % 1024);
+    for idx in indices {
+        let new_chunk_idx = (idx + offset) / 1024;
+
+        if new_chunk_idx != current_chunk_idx {
+            chunk_fn(current_chunk_idx, &indices_within_chunk);
+            indices_within_chunk.clear();
+        }
+
+        current_chunk_idx = new_chunk_idx;
+        indices_within_chunk.push((idx + offset) % 1024);
+    }
+
+    if !indices_within_chunk.is_empty() {
+        chunk_fn(current_chunk_idx, &indices_within_chunk);
+    }
+}
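A hypothetical unit test for the helper's contract: grouping honors the offset, a trailing partial chunk is still flushed, and an empty iterator produces no callbacks. This is my own sketch, assuming chunked_indices is in scope:

#[test]
fn chunked_indices_groups_by_1024() {
    let mut seen: Vec<(usize, Vec<usize>)> = Vec::new();
    // offset = 4 shifts logical indices before chunking, as with a sliced array.
    chunked_indices([5usize, 100, 1020, 1021, 4096].into_iter(), 4, |chunk, within| {
        seen.push((chunk, within.to_vec()));
    });
    // (5+4)/1024 = 0, ..., (1020+4)/1024 = 1, (4096+4)/1024 = 4
    assert_eq!(seen, vec![(0, vec![9, 104]), (1, vec![0, 1]), (4, vec![4])]);

    // No indices, no callbacks.
    chunked_indices(std::iter::empty(), 0, |_, _| unreachable!());
}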
35 changes: 14 additions & 21 deletions encodings/fastlanes/src/bitpacking/compute/take.rs
@@ -1,5 +1,4 @@
 use fastlanes::BitPacking;
-use itertools::Itertools;
 use vortex_array::array::PrimitiveArray;
 use vortex_array::compute::{take, try_cast, TakeFn};
 use vortex_array::variants::PrimitiveArrayTrait;
@@ -12,6 +11,7 @@ use vortex_dtype::{
 };
 use vortex_error::{VortexExpect as _, VortexResult};

+use super::chunked_indices;
 use crate::{unpack_single_primitive, BitPackedArray, BitPackedEncoding};

 // assuming the buffer is already allocated (which will happen at most once) then unpacking
@@ -54,35 +54,30 @@ fn take_primitive<T: NativePType + BitPacking, I: NativePType>(
     let packed = array.packed_slice::<T>();

     // Group indices by 1024-element chunk, *without* allocating on the heap
-    let chunked_indices = &indices
-        .maybe_null_slice::<I>()
-        .iter()
-        .map(|i| {
-            i.to_usize()
-                .vortex_expect("index must be expressible as usize")
-                + offset
-        })
-        .chunk_by(|idx| idx / 1024);
+    let indices_iter = indices.maybe_null_slice::<I>().iter().map(|i| {
+        i.to_usize()
+            .vortex_expect("index must be expressible as usize")
+    });

     let mut output = Vec::with_capacity(indices.len());
     let mut unpacked = [T::zero(); 1024];
+    let chunk_len = 128 * bit_width / size_of::<T>();

-    for (chunk, offsets) in chunked_indices {
-        let chunk_size = 128 * bit_width / size_of::<T>();
-        let packed_chunk = &packed[chunk * chunk_size..][..chunk_size];
+    chunked_indices(indices_iter, offset, |chunk_idx, indices_within_chunk| {
+        let packed = &packed[chunk_idx * chunk_len..][..chunk_len];

         // array_chunks produced a fixed size array, doesn't heap allocate
         let mut have_unpacked = false;
-        let mut offset_chunk_iter = offsets
-            // relativize indices to the start of the chunk
-            .map(|i| i % 1024)
+        let mut offset_chunk_iter = indices_within_chunk
+            .iter()
+            .copied()
             .array_chunks::<UNPACK_CHUNK_THRESHOLD>();

         // this loop only runs if we have at least UNPACK_CHUNK_THRESHOLD offsets
         for offset_chunk in &mut offset_chunk_iter {
             if !have_unpacked {
                 unsafe {
-                    BitPacking::unchecked_unpack(bit_width, packed_chunk, &mut unpacked);
+                    BitPacking::unchecked_unpack(bit_width, packed, &mut unpacked);
                 }
                 have_unpacked = true;
             }
@@ -103,13 +98,11 @@ fn take_primitive<T: NativePType + BitPacking, I: NativePType>(
                 // we had fewer than UNPACK_CHUNK_THRESHOLD offsets in the first place,
                 // so we need to unpack each one individually
                 for index in remainder {
-                    output.push(unsafe {
-                        unpack_single_primitive::<T>(packed_chunk, bit_width, index)
-                    });
+                    output.push(unsafe { unpack_single_primitive::<T>(packed, bit_width, index) });
                 }
             }
         }
-    }
+    });

     if let Some(patches) = array
         .patches()
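Stepping back from the diff, the take path keeps its two-speed strategy: within each chunk the indices are consumed in fixed-size batches via array_chunks::<UNPACK_CHUNK_THRESHOLD>(), the first full batch triggers one bulk unchecked_unpack of all 1024 values, and a remainder shorter than the threshold falls back to unpack_single_primitive per index. Below is a standalone sketch of that dispatch, simplified to a single length test; THRESHOLD and the decode closures are placeholders of mine, not the crate's API:

const THRESHOLD: usize = 8; // stand-in for UNPACK_CHUNK_THRESHOLD

fn take_from_chunk(
    indices_within_chunk: &[usize],
    unpack_all: impl FnOnce() -> [u32; 1024],
    unpack_one: impl Fn(usize) -> u32,
    output: &mut Vec<u32>,
) {
    if indices_within_chunk.len() >= THRESHOLD {
        // Enough lookups to amortize decoding the whole 1024-value chunk once.
        let unpacked = unpack_all();
        output.extend(indices_within_chunk.iter().map(|&i| unpacked[i]));
    } else {
        // Too few lookups; decode each requested value individually.
        output.extend(indices_within_chunk.iter().map(|&i| unpack_one(i)));
    }
}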
