Setup TPC-H benchmark infra (#444)
Add TPC-H benchmark infra.

This includes generating all tables with a configurable scale factor.
Currently only Query 1 is implemented; other queries can be added as
needed.
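
For illustration, a minimal sketch of driving the generator at a non-default scale factor. Only `DBGen::new(DBGenOptions::default()).generate()` appears in this change, so the `scale_factor` field and struct-update syntax below are assumptions about the options type:

```rust
use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};

fn main() {
    // Hypothetical: only DBGenOptions::default() appears in this change, so
    // the `scale_factor` field name is an assumption about the options struct.
    let opts = DBGenOptions {
        scale_factor: 10,
        ..DBGenOptions::default()
    };

    // Generates the .tbl files (or reuses a cached copy) and returns the data directory.
    let data_dir = DBGen::new(opts).generate().unwrap();
    println!("TPC-H tables written to {}", data_dir.display());
}
```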

## Running the benchmark

To execute the benchmark once, generating and caching the dataset in the
process, run:

```
cargo run --release -p bench-vortex --bin tpch_benchmark
```

If you have `cargo-instruments` installed and you've already cached the
data above, you can do a profiling run with:

```
cargo instruments -p bench-vortex --bin tpch_benchmark --template Time --profile bench
```

## Follow-ups (FLUPs)

Potential avenues for extension from here:

* Adding more queries
* Adding a proper criterion benchmark; for now the binary is more useful
as a source for Instruments profiles (see the sketch after this list)
* Adding CLI flags to run different configurations for comparison across
different queries
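
A minimal sketch of what that criterion harness could look like, relying on the `async_tokio` feature already enabled in the dev-dependencies. It assumes the binary's `load_datasets` helper and `Format` enum were moved into the bench-vortex library for reuse (today they are private to the binary), so this is illustrative only:

```rust
use bench_vortex::tpch;
use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
use criterion::{criterion_group, criterion_main, Criterion};
use tokio::runtime::Runtime;

// Assumed re-exports: `load_datasets` and `Format` currently live in the
// tpch_benchmark binary, so this only compiles if they move into the library.
use bench_vortex::tpch::{load_datasets, Format};

fn tpch_q1(c: &mut Criterion) {
    let runtime = Runtime::new().unwrap();
    // Generate (or reuse cached) TPC-H data once, outside the measured loop.
    let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap();

    c.bench_function("tpch_q1_vortex", |b| {
        b.to_async(&runtime).iter(|| async {
            // Build a SessionContext with Vortex tables and run Q1 to completion.
            let ctx = load_datasets(&data_dir, Format::VortexUncompressed)
                .await
                .unwrap();
            ctx.sql(tpch::query::Q1)
                .await
                .unwrap()
                .collect()
                .await
                .unwrap();
        })
    });
}

criterion_group!(benches, tpch_q1);
criterion_main!(benches);
```

This would also need a `[[bench]]` entry with `harness = false` in bench-vortex/Cargo.toml.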
a10y authored Jul 10, 2024
1 parent e8b81c7 commit 9419059
Showing 12 changed files with 500 additions and 21 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -190,3 +190,6 @@ benchmarks/.out
# Scratch
*.txt
*.vortex

# TPC-H benchmarking data
data/
25 changes: 21 additions & 4 deletions Cargo.lock

2 changes: 2 additions & 0 deletions bench-vortex/Cargo.toml
@@ -15,6 +15,7 @@ rust-version = { workspace = true }
workspace = true

[dependencies]
anyhow = "1.0"
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
@@ -51,6 +52,7 @@ vortex-ipc = { path = "../vortex-ipc", features = ["object_store"] }
vortex-roaring = { path = "../encodings/roaring" }
vortex-runend = { path = "../encodings/runend" }
vortex-sampling-compressor = { path = "../vortex-sampling-compressor" }
xshell = "0.2.6"

[dev-dependencies]
criterion = { workspace = true, features = ["html_reports", "async_tokio"] }
21 changes: 17 additions & 4 deletions bench-vortex/README.md
@@ -4,9 +4,22 @@ There are a number of benchmarks in this repository that can be run using the `c
or less how you'd expect.

There are also some binaries that are not run by default, but produce some reporting artifacts that can be useful for comparing vortex compression to parquet and debugging vortex compression performance. These are:
* _compress.rs_
* This binary compresses a file using vortex compression and writes the compressed file to disk where it can be examined or used for other operations.
* _comparison.rs_
* This binary compresses a dataset using vortex compression and parquet, taking some stats on the compression performance of each run, and writes out these stats to a csv.

### `compress.rs`

This binary compresses a file using vortex compression and writes the compressed file to disk where it can be examined or used for other operations.
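
A typical invocation might look like the following (the binary name is inferred from the file name; any flags it accepts are not shown in this change):

```
cargo run --release -p bench-vortex --bin compress
```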

### `comparison.rs`

This binary compresses a dataset using both Vortex and Parquet, collects stats on the compression performance of each run, and writes these stats to a CSV. The CSV can then be loaded into DuckDB and analyzed with the included comparison.sql script.
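
For example, assuming comparison.sql loads the CSV itself and is run from the directory containing both files, the analysis step could be as simple as:

```
duckdb < comparison.sql
```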

### `tpch_benchmark.rs`

This binary will run TPC-H query 1 using DataFusion, comparing the Vortex in-memory provider against Arrow and CSV.

For profiling, you can open in Instruments using the following invocation:

```
cargo instruments -p bench-vortex --bin tpch_benchmark --template Time --profile bench
```
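
To run the benchmark end to end without profiling (this also generates and caches the TPC-H dataset on first run):

```
cargo run --release -p bench-vortex --bin tpch_benchmark
```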
11 changes: 4 additions & 7 deletions bench-vortex/benches/datafusion_benchmark.rs
@@ -161,13 +161,10 @@ fn bench_vortex<M: Measurement>(
    compress: bool,
) {
    let vortex_dataset = toy_dataset_vortex(compress);
-   let vortex_table = Arc::new(
-       VortexMemTable::try_new(
-           vortex_dataset,
-           VortexMemTableOptions::default().with_disable_pushdown(disable_pushdown),
-       )
-       .unwrap(),
-   );
+   let vortex_table = Arc::new(VortexMemTable::new(
+       vortex_dataset,
+       VortexMemTableOptions::default().with_disable_pushdown(disable_pushdown),
+   ));

    measure_provider(&mut group, session, vortex_table);
}
199 changes: 199 additions & 0 deletions bench-vortex/src/bin/tpch_benchmark.rs
@@ -0,0 +1,199 @@
#![allow(dead_code)]

use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::SystemTime;

use arrow_array::StructArray;
use arrow_schema::Schema;
use bench_vortex::tpch;
use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
use datafusion::datasource::MemTable;
use datafusion::prelude::{CsvReadOptions, SessionContext};
use vortex::array::chunked::ChunkedArray;
use vortex::arrow::FromArrowArray;
use vortex::{Array, ArrayDType, ArrayData, IntoArray, IntoCanonical};
use vortex_datafusion::SessionContextExt;

enum Format {
    Csv,
    Arrow,
    VortexUncompressed,
}

// Load the generated TPC-H tables from `base_dir` and register each with a new SessionContext in the requested format.
async fn load_datasets<P: AsRef<Path>>(
    base_dir: P,
    format: Format,
) -> anyhow::Result<SessionContext> {
    let context = SessionContext::new();
    let base_dir = base_dir.as_ref();

    let customer = base_dir.join("customer.tbl");
    let lineitem = base_dir.join("lineitem.tbl");
    let nation = base_dir.join("nation.tbl");
    let orders = base_dir.join("orders.tbl");
    let part = base_dir.join("part.tbl");
    let partsupp = base_dir.join("partsupp.tbl");
    let region = base_dir.join("region.tbl");
    let supplier = base_dir.join("supplier.tbl");

    macro_rules! register_table {
        ($name:ident, $schema:expr) => {
            match format {
                Format::Csv => register_csv(&context, stringify!($name), &$name, $schema).await,
                Format::Arrow => register_arrow(&context, stringify!($name), &$name, $schema).await,
                Format::VortexUncompressed => {
                    register_vortex(&context, stringify!($name), &$name, $schema).await
                }
            }
        };
    }

    register_table!(customer, &tpch::schema::CUSTOMER)?;
    register_table!(lineitem, &tpch::schema::LINEITEM)?;
    register_table!(nation, &tpch::schema::NATION)?;
    register_table!(orders, &tpch::schema::ORDERS)?;
    register_table!(part, &tpch::schema::PART)?;
    register_table!(partsupp, &tpch::schema::PARTSUPP)?;
    register_table!(region, &tpch::schema::REGION)?;
    register_table!(supplier, &tpch::schema::SUPPLIER)?;

    Ok(context)
}

async fn register_csv(
    session: &SessionContext,
    name: &str,
    file: &Path,
    schema: &Schema,
) -> anyhow::Result<()> {
    session
        .register_csv(
            name,
            file.to_str().unwrap(),
            CsvReadOptions::default()
                .delimiter(b'|')
                .has_header(false)
                .file_extension("tbl")
                .schema(schema),
        )
        .await?;

    Ok(())
}

async fn register_arrow(
    session: &SessionContext,
    name: &str,
    file: &Path,
    schema: &Schema,
) -> anyhow::Result<()> {
    // Read CSV file into a set of Arrow RecordBatch.
    let record_batches = session
        .read_csv(
            file.to_str().unwrap(),
            CsvReadOptions::default()
                .delimiter(b'|')
                .has_header(false)
                .file_extension("tbl")
                .schema(schema),
        )
        .await?
        .collect()
        .await?;

    let mem_table = MemTable::try_new(Arc::new(schema.clone()), vec![record_batches])?;
    session.register_table(name, Arc::new(mem_table))?;

    Ok(())
}

async fn register_vortex(
    session: &SessionContext,
    name: &str,
    file: &Path,
    schema: &Schema,
    // TODO(aduffy): add compression option
) -> anyhow::Result<()> {
    let record_batches = session
        .read_csv(
            file.to_str().unwrap(),
            CsvReadOptions::default()
                .delimiter(b'|')
                .has_header(false)
                .file_extension("tbl")
                .schema(schema),
        )
        .await?
        .collect()
        .await?;

    // Create a ChunkedArray from the set of chunks.
    let chunks: Vec<Array> = record_batches
        .iter()
        .cloned()
        .map(StructArray::from)
        .map(|struct_array| ArrayData::from_arrow(&struct_array, false).into_array())
        .collect();

    let dtype = chunks[0].dtype().clone();
    let chunked_array = ChunkedArray::try_new(chunks, dtype)?
        .into_canonical()?
        .into_array();

    session.register_vortex(name, chunked_array)?;

    Ok(())
}

async fn q1_csv(base_dir: &PathBuf) -> anyhow::Result<()> {
    let ctx = load_datasets(base_dir, Format::Csv).await?;

    println!("BEGIN: Q1(CSV)");

    let start = SystemTime::now();
    ctx.sql(tpch::query::Q1).await?.show().await?;
    let elapsed = start.elapsed()?.as_millis();
    println!("END CSV: {elapsed}ms");

    Ok(())
}

async fn q1_arrow(base_dir: &PathBuf) -> anyhow::Result<()> {
    let ctx = load_datasets(base_dir, Format::Arrow).await?;

    println!("BEGIN: Q1(ARROW)");
    let start = SystemTime::now();

    ctx.sql(tpch::query::Q1).await?.show().await?;
    let elapsed = start.elapsed()?.as_millis();

    println!("END ARROW: {elapsed}ms");

    Ok(())
}

async fn q1_vortex(base_dir: &PathBuf) -> anyhow::Result<()> {
    let ctx = load_datasets(base_dir, Format::VortexUncompressed).await?;

    println!("BEGIN: Q1(VORTEX)");
    let start = SystemTime::now();

    ctx.sql(tpch::query::Q1).await?.show().await?;

    let elapsed = start.elapsed()?.as_millis();
    println!("END VORTEX: {elapsed}ms");

    Ok(())
}

#[tokio::main]
async fn main() {
    // Run TPC-H data gen.
    let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap();

    q1_csv(&data_dir).await.unwrap();
    q1_arrow(&data_dir).await.unwrap();
    q1_vortex(&data_dir).await.unwrap();
}
1 change: 1 addition & 0 deletions bench-vortex/src/lib.rs
@@ -46,6 +46,7 @@ pub mod parquet_utils;
pub mod public_bi_data;
pub mod reader;
pub mod taxi_data;
pub mod tpch;
pub mod vortex_utils;

lazy_static! {