Skip to content

Commit

Permalink
fix idempotent async
Browse files Browse the repository at this point in the history
  • Loading branch information
a10y committed Jul 30, 2024
1 parent 24dca82 commit 6403efa
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 15 deletions.
18 changes: 18 additions & 0 deletions bench-vortex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::collections::HashSet;
use std::env::temp_dir;
use std::fs::{create_dir_all, File};
use std::future::Future;
use std::path::{Path, PathBuf};
use std::sync::Arc;

Expand Down Expand Up @@ -95,6 +96,23 @@ pub fn idempotent<T, E, P: IdempotentPath + ?Sized>(
Ok(data_path)
}

pub async fn idempotent_async<T, E, F, P>(
path: &P,
f: impl FnOnce(PathBuf) -> F,
) -> Result<PathBuf, E>
where
F: Future<Output = Result<T, E>>,
P: IdempotentPath + ?Sized,
{
let data_path = path.to_data_path();
if !data_path.exists() {
let temp_location = path.to_temp_path();
f(temp_location.clone()).await?;
std::fs::rename(temp_location.as_path(), &data_path).unwrap();
}
Ok(data_path)
}

pub trait IdempotentPath {
fn to_data_path(&self) -> PathBuf;
fn to_temp_path(&self) -> PathBuf;
Expand Down
33 changes: 18 additions & 15 deletions bench-vortex/src/tpch/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![allow(unused_imports, unused_variables)]
use std::fs;
use std::path::Path;
use std::sync::Arc;
Expand All @@ -7,13 +8,12 @@ use arrow_schema::Schema;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::datasource::MemTable;
use datafusion::prelude::{CsvReadOptions, ParquetReadOptions, SessionContext};
use futures::executor::block_on;
use vortex::array::chunked::ChunkedArray;
use vortex::arrow::FromArrowArray;
use vortex::{Array, ArrayDType, ArrayData, IntoArray};
use vortex_datafusion::{SessionContextExt, VortexMemTableOptions};

use crate::idempotent;
use crate::idempotent_async;

pub mod dbgen;
pub mod schema;
Expand Down Expand Up @@ -132,30 +132,33 @@ async fn register_parquet(
file: &Path,
schema: &Schema,
) -> anyhow::Result<()> {
// Idempotent conversion from TPCH CSV to Parquet.
let pq_file = idempotent(

let csv_file = file.to_str().unwrap();
let pq_file = idempotent_async(
&file.with_extension("").with_extension("parquet"),
|pq_file| {
let df = block_on(
session.read_csv(
file.to_str().unwrap(),
|pq_file| async move {
let df = session
.read_csv(
csv_file,
CsvReadOptions::default()
.delimiter(b'|')
.has_header(false)
.file_extension("tbl")
.schema(schema),
),
)
.unwrap();
)
.await?;

block_on(df.write_parquet(
pq_file.as_os_str().to_str().unwrap(),
df.write_parquet(
pq_file.as_path().as_os_str().to_str().unwrap(),
DataFrameWriteOptions::default(),
None,
))
)
.await?;

Ok::<(), anyhow::Error>(())
},
)
.unwrap();
.await?;

Ok(session
.register_parquet(
Expand Down

0 comments on commit 6403efa

Please sign in to comment.