From 6403efa5eaf4b31f0025d4d6955935f08b5489dd Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 30 Jul 2024 14:26:20 -0400 Subject: [PATCH] fix idempotent async --- bench-vortex/src/lib.rs | 18 ++++++++++++++++++ bench-vortex/src/tpch/mod.rs | 33 ++++++++++++++++++--------------- 2 files changed, 36 insertions(+), 15 deletions(-) diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index a734692a9a..fe2b91142e 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -3,6 +3,7 @@ use std::collections::HashSet; use std::env::temp_dir; use std::fs::{create_dir_all, File}; +use std::future::Future; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -95,6 +96,23 @@ pub fn idempotent( Ok(data_path) } +pub async fn idempotent_async( + path: &P, + f: impl FnOnce(PathBuf) -> F, +) -> Result +where + F: Future>, + P: IdempotentPath + ?Sized, +{ + let data_path = path.to_data_path(); + if !data_path.exists() { + let temp_location = path.to_temp_path(); + f(temp_location.clone()).await?; + std::fs::rename(temp_location.as_path(), &data_path).unwrap(); + } + Ok(data_path) +} + pub trait IdempotentPath { fn to_data_path(&self) -> PathBuf; fn to_temp_path(&self) -> PathBuf; diff --git a/bench-vortex/src/tpch/mod.rs b/bench-vortex/src/tpch/mod.rs index 59c8914794..0e7eccd80e 100644 --- a/bench-vortex/src/tpch/mod.rs +++ b/bench-vortex/src/tpch/mod.rs @@ -1,3 +1,4 @@ +#![allow(unused_imports, unused_variables)] use std::fs; use std::path::Path; use std::sync::Arc; @@ -7,13 +8,12 @@ use arrow_schema::Schema; use datafusion::dataframe::DataFrameWriteOptions; use datafusion::datasource::MemTable; use datafusion::prelude::{CsvReadOptions, ParquetReadOptions, SessionContext}; -use futures::executor::block_on; use vortex::array::chunked::ChunkedArray; use vortex::arrow::FromArrowArray; use vortex::{Array, ArrayDType, ArrayData, IntoArray}; use vortex_datafusion::{SessionContextExt, VortexMemTableOptions}; -use crate::idempotent; +use crate::idempotent_async; pub mod dbgen; pub mod schema; @@ -132,30 +132,33 @@ async fn register_parquet( file: &Path, schema: &Schema, ) -> anyhow::Result<()> { - // Idempotent conversion from TPCH CSV to Parquet. - let pq_file = idempotent( + + let csv_file = file.to_str().unwrap(); + let pq_file = idempotent_async( &file.with_extension("").with_extension("parquet"), - |pq_file| { - let df = block_on( - session.read_csv( - file.to_str().unwrap(), + |pq_file| async move { + let df = session + .read_csv( + csv_file, CsvReadOptions::default() .delimiter(b'|') .has_header(false) .file_extension("tbl") .schema(schema), - ), - ) - .unwrap(); + ) + .await?; - block_on(df.write_parquet( - pq_file.as_os_str().to_str().unwrap(), + df.write_parquet( + pq_file.as_path().as_os_str().to_str().unwrap(), DataFrameWriteOptions::default(), None, - )) + ) + .await?; + + Ok::<(), anyhow::Error>(()) }, ) - .unwrap(); + .await?; Ok(session .register_parquet(