From 335636a5e868589b6a64207033b5506d3f3742b5 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 20 Aug 2024 19:49:50 -0400 Subject: [PATCH] FSSTCompressor --- Cargo.lock | 2 + Cargo.toml | 3 +- encodings/fsst/src/compress.rs | 94 +++++++++++++++++++ encodings/fsst/src/lib.rs | 2 + vortex-sampling-compressor/Cargo.toml | 2 + .../src/compressors/fsst.rs | 65 +++++++++++++ .../src/compressors/mod.rs | 1 + 7 files changed, 168 insertions(+), 1 deletion(-) create mode 100644 encodings/fsst/src/compress.rs create mode 100644 vortex-sampling-compressor/src/compressors/fsst.rs diff --git a/Cargo.lock b/Cargo.lock index af63264208..5877257d39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4495,6 +4495,7 @@ name = "vortex-sampling-compressor" version = "0.7.0" dependencies = [ "chrono", + "fsst-rs", "log", "rand", "vortex-alp", @@ -4505,6 +4506,7 @@ dependencies = [ "vortex-dtype", "vortex-error", "vortex-fastlanes", + "vortex-fsst", "vortex-roaring", "vortex-runend", "vortex-zigzag", diff --git a/Cargo.toml b/Cargo.toml index 4db08c788c..46d5d83fa2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -138,8 +138,9 @@ vortex-dict = { version = "0.7.0", path = "./encodings/dict" } vortex-dtype = { version = "0.7.0", path = "./vortex-dtype", default-features = false } vortex-error = { version = "0.7.0", path = "./vortex-error" } vortex-expr = { version = "0.7.0", path = "./vortex-expr" } -vortex-flatbuffers = { version = "0.7.0", path = "./vortex-flatbuffers" } vortex-fastlanes = { version = "0.7.0", path = "./encodings/fastlanes" } +vortex-flatbuffers = { version = "0.7.0", path = "./vortex-flatbuffers" } +vortex-fsst = { version = "0.7.0", path = "./encodings/fsst" } vortex-proto = { version = "0.7.0", path = "./vortex-proto" } vortex-roaring = { version = "0.7.0", path = "./encodings/roaring" } vortex-runend = { version = "0.7.0", path = "./encodings/runend" } diff --git a/encodings/fsst/src/compress.rs b/encodings/fsst/src/compress.rs new file mode 100644 index 0000000000..5a297d421a --- /dev/null +++ b/encodings/fsst/src/compress.rs @@ -0,0 +1,94 @@ +// Compress a set of values into an Array. + +use fsst::{Compressor, Symbol}; +use vortex::accessor::ArrayAccessor; +use vortex::array::builder::VarBinBuilder; +use vortex::array::{PrimitiveArray, VarBinArray, VarBinViewArray}; +use vortex::validity::Validity; +use vortex::{Array, ArrayDType, IntoArray}; +use vortex_dtype::DType; + +use crate::FSSTArray; + +/// Compress an array using FSST. If a compressor is provided, use the existing compressor, else +/// it will train a new compressor directly from the `strings`. +/// +/// # Panics +/// +/// If the `strings` array is not encoded as either [`VarBinArray`] or [`VarBinViewArray`]. +pub fn fsst_compress(strings: Array, compressor: Option) -> FSSTArray { + let len = strings.len(); + let dtype = strings.dtype().clone(); + + // Compress VarBinArray + if let Ok(varbin) = VarBinArray::try_from(&strings) { + let compressor = compressor.unwrap_or_else(|| { + varbin + .with_iterator(|iter| fsst_train_compressor(iter)) + .unwrap() + }); + return varbin + .with_iterator(|iter| fsst_compress_iter(iter, len, dtype, &compressor)) + .unwrap(); + } + + // Compress VarBinViewArray + if let Ok(varbin_view) = VarBinViewArray::try_from(&strings) { + let compressor = compressor.unwrap_or_else(|| { + varbin_view + .with_iterator(|iter| fsst_train_compressor(iter)) + .unwrap() + }); + return varbin_view + .with_iterator(|iter| fsst_compress_iter(iter, len, dtype, &compressor)) + .unwrap(); + } + + panic!( + "cannot fsst_compress array with unsupported encoding {:?}", + strings.encoding().id() + ) +} + +fn fsst_train_compressor<'a, I>(iter: I) -> Compressor +where + I: Iterator>, +{ + // TODO(aduffy): eliminate the copying. + let mut sample = Vec::with_capacity(1_024 * 1_024); + for string in iter { + match string { + None => {} + Some(b) => sample.extend_from_slice(b), + } + } + + Compressor::train(&sample) +} + +pub fn fsst_compress_iter<'a, I>( + iter: I, + len: usize, + dtype: DType, + compressor: &Compressor, +) -> FSSTArray +where + I: Iterator>, +{ + let mut builder = VarBinBuilder::::with_capacity(len); + for string in iter { + match string { + None => builder.push_null(), + Some(s) => builder.push_value(&compressor.compress(s)), + } + } + + let codes = builder.finish(dtype.clone()); + let symbols_vec: Vec = compressor.symbol_table().to_vec(); + // SAFETY: Symbol and u64 are same size + let symbols_u64: Vec = unsafe { std::mem::transmute(symbols_vec) }; + let symbols = PrimitiveArray::from_vec(symbols_u64, Validity::NonNullable); + + FSSTArray::try_new(dtype, symbols.into_array(), codes.into_array()) + .expect("building FSSTArray from parts") +} diff --git a/encodings/fsst/src/lib.rs b/encodings/fsst/src/lib.rs index 4222be2be3..e5cc58b54f 100644 --- a/encodings/fsst/src/lib.rs +++ b/encodings/fsst/src/lib.rs @@ -10,6 +10,8 @@ mod array; mod canonical; +mod compress; mod compute; pub use array::*; +pub use compress::*; diff --git a/vortex-sampling-compressor/Cargo.toml b/vortex-sampling-compressor/Cargo.toml index 64001c33ac..4e345eab0f 100644 --- a/vortex-sampling-compressor/Cargo.toml +++ b/vortex-sampling-compressor/Cargo.toml @@ -12,6 +12,7 @@ edition = { workspace = true } rust-version = { workspace = true } [dependencies] +fsst-rs = { workspace = true } log = { workspace = true } rand = { workspace = true } vortex-alp = { workspace = true } @@ -22,6 +23,7 @@ vortex-dict = { workspace = true } vortex-dtype = { workspace = true } vortex-error = { workspace = true } vortex-fastlanes = { workspace = true } +vortex-fsst = { workspace = true } vortex-roaring = { workspace = true } vortex-runend = { workspace = true } vortex-zigzag = { workspace = true } diff --git a/vortex-sampling-compressor/src/compressors/fsst.rs b/vortex-sampling-compressor/src/compressors/fsst.rs new file mode 100644 index 0000000000..776bc93e42 --- /dev/null +++ b/vortex-sampling-compressor/src/compressors/fsst.rs @@ -0,0 +1,65 @@ +use std::collections::HashSet; + +use vortex::array::{VarBinArray, VarBinViewArray}; +use vortex::encoding::EncodingRef; +use vortex::{ArrayDType, ArrayDef, IntoArray}; +use vortex_dict::DictArray; +use vortex_dtype::DType; +use vortex_error::{vortex_bail, VortexResult}; +use vortex_fsst::{fsst_compress, FSSTEncoding, FSST}; + +use super::{CompressedArray, CompressionTree, EncodingCompressor}; +use crate::SamplingCompressor; + +#[derive(Debug)] +pub struct FSSTCompressor; + +impl EncodingCompressor for FSSTCompressor { + fn id(&self) -> &str { + FSST::ID.as_ref() + } + + fn can_compress(&self, array: &vortex::Array) -> Option<&dyn EncodingCompressor> { + // FSST arrays must have DType::Utf8. + // + // Note that while it can accept binary data, it is unlikely to perform well. + if !matches!(array.dtype(), &DType::Utf8(_)) { + return None; + } + + // FSST cannot be applied recursively. + if array.encoding().id() == FSST::ID { + return None; + } + + Some(self) + } + + fn compress<'a>( + &'a self, + array: &vortex::Array, + _like: Option>, + _ctx: SamplingCompressor<'a>, + ) -> VortexResult> { + // TODO(aduffy): use like array to clone the existing symbol table + let fsst_array = + if VarBinArray::try_from(array).is_ok() || VarBinViewArray::try_from(array).is_ok() { + // For a VarBinArray or VarBinViewArray, compress directly. + fsst_compress(array.clone(), None) + } else if let Ok(dict) = DictArray::try_from(array) { + // For a dict array, just compress the values + fsst_compress(dict.values(), None) + } else { + vortex_bail!( + InvalidArgument: "unsupported encoding for FSSTCompressor {:?}", + array.encoding().id() + ) + }; + + Ok(CompressedArray::new(fsst_array.into_array(), None)) + } + + fn used_encodings(&self) -> HashSet { + HashSet::from([&FSSTEncoding as EncodingRef]) + } +} diff --git a/vortex-sampling-compressor/src/compressors/mod.rs b/vortex-sampling-compressor/src/compressors/mod.rs index 5f7d1b5640..36bb4c4c6f 100644 --- a/vortex-sampling-compressor/src/compressors/mod.rs +++ b/vortex-sampling-compressor/src/compressors/mod.rs @@ -15,6 +15,7 @@ pub mod date_time_parts; pub mod delta; pub mod dict; pub mod r#for; +pub mod fsst; pub mod roaring_bool; pub mod roaring_int; pub mod runend;