Skip to content

Commit

Permalink
FSSTCompressor (#664)
Browse files Browse the repository at this point in the history
* Implements a new codec, FSSTCompressor using latest version of fsst-rs
library
* Adds new `metadata` field on CompressionTree to allow reuse between
the sampling and compressing stages. For example, we can save the ALP
exponents to not have to calculate them twice. This is very important
for FSST so that we save the overhead of training the table twice
* Adds new compression benchmark using the `lineitem` table's
`l_comment` column, with scalefactor=1, which is just over 6million
rows. By default this is loaded as a ChunkedArray with 733 partitions.
Compressing with FSST enabled takes 1.6s. Compressing on the
canonicalized array takes ~550ms. We should be able to speed this up by
at least ~2x, see
#664 (comment), and
we can potentially do even better. We probably want to be able to FSST
compress a ChunkedArray directly so that we avoid the overhead of
training/compressing each chunk from scratch.
  • Loading branch information
a10y authored Sep 3, 2024
1 parent 06dd5a1 commit fd49140
Show file tree
Hide file tree
Showing 18 changed files with 672 additions and 261 deletions.
295 changes: 163 additions & 132 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ fastlanes = "0.1.5"
flatbuffers = "24.3.25"
flexbuffers = "2.0.0"
fs_extra = "1.3.0"
fsst-rs = "0.2.0"
fsst-rs = "0.4.0"
futures = { version = "0.3.30", default-features = false }
futures-executor = "0.3.30"
futures-util = "0.3.30"
Expand Down Expand Up @@ -139,8 +139,9 @@ vortex-dict = { version = "0.7.0", path = "./encodings/dict" }
vortex-dtype = { version = "0.7.0", path = "./vortex-dtype", default-features = false }
vortex-error = { version = "0.7.0", path = "./vortex-error" }
vortex-expr = { version = "0.7.0", path = "./vortex-expr" }
vortex-flatbuffers = { version = "0.7.0", path = "./vortex-flatbuffers" }
vortex-fastlanes = { version = "0.7.0", path = "./encodings/fastlanes" }
vortex-flatbuffers = { version = "0.7.0", path = "./vortex-flatbuffers" }
vortex-fsst = { version = "0.7.0", path = "./encodings/fsst" }
vortex-proto = { version = "0.7.0", path = "./vortex-proto" }
vortex-roaring = { version = "0.7.0", path = "./encodings/roaring" }
vortex-runend = { version = "0.7.0", path = "./encodings/runend" }
Expand Down Expand Up @@ -169,5 +170,4 @@ panic_in_result_fn = { level = "deny" }
same_name_method = { level = "deny" }
tests_outside_test_module = { level = "deny" }
unwrap_in_result = { level = "deny" }
#unwrap_used = { level = "deny" }
use_debug = { level = "deny" }
4 changes: 4 additions & 0 deletions bench-vortex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,19 @@ bench = false
[[bench]]
name = "compress_benchmark"
harness = false
test = false

[[bench]]
name = "random_access"
test = false
harness = false

[[bench]]
name = "datafusion_benchmark"
test = false
harness = false

[[bench]]
name = "tpch_benchmark"
test = false
harness = false
66 changes: 64 additions & 2 deletions bench-vortex/benches/compress_benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use bench_vortex::compress_taxi_data;
use bench_vortex::data_downloads::BenchmarkDataset;
use bench_vortex::public_bi_data::BenchmarkDatasets;
use bench_vortex::public_bi_data::PBIDataset::Medicare1;
use bench_vortex::taxi_data::taxi_data_parquet;
use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
use bench_vortex::{compress_taxi_data, tpch};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use vortex::{IntoArray, IntoCanonical};
use vortex_sampling_compressor::compressors::fsst::FSSTCompressor;
use vortex_sampling_compressor::SamplingCompressor;

fn vortex_compress_taxi(c: &mut Criterion) {
taxi_data_parquet();
Expand All @@ -24,5 +28,63 @@ fn vortex_compress_medicare1(c: &mut Criterion) {
group.finish()
}

criterion_group!(benches, vortex_compress_taxi, vortex_compress_medicare1);
fn vortex_compress_tpch(c: &mut Criterion) {
let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap();
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let lineitem_vortex = rt.block_on(tpch::load_table(
data_dir,
"lineitem",
&tpch::schema::LINEITEM,
));

let compressor = SamplingCompressor::default().excluding(&FSSTCompressor);

let compressor_fsst = SamplingCompressor::default();

// l_comment column only
let mut group = c.benchmark_group("l_comment");
let comments = lineitem_vortex.with_dyn(|a| {
a.as_struct_array_unchecked()
.field_by_name("l_comment")
.unwrap()
});

group.sample_size(10);
group.bench_function("compress-default", |b| {
b.iter_with_large_drop(|| {
std::hint::black_box(compressor.compress(&comments, None)).unwrap()
});
});

group.bench_function("compress-fsst-chunked", |b| {
b.iter_with_large_drop(|| {
std::hint::black_box(compressor_fsst.compress(&comments, None)).unwrap()
});
});

// Compare canonicalizing
let comments_canonical = comments
.into_canonical()
.unwrap()
.into_varbin()
.unwrap()
.into_array();
group.bench_function("compress-fsst-canonicalized", |b| {
b.iter_with_large_drop(|| {
std::hint::black_box(compressor_fsst.compress(&comments_canonical, None)).unwrap()
});
});

group.finish();
}

criterion_group!(
benches,
vortex_compress_taxi,
vortex_compress_medicare1,
vortex_compress_tpch
);
criterion_main!(benches);
21 changes: 12 additions & 9 deletions bench-vortex/benches/random_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ fn random_access_vortex(c: &mut Criterion) {
})
});

let r2_fs = Arc::new(AmazonS3Builder::from_env().build().unwrap()) as Arc<dyn ObjectStore>;
let r2_path =
object_store::path::Path::from_url_path(taxi_vortex.file_name().unwrap().to_str().unwrap())
.unwrap();
group.sample_size(10).bench_function("R2", |b| {
let r2_fs = Arc::new(AmazonS3Builder::from_env().build().unwrap()) as Arc<dyn ObjectStore>;
let r2_path = object_store::path::Path::from_url_path(
taxi_vortex.file_name().unwrap().to_str().unwrap(),
)
.unwrap();

b.to_async(Runtime::new().unwrap()).iter(|| async {
black_box(
take_vortex_object_store(&r2_fs, &r2_path, &INDICES)
Expand All @@ -63,18 +65,19 @@ fn random_access_parquet(c: &mut Criterion) {
let mut group = c.benchmark_group("parquet");
group.sample_size(10);

let r2_fs = Arc::new(AmazonS3Builder::from_env().build().unwrap());
let taxi_parquet = taxi_data_parquet();
group.bench_function("tokio local disk", |b| {
b.to_async(Runtime::new().unwrap())
.iter(|| async { black_box(take_parquet(&taxi_parquet, &INDICES).await.unwrap()) })
});

let r2_parquet_path = object_store::path::Path::from_url_path(
taxi_parquet.file_name().unwrap().to_str().unwrap(),
)
.unwrap();
group.bench_function("R2", |b| {
let r2_fs = Arc::new(AmazonS3Builder::from_env().build().unwrap());
let r2_parquet_path = object_store::path::Path::from_url_path(
taxi_parquet.file_name().unwrap().to_str().unwrap(),
)
.unwrap();

b.to_async(Runtime::new().unwrap()).iter(|| async {
black_box(
take_parquet_object_store(r2_fs.clone(), &r2_parquet_path, &INDICES)
Expand Down
2 changes: 2 additions & 0 deletions bench-vortex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use vortex_sampling_compressor::compressors::alp::ALPCompressor;
use vortex_sampling_compressor::compressors::bitpacked::BitPackedCompressor;
use vortex_sampling_compressor::compressors::date_time_parts::DateTimePartsCompressor;
use vortex_sampling_compressor::compressors::dict::DictCompressor;
use vortex_sampling_compressor::compressors::fsst::FSSTCompressor;
use vortex_sampling_compressor::compressors::r#for::FoRCompressor;
use vortex_sampling_compressor::compressors::roaring_bool::RoaringBoolCompressor;
use vortex_sampling_compressor::compressors::runend::DEFAULT_RUN_END_COMPRESSOR;
Expand Down Expand Up @@ -72,6 +73,7 @@ lazy_static! {
&DictCompressor,
&BitPackedCompressor,
&FoRCompressor,
&FSSTCompressor,
&DateTimePartsCompressor,
&DEFAULT_RUN_END_COMPRESSOR,
&RoaringBoolCompressor,
Expand Down
37 changes: 37 additions & 0 deletions bench-vortex/src/tpch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,43 @@ async fn register_vortex(
Ok(())
}

/// Load a table as an uncompressed Vortex array.
pub async fn load_table(data_dir: impl AsRef<Path>, name: &str, schema: &Schema) -> Array {
// Create a local session to load the CSV file from the path.
let path = data_dir
.as_ref()
.to_owned()
.join(format!("{name}.tbl"))
.to_str()
.unwrap()
.to_string();
let record_batches = SessionContext::new()
.read_csv(
&path,
CsvReadOptions::default()
.delimiter(b'|')
.has_header(false)
.file_extension("tbl")
.schema(schema),
)
.await
.unwrap()
.collect()
.await
.unwrap();

let chunks: Vec<Array> = record_batches
.iter()
.cloned()
.map(ArrowStructArray::from)
.map(|struct_array| Array::from_arrow(&struct_array, false))
.collect();

let dtype = chunks[0].dtype().clone();

ChunkedArray::try_new(chunks, dtype).unwrap().into_array()
}

pub fn tpch_queries() -> impl Iterator<Item = (usize, Vec<String>)> {
(1..=22).map(|q| (q, tpch_query(q)))
}
Expand Down
62 changes: 48 additions & 14 deletions encodings/fsst/src/array.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use fsst::{Decompressor, Symbol, MAX_CODE};
use fsst::{Decompressor, Symbol};
use serde::{Deserialize, Serialize};
use vortex::stats::{ArrayStatisticsCompute, StatsSet};
use vortex::validity::{ArrayValidity, LogicalValidity};
Expand All @@ -13,6 +13,7 @@ use vortex_error::{vortex_bail, VortexResult};
impl_encoding!("vortex.fsst", 24u16, FSST);

static SYMBOLS_DTYPE: DType = DType::Primitive(PType::U64, Nullability::NonNullable);
static SYMBOL_LENS_DTYPE: DType = DType::Primitive(PType::U8, Nullability::NonNullable);

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FSSTMetadata {
Expand All @@ -29,26 +30,39 @@ impl FSSTArray {
/// The `codes` array is a Binary array where each binary datum is a sequence of 8-bit codes.
/// Each code corresponds either to a symbol, or to the "escape code",
/// which tells the decoder to emit the following byte without doing a table lookup.
pub fn try_new(dtype: DType, symbols: Array, codes: Array) -> VortexResult<Self> {
pub fn try_new(
dtype: DType,
symbols: Array,
symbol_lengths: Array,
codes: Array,
) -> VortexResult<Self> {
// Check: symbols must be a u64 array
if symbols.dtype() != &DType::Primitive(PType::U64, Nullability::NonNullable) {
if symbols.dtype() != &SYMBOLS_DTYPE {
vortex_bail!(InvalidArgument: "symbols array must be of type u64")
}

if symbol_lengths.dtype() != &SYMBOL_LENS_DTYPE {
vortex_bail!(InvalidArgument: "symbol_lengths array must be of type u8")
}

// Check: symbols must not have length > MAX_CODE
if symbols.len() > MAX_CODE as usize {
vortex_bail!(InvalidArgument: "symbols array must have length <= 255")
if symbols.len() > 255 {
vortex_bail!(InvalidArgument: "symbols array must have length <= 255");
}

if symbols.len() != symbol_lengths.len() {
vortex_bail!(InvalidArgument: "symbols and symbol_lengths arrays must have same length");
}

// Check: strings must be a Binary array.
if !matches!(codes.dtype(), DType::Binary(_)) {
vortex_bail!(InvalidArgument: "strings array must be DType::Binary type");
vortex_bail!(InvalidArgument: "codes array must be DType::Binary type");
}

let symbols_len = symbols.len();
let len = codes.len();
let strings_dtype = codes.dtype().clone();
let children = Arc::new([symbols, codes]);
let children = Arc::new([symbols, symbol_lengths, codes]);

Self::try_from_parts(
dtype,
Expand All @@ -69,18 +83,28 @@ impl FSSTArray {
.expect("FSSTArray must have a symbols child array")
}

/// Access the symbol table array
pub fn symbol_lengths(&self) -> Array {
self.array()
.child(1, &SYMBOL_LENS_DTYPE, self.metadata().symbols_len)
.expect("FSSTArray must have a symbols child array")
}

/// Access the codes array
pub fn codes(&self) -> Array {
self.array()
.child(1, &self.metadata().codes_dtype, self.len())
.child(2, &self.metadata().codes_dtype, self.len())
.expect("FSSTArray must have a codes child array")
}

/// Build a [`Decompressor`][fsst::Decompressor] that can be used to decompress values from
/// this array.
/// this array, and pass it to the given function.
///
/// This is private to the crate to avoid leaking `fsst` as part of the public API.
pub(crate) fn decompressor(&self) -> Decompressor {
/// This is private to the crate to avoid leaking `fsst-rs` types as part of the public API.
pub(crate) fn with_decompressor<F, R>(&self, apply: F) -> R
where
F: FnOnce(Decompressor) -> R,
{
// canonicalize the symbols child array, so we can view it contiguously
let symbols_array = self
.symbols()
Expand All @@ -90,18 +114,28 @@ impl FSSTArray {
.expect("Symbols must be a Primitive Array");
let symbols = symbols_array.maybe_null_slice::<u64>();

let symbol_lengths_array = self
.symbol_lengths()
.into_canonical()
.unwrap()
.into_primitive()
.unwrap();
let symbol_lengths = symbol_lengths_array.maybe_null_slice::<u8>();

// Transmute the 64-bit symbol values into fsst `Symbol`s.
// SAFETY: Symbol is guaranteed to be 8 bytes, guaranteed by the compiler.
let symbols = unsafe { std::mem::transmute::<&[u64], &[Symbol]>(symbols) };

// Build a new decompressor that uses these symbols.
Decompressor::new(symbols)
let decompressor = Decompressor::new(symbols, symbol_lengths);
apply(decompressor)
}
}

impl AcceptArrayVisitor for FSSTArray {
fn accept(&self, _visitor: &mut dyn vortex::visitor::ArrayVisitor) -> VortexResult<()> {
todo!("implement this")
fn accept(&self, visitor: &mut dyn vortex::visitor::ArrayVisitor) -> VortexResult<()> {
visitor.visit_child("symbols", &self.symbols())?;
visitor.visit_child("codes", &self.codes())
}
}

Expand Down
Loading

0 comments on commit fd49140

Please sign in to comment.