Skip to content

Commit

Permalink
Compression 2
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn committed Mar 8, 2024
1 parent 7905769 commit f4cc69f
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 34 deletions.
13 changes: 6 additions & 7 deletions bench-vortex/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use arrow_array::RecordBatchReader;
use itertools::Itertools;
use log::info;
use log::{info, warn};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ProjectionMask;
use std::collections::HashSet;
Expand All @@ -24,7 +24,7 @@ use vortex::dtype::DType;
use vortex::formatter::display_tree;
use vortex_alp::ALPEncoding;
use vortex_dict::DictEncoding;
use vortex_fastlanes::{BitPackedEncoding, FoREncoding};
use vortex_fastlanes::{BitPackedEncoding, DeltaEncoding, FoREncoding};
use vortex_ree::REEEncoding;
use vortex_roaring::RoaringBoolEncoding;

Expand All @@ -46,8 +46,7 @@ pub fn enumerate_arrays() -> Vec<&'static dyn Encoding> {
&DictEncoding,
&BitPackedEncoding,
&FoREncoding,
// &DeltaEncoding,
// &FFoREncoding,
&DeltaEncoding,
&REEEncoding,
&RoaringBoolEncoding,
// &RoaringIntEncoding,
Expand Down Expand Up @@ -117,7 +116,7 @@ pub fn compress_taxi_data() -> ArrayRef {
let dtype: DType = schema.clone().try_into().unwrap();
let compressed = ChunkedArray::new(chunks.clone(), dtype).boxed();

info!("Compressed array {}", display_tree(compressed.as_ref()));
warn!("Compressed array {}\n", display_tree(compressed.as_ref()));

let mut field_bytes = vec![0; schema.fields().len()];
for chunk in chunks {
Expand Down Expand Up @@ -156,10 +155,10 @@ mod test {
.unwrap();
}

#[ignore]
// #[ignore]
#[test]
fn compression_ratio() {
setup_logger(LevelFilter::Warn);
setup_logger(LevelFilter::Debug);
_ = compress_taxi_data();
}
}
20 changes: 20 additions & 0 deletions vortex-array/src/ptype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,26 @@ impl PType {
pub fn bit_width(&self) -> usize {
self.byte_width() * 8
}

pub fn to_signed(self) -> PType {
match self {
PType::U8 => PType::I8,
PType::U16 => PType::I16,
PType::U32 => PType::I32,
PType::U64 => PType::I64,
_ => self,
}
}

pub fn to_unsigned(self) -> PType {
match self {
PType::I8 => PType::U8,
PType::I16 => PType::U16,
PType::I32 => PType::U32,
PType::I64 => PType::U64,
_ => self,
}
}
}

impl TryFrom<&DType> for PType {
Expand Down
2 changes: 1 addition & 1 deletion vortex-fastlanes/src/bitpacking/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ fn bitpack_primitive<T: NativePType + TryBitPack>(array: &[T], bit_width: usize)
});

// Pad the last chunk with zeros to a full 1024 elements.
let last_chunk_size = array.len() % 1024;
let last_chunk_size = array.len() - ((num_chunks - 1) * 1024);
let mut last_chunk: [T; 1024] = [T::default(); 1024];
last_chunk[..last_chunk_size].copy_from_slice(&array[array.len() - last_chunk_size..]);
TryBitPack::try_bitpack_into(&last_chunk, bit_width, &mut output).unwrap();
Expand Down
52 changes: 40 additions & 12 deletions vortex-fastlanes/src/delta/compress.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use arrayref::array_ref;
use log::debug;
use std::mem::size_of;

use fastlanez_sys::{transpose, Delta};
use vortex::array::downcast::DowncastArrayBuiltin;
use vortex::array::primitive::PrimitiveArray;
use vortex::array::{Array, ArrayRef};
use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression};
use vortex::compute::fill::fill_forward;
use vortex::error::VortexResult;
use vortex::match_each_signed_integer_ptype;
use vortex::ptype::NativePType;
use vortex::stats::Stat;

use crate::{DeltaArray, DeltaEncoding};

Expand All @@ -21,18 +22,34 @@ impl EncodingCompression for DeltaEncoding {
) -> Option<&dyn EncodingCompression> {
// Only support primitive arrays
let Some(parray) = array.maybe_primitive() else {
debug!("Skipping Delta: not primitive");
return None;
};

// Only supports signed ints
if !parray.ptype().is_signed_int() {
debug!("Skipping Delta: not int");
return None;
}

debug!("Compressing with Delta");
if parray
.stats()
.get_or_compute_cast::<i64>(&Stat::Min)
.unwrap_or(0)
!= 0
{
return None;
}

Some(self)
//
// // For now, only consider delta on sorted arrays
// if parray
// .stats()
// .get_or_compute_as::<bool>(&Stat::IsSorted)
// .unwrap_or(false)
// {
// return Some(self);
// }
// None
}

fn compress(
Expand All @@ -46,14 +63,19 @@ impl EncodingCompression for DeltaEncoding {

let validity = parray
.validity()
.map(|v| ctx.compress(v.as_ref(), like_delta.and_then(|d| d.validity())))
.map(|v| {
ctx.auxiliary("validity")
.compress(v.as_ref(), like_delta.and_then(|d| d.validity()))
})
.transpose()?;

// Fill forward nulls
let filled = fill_forward(array)?;
let delta_encoded = match_each_signed_integer_ptype!(parray.ptype(), |$T| {
PrimitiveArray::from(delta_primitive(parray.buffer().typed_data::<$T>()))
PrimitiveArray::from(delta_primitive(filled.as_primitive().typed_data::<$T>()))
});

let encoded = ctx.next_level().compress(
let encoded = ctx.named("deltas").compress(
delta_encoded.as_ref(),
like_delta.map(|d| d.encoded().as_ref()),
)?;
Expand Down Expand Up @@ -85,11 +107,17 @@ where
Delta::delta(&transposed, &mut base, &mut output);
});

// Pad the last chunk with zeros to a full 1024 elements.
let last_chunk_size = array.len() % 1024;
let mut last_chunk: [T; 1024] = [T::default(); 1024];
last_chunk[..last_chunk_size].copy_from_slice(&array[array.len() - last_chunk_size..]);
Delta::delta(&last_chunk, &mut base, &mut output);
// To avoid padding, the remainder is encoded with scalar logic.
let mut base_scalar = base[base.len() - 1];
let last_chunk_size = array.len() - ((num_chunks - 1) * 1024);
for i in array.len() - last_chunk_size..array.len() {
let next = array[i];
output.push(next - base_scalar);
base_scalar = next;
}
// let mut last_chunk: [T; 1024] = [T::default(); 1024];
// last_chunk[..last_chunk_size].copy_from_slice(&array[array.len() - last_chunk_size..]);
// Delta::delta(&last_chunk, &mut base, &mut output);

output
}
Expand Down
68 changes: 54 additions & 14 deletions vortex-fastlanes/src/for/compress.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
use itertools::Itertools;
use num_traits::CheckedSub;

use vortex::array::downcast::DowncastArrayBuiltin;
use vortex::array::primitive::PrimitiveArray;
use vortex::array::{Array, ArrayRef};
use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression};
use vortex::error::VortexResult;
use vortex::match_each_integer_ptype;
use vortex::ptype::NativePType;
use vortex::stats::Stat;

use crate::{FoRArray, FoREncoding};

impl EncodingCompression for FoREncoding {
fn cost(&self) -> u8 {
0
1
}

fn can_compress(
Expand All @@ -28,10 +30,26 @@ impl EncodingCompression for FoREncoding {
return None;
}

// Nothing for us to do if the min is already zero
if parray.stats().get_or_compute_cast::<i64>(&Stat::Min)? == 0 {
return None;
}
match_each_integer_ptype!(parray.ptype(), |$T| {
let min = parray
.stats()
.get_or_compute_as::<$T>(&Stat::Min)
.unwrap_or(<$T>::default());

// Nothing for us to do if the min is already zero
if min == 0 {
return None;
}

// Check for overflow
let max = parray
.stats()
.get_or_compute_as::<$T>(&Stat::Max)
.unwrap_or(<$T>::default());
if max.checked_sub(min).is_none() {
return None;
}
});

Some(self)
}
Expand All @@ -45,15 +63,7 @@ impl EncodingCompression for FoREncoding {
let parray = array.as_primitive();

let child = match_each_integer_ptype!(parray.ptype(), |$T| {
let min = parray.stats().get_or_compute_as::<$T>(&Stat::Min).unwrap_or(<$T>::default());

// TODO(ngates): check for overflow
let values = parray.buffer().typed_data::<$T>().iter().map(|v| v - min)
// TODO(ngates): cast to unsigned
// .map(|v| v as parray.ptype().to_unsigned()::T)
.collect_vec();

PrimitiveArray::from(values)
compress_primitive::<$T>(parray)
});

// TODO(ngates): remove FoR as a potential encoding from the ctx
Expand All @@ -67,6 +77,36 @@ impl EncodingCompression for FoREncoding {
Ok(FoRArray::try_new(compressed_child, reference)?.boxed())
}
}
fn compress_primitive<T: NativePType + CheckedSub>(parray: &PrimitiveArray) -> PrimitiveArray {
let min = parray
.stats()
.get_or_compute_as::<T>(&Stat::Min)
.unwrap_or(<T>::default());
let max = parray
.stats()
.get_or_compute_as::<T>(&Stat::Max)
.unwrap_or(<T>::default());

let _buffer = parray.typed_data::<T>();
if max.checked_sub(&min).is_none() {
// Delta would cause overflow
return parray.clone();
}

// TODO(ngates): check for overflow
let values = parray
.typed_data::<T>()
.iter()
.map(|&v| {
v.checked_sub(&min)
.unwrap_or_else(|| panic!("Underflow when compressing FoR"))
})
// TODO(ngates): cast to unsigned
// .map(|v| v as parray.ptype().to_unsigned()::T)
.collect_vec();

PrimitiveArray::from(values)
}

#[cfg(test)]
mod test {
Expand Down

0 comments on commit f4cc69f

Please sign in to comment.