Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Setup TPC-H benchmark infra #444

Merged
merged 9 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -190,3 +190,6 @@ benchmarks/.out
# Scratch
*.txt
*.vortex

# TPC-H benchmarking data
data/
25 changes: 21 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions bench-vortex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ rust-version = { workspace = true }
workspace = true

[dependencies]
anyhow = "1.0"
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
Expand Down Expand Up @@ -51,6 +52,7 @@ vortex-ipc = { path = "../vortex-ipc", features = ["object_store"] }
vortex-roaring = { path = "../encodings/roaring" }
vortex-runend = { path = "../encodings/runend" }
vortex-sampling-compressor = { path = "../vortex-sampling-compressor" }
xshell = "0.2.6"

[dev-dependencies]
criterion = { workspace = true, features = ["html_reports", "async_tokio"] }
Expand Down
21 changes: 17 additions & 4 deletions bench-vortex/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,22 @@ There are a number of benchmarks in this repository that can be run using the `c
or less how you'd expect.

There are also some binaries that are not run by default, but produce some reporting artifacts that can be useful for comparing vortex compression to parquet and debugging vortex compression performance. These are:
* _compress.rs_
* This binary compresses a file using vortex compression and writes the compressed file to disk where it can be examined or used for other operations.
* _comparison.rs_
* This binary compresses a dataset using vortex compression and parquet, taking some stats on the compression performance of each run, and writes out these stats to a csv.

### `compress.rs`

This binary compresses a file using vortex compression and writes the compressed file to disk where it can be examined or used for other operations.

### `comparison.rs`

This binary compresses a dataset using vortex compression and parquet, taking some stats on the compression performance of each run, and writes out these stats to a csv.
* This csv can then be loaded into duckdb and analyzed with the included comparison.sql script.

### `tpch_benchmark.rs`

This binary will run TPC-H query 1 using DataFusion, comparing the Vortex in-memory provider against Arrow and CSV.

For profiling, you can open in Instruments using the following invocation:

```
cargo instruments -p bench-vortex --bin tpch_benchmark --template Time --profile bench
```
11 changes: 4 additions & 7 deletions bench-vortex/benches/datafusion_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,10 @@ fn bench_vortex<M: Measurement>(
compress: bool,
) {
let vortex_dataset = toy_dataset_vortex(compress);
let vortex_table = Arc::new(
VortexMemTable::try_new(
vortex_dataset,
VortexMemTableOptions::default().with_disable_pushdown(disable_pushdown),
)
.unwrap(),
);
let vortex_table = Arc::new(VortexMemTable::new(
vortex_dataset,
VortexMemTableOptions::default().with_disable_pushdown(disable_pushdown),
));

measure_provider(&mut group, session, vortex_table);
}
Expand Down
199 changes: 199 additions & 0 deletions bench-vortex/src/bin/tpch_benchmark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
#![allow(dead_code)]

use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::SystemTime;

use arrow_array::StructArray;
use arrow_schema::Schema;
use bench_vortex::tpch;
use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
use datafusion::datasource::MemTable;
use datafusion::prelude::{CsvReadOptions, SessionContext};
use vortex::array::chunked::ChunkedArray;
use vortex::arrow::FromArrowArray;
use vortex::{Array, ArrayDType, ArrayData, IntoArray, IntoCanonical};
use vortex_datafusion::SessionContextExt;

enum Format {
Csv,
Arrow,
VortexUncompressed,
}

// Generate table dataset.
async fn load_datasets<P: AsRef<Path>>(
base_dir: P,
format: Format,
) -> anyhow::Result<SessionContext> {
let context = SessionContext::new();
let base_dir = base_dir.as_ref();

let customer = base_dir.join("customer.tbl");
let lineitem = base_dir.join("lineitem.tbl");
let nation = base_dir.join("nation.tbl");
let orders = base_dir.join("orders.tbl");
let part = base_dir.join("part.tbl");
let partsupp = base_dir.join("partsupp.tbl");
let region = base_dir.join("region.tbl");
let supplier = base_dir.join("supplier.tbl");

macro_rules! register_table {
($name:ident, $schema:expr) => {
match format {
Format::Csv => register_csv(&context, stringify!($name), &$name, $schema).await,
Format::Arrow => register_arrow(&context, stringify!($name), &$name, $schema).await,
Format::VortexUncompressed => {
register_vortex(&context, stringify!($name), &$name, $schema).await
}
}
};
}

register_table!(customer, &tpch::schema::CUSTOMER)?;
register_table!(lineitem, &tpch::schema::LINEITEM)?;
register_table!(nation, &tpch::schema::NATION)?;
register_table!(orders, &tpch::schema::ORDERS)?;
register_table!(part, &tpch::schema::PART)?;
register_table!(partsupp, &tpch::schema::PARTSUPP)?;
register_table!(region, &tpch::schema::REGION)?;
register_table!(supplier, &tpch::schema::SUPPLIER)?;

Ok(context)
}

async fn register_csv(
session: &SessionContext,
name: &str,
file: &Path,
schema: &Schema,
) -> anyhow::Result<()> {
session
.register_csv(
name,
file.to_str().unwrap(),
CsvReadOptions::default()
.delimiter(b'|')
.has_header(false)
.file_extension("tbl")
.schema(schema),
)
.await?;

Ok(())
}

async fn register_arrow(
session: &SessionContext,
name: &str,
file: &Path,
schema: &Schema,
) -> anyhow::Result<()> {
// Read CSV file into a set of Arrow RecordBatch.
let record_batches = session
.read_csv(
file.to_str().unwrap(),
CsvReadOptions::default()
.delimiter(b'|')
.has_header(false)
.file_extension("tbl")
.schema(schema),
)
.await?
.collect()
.await?;

let mem_table = MemTable::try_new(Arc::new(schema.clone()), vec![record_batches])?;
session.register_table(name, Arc::new(mem_table))?;

Ok(())
}

async fn register_vortex(
session: &SessionContext,
name: &str,
file: &Path,
schema: &Schema,
// TODO(aduffy): add compression option
) -> anyhow::Result<()> {
let record_batches = session
.read_csv(
file.to_str().unwrap(),
CsvReadOptions::default()
.delimiter(b'|')
.has_header(false)
.file_extension("tbl")
.schema(schema),
)
.await?
.collect()
.await?;

// Create a ChunkedArray from the set of chunks.
let chunks: Vec<Array> = record_batches
.iter()
.cloned()
.map(StructArray::from)
.map(|struct_array| ArrayData::from_arrow(&struct_array, false).into_array())
.collect();

let dtype = chunks[0].dtype().clone();
let chunked_array = ChunkedArray::try_new(chunks, dtype)?
.into_canonical()?
.into_array();

session.register_vortex(name, chunked_array)?;

Ok(())
}

async fn q1_csv(base_dir: &PathBuf) -> anyhow::Result<()> {
let ctx = load_datasets(base_dir, Format::Csv).await?;

println!("BEGIN: Q1(CSV)");

let start = SystemTime::now();
ctx.sql(tpch::query::Q1).await?.show().await?;
let elapsed = start.elapsed()?.as_millis();
println!("END CSV: {elapsed}ms");

Ok(())
}

async fn q1_arrow(base_dir: &PathBuf) -> anyhow::Result<()> {
let ctx = load_datasets(base_dir, Format::Arrow).await?;

println!("BEGIN: Q1(ARROW)");
let start = SystemTime::now();

ctx.sql(tpch::query::Q1).await?.show().await?;
let elapsed = start.elapsed()?.as_millis();

println!("END ARROW: {elapsed}ms");

Ok(())
}

async fn q1_vortex(base_dir: &PathBuf) -> anyhow::Result<()> {
let ctx = load_datasets(base_dir, Format::VortexUncompressed).await?;

println!("BEGIN: Q1(VORTEX)");
let start = SystemTime::now();

ctx.sql(tpch::query::Q1).await?.show().await?;

let elapsed = start.elapsed()?.as_millis();
println!("END VORTEX: {elapsed}ms");

Ok(())
}

#[tokio::main]
async fn main() {
// Run TPC-H data gen.
let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap();

q1_csv(&data_dir).await.unwrap();
q1_arrow(&data_dir).await.unwrap();
q1_vortex(&data_dir).await.unwrap();
}
1 change: 1 addition & 0 deletions bench-vortex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub mod parquet_utils;
pub mod public_bi_data;
pub mod reader;
pub mod taxi_data;
pub mod tpch;
pub mod vortex_utils;

lazy_static! {
Expand Down
Loading
Loading