Commit

Merge remote-tracking branch 'origin/develop' into ngates/fastlanes-delta
lwwmanning committed Mar 20, 2024
2 parents afb4477 + c6674ee commit bd2b957
Showing 25 changed files with 655 additions and 501 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

30 changes: 27 additions & 3 deletions bench-vortex/src/lib.rs
@@ -86,13 +86,13 @@ pub fn download_taxi_data() -> PathBuf {
pub fn compress_taxi_data() -> ArrayRef {
let file = File::open(download_taxi_data()).unwrap();
let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
let _mask = ProjectionMask::roots(builder.parquet_schema(), [6]);
let _mask = ProjectionMask::roots(builder.parquet_schema(), [1]);
let _no_datetime_mask = ProjectionMask::roots(
builder.parquet_schema(),
[0, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
);
let reader = builder
//.with_projection(mask)
.with_projection(_mask)
//.with_projection(no_datetime_mask)
.with_batch_size(65_536)
// .with_batch_size(5_000_000)
@@ -151,6 +151,7 @@ mod test {
use vortex::array::ArrayRef;
use vortex::compute::as_arrow::as_arrow;
use vortex::encode::FromArrow;
use vortex::serde::{ReadCtx, WriteCtx};

use crate::{compress_ctx, compress_taxi_data, download_taxi_data};

@@ -165,13 +166,34 @@
.unwrap();
}

#[ignore]
#[test]
fn compression_ratio() {
setup_logger(LevelFilter::Debug);
_ = compress_taxi_data();
}

#[ignore]
#[test]
fn round_trip_serde() {
let file = File::open(download_taxi_data()).unwrap();
let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
let reader = builder.with_limit(1).build().unwrap();

for record_batch in reader.map(|batch_result| batch_result.unwrap()) {
let struct_arrow: ArrowStructArray = record_batch.into();
let arrow_array: ArrowArrayRef = Arc::new(struct_arrow);
let vortex_array = ArrayRef::from_arrow(arrow_array.clone(), false);

let mut buf = Vec::<u8>::new();
let mut write_ctx = WriteCtx::new(&mut buf);
write_ctx.write(vortex_array.as_ref()).unwrap();

let mut read = buf.as_slice();
let mut read_ctx = ReadCtx::new(vortex_array.dtype(), &mut read);
read_ctx.read().unwrap();
}
}

#[ignore]
#[test]
fn round_trip_arrow() {
@@ -188,6 +210,8 @@
}
}

// Ignoring since Struct arrays don't currently support equality.
// https://github.com/apache/arrow-rs/issues/5199
#[ignore]
#[test]
fn round_trip_arrow_compressed() {
94 changes: 83 additions & 11 deletions vortex-alp/src/compress.rs
@@ -1,16 +1,35 @@
use crate::alp::ALPFloat;
use itertools::Itertools;

use vortex::array::downcast::DowncastArrayBuiltin;
use vortex::array::primitive::PrimitiveArray;
use vortex::array::sparse::SparseArray;
use vortex::array::{Array, ArrayRef};
use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression};
use vortex::compute::flatten::flatten_primitive;
use vortex::compute::patch::patch;
use vortex::error::{VortexError, VortexResult};
use vortex::ptype::{NativePType, PType};

use crate::alp::ALPFloat;
use crate::array::{ALPArray, ALPEncoding};
use crate::downcast::DowncastALP;
use crate::Exponents;

#[macro_export]
macro_rules! match_each_alp_float_ptype {
($self:expr, | $_:tt $enc:ident | $($body:tt)*) => ({
macro_rules! __with__ {( $_ $enc:ident ) => ( $($body)* )}
use vortex::error::VortexError;
use vortex::ptype::PType;
let ptype = $self;
match ptype {
PType::F32 => Ok(__with__! { f32 }),
PType::F64 => Ok(__with__! { f64 }),
_ => Err(VortexError::InvalidPType(ptype))
}
})
}

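For orientation, a minimal usage sketch of the new macro (not part of the diff; ptype is an assumed vortex::ptype::PType value). The macro substitutes f32 or f64 for $T, wraps the body in Ok(..), and returns Err(VortexError::InvalidPType(..)) for any other ptype, so callers can use the ? operator:

    // Hypothetical caller, mirroring the invocation style used later in this file.
    let byte_width = match_each_alp_float_ptype!(ptype, |$T| {
        std::mem::size_of::<$T>()
    })?;
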
impl EncodingCompression for ALPEncoding {
fn can_compress(
&self,
@@ -39,15 +58,10 @@ impl EncodingCompression for ALPEncoding {
// TODO(ngates): fill forward nulls
let parray = array.as_primitive();

let (exponents, encoded, patches) = match parray.ptype() {
PType::F32 => {
encode_to_array(parray.typed_data::<f32>(), like_alp.map(|l| l.exponents()))
}
PType::F64 => {
encode_to_array(parray.typed_data::<f64>(), like_alp.map(|l| l.exponents()))
}
_ => panic!("Unsupported ptype"),
};
let (exponents, encoded, patches) = match_each_alp_float_ptype!(
*parray.ptype(), |$T| {
encode_to_array(parray.typed_data::<$T>(), like_alp.map(|l| l.exponents()))
})?;

let compressed_encoded = ctx
.named("packed")
@@ -90,7 +104,7 @@ where
)
}

pub fn alp_encode(parray: &PrimitiveArray) -> VortexResult<ALPArray> {
pub(crate) fn alp_encode(parray: &PrimitiveArray) -> VortexResult<ALPArray> {
let (exponents, encoded, patches) = match parray.ptype() {
PType::F32 => encode_to_array(parray.typed_data::<f32>(), None),
PType::F64 => encode_to_array(parray.typed_data::<f64>(), None),
@@ -99,6 +113,39 @@ pub fn alp_encode(parray: &PrimitiveArray) -> VortexResult<ALPArray> {
Ok(ALPArray::new(encoded, exponents, patches))
}

pub fn decompress(array: &ALPArray) -> VortexResult<PrimitiveArray> {
let encoded = flatten_primitive(array.encoded())?;
let decoded = match_each_alp_float_ptype!(array.dtype().try_into().unwrap(), |$T| {
use vortex::array::CloneOptionalArray;
PrimitiveArray::from_nullable(
decompress_primitive::<$T>(encoded.typed_data(), array.exponents()),
encoded.validity().clone_optional(),
)
})?;
if let Some(patches) = array.patches() {
// TODO(#121): right now, applying patches forces an extraneous copy of the array data
let patched = patch(&decoded, patches)?;
let patched_encoding_id = patched.encoding().id().clone();
patched
.into_any()
.downcast::<PrimitiveArray>()
.map_err(|_| VortexError::InvalidEncoding(patched_encoding_id))
.map(|boxed| *boxed)
} else {
Ok(decoded)
}
}

fn decompress_primitive<T: NativePType + ALPFloat>(
values: &[T::ALPInt],
exponents: &Exponents,
) -> Vec<T> {
values
.iter()
.map(|&v| T::decode_single(v, exponents))
.collect_vec()
}

#[cfg(test)]
mod tests {
use super::*;
@@ -113,6 +160,9 @@ mod tests {
vec![1234; 1025]
);
assert_eq!(encoded.exponents(), &Exponents { e: 4, f: 1 });

let decoded = decompress(&encoded).unwrap();
assert_eq!(array.typed_data::<f32>(), decoded.typed_data::<f32>());
}

#[test]
@@ -126,5 +176,27 @@
vec![0, 1234, 0]
);
assert_eq!(encoded.exponents(), &Exponents { e: 4, f: 1 });

let decoded = decompress(&encoded).unwrap();
let expected = vec![0f32, 1.234f32, 0f32];
assert_eq!(decoded.typed_data::<f32>(), expected.as_slice());
}

#[test]
#[allow(clippy::approx_constant)]
fn test_patched_compress() {
let values = vec![1.234f64, 2.718, std::f64::consts::PI, 4.0];
let array = PrimitiveArray::from(values.clone());
let encoded = alp_encode(&array).unwrap();
println!("Encoded {:?}", encoded);
assert!(encoded.patches().is_some());
assert_eq!(
encoded.encoded().as_primitive().typed_data::<i64>(),
vec![1234i64, 2718, 2718, 4000] // fill forward
);
assert_eq!(encoded.exponents(), &Exponents { e: 3, f: 0 });

let decoded = decompress(&encoded).unwrap();
assert_eq!(values, decoded.typed_data::<f64>());
}
}
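
For reference, the arithmetic these fixtures exercise (inferred from the test vectors, not stated in the diff): with Exponents { e: 3, f: 0 } the encoder appears to store round(v * 10^(e - f)), so 1.234 * 10^3 = 1234, and decoding divides by 10^3 to recover 1.234 exactly. std::f64::consts::PI does not survive that round trip (round(PI * 10^3) = 3142, which decodes to 3.142), so its slot is filled forward with the previous encoded value, hence the repeated 2718 above, while the exact float is stored in patches and re-applied by decompress() via patch().
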
45 changes: 22 additions & 23 deletions vortex-alp/src/compute.rs
@@ -1,43 +1,42 @@
use crate::alp::ALPFloat;
use crate::ALPArray;
use std::f32;
use vortex::array::Array;
use vortex::compute::flatten::{FlattenFn, FlattenedArray};
use vortex::compute::scalar_at::{scalar_at, ScalarAtFn};
use vortex::compute::ArrayCompute;
use vortex::dtype::{DType, FloatWidth};
use vortex::error::{VortexError, VortexResult};
use vortex::error::VortexResult;
use vortex::scalar::Scalar;

use crate::compress::decompress;
use crate::{match_each_alp_float_ptype, ALPArray, ALPFloat};

impl ArrayCompute for ALPArray {
fn flatten(&self) -> Option<&dyn FlattenFn> {
Some(self)
}

fn scalar_at(&self) -> Option<&dyn ScalarAtFn> {
Some(self)
}
}

impl FlattenFn for ALPArray {
fn flatten(&self) -> VortexResult<FlattenedArray> {
decompress(self).map(FlattenedArray::Primitive)
}
}

impl ScalarAtFn for ALPArray {
fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
if let Some(patch) = self.patches().and_then(|p| scalar_at(p, index).ok()) {
return Ok(patch);
}

let encoded_val = scalar_at(self.encoded(), index)?;

match self.dtype() {
DType::Float(FloatWidth::_32, _) => {
let encoded_val: i32 = encoded_val.try_into().unwrap();
Ok(Scalar::from(<f32 as ALPFloat>::decode_single(
encoded_val,
self.exponents(),
)))
}
DType::Float(FloatWidth::_64, _) => {
let encoded_val: i64 = encoded_val.try_into().unwrap();
Ok(Scalar::from(<f64 as ALPFloat>::decode_single(
encoded_val,
self.exponents(),
)))
}
_ => Err(VortexError::InvalidDType(self.dtype().clone())),
}
match_each_alp_float_ptype!(self.dtype().try_into().unwrap(), |$T| {
let encoded_val: <$T as ALPFloat>::ALPInt = encoded_val.try_into().unwrap();
Scalar::from(<$T as ALPFloat>::decode_single(
encoded_val,
self.exponents(),
))
})
}
}
2 changes: 1 addition & 1 deletion vortex-array/Cargo.toml
@@ -32,7 +32,7 @@ linkme = "0.3.23"
log = "0.4.20"
num-traits = "0.2.18"
num_enum = "0.7.2"
once_cell = "1.19.0"
paste = "1.0.14"
rand = { version = "0.8.5", features = [] }
rayon = "1.8.1"
roaring = "0.10.3"