Skip to content

Commit

Permalink
fix compress benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
a10y committed Jun 3, 2024
1 parent 30ebc17 commit ea36852
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 12 deletions.
22 changes: 11 additions & 11 deletions bench-vortex/src/bin/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,35 @@ use std::path::PathBuf;
use bench_vortex::data_downloads::BenchmarkDataset;
use bench_vortex::public_bi_data::BenchmarkDatasets::PBI;
use bench_vortex::public_bi_data::PBIDataset;
use bench_vortex::reader::{open_vortex, rewrite_parquet_as_vortex};
use bench_vortex::reader::{open_vortex_async, rewrite_parquet_as_vortex};
use bench_vortex::taxi_data::taxi_data_parquet;
use bench_vortex::{setup_logger, IdempotentPath};
use futures::executor::block_on;
use log::{info, LevelFilter};
use tokio::fs::File;
use vortex_error::VortexResult;

pub fn main() {
#[tokio::main]
pub async fn main() {
setup_logger(LevelFilter::Info);
// compress_pbi(PBIDataset::Medicare1);
compress_taxi();
compress_taxi().await.unwrap();
}

fn compress_taxi() {
async fn compress_taxi() -> VortexResult<()> {
let path: PathBuf = "taxi_data.vortex".to_data_path();
block_on(async {
let output_file = File::create(&path).await?;
rewrite_parquet_as_vortex(taxi_data_parquet(), output_file).await
})
.unwrap();
let output_file = File::create(&path).await?;
rewrite_parquet_as_vortex(taxi_data_parquet(), output_file).await?;

let taxi_vortex = open_vortex(&path).unwrap();
let taxi_vortex = open_vortex_async(&path).await?;
info!("{}", taxi_vortex.tree_display());

let pq_size = taxi_data_parquet().metadata().unwrap().size();
let vx_size = taxi_vortex.nbytes();

info!("Parquet size: {}, Vortex size: {}", pq_size, vx_size);
info!("Compression ratio: {}", vx_size as f32 / pq_size as f32);

Ok(())
}

#[allow(dead_code)]
Expand Down
11 changes: 11 additions & 0 deletions bench-vortex/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ pub fn open_vortex(path: &Path) -> VortexResult<Array> {
.map(|a| a.into_array())
}

pub async fn open_vortex_async(path: &Path) -> VortexResult<Array> {
let file = tokio::fs::File::open(path).await.unwrap();
let mut msgs = MessageReader::try_new(TokioAdapter(file)).await.unwrap();
msgs.array_stream_from_messages(&CTX)
.await
.unwrap()
.collect_chunked()
.await
.map(|a| a.into_array())
}

pub async fn rewrite_parquet_as_vortex<W: VortexWrite>(
parquet_path: PathBuf,
write: W,
Expand Down
3 changes: 2 additions & 1 deletion vortex-fastlanes/src/for/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use vortex::stats::{ArrayStatistics, Stat};
use vortex::{Array, ArrayDType, ArrayTrait, IntoArray};
use vortex_dtype::{match_each_integer_ptype, NativePType, PType};
use vortex_error::{vortex_err, VortexResult};
use vortex_scalar::Scalar;

use crate::{FoRArray, FoREncoding};

Expand Down Expand Up @@ -53,7 +54,7 @@ impl EncodingCompression for FoREncoding {

let child = match_each_integer_ptype!(parray.ptype(), |$T| {
if shift == <$T>::PTYPE.bit_width() as u8 {
ConstantArray::new($T::default(), parray.len()).into_array()
ConstantArray::new(Scalar::zero::<$T>(parray.dtype().nullability()), parray.len()).into_array()
} else {
compress_primitive::<$T>(parray, shift, $T::try_from(&min)?).into_array()
}
Expand Down

0 comments on commit ea36852

Please sign in to comment.