Hook on-disk vortex files into benchmarking (#565)
Adding the Vortex on-disk file format to our benchmarking suite.
AdamGS authored Aug 8, 2024
1 parent 3aa7782 commit c33819a
Showing 13 changed files with 265 additions and 124 deletions.
24 changes: 22 additions & 2 deletions bench-vortex/benches/tpch_benchmark.rs
@@ -12,15 +12,15 @@ fn benchmark(c: &mut Criterion) {
     let vortex_pushdown_disabled_ctx = runtime
         .block_on(load_datasets(
             &data_dir,
-            Format::Vortex {
+            Format::InMemoryVortex {
                 enable_pushdown: false,
             },
         ))
         .unwrap();
     let vortex_ctx = runtime
         .block_on(load_datasets(
             &data_dir,
-            Format::Vortex {
+            Format::InMemoryVortex {
                 enable_pushdown: true,
             },
         ))
@@ -31,6 +31,14 @@ fn benchmark(c: &mut Criterion) {
     let parquet_ctx = runtime
         .block_on(load_datasets(&data_dir, Format::Parquet))
         .unwrap();
+    let persistent_vortex_ctx = runtime
+        .block_on(load_datasets(
+            &data_dir,
+            Format::OnDiskVortex {
+                enable_compression: true,
+            },
+        ))
+        .unwrap();
 
     for (q, query) in tpch_queries() {
         let mut group = c.benchmark_group(format!("tpch_q{q}"));
@@ -83,6 +91,18 @@ fn benchmark(c: &mut Criterion) {
                     .unwrap()
             })
         });
+
+        group.bench_function("persistent_compressed_vortex", |b| {
+            b.to_async(&runtime).iter(|| async {
+                persistent_vortex_ctx
+                    .sql(&query)
+                    .await
+                    .unwrap()
+                    .collect()
+                    .await
+                    .unwrap()
+            })
+        });
     }
 }

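For reference, the new persistent_compressed_vortex group uses Criterion's async support. Below is a minimal, self-contained sketch of that measurement pattern (not part of this commit), assuming criterion is built with the `async_tokio` feature; a trivial `SELECT 1` stands in for a TPC-H query against one of the registered contexts.

    use criterion::{criterion_group, criterion_main, Criterion};
    use datafusion::prelude::SessionContext;
    use tokio::runtime::Runtime;

    fn benchmark(c: &mut Criterion) {
        let runtime = Runtime::new().unwrap();
        let ctx = SessionContext::new();
        c.bench_function("select_one", |b| {
            // `to_async` drives each iteration on the Tokio runtime; collecting
            // the result keeps the timing end-to-end (plan, execute, decode).
            b.to_async(&runtime).iter(|| async {
                ctx.sql("SELECT 1").await.unwrap().collect().await.unwrap()
            })
        });
    }

    criterion_group!(benches, benchmark);
    criterion_main!(benches);
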
8 changes: 7 additions & 1 deletion bench-vortex/src/bin/tpch_benchmark.rs
@@ -19,9 +19,15 @@ async fn main() {
     let formats = [
         Format::Arrow,
         Format::Parquet,
-        Format::Vortex {
+        Format::InMemoryVortex {
             enable_pushdown: true,
         },
+        Format::OnDiskVortex {
+            enable_compression: true,
+        },
+        Format::OnDiskVortex {
+            enable_compression: false,
+        },
     ];
 
     // Load datasets
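
The surrounding `main` is elided from this hunk, so here is a hedged sketch of how the formats array plausibly drives the binary: one SessionContext per format via `load_datasets`, then every TPC-H query runs against it. The module paths, `data_dir` value, and loop body are assumptions; `load_datasets`, `tpch_queries`, and `Format` all appear elsewhere in this diff.

    use bench_vortex::tpch::{load_datasets, tpch_queries, Format}; // assumed paths

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        let data_dir = std::path::PathBuf::from("data/tpch"); // hypothetical location
        let formats = [
            Format::Parquet,
            Format::OnDiskVortex { enable_compression: true },
        ];
        for format in formats {
            // One context per format, with all TPC-H tables registered.
            let ctx = load_datasets(&data_dir, format).await?;
            for (q, query) in tpch_queries() {
                let batches = ctx.sql(&query).await?.collect().await?;
                println!("q{q}: {} record batches", batches.len());
            }
        }
        Ok(())
    }
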
144 changes: 137 additions & 7 deletions bench-vortex/src/tpch/mod.rs
@@ -1,17 +1,25 @@
+use std::collections::HashMap;
 use std::fs;
 use std::path::Path;
 use std::sync::Arc;
 
-use arrow_array::StructArray;
+use arrow_array::StructArray as ArrowStructArray;
 use arrow_schema::Schema;
 use datafusion::dataframe::DataFrameWriteOptions;
 use datafusion::datasource::MemTable;
+use datafusion::execution::object_store::ObjectStoreUrl;
 use datafusion::prelude::{CsvReadOptions, ParquetReadOptions, SessionContext};
-use vortex::array::ChunkedArray;
+use tokio::fs::OpenOptions;
+use vortex::array::{ChunkedArray, StructArray};
 use vortex::arrow::FromArrowArray;
-use vortex::{Array, ArrayDType, IntoArray};
+use vortex::variants::StructArrayTrait;
+use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant};
 use vortex_datafusion::memory::VortexMemTableOptions;
+use vortex_datafusion::persistent::config::{VortexFile, VortexTableOptions};
 use vortex_datafusion::SessionContextExt;
+use vortex_dtype::DType;
+use vortex_sampling_compressor::SamplingCompressor;
+use vortex_serde::layouts::writer::LayoutWriter;
 
 use crate::idempotent_async;
 
@@ -23,7 +31,8 @@ pub enum Format {
     Csv,
     Arrow,
     Parquet,
-    Vortex { enable_pushdown: bool },
+    InMemoryVortex { enable_pushdown: bool },
+    OnDiskVortex { enable_compression: bool },
 }
 
 // Generate table dataset.
@@ -51,7 +60,7 @@ pub async fn load_datasets<P: AsRef<Path>>(
                 Format::Parquet => {
                     register_parquet(&context, stringify!($name), &$name, $schema).await
                 }
-                Format::Vortex {
+                Format::InMemoryVortex {
                     enable_pushdown, ..
                 } => {
                     register_vortex(
@@ -63,6 +72,16 @@ pub async fn load_datasets<P: AsRef<Path>>(
                     )
                     .await
                 }
+                Format::OnDiskVortex { enable_compression } => {
+                    register_vortex_file(
+                        &context,
+                        stringify!($name),
+                        &$name,
+                        $schema,
+                        enable_compression,
+                    )
+                    .await
+                }
             }
         };
     }
@@ -168,6 +187,117 @@ async fn register_parquet(
         .await?)
 }
 
+async fn register_vortex_file(
+    session: &SessionContext,
+    name: &str,
+    file: &Path,
+    schema: &Schema,
+    enable_compression: bool,
+) -> anyhow::Result<()> {
+    let vtx_file = idempotent_async(
+        &file.with_extension("").with_extension("vtx"),
+        |vtx_file| async move {
+            let record_batches = session
+                .read_csv(
+                    file.to_str().unwrap(),
+                    CsvReadOptions::default()
+                        .delimiter(b'|')
+                        .has_header(false)
+                        .file_extension("tbl")
+                        .schema(schema),
+                )
+                .await?
+                .collect()
+                .await?;
+
+            // Create a ChunkedArray from the set of chunks.
+            let sts = record_batches
+                .iter()
+                .cloned()
+                .map(Array::from)
+                .map(|a| a.into_struct().unwrap())
+                .collect::<Vec<_>>();
+
+            let mut arrays_map: HashMap<Arc<str>, Vec<Array>> = HashMap::default();
+            let mut types_map: HashMap<Arc<str>, DType> = HashMap::default();
+
+            for st in sts.into_iter() {
+                let struct_dtype = st.dtype().as_struct().unwrap();
+                let names = struct_dtype.names().iter();
+                let types = struct_dtype.dtypes().iter();
+
+                for (field_name, field_type) in names.zip(types) {
+                    let val = arrays_map.entry(field_name.clone()).or_default();
+                    val.push(st.field_by_name(field_name).unwrap());
+
+                    types_map.insert(field_name.clone(), field_type.clone());
+                }
+            }
+
+            let fields = schema
+                .fields()
+                .iter()
+                .map(|field| {
+                    let name: Arc<str> = field.name().as_str().into();
+                    let dtype = types_map.get(&name).unwrap().clone();
+                    let chunks = arrays_map.remove(&name).unwrap();
+
+                    (
+                        name.clone(),
+                        ChunkedArray::try_new(chunks, dtype).unwrap().into_array(),
+                    )
+                })
+                .collect::<Vec<_>>();
+
+            let data = StructArray::from_fields(&fields).into_array();
+
+            let data = if enable_compression {
+                let compressor = SamplingCompressor::default();
+                compressor.compress(&data, None)?.into_array()
+            } else {
+                data
+            };
+
+            let f = OpenOptions::new()
+                .write(true)
+                .truncate(true)
+                .create(true)
+                .open(&vtx_file)
+                .await?;
+
+            let mut writer = LayoutWriter::new(f);
+            writer = writer.write_array_columns(data).await?;
+            writer.finalize().await?;
+
+            anyhow::Ok(())
+        },
+    )
+    .await?;
+
+    let f = OpenOptions::new()
+        .read(true)
+        .write(true)
+        .open(&vtx_file)
+        .await?;
+    let file_size = f.metadata().await?.len();
+
+    let schema_ref = Arc::new(schema.clone());
+
+    session.register_disk_vortex_opts(
+        name,
+        ObjectStoreUrl::local_filesystem(),
+        VortexTableOptions::new(
+            schema_ref,
+            vec![VortexFile::new(
+                vtx_file.to_str().unwrap().to_string(),
+                file_size,
+            )],
+        ),
+    )?;
+
+    Ok(())
+}
+
 async fn register_vortex(
     session: &SessionContext,
     name: &str,
@@ -192,14 +322,14 @@ async fn register_vortex(
     let chunks: Vec<Array> = record_batches
         .iter()
         .cloned()
-        .map(StructArray::from)
+        .map(ArrowStructArray::from)
        .map(|struct_array| Array::from_arrow(&struct_array, false))
         .collect();
 
     let dtype = chunks[0].dtype().clone();
     let chunked_array = ChunkedArray::try_new(chunks, dtype)?.into_array();
 
-    session.register_vortex_opts(
+    session.register_mem_vortex_opts(
         name,
         chunked_array,
         VortexMemTableOptions::default().with_pushdown(enable_pushdown),
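
`register_vortex_file` leans on the crate's `idempotent_async` helper (imported above) so the CSV-to-Vortex conversion runs only once per table. Its implementation is not part of this diff; below is a plausible minimal shape, under the assumption that it runs the closure only when the target file is missing and then hands back the path.

    use std::future::Future;
    use std::path::{Path, PathBuf};

    pub async fn idempotent_async<F, Fut, E>(path: &Path, f: F) -> Result<PathBuf, E>
    where
        F: FnOnce(PathBuf) -> Fut,
        Fut: Future<Output = Result<(), E>>,
    {
        // Skip the (expensive) build step if the artifact already exists.
        if !path.exists() {
            f(path.to_path_buf()).await?;
        }
        Ok(path.to_path_buf())
    }

This matches the call site above: the closure writes the .vtx file and returns `anyhow::Ok(())`, and the returned path is then reopened to read its size for registration.
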
2 changes: 1 addition & 1 deletion bench-vortex/tpch/q4.sql
@@ -4,7 +4,7 @@ select
 from
     orders
 where
-    o_orderdate >= '1993-07-01'
+    o_orderdate >= date '1993-07-01'
     and o_orderdate < date '1993-07-01' + interval '3' month
     and exists (
         select
20 changes: 19 additions & 1 deletion vortex-array/src/arrow/recordbatch.rs
@@ -1,10 +1,11 @@
+use arrow_array::cast::as_struct_array;
 use arrow_array::RecordBatch;
 use itertools::Itertools;
 
 use crate::array::StructArray;
 use crate::arrow::FromArrowArray;
 use crate::validity::Validity;
-use crate::Array;
+use crate::{Array, IntoArray, IntoCanonical};
 
 impl From<RecordBatch> for Array {
     fn from(value: RecordBatch) -> Self {
@@ -29,3 +30,20 @@ impl From<RecordBatch> for Array {
             .into()
     }
 }
+
+impl From<Array> for RecordBatch {
+    fn from(value: Array) -> Self {
+        let array_ref = value
+            .into_canonical()
+            .expect("struct arrays must canonicalize")
+            .into_arrow();
+        let struct_array = as_struct_array(array_ref.as_ref());
+        RecordBatch::from(struct_array)
+    }
+}
+
+impl From<StructArray> for RecordBatch {
+    fn from(value: StructArray) -> Self {
+        RecordBatch::from(value.into_array())
+    }
+}
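
A short usage sketch of the two new conversions (the field name and values are illustrative): an Arrow RecordBatch can now round-trip through a Vortex Array.

    use std::sync::Arc;

    use arrow_array::{Int64Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use vortex::Array;

    fn round_trip() {
        let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Int64, false)]));
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1, 2, 3]))])
                .unwrap();

        // Arrow -> Vortex via the pre-existing From<RecordBatch> impl,
        // Vortex -> Arrow via the From<Array> impl added here.
        let array = Array::from(batch.clone());
        let back = RecordBatch::from(array);
        assert_eq!(batch.num_rows(), back.num_rows());
    }
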
4 changes: 2 additions & 2 deletions vortex-datafusion/examples/table_provider.rs
@@ -12,7 +12,7 @@ use url::Url;
 use vortex::array::{ChunkedArray, PrimitiveArray, StructArray, VarBinArray};
 use vortex::validity::Validity;
 use vortex::IntoArray;
-use vortex_datafusion::persistent::config::{VortexFile, VortexTableConfig};
+use vortex_datafusion::persistent::config::{VortexFile, VortexTableOptions};
 use vortex_datafusion::persistent::provider::VortexFileTableProvider;
 use vortex_serde::layouts::writer::LayoutWriter;
 
@@ -60,7 +60,7 @@ async fn main() -> anyhow::Result<()> {
 
     let p = Path::from_filesystem_path(filepath)?;
 
-    let config = VortexTableConfig::new(
+    let config = VortexTableOptions::new(
         Arc::new(Schema::new(vec![
             Field::new("strings", DataType::Utf8, false),
             Field::new("numbers", DataType::UInt32, false),
2 changes: 1 addition & 1 deletion vortex-datafusion/src/datatype.rs
@@ -136,7 +136,7 @@ mod test {
         DType, ExtDType, ExtID, FieldName, FieldNames, Nullability, PType, StructDType,
     };
 
-    use crate::datatype::{infer_data_type, infer_schema};
+    use super::*;
 
     #[test]
     fn test_dtype_conversion_success() {
(6 more changed files not shown.)
