From 1da315e0e6673dd70c0b0dad677dfbf5eeeaed6c Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 8 Mar 2024 20:52:26 +0000 Subject: [PATCH] More Compression (#87) --- .github/actions/setup-rust/action.yml | 14 +- Cargo.lock | 2 + bench-vortex/benches/compress_benchmark.rs | 28 +-- bench-vortex/src/lib.rs | 41 ++-- pyvortex/src/compress.rs | 3 +- rust-toolchain.toml | 1 - vortex-alp/src/compress.rs | 10 +- vortex-array/src/array/chunked/mod.rs | 2 +- vortex-array/src/array/sparse/compress.rs | 10 +- vortex-array/src/array/struct_/compress.rs | 3 +- vortex-array/src/array/typed/compress.rs | 4 + vortex-array/src/array/varbin/compress.rs | 42 ++++ vortex-array/src/array/varbin/mod.rs | 6 + vortex-array/src/array/varbin/stats.rs | 4 +- vortex-array/src/compress.rs | 225 +++++++++++++------- vortex-array/src/ptype.rs | 3 +- vortex-dict/src/compress.rs | 16 +- vortex-dict/src/dict.rs | 2 +- vortex-fastlanes/Cargo.toml | 4 + vortex-fastlanes/src/bitpacking/compress.rs | 65 +++--- vortex-fastlanes/src/bitpacking/mod.rs | 4 +- vortex-fastlanes/src/for/compress.rs | 31 ++- vortex-ree/src/compress.rs | 10 +- vortex-roaring/src/integer/compress.rs | 1 - vortex-zigzag/src/compress.rs | 3 +- 25 files changed, 337 insertions(+), 197 deletions(-) create mode 100644 vortex-array/src/array/varbin/compress.rs diff --git a/.github/actions/setup-rust/action.yml b/.github/actions/setup-rust/action.yml index fc1c4884a4..66a0891abb 100644 --- a/.github/actions/setup-rust/action.yml +++ b/.github/actions/setup-rust/action.yml @@ -1,25 +1,25 @@ name: 'Setup Rust' description: 'Toolchain setup and Initial compilation' -inputs: - rust-toolchain: # id of input - description: 'Rust toolchain version to use' - required: true - default: stable runs: using: "composite" steps: + - name: Rust Version + id: rust-version + shell: bash + run: echo "version=$(cat rust-toolchain.toml | grep channel | awk -F'\"' '{print $2}')" >> $GITHUB_OUTPUT + - name: Rust Toolchain Cache id: rustup-cache uses: actions/cache@v4 with: path: ~/.rustup - key: "rustup-${{ runner.os }}-${{ inputs.rust-toolchain }}" + key: "rustup-${{ runner.os }}-${{ steps.rust-version.outputs.version }}" - name: Rust Toolchain uses: dtolnay/rust-toolchain@stable if: steps.rustup-cache.outputs.cache-hit != 'true' with: - toolchain: "${{ inputs.rust-toolchain }}" + toolchain: "${{ steps.rust-version.outputs.version }}" components: clippy, rustfmt - name: Rust Dependency Cache uses: Swatinem/rust-cache@v2 diff --git a/Cargo.lock b/Cargo.lock index 82c682fcea..989887765d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2868,6 +2868,8 @@ dependencies = [ "itertools 0.12.1", "linkme", "log", + "num-traits", + "simplelog", "vortex-array", ] diff --git a/bench-vortex/benches/compress_benchmark.rs b/bench-vortex/benches/compress_benchmark.rs index 610bbd06ab..dde7bf5f5c 100644 --- a/bench-vortex/benches/compress_benchmark.rs +++ b/bench-vortex/benches/compress_benchmark.rs @@ -1,31 +1,13 @@ -use std::fs::{create_dir_all, File}; -use std::path::Path; - -use bench_vortex::compress_taxi_data; use criterion::{black_box, criterion_group, criterion_main, Criterion}; -fn download_taxi_data() -> &'static Path { - let download_path = Path::new("data/yellow-tripdata-2023-11.parquet"); - if download_path.exists() { - return download_path; - } - - create_dir_all(download_path.parent().unwrap()).unwrap(); - let mut download_file = File::create(download_path).unwrap(); - reqwest::blocking::get( - "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-11.parquet", - ) - 
.unwrap() - .copy_to(&mut download_file) - .unwrap(); - - download_path -} +use bench_vortex::{compress_taxi_data, download_taxi_data}; fn enc_compress(c: &mut Criterion) { download_taxi_data(); - - c.bench_function("compress", |b| b.iter(|| black_box(compress_taxi_data()))); + let mut group = c.benchmark_group("end to end"); + group.sample_size(10); + group.bench_function("compress", |b| b.iter(|| black_box(compress_taxi_data()))); + group.finish() } criterion_group!(benches, enc_compress); diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index bf38bd18a9..aff0ad0745 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -6,6 +6,7 @@ use parquet::arrow::ProjectionMask; use std::collections::HashSet; use std::fs::{create_dir_all, File}; use std::path::{Path, PathBuf}; +use std::sync::Arc; use vortex::array::bool::BoolEncoding; use vortex::array::chunked::{ChunkedArray, ChunkedEncoding}; use vortex::array::constant::ConstantEncoding; @@ -44,13 +45,14 @@ pub fn enumerate_arrays() -> Vec<&'static dyn Encoding> { &ALPEncoding, &DictEncoding, &BitPackedEncoding, - // &DeltaEncoding, &FoREncoding, + // &DeltaEncoding, + // &FFoREncoding, &REEEncoding, &RoaringBoolEncoding, // &RoaringIntEncoding, // Doesn't offer anything more than FoR really - //&ZigZagEncoding, + // &ZigZagEncoding, ] } @@ -76,11 +78,17 @@ pub fn download_taxi_data() -> PathBuf { pub fn compress_taxi_data() -> ArrayRef { let file = File::open(download_taxi_data()).unwrap(); let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); - let _mask = ProjectionMask::roots(builder.parquet_schema(), [10]); + let _mask = ProjectionMask::roots(builder.parquet_schema(), [6]); + let _no_datetime_mask = ProjectionMask::roots( + builder.parquet_schema(), + [0, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18], + ); let reader = builder - // .with_projection(mask) + //.with_projection(mask) + //.with_projection(no_datetime_mask) .with_batch_size(65_536) - //.with_limit(100_000) + // .with_batch_size(5_000_000) + // .with_limit(100_000) .build() .unwrap(); @@ -89,18 +97,20 @@ pub fn compress_taxi_data() -> ArrayRef { HashSet::from_iter(enumerate_arrays().iter().map(|e| (*e).id())), HashSet::default(), ); - println!("Compression config {cfg:?}"); - let ctx = CompressCtx::new(&cfg); + info!("Compression config {cfg:?}"); + let ctx = CompressCtx::new(Arc::new(cfg)); let schema = reader.schema(); let mut uncompressed_size = 0; let chunks = reader .into_iter() + //.skip(39) + //.take(1) .map(|batch_result| batch_result.unwrap()) .map(ArrayRef::from) .map(|array| { uncompressed_size += array.nbytes(); - ctx.compress(array.as_ref(), None).unwrap() + ctx.clone().compress(array.as_ref(), None).unwrap() }) .collect_vec(); @@ -108,11 +118,6 @@ pub fn compress_taxi_data() -> ArrayRef { let compressed = ChunkedArray::new(chunks.clone(), dtype).boxed(); info!("Compressed array {}", display_tree(compressed.as_ref())); - info!( - "NBytes {}, Ratio {}", - compressed.nbytes(), - compressed.nbytes() as f32 / uncompressed_size as f32 - ); let mut field_bytes = vec![0; schema.fields().len()]; for chunk in chunks { @@ -122,8 +127,13 @@ pub fn compress_taxi_data() -> ArrayRef { } } field_bytes.iter().enumerate().for_each(|(i, &nbytes)| { - info!("{},{}", schema.field(i).name(), nbytes); + println!("{},{}", schema.field(i).name(), nbytes); }); + println!( + "NBytes {}, Ratio {}", + compressed.nbytes(), + compressed.nbytes() as f32 / uncompressed_size as f32 + ); compressed } @@ -146,9 +156,10 @@ mod test { .unwrap(); } + 
#[ignore]
 #[test]
 fn compression_ratio() {
- setup_logger(LevelFilter::Info);
+ setup_logger(LevelFilter::Warn);
 _ = compress_taxi_data();
 }
 }
diff --git a/pyvortex/src/compress.rs b/pyvortex/src/compress.rs
index 741d5a6ab6..fba8eb5800 100644
--- a/pyvortex/src/compress.rs
+++ b/pyvortex/src/compress.rs
@@ -1,5 +1,6 @@
 use pyo3::types::PyType;
 use pyo3::{pyclass, pyfunction, pymethods, Py, PyResult, Python};
+use std::sync::Arc;
 
 use vortex::compress::{CompressConfig, CompressCtx};
 
@@ -33,7 +34,7 @@ pub fn compress(
 opts: Option,
 ) -> PyResult> {
 let compress_opts = opts.map(|o| o.inner).unwrap_or_default();
- let ctx = CompressCtx::new(&compress_opts);
+ let ctx = CompressCtx::new(Arc::new(compress_opts));
 let compressed = py
 .allow_threads(|| ctx.compress(arr.unwrap(), None))
 .map_err(PyVortexError::map_err)?;
diff --git a/rust-toolchain.toml b/rust-toolchain.toml
index b2b30bc401..cdc84958b6 100644
--- a/rust-toolchain.toml
+++ b/rust-toolchain.toml
@@ -2,4 +2,3 @@
 channel = "nightly"
 components = ["rust-src", "rustfmt", "clippy"]
 profile = "minimal"
-
diff --git a/vortex-alp/src/compress.rs b/vortex-alp/src/compress.rs
index 7bd38efb7c..fe2881a800 100644
--- a/vortex-alp/src/compress.rs
+++ b/vortex-alp/src/compress.rs
@@ -41,21 +41,23 @@ impl EncodingCompression for ALPEncoding {
 
 let (exponents, encoded, patches) = match parray.ptype() {
 PType::F32 => {
- encode_to_array(parray.typed_data::<f32>(), like_alp.map(|a| a.exponents()))
+ encode_to_array(parray.typed_data::<f32>(), like_alp.map(|l| l.exponents()))
 }
 PType::F64 => {
- encode_to_array(parray.typed_data::<f64>(), like_alp.map(|a| a.exponents()))
+ encode_to_array(parray.typed_data::<f64>(), like_alp.map(|l| l.exponents()))
 }
 _ => panic!("Unsupported ptype"),
 };
 
 let compressed_encoded = ctx
- .next_level()
+ .named("packed")
+ .excluding(&ALPEncoding::ID)
 .compress(encoded.as_ref(), like_alp.map(|a| a.encoded()))?;
 
 let compressed_patches = patches
 .map(|p| {
- ctx.next_level()
+ ctx.auxiliary("patches")
+ .excluding(&ALPEncoding::ID)
 .compress(p.as_ref(), like_alp.and_then(|a| a.patches()))
 })
 .transpose()?;
diff --git a/vortex-array/src/array/chunked/mod.rs b/vortex-array/src/array/chunked/mod.rs
index 318f778e1b..500d12e08f 100644
--- a/vortex-array/src/array/chunked/mod.rs
+++ b/vortex-array/src/array/chunked/mod.rs
@@ -192,7 +192,7 @@ impl<'arr> AsRef<(dyn Array + 'arr)> for ChunkedArray {
 impl ArrayDisplay for ChunkedArray {
 fn fmt(&self, f: &mut ArrayFormatter) -> std::fmt::Result {
 for (i, c) in self.chunks().iter().enumerate() {
- f.child(&format!("[{}]", i), c.as_ref())?
+ f.new_total_size(c.nbytes(), |f| f.child(&format!("[{}]", i), c.as_ref()))?; } Ok(()) } diff --git a/vortex-array/src/array/sparse/compress.rs b/vortex-array/src/array/sparse/compress.rs index a1f264add0..893f51344a 100644 --- a/vortex-array/src/array/sparse/compress.rs +++ b/vortex-array/src/array/sparse/compress.rs @@ -5,6 +5,10 @@ use crate::compress::{CompressConfig, CompressCtx, EncodingCompression}; use crate::error::VortexResult; impl EncodingCompression for SparseEncoding { + fn cost(&self) -> u8 { + 0 + } + fn can_compress( &self, array: &dyn Array, @@ -22,8 +26,10 @@ impl EncodingCompression for SparseEncoding { let sparse_array = array.as_sparse(); let sparse_like = like.map(|la| la.as_sparse()); Ok(SparseArray::new( - ctx.compress(sparse_array.indices(), sparse_like.map(|sa| sa.indices()))?, - ctx.compress(sparse_array.values(), sparse_like.map(|sa| sa.values()))?, + ctx.auxiliary("indices") + .compress(sparse_array.indices(), sparse_like.map(|sa| sa.indices()))?, + ctx.named("values") + .compress(sparse_array.values(), sparse_like.map(|sa| sa.values()))?, sparse_array.len(), ) .boxed()) diff --git a/vortex-array/src/array/struct_/compress.rs b/vortex-array/src/array/struct_/compress.rs index 44f1676a9b..a842321567 100644 --- a/vortex-array/src/array/struct_/compress.rs +++ b/vortex-array/src/array/struct_/compress.rs @@ -32,7 +32,8 @@ impl EncodingCompression for StructEncoding { let like_chunk = struct_like .and_then(|c_like| c_like.fields().get(i)) .map(Deref::deref); - ctx.compress(chunk.deref(), like_chunk) + ctx.auxiliary(&format!("[{}]", i)) + .compress(chunk.deref(), like_chunk) }) .try_collect()?; diff --git a/vortex-array/src/array/typed/compress.rs b/vortex-array/src/array/typed/compress.rs index 7e60a8422e..917c7ea00c 100644 --- a/vortex-array/src/array/typed/compress.rs +++ b/vortex-array/src/array/typed/compress.rs @@ -5,6 +5,10 @@ use crate::compress::{CompressConfig, CompressCtx, EncodingCompression}; use crate::error::VortexResult; impl EncodingCompression for TypedEncoding { + fn cost(&self) -> u8 { + 0 + } + fn can_compress( &self, array: &dyn Array, diff --git a/vortex-array/src/array/varbin/compress.rs b/vortex-array/src/array/varbin/compress.rs new file mode 100644 index 0000000000..2249d32107 --- /dev/null +++ b/vortex-array/src/array/varbin/compress.rs @@ -0,0 +1,42 @@ +use crate::array::downcast::DowncastArrayBuiltin; +use crate::array::varbin::{VarBinArray, VarBinEncoding}; +use crate::array::{Array, ArrayRef}; +use crate::compress::{CompressConfig, CompressCtx, EncodingCompression}; +use crate::error::VortexResult; + +impl EncodingCompression for VarBinEncoding { + fn cost(&self) -> u8 { + 0 // We simply destructure. 
+ }
+
+ fn can_compress(
+ &self,
+ array: &dyn Array,
+ _config: &CompressConfig,
+ ) -> Option<&dyn EncodingCompression> {
+ (array.encoding().id() == &Self::ID).then_some(self)
+ }
+
+ fn compress(
+ &self,
+ array: &dyn Array,
+ like: Option<&dyn Array>,
+ ctx: CompressCtx,
+ ) -> VortexResult<ArrayRef> {
+ let vb = array.as_varbin();
+ let vblike = like.map(|a| a.as_varbin());
+ Ok(VarBinArray::new(
+ ctx.auxiliary("offsets")
+ .compress(vb.offsets(), vblike.map(|l| l.offsets()))?,
+ dyn_clone::clone_box(vb.bytes()),
+ vb.dtype().clone(),
+ vb.validity()
+ .map(|v| {
+ ctx.auxiliary("validity")
+ .compress(v, vblike.and_then(|l| l.validity()))
+ })
+ .transpose()?,
+ )
+ .boxed())
+ }
+}
diff --git a/vortex-array/src/array/varbin/mod.rs b/vortex-array/src/array/varbin/mod.rs
index d48457719c..6c5aa3ac65 100644
--- a/vortex-array/src/array/varbin/mod.rs
+++ b/vortex-array/src/array/varbin/mod.rs
@@ -17,6 +17,7 @@ use crate::array::{
 EncodingId, EncodingRef, ENCODINGS,
 };
 use crate::arrow::CombineChunks;
+use crate::compress::EncodingCompression;
 use crate::compute::scalar_at::scalar_at;
 use crate::dtype::{DType, IntWidth, Nullability, Signedness};
 use crate::error::{VortexError, VortexResult};
@@ -25,6 +26,7 @@ use crate::ptype::NativePType;
 use crate::serde::{ArraySerde, EncodingSerde};
 use crate::stats::{Stats, StatsSet};
 
+mod compress;
 mod compute;
 mod serde;
 mod stats;
@@ -323,6 +325,10 @@ impl Encoding for VarBinEncoding {
 &Self::ID
 }
 
+ fn compression(&self) -> Option<&dyn EncodingCompression> {
+ Some(self)
+ }
+
 fn serde(&self) -> Option<&dyn EncodingSerde> {
 Some(self)
 }
diff --git a/vortex-array/src/array/varbin/stats.rs b/vortex-array/src/array/varbin/stats.rs
index f4cecd03a2..619d076ed6 100644
--- a/vortex-array/src/array/varbin/stats.rs
+++ b/vortex-array/src/array/varbin/stats.rs
@@ -26,10 +26,10 @@ where
 for i in 0..self.len() {
 let next_val = self.bytes_at(i).unwrap();
 if next_val < min {
- min = next_val.clone();
+ min.clone_from(&next_val);
 }
 if next_val > max {
- max = next_val.clone();
+ max.clone_from(&next_val);
 }
 match next_val.cmp(&last_value) {
 Ordering::Less => is_sorted = false,
diff --git a/vortex-array/src/compress.rs b/vortex-array/src/compress.rs
index a74c64f3c4..108d7c38ca 100644
--- a/vortex-array/src/compress.rs
+++ b/vortex-array/src/compress.rs
@@ -1,8 +1,8 @@
 use std::collections::HashSet;
-use std::fmt::Debug;
+use std::fmt::{Debug, Display, Formatter};
+use std::sync::Arc;
 
 use log::{debug, info, warn};
-use once_cell::sync::Lazy;
 
 use crate::array::chunked::ChunkedArray;
 use crate::array::constant::{ConstantArray, ConstantEncoding};
@@ -15,7 +15,10 @@ use crate::sampling::stratified_slices;
 use crate::stats::Stat;
 
 pub trait EncodingCompression: Encoding {
- // TODO(ngates): we could return a weighted score here to allow for better selection?
+ fn cost(&self) -> u8 {
+ 1
+ }
+
 fn can_compress(
 &self,
 array: &dyn Array,
@@ -28,6 +31,11 @@ pub trait EncodingCompression: Encoding {
 like: Option<&dyn Array>,
 ctx: CompressCtx,
 ) -> VortexResult<ArrayRef>;
+
+ // For an array returned by this encoding, give the size in bytes minus any constant overheads.
+ fn compressed_nbytes(&self, array: &dyn Array) -> usize {
+ array.nbytes()
+ }
 }
 
 #[derive(Debug, Clone)]
@@ -51,7 +59,7 @@ impl Default for CompressConfig {
 // Sample length should always be multiple of 1024
 sample_size: 128,
 sample_count: 8,
- max_depth: 4,
+ max_depth: 3,
 ree_average_run_threshold: 2.0,
 encodings: HashSet::new(),
 disabled_encodings: HashSet::new(),
@@ -89,47 +97,83 @@ impl CompressConfig {
 }
 }
 
-static DEFAULT_COMPRESS_CONFIG: Lazy<CompressConfig> = Lazy::new(CompressConfig::default);
-
 #[derive(Debug, Clone)]
-pub struct CompressCtx<'a> {
- options: &'a CompressConfig,
+pub struct CompressCtx {
+ path: Vec<String>,
+ // TODO(ngates): put this back to a reference
+ options: Arc<CompressConfig>,
 depth: u8,
+ disabled_encodings: HashSet<&'static EncodingId>,
 }
 
-impl<'a> CompressCtx<'a> {
- pub fn new(options: &'a CompressConfig) -> Self {
- Self { options, depth: 0 }
+impl Display for CompressCtx {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(f, "[{}|{}]", self.depth, self.path.join("."))
 }
+}
 
- pub fn compress(&self, arr: &dyn Array, like: Option<&dyn Array>) -> VortexResult<ArrayRef> {
- debug!(
- "Compressing {} like {} at depth={}",
- arr,
- like.map(|l| l.encoding().id().name()).unwrap_or(""),
- self.depth
- );
- if arr.is_empty() {
- return Ok(dyn_clone::clone_box(arr));
+impl CompressCtx {
+ pub fn new(options: Arc<CompressConfig>) -> Self {
+ Self {
+ path: Vec::new(),
+ options,
+ depth: 0,
+ disabled_encodings: HashSet::new(),
 }
+ }
+
+ pub fn named(&self, name: &str) -> Self {
+ let mut cloned = self.clone();
+ cloned.path.push(name.into());
+ cloned
+ }
+
+ // Returns a new ctx used for compressing auxiliary arrays.
+ // In practice, this means resetting any disabled encodings back to the original config.
+ pub fn auxiliary(&self, name: &str) -> Self {
+ let mut cloned = self.clone();
+ cloned.path.push(name.into());
+ cloned.disabled_encodings = HashSet::new();
+ cloned
+ }
+
+ pub fn for_encoding(&self, compression: &dyn EncodingCompression) -> Self {
+ let mut cloned = self.clone();
+ cloned.depth += compression.cost();
+ cloned
+ }
 
- if self.depth >= self.options.max_depth {
+ #[inline]
+ pub fn options(&self) -> Arc<CompressConfig> {
+ self.options.clone()
+ }
+
+ pub fn excluding(&self, encoding: &'static EncodingId) -> Self {
+ let mut cloned = self.clone();
+ cloned.disabled_encodings.insert(encoding);
+ cloned
+ }
+
+ // We take `self` by value (rather than by reference) to force the caller to think about
+ // whether to use an auxiliary ctx.
+ pub fn compress(self, arr: &dyn Array, like: Option<&dyn Array>) -> VortexResult<ArrayRef> {
+ if arr.is_empty() {
 return Ok(dyn_clone::clone_box(arr));
 }
 
 // Attempt to compress using the "like" array, otherwise fall back to sampled compression
 if let Some(l) = like {
- if let Some(compression) = l
+ if let Some(compressed) = l
 .encoding()
 .compression()
- .and_then(|c| c.can_compress(arr, self.options))
+ .map(|c| c.compress(arr, Some(l), self.for_encoding(c)))
 {
- return compression.compress(arr, Some(l), self.clone());
+ return compressed;
 } else {
- warn!("Cannot find compressor to compress {} like {}", arr, l);
- // TODO(ngates): we shouldn't just bail, but we also probably don't want to fully
- // re-sample.
- return Ok(dyn_clone::clone_box(arr)); + warn!( + "{} cannot find compressor to compress {} like {}", + self, arr, l + ); } } @@ -167,22 +211,11 @@ impl<'a> CompressCtx<'a> { } } } - - pub fn next_level(&self) -> Self { - let mut cloned = self.clone(); - cloned.depth += 1; - cloned - } - - #[inline] - pub fn options(&self) -> &CompressConfig { - self.options - } } -impl Default for CompressCtx<'_> { +impl Default for CompressCtx { fn default() -> Self { - Self::new(&DEFAULT_COMPRESS_CONFIG) + Self::new(Arc::new(CompressConfig::default())) } } @@ -199,61 +232,103 @@ pub fn sampled_compression(array: &dyn Array, ctx: &CompressCtx) -> VortexResult )); } - let candidates: Vec<&dyn EncodingCompression> = ENCODINGS + let mut candidates: Vec<&dyn EncodingCompression> = ENCODINGS .iter() .filter(|encoding| ctx.options().is_enabled(encoding.id())) + .filter(|encoding| !ctx.disabled_encodings.contains(encoding.id())) .filter_map(|encoding| encoding.compression()) - .filter_map(|compression| compression.can_compress(array, ctx.options())) + .filter(|compression| { + if compression + .can_compress(array, ctx.options().as_ref()) + .is_some() + { + if ctx.depth + compression.cost() > ctx.options.max_depth { + debug!( + "{} skipping encoding {} due to depth", + ctx, + compression.id() + ); + return false; + } + true + } else { + false + } + }) .collect(); - debug!("Candidates for {}: {:?}", array, candidates); + debug!("{} candidates for {}: {:?}", ctx, array, candidates); if candidates.is_empty() { debug!( - "No compressors for array with dtype: {} and encoding: {}", + "{} no compressors for array with dtype: {} and encoding: {}", + ctx, array.dtype(), array.encoding().id(), ); return Ok(None); } - if array.len() < (ctx.options.sample_size as usize * ctx.options.sample_count as usize) { + // We prefer all other candidates to the array's own encoding. + // This is because we assume that the array's own encoding is the least efficient, but useful + // to destructure an array in the final stages of compression. e.g. VarBin would be DictEncoded + // but then the dictionary itself remains a VarBin array. DictEncoding excludes itself from the + // dictionary, but we still have a large offsets array that should be compressed. + // TODO(ngates): we actually probably want some way to prefer dict encoding over other varbin + // encodings, e.g. FSST. + if candidates.len() > 1 { + candidates.retain(|&compression| compression.id() != array.encoding().id()); + } + + if array.len() <= (ctx.options.sample_size as usize * ctx.options.sample_count as usize) { // We're either already within a sample, or we're operating over a sufficiently small array. - find_best_compression(candidates, array, ctx) - } else { - let sample = compute::as_contiguous::as_contiguous( - stratified_slices( - array.len(), - ctx.options.sample_size, - ctx.options.sample_count, - ) - .into_iter() - .map(|(start, stop)| array.slice(start, stop).unwrap()) - .collect(), - )?; - - find_best_compression(candidates, sample.as_ref(), ctx)? - .map(|best| { - info!("Compressing array {} like {}", array, best); - ctx.compress(array, Some(best.as_ref())) - }) - .transpose() + return find_best_compression(candidates, array, ctx) + .map(|best| best.map(|(_compression, best)| best)); } + + // Take a sample of the array, then ask codecs for their best compression estimate. 
+ let sample = compute::as_contiguous::as_contiguous(
+ stratified_slices(
+ array.len(),
+ ctx.options.sample_size,
+ ctx.options.sample_count,
+ )
+ .into_iter()
+ .map(|(start, stop)| array.slice(start, stop).unwrap())
+ .collect(),
+ )?;
+
+ find_best_compression(candidates, sample.as_ref(), ctx)?
+ .map(|(compression, best)| {
+ info!("{} compressing array {} like {}", ctx, array, best);
+ ctx.for_encoding(compression)
+ .compress(array, Some(best.as_ref()))
+ })
+ .transpose()
 }
 
-fn find_best_compression(
- candidates: Vec<&'static dyn EncodingCompression>,
+fn find_best_compression<'a>(
+ candidates: Vec<&'a dyn EncodingCompression>,
 sample: &dyn Array,
 ctx: &CompressCtx,
-) -> VortexResult<Option<ArrayRef>> {
- let mut best_sample = None;
+) -> VortexResult<Option<(&'a dyn EncodingCompression, ArrayRef)>> {
+ let mut best = None;
 let mut best_ratio = 1.0;
 for compression in candidates {
- let compressed_sample = compression.compress(sample.as_ref(), None, ctx.clone())?;
- let compression_ratio = compressed_sample.nbytes() as f32 / sample.nbytes() as f32;
- if compression_ratio < best_ratio {
- best_sample = Some(compressed_sample);
- best_ratio = compression_ratio;
+ debug!(
+ "{} trying candidate {} for {}",
+ ctx,
+ compression.id(),
+ sample
+ );
+ let compressed_sample =
+ compression.compress(sample, None, ctx.for_encoding(compression))?;
+ let compressed_size = compression.compressed_nbytes(compressed_sample.as_ref());
+ let ratio = compressed_size as f32 / sample.nbytes() as f32;
+ debug!("{} ratio for {}: {}", ctx, compression.id(), ratio);
+ if ratio < best_ratio {
+ best_ratio = ratio;
+ best = Some((compression, compressed_sample))
 }
 }
- Ok(best_sample)
+ Ok(best)
 }
diff --git a/vortex-array/src/ptype.rs b/vortex-array/src/ptype.rs
index 51414c2ce7..7463a53bb3 100644
--- a/vortex-array/src/ptype.rs
+++ b/vortex-array/src/ptype.rs
@@ -3,7 +3,7 @@ use std::panic::RefUnwindSafe;
 
 use arrow::datatypes::ArrowNativeType;
 use half::f16;
-use num_traits::NumCast;
+use num_traits::{Num, NumCast};
 
 use crate::dtype::{DType, FloatWidth, IntWidth, Signedness};
 use crate::error::{VortexError, VortexResult};
@@ -35,6 +35,7 @@ pub trait NativePType:
 + Default
 + ArrowNativeType
 + RefUnwindSafe
+ + Num
 + NumCast
 + Into
 + TryFrom
diff --git a/vortex-dict/src/compress.rs b/vortex-dict/src/compress.rs
index ab78e9e0b0..6595adfcd5 100644
--- a/vortex-dict/src/compress.rs
+++ b/vortex-dict/src/compress.rs
@@ -3,13 +3,13 @@ use std::hash::{Hash, Hasher};
 use ahash::RandomState;
 use hashbrown::hash_map::{Entry, RawEntryMut};
 use hashbrown::HashMap;
-use log::debug;
 use num_traits::AsPrimitive;
 
 use vortex::array::primitive::{PrimitiveArray, PrimitiveEncoding};
 use vortex::array::varbin::{VarBinArray, VarBinEncoding};
 use vortex::array::{Array, ArrayKind, ArrayRef};
 use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression};
+
 use vortex::dtype::DType;
 use vortex::error::VortexResult;
 use vortex::match_each_native_ptype;
@@ -30,7 +30,6 @@ impl EncodingCompression for DictEncoding {
 if array.encoding().id() != &PrimitiveEncoding::ID
 && array.encoding().id() != &VarBinEncoding::ID
 {
- debug!("Skipping Dict: not primitive or varbin");
 return None;
 };
 
@@ -41,7 +40,6 @@ impl EncodingCompression for DictEncoding {
 .get_or_compute_as(&Stat::IsStrictSorted)
 .unwrap_or(false)
 {
- debug!("Skipping Dict: array is strict_sorted");
 return None;
 }
 
@@ -60,18 +58,22 @@ impl EncodingCompression for DictEncoding {
 ArrayKind::Primitive(p) => {
 let (codes, dict) = dict_encode_primitive(p);
 (
- ctx.next_level()
+ ctx.auxiliary("codes")
+ .excluding(&DictEncoding::ID)
.compress(codes.as_ref(), dict_like.map(|dict| dict.codes()))?,
- ctx.next_level()
+ ctx.named("values")
+ .excluding(&DictEncoding::ID)
 .compress(dict.as_ref(), dict_like.map(|dict| dict.dict()))?,
 )
 }
 ArrayKind::VarBin(vb) => {
 let (codes, dict) = dict_encode_varbin(vb);
 (
- ctx.next_level()
+ ctx.auxiliary("codes")
+ .excluding(&DictEncoding::ID)
 .compress(codes.as_ref(), dict_like.map(|dict| dict.codes()))?,
- ctx.next_level()
+ ctx.named("values")
+ .excluding(&DictEncoding::ID)
 .compress(dict.as_ref(), dict_like.map(|dict| dict.dict()))?,
 )
 }
diff --git a/vortex-dict/src/dict.rs b/vortex-dict/src/dict.rs
index e447d100ee..0bacfe60cd 100644
--- a/vortex-dict/src/dict.rs
+++ b/vortex-dict/src/dict.rs
@@ -116,7 +116,7 @@ impl DictEncoding {
 }
 
 impl Encoding for DictEncoding {
- fn id(&self) -> &EncodingId {
+ fn id(&self) -> &'static EncodingId {
 &Self::ID
 }
 
diff --git a/vortex-fastlanes/Cargo.toml b/vortex-fastlanes/Cargo.toml
index 6ed5739338..7f21bf95c2 100644
--- a/vortex-fastlanes/Cargo.toml
+++ b/vortex-fastlanes/Cargo.toml
@@ -19,5 +19,9 @@ arrayref = "0.3.7"
 vortex-array = { path = "../vortex-array" }
 linkme = "0.3.22"
 itertools = "0.12.1"
+num-traits = "0.2.18"
 fastlanez-sys = { path = "../fastlanez-sys" }
 log = "0.4.20"
+
+[dev-dependencies]
+simplelog = { version = "0.12.1", features = ["paris"] }
\ No newline at end of file
diff --git a/vortex-fastlanes/src/bitpacking/compress.rs b/vortex-fastlanes/src/bitpacking/compress.rs
index 4e569b1d09..82e051b837 100644
--- a/vortex-fastlanes/src/bitpacking/compress.rs
+++ b/vortex-fastlanes/src/bitpacking/compress.rs
@@ -1,5 +1,4 @@
 use arrayref::array_ref;
-use log::debug;
 
 use fastlanez_sys::TryBitPack;
 use vortex::array::downcast::DowncastArrayBuiltin;
@@ -16,38 +15,32 @@ use vortex::stats::Stat;
 use crate::{BitPackedArray, BitPackedEncoding};
 
 impl EncodingCompression for BitPackedEncoding {
+ fn cost(&self) -> u8 {
+ 0
+ }
+
 fn can_compress(
 &self,
 array: &dyn Array,
 _config: &CompressConfig,
 ) -> Option<&dyn EncodingCompression> {
 // Only support primitive arrays
- let Some(parray) = array.maybe_primitive() else {
- debug!("Skipping BitPacking: not primitive");
- return None;
- };
+ let parray = array.maybe_primitive()?;
 
 // Only supports ints
 if !parray.ptype().is_int() {
- debug!("Skipping BitPacking: not int");
- return None;
- }
-
- // Check that the min == zero. Otherwise, we can assume that FoR will run first.
- if parray.stats().get_or_compute_cast::<i64>(&Stat::Min)? != 0 {
- debug!("Skipping BitPacking: min != 0");
 return None;
 }
 
+ let bytes_per_exception = bytes_per_exception(parray.ptype());
 let bit_width_freq = parray
 .stats()
 .get_or_compute_as::<ListScalarVec<usize>>(&Stat::BitWidthFreq)?
 .0;
 
- let bit_width = best_bit_width(parray.ptype(), &bit_width_freq);
+ let bit_width = best_bit_width(&bit_width_freq, bytes_per_exception);
 
 // Check that the bit width is less than the type's bit width
 if bit_width == parray.ptype().bit_width() {
- debug!("Skipping BitPacking: best == current bit width");
 return None;
 }
 
@@ -68,29 +61,26 @@ impl EncodingCompression for BitPackedEncoding {
 .0;
 
 let like_bp = like.map(|l| l.as_any().downcast_ref::<BitPackedArray>().unwrap());
-
- let bit_width = like_bp
- .map(|bp| bp.bit_width())
- .unwrap_or_else(|| best_bit_width(parray.ptype(), &bit_width_freq));
+ let bit_width = best_bit_width(&bit_width_freq, bytes_per_exception(parray.ptype()));
 let num_exceptions = count_exceptions(bit_width, &bit_width_freq);
 
- // If we pack into zero bits, then we have an empty byte array.
- let packed = if bit_width == 0 {
- PrimitiveArray::from(Vec::<u8>::new()).boxed()
- } else {
- bitpack(parray, bit_width)
- };
+ if bit_width == parray.ptype().bit_width() {
+ // Nothing we can do
+ return Ok(parray.clone().boxed());
+ }
+
+ let packed = bitpack(parray, bit_width);
 
 let validity = parray
 .validity()
 .map(|v| {
- ctx.next_level()
+ ctx.auxiliary("validity")
 .compress(v.as_ref(), like_bp.and_then(|bp| bp.validity()))
 })
 .transpose()?;
 
 let patches = if num_exceptions > 0 {
- Some(ctx.next_level().compress(
+ Some(ctx.auxiliary("patches").compress(
 bitpack_patches(parray, bit_width, num_exceptions).as_ref(),
 like_bp.and_then(|bp| bp.patches()),
 )?)
@@ -126,6 +116,10 @@ fn bitpack(parray: &PrimitiveArray, bit_width: usize) -> ArrayRef {
 }
 
 fn bitpack_primitive<T: NativePType + TryBitPack>(array: &[T], bit_width: usize) -> Vec<u8> {
+ if bit_width == 0 {
+ return Vec::new();
+ }
+
 // How many fastlanes vectors we will process.
 let num_chunks = (array.len() + 1023) / 1024;
 
@@ -173,9 +167,8 @@ fn bitpack_patches(
 
 /// Assuming exceptions cost 1 value + 1 u32 index, figure out the best bit-width to use.
 /// We could try to be clever, but we can never really predict how the exceptions will compress.
-fn best_bit_width(ptype: &PType, bit_width_freq: &[usize]) -> usize {
+fn best_bit_width(bit_width_freq: &[usize], bytes_per_exception: usize) -> usize {
 let len: usize = bit_width_freq.iter().sum();
- let bytes_per_exception = ptype.byte_width() + 4;
 
 if bit_width_freq.len() > u8::MAX as usize {
 panic!("Too many bit widths");
@@ -198,6 +191,10 @@ fn best_bit_width(ptype: &PType, bit_width_freq: &[usize]) -> usize {
 best_width
 }
 
+fn bytes_per_exception(ptype: &PType) -> usize {
+ ptype.byte_width() + 4
+}
+
 fn count_exceptions(bit_width: usize, bit_width_freq: &[usize]) -> usize {
 bit_width_freq[bit_width + 1..].iter().sum()
 }
@@ -205,8 +202,8 @@ fn count_exceptions(bit_width: usize, bit_width_freq: &[usize]) -> usize {
 #[cfg(test)]
 mod test {
 use std::collections::HashSet;
+ use std::sync::Arc;
 
- use vortex::array::primitive::PrimitiveEncoding;
 use vortex::array::Encoding;
 
 use super::*;
@@ -216,17 +213,13 @@ mod test {
 // 10 1-bit values, 20 2-bit, etc.
 let freq = vec![0, 10, 20, 15, 1, 0, 0, 0];
 // 3-bits => (46 * 3) + (8 * 1 * 5) => 178 bits => 23 bytes and zero exceptions
- assert_eq!(best_bit_width(&PType::U8, &freq), 3);
+ assert_eq!(best_bit_width(&freq, bytes_per_exception(&PType::U8)), 3);
 }
 
 #[test]
 fn test_compress() {
- // FIXME(ngates): remove PrimitiveEncoding https://github.com/fulcrum-so/vortex/issues/35
- let cfg = CompressConfig::new(
- HashSet::from([PrimitiveEncoding.id(), BitPackedEncoding.id()]),
- HashSet::default(),
- );
- let ctx = CompressCtx::new(&cfg);
+ let cfg = CompressConfig::new(HashSet::from([BitPackedEncoding.id()]), HashSet::default());
+ let ctx = CompressCtx::new(Arc::new(cfg));
 
 let compressed = ctx
 .compress(
diff --git a/vortex-fastlanes/src/bitpacking/mod.rs b/vortex-fastlanes/src/bitpacking/mod.rs
index dcdb3d5ba0..ae0a43c9ea 100644
--- a/vortex-fastlanes/src/bitpacking/mod.rs
+++ b/vortex-fastlanes/src/bitpacking/mod.rs
@@ -130,7 +130,9 @@ impl Array for BitPackedArray {
 #[inline]
 fn nbytes(&self) -> usize {
- self.encoded().nbytes()
+ // Ignore any overheads like padding or the bit-width flag.
+ let packed_size = ((self.bit_width * self.len()) + 7) / 8;
+ packed_size
+ self.patches().map(|p| p.nbytes()).unwrap_or(0)
+ self.validity().map(|v| v.nbytes()).unwrap_or(0)
 }
 
diff --git a/vortex-fastlanes/src/for/compress.rs b/vortex-fastlanes/src/for/compress.rs
index cf1b5d7444..968f393d43 100644
--- a/vortex-fastlanes/src/for/compress.rs
+++ b/vortex-fastlanes/src/for/compress.rs
@@ -1,5 +1,4 @@
 use itertools::Itertools;
-use log::debug;
 
 use vortex::array::downcast::DowncastArrayBuiltin;
 use vortex::array::primitive::PrimitiveArray;
@@ -12,26 +11,25 @@ use vortex::stats::Stat;
 use crate::{FoRArray, FoREncoding};
 
 impl EncodingCompression for FoREncoding {
+ fn cost(&self) -> u8 {
+ 0
+ }
+
 fn can_compress(
 &self,
 array: &dyn Array,
 _config: &CompressConfig,
 ) -> Option<&dyn EncodingCompression> {
 // Only support primitive arrays
- let Some(parray) = array.maybe_primitive() else {
- debug!("Skipping FoR: not primitive");
- return None;
- };
+ let parray = array.maybe_primitive()?;
 
 // Only supports integers
 if !parray.ptype().is_int() {
- debug!("Skipping FoR: not int");
 return None;
 }
 
- // Nothing for us to do if the min is already zero.
+ // Nothing for us to do if the min is already zero
 if parray.stats().get_or_compute_cast::<i64>(&Stat::Min)? == 0 {
- debug!("Skipping FoR: min is zero");
 return None;
 }
 
@@ -48,18 +46,20 @@ impl EncodingCompression for FoREncoding {
 let child = match_each_integer_ptype!(parray.ptype(), |$T| {
 let min = parray.stats().get_or_compute_as::<$T>(&Stat::Min).unwrap_or(<$T>::default());
+ // TODO(ngates): check for overflow
 let values = parray.buffer().typed_data::<$T>().iter().map(|v| v - min)
 // TODO(ngates): cast to unsigned
 // .map(|v| v as parray.ptype().to_unsigned()::T)
 .collect_vec();
+
 PrimitiveArray::from(values)
 });
 
 // TODO(ngates): remove FoR as a potential encoding from the ctx
 // NOTE(ngates): we don't invoke next_level here since we know bit-packing is always
 // worth trying.
- let compressed_child = ctx.compress(
+ let compressed_child = ctx.named("for").excluding(&FoREncoding::ID).compress(
 child.as_ref(),
 like.map(|l| l.as_any().downcast_ref::<FoRArray>().unwrap().child()),
 )?;
@@ -70,7 +70,10 @@ impl EncodingCompression for FoREncoding {
 
 #[cfg(test)]
 mod test {
+ use log::LevelFilter;
+ use simplelog::{ColorChoice, Config, TermLogger, TerminalMode};
 use std::collections::HashSet;
+ use std::sync::Arc;
 
 use vortex::array::primitive::PrimitiveEncoding;
 use vortex::array::Encoding;
@@ -81,6 +84,14 @@ mod test {
 
 #[test]
 fn test_compress() {
+ TermLogger::init(
+ LevelFilter::Debug,
+ Config::default(),
+ TerminalMode::Mixed,
+ ColorChoice::Auto,
+ )
+ .unwrap();
+
 let cfg = CompressConfig::new(
 // We need some BitPacking else we will never choose FoR.
HashSet::from([ @@ -90,7 +101,7 @@ mod test { ]), HashSet::default(), ); - let ctx = CompressCtx::new(&cfg); + let ctx = CompressCtx::new(Arc::new(cfg)); // Create a range offset by a million let array = PrimitiveArray::from((0u32..10_000).map(|v| v + 1_000_000).collect_vec()); diff --git a/vortex-ree/src/compress.rs b/vortex-ree/src/compress.rs index 2b6797f087..c87d89cf2a 100644 --- a/vortex-ree/src/compress.rs +++ b/vortex-ree/src/compress.rs @@ -43,10 +43,11 @@ impl EncodingCompression for REEEncoding { let (ends, values) = ree_encode(primitive_array); let compressed_ends = ctx - .next_level() + .auxiliary("ends") .compress(ends.as_ref(), ree_like.map(|ree| ree.ends()))?; let compressed_values = ctx - .next_level() + .named("values") + .excluding(&REEEncoding::ID) .compress(values.as_ref(), ree_like.map(|ree| ree.values()))?; Ok(REEArray::new( @@ -54,10 +55,7 @@ impl EncodingCompression for REEEncoding { compressed_values, primitive_array .validity() - .map(|v| { - ctx.next_level() - .compress(v, ree_like.and_then(|r| r.validity())) - }) + .map(|v| ctx.compress(v, ree_like.and_then(|r| r.validity()))) .transpose()?, array.len(), ) diff --git a/vortex-roaring/src/integer/compress.rs b/vortex-roaring/src/integer/compress.rs index 73a395cf19..d6075ccc24 100644 --- a/vortex-roaring/src/integer/compress.rs +++ b/vortex-roaring/src/integer/compress.rs @@ -23,7 +23,6 @@ impl EncodingCompression for RoaringIntEncoding { ) -> Option<&dyn EncodingCompression> { // Only support primitive enc arrays if array.encoding().id() != &PrimitiveEncoding::ID { - debug!("Skipping roaring int, not primitive"); return None; } diff --git a/vortex-zigzag/src/compress.rs b/vortex-zigzag/src/compress.rs index 840b010b6b..5cc2cd107b 100644 --- a/vortex-zigzag/src/compress.rs +++ b/vortex-zigzag/src/compress.rs @@ -48,8 +48,7 @@ impl EncodingCompression for ZigZagEncoding { }; Ok(ZigZagArray::new( - ctx.next_level() - .compress(encoded.unwrap().encoded(), zigzag_like.map(|z| z.encoded()))?, + ctx.compress(encoded.unwrap().encoded(), zigzag_like.map(|z| z.encoded()))?, ) .boxed()) }
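
Taken together, these changes replace the old `next_level()` call with a small vocabulary on `CompressCtx`: `named(..)` labels a child array in the compression tree, `auxiliary(..)` labels a child and resets any excluded encodings, `excluding(..)` disables an encoding for a subtree (so e.g. DictEncoding never dictionary-encodes its own dictionary), and `for_encoding(..)` charges an encoding's `cost()` against `max_depth` (now 3 by default, with purely destructuring encodings costing 0). The sketch below shows the caller-side flow end to end. It is a minimal, illustrative sketch, not part of the diff: it assumes this repo's crate layout (`vortex-array` imported as `vortex`, with `FoREncoding`/`BitPackedEncoding` exported by `vortex-fastlanes`), and `example_compress` is a hypothetical function name.

    use std::collections::HashSet;
    use std::sync::Arc;

    use vortex::array::primitive::PrimitiveArray;
    use vortex::array::{ArrayRef, Encoding};
    use vortex::compress::{CompressConfig, CompressCtx};
    use vortex::error::VortexResult;
    use vortex_fastlanes::{BitPackedEncoding, FoREncoding};

    fn example_compress() -> VortexResult<ArrayRef> {
        // Enable only FoR and BitPacking; all other encodings stay disabled.
        let cfg = CompressConfig::new(
            HashSet::from([FoREncoding.id(), BitPackedEncoding.id()]),
            HashSet::default(),
        );

        // The config is now shared via Arc, so child contexts can be cloned
        // cheaply, each carrying its own path label and exclusion set.
        let ctx = CompressCtx::new(Arc::new(cfg));

        // A range offset by a million: FoR subtracts the minimum, and its
        // child then bit-packs. Both report cost() == 0, so the chain fits
        // comfortably inside the max_depth budget.
        let array = PrimitiveArray::from((0u32..10_000).map(|v| v + 1_000_000).collect::<Vec<_>>());

        // compress() takes the ctx by value; inside an encoding, children are
        // compressed via e.g. ctx.named("values").excluding(&FoREncoding::ID)
        // or ctx.auxiliary("patches"), which resets the exclusions.
        ctx.compress(array.as_ref(), None)
    }

The reason `auxiliary(..)` resets exclusions is visible in the comment in `sampled_compression`: after DictEncoding excludes itself, its offsets/codes children still deserve the full set of candidate encodings, since the leftover VarBin or primitive arrays are exactly what the final destructuring passes are meant to compress.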