diff --git a/Cargo.lock b/Cargo.lock index 1062bae3e3..44c06c4b7e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1549,6 +1549,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fsst-rs" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ae2cf518ae03eeef49886cc339ab26bc5bb9877f4a502c28fe9cd8bdf98db21" + [[package]] name = "futures" version = "0.3.30" @@ -2908,7 +2914,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5bb182580f71dd070f88d01ce3de9f4da5021db7115d2e1c3605a754153b77c1" dependencies = [ "bytes", - "heck 0.4.1", + "heck 0.5.0", "itertools 0.13.0", "log", "multimap", @@ -4402,6 +4408,20 @@ dependencies = [ "flatbuffers", ] +[[package]] +name = "vortex-fsst" +version = "0.7.0" +dependencies = [ + "arrow-array", + "fsst-rs", + "serde", + "vortex-array", + "vortex-buffer", + "vortex-dtype", + "vortex-error", + "vortex-scalar", +] + [[package]] name = "vortex-fuzz" version = "0.0.0" diff --git a/Cargo.toml b/Cargo.toml index a7497ea46f..8aa7004c6d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,6 +81,7 @@ fastlanes = "0.1.5" flatbuffers = "24.3.25" flexbuffers = "2.0.0" fs_extra = "1.3.0" +fsst-rs = "0.1.0" futures = { version = "0.3.30", default-features = false } futures-executor = "0.3.30" futures-util = "0.3.30" diff --git a/encodings/fsst/Cargo.toml b/encodings/fsst/Cargo.toml new file mode 100644 index 0000000000..3eda98b7ba --- /dev/null +++ b/encodings/fsst/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "vortex-fsst" +version = { workspace = true } +description = "Vortex FSST string array encoding" +homepage = { workspace = true } +repository = { workspace = true } +authors = { workspace = true } +license = { workspace = true } +keywords = { workspace = true } +include = { workspace = true } +edition = { workspace = true } +rust-version = { workspace = true } + +[lints] +workspace = true + +[dependencies] +arrow-array = { workspace = true } +fsst-rs = { 
workspace = true } +serde = { workspace = true } + +vortex-array = { workspace = true } +vortex-buffer = { workspace = true } +vortex-dtype = { workspace = true } +vortex-error = { workspace = true } +vortex-scalar = { workspace = true } diff --git a/encodings/fsst/src/array.rs b/encodings/fsst/src/array.rs new file mode 100644 index 0000000000..d59b7b57b8 --- /dev/null +++ b/encodings/fsst/src/array.rs @@ -0,0 +1,129 @@ +use std::sync::Arc; + +use fsst::{Decompressor, Symbol, MAX_CODE}; +use serde::{Deserialize, Serialize}; +use vortex::stats::{ArrayStatisticsCompute, StatsSet}; +use vortex::validity::{ArrayValidity, LogicalValidity}; +use vortex::variants::{ArrayVariants, BinaryArrayTrait, Utf8ArrayTrait}; +use vortex::visitor::AcceptArrayVisitor; +use vortex::{impl_encoding, Array, ArrayDType, ArrayDef, ArrayTrait, IntoCanonical}; +use vortex_dtype::{DType, Nullability, PType}; +use vortex_error::{vortex_bail, VortexResult}; + +impl_encoding!("vortex.fsst", 24u16, FSST); + +static SYMBOLS_DTYPE: DType = DType::Primitive(PType::U64, Nullability::NonNullable); + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FSSTMetadata { + symbols_len: usize, + codes_dtype: DType, +} + +impl FSSTArray { + // Build an FSST array from a set of `symbols` and `codes`. + // + // Symbols are 8-bytes and can represent short strings, each of which is assigned + // a code. + // + // The `codes` array is a Binary array where each binary datum is a sequence of 8-bit codes. + // Each code corresponds either to an offset in the `symbols` table, or to the "escape code", + // which tells the decoder to emit the following byte without doing a table lookup. 
+ pub fn try_new(dtype: DType, symbols: Array, codes: Array) -> VortexResult { + // Check: symbols must be a u64 array + if symbols.dtype() != &DType::Primitive(PType::U64, Nullability::NonNullable) { + vortex_bail!(InvalidArgument: "symbols array must be of type u64") + } + + // Check: symbols must not have length > 255 + if symbols.len() > MAX_CODE as usize { + vortex_bail!(InvalidArgument: "symbols array must have length <= 255") + } + + // Check: strings must be a Binary array. + if !matches!(codes.dtype(), DType::Binary(_)) { + vortex_bail!(InvalidArgument: "strings array must be DType::Binary type"); + } + + let symbols_len = symbols.len(); + let len = codes.len(); + let strings_dtype = codes.dtype().clone(); + let children = Arc::new([symbols, codes]); + + Self::try_from_parts( + dtype, + len, + FSSTMetadata { + symbols_len, + codes_dtype: strings_dtype, + }, + children, + StatsSet::new(), + ) + } + + /// Access the symbol table array + pub fn symbols(&self) -> Array { + self.array() + .child(0, &SYMBOLS_DTYPE, self.metadata().symbols_len) + .expect("FSSTArray must have a symbols child array") + } + + /// Access the codes array + pub fn codes(&self) -> Array { + self.array() + .child(1, &self.metadata().codes_dtype, self.len()) + .expect("FSSTArray must have a codes child array") + } + + /// Build a [`Decompressor`][fsst::Decompressor] that can be used to decompress values from + /// this array. + /// + /// This is private to the crate to avoid leaking `fsst` as part of the public API. + pub(crate) fn decompressor(&self) -> VortexResult { + // canonicalize the symbols child array, so we can view it contiguously + let symbols_array = self.symbols().into_canonical()?.into_primitive()?; + let symbols = symbols_array.maybe_null_slice::(); + + // Transmute the 64-bit symbol values into fsst `Symbol`s. + // SAFETY: this relies on Symbol having the same size and alignment as u64 (8 bytes); NOTE(review): transmuting between slice references does not check the element types at compile time, so confirm this against the fsst crate.
+ let symbols = unsafe { std::mem::transmute::<&[u64], &[Symbol]>(symbols) }; + + // Build a new decompressor that uses these symbols. + Ok(Decompressor::new(symbols)) + } +} + +impl AcceptArrayVisitor for FSSTArray { + fn accept(&self, _visitor: &mut dyn vortex::visitor::ArrayVisitor) -> VortexResult<()> { + todo!("implement this") + } +} + +impl ArrayStatisticsCompute for FSSTArray {} + +impl ArrayValidity for FSSTArray { + fn is_valid(&self, index: usize) -> bool { + self.codes().with_dyn(|a| a.is_valid(index)) + } + + fn logical_validity(&self) -> LogicalValidity { + self.codes().with_dyn(|a| a.logical_validity()) + } +} + +impl ArrayVariants for FSSTArray { + fn as_binary_array(&self) -> Option<&dyn BinaryArrayTrait> { + Some(self) + } + + fn as_utf8_array(&self) -> Option<&dyn Utf8ArrayTrait> { + Some(self) + } +} + +impl Utf8ArrayTrait for FSSTArray {} + +impl BinaryArrayTrait for FSSTArray {} + +impl ArrayTrait for FSSTArray {} diff --git a/encodings/fsst/src/canonical.rs b/encodings/fsst/src/canonical.rs new file mode 100644 index 0000000000..603edf52a5 --- /dev/null +++ b/encodings/fsst/src/canonical.rs @@ -0,0 +1,56 @@ +use arrow_array::builder::GenericByteBuilder; +use arrow_array::types::BinaryType; +use fsst::Symbol; +use vortex::array::VarBinArray; +use vortex::arrow::FromArrowArray; +use vortex::validity::ArrayValidity; +use vortex::{ArrayDType, Canonical, IntoCanonical}; +use vortex_error::VortexResult; + +use crate::FSSTArray; + +impl IntoCanonical for FSSTArray { + fn into_canonical(self) -> VortexResult { + let decompressor = self.decompressor()?; + + // Note: the maximum amount of decompressed space for an FSST array is 8 * n_elements, + // as each code can expand into a symbol of 1-8 bytes. 
+ let max_items = self.len(); + let max_bytes = self.codes().nbytes() * size_of::(); + + // Create the target Arrow binary array + // TODO(aduffy): switch to BinaryView when PR https://github.com/spiraldb/vortex/pull/476 merges + let mut builder = GenericByteBuilder::::with_capacity(max_items, max_bytes); + + // TODO(aduffy): add decompression functions that support writing directly into an output buffer. + let codes_array = self.codes().into_canonical()?.into_varbin()?; + + // TODO(aduffy): make this loop faster. + for idx in 0..self.len() { + if !codes_array.is_valid(idx) { + builder.append_null() + } else { + let compressed = codes_array.bytes_at(idx)?; + let value = decompressor.decompress(compressed.as_slice()); + builder.append_value(value) + } + } + + let arrow_array = builder.finish(); + + // Force the DType + let canonical_varbin = VarBinArray::try_from(&vortex::Array::from_arrow( + &arrow_array, + self.dtype().is_nullable(), + ))?; + + let forced_dtype = VarBinArray::try_new( + canonical_varbin.offsets(), + canonical_varbin.bytes(), + self.dtype().clone(), + canonical_varbin.validity(), + )?; + + Ok(Canonical::VarBin(forced_dtype)) + } +} diff --git a/encodings/fsst/src/compute.rs b/encodings/fsst/src/compute.rs new file mode 100644 index 0000000000..a52364c4ce --- /dev/null +++ b/encodings/fsst/src/compute.rs @@ -0,0 +1,90 @@ +use vortex::compute::unary::{scalar_at, ScalarAtFn}; +use vortex::compute::{filter, slice, take, ArrayCompute, FilterFn, SliceFn, TakeFn}; +use vortex::validity::ArrayValidity; +use vortex::{Array, ArrayDType, IntoArray}; +use vortex_buffer::{Buffer, BufferString}; +use vortex_dtype::DType; +use vortex_error::{vortex_bail, VortexResult}; +use vortex_scalar::{Scalar, ScalarValue}; + +use crate::FSSTArray; + +impl ArrayCompute for FSSTArray { + fn slice(&self) -> Option<&dyn SliceFn> { + Some(self) + } + + fn take(&self) -> Option<&dyn TakeFn> { + Some(self) + } + + fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { + Some(self) + } +
+ fn filter(&self) -> Option<&dyn FilterFn> { + Some(self) + } +} + +impl SliceFn for FSSTArray { + fn slice(&self, start: usize, stop: usize) -> VortexResult { + // Slicing an FSST array leaves the symbol table unmodified, + // only slicing the `codes` array. + Ok(Self::try_new( + self.dtype().clone(), + self.symbols(), + slice(&self.codes(), start, stop)?, + )? + .into_array()) + } +} + +impl TakeFn for FSSTArray { + // Take on an FSSTArray is a simple take on the codes array. + fn take(&self, indices: &Array) -> VortexResult { + let new_codes = take(&self.codes(), indices)?; + + Ok(Self::try_new(self.dtype().clone(), self.symbols(), new_codes)?.into_array()) + } +} + +impl ScalarAtFn for FSSTArray { + fn scalar_at(&self, index: usize) -> VortexResult { + // Check validity and short-circuit to null + if !self.is_valid(index) { + return Ok(Scalar::null(self.dtype().clone())); + } + + let compressed = scalar_at(&self.codes(), index)?; + let binary_datum = match compressed.value().as_buffer()? { + Some(b) => b, + None => vortex_bail!("non-nullable scalar must unwrap"), + }; + + let decompressor = self.decompressor()?; + let decoded_buffer: Buffer = decompressor.decompress(binary_datum.as_slice()).into(); + + if matches!(self.dtype(), &DType::Utf8(_)) { + // SAFETY: a UTF-8 FSSTArray can only be compressed from a known-good UTF-8 array, no need to revalidate. 
+ let buffer_string = unsafe { BufferString::new_unchecked(decoded_buffer) }; + Ok(Scalar::new( + self.dtype().clone(), + ScalarValue::BufferString(buffer_string), + )) + } else { + Ok(Scalar::new( + self.dtype().clone(), + ScalarValue::Buffer(decoded_buffer), + )) + } + } +} + +impl FilterFn for FSSTArray { + // Filtering an FSSTArray filters the codes array, leaving the symbols array untouched + fn filter(&self, predicate: &Array) -> VortexResult { + let filtered_codes = filter(&self.codes(), predicate)?; + Ok(Self::try_new(self.dtype().clone(), self.symbols(), filtered_codes)?.into_array()) + } +} diff --git a/encodings/fsst/src/lib.rs b/encodings/fsst/src/lib.rs new file mode 100644 index 0000000000..4222be2be3 --- /dev/null +++ b/encodings/fsst/src/lib.rs @@ -0,0 +1,15 @@ +//! An array that uses the [Fast Static Symbol Table][fsst] compression scheme +//! to compress string arrays. +//! +//! FSST arrays can generally compress string data up to 2x through the use of +//! string tables. The string table is static for an entire array, and occupies +//! up to 2048 bytes of buffer space. Thus, FSST is only worth reaching for when +//! dealing with larger arrays of potentially hundreds of kilobytes or more. +//! +//! 
[fsst]: https://www.vldb.org/pvldb/vol13/p2649-boncz.pdf + +mod array; +mod canonical; +mod compute; + +pub use array::*; diff --git a/encodings/fsst/tests/fsst_tests.rs b/encodings/fsst/tests/fsst_tests.rs new file mode 100644 index 0000000000..af4868bc6c --- /dev/null +++ b/encodings/fsst/tests/fsst_tests.rs @@ -0,0 +1,145 @@ +#![cfg(test)] + +use arrow_array::builder::BinaryBuilder; +use fsst::{Compressor, Symbol}; +use vortex::array::{BoolArray, PrimitiveArray}; +use vortex::arrow::FromArrowArray; +use vortex::compute::unary::scalar_at; +use vortex::compute::{filter, slice, take}; +use vortex::validity::Validity; +use vortex::{ArrayDType, ArrayDef, IntoArray}; +use vortex_dtype::{DType, Nullability, PType}; +use vortex_fsst::{FSSTArray, FSST}; + +fn build_fsst_utf8_array() -> FSSTArray { + let compressor = Compressor::train("the quick brown fox jumped over the lazy dog"); + + let symbols = compressor.symbol_table(); + + // SAFETY: Symbol and u64 have same size, enforced by compiler + let symbols_u64 = unsafe { std::mem::transmute::<&[Symbol], &[u64]>(symbols) }; + let mut symbols_vec = Vec::new(); + symbols_vec.extend_from_slice(symbols_u64); + + let symbols_array = PrimitiveArray::from_vec(symbols_vec, Validity::NonNullable).into_array(); + assert_eq!( + symbols_array.dtype(), + &DType::Primitive(PType::U64, Nullability::NonNullable) + ); + + let mut codes = BinaryBuilder::new(); + + codes.append_value( + compressor + .compress("The Greeks never said that the limit could not he overstepped".as_bytes()), + ); + codes.append_value( + compressor.compress( + "They said it existed and that whoever dared to exceed it was mercilessly struck down" + .as_bytes(), + ), + ); + codes.append_value( + compressor.compress("Nothing in present history can contradict them".as_bytes()), + ); + + let codes = codes.finish(); + let codes_array = vortex::Array::from_arrow(&codes, false); + + FSSTArray::try_new( + DType::Utf8(Nullability::NonNullable), + symbols_array, + codes_array, 
+ ) + .expect("building from parts must succeed") +} + +macro_rules! assert_nth_scalar { + ($arr:expr, $n:expr, $expected:expr) => { + assert_eq!(scalar_at(&$arr, $n).unwrap(), $expected.try_into().unwrap()); + }; +} + +#[test] +fn test_compute() { + let fsst_array = build_fsst_utf8_array().into_array(); + + assert_eq!(fsst_array.len(), 3); + + // + // ScalarAtFn + // + { + assert_nth_scalar!( + fsst_array, + 0, + "The Greeks never said that the limit could not he overstepped" + ); + assert_nth_scalar!( + fsst_array, + 1, + "They said it existed and that whoever dared to exceed it was mercilessly struck down" + ); + assert_nth_scalar!( + fsst_array, + 2, + "Nothing in present history can contradict them" + ); + } + + // + // SliceFn + // + { + let fsst_sliced = slice(&fsst_array, 1, 3).unwrap(); + assert_eq!(fsst_sliced.encoding().id(), FSST::ENCODING.id()); + assert_eq!(fsst_sliced.len(), 2); + assert_nth_scalar!( + fsst_sliced, + 0, + "They said it existed and that whoever dared to exceed it was mercilessly struck down" + ); + assert_nth_scalar!( + fsst_sliced, + 1, + "Nothing in present history can contradict them" + ); + } + + // + // TakeFn + // + { + let indices = PrimitiveArray::from_vec(vec![0, 2], Validity::NonNullable).into_array(); + let fsst_taken = take(&fsst_array, &indices).unwrap(); + assert_eq!(fsst_taken.len(), 2); + assert_nth_scalar!( + fsst_taken, + 0, + "The Greeks never said that the limit could not he overstepped" + ); + assert_nth_scalar!( + fsst_taken, + 1, + "Nothing in present history can contradict them" + ); + } + + // + // FilterFn + // + + { + let predicate = + BoolArray::from_vec(vec![false, true, false], Validity::NonNullable).into_array(); + + let fsst_filtered = filter(&fsst_array, &predicate).unwrap(); + assert_eq!(fsst_filtered.encoding().id(), FSST::ENCODING.id()); + assert_eq!(fsst_filtered.len(), 1); + assert_nth_scalar!( + fsst_filtered, + 0, + "They said it existed and that whoever dared to exceed it was mercilessly struck 
down" + ); + } +}