diff --git a/bench-vortex/benches/compress_benchmark.rs b/bench-vortex/benches/compress_benchmark.rs index 11389180cc..1e0513c3d6 100644 --- a/bench-vortex/benches/compress_benchmark.rs +++ b/bench-vortex/benches/compress_benchmark.rs @@ -3,7 +3,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; use bench_vortex::compress_taxi_data; use bench_vortex::taxi_data::download_taxi_data; -fn enc_compress(c: &mut Criterion) { +fn vortex_compress(c: &mut Criterion) { download_taxi_data(); let mut group = c.benchmark_group("end to end"); group.sample_size(10); @@ -11,5 +11,5 @@ fn enc_compress(c: &mut Criterion) { group.finish() } -criterion_group!(benches, enc_compress); +criterion_group!(benches, vortex_compress); criterion_main!(benches); diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index af63099220..21c6aa1f8e 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -1,8 +1,8 @@ -use arrow_array::RecordBatchReader; use std::fs::{create_dir_all, File}; use std::path::{Path, PathBuf}; use std::sync::Arc; +use arrow_array::RecordBatchReader; use itertools::Itertools; use log::{info, warn, LevelFilter}; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; @@ -19,7 +19,7 @@ use vortex::formatter::display_tree; use vortex_alp::ALPEncoding; use vortex_datetime::DateTimeEncoding; use vortex_dict::DictEncoding; -use vortex_fastlanes::{BitPackedEncoding, FoREncoding}; +use vortex_fastlanes::{BitPackedEncoding, DeltaEncoding, FoREncoding}; use vortex_ree::REEEncoding; use vortex_roaring::RoaringBoolEncoding; use vortex_schema::DType; @@ -58,8 +58,7 @@ pub fn enumerate_arrays() -> Vec<EncodingRef> { &BitPackedEncoding, &FoREncoding, &DateTimeEncoding, - // DeltaEncoding, - // FFoREncoding, + &DeltaEncoding, &REEEncoding, &RoaringBoolEncoding, // RoaringIntEncoding, @@ -148,7 +147,6 @@ mod test { use crate::taxi_data::download_taxi_data; use crate::{compress_ctx, compress_taxi_data, setup_logger}; - #[ignore] #[test] fn compression_ratio() { setup_logger(LevelFilter::Debug); diff --git a/deps/fastlanez b/deps/fastlanez index 851765c29e..1876907db2 160000 --- a/deps/fastlanez +++ b/deps/fastlanez @@ -1 +1 @@ -Subproject commit 851765c29e45d26b4aaee29863ac56d9f31df80c +Subproject commit 1876907db2e9b1e26e3a445cdac474e0300847d0 diff --git a/fastlanez-sys/src/lib.rs b/fastlanez-sys/src/lib.rs index aefba20009..dbba34f86a 100644 --- a/fastlanez-sys/src/lib.rs +++ b/fastlanez-sys/src/lib.rs @@ -1,5 +1,7 @@ #![allow(incomplete_features)] #![feature(generic_const_exprs)] +#![feature(maybe_uninit_uninit_array)] +#![feature(maybe_uninit_array_assume_init)] #![allow(non_upper_case_globals)] #![allow(non_camel_case_types)] #![allow(non_snake_case)] @@ -12,10 +14,61 @@ use uninit::prelude::VecCapacity; include!(concat!(env!("OUT_DIR"), "/bindings.rs")); -pub struct Pred<const B: bool>; +pub fn transpose<T: Sized>(input: &[T; 1024], output: &mut [T; 1024]) { + unsafe { + match size_of::<T>() { + 1 => fl_transpose_u8( + input.as_ptr() as *const [u8; 1024], + output.as_ptr() as *mut [u8; 1024], + ), + 2 => fl_transpose_u16( + input.as_ptr() as *const [u16; 1024], + output.as_ptr() as *mut [u16; 1024], + ), + 4 => fl_transpose_u32( + input.as_ptr() as *const [u32; 1024], + output.as_ptr() as *mut [u32; 1024], + ), + 8 => fl_transpose_u64( + input.as_ptr() as *const [u64; 1024], + output.as_ptr() as *mut [u64; 1024], + ), + _ => unreachable!(), + } + } +} -pub trait Satisfied {} +pub fn untranspose<T: Sized>(input: &[T; 1024], output: &mut Vec<T>) { + 
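+    // Dispatch on the element width to the matching fastlanez kernel; each call fills
+    // 1024 values of uninitialized capacity reserved on `output`, and `set_len` commits
+    // them once the match returns.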
unsafe { + match size_of::<T>() { + 1 => fl_untranspose_u8( + input.as_ptr() as *const [u8; 1024], + array_mut_ref![output.reserve_uninit(1024), 0, 1024] + as *mut [std::mem::MaybeUninit<T>; 1024] as *mut [u8; 1024], + ), + 2 => fl_untranspose_u16( + input.as_ptr() as *const [u16; 1024], + array_mut_ref![output.reserve_uninit(1024), 0, 1024] + as *mut [std::mem::MaybeUninit<T>; 1024] as *mut [u16; 1024], + ), + 4 => fl_untranspose_u32( + input.as_ptr() as *const [u32; 1024], + array_mut_ref![output.reserve_uninit(1024), 0, 1024] + as *mut [std::mem::MaybeUninit<T>; 1024] as *mut [u32; 1024], + ), + 8 => fl_untranspose_u64( + input.as_ptr() as *const [u64; 1024], + array_mut_ref![output.reserve_uninit(1024), 0, 1024] + as *mut [std::mem::MaybeUninit<T>; 1024] as *mut [u64; 1024], + ), + _ => unreachable!(), + } + output.set_len(output.len() + input.len()); + } +} +pub struct Pred<const B: bool>; +pub trait Satisfied {} impl Satisfied for Pred<true> {} /// BitPack into a compile-time known bit-width. @@ -147,3 +200,69 @@ bitpack_impl!(u8, 8); bitpack_impl!(u16, 16); bitpack_impl!(u32, 32); bitpack_impl!(u64, 64); + +pub trait Delta +where + Self: Sized + Copy + Default, +{ + /// input is assumed to already be in the transposed layout + /// call transpose() to convert from the original layout + fn encode_transposed( + input: &[Self; 1024], + base: &mut [Self; 128 / size_of::<Self>()], + output: &mut Vec<Self>, + ); + + /// output is still in the transposed layout + /// call untranspose() to put it back in the original layout + fn decode_transposed( + input: &[Self; 1024], + base: &mut [Self; 128 / size_of::<Self>()], + output: &mut [Self; 1024], + ); + + fn lanes() -> usize { + // fastlanez processes 1024 bits (128 bytes) at a time + 128 / std::mem::size_of::<Self>() + } +} + +macro_rules! delta_impl { + ($T:ty) => { + paste::item! 
{ + impl Delta for $T { + fn encode_transposed( + input: &[Self; 1024], + base: &mut [Self; 128 / size_of::<Self>()], + output: &mut Vec<Self>, + ) { + unsafe { + [<fl_delta_encode_ $T>]( + input, + base, + array_mut_ref![output.reserve_uninit(1024), 0, 1024] as *mut [std::mem::MaybeUninit<Self>; 1024] as *mut [Self; 1024], + ); + output.set_len(output.len() + 1024); + } + } + + fn decode_transposed( + input: &[Self; 1024], + base: &mut [Self; 128 / size_of::<Self>()], + output: &mut [Self; 1024], + ) { + unsafe { [<fl_delta_decode_ $T>](input, base, output); } + } + } + } + }; +} + +delta_impl!(i8); +delta_impl!(i16); +delta_impl!(i32); +delta_impl!(i64); +delta_impl!(u8); +delta_impl!(u16); +delta_impl!(u32); +delta_impl!(u64); diff --git a/pyvortex/src/array.rs b/pyvortex/src/array.rs index b9e293d435..1da0702932 100644 --- a/pyvortex/src/array.rs +++ b/pyvortex/src/array.rs @@ -1,5 +1,4 @@ use paste::paste; -use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use vortex::array::bool::{BoolArray, BoolEncoding}; @@ -14,7 +13,9 @@ use vortex::array::varbinview::{VarBinViewArray, VarBinViewEncoding}; use vortex::array::{Array, ArrayKind, ArrayRef, EncodingRef}; use vortex_alp::{ALPArray, ALPEncoding}; use vortex_dict::{DictArray, DictEncoding}; -use vortex_fastlanes::{BitPackedArray, BitPackedEncoding, FoRArray, FoREncoding}; +use vortex_fastlanes::{ + BitPackedArray, BitPackedEncoding, DeltaArray, DeltaEncoding, FoRArray, FoREncoding, +}; use vortex_ree::{REEArray, REEEncoding}; use vortex_roaring::{RoaringBoolArray, RoaringBoolEncoding, RoaringIntArray, RoaringIntEncoding}; use vortex_zigzag::{ZigZagArray, ZigZagEncoding}; @@ -66,6 +67,7 @@ pyarray!(VarBinViewEncoding, VarBinViewArray, "VarBinViewArray"); pyarray!(ALPEncoding, ALPArray, "ALPArray"); pyarray!(BitPackedEncoding, BitPackedArray, "BitPackedArray"); pyarray!(FoREncoding, FoRArray, "FoRArray"); +pyarray!(DeltaEncoding, DeltaArray, "DeltaArray"); pyarray!(DictEncoding, DictArray, "DictArray"); pyarray!(REEEncoding, REEArray, "REEArray"); pyarray!(RoaringBoolEncoding, RoaringBoolArray, "RoaringBoolArray"); @@ -120,6 +122,10 @@ impl PyArray { PyALPArray::wrap(py, inner.into_any().downcast::<ALPArray>().unwrap())? .extract(py) } + DeltaEncoding::ID => { + PyDeltaArray::wrap(py, inner.into_any().downcast::<DeltaArray>().unwrap())? + .extract(py) + } DictEncoding::ID => { PyDictArray::wrap(py, inner.into_any().downcast::<DictArray>().unwrap())? .extract(py) @@ -151,10 +157,7 @@ impl PyArray { PyZigZagArray::wrap(py, inner.into_any().downcast::<ZigZagArray>().unwrap())? 
.extract(py) } - _ => Err(PyValueError::new_err(format!( - "Cannot convert {:?} to enc array", - inner - ))), + _ => Py::new(py, Self { inner }), }, } } diff --git a/pyvortex/src/lib.rs b/pyvortex/src/lib.rs index b4d16a207d..6f93ba046d 100644 --- a/pyvortex/src/lib.rs +++ b/pyvortex/src/lib.rs @@ -39,17 +39,16 @@ fn _lib(_py: Python, m: &PyModule) -> PyResult<()> { m.add_class::<PyChunkedArray>()?; m.add_class::<PyCompositeArray>()?; m.add_class::<PyConstantArray>()?; + m.add_class::<PyDeltaArray>()?; m.add_class::<PyFoRArray>()?; m.add_class::<PyPrimitiveArray>()?; m.add_class::<PyREEArray>()?; + m.add_class::<PyRoaringBoolArray>()?; + m.add_class::<PyRoaringIntArray>()?; m.add_class::<PySparseArray>()?; m.add_class::<PyStructArray>()?; m.add_class::<PyVarBinArray>()?; m.add_class::<PyVarBinViewArray>()?; - - m.add_class::<PyRoaringBoolArray>()?; - m.add_class::<PyRoaringIntArray>()?; - m.add_class::<PyZigZagArray>()?; m.add_class::<PyDType>()?; diff --git a/pyvortex/test/test_compress.py b/pyvortex/test/test_compress.py index 2f89ef175d..9f5406adac 100644 --- a/pyvortex/test/test_compress.py +++ b/pyvortex/test/test_compress.py @@ -36,10 +36,11 @@ def test_roaring_bool_encode(): assert rarr.nbytes < a.nbytes -def test_roaring_int_encode(): +def test_arange_encode(): a = vortex.encode(pa.array(np.arange(10_000), type=pa.uint32())) compressed = vortex.compress(a) - assert compressed.encoding == "roaring.int" + assert isinstance(compressed, vortex.DeltaArray) or isinstance(compressed, vortex.RoaringIntArray) + assert compressed.nbytes < a.nbytes def test_zigzag_encode(): diff --git a/requirements-dev.lock b/requirements-dev.lock index 5654ef0998..39e324aa54 100644 --- a/requirements-dev.lock +++ b/requirements-dev.lock @@ -5,7 +5,6 @@ # pre: false # features: [] # all-features: false -# with-sources: false -e file:pyvortex -e file:. diff --git a/requirements.lock b/requirements.lock index 8f59ff5e9c..c9283eb85a 100644 --- a/requirements.lock +++ b/requirements.lock @@ -5,7 +5,6 @@ # pre: false # features: [] # all-features: false -# with-sources: false -e file:pyvortex -e file:. 
diff --git a/vortex-array/src/ptype.rs b/vortex-array/src/ptype.rs index 2b0d8e0eec..aaf08b795a 100644 --- a/vortex-array/src/ptype.rs +++ b/vortex-array/src/ptype.rs @@ -133,6 +133,26 @@ impl PType { pub const fn bit_width(&self) -> usize { self.byte_width() * 8 } + + pub fn to_signed(self) -> PType { + match self { + PType::U8 => PType::I8, + PType::U16 => PType::I16, + PType::U32 => PType::I32, + PType::U64 => PType::I64, + _ => self, + } + } + + pub fn to_unsigned(self) -> PType { + match self { + PType::I8 => PType::U8, + PType::I16 => PType::U16, + PType::I32 => PType::U32, + PType::I64 => PType::U64, + _ => self, + } + } } impl Display for PType { diff --git a/vortex-fastlanes/src/delta/compress.rs b/vortex-fastlanes/src/delta/compress.rs new file mode 100644 index 0000000000..f80bd30785 --- /dev/null +++ b/vortex-fastlanes/src/delta/compress.rs @@ -0,0 +1,229 @@ +use std::mem::size_of; + +use arrayref::array_ref; +use num_traits::{WrappingAdd, WrappingSub}; + +use fastlanez_sys::{transpose, untranspose, Delta}; +use vortex::array::downcast::DowncastArrayBuiltin; +use vortex::array::primitive::PrimitiveArray; +use vortex::array::{Array, ArrayRef}; +use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression}; +use vortex::compute::fill::fill_forward; +use vortex::compute::flatten::flatten_primitive; +use vortex::error::VortexResult; +use vortex::match_each_integer_ptype; +use vortex::ptype::NativePType; + +use crate::{DeltaArray, DeltaEncoding}; + +impl EncodingCompression for DeltaEncoding { + fn can_compress( + &self, + array: &dyn Array, + _config: &CompressConfig, + ) -> Option<&dyn EncodingCompression> { + // Only support primitive arrays + let parray = array.maybe_primitive()?; + + // Only supports ints + if !parray.ptype().is_int() { + return None; + } + + Some(self) + } + + fn compress( + &self, + array: &dyn Array, + like: Option<&dyn Array>, + ctx: CompressCtx, + ) -> VortexResult<ArrayRef> { + let parray = array.as_primitive(); + let like_delta = like.map(|l| l.as_any().downcast_ref::<DeltaArray>().unwrap()); + + let validity = parray + .validity() + .map(|v| { + ctx.auxiliary("validity") + .compress(v.as_ref(), like_delta.and_then(|d| d.validity())) + }) + .transpose()?; + + // Fill forward nulls + let filled = fill_forward(array)?; + + // Compress the filled array + let (bases, deltas) = match_each_integer_ptype!(parray.ptype(), |$T| { + let (bases, deltas) = compress_primitive(filled.as_primitive().typed_data::<$T>()); + (PrimitiveArray::from(bases), PrimitiveArray::from(deltas)) + }); + + // Recursively compress the bases and deltas + let bases = ctx + .named("bases") + .compress(&bases, like_delta.map(|d| d.bases()))?; + let deltas = ctx + .named("deltas") + .compress(&deltas, like_delta.map(|d| d.deltas()))?; + + Ok(DeltaArray::try_new(array.len(), bases, deltas, validity)?.into_array()) + } +} + +fn compress_primitive<T: NativePType + Delta + WrappingSub>(array: &[T]) -> (Vec<T>, Vec<T>) +where + [(); 128 / size_of::<T>()]:, +{ + // How many fastlanes vectors we will process. + let num_chunks = array.len() / 1024; + + // How long each base vector will be. + let lanes = T::lanes(); + + // Allocate result arrays. + let mut bases = Vec::with_capacity(num_chunks * lanes + 1); + let mut deltas = Vec::with_capacity(array.len()); + + // Loop over all of the 1024-element chunks. 
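+    // Each chunk is transposed into the fastlanes layout; the first `lanes` transposed
+    // values are stored as that chunk's bases so every chunk can be decoded independently.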
+ if num_chunks > 0 { + let mut transposed: [T; 1024] = [T::default(); 1024]; + let mut base = [T::default(); 128 / size_of::<T>()]; + assert_eq!(base.len(), lanes); + + for i in 0..num_chunks { + let start_elem = i * 1024; + let chunk: &[T; 1024] = array_ref![array, start_elem, 1024]; + transpose(chunk, &mut transposed); + + // Initialize and store the base vector for each chunk + base.copy_from_slice(&transposed[0..lanes]); + bases.extend(base); + + Delta::encode_transposed(&transposed, &mut base, &mut deltas); + } + } + + // To avoid padding, the remainder is encoded with scalar logic. + let remainder_size = array.len() % 1024; + if remainder_size > 0 { + let chunk = &array[array.len() - remainder_size..]; + let mut base_scalar = chunk[0]; + bases.push(base_scalar); + for next in chunk { + let diff = next.wrapping_sub(&base_scalar); + deltas.push(diff); + base_scalar = *next; + } + } + + assert_eq!( + bases.len(), + num_chunks * lanes + (if remainder_size > 0 { 1 } else { 0 }) + ); + assert_eq!(deltas.len(), array.len()); + + (bases, deltas) +} + +pub fn decompress(array: &DeltaArray) -> VortexResult<PrimitiveArray> { + let bases = flatten_primitive(array.bases())?; + let deltas = flatten_primitive(array.deltas())?; + let decoded = match_each_integer_ptype!(deltas.ptype(), |$T| { + PrimitiveArray::from_nullable( + decompress_primitive::<$T>(bases.typed_data(), deltas.typed_data()), + array.validity().cloned() + ) + }); + Ok(decoded) +} + +fn decompress_primitive<T: NativePType + Delta + WrappingAdd>(bases: &[T], deltas: &[T]) -> Vec<T> +where + [(); 128 / size_of::<T>()]:, +{ + // How many fastlanes vectors we will process. + let num_chunks = deltas.len() / 1024; + + // How long each base vector will be. + let lanes = T::lanes(); + + // Allocate a result array. + let mut output = Vec::with_capacity(deltas.len()); + + // Loop over all the chunks + if num_chunks > 0 { + let mut transposed: [T; 1024] = [T::default(); 1024]; + let mut base = [T::default(); 128 / size_of::<T>()]; + assert_eq!(base.len(), lanes); + + for i in 0..num_chunks { + let start_elem = i * 1024; + let chunk: &[T; 1024] = array_ref![deltas, start_elem, 1024]; + + // Initialize the base vector for this chunk + base.copy_from_slice(&bases[i * lanes..(i + 1) * lanes]); + Delta::decode_transposed(chunk, &mut base, &mut transposed); + untranspose(&transposed, &mut output); + } + } + assert_eq!(output.len() % 1024, 0); + + // The remainder was encoded with scalar logic, so we need to scalar decode it. 
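+    // The tail's single base was appended after the per-chunk bases during compression,
+    // i.e. it lives at bases[num_chunks * lanes].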
+ let remainder_size = deltas.len() % 1024; + if remainder_size > 0 { + let chunk = &deltas[num_chunks * 1024..]; + assert_eq!(bases.len(), num_chunks * lanes + 1); + let mut base_scalar = bases[num_chunks * lanes]; + for next_diff in chunk { + let next = next_diff.wrapping_add(&base_scalar); + output.push(next); + base_scalar = next; + } + } + + output +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use vortex::array::{Encoding, EncodingRef}; + + use super::*; + + fn compress_ctx() -> CompressCtx { + let cfg = CompressConfig::new().with_enabled([&DeltaEncoding as EncodingRef]); + CompressCtx::new(Arc::new(cfg)) + } + + #[test] + fn test_compress() { + do_roundtrip_test(Vec::from_iter(0..10_000)); + } + + #[test] + fn test_compress_overflow() { + do_roundtrip_test(Vec::from_iter( + (0..10_000).map(|i| (i % (u8::MAX as i32)) as u8), + )); + } + + fn do_roundtrip_test<T: NativePType>(input: Vec<T>) { + let ctx = compress_ctx(); + let compressed = DeltaEncoding {} + .compress(&PrimitiveArray::from(input.clone()), None, ctx) + .unwrap(); + + assert_eq!(compressed.encoding().id(), DeltaEncoding.id()); + let delta = compressed.as_any().downcast_ref::<DeltaArray>().unwrap(); + + let decompressed = decompress(delta).unwrap(); + let decompressed_slice = decompressed.typed_data::<T>(); + assert_eq!(decompressed_slice.len(), input.len()); + for (actual, expected) in decompressed_slice.iter().zip(input) { + assert_eq!(actual, &expected); + } + } +} diff --git a/vortex-fastlanes/src/delta/compute.rs b/vortex-fastlanes/src/delta/compute.rs new file mode 100644 index 0000000000..4687b4d28c --- /dev/null +++ b/vortex-fastlanes/src/delta/compute.rs @@ -0,0 +1,18 @@ +use vortex::compute::flatten::{FlattenFn, FlattenedArray}; +use vortex::compute::ArrayCompute; +use vortex::error::VortexResult; + +use crate::delta::compress::decompress; +use crate::DeltaArray; + +impl ArrayCompute for DeltaArray { + fn flatten(&self) -> Option<&dyn FlattenFn> { + Some(self) + } +} + +impl FlattenFn for DeltaArray { + fn flatten(&self) -> VortexResult<FlattenedArray> { + decompress(self).map(FlattenedArray::Primitive) + } +} diff --git a/vortex-fastlanes/src/delta/mod.rs b/vortex-fastlanes/src/delta/mod.rs new file mode 100644 index 0000000000..7d2636a0b9 --- /dev/null +++ b/vortex-fastlanes/src/delta/mod.rs @@ -0,0 +1,195 @@ +use std::sync::{Arc, RwLock}; + +use vortex::array::{Array, ArrayRef, Encoding, EncodingId, EncodingRef}; +use vortex::compress::EncodingCompression; +use vortex::compute::scalar_at::scalar_at; +use vortex::error::{VortexError, VortexResult}; +use vortex::formatter::{ArrayDisplay, ArrayFormatter}; +use vortex::serde::{ArraySerde, EncodingSerde}; +use vortex::stats::{Stat, Stats, StatsCompute, StatsSet}; +use vortex::{impl_array, match_each_integer_ptype}; +use vortex_schema::DType; + +mod compress; +mod compute; +mod serde; + +#[derive(Debug, Clone)] +pub struct DeltaArray { + len: usize, + bases: ArrayRef, + deltas: ArrayRef, + validity: Option<ArrayRef>, + stats: Arc<RwLock<StatsSet>>, +} + +impl DeltaArray { + pub fn try_new( + len: usize, + bases: ArrayRef, + deltas: ArrayRef, + validity: Option<ArrayRef>, + ) -> VortexResult<Self> { + if bases.dtype() != deltas.dtype() { + return Err(VortexError::InvalidArgument( + format!( + "DeltaArray: bases and deltas must have the same dtype, got {:?} and {:?}", + bases.dtype(), + deltas.dtype() + ) + .into(), + )); + } + if deltas.len() != len { + return Err(VortexError::InvalidArgument( + format!( + "DeltaArray: provided deltas array of len {} does 
not match array len {}", + deltas.len(), + len + ) + .into(), + )); + } + + let delta = Self { + len, + bases, + deltas, + validity, + stats: Arc::new(RwLock::new(StatsSet::new())), + }; + + let expected_bases_len = { + let num_chunks = len / 1024; + let remainder_base_size = if len % 1024 > 0 { 1 } else { 0 }; + num_chunks * delta.lanes() + remainder_base_size + }; + if delta.bases.len() != expected_bases_len { + return Err(VortexError::InvalidArgument( + format!( + "DeltaArray: bases.len() ({}) != expected_bases_len ({}), based on len ({}) and lane count ({})", + delta.bases.len(), + expected_bases_len, + len, + delta.lanes() + ) + .into(), + )); + } + Ok(delta) + } + + #[inline] + pub fn bases(&self) -> &ArrayRef { + &self.bases + } + + #[inline] + pub fn deltas(&self) -> &ArrayRef { + &self.deltas + } + + #[inline] + fn lanes(&self) -> usize { + let ptype = self.dtype().try_into().unwrap(); + match_each_integer_ptype!(ptype, |$T| { + <$T as fastlanez_sys::Delta>::lanes() + }) + } + + #[inline] + pub fn validity(&self) -> Option<&ArrayRef> { + self.validity.as_ref() + } + + pub fn is_valid(&self, index: usize) -> bool { + self.validity() + .map(|v| scalar_at(v, index).and_then(|v| v.try_into()).unwrap()) + .unwrap_or(true) + } +} + +impl Array for DeltaArray { + impl_array!(); + + #[inline] + fn len(&self) -> usize { + self.len + } + + #[inline] + fn is_empty(&self) -> bool { + self.bases.is_empty() + } + + #[inline] + fn dtype(&self) -> &DType { + self.bases.dtype() + } + + #[inline] + fn stats(&self) -> Stats { + Stats::new(&self.stats, self) + } + + fn slice(&self, _start: usize, _stop: usize) -> VortexResult<ArrayRef> { + unimplemented!("DeltaArray::slice") + } + + #[inline] + fn encoding(&self) -> EncodingRef { + &DeltaEncoding + } + + #[inline] + fn nbytes(&self) -> usize { + self.bases().nbytes() + + self.deltas().nbytes() + + self.validity().map(|v| v.nbytes()).unwrap_or(0) + } + + fn serde(&self) -> Option<&dyn ArraySerde> { + Some(self) + } +} + +impl<'arr> AsRef<(dyn Array + 'arr)> for DeltaArray { + fn as_ref(&self) -> &(dyn Array + 'arr) { + self + } +} + +impl ArrayDisplay for DeltaArray { + fn fmt(&self, f: &mut ArrayFormatter) -> std::fmt::Result { + f.child("bases", self.bases())?; + f.child("deltas", self.deltas())?; + f.maybe_child("validity", self.validity()) + } +} + +impl StatsCompute for DeltaArray { + fn compute(&self, _stat: &Stat) -> VortexResult<StatsSet> { + Ok(StatsSet::default()) + } +} + +#[derive(Debug)] +pub struct DeltaEncoding; + +impl DeltaEncoding { + pub const ID: EncodingId = EncodingId::new("fastlanes.delta"); +} + +impl Encoding for DeltaEncoding { + fn id(&self) -> EncodingId { + Self::ID + } + + fn compression(&self) -> Option<&dyn EncodingCompression> { + Some(self) + } + + fn serde(&self) -> Option<&dyn EncodingSerde> { + Some(self) + } +} diff --git a/vortex-fastlanes/src/delta/serde.rs b/vortex-fastlanes/src/delta/serde.rs new file mode 100644 index 0000000000..35b415ebed --- /dev/null +++ b/vortex-fastlanes/src/delta/serde.rs @@ -0,0 +1,26 @@ +use vortex::array::{Array, ArrayRef}; +use vortex::error::VortexResult; +use vortex::serde::{ArraySerde, EncodingSerde, ReadCtx, WriteCtx}; + +use crate::{DeltaArray, DeltaEncoding}; + +impl ArraySerde for DeltaArray { + fn write(&self, ctx: &mut WriteCtx) -> VortexResult<()> { + ctx.write_usize(self.len())?; + ctx.write(self.bases())?; + ctx.write(self.deltas())?; + ctx.write_optional_array(self.validity()) + } +} + +impl EncodingSerde for DeltaEncoding { + fn read(&self, ctx: &mut ReadCtx) -> 
VortexResult<ArrayRef> { + let len = ctx.read_usize()?; + let bases = ctx.read()?; + let deltas = ctx.read()?; + let validity = ctx.read_optional_array()?; + Ok(DeltaArray::try_new(len, bases, deltas, validity) + .unwrap() + .into_array()) + } +} diff --git a/vortex-fastlanes/src/lib.rs b/vortex-fastlanes/src/lib.rs index 43cc468c02..b9af02215f 100644 --- a/vortex-fastlanes/src/lib.rs +++ b/vortex-fastlanes/src/lib.rs @@ -1,14 +1,22 @@ +#![allow(incomplete_features)] +#![feature(generic_const_exprs)] + use linkme::distributed_slice; pub use bitpacking::*; +pub use delta::*; pub use r#for::*; use vortex::array::{EncodingRef, ENCODINGS}; mod bitpacking; +mod delta; mod r#for; #[distributed_slice(ENCODINGS)] static ENCODINGS_FL_BITPACKING: EncodingRef = &BitPackedEncoding; +#[distributed_slice(ENCODINGS)] +static ENCODINGS_FL_DELTA: EncodingRef = &DeltaEncoding; + #[distributed_slice(ENCODINGS)] static ENCODINGS_FL_FOR: EncodingRef = &FoREncoding;