Add lance support to BenchmarkDataset, migrate medicare dataset to use BenchmarkDataset (#225)

Finish centralizing all benchmarking functionality for PBI data in the
BenchmarkDataset abstraction. Remove old special-case code.
jdcasale authored Apr 11, 2024
1 parent e0bc57e commit 705c3af
Showing 12 changed files with 178 additions and 184 deletions.
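
In short, every PBI benchmark now drives the `BenchmarkDataset` trait instead of dataset-specific helpers. Below is a minimal sketch of the resulting call pattern, pieced together from the hunks in this commit; the `main` wrapper and the printed summary are illustrative only.

```rust
use bench_vortex::data_downloads::{BenchmarkDataset, FileType};
use bench_vortex::public_bi_data::BenchmarkDatasets;
use bench_vortex::public_bi_data::PBIDataset::Medicare1;

fn main() {
    // Select a Public BI dataset; download/unpack happens behind as_uncompressed().
    let dataset = BenchmarkDatasets::PBI(Medicare1);
    dataset.as_uncompressed();

    // Materialize the on-disk formats the benchmarks compare against.
    dataset.write_as_parquet();
    dataset.write_as_lance();

    // Compress in memory with Vortex; one ArrayRef per input file.
    let chunks = dataset.compress_to_vortex();

    // Each source file is (currently) written to its own Lance dataset.
    let lance_files = dataset.list_files(FileType::Lance);
    println!("{} vortex chunks, {} lance datasets", chunks.len(), lance_files.len());
}
```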
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -79,6 +79,7 @@ csv = "1.3.0"
arrow-csv = "51.0.0"
lazy_static = "1.4.0"
enum-iterator = "2.0.0"
fs_extra = "1.3.0"

[workspace.lints.rust]
warnings = "deny"
2 changes: 2 additions & 0 deletions bench-vortex/Cargo.toml
@@ -43,6 +43,8 @@ arrow-csv = { workspace = true }
arrow = { workspace = true }
lazy_static = { workspace = true }
enum-iterator = { workspace = true }
fs_extra = { workspace = true }
humansize = { workspace = true }

[dev-dependencies]
criterion = { workspace = true }
15 changes: 9 additions & 6 deletions bench-vortex/benches/compress_benchmark.rs
@@ -1,6 +1,8 @@
use bench_vortex::medicare_data::medicare_data_csv;
use bench_vortex::compress_taxi_data;
use bench_vortex::data_downloads::BenchmarkDataset;
use bench_vortex::public_bi_data::BenchmarkDatasets;
use bench_vortex::public_bi_data::PBIDataset::Medicare1;
use bench_vortex::taxi_data::taxi_data_parquet;
use bench_vortex::{compress_medicare_data, compress_taxi_data};
use criterion::{black_box, criterion_group, criterion_main, Criterion};

fn vortex_compress_taxi(c: &mut Criterion) {
@@ -11,15 +13,16 @@ fn vortex_compress_taxi(c: &mut Criterion) {
group.finish()
}

fn vortex_compress_medicare(c: &mut Criterion) {
medicare_data_csv();
fn vortex_compress_medicare1(c: &mut Criterion) {
let dataset = BenchmarkDatasets::PBI(Medicare1);
dataset.as_uncompressed();
let mut group = c.benchmark_group("end to end");
group.sample_size(10);
group.bench_function("compress", |b| {
b.iter(|| black_box(compress_medicare_data()))
b.iter(|| black_box(dataset.compress_to_vortex()))
});
group.finish()
}

criterion_group!(benches, vortex_compress_taxi, vortex_compress_medicare);
criterion_group!(benches, vortex_compress_taxi, vortex_compress_medicare1);
criterion_main!(benches);
18 changes: 13 additions & 5 deletions bench-vortex/benches/random_access.rs
@@ -1,4 +1,6 @@
use bench_vortex::medicare_data::medicare_data_lance;
use bench_vortex::data_downloads::{BenchmarkDataset, FileType};
use bench_vortex::public_bi_data::BenchmarkDatasets;
use bench_vortex::public_bi_data::PBIDataset::Medicare1;
use bench_vortex::reader::{take_lance, take_parquet, take_vortex};
use bench_vortex::taxi_data::{taxi_data_lance, taxi_data_parquet, taxi_data_vortex};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
@@ -20,13 +22,19 @@ fn random_access(c: &mut Criterion) {
});

let taxi_lance = taxi_data_lance();
group.bench_function("lance", |b| {
group.bench_function("taxi_lance", |b| {
b.iter(|| black_box(take_lance(&taxi_lance, &indices)))
});

let medicare_lance = medicare_data_lance();
group.bench_function("lance", |b| {
b.iter(|| black_box(take_lance(&medicare_lance, &indices)))
let dataset = BenchmarkDatasets::PBI(Medicare1);
dataset.write_as_lance();
// NB: our parquet benchmarks read from a single file, and we (currently) write each
// file to an individual lance dataset for comparison parity.
// TODO(@jcasale): use datafusion for parquet random-access benchmarks and modify
// lance-writing code to write a single dataset.
let medicare_lance = dataset.list_files(FileType::Lance);
group.bench_function("medic_lance2", |b| {
b.iter(|| black_box(take_lance(medicare_lance.first().unwrap(), &indices)))
});
}

3 changes: 2 additions & 1 deletion bench-vortex/src/bin/compress.rs
@@ -37,7 +37,8 @@ fn compress_taxi() {

fn compress_pbi(which_pbi: PBIDataset) {
let dataset = PBI(which_pbi);
dataset.uncompressed();
dataset.as_uncompressed();
dataset.write_as_vortex();
dataset.write_as_parquet();
dataset.write_as_lance();
}
23 changes: 17 additions & 6 deletions bench-vortex/src/data_downloads.rs
@@ -12,10 +12,10 @@ use log::info;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use tokio::runtime::Runtime;
use vortex::array::chunked::ChunkedArray;
use vortex::array::IntoArray;
use vortex::array::{ArrayRef, IntoArray};
use vortex::arrow::FromArrowType;
use vortex::serde::WriteCtx;
use vortex_error::VortexError;
use vortex_error::{VortexError, VortexResult};
use vortex_schema::DType;

use crate::idempotent;
@@ -30,8 +30,9 @@ pub fn download_data(fname: PathBuf, data_url: &str) -> PathBuf {
.unwrap()
}

pub fn parquet_to_lance(lance_fname: &Path, read: File) -> PathBuf {
pub fn parquet_to_lance(lance_fname: &Path, parquet_file: &Path) -> VortexResult<PathBuf> {
let write_params = WriteParams::default();
let read = File::open(parquet_file).unwrap();
let reader = LanceParquetRecordBatchReaderBuilder::try_new(read)
.unwrap()
.build()
@@ -45,7 +46,7 @@ pub fn parquet_to_lance(lance_fname: &Path, read: File) -> PathBuf {
Some(write_params),
))
.unwrap();
PathBuf::from(lance_fname)
Ok(PathBuf::from(lance_fname))
}

pub fn data_vortex_uncompressed(fname_out: &str, downloaded_data: PathBuf) -> PathBuf {
@@ -93,9 +94,19 @@ pub fn decompress_bz2(input_path: PathBuf, output_path: PathBuf) -> PathBuf {
}

pub trait BenchmarkDataset {
fn uncompressed(&self);
fn as_uncompressed(&self);
fn compress_to_vortex(&self) -> Vec<ArrayRef>;
fn write_as_parquet(&self);
fn write_as_vortex(&self);
fn list_files(&self) -> Vec<PathBuf>;
fn write_as_lance(&self);
fn list_files(&self, file_type: FileType) -> Vec<PathBuf>;
fn directory_location(&self) -> PathBuf;
}

#[derive(Clone, Copy)]
pub enum FileType {
Csv,
Parquet,
Vortex,
Lance,
}
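
For orientation, here is a hypothetical skeleton of what an implementor of the revised trait now has to provide. The real implementation for `BenchmarkDatasets` lives in `public_bi_data.rs`, which is not part of the excerpted diff, so the type name and method bodies below are placeholders only.

```rust
use std::path::PathBuf;

use bench_vortex::data_downloads::{BenchmarkDataset, FileType};
use vortex::array::ArrayRef;

// Placeholder type standing in for BenchmarkDatasets (public_bi_data.rs).
struct ExampleDataset;

impl BenchmarkDataset for ExampleDataset {
    fn as_uncompressed(&self) {
        // download and decompress the raw files into directory_location()
    }

    fn compress_to_vortex(&self) -> Vec<ArrayRef> {
        // compress each uncompressed file in memory and return the chunks
        vec![]
    }

    fn write_as_parquet(&self) {}
    fn write_as_vortex(&self) {}
    fn write_as_lance(&self) {}

    fn list_files(&self, file_type: FileType) -> Vec<PathBuf> {
        // enumerate already-written files of the requested format
        let _ = file_type;
        vec![]
    }

    fn directory_location(&self) -> PathBuf {
        PathBuf::from("data/example")
    }
}
```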
36 changes: 3 additions & 33 deletions bench-vortex/src/lib.rs
@@ -1,11 +1,10 @@
use std::fs::{create_dir_all, File};
use std::io::BufReader;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use arrow_array::RecordBatchReader;
use arrow_csv::reader::Format;
use humansize::DECIMAL;
use itertools::Itertools;
use log::{info, warn, LevelFilter};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
@@ -27,12 +26,10 @@ use vortex_ree::REEEncoding;
use vortex_roaring::RoaringBoolEncoding;
use vortex_schema::DType;

use crate::medicare_data::medicare_data_csv;
use crate::reader::BATCH_SIZE;
use crate::taxi_data::taxi_data_parquet;

pub mod data_downloads;
pub mod medicare_data;
pub mod public_bi_data;
pub mod reader;
pub mod taxi_data;
@@ -141,34 +138,6 @@ pub fn compress_taxi_data() -> ArrayRef {
chunks_to_array(schema, uncompressed_size, chunks)
}

pub fn compress_medicare_data() -> ArrayRef {
let csv_file = File::open(medicare_data_csv()).unwrap();
let reader = BufReader::new(csv_file.try_clone().unwrap());

let (schema, _) = Format::default()
.with_delimiter(u8::try_from('|').unwrap())
.infer_schema(&mut csv_file.try_clone().unwrap(), None)
.unwrap();

let csv_reader = arrow::csv::ReaderBuilder::new(Arc::new(schema.clone()))
.with_batch_size(BATCH_SIZE * 10)
.build(reader)
.unwrap();

let ctx = compress_ctx();
let mut uncompressed_size: usize = 0;
let chunks = csv_reader
.into_iter()
.map(|batch_result| batch_result.unwrap())
.map(|batch| batch.into_array())
.map(|array| {
uncompressed_size += array.nbytes();
ctx.clone().compress(&array, None).unwrap()
})
.collect_vec();
chunks_to_array(SchemaRef::new(schema), uncompressed_size, chunks)
}

fn chunks_to_array(schema: SchemaRef, uncompressed_size: usize, chunks: Vec<ArrayRef>) -> ArrayRef {
let dtype = DType::from_arrow(schema.clone());
let compressed = ChunkedArray::new(chunks.clone(), dtype).into_array();
@@ -186,7 +155,8 @@ fn chunks_to_array(schema: SchemaRef, uncompressed_size: usize, chunks: Vec<ArrayRef>) -> ArrayRef {
println!("{},{}", schema.field(i).name(), nbytes);
});
println!(
"NBytes {}, Ratio {}",
"{}, Bytes: {}, Ratio {}",
humansize::format_size(compressed.nbytes(), DECIMAL),
compressed.nbytes(),
compressed.nbytes() as f32 / uncompressed_size as f32
);
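
The reworked report line above relies on the humansize crate's `format_size`/`DECIMAL` API, which the new code calls directly. A tiny standalone illustration of the output it produces, with made-up byte counts:

```rust
use humansize::{format_size, DECIMAL};

fn main() {
    let compressed: usize = 12_345_678; // made-up value
    let uncompressed: usize = 98_765_432; // made-up value
    // DECIMAL scales by powers of 1000, so 12_345_678 bytes prints as roughly "12.35 MB".
    println!(
        "{}, Bytes: {}, Ratio {}",
        format_size(compressed, DECIMAL),
        compressed,
        compressed as f32 / uncompressed as f32
    );
}
```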
90 changes: 0 additions & 90 deletions bench-vortex/src/medicare_data.rs

This file was deleted.
