Run TPCH as benchmark (#449)
```
Benchmarking tpch q1/vortex
Benchmarking tpch q1/vortex: Warming up for 3.0000 s


Warning: Unable to complete 10 samples in 5.0s. You may wish to increase target time to 7.1s.
Benchmarking tpch q1/vortex: Collecting 10 samples in estimated 7.1180 s (10 iterations)
Benchmarking tpch q1/vortex: Analyzing
tpch q1/vortex          time:   [668.09 ms 685.95 ms 705.55 ms]
                        change: [+317551% +334478% +349967%] (p = 0.00 < 0.05)
                        Performance has regressed.
Benchmarking tpch q1/csv
Benchmarking tpch q1/csv: Warming up for 3.0000 s
Benchmarking tpch q1/csv: Collecting 10 samples in estimated 7.8648 s (20 iterations)
Benchmarking tpch q1/csv: Analyzing
tpch q1/csv             time:   [358.38 ms 364.37 ms 370.34 ms]
                        change: [+183409% +187136% +190580%] (p = 0.00 < 0.05)
                        Performance has regressed.
Benchmarking tpch q1/arrow
Benchmarking tpch q1/arrow: Warming up for 3.0000 s
Benchmarking tpch q1/arrow: Collecting 10 samples in estimated 8.7555 s (110 iterations)
Benchmarking tpch q1/arrow: Analyzing
tpch q1/arrow           time:   [74.480 ms 74.907 ms 75.247 ms]
                        change: [+38009% +38428% +38916%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 2 outliers among 10 measurements (20.00%)
  1 (10.00%) low mild
  1 (10.00%) high severe
```
gatesn authored Jul 11, 2024
1 parent 767c9c1 commit a52f5e0
Showing 4 changed files with 192 additions and 144 deletions.
4 changes: 4 additions & 0 deletions bench-vortex/Cargo.toml
@@ -68,3 +68,7 @@ harness = false
[[bench]]
name = "datafusion_benchmark"
harness = false

[[bench]]
name = "tpch_benchmark"
harness = false
42 changes: 42 additions & 0 deletions bench-vortex/benches/tpch_benchmark.rs
@@ -0,0 +1,42 @@
use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
use bench_vortex::tpch::query::Q1;
use bench_vortex::tpch::{load_datasets, Format};
use criterion::{criterion_group, criterion_main, Criterion};
use tokio::runtime::Runtime;

fn benchmark(c: &mut Criterion) {
    let runtime = Runtime::new().unwrap();

    // Run TPC-H data gen.
    let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap();

    let mut group = c.benchmark_group("tpch q1");
    group.sample_size(10);

    let ctx = runtime
        .block_on(load_datasets(&data_dir, Format::VortexUncompressed))
        .unwrap();
    group.bench_function("vortex", |b| {
        b.to_async(&runtime)
            .iter(|| async { ctx.sql(Q1).await.unwrap().collect().await.unwrap() })
    });

    let ctx = runtime
        .block_on(load_datasets(&data_dir, Format::Csv))
        .unwrap();
    group.bench_function("csv", |b| {
        b.to_async(&runtime)
            .iter(|| async { ctx.sql(Q1).await.unwrap().collect().await.unwrap() })
    });

    let ctx = runtime
        .block_on(load_datasets(&data_dir, Format::Arrow))
        .unwrap();
    group.bench_function("arrow", |b| {
        b.to_async(&runtime)
            .iter(|| async { ctx.sql(Q1).await.unwrap().collect().await.unwrap() })
    });
}

criterion_group!(benches, benchmark);
criterion_main!(benches);
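
The three per-format blocks inside `benchmark` differ only in the `Format` variant passed to `load_datasets` and the benchmark name, so they could be collapsed into a loop. A minimal sketch of that consolidation, not part of this commit, assuming `Format` remains a plain enum taken by value by `load_datasets`:

```
// Hypothetical refactor of the per-format setup above; not part of this commit.
for (name, format) in [
    ("vortex", Format::VortexUncompressed),
    ("csv", Format::Csv),
    ("arrow", Format::Arrow),
] {
    // Register the TPC-H tables in the given format, then benchmark Q1 against them.
    let ctx = runtime.block_on(load_datasets(&data_dir, format)).unwrap();
    group.bench_function(name, |b| {
        b.to_async(&runtime)
            .iter(|| async { ctx.sql(Q1).await.unwrap().collect().await.unwrap() })
    });
}
```

Either form measures the same thing; the loop simply avoids drift if more formats are added later.
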
146 changes: 2 additions & 144 deletions bench-vortex/src/bin/tpch_benchmark.rs
@@ -1,151 +1,9 @@
#![allow(dead_code)]

use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::path::PathBuf;
use std::time::SystemTime;

use arrow_array::StructArray;
use arrow_schema::Schema;
use bench_vortex::tpch;
use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
use datafusion::datasource::MemTable;
use datafusion::prelude::{CsvReadOptions, SessionContext};
use vortex::array::chunked::ChunkedArray;
use vortex::arrow::FromArrowArray;
use vortex::{Array, ArrayDType, ArrayData, IntoArray, IntoCanonical};
use vortex_datafusion::SessionContextExt;

enum Format {
    Csv,
    Arrow,
    VortexUncompressed,
}

// Generate table dataset.
async fn load_datasets<P: AsRef<Path>>(
    base_dir: P,
    format: Format,
) -> anyhow::Result<SessionContext> {
    let context = SessionContext::new();
    let base_dir = base_dir.as_ref();

    let customer = base_dir.join("customer.tbl");
    let lineitem = base_dir.join("lineitem.tbl");
    let nation = base_dir.join("nation.tbl");
    let orders = base_dir.join("orders.tbl");
    let part = base_dir.join("part.tbl");
    let partsupp = base_dir.join("partsupp.tbl");
    let region = base_dir.join("region.tbl");
    let supplier = base_dir.join("supplier.tbl");

    macro_rules! register_table {
        ($name:ident, $schema:expr) => {
            match format {
                Format::Csv => register_csv(&context, stringify!($name), &$name, $schema).await,
                Format::Arrow => register_arrow(&context, stringify!($name), &$name, $schema).await,
                Format::VortexUncompressed => {
                    register_vortex(&context, stringify!($name), &$name, $schema).await
                }
            }
        };
    }

    register_table!(customer, &tpch::schema::CUSTOMER)?;
    register_table!(lineitem, &tpch::schema::LINEITEM)?;
    register_table!(nation, &tpch::schema::NATION)?;
    register_table!(orders, &tpch::schema::ORDERS)?;
    register_table!(part, &tpch::schema::PART)?;
    register_table!(partsupp, &tpch::schema::PARTSUPP)?;
    register_table!(region, &tpch::schema::REGION)?;
    register_table!(supplier, &tpch::schema::SUPPLIER)?;

    Ok(context)
}

async fn register_csv(
    session: &SessionContext,
    name: &str,
    file: &Path,
    schema: &Schema,
) -> anyhow::Result<()> {
    session
        .register_csv(
            name,
            file.to_str().unwrap(),
            CsvReadOptions::default()
                .delimiter(b'|')
                .has_header(false)
                .file_extension("tbl")
                .schema(schema),
        )
        .await?;

    Ok(())
}

async fn register_arrow(
    session: &SessionContext,
    name: &str,
    file: &Path,
    schema: &Schema,
) -> anyhow::Result<()> {
    // Read CSV file into a set of Arrow RecordBatch.
    let record_batches = session
        .read_csv(
            file.to_str().unwrap(),
            CsvReadOptions::default()
                .delimiter(b'|')
                .has_header(false)
                .file_extension("tbl")
                .schema(schema),
        )
        .await?
        .collect()
        .await?;

    let mem_table = MemTable::try_new(Arc::new(schema.clone()), vec![record_batches])?;
    session.register_table(name, Arc::new(mem_table))?;

    Ok(())
}

async fn register_vortex(
    session: &SessionContext,
    name: &str,
    file: &Path,
    schema: &Schema,
    // TODO(aduffy): add compression option
) -> anyhow::Result<()> {
    let record_batches = session
        .read_csv(
            file.to_str().unwrap(),
            CsvReadOptions::default()
                .delimiter(b'|')
                .has_header(false)
                .file_extension("tbl")
                .schema(schema),
        )
        .await?
        .collect()
        .await?;

    // Create a ChunkedArray from the set of chunks.
    let chunks: Vec<Array> = record_batches
        .iter()
        .cloned()
        .map(StructArray::from)
        .map(|struct_array| ArrayData::from_arrow(&struct_array, false).into_array())
        .collect();

    let dtype = chunks[0].dtype().clone();
    let chunked_array = ChunkedArray::try_new(chunks, dtype)?
        .into_canonical()?
        .into_array();

    session.register_vortex(name, chunked_array)?;

    Ok(())
}
use bench_vortex::tpch::{load_datasets, Format};

async fn q1_csv(base_dir: &PathBuf) -> anyhow::Result<()> {
    let ctx = load_datasets(base_dir, Format::Csv).await?;
144 changes: 144 additions & 0 deletions bench-vortex/src/tpch/mod.rs
@@ -1,3 +1,147 @@
use std::path::Path;
use std::sync::Arc;

use arrow_array::StructArray;
use arrow_schema::Schema;
use datafusion::datasource::MemTable;
use datafusion::prelude::{CsvReadOptions, SessionContext};
use vortex::array::chunked::ChunkedArray;
use vortex::arrow::FromArrowArray;
use vortex::{Array, ArrayDType, ArrayData, IntoArray, IntoCanonical};
use vortex_datafusion::SessionContextExt;

pub mod dbgen;
pub mod query;
pub mod schema;

pub enum Format {
    Csv,
    Arrow,
    VortexUncompressed,
}

// Generate table dataset.
pub async fn load_datasets<P: AsRef<Path>>(
    base_dir: P,
    format: Format,
) -> anyhow::Result<SessionContext> {
    let context = SessionContext::new();
    let base_dir = base_dir.as_ref();

    let customer = base_dir.join("customer.tbl");
    let lineitem = base_dir.join("lineitem.tbl");
    let nation = base_dir.join("nation.tbl");
    let orders = base_dir.join("orders.tbl");
    let part = base_dir.join("part.tbl");
    let partsupp = base_dir.join("partsupp.tbl");
    let region = base_dir.join("region.tbl");
    let supplier = base_dir.join("supplier.tbl");

    macro_rules! register_table {
        ($name:ident, $schema:expr) => {
            match format {
                Format::Csv => register_csv(&context, stringify!($name), &$name, $schema).await,
                Format::Arrow => register_arrow(&context, stringify!($name), &$name, $schema).await,
                Format::VortexUncompressed => {
                    register_vortex(&context, stringify!($name), &$name, $schema).await
                }
            }
        };
    }

    register_table!(customer, &schema::CUSTOMER)?;
    register_table!(lineitem, &schema::LINEITEM)?;
    register_table!(nation, &schema::NATION)?;
    register_table!(orders, &schema::ORDERS)?;
    register_table!(part, &schema::PART)?;
    register_table!(partsupp, &schema::PARTSUPP)?;
    register_table!(region, &schema::REGION)?;
    register_table!(supplier, &schema::SUPPLIER)?;

    Ok(context)
}

async fn register_csv(
    session: &SessionContext,
    name: &str,
    file: &Path,
    schema: &Schema,
) -> anyhow::Result<()> {
    session
        .register_csv(
            name,
            file.to_str().unwrap(),
            CsvReadOptions::default()
                .delimiter(b'|')
                .has_header(false)
                .file_extension("tbl")
                .schema(schema),
        )
        .await?;

    Ok(())
}

async fn register_arrow(
    session: &SessionContext,
    name: &str,
    file: &Path,
    schema: &Schema,
) -> anyhow::Result<()> {
    // Read CSV file into a set of Arrow RecordBatch.
    let record_batches = session
        .read_csv(
            file.to_str().unwrap(),
            CsvReadOptions::default()
                .delimiter(b'|')
                .has_header(false)
                .file_extension("tbl")
                .schema(schema),
        )
        .await?
        .collect()
        .await?;

    let mem_table = MemTable::try_new(Arc::new(schema.clone()), vec![record_batches])?;
    session.register_table(name, Arc::new(mem_table))?;

    Ok(())
}

async fn register_vortex(
    session: &SessionContext,
    name: &str,
    file: &Path,
    schema: &Schema,
    // TODO(aduffy): add compression option
) -> anyhow::Result<()> {
    let record_batches = session
        .read_csv(
            file.to_str().unwrap(),
            CsvReadOptions::default()
                .delimiter(b'|')
                .has_header(false)
                .file_extension("tbl")
                .schema(schema),
        )
        .await?
        .collect()
        .await?;

    // Create a ChunkedArray from the set of chunks.
    let chunks: Vec<Array> = record_batches
        .iter()
        .cloned()
        .map(StructArray::from)
        .map(|struct_array| ArrayData::from_arrow(&struct_array, false).into_array())
        .collect();

    let dtype = chunks[0].dtype().clone();
    let chunked_array = ChunkedArray::try_new(chunks, dtype)?
        .into_canonical()?
        .into_array();

    session.register_vortex(name, chunked_array)?;

    Ok(())
}
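
Outside of Criterion, the new public `load_datasets` helper can also be driven from a small async binary. A minimal sketch, not part of this commit, assuming `tokio` with the `macros` feature and `anyhow` are available to the binary, and reusing `Q1` from `bench_vortex::tpch::query`:

```
use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
use bench_vortex::tpch::query::Q1;
use bench_vortex::tpch::{load_datasets, Format};

// Hypothetical standalone driver; not part of this commit.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Generate (or reuse) the TPC-H data directory, then register the tables as Arrow.
    let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap();
    let ctx = load_datasets(&data_dir, Format::Arrow).await?;

    // Run Q1 once and print the resulting record batches.
    for batch in ctx.sql(Q1).await?.collect().await? {
        println!("{batch:?}");
    }
    Ok(())
}
```
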
