Skip to content

Commit

Permalink
Benchmark suite robustness improvements (#242)
Browse files Browse the repository at this point in the history
- Panic on non-success download responses
- Idempotent writes to a temp file and then renames to the desired
output on success
  • Loading branch information
jdcasale authored Apr 15, 2024
1 parent 11614fc commit 66ad127
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 17 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ bzip2 = "0.4.4"
csv = "1.3.0"
arrow-csv = "51.0.0"
lazy_static = "1.4.0"
uuid = "1.8.0"

[workspace.lints.rust]
warnings = "deny"
Expand Down
1 change: 1 addition & 0 deletions bench-vortex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ csv = { workspace = true }
arrow-csv = { workspace = true }
arrow = { workspace = true }
humansize = { workspace = true }
uuid = { workspace = true }

[dev-dependencies]
criterion = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion bench-vortex/src/bin/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub fn main() {
}

fn compress_taxi() {
let path: PathBuf = "taxi_data.vortex".to_idempotent_path();
let path: PathBuf = "taxi_data.vortex".to_data_path();
{
let mut write = File::create(&path).unwrap();
rewrite_parquet_as_vortex(taxi_data_parquet(), &mut write).unwrap();
Expand Down
6 changes: 5 additions & 1 deletion bench-vortex/src/data_downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ pub fn download_data(fname: PathBuf, data_url: &str) -> PathBuf {
idempotent(&fname, |path| {
info!("Downloading {} from {}", fname.to_str().unwrap(), data_url);
let mut file = File::create(path).unwrap();
reqwest::blocking::get(data_url).unwrap().copy_to(&mut file)
let mut response = reqwest::blocking::get(data_url).unwrap();
if !response.status().is_success() {
panic!("Failed to download data from {}", data_url);
}
response.copy_to(&mut file)
})
.unwrap()
}
Expand Down
35 changes: 28 additions & 7 deletions bench-vortex/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::env::temp_dir;
use std::fs::{create_dir_all, File};
use std::path::{Path, PathBuf};
use std::sync::Arc;
Expand Down Expand Up @@ -40,19 +41,23 @@ pub fn idempotent<T, E, P: IdempotentPath + ?Sized>(
path: &P,
f: impl FnOnce(&Path) -> Result<T, E>,
) -> Result<PathBuf, E> {
let path = path.to_idempotent_path();
if !path.exists() {
f(path.as_path())?;
let data_path = path.to_data_path();
if !data_path.exists() {
let temp_location = path.to_temp_path();
let temp_path = temp_location.as_path();
f(temp_path)?;
std::fs::rename(temp_path, &data_path).unwrap();
}
Ok(path)
Ok(data_path)
}

pub trait IdempotentPath {
fn to_idempotent_path(&self) -> PathBuf;
fn to_data_path(&self) -> PathBuf;
fn to_temp_path(&self) -> PathBuf;
}

impl IdempotentPath for str {
fn to_idempotent_path(&self) -> PathBuf {
fn to_data_path(&self) -> PathBuf {
let path = Path::new(env!("CARGO_MANIFEST_DIR"))
.join("data")
.join(self);
Expand All @@ -61,15 +66,31 @@ impl IdempotentPath for str {
}
path
}

fn to_temp_path(&self) -> PathBuf {
let temp_dir = temp_dir().join(uuid::Uuid::new_v4().to_string());
if !temp_dir.exists() {
create_dir_all(temp_dir.clone()).unwrap();
}
temp_dir.join(self)
}
}

impl IdempotentPath for PathBuf {
fn to_idempotent_path(&self) -> PathBuf {
fn to_data_path(&self) -> PathBuf {
if !self.parent().unwrap().exists() {
create_dir_all(self.parent().unwrap()).unwrap();
}
self.to_path_buf()
}

fn to_temp_path(&self) -> PathBuf {
let temp_dir = std::env::temp_dir().join(uuid::Uuid::new_v4().to_string());
if !temp_dir.exists() {
create_dir_all(temp_dir.clone()).unwrap();
}
temp_dir.join(self.file_name().unwrap())
}
}

pub fn setup_logger(level: LevelFilter) {
Expand Down
12 changes: 5 additions & 7 deletions bench-vortex/src/public_bi_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ impl PBIDataset {
};

"PBI"
.to_idempotent_path()
.to_data_path()
.join(self.dataset_name())
.join(extension)
.join(url.file_name.strip_suffix(".csv.bz2").unwrap())
Expand All @@ -323,7 +323,7 @@ impl PBIDataset {

fn get_bzip_path(&self, url: &PBIUrl) -> PathBuf {
"PBI"
.to_idempotent_path()
.to_data_path()
.join(self.dataset_name())
.join("bzip2")
.join(url.file_name)
Expand Down Expand Up @@ -352,9 +352,9 @@ impl PBIUrl {
}
}
fn to_url_string(&self) -> Url {
Url::parse("https://homepages.cwi.nl/~boncz/PublicBIbenchmark")
Url::parse("https://homepages.cwi.nl/~boncz/PublicBIbenchmark/")
.unwrap()
.join(self.dataset_name)
.join(format!("{}/", self.dataset_name).as_str())
.unwrap()
.join(self.file_name)
.unwrap()
Expand Down Expand Up @@ -536,9 +536,7 @@ impl BenchmarkDataset for BenchmarkDatasets {

fn directory_location(&self) -> PathBuf {
match self {
BenchmarkDatasets::PBI(dataset) => {
"PBI".to_idempotent_path().join(dataset.dataset_name())
}
BenchmarkDatasets::PBI(dataset) => "PBI".to_data_path().join(dataset.dataset_name()),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion bench-vortex/src/taxi_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::reader::rewrite_parquet_as_vortex;
use crate::{idempotent, IdempotentPath};

fn download_taxi_data() -> PathBuf {
let taxi_parquet_fpath = "yellow-tripdata-2023-11.parquet".to_idempotent_path();
let taxi_parquet_fpath = "yellow-tripdata-2023-11.parquet".to_data_path();
let taxi_data_url =
"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-11.parquet";
download_data(taxi_parquet_fpath, taxi_data_url)
Expand Down

0 comments on commit 66ad127

Please sign in to comment.