Skip to content

Commit

Permalink
Add Take for Bitpacked array (#161)
Browse files Browse the repository at this point in the history
  • Loading branch information
robert3005 authored Mar 28, 2024
1 parent f3ce3ac commit 5008629
Show file tree
Hide file tree
Showing 11 changed files with 136 additions and 34 deletions.
4 changes: 2 additions & 2 deletions bench-vortex/benches/random_access.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bench_vortex::reader::{take_lance, take_parquet, take_vortex};
use bench_vortex::taxi_data::{taxi_data_lance, taxi_data_parquet, taxi_data_vortex_compressed};
use bench_vortex::taxi_data::{taxi_data_lance, taxi_data_parquet, taxi_data_vortex};
use criterion::{black_box, criterion_group, criterion_main, Criterion};

fn random_access(c: &mut Criterion) {
Expand All @@ -8,7 +8,7 @@ fn random_access(c: &mut Criterion) {

let indices = [10, 11, 12, 13, 100_000, 3_000_000];

let taxi_vortex = taxi_data_vortex_compressed();
let taxi_vortex = taxi_data_vortex();
group.bench_function("vortex", |b| {
b.iter(|| black_box(take_vortex(&taxi_vortex, &indices).unwrap()))
});
Expand Down
5 changes: 3 additions & 2 deletions bench-vortex/src/bin/serde.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use log::LevelFilter;

use bench_vortex::reader::take_vortex;
use bench_vortex::setup_logger;
use bench_vortex::taxi_data::taxi_data_vortex;
use log::LevelFilter;

pub fn main() {
setup_logger(LevelFilter::Debug);
setup_logger(LevelFilter::Error);
let taxi_vortex = taxi_data_vortex();
let rows = take_vortex(&taxi_vortex, &[10, 11, 12, 13, 100_000, 3_000_000]).unwrap();
println!("TAKE TAXI DATA: {:?}", rows);
Expand Down
4 changes: 2 additions & 2 deletions bench-vortex/src/taxi_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub fn taxi_data_lance() -> PathBuf {
.unwrap()
}

pub fn taxi_data_vortex() -> PathBuf {
pub fn taxi_data_vortex_uncompressed() -> PathBuf {
idempotent("taxi-uncompressed.vortex", |path| {
let taxi_pq = File::open(download_taxi_data()).unwrap();
let builder = ParquetRecordBatchReaderBuilder::try_new(taxi_pq).unwrap();
Expand All @@ -76,7 +76,7 @@ pub fn taxi_data_vortex() -> PathBuf {
.unwrap()
}

pub fn taxi_data_vortex_compressed() -> PathBuf {
pub fn taxi_data_vortex() -> PathBuf {
idempotent("taxi.vortex", |path| {
let mut write = File::create(path).unwrap();
compress_vortex(&taxi_data_parquet(), &mut write)
Expand Down
9 changes: 8 additions & 1 deletion vortex-array/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,14 @@ impl CompressCtx {
.map(|c| c.compress(arr, Some(l), self.for_encoding(c)))
{
let compressed = compressed?;
assert_eq!(compressed.dtype(), arr.dtype());
if compressed.dtype() != arr.dtype() {
panic!(
"Compression changed dtype: {:?} -> {:?} for {}",
arr.dtype(),
compressed.dtype(),
display_tree(&compressed),
);
}
return Ok(compressed);
} else {
warn!(
Expand Down
12 changes: 11 additions & 1 deletion vortex-array/src/compute/search_sorted.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use vortex_error::{VortexError, VortexResult};

use crate::array::Array;
use crate::compute::flatten::flatten;
use crate::compute::ArrayCompute;
use crate::scalar::Scalar;
use log::info;
use std::cmp::Ordering;

pub enum SearchSortedSide {
Expand All @@ -19,7 +22,14 @@ pub fn search_sorted<T: Into<Scalar>>(
side: SearchSortedSide,
) -> VortexResult<usize> {
let scalar = target.into().cast(array.dtype())?;
array
if let Some(search_sorted) = array.search_sorted() {
return search_sorted.search_sorted(&scalar, side);
}

// Otherwise, flatten and try again.
info!("SearchSorted not implemented for {}, flattening", array);
flatten(array)?
.into_array()
.search_sorted()
.map(|f| f.search_sorted(&scalar, side))
.unwrap_or_else(|| {
Expand Down
5 changes: 1 addition & 4 deletions vortex-array/src/scalar/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,10 +266,7 @@ pscalar!(f64, F64);

impl<T: NativePType> From<Option<T>> for Scalar {
    /// Converts an optional native value into a `Scalar`.
    ///
    /// Delegates to `PrimitiveScalar::nullable`, which handles both the
    /// `Some` (present value) and `None` (null) cases in one place.
    fn from(value: Option<T>) -> Self {
        PrimitiveScalar::nullable(value).into()
    }
}

Expand Down
6 changes: 2 additions & 4 deletions vortex-dict/src/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@ impl ScalarAtFn for DictArray {
impl TakeFn for DictArray {
    /// Takes the given `indices` out of the codes array and re-wraps the
    /// result as a `DictArray` sharing the original dictionary values.
    ///
    /// Note: entries of the dictionary that are no longer referenced by any
    /// code are kept as-is.
    fn take(&self, indices: &dyn Array) -> VortexResult<ArrayRef> {
        let codes = take(self.codes(), indices)?;
        // TODO(ngates): Add function to remove unused entries from dictionary
        Ok(DictArray::new(codes, self.values().clone()).to_array())
    }
}

Expand Down
66 changes: 62 additions & 4 deletions vortex-fastlanes/src/bitpacking/compute.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
use crate::bitpacking::compress::bitunpack;
use crate::BitPackedArray;
use itertools::Itertools;

use vortex::array::primitive::PrimitiveArray;
use vortex::array::{Array, ArrayRef};
use vortex::compute::flatten::{flatten, FlattenFn, FlattenedArray};
use vortex::compute::as_contiguous::as_contiguous;
use vortex::compute::flatten::{flatten_primitive, FlattenFn, FlattenedArray};
use vortex::compute::take::{take, TakeFn};
use vortex::compute::ArrayCompute;
use vortex::match_each_integer_ptype;
use vortex_error::VortexResult;

use crate::bitpacking::compress::bitunpack;
use crate::downcast::DowncastFastlanes;
use crate::BitPackedArray;

impl ArrayCompute for BitPackedArray {
fn flatten(&self) -> Option<&dyn FlattenFn> {
Some(self)
Expand All @@ -24,6 +31,57 @@ impl FlattenFn for BitPackedArray {

impl TakeFn for BitPackedArray {
    /// Takes values at `indices` by unpacking only the 1024-element chunks
    /// that are actually touched, rather than flattening the whole array.
    ///
    /// NOTE(review): assumes the bit-packed layout is organized in
    /// 1024-element chunks — consistent with the chunk alignment used by
    /// `BitPackedArray::slice`.
    fn take(&self, indices: &dyn Array) -> VortexResult<ArrayRef> {
        let prim_indices = flatten_primitive(indices)?;

        // Group indices by their 1024-element chunk and make each index
        // relative to the beginning of its chunk. Indices fit in u16 because
        // a chunk holds at most 1024 elements.
        let relative_indices: Vec<(usize, Vec<u16>)> = match_each_integer_ptype!(prim_indices.ptype(), |$P| {
            let grouped_indices = prim_indices
                .typed_data::<$P>()
                .iter()
                .group_by(|idx| (**idx / 1024) as usize);
            grouped_indices
                .into_iter()
                .map(|(k, g)| (k, g.map(|idx| (*idx % 1024) as u16).collect()))
                .collect()
        });

        // Unpack each touched chunk once, take the relative offsets from it,
        // then stitch the per-chunk results back into one contiguous array.
        let taken = relative_indices
            .into_iter()
            .map(|(chunk, offsets)| {
                let sliced = self.slice(chunk * 1024, (chunk + 1) * 1024)?;

                take(
                    &bitunpack(sliced.as_bitpacked())?,
                    &PrimitiveArray::from(offsets),
                )
            })
            .collect::<VortexResult<Vec<_>>>()?;
        as_contiguous(&taken)
    }
}

#[cfg(test)]
mod test {
use std::sync::Arc;

use vortex::array::downcast::DowncastArrayBuiltin;
use vortex::array::primitive::{PrimitiveArray, PrimitiveEncoding};
use vortex::array::{Array, EncodingRef};
use vortex::compress::{CompressConfig, CompressCtx};
use vortex::compute::take::take;

use crate::BitPackedEncoding;

// Verifies that `take` on a bit-packed array returns the right values and
// decodes to a plain primitive array.
#[test]
fn take_indices() {
// Restrict compression to BitPackedEncoding so `compress` below is
// guaranteed to produce a BitPackedArray rather than some other encoding.
let cfg = CompressConfig::new().with_enabled([&BitPackedEncoding as EncodingRef]);
let ctx = CompressCtx::new(Arc::new(cfg));

// Indices deliberately straddle the 1024-element chunk boundaries
// (0/125 in chunk 0, 2047 in chunk 1, 2049/2151/2790 in chunk 2).
let indices = PrimitiveArray::from(vec![0, 125, 2047, 2049, 2151, 2790]);
// 4096 values cycling through 0..=62, so the expected byte at index i is i % 63.
let unpacked = PrimitiveArray::from((0..4096).map(|i| (i % 63) as u8).collect::<Vec<_>>());
let bitpacked = ctx.compress(&unpacked, None).unwrap();
let result = take(&bitpacked, &indices).unwrap();
// The result of take should be fully decoded into a primitive array.
assert_eq!(result.encoding().id(), PrimitiveEncoding::ID);
let res_bytes = result.as_primitive().typed_data::<u8>();
// Each expected value is indices[i] % 63 per the cyclic fill above.
assert_eq!(res_bytes, &[0, 62, 31, 33, 9, 18]);
}
}
45 changes: 40 additions & 5 deletions vortex-fastlanes/src/bitpacking/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use std::cmp::min;
use std::sync::{Arc, RwLock};

use vortex::array::{Array, ArrayRef, Encoding, EncodingId, EncodingRef};
use vortex::compress::EncodingCompression;
use vortex::compute::flatten::flatten_primitive;
use vortex::formatter::{ArrayDisplay, ArrayFormatter};
use vortex::impl_array;
use vortex::serde::{ArraySerde, EncodingSerde};
use vortex::stats::{Stat, Stats, StatsCompute, StatsSet};
use vortex::validity::{ArrayValidity, Validity};
use vortex_error::VortexResult;
use vortex_schema::DType;
use vortex_error::{VortexError, VortexResult};
use vortex_schema::{DType, IntWidth, Nullability, Signedness};

mod compress;
mod compute;
Expand All @@ -26,6 +28,9 @@ pub struct BitPackedArray {
}

impl BitPackedArray {
const ENCODED_DTYPE: DType =
DType::Int(IntWidth::_8, Signedness::Unsigned, Nullability::NonNullable);

pub fn try_new(
encoded: ArrayRef,
validity: Option<Validity>,
Expand All @@ -34,10 +39,15 @@ impl BitPackedArray {
dtype: DType,
len: usize,
) -> VortexResult<Self> {
if encoded.dtype() != &Self::ENCODED_DTYPE {
return Err(VortexError::MismatchedTypes(
Self::ENCODED_DTYPE,
encoded.dtype().clone(),
));
}
if let Some(v) = &validity {
assert_eq!(v.len(), len);
}
// TODO(ngates): check encoded has type u8

Ok(Self {
encoded,
Expand Down Expand Up @@ -89,8 +99,33 @@ impl Array for BitPackedArray {
Stats::new(&self.stats, self)
}

fn slice(&self, _start: usize, _stop: usize) -> VortexResult<ArrayRef> {
unimplemented!("BitPackedArray::slice")
fn slice(&self, start: usize, stop: usize) -> VortexResult<ArrayRef> {
if start % 1024 != 0 || stop % 1024 != 0 {
return flatten_primitive(self)?.slice(start, stop);
}

if start > self.len() {
return Err(VortexError::OutOfBounds(start, 0, self.len()));
}
// If we are slicing more than one 1024 element chunk beyond end, we consider this out of bounds
if stop / 1024 > ((self.len() + 1023) / 1024) {
return Err(VortexError::OutOfBounds(stop, 0, self.len()));
}

let encoded_start = (start / 8) * self.bit_width;
let encoded_stop = (stop / 8) * self.bit_width;
Self::try_new(
self.encoded().slice(encoded_start, encoded_stop)?,
self.validity()
.map(|v| v.slice(start, min(stop, self.len()))),
self.patches()
.map(|p| p.slice(start, min(stop, self.len())))
.transpose()?,
self.bit_width(),
self.dtype().clone(),
min(stop - start, self.len()),
)
.map(|a| a.into_array())
}

#[inline]
Expand Down
7 changes: 4 additions & 3 deletions vortex-fastlanes/src/for/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ impl ScalarAtFn for FoRArray {
(Scalar::Primitive(p), Scalar::Primitive(r)) => match p.value() {
None => Ok(encoded_scalar),
Some(pv) => match_each_integer_ptype!(pv.ptype(), |$P| {
Ok(PrimitiveScalar::some::<$P>(
(p.typed_value::<$P>().unwrap() << self.shift()) + r.typed_value::<$P>().unwrap(),
).into())
Ok(PrimitiveScalar::try_new::<$P>(
Some((p.typed_value::<$P>().unwrap() << self.shift()) + r.typed_value::<$P>().unwrap()),
p.dtype().nullability()
).unwrap().into())
}),
},
_ => unreachable!("Reference and encoded values had different dtypes"),
Expand Down
7 changes: 1 addition & 6 deletions vortex-fastlanes/src/for/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,7 @@ impl FoRArray {
"Reference value cannot be null".into(),
));
}
if child.dtype() != reference.dtype() {
return Err(VortexError::MismatchedTypes(
child.dtype().clone(),
reference.dtype().clone(),
));
}
let reference = reference.cast(child.dtype())?;
Ok(Self {
encoded: child,
reference,
Expand Down

0 comments on commit 5008629

Please sign in to comment.