diff --git a/Cargo.lock b/Cargo.lock index 5877257d39..62f8d4948a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1551,9 +1551,9 @@ dependencies = [ [[package]] name = "fsst-rs" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8514d28c95e62149879bc847bacd5129d24a2ccb4401662ed5773488c0f31eb7" +checksum = "3494f027ec5d00d38de5e8a14672fb30640fa62cf1d37863e1db738fe3e61eaa" [[package]] name = "futures" diff --git a/Cargo.toml b/Cargo.toml index 46d5d83fa2..2061e40b0b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,7 +81,7 @@ fastlanes = "0.1.5" flatbuffers = "24.3.25" flexbuffers = "2.0.0" fs_extra = "1.3.0" -fsst-rs = "0.2.1" +fsst-rs = "0.2.2" futures = { version = "0.3.30", default-features = false } futures-executor = "0.3.30" futures-util = "0.3.30" diff --git a/bench-vortex/Cargo.toml b/bench-vortex/Cargo.toml index dad389f850..51d0d91f9a 100644 --- a/bench-vortex/Cargo.toml +++ b/bench-vortex/Cargo.toml @@ -77,15 +77,19 @@ bench = false [[bench]] name = "compress_benchmark" harness = false +test = false [[bench]] name = "random_access" +test = false harness = false [[bench]] name = "datafusion_benchmark" +test = false harness = false [[bench]] name = "tpch_benchmark" +test = false harness = false diff --git a/bench-vortex/benches/random_access.rs b/bench-vortex/benches/random_access.rs index 3d69ad9c2c..6d00240865 100644 --- a/bench-vortex/benches/random_access.rs +++ b/bench-vortex/benches/random_access.rs @@ -44,11 +44,13 @@ fn random_access_vortex(c: &mut Criterion) { }) }); - let r2_fs = Arc::new(AmazonS3Builder::from_env().build().unwrap()) as Arc; - let r2_path = - object_store::path::Path::from_url_path(taxi_vortex.file_name().unwrap().to_str().unwrap()) - .unwrap(); group.sample_size(10).bench_function("R2", |b| { + let r2_fs = Arc::new(AmazonS3Builder::from_env().build().unwrap()) as Arc; + let r2_path = object_store::path::Path::from_url_path( + taxi_vortex.file_name().unwrap().to_str().unwrap(), + ) + .unwrap(); + b.to_async(Runtime::new().unwrap()).iter(|| async { black_box( take_vortex_object_store(&r2_fs, &r2_path, &INDICES) @@ -63,18 +65,19 @@ fn random_access_parquet(c: &mut Criterion) { let mut group = c.benchmark_group("parquet"); group.sample_size(10); - let r2_fs = Arc::new(AmazonS3Builder::from_env().build().unwrap()); let taxi_parquet = taxi_data_parquet(); group.bench_function("tokio local disk", |b| { b.to_async(Runtime::new().unwrap()) .iter(|| async { black_box(take_parquet(&taxi_parquet, &INDICES).await.unwrap()) }) }); - let r2_parquet_path = object_store::path::Path::from_url_path( - taxi_parquet.file_name().unwrap().to_str().unwrap(), - ) - .unwrap(); group.bench_function("R2", |b| { + let r2_fs = Arc::new(AmazonS3Builder::from_env().build().unwrap()); + let r2_parquet_path = object_store::path::Path::from_url_path( + taxi_parquet.file_name().unwrap().to_str().unwrap(), + ) + .unwrap(); + b.to_async(Runtime::new().unwrap()).iter(|| async { black_box( take_parquet_object_store(r2_fs.clone(), &r2_parquet_path, &INDICES) diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index 216ef859b1..dea92a1d68 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -31,6 +31,7 @@ use vortex_sampling_compressor::compressors::alp::ALPCompressor; use vortex_sampling_compressor::compressors::bitpacked::BitPackedCompressor; use vortex_sampling_compressor::compressors::date_time_parts::DateTimePartsCompressor; use vortex_sampling_compressor::compressors::dict::DictCompressor; +use vortex_sampling_compressor::compressors::fsst::FSSTCompressor; use vortex_sampling_compressor::compressors::r#for::FoRCompressor; use vortex_sampling_compressor::compressors::roaring_bool::RoaringBoolCompressor; use vortex_sampling_compressor::compressors::runend::DEFAULT_RUN_END_COMPRESSOR; @@ -72,6 +73,7 @@ lazy_static! { &DictCompressor, &BitPackedCompressor, &FoRCompressor, + &FSSTCompressor, &DateTimePartsCompressor, &DEFAULT_RUN_END_COMPRESSOR, &RoaringBoolCompressor, diff --git a/encodings/fsst/src/array.rs b/encodings/fsst/src/array.rs index 4a94e66ef4..2972f22f29 100644 --- a/encodings/fsst/src/array.rs +++ b/encodings/fsst/src/array.rs @@ -42,7 +42,7 @@ impl FSSTArray { // Check: strings must be a Binary array. if !matches!(codes.dtype(), DType::Binary(_)) { - vortex_bail!(InvalidArgument: "strings array must be DType::Binary type"); + vortex_bail!(InvalidArgument: "codes array must be DType::Binary type"); } let symbols_len = symbols.len(); @@ -95,8 +95,9 @@ impl FSSTArray { } impl AcceptArrayVisitor for FSSTArray { - fn accept(&self, _visitor: &mut dyn vortex::visitor::ArrayVisitor) -> VortexResult<()> { - todo!("implement this") + fn accept(&self, visitor: &mut dyn vortex::visitor::ArrayVisitor) -> VortexResult<()> { + visitor.visit_child("symbols", &self.symbols())?; + visitor.visit_child("codes", &self.codes()) } } diff --git a/encodings/fsst/src/compress.rs b/encodings/fsst/src/compress.rs index 5a297d421a..63d61299d1 100644 --- a/encodings/fsst/src/compress.rs +++ b/encodings/fsst/src/compress.rs @@ -83,7 +83,7 @@ where } } - let codes = builder.finish(dtype.clone()); + let codes = builder.finish(DType::Binary(dtype.nullability())); let symbols_vec: Vec = compressor.symbol_table().to_vec(); // SAFETY: Symbol and u64 are same size let symbols_u64: Vec = unsafe { std::mem::transmute(symbols_vec) }; diff --git a/vortex-sampling-compressor/src/compressors/fsst.rs b/vortex-sampling-compressor/src/compressors/fsst.rs index 776bc93e42..0e7708830a 100644 --- a/vortex-sampling-compressor/src/compressors/fsst.rs +++ b/vortex-sampling-compressor/src/compressors/fsst.rs @@ -1,6 +1,6 @@ use std::collections::HashSet; -use vortex::array::{VarBinArray, VarBinViewArray}; +use vortex::array::{VarBin, VarBinView}; use vortex::encoding::EncodingRef; use vortex::{ArrayDType, ArrayDef, IntoArray}; use vortex_dict::DictArray; @@ -42,13 +42,15 @@ impl EncodingCompressor for FSSTCompressor { _ctx: SamplingCompressor<'a>, ) -> VortexResult> { // TODO(aduffy): use like array to clone the existing symbol table - let fsst_array = - if VarBinArray::try_from(array).is_ok() || VarBinViewArray::try_from(array).is_ok() { + let result_array = + if array.encoding().id() == VarBin::ID || array.encoding().id() == VarBinView::ID { // For a VarBinArray or VarBinViewArray, compress directly. - fsst_compress(array.clone(), None) + fsst_compress(array.clone(), None).into_array() } else if let Ok(dict) = DictArray::try_from(array) { // For a dict array, just compress the values - fsst_compress(dict.values(), None) + let values = fsst_compress(dict.values(), None).into_array(); + let codes = dict.codes(); + DictArray::try_new(codes, values)?.into_array() } else { vortex_bail!( InvalidArgument: "unsupported encoding for FSSTCompressor {:?}", @@ -56,7 +58,7 @@ impl EncodingCompressor for FSSTCompressor { ) }; - Ok(CompressedArray::new(fsst_array.into_array(), None)) + Ok(CompressedArray::new(result_array, None)) } fn used_encodings(&self) -> HashSet { diff --git a/vortex-sampling-compressor/src/lib.rs b/vortex-sampling-compressor/src/lib.rs index f96323a245..3395c83b2f 100644 --- a/vortex-sampling-compressor/src/lib.rs +++ b/vortex-sampling-compressor/src/lib.rs @@ -1,6 +1,7 @@ use std::collections::HashSet; use std::fmt::{Debug, Display, Formatter}; +use compressors::fsst::FSSTCompressor; use log::{debug, info, warn}; use vortex::array::{Chunked, ChunkedArray, Constant, Struct, StructArray}; use vortex::compress::{check_dtype_unchanged, check_validity_unchanged, CompressionStrategy}; @@ -89,6 +90,7 @@ impl Default for SamplingCompressor<'_> { // TODO(robert): Implement minimal compute for DeltaArrays - scalar_at and slice // &DeltaCompressor, &DictCompressor, + &FSSTCompressor, &FoRCompressor, &DateTimePartsCompressor, &RoaringBoolCompressor, diff --git a/vortex-sampling-compressor/tests/smoketest.rs b/vortex-sampling-compressor/tests/smoketest.rs index f2dda93b45..747dc959cb 100644 --- a/vortex-sampling-compressor/tests/smoketest.rs +++ b/vortex-sampling-compressor/tests/smoketest.rs @@ -23,6 +23,7 @@ use vortex_sampling_compressor::{CompressConfig, SamplingCompressor}; #[cfg(test)] mod tests { use vortex_datetime_dtype::TimeUnit; + use vortex_sampling_compressor::compressors::fsst::FSSTCompressor; use super::*; @@ -37,6 +38,7 @@ mod tests { // &DeltaCompressor, &DictCompressor, &FoRCompressor, + &FSSTCompressor, &DateTimePartsCompressor, &RoaringBoolCompressor, &RoaringIntCompressor,