Commit

some things

a10y committed Aug 22, 2024
1 parent fc81829 commit 47c21a2
Showing 8 changed files with 233 additions and 38 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -81,7 +81,7 @@ fastlanes = "0.1.5"
 flatbuffers = "24.3.25"
 flexbuffers = "2.0.0"
 fs_extra = "1.3.0"
-fsst-rs = "0.2.2"
+fsst-rs = "0.2.3"
 futures = { version = "0.3.30", default-features = false }
 futures-executor = "0.3.30"
 futures-util = "0.3.30"
48 changes: 46 additions & 2 deletions bench-vortex/benches/compress_benchmark.rs
@@ -1,9 +1,12 @@
-use bench_vortex::compress_taxi_data;
 use bench_vortex::data_downloads::BenchmarkDataset;
 use bench_vortex::public_bi_data::BenchmarkDatasets;
 use bench_vortex::public_bi_data::PBIDataset::Medicare1;
 use bench_vortex::taxi_data::taxi_data_parquet;
+use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
+use bench_vortex::{compress_taxi_data, tpch};
 use criterion::{black_box, criterion_group, criterion_main, Criterion};
+use vortex_sampling_compressor::compressors::fsst::FSSTCompressor;
 use vortex_sampling_compressor::SamplingCompressor;
 
 fn vortex_compress_taxi(c: &mut Criterion) {
     taxi_data_parquet();
@@ -24,5 +27,46 @@ fn vortex_compress_medicare1(c: &mut Criterion) {
     group.finish()
 }
 
-criterion_group!(benches, vortex_compress_taxi, vortex_compress_medicare1);
+fn vortex_compress_tpch(c: &mut Criterion) {
+    let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap();
+    let rt = tokio::runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()
+        .unwrap();
+    let orders_vortex = rt.block_on(tpch::load_table(data_dir, "orders", &tpch::schema::ORDERS));
+
+    let compressor = SamplingCompressor::default().excluding(&FSSTCompressor);
+    let compressed = compressor.compress(&orders_vortex, None).unwrap();
+    let ratio = (orders_vortex.nbytes() as f64) / (compressed.nbytes() as f64);
+    println!("compression ratio: {ratio}");
+
+    let mut group = c.benchmark_group("tpch");
+    group.sample_size(10);
+    group.bench_function("orders", |b| {
+        b.iter(|| {
+            std::hint::black_box(compressor.compress(std::hint::black_box(&orders_vortex), None))
+        });
+    });
+
+    let compressor_fsst = SamplingCompressor::default();
+
+    group.bench_function("orders-fsst", |b| {
+        b.iter(|| {
+            std::hint::black_box(
+                compressor_fsst.compress(std::hint::black_box(&orders_vortex), None),
+            )
+        });
+    });
+
+    let compressed = compressor_fsst.compress(&orders_vortex, None).unwrap();
+    let ratio = (orders_vortex.nbytes() as f64) / (compressed.nbytes() as f64);
+    println!("compression ratio: {ratio}");
+}
+
+criterion_group!(
+    benches,
+    vortex_compress_taxi,
+    vortex_compress_medicare1,
+    vortex_compress_tpch
+);
 criterion_main!(benches);
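Aside from timings, the benchmark's two configurations make FSST's contribution directly measurable: the `orders` run uses `SamplingCompressor::default().excluding(&FSSTCompressor)` as a baseline, while `orders-fsst` uses the default set. Below is a minimal sketch of that comparison outside of Criterion; it assumes only the APIs exercised above (`excluding`, `compress`, `nbytes`), and the `compare_fsst` helper is illustrative, not part of this commit.

```rust
use vortex_sampling_compressor::compressors::fsst::FSSTCompressor;
use vortex_sampling_compressor::SamplingCompressor;

/// Print compression ratios for `array` with and without FSST in the mix.
/// Sketch only: mirrors the benchmark above, outside of Criterion.
fn compare_fsst(array: &vortex::Array) {
    // Baseline: the default compressor set minus FSST.
    let baseline = SamplingCompressor::default()
        .excluding(&FSSTCompressor)
        .compress(array, None)
        .unwrap();

    // Treatment: the default set, which includes FSST.
    let treated = SamplingCompressor::default().compress(array, None).unwrap();

    println!(
        "ratio without FSST: {:.2}, with FSST: {:.2}",
        array.nbytes() as f64 / baseline.nbytes() as f64,
        array.nbytes() as f64 / treated.nbytes() as f64,
    );
}
```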
37 changes: 37 additions & 0 deletions bench-vortex/src/tpch/mod.rs
@@ -377,6 +377,43 @@ async fn register_vortex(
     Ok(())
 }
 
+/// Load a table as an uncompressed Vortex array.
+pub async fn load_table(data_dir: impl AsRef<Path>, name: &str, schema: &Schema) -> Array {
+    // Create a local session to load the CSV file from the path.
+    let path = data_dir
+        .as_ref()
+        .to_owned()
+        .join(format!("{name}.tbl"))
+        .to_str()
+        .unwrap()
+        .to_string();
+    let record_batches = SessionContext::new()
+        .read_csv(
+            &path,
+            CsvReadOptions::default()
+                .delimiter(b'|')
+                .has_header(false)
+                .file_extension("tbl")
+                .schema(schema),
+        )
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+
+    let chunks: Vec<Array> = record_batches
+        .iter()
+        .cloned()
+        .map(ArrowStructArray::from)
+        .map(|struct_array| Array::from_arrow(&struct_array, false))
+        .collect();
+
+    let dtype = chunks[0].dtype().clone();
+
+    ChunkedArray::try_new(chunks, dtype).unwrap().into_array()
+}
+
 pub fn tpch_queries() -> impl Iterator<Item = (usize, Vec<String>)> {
     (1..=22).map(|q| (q, tpch_query(q)))
 }
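`load_table` leans on DataFusion to parse dbgen's pipe-delimited `.tbl` files, then converts the collected `RecordBatch`es into a single `ChunkedArray`, one chunk per batch. A sketch of that conversion as a standalone helper, with the caveat that the exact import paths (`arrow_array`, `vortex::array::ChunkedArray`) are assumptions inferred from identifiers in this diff:

```rust
use arrow_array::{RecordBatch, StructArray as ArrowStructArray};
use vortex::array::ChunkedArray;
use vortex::{Array, ArrayDType, IntoArray};

/// Convert collected RecordBatches into one ChunkedArray, one chunk per batch.
/// Sketch only: import paths are inferred, not confirmed by this commit.
fn batches_to_chunked(batches: Vec<RecordBatch>) -> Array {
    let chunks: Vec<Array> = batches
        .into_iter()
        .map(ArrowStructArray::from)
        .map(|s| Array::from_arrow(&s, false)) // `false`: treat as non-nullable
        .collect();

    // Every chunk shares the same dtype; borrow it from the first chunk.
    let dtype = chunks[0].dtype().clone();
    ChunkedArray::try_new(chunks, dtype).unwrap().into_array()
}
```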
56 changes: 39 additions & 17 deletions encodings/fsst/src/compress.rs
@@ -16,32 +16,26 @@ use crate::FSSTArray;
 /// # Panics
 ///
 /// If the `strings` array is not encoded as either [`VarBinArray`] or [`VarBinViewArray`].
-pub fn fsst_compress(strings: Array, compressor: Option<Compressor>) -> FSSTArray {
+pub fn fsst_compress(strings: Array, compressor: &Compressor) -> FSSTArray {
     let len = strings.len();
     let dtype = strings.dtype().clone();
 
     // Compress VarBinArray
     if let Ok(varbin) = VarBinArray::try_from(&strings) {
-        let compressor = compressor.unwrap_or_else(|| {
-            varbin
-                .with_iterator(|iter| fsst_train_compressor(iter))
-                .unwrap()
-        });
-        return varbin
-            .with_iterator(|iter| fsst_compress_iter(iter, len, dtype, &compressor))
+        let compressed = varbin
+            .with_iterator(|iter| fsst_compress_iter(iter, len, dtype, compressor))
             .unwrap();
+
+        return compressed;
     }
 
     // Compress VarBinViewArray
     if let Ok(varbin_view) = VarBinViewArray::try_from(&strings) {
-        let compressor = compressor.unwrap_or_else(|| {
-            varbin_view
-                .with_iterator(|iter| fsst_train_compressor(iter))
-                .unwrap()
-        });
-        return varbin_view
-            .with_iterator(|iter| fsst_compress_iter(iter, len, dtype, &compressor))
+        let compressed = varbin_view
+            .with_iterator(|iter| fsst_compress_iter(iter, len, dtype, compressor))
             .unwrap();
+
+        return compressed;
     }
 
     panic!(
@@ -50,22 +44,50 @@ pub fn fsst_compress(strings: Array, compressor: Option<Compressor>) -> FSSTArray {
     )
 }
 
-fn fsst_train_compressor<'a, I>(iter: I) -> Compressor
+/// Train a compressor from an array.
+///
+/// # Panics
+///
+/// If the provided array is not FSST compressible.
+pub fn fsst_train_compressor(array: &Array, sample_size: usize) -> Compressor {
+    if let Ok(varbin) = VarBinArray::try_from(array) {
+        varbin
+            .with_iterator(|iter| fsst_train_compressor_iter(iter, sample_size))
+            .unwrap()
+    } else if let Ok(varbin_view) = VarBinViewArray::try_from(array) {
+        varbin_view
+            .with_iterator(|iter| fsst_train_compressor_iter(iter, sample_size))
+            .unwrap()
+    } else {
+        panic!(
+            "cannot fsst_compress array with unsupported encoding {:?}",
+            array.encoding().id()
+        )
+    }
+}
+
+/// Train a [compressor][Compressor] from an iterator of bytestrings.
+fn fsst_train_compressor_iter<'a, I>(iter: I, sample_size: usize) -> Compressor
 where
     I: Iterator<Item = Option<&'a [u8]>>,
 {
     // TODO(aduffy): eliminate the copying.
-    let mut sample = Vec::with_capacity(1_024 * 1_024);
+    let mut sample = Vec::with_capacity(sample_size);
     for string in iter {
         match string {
             None => {}
             Some(b) => sample.extend_from_slice(b),
         }
+
+        if sample.len() >= sample_size {
+            break;
+        }
     }
 
     Compressor::train(&sample)
 }
 
+/// Compress from an iterator of bytestrings using FSST.
 pub fn fsst_compress_iter<'a, I>(
     iter: I,
     len: usize,
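The net effect of this file's changes is that training and compression are decoupled: `fsst_compress` no longer takes an `Option<Compressor>` and trains on demand, so callers train once via `fsst_train_compressor` and can reuse the resulting symbol table across arrays. A sketch of the two-step flow using just the signatures introduced here (assuming `FSSTArray` is exported from `vortex_fsst` alongside the two functions; `16 * 1024` matches the sample size the sampling compressor passes below):

```rust
use fsst::Compressor;
use vortex::Array;
use vortex_fsst::{fsst_compress, fsst_train_compressor, FSSTArray};

/// Train once on the first array, then compress both with the same symbol
/// table. Sketch only: both inputs are assumed to be VarBin-encoded strings,
/// since anything else makes these functions panic.
fn compress_pair(a: Array, b: Array) -> (FSSTArray, FSSTArray) {
    // One training pass over (at most) a 16KB sample of `a`.
    let compressor: Compressor = fsst_train_compressor(&a, 16 * 1024);

    // Under the old API, passing `None` here retrained a compressor per
    // array; now the training cost is paid exactly once.
    (fsst_compress(a, &compressor), fsst_compress(b, &compressor))
}
```

The sampling compressor below exploits exactly this split: it stashes the trained `Compressor` in the `CompressionTree` metadata and reuses it on subsequent calls.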
59 changes: 47 additions & 12 deletions vortex-sampling-compressor/src/compressors/fsst.rs
@@ -1,19 +1,29 @@
 use std::collections::HashSet;
 use std::fmt::Debug;
 
+use fsst::Compressor;
 use vortex::array::{VarBin, VarBinView};
 use vortex::encoding::EncodingRef;
 use vortex::{ArrayDType, ArrayDef, IntoArray};
-use vortex_dict::DictArray;
+use vortex_dict::{Dict, DictArray};
 use vortex_dtype::DType;
 use vortex_error::{vortex_bail, VortexResult};
-use vortex_fsst::{fsst_compress, FSSTEncoding, FSST};
+use vortex_fsst::{fsst_compress, fsst_train_compressor, FSSTEncoding, FSST};
 
 use super::{CompressedArray, CompressionTree, EncodingCompressor};
 use crate::SamplingCompressor;
 
 #[derive(Debug)]
 pub struct FSSTCompressor;
 
+/// Size in bytes of the Symbol table for FSST
+const FSST_SYMBOL_TABLE_SIZE: usize = 4_096;
+
+/// We use a 16KB sample of text from the input.
+///
+/// This value is derived from the FSST paper section 4.4
+const DEFAULT_SAMPLE_BYTES: usize = 16 * 1_024;
+
 impl EncodingCompressor for FSSTCompressor {
     fn id(&self) -> &str {
         FSST::ID.as_ref()
@@ -27,38 +37,63 @@ impl EncodingCompressor for FSSTCompressor {
             return None;
         }
 
-        // FSST cannot be applied recursively.
-        if array.encoding().id() == FSST::ID {
-            return None;
+        // FSST can be applied on top of VarBin, VarBinView, and Dict encodings.
+        if array.encoding().id() == VarBin::ID
+            || array.encoding().id() == VarBinView::ID
+            || array.encoding().id() == Dict::ID
+        {
+            return Some(self);
         }
 
-        Some(self)
+        // Size-check: FSST has a built-in 4KB overhead due to the symbol table, and usually
+        // compresses between 2-3x depending on the text quality.
+        if array.nbytes() > 10 * FSST_SYMBOL_TABLE_SIZE {
+            return Some(self);
+        }
+
+        None
     }
 
     fn compress<'a>(
         &'a self,
         array: &vortex::Array,
-        _like: Option<CompressionTree<'a>>,
+        // TODO(aduffy): reuse compressor from sample run if we have saved it off.
+        like: Option<CompressionTree<'a>>,
         _ctx: SamplingCompressor<'a>,
     ) -> VortexResult<super::CompressedArray<'a>> {
-        // TODO(aduffy): use like array to clone the existing symbol table
+        let compressor = like
+            .and_then(|mut c| unsafe { c.metadata::<Compressor>() })
+            .map(|m| {
+                println!("using pretrained compressor");
+                m
+            })
+            .unwrap_or_else(|| {
+                println!("training new compressor");
+                Box::new(fsst_train_compressor(array, DEFAULT_SAMPLE_BYTES))
+            });
+
         let result_array =
             if array.encoding().id() == VarBin::ID || array.encoding().id() == VarBinView::ID {
                 // For a VarBinArray or VarBinViewArray, compress directly.
-                fsst_compress(array.clone(), None).into_array()
+                fsst_compress(array.clone(), compressor.as_ref()).into_array()
             } else if let Ok(dict) = DictArray::try_from(array) {
                 // For a dict array, just compress the values
-                let values = fsst_compress(dict.values(), None).into_array();
+                let values = fsst_compress(dict.values(), compressor.as_ref());
                 let codes = dict.codes();
-                DictArray::try_new(codes, values)?.into_array()
+
+                DictArray::try_new(codes, values.into_array())?.into_array()
             } else {
                 vortex_bail!(
                     InvalidArgument: "unsupported encoding for FSSTCompressor {:?}",
                     array.encoding().id()
                 )
             };
 
-        Ok(CompressedArray::new(result_array, None))
+        Ok(CompressedArray::new(
+            result_array,
+            // Save a copy of the compressor that was used to compress this array.
+            Some(CompressionTree::new_with_metadata(self, vec![], compressor)),
+        ))
     }
 
     fn used_encodings(&self) -> HashSet<EncodingRef> {
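The rewritten `can_compress` encodes a small cost model: the symbol table always adds about 4KB, and FSST typically compresses text 2-3x, so small inputs would grow rather than shrink. A sketch of the break-even arithmetic behind the `10 * FSST_SYMBOL_TABLE_SIZE` threshold; the helper is illustrative, and the 2x figure comes from the comment in the diff, not a measurement:

```rust
const FSST_SYMBOL_TABLE_SIZE: usize = 4_096;

/// Illustrative break-even check, not part of this commit.
fn fsst_worthwhile(input_bytes: usize) -> bool {
    // At ~2x compression, `n` input bytes become roughly n/2 + 4_096 bytes
    // once the symbol table is counted. That beats the original only when
    // n/2 + 4_096 < n, i.e. n > 8_192. The 10x-table threshold used in
    // `can_compress` (40_960 bytes) leaves headroom for text that
    // compresses worse than 2x.
    input_bytes > 10 * FSST_SYMBOL_TABLE_SIZE
}
```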
