diff --git a/bench-vortex/Cargo.toml b/bench-vortex/Cargo.toml
index 2d0f561e86..89565d3172 100644
--- a/bench-vortex/Cargo.toml
+++ b/bench-vortex/Cargo.toml
@@ -68,3 +68,7 @@ harness = false
 [[bench]]
 name = "datafusion_benchmark"
 harness = false
+
+[[bench]]
+name = "tpch_benchmark"
+harness = false
diff --git a/bench-vortex/benches/tpch_benchmark.rs b/bench-vortex/benches/tpch_benchmark.rs
new file mode 100644
index 0000000000..3e56cf2aea
--- /dev/null
+++ b/bench-vortex/benches/tpch_benchmark.rs
@@ -0,0 +1,42 @@
+use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
+use bench_vortex::tpch::query::Q1;
+use bench_vortex::tpch::{load_datasets, Format};
+use criterion::{criterion_group, criterion_main, Criterion};
+use tokio::runtime::Runtime;
+
+fn benchmark(c: &mut Criterion) {
+    let runtime = Runtime::new().unwrap();
+
+    // Run TPC-H data gen.
+    let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap();
+
+    let mut group = c.benchmark_group("tpch q1");
+    group.sample_size(10);
+
+    let ctx = runtime
+        .block_on(load_datasets(&data_dir, Format::VortexUncompressed))
+        .unwrap();
+    group.bench_function("vortex", |b| {
+        b.to_async(&runtime)
+            .iter(|| async { ctx.sql(Q1).await.unwrap().collect().await.unwrap() })
+    });
+
+    let ctx = runtime
+        .block_on(load_datasets(&data_dir, Format::Csv))
+        .unwrap();
+    group.bench_function("csv", |b| {
+        b.to_async(&runtime)
+            .iter(|| async { ctx.sql(Q1).await.unwrap().collect().await.unwrap() })
+    });
+
+    let ctx = runtime
+        .block_on(load_datasets(&data_dir, Format::Arrow))
+        .unwrap();
+    group.bench_function("arrow", |b| {
+        b.to_async(&runtime)
+            .iter(|| async { ctx.sql(Q1).await.unwrap().collect().await.unwrap() })
+    });
+}
+
+criterion_group!(benches, benchmark);
+criterion_main!(benches);
diff --git a/bench-vortex/src/bin/tpch_benchmark.rs b/bench-vortex/src/bin/tpch_benchmark.rs
index 1edaed38eb..3919a56426 100644
--- a/bench-vortex/src/bin/tpch_benchmark.rs
+++ b/bench-vortex/src/bin/tpch_benchmark.rs
@@ -1,151 +1,9 @@
-#![allow(dead_code)]
-
-use std::path::{Path, PathBuf};
-use std::sync::Arc;
+use std::path::PathBuf;
 use std::time::SystemTime;
 
-use arrow_array::StructArray;
-use arrow_schema::Schema;
 use bench_vortex::tpch;
 use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
-use datafusion::datasource::MemTable;
-use datafusion::prelude::{CsvReadOptions, SessionContext};
-use vortex::array::chunked::ChunkedArray;
-use vortex::arrow::FromArrowArray;
-use vortex::{Array, ArrayDType, ArrayData, IntoArray, IntoCanonical};
-use vortex_datafusion::SessionContextExt;
-
-enum Format {
-    Csv,
-    Arrow,
-    VortexUncompressed,
-}
-
-// Generate table dataset.
-async fn load_datasets<P: AsRef<Path>>(
-    base_dir: P,
-    format: Format,
-) -> anyhow::Result<SessionContext> {
-    let context = SessionContext::new();
-    let base_dir = base_dir.as_ref();
-
-    let customer = base_dir.join("customer.tbl");
-    let lineitem = base_dir.join("lineitem.tbl");
-    let nation = base_dir.join("nation.tbl");
-    let orders = base_dir.join("orders.tbl");
-    let part = base_dir.join("part.tbl");
-    let partsupp = base_dir.join("partsupp.tbl");
-    let region = base_dir.join("region.tbl");
-    let supplier = base_dir.join("supplier.tbl");
-
-    macro_rules! register_table {
-        ($name:ident, $schema:expr) => {
-            match format {
-                Format::Csv => register_csv(&context, stringify!($name), &$name, $schema).await,
-                Format::Arrow => register_arrow(&context, stringify!($name), &$name, $schema).await,
-                Format::VortexUncompressed => {
-                    register_vortex(&context, stringify!($name), &$name, $schema).await
-                }
-            }
-        };
-    }
-
-    register_table!(customer, &tpch::schema::CUSTOMER)?;
-    register_table!(lineitem, &tpch::schema::LINEITEM)?;
-    register_table!(nation, &tpch::schema::NATION)?;
-    register_table!(orders, &tpch::schema::ORDERS)?;
-    register_table!(part, &tpch::schema::PART)?;
-    register_table!(partsupp, &tpch::schema::PARTSUPP)?;
-    register_table!(region, &tpch::schema::REGION)?;
-    register_table!(supplier, &tpch::schema::SUPPLIER)?;
-
-    Ok(context)
-}
-
-async fn register_csv(
-    session: &SessionContext,
-    name: &str,
-    file: &Path,
-    schema: &Schema,
-) -> anyhow::Result<()> {
-    session
-        .register_csv(
-            name,
-            file.to_str().unwrap(),
-            CsvReadOptions::default()
-                .delimiter(b'|')
-                .has_header(false)
-                .file_extension("tbl")
-                .schema(schema),
-        )
-        .await?;
-
-    Ok(())
-}
-
-async fn register_arrow(
-    session: &SessionContext,
-    name: &str,
-    file: &Path,
-    schema: &Schema,
-) -> anyhow::Result<()> {
-    // Read CSV file into a set of Arrow RecordBatch.
-    let record_batches = session
-        .read_csv(
-            file.to_str().unwrap(),
-            CsvReadOptions::default()
-                .delimiter(b'|')
-                .has_header(false)
-                .file_extension("tbl")
-                .schema(schema),
-        )
-        .await?
-        .collect()
-        .await?;
-
-    let mem_table = MemTable::try_new(Arc::new(schema.clone()), vec![record_batches])?;
-    session.register_table(name, Arc::new(mem_table))?;
-
-    Ok(())
-}
-
-async fn register_vortex(
-    session: &SessionContext,
-    name: &str,
-    file: &Path,
-    schema: &Schema,
-    // TODO(aduffy): add compression option
-) -> anyhow::Result<()> {
-    let record_batches = session
-        .read_csv(
-            file.to_str().unwrap(),
-            CsvReadOptions::default()
-                .delimiter(b'|')
-                .has_header(false)
-                .file_extension("tbl")
-                .schema(schema),
-        )
-        .await?
-        .collect()
-        .await?;
-
-    // Create a ChunkedArray from the set of chunks.
-    let chunks: Vec<Array> = record_batches
-        .iter()
-        .cloned()
-        .map(StructArray::from)
-        .map(|struct_array| ArrayData::from_arrow(&struct_array, false).into_array())
-        .collect();
-
-    let dtype = chunks[0].dtype().clone();
-    let chunked_array = ChunkedArray::try_new(chunks, dtype)?
-        .into_canonical()?
-        .into_array();
-
-    session.register_vortex(name, chunked_array)?;
-
-    Ok(())
-}
+use bench_vortex::tpch::{load_datasets, Format};
 
 async fn q1_csv(base_dir: &PathBuf) -> anyhow::Result<()> {
     let ctx = load_datasets(base_dir, Format::Csv).await?;
diff --git a/bench-vortex/src/tpch/mod.rs b/bench-vortex/src/tpch/mod.rs
index edde899090..93fce99c45 100644
--- a/bench-vortex/src/tpch/mod.rs
+++ b/bench-vortex/src/tpch/mod.rs
@@ -1,3 +1,147 @@
+use std::path::Path;
+use std::sync::Arc;
+
+use arrow_array::StructArray;
+use arrow_schema::Schema;
+use datafusion::datasource::MemTable;
+use datafusion::prelude::{CsvReadOptions, SessionContext};
+use vortex::array::chunked::ChunkedArray;
+use vortex::arrow::FromArrowArray;
+use vortex::{Array, ArrayDType, ArrayData, IntoArray, IntoCanonical};
+use vortex_datafusion::SessionContextExt;
+
 pub mod dbgen;
 pub mod query;
 pub mod schema;
+
+pub enum Format {
+    Csv,
+    Arrow,
+    VortexUncompressed,
+}
+
+/// Load the generated TPC-H tables into a DataFusion `SessionContext` in the requested `Format`.
+pub async fn load_datasets<P: AsRef<Path>>(
+    base_dir: P,
+    format: Format,
+) -> anyhow::Result<SessionContext> {
+    let context = SessionContext::new();
+    let base_dir = base_dir.as_ref();
+
+    let customer = base_dir.join("customer.tbl");
+    let lineitem = base_dir.join("lineitem.tbl");
+    let nation = base_dir.join("nation.tbl");
+    let orders = base_dir.join("orders.tbl");
+    let part = base_dir.join("part.tbl");
+    let partsupp = base_dir.join("partsupp.tbl");
+    let region = base_dir.join("region.tbl");
+    let supplier = base_dir.join("supplier.tbl");
+
+    macro_rules! register_table {
+        ($name:ident, $schema:expr) => {
+            match format {
+                Format::Csv => register_csv(&context, stringify!($name), &$name, $schema).await,
+                Format::Arrow => register_arrow(&context, stringify!($name), &$name, $schema).await,
+                Format::VortexUncompressed => {
+                    register_vortex(&context, stringify!($name), &$name, $schema).await
+                }
+            }
+        };
+    }
+
+    register_table!(customer, &schema::CUSTOMER)?;
+    register_table!(lineitem, &schema::LINEITEM)?;
+    register_table!(nation, &schema::NATION)?;
+    register_table!(orders, &schema::ORDERS)?;
+    register_table!(part, &schema::PART)?;
+    register_table!(partsupp, &schema::PARTSUPP)?;
+    register_table!(region, &schema::REGION)?;
+    register_table!(supplier, &schema::SUPPLIER)?;
+
+    Ok(context)
+}
+
+async fn register_csv(
+    session: &SessionContext,
+    name: &str,
+    file: &Path,
+    schema: &Schema,
+) -> anyhow::Result<()> {
+    session
+        .register_csv(
+            name,
+            file.to_str().unwrap(),
+            CsvReadOptions::default()
+                .delimiter(b'|')
+                .has_header(false)
+                .file_extension("tbl")
+                .schema(schema),
+        )
+        .await?;
+
+    Ok(())
+}
+
+async fn register_arrow(
+    session: &SessionContext,
+    name: &str,
+    file: &Path,
+    schema: &Schema,
+) -> anyhow::Result<()> {
+    // Read CSV file into a set of Arrow RecordBatch.
+    let record_batches = session
+        .read_csv(
+            file.to_str().unwrap(),
+            CsvReadOptions::default()
+                .delimiter(b'|')
+                .has_header(false)
+                .file_extension("tbl")
+                .schema(schema),
+        )
+        .await?
+        .collect()
+        .await?;
+
+    let mem_table = MemTable::try_new(Arc::new(schema.clone()), vec![record_batches])?;
+    session.register_table(name, Arc::new(mem_table))?;
+
+    Ok(())
+}
+
+async fn register_vortex(
+    session: &SessionContext,
+    name: &str,
+    file: &Path,
+    schema: &Schema,
+    // TODO(aduffy): add compression option
+) -> anyhow::Result<()> {
+    let record_batches = session
+        .read_csv(
+            file.to_str().unwrap(),
+            CsvReadOptions::default()
+                .delimiter(b'|')
+                .has_header(false)
+                .file_extension("tbl")
+                .schema(schema),
+        )
+        .await?
+        .collect()
+        .await?;
+
+    // Create a ChunkedArray from the set of chunks.
+    let chunks: Vec<Array> = record_batches
+        .iter()
+        .cloned()
+        .map(StructArray::from)
+        .map(|struct_array| ArrayData::from_arrow(&struct_array, false).into_array())
+        .collect();
+
+    let dtype = chunks[0].dtype().clone();
+    let chunked_array = ChunkedArray::try_new(chunks, dtype)?
+        .into_canonical()?
+        .into_array();
+
+    session.register_vortex(name, chunked_array)?;
+
+    Ok(())
+}
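
For anyone who wants to poke at the refactored API outside of Criterion, a minimal sketch of a driver follows. It is illustrative only and not part of this diff: the binary itself is hypothetical, and it assumes only items the change exports (DBGen/DBGenOptions, load_datasets, Format, Q1), mirroring the blocking-runtime pattern of the bench above.

use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
use bench_vortex::tpch::query::Q1;
use bench_vortex::tpch::{load_datasets, Format};
use tokio::runtime::Runtime;

fn main() {
    // Block on async work from a plain binary, as the Criterion bench does.
    let runtime = Runtime::new().unwrap();

    // Generate (or reuse) the TPC-H .tbl files via dbgen.
    let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap();

    // Register the tables as in-memory Arrow and run Q1 through DataFusion.
    let ctx = runtime
        .block_on(load_datasets(&data_dir, Format::Arrow))
        .unwrap();
    let batches = runtime.block_on(async { ctx.sql(Q1).await.unwrap().collect().await.unwrap() });

    println!("Q1 returned {} record batches", batches.len());
}

With the new [[bench]] target registered in Cargo.toml, the benchmark itself runs via `cargo bench --bench tpch_benchmark`.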