diff --git a/Cargo.lock b/Cargo.lock index c93951c46f..118a90ba80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -972,6 +972,7 @@ dependencies = [ "reqwest 0.12.3", "simplelog", "tokio", + "uuid", "vortex-alp", "vortex-array", "vortex-array2", diff --git a/Cargo.toml b/Cargo.toml index 3a3e642b08..7b33add5cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,6 +81,7 @@ bzip2 = "0.4.4" csv = "1.3.0" arrow-csv = "51.0.0" lazy_static = "1.4.0" +uuid = "1.8.0" [workspace.lints.rust] warnings = "deny" diff --git a/bench-vortex/Cargo.toml b/bench-vortex/Cargo.toml index e6d9f95f5d..da29da0e65 100644 --- a/bench-vortex/Cargo.toml +++ b/bench-vortex/Cargo.toml @@ -46,6 +46,7 @@ csv = { workspace = true } arrow-csv = { workspace = true } arrow = { workspace = true } humansize = { workspace = true } +uuid = { workspace = true } [dev-dependencies] criterion = { workspace = true } diff --git a/bench-vortex/src/bin/compress.rs b/bench-vortex/src/bin/compress.rs index 871020c628..3bb79bd98c 100644 --- a/bench-vortex/src/bin/compress.rs +++ b/bench-vortex/src/bin/compress.rs @@ -19,7 +19,7 @@ pub fn main() { } fn compress_taxi() { - let path: PathBuf = "taxi_data.vortex".to_idempotent_path(); + let path: PathBuf = "taxi_data.vortex".to_data_path(); { let mut write = File::create(&path).unwrap(); rewrite_parquet_as_vortex(taxi_data_parquet(), &mut write).unwrap(); diff --git a/bench-vortex/src/data_downloads.rs b/bench-vortex/src/data_downloads.rs index 4989b9e64a..8c6b97f8b4 100644 --- a/bench-vortex/src/data_downloads.rs +++ b/bench-vortex/src/data_downloads.rs @@ -25,7 +25,11 @@ pub fn download_data(fname: PathBuf, data_url: &str) -> PathBuf { idempotent(&fname, |path| { info!("Downloading {} from {}", fname.to_str().unwrap(), data_url); let mut file = File::create(path).unwrap(); - reqwest::blocking::get(data_url).unwrap().copy_to(&mut file) + let mut response = reqwest::blocking::get(data_url).unwrap(); + if !response.status().is_success() { + panic!("Failed to download data from {}", data_url); + } + response.copy_to(&mut file) }) .unwrap() } diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index e2bb47f65c..ab20becad7 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -1,3 +1,4 @@ +use std::env::temp_dir; use std::fs::{create_dir_all, File}; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -40,19 +41,23 @@ pub fn idempotent( path: &P, f: impl FnOnce(&Path) -> Result, ) -> Result { - let path = path.to_idempotent_path(); - if !path.exists() { - f(path.as_path())?; + let data_path = path.to_data_path(); + if !data_path.exists() { + let temp_location = path.to_temp_path(); + let temp_path = temp_location.as_path(); + f(temp_path)?; + std::fs::rename(temp_path, &data_path).unwrap(); } - Ok(path) + Ok(data_path) } pub trait IdempotentPath { - fn to_idempotent_path(&self) -> PathBuf; + fn to_data_path(&self) -> PathBuf; + fn to_temp_path(&self) -> PathBuf; } impl IdempotentPath for str { - fn to_idempotent_path(&self) -> PathBuf { + fn to_data_path(&self) -> PathBuf { let path = Path::new(env!("CARGO_MANIFEST_DIR")) .join("data") .join(self); @@ -61,15 +66,31 @@ impl IdempotentPath for str { } path } + + fn to_temp_path(&self) -> PathBuf { + let temp_dir = temp_dir().join(uuid::Uuid::new_v4().to_string()); + if !temp_dir.exists() { + create_dir_all(temp_dir.clone()).unwrap(); + } + temp_dir.join(self) + } } impl IdempotentPath for PathBuf { - fn to_idempotent_path(&self) -> PathBuf { + fn to_data_path(&self) -> PathBuf { if !self.parent().unwrap().exists() { create_dir_all(self.parent().unwrap()).unwrap(); } self.to_path_buf() } + + fn to_temp_path(&self) -> PathBuf { + let temp_dir = std::env::temp_dir().join(uuid::Uuid::new_v4().to_string()); + if !temp_dir.exists() { + create_dir_all(temp_dir.clone()).unwrap(); + } + temp_dir.join(self.file_name().unwrap()) + } } pub fn setup_logger(level: LevelFilter) { diff --git a/bench-vortex/src/public_bi_data.rs b/bench-vortex/src/public_bi_data.rs index c480d08e01..218fcd6878 100644 --- a/bench-vortex/src/public_bi_data.rs +++ b/bench-vortex/src/public_bi_data.rs @@ -305,7 +305,7 @@ impl PBIDataset { }; "PBI" - .to_idempotent_path() + .to_data_path() .join(self.dataset_name()) .join(extension) .join(url.file_name.strip_suffix(".csv.bz2").unwrap()) @@ -323,7 +323,7 @@ impl PBIDataset { fn get_bzip_path(&self, url: &PBIUrl) -> PathBuf { "PBI" - .to_idempotent_path() + .to_data_path() .join(self.dataset_name()) .join("bzip2") .join(url.file_name) @@ -352,9 +352,9 @@ impl PBIUrl { } } fn to_url_string(&self) -> Url { - Url::parse("https://homepages.cwi.nl/~boncz/PublicBIbenchmark") + Url::parse("https://homepages.cwi.nl/~boncz/PublicBIbenchmark/") .unwrap() - .join(self.dataset_name) + .join(format!("{}/", self.dataset_name).as_str()) .unwrap() .join(self.file_name) .unwrap() @@ -536,9 +536,7 @@ impl BenchmarkDataset for BenchmarkDatasets { fn directory_location(&self) -> PathBuf { match self { - BenchmarkDatasets::PBI(dataset) => { - "PBI".to_idempotent_path().join(dataset.dataset_name()) - } + BenchmarkDatasets::PBI(dataset) => "PBI".to_data_path().join(dataset.dataset_name()), } } } diff --git a/bench-vortex/src/taxi_data.rs b/bench-vortex/src/taxi_data.rs index bc5b703643..30f795e1ef 100644 --- a/bench-vortex/src/taxi_data.rs +++ b/bench-vortex/src/taxi_data.rs @@ -8,7 +8,7 @@ use crate::reader::rewrite_parquet_as_vortex; use crate::{idempotent, IdempotentPath}; fn download_taxi_data() -> PathBuf { - let taxi_parquet_fpath = "yellow-tripdata-2023-11.parquet".to_idempotent_path(); + let taxi_parquet_fpath = "yellow-tripdata-2023-11.parquet".to_data_path(); let taxi_data_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-11.parquet"; download_data(taxi_parquet_fpath, taxi_data_url)