Skip to content

Commit

Permalink
Add FSSTArray
Browse files Browse the repository at this point in the history
  • Loading branch information
a10y committed Aug 16, 2024
1 parent 19e993c commit 2d49d71
Show file tree
Hide file tree
Showing 8 changed files with 483 additions and 1 deletion.
22 changes: 21 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ fastlanes = "0.1.5"
flatbuffers = "24.3.25"
flexbuffers = "2.0.0"
fs_extra = "1.3.0"
fsst-rs = "0.1.0"
futures = { version = "0.3.30", default-features = false }
futures-executor = "0.3.30"
futures-util = "0.3.30"
Expand Down
26 changes: 26 additions & 0 deletions encodings/fsst/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[package]
name = "vortex-fsst"
version = { workspace = true }
description = "Vortex FSST string array encoding"
homepage = { workspace = true }
repository = { workspace = true }
authors = { workspace = true }
license = { workspace = true }
keywords = { workspace = true }
include = { workspace = true }
edition = { workspace = true }
rust-version = { workspace = true }

[lints]
workspace = true

[dependencies]
arrow-array = { workspace = true }
fsst-rs = { workspace = true }
serde = { workspace = true }

vortex-array = { workspace = true }
vortex-buffer = { workspace = true }
vortex-dtype = { workspace = true }
vortex-error = { workspace = true }
vortex-scalar = { workspace = true }
129 changes: 129 additions & 0 deletions encodings/fsst/src/array.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
use std::sync::Arc;

use fsst::{Decompressor, Symbol, MAX_CODE};
use serde::{Deserialize, Serialize};
use vortex::stats::{ArrayStatisticsCompute, StatsSet};
use vortex::validity::{ArrayValidity, LogicalValidity};
use vortex::variants::{ArrayVariants, BinaryArrayTrait, Utf8ArrayTrait};
use vortex::visitor::AcceptArrayVisitor;
use vortex::{impl_encoding, Array, ArrayDType, ArrayDef, ArrayTrait, IntoCanonical};
use vortex_dtype::{DType, Nullability, PType};
use vortex_error::{vortex_bail, VortexResult};

// Register this encoding with the string id "vortex.fsst" and the u16 value 24.
// NOTE(review): the macro presumably generates the `FSSTArray` type (plus the `FSST`
// encoding definitions) used throughout this file -- confirm against `vortex::impl_encoding`.
impl_encoding!("vortex.fsst", 24u16, FSST);

/// DType of the symbol table child: a non-nullable `u64` primitive array.
///
/// Shared so that child lookups can hand out a `'static` reference to it.
static SYMBOLS_DTYPE: DType = DType::Primitive(PType::U64, Nullability::NonNullable);

/// Serializable metadata stored alongside an `FSSTArray`.
///
/// It records what is needed to re-open the two child arrays (symbols and codes).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FSSTMetadata {
// Number of entries in the symbol table child array.
symbols_len: usize,
// DType of the codes child array (a `DType::Binary` when built via `FSSTArray::try_new`).
codes_dtype: DType,
}

impl FSSTArray {
    /// Build an FSST array from a set of `symbols` and `codes`.
    ///
    /// Symbols are 8 bytes wide and can represent short strings, each of which is assigned
    /// a code.
    ///
    /// The `codes` array is a Binary array where each binary datum is a sequence of 8-bit codes.
    /// Each code corresponds either to an offset in the `symbols` table, or to the "escape code",
    /// which tells the decoder to emit the following byte without doing a table lookup.
    ///
    /// # Errors
    ///
    /// Returns an `InvalidArgument` error if `symbols` is not a non-nullable `u64` array, if
    /// `symbols` has more than `MAX_CODE` entries, or if `codes` is not a `DType::Binary` array.
    pub fn try_new(dtype: DType, symbols: Array, codes: Array) -> VortexResult<Self> {
        // Check: symbols must be a u64 array
        if symbols.dtype() != &SYMBOLS_DTYPE {
            vortex_bail!(InvalidArgument: "symbols array must be of type u64");
        }

        // Check: symbols must not have length > 255
        if symbols.len() > MAX_CODE as usize {
            vortex_bail!(InvalidArgument: "symbols array must have length <= 255");
        }

        // Check: codes must be a Binary array.
        if !matches!(codes.dtype(), DType::Binary(_)) {
            vortex_bail!(InvalidArgument: "codes array must be DType::Binary type");
        }

        // Capture everything the metadata needs before the children are moved.
        let symbols_len = symbols.len();
        let len = codes.len();
        let codes_dtype = codes.dtype().clone();
        let children = Arc::new([symbols, codes]);

        Self::try_from_parts(
            dtype,
            len,
            FSSTMetadata {
                symbols_len,
                codes_dtype,
            },
            children,
            StatsSet::new(),
        )
    }

    /// Access the symbol table array (child 0).
    ///
    /// # Panics
    ///
    /// Panics if the child array is missing.
    pub fn symbols(&self) -> Array {
        self.array()
            .child(0, &SYMBOLS_DTYPE, self.metadata().symbols_len)
            .expect("FSSTArray must have a symbols child array")
    }

    /// Access the codes array (child 1), which has the same length as this array.
    ///
    /// # Panics
    ///
    /// Panics if the child array is missing.
    pub fn codes(&self) -> Array {
        self.array()
            .child(1, &self.metadata().codes_dtype, self.len())
            .expect("FSSTArray must have a codes child array")
    }

    /// Build a [`Decompressor`][fsst::Decompressor] that can be used to decompress values from
    /// this array.
    ///
    /// This is private to the crate to avoid leaking `fsst` as part of the public API.
    ///
    /// # Errors
    ///
    /// Returns an error if the symbols child cannot be canonicalized into a primitive array.
    pub(crate) fn decompressor(&self) -> VortexResult<Decompressor> {
        // Compile-time proof of the layout facts the reinterpretation below relies on.
        const _: () = assert!(std::mem::size_of::<Symbol>() == std::mem::size_of::<u64>());
        const _: () = assert!(std::mem::align_of::<Symbol>() <= std::mem::align_of::<u64>());

        // Canonicalize the symbols child array, so we can view it contiguously.
        let symbols_array = self.symbols().into_canonical()?.into_primitive()?;
        let symbols = symbols_array.maybe_null_slice::<u64>();

        // Reinterpret the 64-bit symbol values as fsst `Symbol`s. A pointer cast is used rather
        // than transmuting the slice reference, because the layout of a slice fat pointer is
        // not something `transmute` should rely on.
        //
        // SAFETY: the pointer and length come from a live `&[u64]`; `Symbol` has the same size
        // as `u64` and no stricter alignment (asserted above), so the element count and the
        // addressed memory range are unchanged. Assumes every 8-byte pattern is a valid
        // `Symbol`.
        //
        // NOTE(review): the lifetime of the resulting slice is not tied by the compiler to
        // `symbols_array`. If `Decompressor` borrows this slice instead of copying it, the
        // returned value can outlive the canonicalized buffer -- confirm against the `fsst`
        // crate and consider a closure-based accessor if so.
        let symbols: &[Symbol] = unsafe {
            std::slice::from_raw_parts(symbols.as_ptr().cast::<Symbol>(), symbols.len())
        };

        // Build a new decompressor that uses these symbols.
        Ok(Decompressor::new(symbols))
    }
}

impl AcceptArrayVisitor for FSSTArray {
    /// Present both children of this array to `visitor`: the symbol table first, then the codes.
    ///
    /// Any error reported by the visitor is propagated to the caller.
    fn accept(&self, visitor: &mut dyn vortex::visitor::ArrayVisitor) -> VortexResult<()> {
        visitor.visit_child("symbols", &self.symbols())?;
        visitor.visit_child("codes", &self.codes())
    }
}

// No statistics are computed specially for FSST arrays; the empty body means only the
// trait's default methods are used.
impl ArrayStatisticsCompute for FSSTArray {}

impl ArrayValidity for FSSTArray {
    /// Whether the element at `index` is non-null, as reported by the codes child.
    fn is_valid(&self, index: usize) -> bool {
        let codes = self.codes();
        codes.with_dyn(|child| child.is_valid(index))
    }

    /// The validity of the whole array, as reported by the codes child.
    fn logical_validity(&self) -> LogicalValidity {
        let codes = self.codes();
        codes.with_dyn(|child| child.logical_validity())
    }
}

impl ArrayVariants for FSSTArray {
    /// Expose this array through the UTF-8 variant trait.
    fn as_utf8_array(&self) -> Option<&dyn Utf8ArrayTrait> {
        let utf8_view: &dyn Utf8ArrayTrait = self;
        Some(utf8_view)
    }

    /// Expose this array through the binary variant trait.
    fn as_binary_array(&self) -> Option<&dyn BinaryArrayTrait> {
        let binary_view: &dyn BinaryArrayTrait = self;
        Some(binary_view)
    }
}

// Empty impl: FSSTArray opts in to the UTF-8 variant trait with no overrides.
impl Utf8ArrayTrait for FSSTArray {}

// Empty impl: FSSTArray opts in to the binary variant trait with no overrides.
impl BinaryArrayTrait for FSSTArray {}

// Empty impl: FSSTArray uses the `ArrayTrait` defaults with no overrides.
impl ArrayTrait for FSSTArray {}
56 changes: 56 additions & 0 deletions encodings/fsst/src/canonical.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use arrow_array::builder::GenericByteBuilder;
use arrow_array::types::BinaryType;
use fsst::Symbol;
use vortex::array::VarBinArray;
use vortex::arrow::FromArrowArray;
use vortex::validity::ArrayValidity;
use vortex::{ArrayDType, Canonical, IntoCanonical};
use vortex_error::VortexResult;

use crate::FSSTArray;

impl IntoCanonical for FSSTArray {
    /// Decompress every element and return the result as a canonical `VarBin` array that
    /// carries this array's dtype.
    fn into_canonical(self) -> VortexResult<Canonical> {
        let decompressor = self.decompressor()?;

        // Upper bound on the decompressed size: each code expands into a symbol of 1-8 bytes,
        // so reserve `size_of::<Symbol>()` bytes per byte of the codes child.
        let row_count = self.len();
        let byte_capacity = self.codes().nbytes() * size_of::<Symbol>();

        // Target Arrow binary builder.
        // TODO(aduffy): switch to BinaryView when PR https://github.com/spiraldb/vortex/pull/476 merges
        let mut builder =
            GenericByteBuilder::<BinaryType>::with_capacity(row_count, byte_capacity);

        // TODO(aduffy): add decompression functions that support writing directly into an output buffer.
        let codes_array = self.codes().into_canonical()?.into_varbin()?;

        // TODO(aduffy): make this loop faster.
        for idx in 0..row_count {
            if codes_array.is_valid(idx) {
                let compressed = codes_array.bytes_at(idx)?;
                builder.append_value(decompressor.decompress(compressed.as_slice()));
            } else {
                builder.append_null();
            }
        }

        let arrow_array = builder.finish();

        // The Arrow round-trip yields a VarBin array; rebuild it from its parts so that it
        // carries this array's own dtype.
        let nullable = self.dtype().is_nullable();
        let canonical_varbin =
            VarBinArray::try_from(&vortex::Array::from_arrow(&arrow_array, nullable))?;

        let offsets = canonical_varbin.offsets();
        let bytes = canonical_varbin.bytes();
        let dtype = self.dtype().clone();
        let validity = canonical_varbin.validity();

        VarBinArray::try_new(offsets, bytes, dtype, validity).map(Canonical::VarBin)
    }
}
90 changes: 90 additions & 0 deletions encodings/fsst/src/compute.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use vortex::compute::unary::{scalar_at, ScalarAtFn};
use vortex::compute::{filter, slice, take, ArrayCompute, FilterFn, SliceFn, TakeFn};
use vortex::validity::ArrayValidity;
use vortex::{Array, ArrayDType, IntoArray};
use vortex_buffer::{Buffer, BufferString};
use vortex_dtype::DType;
use vortex_error::{vortex_bail, VortexResult};
use vortex_scalar::{Scalar, ScalarValue};

use crate::FSSTArray;

impl ArrayCompute for FSSTArray {
    /// Filtering is implemented directly on this encoding.
    fn filter(&self) -> Option<&dyn FilterFn> {
        Some(self)
    }

    /// Point lookups are implemented directly on this encoding.
    fn scalar_at(&self) -> Option<&dyn ScalarAtFn> {
        Some(self)
    }

    /// Slicing is implemented directly on this encoding.
    fn slice(&self) -> Option<&dyn SliceFn> {
        Some(self)
    }

    /// Take is implemented directly on this encoding.
    fn take(&self) -> Option<&dyn TakeFn> {
        Some(self)
    }
}

impl SliceFn for FSSTArray {
    /// Slice the array to `start..stop`.
    ///
    /// Only the `codes` child is sliced; the symbol table is reused unmodified.
    fn slice(&self, start: usize, stop: usize) -> VortexResult<Array> {
        let dtype = self.dtype().clone();
        let symbols = self.symbols();
        let sliced_codes = slice(&self.codes(), start, stop)?;

        Self::try_new(dtype, symbols, sliced_codes).map(|array| array.into_array())
    }
}

impl TakeFn for FSSTArray {
    /// Take the elements at `indices`.
    ///
    /// This is a plain take on the codes child; the symbol table is carried over as-is.
    fn take(&self, indices: &Array) -> VortexResult<Array> {
        take(&self.codes(), indices)
            .and_then(|taken| Self::try_new(self.dtype().clone(), self.symbols(), taken))
            .map(|array| array.into_array())
    }
}

impl ScalarAtFn for FSSTArray {
    /// Decompress the single element at `index` into a scalar of this array's dtype.
    ///
    /// Invalid (null) positions yield a null scalar without touching the decompressor.
    fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
        // Nulls short-circuit.
        if !self.is_valid(index) {
            return Ok(Scalar::null(self.dtype().clone()));
        }

        // Fetch the compressed bytes for this element from the codes child.
        let compressed = scalar_at(&self.codes(), index)?;
        let Some(binary_datum) = compressed.value().as_buffer()? else {
            vortex_bail!("non-nullable scalar must unwrap")
        };

        let decompressor = self.decompressor()?;
        let decoded_buffer: Buffer = decompressor.decompress(binary_datum.as_slice()).into();

        // Wrap the decoded bytes in the scalar value kind that matches this array's dtype.
        let value = if matches!(self.dtype(), DType::Utf8(_)) {
            // SAFETY: a UTF-8 FSSTArray can only be compressed from a known-good UTF-8 array,
            // so the decoded bytes need no revalidation.
            ScalarValue::BufferString(unsafe { BufferString::new_unchecked(decoded_buffer) })
        } else {
            ScalarValue::Buffer(decoded_buffer)
        };

        Ok(Scalar::new(self.dtype().clone(), value))
    }
}

impl FilterFn for FSSTArray {
    /// Keep only the elements selected by `predicate`.
    ///
    /// The filter is applied to the codes child; the symbols array is left untouched.
    fn filter(&self, predicate: &Array) -> VortexResult<Array> {
        filter(&self.codes(), predicate)
            .and_then(|kept| Self::try_new(self.dtype().clone(), self.symbols(), kept))
            .map(|array| array.into_array())
    }
}
15 changes: 15 additions & 0 deletions encodings/fsst/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
//! An array that uses the [Fast Static Symbol Table][fsst] compression scheme
//! to compress string arrays.
//!
//! FSST arrays can generally compress string data up to 2x through the use of
//! string tables. The string table is static for an entire array, and occupies
//! up to 2048 bytes of buffer space. Thus, FSST is only worth reaching for when
//! dealing with larger arrays of potentially hundreds of kilobytes or more.
//!
//! [fsst]: https://www.vldb.org/pvldb/vol13/p2649-boncz.pdf
mod array;
mod canonical;
mod compute;

pub use array::*;
Loading

0 comments on commit 2d49d71

Please sign in to comment.