
Add PBI medicare dataset & vortex/parquet comparison (#187)
- Adds one of the Medicare datasets* from the ALP paper to our compress
  and random_access benchmarks
- Adds a mechanism for handling CSV data
- Adds a mechanism for handling bzip2-compressed data
- Refactors to enable translating general Parquet files to Lance
- General refactors to make data download/translation ops more modular
  (a sketch of how these pieces compose follows below)

*https://github.com/cwida/public_bi_benchmark/tree/master/benchmark/Medicare1
-- contains two bzipped files; this downloads just the first
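At a high level, the new pieces compose into one pipeline: download a bzipped CSV, decompress it, then rewrite it as Vortex or Parquet (and Parquet onward to Lance). A minimal sketch, assuming the helpers shown in the diff below keep the signatures inferred from their call sites; note `data_downloads` is a private module in this change, so the import paths are illustrative only:

    use std::fs::File;

    use bench_vortex::data_downloads::{decompress_bz2, download_data};
    use bench_vortex::reader::{compress_csv_to_vortex, default_csv_format, write_csv_as_parquet};

    fn main() {
        // Fetch the bzipped CSV (idempotent: a no-op if the file already exists).
        download_data(
            "Medicare1_1.csv.bz2",
            "http://www.cwi.nl/~boncz/PublicBIbenchmark/Medicare1/Medicare1_1.csv.bz2",
        );
        let csv = decompress_bz2("Medicare1_1.csv.bz2", "Medicare1_1.csv");

        // The PBI dumps are pipe-delimited, so the delimiter is threaded through everywhere.
        let format = default_csv_format().with_delimiter(b'|');

        // CSV -> Vortex and CSV -> Parquet (Parquet can then be translated to Lance).
        let mut vortex_out = File::create("medicare.vortex").unwrap();
        compress_csv_to_vortex(csv.clone(), format.clone(), &mut vortex_out).unwrap();
        write_csv_as_parquet(csv, format, File::create("medicare.parquet").unwrap()).unwrap();
    }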
jdcasale authored Apr 8, 2024
1 parent f36417e commit a9795b0
Showing 13 changed files with 346 additions and 48 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
@@ -73,6 +73,9 @@ thiserror = "1.0.58"
uninit = "0.6.2"
walkdir = "2.5.0"
zigzag = "0.1.0"
bzip2 = "0.4.4"
csv = "1.3.0"
arrow-csv = "51.0.0"

[workspace.lints.rust]
warnings = "deny"
7 changes: 6 additions & 1 deletion bench-vortex/Cargo.toml
@@ -37,14 +37,19 @@ parquet = { workspace = true, features = [] }
reqwest = { workspace = true }
simplelog = { workspace = true }
tokio = "1.0.1"
bzip2 = { workspace = true }
csv = { workspace = true }
arrow-csv = { workspace = true }
arrow = { workspace = true }

[dev-dependencies]
criterion = { workspace = true }


[[bench]]
name = "compress_benchmark"
harness = false

[[bench]]
name = "random_access"
harness = false
harness = false
17 changes: 14 additions & 3 deletions bench-vortex/benches/compress_benchmark.rs
@@ -1,14 +1,25 @@
use bench_vortex::compress_taxi_data;
use bench_vortex::medicare_data::medicare_data_csv;
use bench_vortex::taxi_data::taxi_data_parquet;
use bench_vortex::{compress_medicare_data, compress_taxi_data};
use criterion::{black_box, criterion_group, criterion_main, Criterion};

fn vortex_compress(c: &mut Criterion) {
fn vortex_compress_taxi(c: &mut Criterion) {
taxi_data_parquet();
let mut group = c.benchmark_group("end to end");
group.sample_size(10);
group.bench_function("compress", |b| b.iter(|| black_box(compress_taxi_data())));
group.finish()
}

criterion_group!(benches, vortex_compress);
fn vortex_compress_medicare(c: &mut Criterion) {
medicare_data_csv();
let mut group = c.benchmark_group("end to end");
group.sample_size(10);
group.bench_function("compress", |b| {
b.iter(|| black_box(compress_medicare_data()))
});
group.finish()
}

criterion_group!(benches, vortex_compress_taxi, vortex_compress_medicare);
criterion_main!(benches);
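With both Criterion targets registered, the taxi and Medicare end-to-end compression runs go through the standard Cargo benchmark harness. A typical invocation (standard Cargo/Criterion usage, assuming the package is named after the bench-vortex directory):

    cargo bench -p bench-vortex --bench compress_benchmark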
6 changes: 6 additions & 0 deletions bench-vortex/benches/random_access.rs
@@ -1,3 +1,4 @@
use bench_vortex::medicare_data::medicare_data_lance;
use bench_vortex::reader::{take_lance, take_parquet, take_vortex};
use bench_vortex::taxi_data::{taxi_data_lance, taxi_data_parquet, taxi_data_vortex};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
@@ -22,6 +23,11 @@ fn random_access(c: &mut Criterion) {
group.bench_function("lance", |b| {
b.iter(|| black_box(take_lance(&taxi_lance, &indices)))
});

let medicare_lance = medicare_data_lance();
group.bench_function("lance", |b| {
b.iter(|| black_box(take_lance(&medicare_lance, &indices)))
});
}

criterion_group!(benches, random_access);
48 changes: 44 additions & 4 deletions bench-vortex/src/bin/compress.rs
@@ -2,20 +2,29 @@ use std::fs::File;
use std::os::unix::prelude::MetadataExt;
use std::path::PathBuf;

use bench_vortex::reader::{compress_parquet_to_vortex, open_vortex};
use bench_vortex::setup_logger;
use bench_vortex::medicare_data::medicare_data_csv;
use bench_vortex::reader::{
compress_csv_to_vortex, default_csv_format, open_vortex, rewrite_parquet_as_vortex,
write_csv_as_parquet,
};
use bench_vortex::taxi_data::taxi_data_parquet;
use bench_vortex::{data_path, setup_logger};
use log::LevelFilter;
use vortex::array::Array;
use vortex::formatter::display_tree;

pub fn main() {
setup_logger(LevelFilter::Debug);
compress_taxi();
compress_medicare();
write_medicare_as_parquet();
}

let path: PathBuf = "taxi_data.vortex".into();
fn compress_taxi() {
let path: PathBuf = data_path("taxi_data.vortex");
{
let mut write = File::create(&path).unwrap();
compress_parquet_to_vortex(&taxi_data_parquet(), &mut write).unwrap();
rewrite_parquet_as_vortex(taxi_data_parquet(), &mut write).unwrap();
}

let taxi_vortex = open_vortex(&path).unwrap();
@@ -27,3 +36,34 @@ pub fn main() {
println!("Parquet size: {}, Vortex size: {}", pq_size, vx_size);
println!("Compression ratio: {}", vx_size as f32 / pq_size as f32);
}

fn compress_medicare() {
let path: PathBuf = data_path("medicare.vortex");
{
let mut write = File::create(&path).unwrap();
let delimiter = u8::try_from('|').unwrap();
compress_csv_to_vortex(
medicare_data_csv(),
default_csv_format().with_delimiter(delimiter),
&mut write,
)
.unwrap();
}

let medicare_vortex = open_vortex(&path).unwrap();

let csv_size = medicare_data_csv().metadata().unwrap().size();
let vx_size = medicare_vortex.nbytes();

println!("{}\n\n", display_tree(medicare_vortex.as_ref()));
println!("CSV size: {}, Vortex size: {}", csv_size, vx_size);
println!("Compression ratio: {}", vx_size as f32 / csv_size as f32);
}

pub fn write_medicare_as_parquet() {
let path = data_path("medicare.parquet");
let delimiter = u8::try_from('|').unwrap();
let format = default_csv_format().with_delimiter(delimiter);
let file = File::create(path).unwrap();
write_csv_as_parquet(medicare_data_csv(), format, file).unwrap();
}
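For a one-off comparison outside Criterion, the binary above prints the compressed array tree plus on-disk sizes for both datasets. Since it lives in bench-vortex/src/bin/compress.rs, a plausible invocation (again assuming the package name bench-vortex) is:

    cargo run -p bench-vortex --bin compress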
28 changes: 20 additions & 8 deletions bench-vortex/src/data_downloads.rs
@@ -1,7 +1,9 @@
use std::fs::File;
use std::io::{Read, Write};
use std::path::{Path, PathBuf};

use arrow_array::RecordBatchReader;
use bzip2::read::BzDecoder;
use itertools::Itertools;
use lance::dataset::WriteParams;
use lance::Dataset;
@@ -12,10 +14,11 @@ use vortex::array::chunked::ChunkedArray;
use vortex::array::IntoArray;
use vortex::arrow::FromArrowType;
use vortex::serde::WriteCtx;
use vortex_error::VortexError;
use vortex_schema::DType;

use crate::idempotent;
use crate::reader::{compress_parquet_to_vortex, BATCH_SIZE};
use crate::reader::BATCH_SIZE;
use crate::{data_path, idempotent};

pub fn download_data(fname: &str, data_url: &str) -> PathBuf {
idempotent(fname, |path| {
@@ -44,12 +47,6 @@ pub fn parquet_to_lance(lance_fname: &Path, read: File) -> PathBuf {
PathBuf::from(lance_fname)
}

pub fn parquet_to_vortex(output_fname: &Path, data_to_compress: PathBuf) -> PathBuf {
let mut write = File::create(output_fname).unwrap();
compress_parquet_to_vortex(&data_to_compress, &mut write).unwrap();
output_fname.to_path_buf()
}

pub fn data_vortex_uncompressed(fname_out: &str, downloaded_data: PathBuf) -> PathBuf {
idempotent(fname_out, |path| {
let taxi_pq = File::open(downloaded_data).unwrap();
@@ -73,3 +70,18 @@ pub fn data_vortex_uncompressed(fname_out: &str, downloaded_data: PathBuf) -> PathBuf {
})
.unwrap()
}

pub fn decompress_bz2(input_path: &str, output_path: &str) -> PathBuf {
idempotent(output_path, |path| {
let input_file = File::open(data_path(input_path)).unwrap();
let mut decoder = BzDecoder::new(input_file);

let mut buffer = Vec::new();
decoder.read_to_end(&mut buffer).unwrap();

let mut output_file = File::create(path).unwrap();
output_file.write_all(&buffer).unwrap();
Ok::<PathBuf, VortexError>(data_path(output_path))
})
.unwrap()
}
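decompress_bz2 reuses the same idempotent pattern as the other download/translation ops: the closure only runs when the output file does not yet exist, and the resolved path is returned either way. The helper itself is not part of this diff; inferred from its call sites, its shape is roughly the sketch below (the real implementation in bench-vortex may differ):

    use std::fs::create_dir_all;
    use std::path::{Path, PathBuf};

    // Hypothetical stand-in for the crate's data_path helper: resolve a file name
    // under a local data directory, creating the directory on first use.
    fn data_path(name: &str) -> PathBuf {
        let dir = PathBuf::from("data");
        create_dir_all(&dir).unwrap();
        dir.join(name)
    }

    // Inferred shape of `idempotent`: run the closure only if the target is missing.
    fn idempotent<T, E>(name: &str, f: impl FnOnce(&Path) -> Result<T, E>) -> Result<PathBuf, E> {
        let path = data_path(name);
        if !path.exists() {
            f(&path)?;
        }
        Ok(path)
    }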
44 changes: 40 additions & 4 deletions bench-vortex/src/lib.rs
@@ -1,8 +1,11 @@
use std::fs::{create_dir_all, File};
use std::io::{BufReader, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use arrow_array::RecordBatchReader;
use arrow_csv::reader::Format;
use itertools::Itertools;
use log::{info, warn, LevelFilter};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
@@ -24,9 +27,12 @@ use vortex_ree::REEEncoding;
use vortex_roaring::RoaringBoolEncoding;
use vortex_schema::DType;

use crate::medicare_data::medicare_data_csv;
use crate::reader::BATCH_SIZE;
use crate::taxi_data::taxi_data_parquet;

mod data_downloads;
pub mod medicare_data;
pub mod reader;
pub mod taxi_data;

@@ -92,19 +98,17 @@ pub fn compress_taxi_data() -> ArrayRef {
let reader = builder
.with_projection(_mask)
//.with_projection(no_datetime_mask)
.with_batch_size(65_536)
.with_batch_size(BATCH_SIZE)
// .with_batch_size(5_000_000)
// .with_limit(100_000)
.build()
.unwrap();

let ctx = compress_ctx();
let schema = reader.schema();
let mut uncompressed_size = 0;
let mut uncompressed_size: usize = 0;
let chunks = reader
.into_iter()
//.skip(39)
//.take(1)
.map(|batch_result| batch_result.unwrap())
.map(|batch| batch.into_array())
.map(|array| {
Expand All @@ -113,6 +117,38 @@ pub fn compress_taxi_data() -> ArrayRef {
})
.collect_vec();

chunks_to_array(schema, uncompressed_size, chunks)
}

pub fn compress_medicare_data() -> ArrayRef {
let mut csv_file = File::open(medicare_data_csv()).unwrap();

let (schema, _) = Format::default()
.with_delimiter(u8::try_from('|').unwrap())
.infer_schema(&mut csv_file, None)
.unwrap();
// Rewind after schema inference: try_clone()'d handles share a cursor, so
// without this the reader would start at EOF and yield no batches.
csv_file.seek(SeekFrom::Start(0)).unwrap();
let reader = BufReader::new(csv_file);

let csv_reader = arrow::csv::ReaderBuilder::new(Arc::new(schema.clone()))
// match the pipe delimiter used for schema inference
.with_delimiter(u8::try_from('|').unwrap())
.with_batch_size(BATCH_SIZE * 10)
.build(reader)
.unwrap();

let ctx = compress_ctx();
let mut uncompressed_size: usize = 0;
let chunks = csv_reader
.into_iter()
.map(|batch_result| batch_result.unwrap())
.map(|batch| batch.into_array())
.map(|array| {
uncompressed_size += array.nbytes();
ctx.clone().compress(&array, None).unwrap()
})
.collect_vec();
chunks_to_array(SchemaRef::new(schema), uncompressed_size, chunks)
}

fn chunks_to_array(schema: SchemaRef, uncompressed_size: usize, chunks: Vec<ArrayRef>) -> ArrayRef {
let dtype = DType::from_arrow(schema.clone());
let compressed = ChunkedArray::new(chunks.clone(), dtype).into_array();

87 changes: 87 additions & 0 deletions bench-vortex/src/medicare_data.rs
@@ -0,0 +1,87 @@
use std::fs::File;
use std::io::{BufReader, Seek, SeekFrom};
use std::path::PathBuf;
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use arrow_csv::reader::Format;
use itertools::Itertools;
use vortex::array::chunked::ChunkedArray;
use vortex::array::IntoArray;
use vortex::arrow::FromArrowType;
use vortex::serde::WriteCtx;
use vortex_error::VortexError;
use vortex_schema::DType;

use crate::data_downloads::{decompress_bz2, download_data, parquet_to_lance};
use crate::idempotent;
use crate::reader::{compress_csv_to_vortex, default_csv_format, write_csv_as_parquet};

pub fn medicare_data_csv() -> PathBuf {
let fname = "Medicare1_1.csv.bz2";
download_data(
fname,
"http://www.cwi.nl/~boncz/PublicBIbenchmark/Medicare1/Medicare1_1.csv.bz2",
);
decompress_bz2(fname, "Medicare1_1.csv")
}

pub fn medicare_data_lance() -> PathBuf {
let medicare_parquet = File::open(medicare_data_parquet()).unwrap();
idempotent("medicare.lance", |path| {
Ok::<PathBuf, VortexError>(parquet_to_lance(path, medicare_parquet))
})
.unwrap()
}

pub fn medicare_data_vortex_uncompressed() -> PathBuf {
idempotent("medicare-uncompressed.vortex", |path| {
let mut csv_file = File::open(medicare_data_csv()).unwrap();

let (schema, _) = Format::default()
// the PBI dump is pipe-delimited; infer the schema accordingly
.with_delimiter(u8::try_from('|').unwrap())
.infer_schema(&mut csv_file, None)
.unwrap();
// Rewind after schema inference: try_clone()'d handles share a cursor.
csv_file.seek(SeekFrom::Start(0)).unwrap();
let reader = BufReader::new(csv_file);

let csv_reader = arrow::csv::ReaderBuilder::new(Arc::new(schema.clone()))
.with_delimiter(u8::try_from('|').unwrap())
.with_batch_size(crate::reader::BATCH_SIZE)
.build(reader)?;

let dtype = DType::from_arrow(SchemaRef::new(schema.clone()));

let chunks = csv_reader
.map(|batch_result| batch_result.unwrap())
.map(|record_batch| record_batch.into_array())
.collect_vec();
let chunked = ChunkedArray::new(chunks, dtype.clone());

let mut write = File::create(path).unwrap();
let mut write_ctx = WriteCtx::new(&mut write);
write_ctx.dtype(&dtype)?;
write_ctx.write(&chunked)
})
.unwrap()
}

pub fn medicare_data_vortex() -> PathBuf {
idempotent("medicare.vortex", |path| {
let mut write = File::create(path).unwrap();
let delimiter = u8::try_from('|').unwrap();
compress_csv_to_vortex(
medicare_data_csv(),
default_csv_format().with_delimiter(delimiter),
&mut write,
)
})
.unwrap()
}

pub fn medicare_data_parquet() -> PathBuf {
idempotent("medicare.parquet", |path| {
let delimiter = u8::try_from('|').unwrap();
let format = default_csv_format().with_delimiter(delimiter);
let file = File::create(path).unwrap();
write_csv_as_parquet(medicare_data_csv(), format, file)
})
.unwrap()
}
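Taken together, these entry points materialize the same Medicare slice in every format the benchmarks compare. A small driver along these lines (hypothetical, for illustration) exercises all of them:

    use bench_vortex::medicare_data::{
        medicare_data_csv, medicare_data_lance, medicare_data_parquet, medicare_data_vortex,
    };

    fn main() {
        // Each accessor downloads or derives its artifact only if it is missing.
        println!("csv:     {:?}", medicare_data_csv());
        println!("parquet: {:?}", medicare_data_parquet());
        println!("lance:   {:?}", medicare_data_lance());
        println!("vortex:  {:?}", medicare_data_vortex());
    }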
