Commit: Search sorted many

gatesn committed Mar 28, 2024
2 parents f2284ed + 0c00972 commit 1a500ba
Showing 19 changed files with 352 additions and 62 deletions.
24 changes: 13 additions & 11 deletions README.md
@@ -74,24 +74,26 @@ canonical representations of each of the logical data types. The canonical encod

### Compressed Encodings

Vortex includes a set of compressed encodings that can hold compression in-memory arrays allowing us to defer
compression. These are:
Vortex includes a set of highly data-parallel, vectorized encodings. These encodings each correspond to a compressed
in-memory array implementation, allowing us to defer decompression. Currently, these are:

* BitPacked
* Adaptive Lossless Floating Point (ALP)
* BitPacked (FastLanes)
* Constant
* Chunked
* Delta (FastLanes)
* Dictionary
* Frame-of-Reference
* Run-end
* Run-end Encoding
* RoaringUInt
* RoaringBool
* Sparse
* ZigZag
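To make "defer decompression" concrete, here is a minimal sketch of the idea behind one of the encodings listed above, run-end encoding. The `RunEndArray` type below is a hypothetical stand-in, not Vortex's actual implementation: point access binary-searches the run ends instead of decoding the whole array.

```rust
/// Hypothetical run-end encoded array: logical [7, 7, 7, 2, 2] is stored
/// as values = [7, 2] with exclusive run ends [3, 5].
struct RunEndArray {
    values: Vec<i64>,
    run_ends: Vec<u32>, // strictly increasing; last entry == logical length
}

impl RunEndArray {
    /// Point access without decompressing: a binary search over `run_ends`
    /// finds the run containing `index` in O(log #runs).
    fn scalar_at(&self, index: u32) -> Option<i64> {
        if index >= *self.run_ends.last()? {
            return None; // out of bounds
        }
        let run = self.run_ends.partition_point(|&end| end <= index);
        Some(self.values[run])
    }
}

fn main() {
    let arr = RunEndArray { values: vec![7, 2], run_ends: vec![3, 5] };
    assert_eq!(arr.scalar_at(2), Some(7));
    assert_eq!(arr.scalar_at(4), Some(2));
    assert_eq!(arr.scalar_at(5), None);
}
```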

### Compression

Vortex's compression scheme is based on
the [BtrBlocks](https://www.cs.cit.tum.de/fileadmin/w00cfj/dis/papers/btrblocks.pdf) paper.
Vortex's top-level compression strategy is based on the
[BtrBlocks](https://www.cs.cit.tum.de/fileadmin/w00cfj/dis/papers/btrblocks.pdf) paper.

Roughly, for each chunk of data, a sample of at least ~1% of the data is taken. Compression is then attempted (
recursively) with a set of lightweight encodings. The best-performing combination of encodings is then chosen to encode
@@ -135,13 +137,13 @@ Vortex serde is currently in the design phase. The goals of this implementation
* Forward statistical information (such as sortedness) to consumers.
* To provide a building block for file format authors to store compressed array data.

## Vs Apache Arrow
## Integration with Apache Arrow

It is important to note that Vortex and Arrow have different design goals. As such, it is somewhat
unfair to make any comparison at all. But given both can be used as array libraries, it is worth noting the differences.
Apache Arrow is the de facto standard for interoperating on columnar array data. Naturally, Vortex is designed to
be maximally compatible with Apache Arrow. All Arrow arrays can be converted into Vortex arrays with zero-copy,
and a Vortex array constructed from an Arrow array can be converted back to Arrow, again with zero-copy.

Vortex is designed to be maximally compatible with Apache Arrow. All Arrow arrays can be converted into Vortex arrays
with zero-copy, and a Vortex array constructed from an Arrow array can be converted back to Arrow, again with zero-copy.
It is important to note that Vortex and Arrow have different--albeit complementary--goals.
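A toy model of what zero-copy means here, with stand-in types (the real conversions operate on Arrow buffers, but the property is the same: converting moves reference-counted buffers rather than copying bytes):

```rust
use std::sync::Arc;

// Stand-ins for an Arrow array and a Vortex array sharing one buffer type.
struct ArrowishArray { buffer: Arc<Vec<i64>> }
struct VortexishArray { buffer: Arc<Vec<i64>> }

impl From<ArrowishArray> for VortexishArray {
    fn from(a: ArrowishArray) -> Self {
        VortexishArray { buffer: a.buffer } // bumps a refcount; copies no data
    }
}

impl From<VortexishArray> for ArrowishArray {
    fn from(v: VortexishArray) -> Self {
        ArrowishArray { buffer: v.buffer }
    }
}

fn main() {
    let arrow = ArrowishArray { buffer: Arc::new(vec![1, 2, 3]) };
    let data_ptr = arrow.buffer.as_ptr();
    let vortex: VortexishArray = arrow.into();
    let back: ArrowishArray = vortex.into();
    // The round trip preserved the exact same allocation.
    assert_eq!(back.buffer.as_ptr(), data_ptr);
}
```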

Vortex explicitly separates logical types from physical encodings, distinguishing it from Arrow. This allows
Vortex to model more complex arrays while still exposing a logical interface. For example, Vortex can model a UTF8
4 changes: 2 additions & 2 deletions bench-vortex/benches/random_access.rs
@@ -1,5 +1,5 @@
use bench_vortex::reader::{take_lance, take_parquet, take_vortex};
use bench_vortex::taxi_data::{taxi_data_lance, taxi_data_parquet, taxi_data_vortex_compressed};
use bench_vortex::taxi_data::{taxi_data_lance, taxi_data_parquet, taxi_data_vortex};
use criterion::{black_box, criterion_group, criterion_main, Criterion};

fn random_access(c: &mut Criterion) {
@@ -8,7 +8,7 @@ fn random_access(c: &mut Criterion) {

let indices = [10, 11, 12, 13, 100_000, 3_000_000];

let taxi_vortex = taxi_data_vortex_compressed();
let taxi_vortex = taxi_data_vortex();
group.bench_function("vortex", |b| {
b.iter(|| black_box(take_vortex(&taxi_vortex, &indices).unwrap()))
});
5 changes: 3 additions & 2 deletions bench-vortex/src/bin/serde.rs
@@ -1,10 +1,11 @@
use log::LevelFilter;

use bench_vortex::reader::take_vortex;
use bench_vortex::setup_logger;
use bench_vortex::taxi_data::taxi_data_vortex;
use log::LevelFilter;

pub fn main() {
setup_logger(LevelFilter::Debug);
setup_logger(LevelFilter::Error);
let taxi_vortex = taxi_data_vortex();
let rows = take_vortex(&taxi_vortex, &[10, 11, 12, 13, 100_000, 3_000_000]).unwrap();
println!("TAKE TAXI DATA: {:?}", rows);
4 changes: 2 additions & 2 deletions bench-vortex/src/taxi_data.rs
@@ -52,7 +52,7 @@ pub fn taxi_data_lance() -> PathBuf {
.unwrap()
}

pub fn taxi_data_vortex() -> PathBuf {
pub fn taxi_data_vortex_uncompressed() -> PathBuf {
idempotent("taxi-uncompressed.vortex", |path| {
let taxi_pq = File::open(download_taxi_data()).unwrap();
let builder = ParquetRecordBatchReaderBuilder::try_new(taxi_pq).unwrap();
@@ -76,7 +76,7 @@ pub fn taxi_data_vortex() -> PathBuf {
.unwrap()
}

pub fn taxi_data_vortex_compressed() -> PathBuf {
pub fn taxi_data_vortex() -> PathBuf {
idempotent("taxi.vortex", |path| {
let mut write = File::create(path).unwrap();
compress_vortex(&taxi_data_parquet(), &mut write)
9 changes: 8 additions & 1 deletion vortex-array/src/compress.rs
@@ -179,7 +179,14 @@ impl CompressCtx {
.map(|c| c.compress(arr, Some(l), self.for_encoding(c)))
{
let compressed = compressed?;
assert_eq!(compressed.dtype(), arr.dtype());
if compressed.dtype() != arr.dtype() {
panic!(
"Compression changed dtype: {:?} -> {:?} for {}",
arr.dtype(),
compressed.dtype(),
display_tree(&compressed),
);
}
return Ok(compressed);
} else {
warn!(
12 changes: 11 additions & 1 deletion vortex-array/src/compute/search_sorted.rs
@@ -1,7 +1,10 @@
use vortex_error::{VortexError, VortexResult};

use crate::array::Array;
use crate::compute::flatten::flatten;
use crate::compute::ArrayCompute;
use crate::scalar::Scalar;
use log::info;
use std::cmp::Ordering;

pub enum SearchSortedSide {
@@ -19,7 +22,14 @@ pub fn search_sorted<T: Into<Scalar>>(
side: SearchSortedSide,
) -> VortexResult<usize> {
let scalar = target.into().cast(array.dtype())?;
array
if let Some(search_sorted) = array.search_sorted() {
return search_sorted.search_sorted(&scalar, side);
}

// Otherwise, flatten and try again.
info!("SearchSorted not implemented for {}, flattening", array);
flatten(array)?
.into_array()
.search_sorted()
.map(|f| f.search_sorted(&scalar, side))
.unwrap_or_else(|| {
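For reference, a hedged sketch of calling the entry point changed above. The `Left` side semantics (first admissible insertion position, as in NumPy's searchsorted) and the enum variants are assumptions, but the module paths follow this repository's imports and the `search_sorted(array, target, side)` shape matches the signature in this diff:

```rust
use vortex::array::primitive::PrimitiveArray;
use vortex::compute::search_sorted::{search_sorted, SearchSortedSide};
use vortex_error::VortexResult;

fn main() -> VortexResult<()> {
    let array = PrimitiveArray::from(vec![1i32, 3, 3, 5, 7]);
    // With the new fallback, arrays lacking a SearchSorted implementation
    // are flattened first, so this works for any encoding.
    let idx = search_sorted(&array, 3, SearchSortedSide::Left)?;
    assert_eq!(idx, 1); // first position where 3 could be inserted
    Ok(())
}
```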
1 change: 1 addition & 0 deletions vortex-array/src/ptype.rs
@@ -44,6 +44,7 @@ pub trait NativePType:
+ Into<Scalar>
+ TryFrom<Scalar, Error = VortexError>
+ Into<PScalar>
+ TryFrom<PScalar, Error = VortexError>
{
const PTYPE: PType;
}
40 changes: 31 additions & 9 deletions vortex-array/src/scalar/primitive.rs
@@ -1,12 +1,13 @@
use std::any;
use std::fmt::{Display, Formatter};
use std::mem::size_of;

use half::f16;

use crate::match_each_native_ptype;
use vortex_error::{VortexError, VortexResult};
use vortex_schema::{DType, Nullability};

use crate::match_each_native_ptype;
use crate::ptype::{NativePType, PType};
use crate::scalar::Scalar;

@@ -51,6 +52,15 @@ impl PrimitiveScalar {
self.value
}

pub fn typed_value<T: NativePType>(&self) -> Option<T> {
assert_eq!(
T::PTYPE,
self.ptype,
"typed_value called with incorrect ptype"
);
self.value.map(|v| v.try_into().unwrap())
}

#[inline]
pub fn ptype(&self) -> PType {
self.ptype
@@ -216,14 +226,29 @@ macro_rules! pscalar {
Scalar::Primitive(PrimitiveScalar {
value: Some(pscalar),
..
}) => match pscalar {
PScalar::$ptype(v) => Ok(v),
_ => Err(VortexError::InvalidDType(pscalar.ptype().into())),
},
}) => pscalar.try_into(),
_ => Err(VortexError::InvalidDType(value.dtype().clone())),
}
}
}

impl TryFrom<PScalar> for $T {
type Error = VortexError;

fn try_from(value: PScalar) -> Result<Self, Self::Error> {
match value {
PScalar::$ptype(v) => Ok(v),
_ => Err(VortexError::InvalidArgument(
format!(
"Expected {} type but got {}",
any::type_name::<Self>(),
value
)
.into(),
)),
}
}
}
};
}

@@ -241,10 +266,7 @@ pscalar!(f64, F64);

impl<T: NativePType> From<Option<T>> for Scalar {
fn from(value: Option<T>) -> Self {
match value {
Some(value) => PrimitiveScalar::some::<T>(value).into(),
None => PrimitiveScalar::none::<T>().into(),
}
PrimitiveScalar::nullable(value).into()
}
}

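A hedged sketch of using this file's new conversions. The module paths and the `PScalar::I32` variant are assumptions inferred from the `pscalar!` macro above:

```rust
use vortex::scalar::{PScalar, Scalar};

fn main() {
    // From<Option<T>> now routes through PrimitiveScalar::nullable.
    let scalar: Scalar = Some(42i32).into();
    if let Scalar::Primitive(p) = &scalar {
        // typed_value asserts that the requested type matches the ptype.
        assert_eq!(p.typed_value::<i32>(), Some(42));
    }
    // The new TryFrom<PScalar> surfaces a mismatch as InvalidArgument, not a panic.
    assert!(i64::try_from(PScalar::I32(42)).is_err());
    assert_eq!(i32::try_from(PScalar::I32(42)).unwrap(), 42);
}
```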
6 changes: 2 additions & 4 deletions vortex-dict/src/compute.rs
@@ -38,10 +38,8 @@ impl ScalarAtFn for DictArray {
impl TakeFn for DictArray {
fn take(&self, indices: &dyn Array) -> VortexResult<ArrayRef> {
let codes = take(self.codes(), indices)?;
// TODO(ngates): we could wrap this back up as a DictArray with the same dictionary.
// But we may later want to run some compaction function to ensure all values in the
// dictionary are actually used.
take(self.values(), &codes)
// TODO(ngates): Add function to remove unused entries from dictionary
Ok(DictArray::new(codes, self.values().clone()).to_array())
}
}

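A toy model of the change to `DictArray::take`: gather the codes, keep the dictionary as-is, so unused entries may linger until a compaction pass exists (plain slices stand in for the real array types):

```rust
/// Take on a dictionary-encoded array: index the codes, reuse the values.
fn take_dict<'a>(codes: &[usize], values: &'a [&'a str], indices: &[usize]) -> (Vec<usize>, &'a [&'a str]) {
    let taken_codes = indices.iter().map(|&i| codes[i]).collect();
    (taken_codes, values)
}

fn main() {
    let values = ["low", "mid", "high"];
    let codes = [0, 0, 2, 1, 2];
    let (new_codes, dict) = take_dict(&codes, &values, &[2, 4, 0]);
    assert_eq!(new_codes, vec![2, 2, 0]);
    // "mid" is now unreferenced but still stored, mirroring the TODO above.
    assert_eq!(dict, &["low", "mid", "high"]);
}
```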
12 changes: 5 additions & 7 deletions vortex-fastlanes/src/bitpacking/compress.rs
@@ -1,6 +1,5 @@
use arrayref::array_ref;

use crate::{BitPackedArray, BitPackedEncoding};
use fastlanez_sys::TryBitPack;
use vortex::array::downcast::DowncastArrayBuiltin;
use vortex::array::primitive::PrimitiveArray;
@@ -19,6 +18,9 @@ use vortex::stats::Stat;
use vortex::validity::ArrayValidity;
use vortex_error::VortexResult;

use crate::downcast::DowncastFastlanes;
use crate::{BitPackedArray, BitPackedEncoding};

impl EncodingCompression for BitPackedEncoding {
fn cost(&self) -> u8 {
0
@@ -65,7 +67,7 @@ impl EncodingCompression for BitPackedEncoding {
.unwrap()
.0;

let like_bp = like.map(|l| l.as_any().downcast_ref::<BitPackedArray>().unwrap());
let like_bp = like.map(|l| l.as_bitpacked());
let bit_width = best_bit_width(&bit_width_freq, bytes_per_exception(parray.ptype()));
let num_exceptions = count_exceptions(bit_width, &bit_width_freq);

@@ -301,11 +303,7 @@ mod test {
)
.unwrap();
assert_eq!(compressed.encoding().id(), BitPackedEncoding.id());
let bp = compressed
.as_any()
.downcast_ref::<BitPackedArray>()
.unwrap();
assert_eq!(bp.bit_width(), 6);
assert_eq!(compressed.as_bitpacked().bit_width(), 6);
}

#[test]
66 changes: 62 additions & 4 deletions vortex-fastlanes/src/bitpacking/compute.rs
@@ -1,11 +1,18 @@
use crate::bitpacking::compress::bitunpack;
use crate::BitPackedArray;
use itertools::Itertools;

use vortex::array::primitive::PrimitiveArray;
use vortex::array::{Array, ArrayRef};
use vortex::compute::flatten::{flatten, FlattenFn, FlattenedArray};
use vortex::compute::as_contiguous::as_contiguous;
use vortex::compute::flatten::{flatten_primitive, FlattenFn, FlattenedArray};
use vortex::compute::take::{take, TakeFn};
use vortex::compute::ArrayCompute;
use vortex::match_each_integer_ptype;
use vortex_error::VortexResult;

use crate::bitpacking::compress::bitunpack;
use crate::downcast::DowncastFastlanes;
use crate::BitPackedArray;

impl ArrayCompute for BitPackedArray {
fn flatten(&self) -> Option<&dyn FlattenFn> {
Some(self)
@@ -24,6 +31,57 @@ impl FlattenFn for BitPackedArray {

impl TakeFn for BitPackedArray {
fn take(&self, indices: &dyn Array) -> VortexResult<ArrayRef> {
take(&flatten(self)?.into_array(), indices)
let prim_indices = flatten_primitive(indices)?;
// Group indices into 1024 chunks and relativise them to the beginning of each chunk
let relative_indices: Vec<(usize, Vec<u16>)> = match_each_integer_ptype!(prim_indices.ptype(), |$P| {
let groupped_indices = prim_indices
.typed_data::<$P>()
.iter()
.group_by(|idx| (**idx / 1024) as usize);
groupped_indices
.into_iter()
.map(|(k, g)| (k, g.map(|idx| (*idx % 1024) as u16).collect()))
.collect()
});

let taken = relative_indices
.into_iter()
.map(|(chunk, offsets)| {
let sliced = self.slice(chunk * 1024, (chunk + 1) * 1024)?;

take(
&bitunpack(sliced.as_bitpacked())?,
&PrimitiveArray::from(offsets),
)
})
.collect::<VortexResult<Vec<_>>>()?;
as_contiguous(&taken)
}
}

#[cfg(test)]
mod test {
use std::sync::Arc;

use vortex::array::downcast::DowncastArrayBuiltin;
use vortex::array::primitive::{PrimitiveArray, PrimitiveEncoding};
use vortex::array::{Array, EncodingRef};
use vortex::compress::{CompressConfig, CompressCtx};
use vortex::compute::take::take;

use crate::BitPackedEncoding;

#[test]
fn take_indices() {
let cfg = CompressConfig::new().with_enabled([&BitPackedEncoding as EncodingRef]);
let ctx = CompressCtx::new(Arc::new(cfg));

let indices = PrimitiveArray::from(vec![0, 125, 2047, 2049, 2151, 2790]);
let unpacked = PrimitiveArray::from((0..4096).map(|i| (i % 63) as u8).collect::<Vec<_>>());
let bitpacked = ctx.compress(&unpacked, None).unwrap();
let result = take(&bitpacked, &indices).unwrap();
assert_eq!(result.encoding().id(), PrimitiveEncoding::ID);
let res_bytes = result.as_primitive().typed_data::<u8>();
assert_eq!(res_bytes, &[0, 62, 31, 33, 9, 18]);
}
}
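The index grouping in `take` above can be puzzling at first; this standalone snippet uses the same `itertools` `group_by` pattern and the same example indices as the test to show what the chunk/offset split produces:

```rust
use itertools::Itertools;

fn main() {
    // Bucket absolute indices by 1024-element chunk, then make each
    // index relative to the start of its chunk.
    let indices: Vec<u32> = vec![0, 125, 2047, 2049, 2151, 2790];
    let grouped_iter = indices.iter().group_by(|&&idx| (idx / 1024) as usize);
    let grouped: Vec<(usize, Vec<u16>)> = grouped_iter
        .into_iter()
        .map(|(chunk, g)| (chunk, g.map(|&idx| (idx % 1024) as u16).collect()))
        .collect();
    assert_eq!(
        grouped,
        vec![(0, vec![0, 125]), (1, vec![1023]), (2, vec![1, 103, 742])]
    );
}
```

Note that `group_by` only groups consecutive elements with equal keys, so these sorted example indices produce exactly one group per chunk.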
