Skip to content

Commit

Permalink
FSSTCompressor
Browse files Browse the repository at this point in the history
  • Loading branch information
a10y committed Aug 20, 2024
1 parent 7e99d94 commit 335636a
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 1 deletion.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,9 @@ vortex-dict = { version = "0.7.0", path = "./encodings/dict" }
vortex-dtype = { version = "0.7.0", path = "./vortex-dtype", default-features = false }
vortex-error = { version = "0.7.0", path = "./vortex-error" }
vortex-expr = { version = "0.7.0", path = "./vortex-expr" }
vortex-flatbuffers = { version = "0.7.0", path = "./vortex-flatbuffers" }
vortex-fastlanes = { version = "0.7.0", path = "./encodings/fastlanes" }
vortex-flatbuffers = { version = "0.7.0", path = "./vortex-flatbuffers" }
vortex-fsst = { version = "0.7.0", path = "./encodings/fsst" }
vortex-proto = { version = "0.7.0", path = "./vortex-proto" }
vortex-roaring = { version = "0.7.0", path = "./encodings/roaring" }
vortex-runend = { version = "0.7.0", path = "./encodings/runend" }
Expand Down
94 changes: 94 additions & 0 deletions encodings/fsst/src/compress.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Compress a set of values into an Array.

use fsst::{Compressor, Symbol};
use vortex::accessor::ArrayAccessor;
use vortex::array::builder::VarBinBuilder;
use vortex::array::{PrimitiveArray, VarBinArray, VarBinViewArray};
use vortex::validity::Validity;
use vortex::{Array, ArrayDType, IntoArray};
use vortex_dtype::DType;

use crate::FSSTArray;

/// Compress an array using FSST. If a compressor is provided, use the existing compressor, else
/// it will train a new compressor directly from the `strings`.
///
/// # Panics
///
/// If the `strings` array is not encoded as either [`VarBinArray`] or [`VarBinViewArray`].
pub fn fsst_compress(strings: Array, compressor: Option<Compressor>) -> FSSTArray {
let len = strings.len();
let dtype = strings.dtype().clone();

// Compress VarBinArray
if let Ok(varbin) = VarBinArray::try_from(&strings) {
let compressor = compressor.unwrap_or_else(|| {
varbin
.with_iterator(|iter| fsst_train_compressor(iter))
.unwrap()
});
return varbin
.with_iterator(|iter| fsst_compress_iter(iter, len, dtype, &compressor))
.unwrap();
}

// Compress VarBinViewArray
if let Ok(varbin_view) = VarBinViewArray::try_from(&strings) {
let compressor = compressor.unwrap_or_else(|| {
varbin_view
.with_iterator(|iter| fsst_train_compressor(iter))
.unwrap()
});
return varbin_view
.with_iterator(|iter| fsst_compress_iter(iter, len, dtype, &compressor))
.unwrap();
}

panic!(
"cannot fsst_compress array with unsupported encoding {:?}",
strings.encoding().id()
)
}

fn fsst_train_compressor<'a, I>(iter: I) -> Compressor
where
I: Iterator<Item = Option<&'a [u8]>>,
{
// TODO(aduffy): eliminate the copying.
let mut sample = Vec::with_capacity(1_024 * 1_024);
for string in iter {
match string {
None => {}
Some(b) => sample.extend_from_slice(b),
}
}

Compressor::train(&sample)
}

pub fn fsst_compress_iter<'a, I>(
iter: I,
len: usize,
dtype: DType,
compressor: &Compressor,
) -> FSSTArray
where
I: Iterator<Item = Option<&'a [u8]>>,
{
let mut builder = VarBinBuilder::<i32>::with_capacity(len);
for string in iter {
match string {
None => builder.push_null(),
Some(s) => builder.push_value(&compressor.compress(s)),
}
}

let codes = builder.finish(dtype.clone());
let symbols_vec: Vec<Symbol> = compressor.symbol_table().to_vec();
// SAFETY: Symbol and u64 are same size
let symbols_u64: Vec<u64> = unsafe { std::mem::transmute(symbols_vec) };
let symbols = PrimitiveArray::from_vec(symbols_u64, Validity::NonNullable);

FSSTArray::try_new(dtype, symbols.into_array(), codes.into_array())
.expect("building FSSTArray from parts")
}
2 changes: 2 additions & 0 deletions encodings/fsst/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
mod array;
mod canonical;
mod compress;
mod compute;

pub use array::*;
pub use compress::*;
2 changes: 2 additions & 0 deletions vortex-sampling-compressor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ edition = { workspace = true }
rust-version = { workspace = true }

[dependencies]
fsst-rs = { workspace = true }
log = { workspace = true }
rand = { workspace = true }
vortex-alp = { workspace = true }
Expand All @@ -22,6 +23,7 @@ vortex-dict = { workspace = true }
vortex-dtype = { workspace = true }
vortex-error = { workspace = true }
vortex-fastlanes = { workspace = true }
vortex-fsst = { workspace = true }
vortex-roaring = { workspace = true }
vortex-runend = { workspace = true }
vortex-zigzag = { workspace = true }
Expand Down
65 changes: 65 additions & 0 deletions vortex-sampling-compressor/src/compressors/fsst.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use std::collections::HashSet;

use vortex::array::{VarBinArray, VarBinViewArray};
use vortex::encoding::EncodingRef;
use vortex::{ArrayDType, ArrayDef, IntoArray};
use vortex_dict::DictArray;
use vortex_dtype::DType;
use vortex_error::{vortex_bail, VortexResult};
use vortex_fsst::{fsst_compress, FSSTEncoding, FSST};

use super::{CompressedArray, CompressionTree, EncodingCompressor};
use crate::SamplingCompressor;

#[derive(Debug)]
pub struct FSSTCompressor;

impl EncodingCompressor for FSSTCompressor {
fn id(&self) -> &str {
FSST::ID.as_ref()
}

fn can_compress(&self, array: &vortex::Array) -> Option<&dyn EncodingCompressor> {
// FSST arrays must have DType::Utf8.
//
// Note that while it can accept binary data, it is unlikely to perform well.
if !matches!(array.dtype(), &DType::Utf8(_)) {
return None;
}

// FSST cannot be applied recursively.
if array.encoding().id() == FSST::ID {
return None;
}

Some(self)
}

fn compress<'a>(
&'a self,
array: &vortex::Array,
_like: Option<CompressionTree<'a>>,
_ctx: SamplingCompressor<'a>,
) -> VortexResult<super::CompressedArray<'a>> {
// TODO(aduffy): use like array to clone the existing symbol table
let fsst_array =
if VarBinArray::try_from(array).is_ok() || VarBinViewArray::try_from(array).is_ok() {
// For a VarBinArray or VarBinViewArray, compress directly.
fsst_compress(array.clone(), None)
} else if let Ok(dict) = DictArray::try_from(array) {
// For a dict array, just compress the values
fsst_compress(dict.values(), None)
} else {
vortex_bail!(
InvalidArgument: "unsupported encoding for FSSTCompressor {:?}",
array.encoding().id()
)
};

Ok(CompressedArray::new(fsst_array.into_array(), None))
}

fn used_encodings(&self) -> HashSet<EncodingRef> {
HashSet::from([&FSSTEncoding as EncodingRef])
}
}
1 change: 1 addition & 0 deletions vortex-sampling-compressor/src/compressors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub mod date_time_parts;
pub mod delta;
pub mod dict;
pub mod r#for;
pub mod fsst;
pub mod roaring_bool;
pub mod roaring_int;
pub mod runend;
Expand Down

0 comments on commit 335636a

Please sign in to comment.