diff --git a/Cargo.lock b/Cargo.lock
index 62f8d4948a..51677dc842 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1551,9 +1551,9 @@ dependencies = [

 [[package]]
 name = "fsst-rs"
-version = "0.2.2"
+version = "0.2.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3494f027ec5d00d38de5e8a14672fb30640fa62cf1d37863e1db738fe3e61eaa"
+checksum = "71a86d2804d7e23b8b067df618cfeda433bb6291ec68f156f6711e9523cbd779"

 [[package]]
 name = "futures"
diff --git a/Cargo.toml b/Cargo.toml
index 2061e40b0b..fe490e35df 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -81,7 +81,7 @@ fastlanes = "0.1.5"
 flatbuffers = "24.3.25"
 flexbuffers = "2.0.0"
 fs_extra = "1.3.0"
-fsst-rs = "0.2.2"
+fsst-rs = "0.2.3"
 futures = { version = "0.3.30", default-features = false }
 futures-executor = "0.3.30"
 futures-util = "0.3.30"
diff --git a/bench-vortex/benches/compress_benchmark.rs b/bench-vortex/benches/compress_benchmark.rs
index e586cc404d..a92cd8fb8f 100644
--- a/bench-vortex/benches/compress_benchmark.rs
+++ b/bench-vortex/benches/compress_benchmark.rs
@@ -1,9 +1,12 @@
-use bench_vortex::compress_taxi_data;
 use bench_vortex::data_downloads::BenchmarkDataset;
 use bench_vortex::public_bi_data::BenchmarkDatasets;
 use bench_vortex::public_bi_data::PBIDataset::Medicare1;
 use bench_vortex::taxi_data::taxi_data_parquet;
+use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
+use bench_vortex::{compress_taxi_data, tpch};
 use criterion::{black_box, criterion_group, criterion_main, Criterion};
+use vortex_sampling_compressor::compressors::fsst::FSSTCompressor;
+use vortex_sampling_compressor::SamplingCompressor;

 fn vortex_compress_taxi(c: &mut Criterion) {
     taxi_data_parquet();
@@ -24,5 +27,46 @@ fn vortex_compress_medicare1(c: &mut Criterion) {
     group.finish()
 }

-criterion_group!(benches, vortex_compress_taxi, vortex_compress_medicare1);
+fn vortex_compress_tpch(c: &mut Criterion) {
+    let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap();
+    let rt = tokio::runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()
+        .unwrap();
+    let orders_vortex = rt.block_on(tpch::load_table(data_dir, "orders", &tpch::schema::ORDERS));
+
+    let compressor = SamplingCompressor::default().excluding(&FSSTCompressor);
+    let compressed = compressor.compress(&orders_vortex, None).unwrap();
+    let ratio = (orders_vortex.nbytes() as f64) / (compressed.nbytes() as f64);
+    println!("compression ratio: {ratio}");
+
+    let mut group = c.benchmark_group("tpch");
+    group.sample_size(10);
+    group.bench_function("orders", |b| {
+        b.iter(|| {
+            std::hint::black_box(compressor.compress(std::hint::black_box(&orders_vortex), None))
+        });
+    });
+
+    let compressor_fsst = SamplingCompressor::default();
+
+    group.bench_function("orders-fsst", |b| {
+        b.iter(|| {
+            std::hint::black_box(
+                compressor_fsst.compress(std::hint::black_box(&orders_vortex), None),
+            )
+        });
+    });
+
+    let compressed = compressor_fsst.compress(&orders_vortex, None).unwrap();
+    let ratio = (orders_vortex.nbytes() as f64) / (compressed.nbytes() as f64);
+    println!("compression ratio: {ratio}");
+}
+
+criterion_group!(
+    benches,
+    vortex_compress_taxi,
+    vortex_compress_medicare1,
+    vortex_compress_tpch
+);
 criterion_main!(benches);
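Condensed, the new TPC-H benchmark boils down to the flow below (a sketch, not part of the diff; it reuses the `bench-vortex` helpers imported above, including the `load_table` function added in the next file):

```rust
use bench_vortex::tpch;
use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
use vortex_sampling_compressor::SamplingCompressor;

fn main() {
    // Generate (or reuse) the TPC-H data directory, then load `orders`
    // as an uncompressed Vortex array via the new `load_table` helper.
    let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap();
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let orders = rt.block_on(tpch::load_table(data_dir, "orders", &tpch::schema::ORDERS));

    // Compression ratio is measured as uncompressed bytes over compressed bytes.
    let compressed = SamplingCompressor::default().compress(&orders, None).unwrap();
    println!(
        "compression ratio: {}",
        orders.nbytes() as f64 / compressed.nbytes() as f64
    );
}
```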
diff --git a/bench-vortex/src/tpch/mod.rs b/bench-vortex/src/tpch/mod.rs
index b922aae959..cb3128ed75 100644
--- a/bench-vortex/src/tpch/mod.rs
+++ b/bench-vortex/src/tpch/mod.rs
@@ -377,6 +377,43 @@ async fn register_vortex(
     Ok(())
 }

+/// Load a table as an uncompressed Vortex array.
+pub async fn load_table(data_dir: impl AsRef<Path>, name: &str, schema: &Schema) -> Array {
+    // Create a local session to load the CSV file from the path.
+    let path = data_dir
+        .as_ref()
+        .to_owned()
+        .join(format!("{name}.tbl"))
+        .to_str()
+        .unwrap()
+        .to_string();
+    let record_batches = SessionContext::new()
+        .read_csv(
+            &path,
+            CsvReadOptions::default()
+                .delimiter(b'|')
+                .has_header(false)
+                .file_extension("tbl")
+                .schema(schema),
+        )
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+
+    let chunks: Vec<Array> = record_batches
+        .iter()
+        .cloned()
+        .map(ArrowStructArray::from)
+        .map(|struct_array| Array::from_arrow(&struct_array, false))
+        .collect();
+
+    let dtype = chunks[0].dtype().clone();
+
+    ChunkedArray::try_new(chunks, dtype).unwrap().into_array()
+}
+
 pub fn tpch_queries() -> impl Iterator<Item = (usize, String)> {
     (1..=22).map(|q| (q, tpch_query(q)))
 }
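The interesting part of `load_table` is its tail: DataFusion returns Arrow `RecordBatch`es, and each batch is lifted into Vortex through an Arrow `StructArray`. That conversion is useful on its own; here is a sketch with a hypothetical helper (import paths are approximate, and `ArrayDType` is assumed to be in scope for `.dtype()`):

```rust
use arrow_array::{RecordBatch, StructArray as ArrowStructArray};
use vortex::array::ChunkedArray;
use vortex::{Array, ArrayDType, IntoArray};

// Hypothetical helper mirroring the tail of `load_table`: one Vortex chunk
// per RecordBatch, stitched together into a single ChunkedArray.
fn batches_to_vortex(batches: Vec<RecordBatch>) -> Array {
    let chunks: Vec<Array> = batches
        .into_iter()
        .map(ArrowStructArray::from)
        .map(|s| Array::from_arrow(&s, false))
        .collect();

    // Every chunk shares the same dtype, so borrow it from the first chunk.
    let dtype = chunks[0].dtype().clone();
    ChunkedArray::try_new(chunks, dtype).unwrap().into_array()
}
```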
diff --git a/encodings/fsst/src/compress.rs b/encodings/fsst/src/compress.rs
index 63d61299d1..2033d4f6d2 100644
--- a/encodings/fsst/src/compress.rs
+++ b/encodings/fsst/src/compress.rs
@@ -16,32 +16,26 @@ use crate::FSSTArray;
 /// # Panics
 ///
 /// If the `strings` array is not encoded as either [`VarBinArray`] or [`VarBinViewArray`].
-pub fn fsst_compress(strings: Array, compressor: Option<Compressor>) -> FSSTArray {
+pub fn fsst_compress(strings: Array, compressor: &Compressor) -> FSSTArray {
     let len = strings.len();
     let dtype = strings.dtype().clone();

     // Compress VarBinArray
     if let Ok(varbin) = VarBinArray::try_from(&strings) {
-        let compressor = compressor.unwrap_or_else(|| {
-            varbin
-                .with_iterator(|iter| fsst_train_compressor(iter))
-                .unwrap()
-        });
-        return varbin
-            .with_iterator(|iter| fsst_compress_iter(iter, len, dtype, &compressor))
+        let compressed = varbin
+            .with_iterator(|iter| fsst_compress_iter(iter, len, dtype, compressor))
             .unwrap();
+
+        return compressed;
     }

     // Compress VarBinViewArray
     if let Ok(varbin_view) = VarBinViewArray::try_from(&strings) {
-        let compressor = compressor.unwrap_or_else(|| {
-            varbin_view
-                .with_iterator(|iter| fsst_train_compressor(iter))
-                .unwrap()
-        });
-        return varbin_view
-            .with_iterator(|iter| fsst_compress_iter(iter, len, dtype, &compressor))
+        let compressed = varbin_view
+            .with_iterator(|iter| fsst_compress_iter(iter, len, dtype, compressor))
             .unwrap();
+
+        return compressed;
     }

     panic!(
@@ -50,22 +44,50 @@
     )
 }

-fn fsst_train_compressor<'a, I>(iter: I) -> Compressor
+/// Train a compressor from an array.
+///
+/// # Panics
+///
+/// If the provided array is not FSST compressible.
+pub fn fsst_train_compressor(array: &Array, sample_size: usize) -> Compressor {
+    if let Ok(varbin) = VarBinArray::try_from(array) {
+        varbin
+            .with_iterator(|iter| fsst_train_compressor_iter(iter, sample_size))
+            .unwrap()
+    } else if let Ok(varbin_view) = VarBinViewArray::try_from(array) {
+        varbin_view
+            .with_iterator(|iter| fsst_train_compressor_iter(iter, sample_size))
+            .unwrap()
+    } else {
+        panic!(
+            "cannot fsst_compress array with unsupported encoding {:?}",
+            array.encoding().id()
+        )
+    }
+}
+
+/// Train a [compressor][Compressor] from an iterator of bytestrings.
+fn fsst_train_compressor_iter<'a, I>(iter: I, sample_size: usize) -> Compressor
 where
     I: Iterator<Item = Option<&'a [u8]>>,
 {
     // TODO(aduffy): eliminate the copying.
-    let mut sample = Vec::with_capacity(1_024 * 1_024);
+    let mut sample = Vec::with_capacity(sample_size);
     for string in iter {
         match string {
             None => {}
             Some(b) => sample.extend_from_slice(b),
         }
+
+        if sample.len() >= sample_size {
+            break;
+        }
     }

     Compressor::train(&sample)
 }

+/// Compress from an iterator of bytestrings using FSST.
 pub fn fsst_compress_iter<'a, I>(
     iter: I,
     len: usize,
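With training split out of `fsst_compress`, callers own the symbol-table lifecycle: train once on a bounded sample, then compress with the trained `Compressor` as many times as needed. A minimal sketch of the new contract (assuming a string-encoded `strings: Array` and that `FSSTArray` is exported from the crate root; the 16 KiB budget mirrors `DEFAULT_SAMPLE_BYTES` introduced below):

```rust
use vortex::Array;
use vortex_fsst::{fsst_compress, fsst_train_compressor, FSSTArray};

// Train on up to 16 KiB sampled from the array, then compress the whole
// array with the resulting symbol table. Both calls panic on non-string
// encodings, per the `# Panics` sections above.
fn compress_strings(strings: Array) -> FSSTArray {
    let compressor = fsst_train_compressor(&strings, 16 * 1024);
    fsst_compress(strings, &compressor)
}
```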
diff --git a/vortex-sampling-compressor/src/compressors/fsst.rs b/vortex-sampling-compressor/src/compressors/fsst.rs
index 0e7708830a..56d7b71690 100644
--- a/vortex-sampling-compressor/src/compressors/fsst.rs
+++ b/vortex-sampling-compressor/src/compressors/fsst.rs
@@ -1,12 +1,14 @@
 use std::collections::HashSet;
+use std::fmt::Debug;

+use fsst::Compressor;
 use vortex::array::{VarBin, VarBinView};
 use vortex::encoding::EncodingRef;
 use vortex::{ArrayDType, ArrayDef, IntoArray};
-use vortex_dict::DictArray;
+use vortex_dict::{Dict, DictArray};
 use vortex_dtype::DType;
 use vortex_error::{vortex_bail, VortexResult};
-use vortex_fsst::{fsst_compress, FSSTEncoding, FSST};
+use vortex_fsst::{fsst_compress, fsst_train_compressor, FSSTEncoding, FSST};

 use super::{CompressedArray, CompressionTree, EncodingCompressor};
 use crate::SamplingCompressor;
@@ -14,6 +16,14 @@ use crate::SamplingCompressor;
 #[derive(Debug)]
 pub struct FSSTCompressor;

+/// Size in bytes of the symbol table for FSST.
+const FSST_SYMBOL_TABLE_SIZE: usize = 4_096;
+
+/// We use a 16 KiB sample of text from the input.
+///
+/// This value is derived from section 4.4 of the FSST paper.
+const DEFAULT_SAMPLE_BYTES: usize = 16 * 1_024;
+
 impl EncodingCompressor for FSSTCompressor {
     fn id(&self) -> &str {
         FSST::ID.as_ref()
@@ -27,30 +37,51 @@ impl EncodingCompressor for FSSTCompressor {
             return None;
         }

-        // FSST cannot be applied recursively.
-        if array.encoding().id() == FSST::ID {
+        // FSST can only be applied on top of VarBin, VarBinView, and Dict encodings.
+        if array.encoding().id() != VarBin::ID
+            && array.encoding().id() != VarBinView::ID
+            && array.encoding().id() != Dict::ID
+        {
             return None;
         }

-        Some(self)
+        // Size check: FSST has a built-in 4KB overhead for its symbol table and usually
+        // compresses 2-3x depending on the text, so skip inputs too small to recoup
+        // that overhead.
+        if array.nbytes() > 10 * FSST_SYMBOL_TABLE_SIZE {
+            return Some(self);
+        }
+
+        None
     }

     fn compress<'a>(
         &'a self,
         array: &vortex::Array,
-        _like: Option<CompressionTree<'a>>,
+        like: Option<CompressionTree<'a>>,
         _ctx: SamplingCompressor<'a>,
     ) -> VortexResult<CompressedArray<'a>> {
-        // TODO(aduffy): use like array to clone the existing symbol table
+        // Reuse the compressor trained during the sample run if one was saved off,
+        // otherwise train a new one on a sample of this array.
+        let compressor = like
+            .and_then(|mut c| unsafe { c.metadata::<Compressor>() })
+            .unwrap_or_else(|| Box::new(fsst_train_compressor(array, DEFAULT_SAMPLE_BYTES)));
+
         let result_array =
             if array.encoding().id() == VarBin::ID || array.encoding().id() == VarBinView::ID {
                 // For a VarBinArray or VarBinViewArray, compress directly.
-                fsst_compress(array.clone(), None).into_array()
+                fsst_compress(array.clone(), compressor.as_ref()).into_array()
             } else if let Ok(dict) = DictArray::try_from(array) {
                 // For a dict array, just compress the values.
-                let values = fsst_compress(dict.values(), None).into_array();
+                let values = fsst_compress(dict.values(), compressor.as_ref());
                 let codes = dict.codes();
-                DictArray::try_new(codes, values)?.into_array()
+
+                DictArray::try_new(codes, values.into_array())?.into_array()
             } else {
                 vortex_bail!(
                     InvalidArgument: "unsupported encoding for FSSTCompressor {:?}",
@@ -58,7 +89,11 @@
             )
         };

-        Ok(CompressedArray::new(result_array, None))
+        Ok(CompressedArray::new(
+            result_array,
+            // Save the compressor used for this array so that a `like` compression
+            // of a similar array can reuse it without retraining.
+            Some(CompressionTree::new_with_metadata(self, vec![], compressor)),
+        ))
     }

     fn used_encodings(&self) -> HashSet<EncodingRef> {
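The `like`/metadata handshake above relies on the new `CompressionTree` plumbing in the next hunk: `new_with_metadata` type-erases a `Box<T>` into a raw pointer, and the unsafe `metadata::<T>()` turns it back into a `Box<T>`, with the caller vouching that `T` matches what was stored. Roughly, for the FSST case (a crate-internal sketch, since `new_with_metadata` is `pub(crate)`; `trained` stands in for a freshly trained compressor):

```rust
use fsst::Compressor;

use crate::compressors::fsst::FSSTCompressor;
use crate::compressors::CompressionTree;

fn stash_and_reuse(trained: Compressor) {
    // Stash the trained compressor in the tree that rides along with the
    // sample's CompressedArray...
    let mut tree = CompressionTree::new_with_metadata(&FSSTCompressor, vec![], Box::new(trained));

    // ...then, on the full-array compress() call, take it back out. `metadata`
    // consumes the stored pointer (so the tree's Drop won't also reclaim it),
    // and the turbofish must name the exact type that went in -- that is the
    // whole safety contract.
    let reused: Option<Box<Compressor>> = unsafe { tree.metadata::<Compressor>() };
    assert!(reused.is_some());
}
```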
diff --git a/vortex-sampling-compressor/src/compressors/mod.rs b/vortex-sampling-compressor/src/compressors/mod.rs
index 36bb4c4c6f..556c08c6e4 100644
--- a/vortex-sampling-compressor/src/compressors/mod.rs
+++ b/vortex-sampling-compressor/src/compressors/mod.rs
@@ -59,6 +59,7 @@ impl Hash for dyn EncodingCompressor + '_ {
 pub struct CompressionTree<'a> {
     compressor: &'a dyn EncodingCompressor,
     children: Vec<Option<CompressionTree<'a>>>,
+    metadata: Option<*const ()>,
 }

 impl Display for CompressionTree<'_> {
@@ -79,6 +80,25 @@ impl<'a> CompressionTree<'a> {
         Self {
             compressor,
             children,
+            metadata: None,
         }
     }
+
+    /// Save a piece of metadata as part of the compression tree.
+    ///
+    /// This can be encoder-specific parameters discovered at sample time that
+    /// should be reused when compressing the full array.
+    pub(crate) fn new_with_metadata<T>(
+        compressor: &'a dyn EncodingCompressor,
+        children: Vec<Option<CompressionTree<'a>>>,
+        metadata: Box<T>,
+    ) -> Self {
+        // SAFETY: the boxed memory is reclaimed in the Drop impl below.
+        let ptr = Box::into_raw(metadata) as *const ();
+        Self {
+            compressor,
+            children,
+            metadata: Some(ptr),
+        }
+    }

@@ -108,6 +128,47 @@ impl<'a> CompressionTree<'a> {
             .can_compress(array)
             .map(|c| c.compress(array, Some(self.clone()), ctx.for_compressor(c)))
     }
+
+    // /// Access the saved opaque metadata by reference.
+    // ///
+    // /// # Safety
+    // ///
+    // /// It is up to the caller to ensure that the type `T` is the correct type for the stored
+    // /// metadata.
+    // ///
+    // /// The value of `T` will almost always be `EncodingCompressor`-specific.
+    // pub(crate) unsafe fn metadata_ref<T>(&self) -> Option<&T> {
+    //     unsafe { self.metadata.map(|m| &*(m as *const T)) }
+    // }
+
+    /// Access the saved opaque metadata.
+    ///
+    /// This consumes the struct's metadata pointer, giving the caller ownership of
+    /// the memory by returning a `Box<T>`.
+    ///
+    /// # Safety
+    ///
+    /// It is up to the caller to ensure that the type `T` is the correct type for the stored
+    /// metadata.
+    ///
+    /// The value of `T` will almost always be `EncodingCompressor`-specific.
+    pub unsafe fn metadata<T>(&mut self) -> Option<Box<T>> {
+        let metadata = std::mem::take(&mut self.metadata);

+        metadata.map(|m| {
+            let ptr = m as *mut T;
+            unsafe { Box::from_raw(ptr) }
+        })
+    }
+}
+
+impl Drop for CompressionTree<'_> {
+    fn drop(&mut self) {
+        if let Some(ptr) = self.metadata {
+            // Reconstruct the box from the pointer to manually drop the memory.
+            let _ = unsafe { Box::from_raw(ptr as *mut ()) };
+        }
+    }
 }

 #[derive(Debug, Clone)]
diff --git a/vortex-sampling-compressor/src/lib.rs b/vortex-sampling-compressor/src/lib.rs
index 3395c83b2f..35e129c400 100644
--- a/vortex-sampling-compressor/src/lib.rs
+++ b/vortex-sampling-compressor/src/lib.rs
@@ -31,8 +31,6 @@ mod sampling;

 #[derive(Debug, Clone)]
 pub struct CompressConfig {
-    #[allow(dead_code)]
-    block_size: u32,
     sample_size: u16,
     sample_count: u16,
     max_depth: u8,
@@ -40,9 +38,7 @@ pub struct CompressConfig {

 impl Default for CompressConfig {
     fn default() -> Self {
-        // TODO(ngates): we should ensure that sample_size * sample_count <= block_size
         Self {
-            block_size: 65_536,
             // Sample length should always be multiple of 1024
            sample_size: 128,
            sample_count: 8,