Commit fc81829

fixes

a10y committed Aug 21, 2024
1 parent 335636a commit fc81829

Showing 10 changed files with 38 additions and 22 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -81,7 +81,7 @@ fastlanes = "0.1.5"
 flatbuffers = "24.3.25"
 flexbuffers = "2.0.0"
 fs_extra = "1.3.0"
-fsst-rs = "0.2.1"
+fsst-rs = "0.2.2"
 futures = { version = "0.3.30", default-features = false }
 futures-executor = "0.3.30"
 futures-util = "0.3.30"
4 changes: 4 additions & 0 deletions bench-vortex/Cargo.toml
@@ -77,15 +77,19 @@ bench = false
 [[bench]]
 name = "compress_benchmark"
 harness = false
+test = false
 
 [[bench]]
 name = "random_access"
+test = false
 harness = false
 
 [[bench]]
 name = "datafusion_benchmark"
+test = false
 harness = false
 
 [[bench]]
 name = "tpch_benchmark"
+test = false
 harness = false
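
The four new `test = false` entries keep `cargo test` from building and running these bench targets. Each target also sets `harness = false`, meaning the bench file supplies its own `main` (Criterion's), so without `test = false` a plain `cargo test` would execute the full benchmark. A minimal sketch (not from this repo) of the kind of bench file such a target points at:

use criterion::{black_box, criterion_group, criterion_main, Criterion};

fn bench_sum(c: &mut Criterion) {
    // black_box keeps the compiler from constant-folding the workload away.
    c.bench_function("sum_0_to_999", |b| {
        b.iter(|| (0u64..1000).map(black_box).sum::<u64>())
    });
}

// With `harness = false`, these macros provide the `main` entry point.
criterion_group!(benches, bench_sum);
criterion_main!(benches);
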
21 changes: 12 additions & 9 deletions bench-vortex/benches/random_access.rs
@@ -44,11 +44,13 @@ fn random_access_vortex(c: &mut Criterion) {
         })
     });
 
-    let r2_fs = Arc::new(AmazonS3Builder::from_env().build().unwrap()) as Arc<dyn ObjectStore>;
-    let r2_path =
-        object_store::path::Path::from_url_path(taxi_vortex.file_name().unwrap().to_str().unwrap())
-            .unwrap();
     group.sample_size(10).bench_function("R2", |b| {
+        let r2_fs = Arc::new(AmazonS3Builder::from_env().build().unwrap()) as Arc<dyn ObjectStore>;
+        let r2_path = object_store::path::Path::from_url_path(
+            taxi_vortex.file_name().unwrap().to_str().unwrap(),
+        )
+        .unwrap();
+
         b.to_async(Runtime::new().unwrap()).iter(|| async {
             black_box(
                 take_vortex_object_store(&r2_fs, &r2_path, &INDICES)
@@ -63,18 +65,19 @@ fn random_access_parquet(c: &mut Criterion) {
     let mut group = c.benchmark_group("parquet");
     group.sample_size(10);
 
-    let r2_fs = Arc::new(AmazonS3Builder::from_env().build().unwrap());
     let taxi_parquet = taxi_data_parquet();
     group.bench_function("tokio local disk", |b| {
         b.to_async(Runtime::new().unwrap())
             .iter(|| async { black_box(take_parquet(&taxi_parquet, &INDICES).await.unwrap()) })
     });
 
-    let r2_parquet_path = object_store::path::Path::from_url_path(
-        taxi_parquet.file_name().unwrap().to_str().unwrap(),
-    )
-    .unwrap();
     group.bench_function("R2", |b| {
+        let r2_fs = Arc::new(AmazonS3Builder::from_env().build().unwrap());
+        let r2_parquet_path = object_store::path::Path::from_url_path(
+            taxi_parquet.file_name().unwrap().to_str().unwrap(),
+        )
+        .unwrap();
+
         b.to_async(Runtime::new().unwrap()).iter(|| async {
             black_box(
                 take_parquet_object_store(r2_fs.clone(), &r2_parquet_path, &INDICES)
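
The R2 setup appears to move inside the bench closures so that the S3/R2 client and path are constructed only when those benchmarks actually run; the local-disk benchmarks no longer require R2 credentials in the environment. A sketch of the pattern (hypothetical benchmark, not this repo's code; assumes criterion's `async_tokio` feature and object_store's `aws` feature):

use std::sync::Arc;

use criterion::{black_box, Criterion};
use object_store::aws::AmazonS3Builder;
use object_store::path::Path;
use object_store::ObjectStore;
use tokio::runtime::Runtime;

fn bench_remote_read(c: &mut Criterion) {
    let mut group = c.benchmark_group("example");
    group.sample_size(10).bench_function("R2", |b| {
        // Built only when this benchmark executes, so other benchmarks in the
        // same binary can run without AWS/R2 environment variables set.
        let fs = Arc::new(AmazonS3Builder::from_env().build().unwrap()) as Arc<dyn ObjectStore>;
        let path = Path::from("some/object");
        b.to_async(Runtime::new().unwrap())
            .iter(|| async { black_box(fs.head(&path).await.unwrap()) });
    });
}
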
2 changes: 2 additions & 0 deletions bench-vortex/src/lib.rs
@@ -31,6 +31,7 @@ use vortex_sampling_compressor::compressors::alp::ALPCompressor;
 use vortex_sampling_compressor::compressors::bitpacked::BitPackedCompressor;
 use vortex_sampling_compressor::compressors::date_time_parts::DateTimePartsCompressor;
 use vortex_sampling_compressor::compressors::dict::DictCompressor;
+use vortex_sampling_compressor::compressors::fsst::FSSTCompressor;
 use vortex_sampling_compressor::compressors::r#for::FoRCompressor;
 use vortex_sampling_compressor::compressors::roaring_bool::RoaringBoolCompressor;
 use vortex_sampling_compressor::compressors::runend::DEFAULT_RUN_END_COMPRESSOR;
@@ -72,6 +73,7 @@ lazy_static! {
         &DictCompressor,
         &BitPackedCompressor,
         &FoRCompressor,
+        &FSSTCompressor,
         &DateTimePartsCompressor,
         &DEFAULT_RUN_END_COMPRESSOR,
         &RoaringBoolCompressor,
7 changes: 4 additions & 3 deletions encodings/fsst/src/array.rs
@@ -42,7 +42,7 @@ impl FSSTArray {
 
         // Check: strings must be a Binary array.
         if !matches!(codes.dtype(), DType::Binary(_)) {
-            vortex_bail!(InvalidArgument: "strings array must be DType::Binary type");
+            vortex_bail!(InvalidArgument: "codes array must be DType::Binary type");
         }
 
         let symbols_len = symbols.len();
@@ -95,8 +95,9 @@ impl FSSTArray {
 }
 
 impl AcceptArrayVisitor for FSSTArray {
-    fn accept(&self, _visitor: &mut dyn vortex::visitor::ArrayVisitor) -> VortexResult<()> {
-        todo!("implement this")
+    fn accept(&self, visitor: &mut dyn vortex::visitor::ArrayVisitor) -> VortexResult<()> {
+        visitor.visit_child("symbols", &self.symbols())?;
+        visitor.visit_child("codes", &self.codes())
     }
 }
 
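
The `todo!()` becomes a working implementation that reports the array's two children. For readers new to the pattern, a self-contained toy model of such a visitor (deliberately simplified; this is not the vortex `ArrayVisitor` trait, which this sketch does not attempt to reproduce):

// Toy visitor pattern: an array hands each named child to a caller-supplied
// visitor, which can walk the tree without knowing concrete array layouts.
trait ArrayVisitor {
    fn visit_child(&mut self, name: &str, child: &Array) -> Result<(), String>;
}

struct Array {
    children: Vec<(String, Array)>,
}

impl Array {
    // Mirrors the shape of FSSTArray::accept above.
    fn accept(&self, visitor: &mut dyn ArrayVisitor) -> Result<(), String> {
        for (name, child) in &self.children {
            visitor.visit_child(name, child)?;
        }
        Ok(())
    }
}

// Example visitor that counts nodes in the tree.
struct Counter(usize);

impl ArrayVisitor for Counter {
    fn visit_child(&mut self, _name: &str, child: &Array) -> Result<(), String> {
        self.0 += 1;
        child.accept(self) // recurse into grandchildren
    }
}

Generic walkers (pretty-printers, statistics collectors, serializers) can then traverse any array that implements `accept` this way.
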
2 changes: 1 addition & 1 deletion encodings/fsst/src/compress.rs
@@ -83,7 +83,7 @@ where
         }
     }
 
-    let codes = builder.finish(dtype.clone());
+    let codes = builder.finish(DType::Binary(dtype.nullability()));
     let symbols_vec: Vec<Symbol> = compressor.symbol_table().to_vec();
     // SAFETY: Symbol and u64 are same size
     let symbols_u64: Vec<u64> = unsafe { std::mem::transmute(symbols_vec) };
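
The fix concerns the dtype of the codes array: FSST codes are opaque byte sequences, so they should always be `DType::Binary`, carrying over only the input's nullability; reusing `dtype.clone()` would mislabel the codes as Utf8 when compressing string arrays. A toy illustration (the variant shapes and `nullability()` accessor are taken from the diff above; `PartialEq`/`Debug` on `DType` are assumed):

use vortex_dtype::{DType, Nullability};

// Codes are raw bytes regardless of the logical string type of the input.
fn codes_dtype(input: &DType) -> DType {
    DType::Binary(input.nullability())
}

fn main() {
    let utf8 = DType::Utf8(Nullability::Nullable);
    // The old code reused the input dtype wholesale; the fix keeps only the
    // nullability and forces the Binary type.
    assert_eq!(codes_dtype(&utf8), DType::Binary(Nullability::Nullable));
}
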
14 changes: 8 additions & 6 deletions vortex-sampling-compressor/src/compressors/fsst.rs
@@ -1,6 +1,6 @@
 use std::collections::HashSet;
 
-use vortex::array::{VarBinArray, VarBinViewArray};
+use vortex::array::{VarBin, VarBinView};
 use vortex::encoding::EncodingRef;
 use vortex::{ArrayDType, ArrayDef, IntoArray};
 use vortex_dict::DictArray;
@@ -42,21 +42,23 @@ impl EncodingCompressor for FSSTCompressor {
         _ctx: SamplingCompressor<'a>,
     ) -> VortexResult<super::CompressedArray<'a>> {
         // TODO(aduffy): use like array to clone the existing symbol table
-        let fsst_array =
-            if VarBinArray::try_from(array).is_ok() || VarBinViewArray::try_from(array).is_ok() {
+        let result_array =
+            if array.encoding().id() == VarBin::ID || array.encoding().id() == VarBinView::ID {
                 // For a VarBinArray or VarBinViewArray, compress directly.
-                fsst_compress(array.clone(), None)
+                fsst_compress(array.clone(), None).into_array()
             } else if let Ok(dict) = DictArray::try_from(array) {
                 // For a dict array, just compress the values
-                fsst_compress(dict.values(), None)
+                let values = fsst_compress(dict.values(), None).into_array();
+                let codes = dict.codes();
+                DictArray::try_new(codes, values)?.into_array()
             } else {
                 vortex_bail!(
                     InvalidArgument: "unsupported encoding for FSSTCompressor {:?}",
                     array.encoding().id()
                 )
             };
 
-        Ok(CompressedArray::new(fsst_array.into_array(), None))
+        Ok(CompressedArray::new(result_array, None))
     }
 
     fn used_encodings(&self) -> HashSet<EncodingRef> {
2 changes: 2 additions & 0 deletions vortex-sampling-compressor/src/lib.rs
@@ -1,6 +1,7 @@
 use std::collections::HashSet;
 use std::fmt::{Debug, Display, Formatter};
 
+use compressors::fsst::FSSTCompressor;
 use log::{debug, info, warn};
 use vortex::array::{Chunked, ChunkedArray, Constant, Struct, StructArray};
 use vortex::compress::{check_dtype_unchanged, check_validity_unchanged, CompressionStrategy};
@@ -89,6 +90,7 @@ impl Default for SamplingCompressor<'_> {
             // TODO(robert): Implement minimal compute for DeltaArrays - scalar_at and slice
             // &DeltaCompressor,
             &DictCompressor,
+            &FSSTCompressor,
             &FoRCompressor,
             &DateTimePartsCompressor,
             &RoaringBoolCompressor,
2 changes: 2 additions & 0 deletions vortex-sampling-compressor/tests/smoketest.rs
@@ -23,6 +23,7 @@ use vortex_sampling_compressor::{CompressConfig, SamplingCompressor};
 #[cfg(test)]
 mod tests {
     use vortex_datetime_dtype::TimeUnit;
+    use vortex_sampling_compressor::compressors::fsst::FSSTCompressor;
 
     use super::*;
 
@@ -37,6 +38,7 @@ mod tests {
             // &DeltaCompressor,
             &DictCompressor,
             &FoRCompressor,
+            &FSSTCompressor,
             &DateTimePartsCompressor,
             &RoaringBoolCompressor,
             &RoaringIntCompressor,
