From c56ba9802fcbefea70ddc90bc36d91c94419f32d Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Wed, 27 Mar 2024 13:33:38 +0000 Subject: [PATCH 1/4] Scalar_at for FORArray (#159) --- vortex-array/src/ptype.rs | 1 + vortex-array/src/scalar/primitive.rs | 35 ++++++++++-- vortex-fastlanes/src/bitpacking/compress.rs | 12 ++--- vortex-fastlanes/src/delta/compress.rs | 5 +- vortex-fastlanes/src/downcast.rs | 59 +++++++++++++++++++++ vortex-fastlanes/src/for/compress.rs | 12 ++--- vortex-fastlanes/src/for/compute.rs | 52 +++++++++++++++++- vortex-fastlanes/src/for/mod.rs | 14 ++++- vortex-fastlanes/src/lib.rs | 1 + 9 files changed, 167 insertions(+), 24 deletions(-) create mode 100644 vortex-fastlanes/src/downcast.rs diff --git a/vortex-array/src/ptype.rs b/vortex-array/src/ptype.rs index 03e15c4242..07ac842144 100644 --- a/vortex-array/src/ptype.rs +++ b/vortex-array/src/ptype.rs @@ -44,6 +44,7 @@ pub trait NativePType: + Into + TryFrom + Into + + TryFrom { const PTYPE: PType; } diff --git a/vortex-array/src/scalar/primitive.rs b/vortex-array/src/scalar/primitive.rs index 9f0a81c05b..97c5f5f43b 100644 --- a/vortex-array/src/scalar/primitive.rs +++ b/vortex-array/src/scalar/primitive.rs @@ -1,12 +1,13 @@ +use std::any; use std::fmt::{Display, Formatter}; use std::mem::size_of; use half::f16; -use crate::match_each_native_ptype; use vortex_error::{VortexError, VortexResult}; use vortex_schema::{DType, Nullability}; +use crate::match_each_native_ptype; use crate::ptype::{NativePType, PType}; use crate::scalar::Scalar; @@ -51,6 +52,15 @@ impl PrimitiveScalar { self.value } + pub fn typed_value(&self) -> Option { + assert_eq!( + T::PTYPE, + self.ptype, + "typed_value called with incorrect ptype" + ); + self.value.map(|v| v.try_into().unwrap()) + } + #[inline] pub fn ptype(&self) -> PType { self.ptype @@ -216,14 +226,29 @@ macro_rules! pscalar { Scalar::Primitive(PrimitiveScalar { value: Some(pscalar), .. - }) => match pscalar { - PScalar::$ptype(v) => Ok(v), - _ => Err(VortexError::InvalidDType(pscalar.ptype().into())), - }, + }) => pscalar.try_into(), _ => Err(VortexError::InvalidDType(value.dtype().clone())), } } } + + impl TryFrom for $T { + type Error = VortexError; + + fn try_from(value: PScalar) -> Result { + match value { + PScalar::$ptype(v) => Ok(v), + _ => Err(VortexError::InvalidArgument( + format!( + "Expected {} type but got {}", + any::type_name::(), + value + ) + .into(), + )), + } + } + } }; } diff --git a/vortex-fastlanes/src/bitpacking/compress.rs b/vortex-fastlanes/src/bitpacking/compress.rs index df52cde228..4f5e9cf1b6 100644 --- a/vortex-fastlanes/src/bitpacking/compress.rs +++ b/vortex-fastlanes/src/bitpacking/compress.rs @@ -1,6 +1,5 @@ use arrayref::array_ref; -use crate::{BitPackedArray, BitPackedEncoding}; use fastlanez_sys::TryBitPack; use vortex::array::downcast::DowncastArrayBuiltin; use vortex::array::primitive::PrimitiveArray; @@ -19,6 +18,9 @@ use vortex::stats::Stat; use vortex::validity::ArrayValidity; use vortex_error::VortexResult; +use crate::downcast::DowncastFastlanes; +use crate::{BitPackedArray, BitPackedEncoding}; + impl EncodingCompression for BitPackedEncoding { fn cost(&self) -> u8 { 0 @@ -65,7 +67,7 @@ impl EncodingCompression for BitPackedEncoding { .unwrap() .0; - let like_bp = like.map(|l| l.as_any().downcast_ref::().unwrap()); + let like_bp = like.map(|l| l.as_bitpacked()); let bit_width = best_bit_width(&bit_width_freq, bytes_per_exception(parray.ptype())); let num_exceptions = count_exceptions(bit_width, &bit_width_freq); @@ -301,11 +303,7 @@ mod test { ) .unwrap(); assert_eq!(compressed.encoding().id(), BitPackedEncoding.id()); - let bp = compressed - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(bp.bit_width(), 6); + assert_eq!(compressed.as_bitpacked().bit_width(), 6); } #[test] diff --git a/vortex-fastlanes/src/delta/compress.rs b/vortex-fastlanes/src/delta/compress.rs index f3218fc53d..1ff18aaead 100644 --- a/vortex-fastlanes/src/delta/compress.rs +++ b/vortex-fastlanes/src/delta/compress.rs @@ -16,6 +16,7 @@ use vortex::validity::ArrayValidity; use vortex::validity::Validity; use vortex_error::VortexResult; +use crate::downcast::DowncastFastlanes; use crate::{DeltaArray, DeltaEncoding}; impl EncodingCompression for DeltaEncoding { @@ -42,7 +43,7 @@ impl EncodingCompression for DeltaEncoding { ctx: CompressCtx, ) -> VortexResult { let parray = array.as_primitive(); - let like_delta = like.map(|l| l.as_any().downcast_ref::().unwrap()); + let like_delta = like.map(|l| l.as_delta()); let validity = ctx.compress_validity(parray.validity())?; @@ -219,7 +220,7 @@ mod test { .unwrap(); assert_eq!(compressed.encoding().id(), DeltaEncoding.id()); - let delta = compressed.as_any().downcast_ref::().unwrap(); + let delta = compressed.as_delta(); let decompressed = decompress(delta).unwrap(); let decompressed_slice = decompressed.typed_data::(); diff --git a/vortex-fastlanes/src/downcast.rs b/vortex-fastlanes/src/downcast.rs new file mode 100644 index 0000000000..69b7d6747c --- /dev/null +++ b/vortex-fastlanes/src/downcast.rs @@ -0,0 +1,59 @@ +use vortex::array::{Array, ArrayRef}; + +use crate::{BitPackedArray, DeltaArray, FoRArray}; + +mod private { + pub trait Sealed {} +} + +pub trait DowncastFastlanes: private::Sealed { + fn maybe_for(&self) -> Option<&FoRArray>; + + fn as_for(&self) -> &FoRArray { + self.maybe_for().unwrap() + } + + fn maybe_delta(&self) -> Option<&DeltaArray>; + + fn as_delta(&self) -> &DeltaArray { + self.maybe_delta().unwrap() + } + + fn maybe_bitpacked(&self) -> Option<&BitPackedArray>; + + fn as_bitpacked(&self) -> &BitPackedArray { + self.maybe_bitpacked().unwrap() + } +} + +impl private::Sealed for dyn Array + '_ {} + +impl DowncastFastlanes for dyn Array + '_ { + fn maybe_for(&self) -> Option<&FoRArray> { + self.as_any().downcast_ref() + } + + fn maybe_delta(&self) -> Option<&DeltaArray> { + self.as_any().downcast_ref() + } + + fn maybe_bitpacked(&self) -> Option<&BitPackedArray> { + self.as_any().downcast_ref() + } +} + +impl private::Sealed for ArrayRef {} + +impl DowncastFastlanes for ArrayRef { + fn maybe_for(&self) -> Option<&FoRArray> { + self.as_any().downcast_ref() + } + + fn maybe_delta(&self) -> Option<&DeltaArray> { + self.as_any().downcast_ref() + } + + fn maybe_bitpacked(&self) -> Option<&BitPackedArray> { + self.as_any().downcast_ref() + } +} diff --git a/vortex-fastlanes/src/for/compress.rs b/vortex-fastlanes/src/for/compress.rs index 2b7175ea9d..b7fd23e4b3 100644 --- a/vortex-fastlanes/src/for/compress.rs +++ b/vortex-fastlanes/src/for/compress.rs @@ -1,6 +1,7 @@ use itertools::Itertools; use num_traits::PrimInt; +use crate::downcast::DowncastFastlanes; use crate::{FoRArray, FoREncoding}; use vortex::array::constant::ConstantArray; use vortex::array::downcast::DowncastArrayBuiltin; @@ -59,10 +60,10 @@ impl EncodingCompression for FoREncoding { } }); - let compressed_child = ctx.named("for").excluding(&FoREncoding).compress( - &child, - like.map(|l| l.as_any().downcast_ref::().unwrap().encoded()), - )?; + let compressed_child = ctx + .named("for") + .excluding(&FoREncoding) + .compress(&child, like.map(|l| l.as_for().encoded()))?; let reference = parray.stats().get(&Stat::Min).unwrap(); Ok(FoRArray::try_new(compressed_child, reference, shift)?.into_array()) } @@ -163,8 +164,7 @@ mod test { let compressed = ctx.compress(&array, None).unwrap(); assert_eq!(compressed.encoding().id(), FoREncoding.id()); - let fa = compressed.as_any().downcast_ref::().unwrap(); - assert_eq!(fa.reference().try_into(), Ok(1_000_000u32)); + assert_eq!(compressed.as_for().reference().try_into(), Ok(1_000_000u32)); } #[test] diff --git a/vortex-fastlanes/src/for/compute.rs b/vortex-fastlanes/src/for/compute.rs index 7ef2fd3733..382bcf7910 100644 --- a/vortex-fastlanes/src/for/compute.rs +++ b/vortex-fastlanes/src/for/compute.rs @@ -1,16 +1,23 @@ -use crate::r#for::compress::decompress; -use crate::FoRArray; use vortex::array::{Array, ArrayRef}; use vortex::compute::flatten::{FlattenFn, FlattenedArray}; +use vortex::compute::scalar_at::{scalar_at, ScalarAtFn}; use vortex::compute::take::{take, TakeFn}; use vortex::compute::ArrayCompute; +use vortex::match_each_integer_ptype; +use vortex::scalar::{PrimitiveScalar, Scalar}; use vortex_error::VortexResult; +use crate::r#for::compress::decompress; +use crate::FoRArray; + impl ArrayCompute for FoRArray { fn flatten(&self) -> Option<&dyn FlattenFn> { Some(self) } + fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { + Some(self) + } fn take(&self) -> Option<&dyn TakeFn> { Some(self) } @@ -32,3 +39,44 @@ impl TakeFn for FoRArray { .into_array()) } } + +impl ScalarAtFn for FoRArray { + fn scalar_at(&self, index: usize) -> VortexResult { + let encoded_scalar = scalar_at(self.encoded(), index)?; + + match (&encoded_scalar, self.reference()) { + (Scalar::Primitive(p), Scalar::Primitive(r)) => match p.value() { + None => Ok(encoded_scalar), + Some(pv) => match_each_integer_ptype!(pv.ptype(), |$P| { + Ok(PrimitiveScalar::some::<$P>( + (p.typed_value::<$P>().unwrap() << self.shift()) + r.typed_value::<$P>().unwrap(), + ).into()) + }), + }, + _ => unreachable!("Reference and encoded values had different dtypes"), + } + } +} + +#[cfg(test)] +mod test { + use vortex::array::primitive::PrimitiveArray; + use vortex::compress::{CompressCtx, EncodingCompression}; + use vortex::compute::scalar_at::scalar_at; + + use crate::FoREncoding; + + #[test] + fn for_scalar_at() { + let forarr = FoREncoding + .compress( + &PrimitiveArray::from(vec![11, 15, 19]), + None, + CompressCtx::default(), + ) + .unwrap(); + assert_eq!(scalar_at(&forarr, 0), Ok(11.into())); + assert_eq!(scalar_at(&forarr, 1), Ok(15.into())); + assert_eq!(scalar_at(&forarr, 2), Ok(19.into())); + } +} diff --git a/vortex-fastlanes/src/for/mod.rs b/vortex-fastlanes/src/for/mod.rs index c6a1c5b565..17d8d956ff 100644 --- a/vortex-fastlanes/src/for/mod.rs +++ b/vortex-fastlanes/src/for/mod.rs @@ -8,7 +8,7 @@ use vortex::scalar::Scalar; use vortex::serde::{ArraySerde, EncodingSerde}; use vortex::stats::{Stat, Stats, StatsCompute, StatsSet}; use vortex::validity::{ArrayValidity, Validity}; -use vortex_error::VortexResult; +use vortex_error::{VortexError, VortexResult}; use vortex_schema::DType; mod compress; @@ -25,7 +25,17 @@ pub struct FoRArray { impl FoRArray { pub fn try_new(child: ArrayRef, reference: Scalar, shift: u8) -> VortexResult { - // TODO(ngates): check the dtype of reference == child.dtype() + if reference.is_null() { + return Err(VortexError::InvalidArgument( + "Reference value cannot be null".into(), + )); + } + if child.dtype() != reference.dtype() { + return Err(VortexError::MismatchedTypes( + child.dtype().clone(), + reference.dtype().clone(), + )); + } Ok(Self { encoded: child, reference, diff --git a/vortex-fastlanes/src/lib.rs b/vortex-fastlanes/src/lib.rs index b9af02215f..717d3b5e21 100644 --- a/vortex-fastlanes/src/lib.rs +++ b/vortex-fastlanes/src/lib.rs @@ -10,6 +10,7 @@ use vortex::array::{EncodingRef, ENCODINGS}; mod bitpacking; mod delta; +mod downcast; mod r#for; #[distributed_slice(ENCODINGS)] From f3ce3ac6cc90e1ad688f03ecd4fa57a68abeca5a Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Thu, 28 Mar 2024 08:35:16 +0000 Subject: [PATCH 2/4] Add Take for REEArray (#162) --- vortex-ree/src/compute.rs | 47 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 45 insertions(+), 2 deletions(-) diff --git a/vortex-ree/src/compute.rs b/vortex-ree/src/compute.rs index f5890a3883..6624c0d2cb 100644 --- a/vortex-ree/src/compute.rs +++ b/vortex-ree/src/compute.rs @@ -1,7 +1,10 @@ -use vortex::array::Array; -use vortex::compute::flatten::{flatten, FlattenFn, FlattenedArray}; +use vortex::array::primitive::PrimitiveArray; +use vortex::array::{Array, ArrayRef}; +use vortex::compute::flatten::{flatten, flatten_primitive, FlattenFn, FlattenedArray}; use vortex::compute::scalar_at::{scalar_at, ScalarAtFn}; +use vortex::compute::take::{take, TakeFn}; use vortex::compute::ArrayCompute; +use vortex::match_each_integer_ptype; use vortex::scalar::Scalar; use vortex::validity::ArrayValidity; use vortex_error::{VortexError, VortexResult}; @@ -17,6 +20,10 @@ impl ArrayCompute for REEArray { fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) } + + fn take(&self) -> Option<&dyn TakeFn> { + Some(self) + } } impl FlattenFn for REEArray { @@ -45,3 +52,39 @@ impl ScalarAtFn for REEArray { scalar_at(self.values(), self.find_physical_index(index)?) } } + +impl TakeFn for REEArray { + fn take(&self, indices: &dyn Array) -> VortexResult { + let primitive_indices = flatten_primitive(indices)?; + let physical_indices = match_each_integer_ptype!(primitive_indices.ptype(), |$P| { + primitive_indices + .typed_data::<$P>() + .iter() + .map(|idx| { + self.find_physical_index(*idx as usize) + .map(|loc| loc as u64) + }) + .collect::>>()? + }); + take(self.values(), &PrimitiveArray::from(physical_indices)) + } +} + +#[cfg(test)] +mod test { + use vortex::array::downcast::DowncastArrayBuiltin; + use vortex::array::primitive::PrimitiveArray; + use vortex::compute::take::take; + + use crate::REEArray; + + #[test] + fn ree_take() { + let ree = REEArray::encode(&PrimitiveArray::from(vec![ + 1, 1, 1, 4, 4, 4, 2, 2, 5, 5, 5, 5, + ])) + .unwrap(); + let taken = take(&ree, &PrimitiveArray::from(vec![9, 8, 1, 3])).unwrap(); + assert_eq!(taken.as_primitive().typed_data::(), &[5, 5, 1, 4]); + } +} From 5008629f94d2b79f7586aba4608ad0a453221fad Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Thu, 28 Mar 2024 16:32:27 +0000 Subject: [PATCH 3/4] Add Take for Bitpacked array (#161) --- bench-vortex/benches/random_access.rs | 4 +- bench-vortex/src/bin/serde.rs | 5 +- bench-vortex/src/taxi_data.rs | 4 +- vortex-array/src/compress.rs | 9 ++- vortex-array/src/compute/search_sorted.rs | 12 +++- vortex-array/src/scalar/primitive.rs | 5 +- vortex-dict/src/compute.rs | 6 +- vortex-fastlanes/src/bitpacking/compute.rs | 66 ++++++++++++++++++++-- vortex-fastlanes/src/bitpacking/mod.rs | 45 +++++++++++++-- vortex-fastlanes/src/for/compute.rs | 7 ++- vortex-fastlanes/src/for/mod.rs | 7 +-- 11 files changed, 136 insertions(+), 34 deletions(-) diff --git a/bench-vortex/benches/random_access.rs b/bench-vortex/benches/random_access.rs index 94c9a0a599..c9b28ee1c2 100644 --- a/bench-vortex/benches/random_access.rs +++ b/bench-vortex/benches/random_access.rs @@ -1,5 +1,5 @@ use bench_vortex::reader::{take_lance, take_parquet, take_vortex}; -use bench_vortex::taxi_data::{taxi_data_lance, taxi_data_parquet, taxi_data_vortex_compressed}; +use bench_vortex::taxi_data::{taxi_data_lance, taxi_data_parquet, taxi_data_vortex}; use criterion::{black_box, criterion_group, criterion_main, Criterion}; fn random_access(c: &mut Criterion) { @@ -8,7 +8,7 @@ fn random_access(c: &mut Criterion) { let indices = [10, 11, 12, 13, 100_000, 3_000_000]; - let taxi_vortex = taxi_data_vortex_compressed(); + let taxi_vortex = taxi_data_vortex(); group.bench_function("vortex", |b| { b.iter(|| black_box(take_vortex(&taxi_vortex, &indices).unwrap())) }); diff --git a/bench-vortex/src/bin/serde.rs b/bench-vortex/src/bin/serde.rs index 7c9451011c..5539dff108 100644 --- a/bench-vortex/src/bin/serde.rs +++ b/bench-vortex/src/bin/serde.rs @@ -1,10 +1,11 @@ +use log::LevelFilter; + use bench_vortex::reader::take_vortex; use bench_vortex::setup_logger; use bench_vortex::taxi_data::taxi_data_vortex; -use log::LevelFilter; pub fn main() { - setup_logger(LevelFilter::Debug); + setup_logger(LevelFilter::Error); let taxi_vortex = taxi_data_vortex(); let rows = take_vortex(&taxi_vortex, &[10, 11, 12, 13, 100_000, 3_000_000]).unwrap(); println!("TAKE TAXI DATA: {:?}", rows); diff --git a/bench-vortex/src/taxi_data.rs b/bench-vortex/src/taxi_data.rs index 064232fa44..8b22b26651 100644 --- a/bench-vortex/src/taxi_data.rs +++ b/bench-vortex/src/taxi_data.rs @@ -52,7 +52,7 @@ pub fn taxi_data_lance() -> PathBuf { .unwrap() } -pub fn taxi_data_vortex() -> PathBuf { +pub fn taxi_data_vortex_uncompressed() -> PathBuf { idempotent("taxi-uncompressed.vortex", |path| { let taxi_pq = File::open(download_taxi_data()).unwrap(); let builder = ParquetRecordBatchReaderBuilder::try_new(taxi_pq).unwrap(); @@ -76,7 +76,7 @@ pub fn taxi_data_vortex() -> PathBuf { .unwrap() } -pub fn taxi_data_vortex_compressed() -> PathBuf { +pub fn taxi_data_vortex() -> PathBuf { idempotent("taxi.vortex", |path| { let mut write = File::create(path).unwrap(); compress_vortex(&taxi_data_parquet(), &mut write) diff --git a/vortex-array/src/compress.rs b/vortex-array/src/compress.rs index 1999b7810c..1f1b4d4629 100644 --- a/vortex-array/src/compress.rs +++ b/vortex-array/src/compress.rs @@ -178,7 +178,14 @@ impl CompressCtx { .map(|c| c.compress(arr, Some(l), self.for_encoding(c))) { let compressed = compressed?; - assert_eq!(compressed.dtype(), arr.dtype()); + if compressed.dtype() != arr.dtype() { + panic!( + "Compression changed dtype: {:?} -> {:?} for {}", + arr.dtype(), + compressed.dtype(), + display_tree(&compressed), + ); + } return Ok(compressed); } else { warn!( diff --git a/vortex-array/src/compute/search_sorted.rs b/vortex-array/src/compute/search_sorted.rs index f4ee59c21f..ed2c80fbb8 100644 --- a/vortex-array/src/compute/search_sorted.rs +++ b/vortex-array/src/compute/search_sorted.rs @@ -1,7 +1,10 @@ use vortex_error::{VortexError, VortexResult}; use crate::array::Array; +use crate::compute::flatten::flatten; +use crate::compute::ArrayCompute; use crate::scalar::Scalar; +use log::info; use std::cmp::Ordering; pub enum SearchSortedSide { @@ -19,7 +22,14 @@ pub fn search_sorted>( side: SearchSortedSide, ) -> VortexResult { let scalar = target.into().cast(array.dtype())?; - array + if let Some(search_sorted) = array.search_sorted() { + return search_sorted.search_sorted(&scalar, side); + } + + // Otherwise, flatten and try again. + info!("SearchSorted not implemented for {}, flattening", array); + flatten(array)? + .into_array() .search_sorted() .map(|f| f.search_sorted(&scalar, side)) .unwrap_or_else(|| { diff --git a/vortex-array/src/scalar/primitive.rs b/vortex-array/src/scalar/primitive.rs index 97c5f5f43b..d92f6eebe6 100644 --- a/vortex-array/src/scalar/primitive.rs +++ b/vortex-array/src/scalar/primitive.rs @@ -266,10 +266,7 @@ pscalar!(f64, F64); impl From> for Scalar { fn from(value: Option) -> Self { - match value { - Some(value) => PrimitiveScalar::some::(value).into(), - None => PrimitiveScalar::none::().into(), - } + PrimitiveScalar::nullable(value).into() } } diff --git a/vortex-dict/src/compute.rs b/vortex-dict/src/compute.rs index bcaa9341f5..9d562ec827 100644 --- a/vortex-dict/src/compute.rs +++ b/vortex-dict/src/compute.rs @@ -38,10 +38,8 @@ impl ScalarAtFn for DictArray { impl TakeFn for DictArray { fn take(&self, indices: &dyn Array) -> VortexResult { let codes = take(self.codes(), indices)?; - // TODO(ngates): we could wrap this back up as a DictArray with the same dictionary. - // But we may later want to run some compaction function to ensure all values in the - // dictionary are actually used. - take(self.values(), &codes) + // TODO(ngates): Add function to remove unused entries from dictionary + Ok(DictArray::new(codes, self.values().clone()).to_array()) } } diff --git a/vortex-fastlanes/src/bitpacking/compute.rs b/vortex-fastlanes/src/bitpacking/compute.rs index 1a0e44d17c..5cf51de9a3 100644 --- a/vortex-fastlanes/src/bitpacking/compute.rs +++ b/vortex-fastlanes/src/bitpacking/compute.rs @@ -1,11 +1,18 @@ -use crate::bitpacking::compress::bitunpack; -use crate::BitPackedArray; +use itertools::Itertools; + +use vortex::array::primitive::PrimitiveArray; use vortex::array::{Array, ArrayRef}; -use vortex::compute::flatten::{flatten, FlattenFn, FlattenedArray}; +use vortex::compute::as_contiguous::as_contiguous; +use vortex::compute::flatten::{flatten_primitive, FlattenFn, FlattenedArray}; use vortex::compute::take::{take, TakeFn}; use vortex::compute::ArrayCompute; +use vortex::match_each_integer_ptype; use vortex_error::VortexResult; +use crate::bitpacking::compress::bitunpack; +use crate::downcast::DowncastFastlanes; +use crate::BitPackedArray; + impl ArrayCompute for BitPackedArray { fn flatten(&self) -> Option<&dyn FlattenFn> { Some(self) @@ -24,6 +31,57 @@ impl FlattenFn for BitPackedArray { impl TakeFn for BitPackedArray { fn take(&self, indices: &dyn Array) -> VortexResult { - take(&flatten(self)?.into_array(), indices) + let prim_indices = flatten_primitive(indices)?; + // Group indices into 1024 chunks and relativise them to the beginning of each chunk + let relative_indices: Vec<(usize, Vec)> = match_each_integer_ptype!(prim_indices.ptype(), |$P| { + let groupped_indices = prim_indices + .typed_data::<$P>() + .iter() + .group_by(|idx| (**idx / 1024) as usize); + groupped_indices + .into_iter() + .map(|(k, g)| (k, g.map(|idx| (*idx % 1024) as u16).collect())) + .collect() + }); + + let taken = relative_indices + .into_iter() + .map(|(chunk, offsets)| { + let sliced = self.slice(chunk * 1024, (chunk + 1) * 1024)?; + + take( + &bitunpack(sliced.as_bitpacked())?, + &PrimitiveArray::from(offsets), + ) + }) + .collect::>>()?; + as_contiguous(&taken) + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use vortex::array::downcast::DowncastArrayBuiltin; + use vortex::array::primitive::{PrimitiveArray, PrimitiveEncoding}; + use vortex::array::{Array, EncodingRef}; + use vortex::compress::{CompressConfig, CompressCtx}; + use vortex::compute::take::take; + + use crate::BitPackedEncoding; + + #[test] + fn take_indices() { + let cfg = CompressConfig::new().with_enabled([&BitPackedEncoding as EncodingRef]); + let ctx = CompressCtx::new(Arc::new(cfg)); + + let indices = PrimitiveArray::from(vec![0, 125, 2047, 2049, 2151, 2790]); + let unpacked = PrimitiveArray::from((0..4096).map(|i| (i % 63) as u8).collect::>()); + let bitpacked = ctx.compress(&unpacked, None).unwrap(); + let result = take(&bitpacked, &indices).unwrap(); + assert_eq!(result.encoding().id(), PrimitiveEncoding::ID); + let res_bytes = result.as_primitive().typed_data::(); + assert_eq!(res_bytes, &[0, 62, 31, 33, 9, 18]); } } diff --git a/vortex-fastlanes/src/bitpacking/mod.rs b/vortex-fastlanes/src/bitpacking/mod.rs index 34e59aede6..b96bcf3ae1 100644 --- a/vortex-fastlanes/src/bitpacking/mod.rs +++ b/vortex-fastlanes/src/bitpacking/mod.rs @@ -1,14 +1,16 @@ +use std::cmp::min; use std::sync::{Arc, RwLock}; use vortex::array::{Array, ArrayRef, Encoding, EncodingId, EncodingRef}; use vortex::compress::EncodingCompression; +use vortex::compute::flatten::flatten_primitive; use vortex::formatter::{ArrayDisplay, ArrayFormatter}; use vortex::impl_array; use vortex::serde::{ArraySerde, EncodingSerde}; use vortex::stats::{Stat, Stats, StatsCompute, StatsSet}; use vortex::validity::{ArrayValidity, Validity}; -use vortex_error::VortexResult; -use vortex_schema::DType; +use vortex_error::{VortexError, VortexResult}; +use vortex_schema::{DType, IntWidth, Nullability, Signedness}; mod compress; mod compute; @@ -26,6 +28,9 @@ pub struct BitPackedArray { } impl BitPackedArray { + const ENCODED_DTYPE: DType = + DType::Int(IntWidth::_8, Signedness::Unsigned, Nullability::NonNullable); + pub fn try_new( encoded: ArrayRef, validity: Option, @@ -34,10 +39,15 @@ impl BitPackedArray { dtype: DType, len: usize, ) -> VortexResult { + if encoded.dtype() != &Self::ENCODED_DTYPE { + return Err(VortexError::MismatchedTypes( + Self::ENCODED_DTYPE, + encoded.dtype().clone(), + )); + } if let Some(v) = &validity { assert_eq!(v.len(), len); } - // TODO(ngates): check encoded has type u8 Ok(Self { encoded, @@ -89,8 +99,33 @@ impl Array for BitPackedArray { Stats::new(&self.stats, self) } - fn slice(&self, _start: usize, _stop: usize) -> VortexResult { - unimplemented!("BitPackedArray::slice") + fn slice(&self, start: usize, stop: usize) -> VortexResult { + if start % 1024 != 0 || stop % 1024 != 0 { + return flatten_primitive(self)?.slice(start, stop); + } + + if start > self.len() { + return Err(VortexError::OutOfBounds(start, 0, self.len())); + } + // If we are slicing more than one 1024 element chunk beyond end, we consider this out of bounds + if stop / 1024 > ((self.len() + 1023) / 1024) { + return Err(VortexError::OutOfBounds(stop, 0, self.len())); + } + + let encoded_start = (start / 8) * self.bit_width; + let encoded_stop = (stop / 8) * self.bit_width; + Self::try_new( + self.encoded().slice(encoded_start, encoded_stop)?, + self.validity() + .map(|v| v.slice(start, min(stop, self.len()))), + self.patches() + .map(|p| p.slice(start, min(stop, self.len()))) + .transpose()?, + self.bit_width(), + self.dtype().clone(), + min(stop - start, self.len()), + ) + .map(|a| a.into_array()) } #[inline] diff --git a/vortex-fastlanes/src/for/compute.rs b/vortex-fastlanes/src/for/compute.rs index 382bcf7910..031db11fed 100644 --- a/vortex-fastlanes/src/for/compute.rs +++ b/vortex-fastlanes/src/for/compute.rs @@ -48,9 +48,10 @@ impl ScalarAtFn for FoRArray { (Scalar::Primitive(p), Scalar::Primitive(r)) => match p.value() { None => Ok(encoded_scalar), Some(pv) => match_each_integer_ptype!(pv.ptype(), |$P| { - Ok(PrimitiveScalar::some::<$P>( - (p.typed_value::<$P>().unwrap() << self.shift()) + r.typed_value::<$P>().unwrap(), - ).into()) + Ok(PrimitiveScalar::try_new::<$P>( + Some((p.typed_value::<$P>().unwrap() << self.shift()) + r.typed_value::<$P>().unwrap()), + p.dtype().nullability() + ).unwrap().into()) }), }, _ => unreachable!("Reference and encoded values had different dtypes"), diff --git a/vortex-fastlanes/src/for/mod.rs b/vortex-fastlanes/src/for/mod.rs index 17d8d956ff..38cdedc71a 100644 --- a/vortex-fastlanes/src/for/mod.rs +++ b/vortex-fastlanes/src/for/mod.rs @@ -30,12 +30,7 @@ impl FoRArray { "Reference value cannot be null".into(), )); } - if child.dtype() != reference.dtype() { - return Err(VortexError::MismatchedTypes( - child.dtype().clone(), - reference.dtype().clone(), - )); - } + let reference = reference.cast(child.dtype())?; Ok(Self { encoded: child, reference, From 0c00972cbc44dcc658e574697cd233c8e0de265a Mon Sep 17 00:00:00 2001 From: Will Manning Date: Thu, 28 Mar 2024 13:18:25 -0400 Subject: [PATCH 4/4] Update README.md (#168) --- README.md | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 3ff639fd9d..800850cb55 100644 --- a/README.md +++ b/README.md @@ -74,15 +74,17 @@ canonical representations of each of the logical data types. The canonical encod ### Compressed Encodings -Vortex includes a set of compressed encodings that can hold compression in-memory arrays allowing us to defer -compression. These are: +Vortex includes a set of highly data-parallel, vectorized encodings. These encodings each correspond to a compressed +in-memory array implementation, allowing us to defer decompression. Currently, these are: -* BitPacked +* Adaptive Lossless Floating Point (ALP) +* BitPacked (FastLanes) * Constant * Chunked +* Delta (FastLanes) * Dictionary * Frame-of-Reference -* Run-end +* Run-end Encoding * RoaringUInt * RoaringBool * Sparse @@ -90,8 +92,8 @@ compression. These are: ### Compression -Vortex's compression scheme is based on -the [BtrBlocks](https://www.cs.cit.tum.de/fileadmin/w00cfj/dis/papers/btrblocks.pdf) paper. +Vortex's top-level compression strategy is based on the +[BtrBlocks](https://www.cs.cit.tum.de/fileadmin/w00cfj/dis/papers/btrblocks.pdf) paper. Roughly, for each chunk of data, a sample of at least ~1% of the data is taken. Compression is then attempted ( recursively) with a set of lightweight encodings. The best-performing combination of encodings is then chosen to encode @@ -135,13 +137,13 @@ Vortex serde is currently in the design phase. The goals of this implementation * Forward statistical information (such as sortedness) to consumers. * To provide a building block for file format authors to store compressed array data. -## Vs Apache Arrow +## Integration with Apache Arrow -It is important to note that Vortex and Arrow have different design goals. As such, it is somewhat -unfair to make any comparison at all. But given both can be used as array libraries, it is worth noting the differences. +Apache Arrow is the de facto standard for interoperating on columnar array data. Naturally, Vortex is designed to +be maximally compatible with Apache Arrow. All Arrow arrays can be converted into Vortex arrays with zero-copy, +and a Vortex array constructed from an Arrow array can be converted back to Arrow, again with zero-copy. -Vortex is designed to be maximally compatible with Apache Arrow. All Arrow arrays can be converted into Vortex arrays -with zero-copy, and a Vortex array constructed from an Arrow array can be converted back to Arrow, again with zero-copy. +It is important to note that Vortex and Arrow have different--albeit complementary--goals. Vortex explicitly separates logical types from physical encodings, distinguishing it from Arrow. This allows Vortex to model more complex arrays while still exposing a logical interface. For example, Vortex can model a UTF8