diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index 29f4ba616c..f7ba9be9e0 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -28,14 +28,10 @@ use vortex_schema::DType; pub mod reader; pub mod taxi_data; -pub fn idempotent( - name: &str, - f: impl FnOnce(&mut File) -> Result, -) -> Result { +pub fn idempotent(name: &str, f: impl FnOnce(&Path) -> Result) -> Result { let path = data_path(name); if !path.exists() { - let mut file = File::create(&path).unwrap(); - f(&mut file)?; + f(&path)?; } Ok(path.to_path_buf()) } diff --git a/bench-vortex/src/taxi_data.rs b/bench-vortex/src/taxi_data.rs index 182ff5927e..a0bb9423e9 100644 --- a/bench-vortex/src/taxi_data.rs +++ b/bench-vortex/src/taxi_data.rs @@ -15,16 +15,17 @@ use vortex::arrow::FromArrowType; use vortex::serde::WriteCtx; use vortex_schema::DType; +use crate::idempotent; use crate::reader::compress_vortex; -use crate::{data_path, idempotent}; fn download_taxi_data() -> PathBuf { - idempotent("yellow-tripdata-2023-11.parquet", |file| { + idempotent("yellow-tripdata-2023-11.parquet", |path| { + let mut file = File::create(path).unwrap(); reqwest::blocking::get( "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-11.parquet", ) .unwrap() - .copy_to(file) + .copy_to(&mut file) }) .unwrap() } @@ -34,28 +35,26 @@ pub fn taxi_data_parquet() -> PathBuf { } pub fn taxi_data_lance() -> PathBuf { - let write_params = WriteParams::default(); + idempotent("taxi.lance", |path| { + let write_params = WriteParams::default(); - let read = File::open(taxi_data_parquet()).unwrap(); - let reader = LanceParquetRecordBatchReaderBuilder::try_new(read) - .unwrap() - .build() - .unwrap(); + let read = File::open(taxi_data_parquet()).unwrap(); + let reader = LanceParquetRecordBatchReaderBuilder::try_new(read) + .unwrap() + .build() + .unwrap(); - let lance_path = data_path("taxi.lance"); - Runtime::new() - .unwrap() - .block_on(Dataset::write( + Runtime::new().unwrap().block_on(Dataset::write( reader, - lance_path.as_os_str().to_str().unwrap(), + path.to_str().unwrap(), Some(write_params), )) - .unwrap(); - lance_path + }) + .unwrap() } pub fn taxi_data_vortex() -> PathBuf { - idempotent("taxi-uncompressed.vortex", |write| { + idempotent("taxi-uncompressed.vortex", |path| { let taxi_pq = File::open(download_taxi_data()).unwrap(); let builder = ParquetRecordBatchReaderBuilder::try_new(taxi_pq).unwrap(); @@ -70,7 +69,8 @@ pub fn taxi_data_vortex() -> PathBuf { .collect_vec(); let chunked = ChunkedArray::new(chunks, dtype.clone()); - let mut write_ctx = WriteCtx::new(write); + let mut write = File::create(path).unwrap(); + let mut write_ctx = WriteCtx::new(&mut write); write_ctx.dtype(&dtype)?; write_ctx.write(&chunked) }) @@ -78,8 +78,9 @@ pub fn taxi_data_vortex() -> PathBuf { } pub fn taxi_data_vortex_compressed() -> PathBuf { - idempotent("taxi.vortex", |write| { - compress_vortex(&taxi_data_parquet(), write) + idempotent("taxi.vortex", |path| { + let mut write = File::create(path).unwrap(); + compress_vortex(&taxi_data_parquet(), &mut write) }) .unwrap() }