Skip to content

Commit

Permalink
Run tpch_benchmark queries single-threaded in rayon pool (#463)
Browse files Browse the repository at this point in the history
In #459, we used an 8-thread pool and a single tokio runtime to run all
of the queries in parallel.

There was huge variability on the values returned by those benchmarks.
This PR

* Switches from using tokio global runtime for parallelism to using a
Rayon pool
* Each query executes in a current-thread tokio runtime, which should be
a bit more apples-to-apples
* Removed CSV measurements; each query+format pair now does 3 warmup
runs followed by 20 measured runs, taking the minimum over the
measurements
* Fixed the coloration logic for table (oops)
* A few small drive-bys
  • Loading branch information
a10y authored Jul 16, 2024
1 parent d4558a1 commit d56ef59
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 57 deletions.
23 changes: 8 additions & 15 deletions bench-vortex/benches/tpch_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,42 +9,40 @@ fn benchmark(c: &mut Criterion) {
// Run TPC-H data gen.
let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap();

let vortex_no_pushdown_ctx = runtime
let vortex_pushdown_disabled_ctx = runtime
.block_on(load_datasets(
&data_dir,
Format::Vortex {
disable_pushdown: false,
disable_pushdown: true,
},
))
.unwrap();
let vortex_ctx = runtime
.block_on(load_datasets(
&data_dir,
Format::Vortex {
disable_pushdown: true,
disable_pushdown: false,
},
))
.unwrap();
let csv_ctx = runtime
.block_on(load_datasets(&data_dir, Format::Csv))
.unwrap();
let arrow_ctx = runtime
.block_on(load_datasets(&data_dir, Format::Arrow))
.unwrap();

for q in 1..=22 {
if q == 15 {
// DataFusion does not support query 15 since it has multiple SQL statements.
continue;
}

let query = bench_vortex::tpch::tpch_query(q);

let mut group = c.benchmark_group(format!("tpch_q{q}"));
group.sample_size(10);

group.bench_function("vortex-pushdown", |b| {
group.bench_function("vortex-pushdown-disabled", |b| {
b.to_async(&runtime).iter(|| async {
vortex_ctx
vortex_pushdown_disabled_ctx
.sql(&query)
.await
.unwrap()
Expand All @@ -54,9 +52,9 @@ fn benchmark(c: &mut Criterion) {
})
});

group.bench_function("vortex-nopushdown", |b| {
group.bench_function("vortex-pushdown-enabled", |b| {
b.to_async(&runtime).iter(|| async {
vortex_no_pushdown_ctx
vortex_ctx
.sql(&query)
.await
.unwrap()
Expand All @@ -66,11 +64,6 @@ fn benchmark(c: &mut Criterion) {
})
});

group.bench_function("csv", |b| {
b.to_async(&runtime)
.iter(|| async { csv_ctx.sql(&query).await.unwrap().collect().await.unwrap() })
});

group.bench_function("arrow", |b| {
b.to_async(&runtime).iter(|| async {
arrow_ctx
Expand Down
132 changes: 90 additions & 42 deletions bench-vortex/src/bin/tpch_benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::sync;
use std::time::SystemTime;

use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
Expand All @@ -18,7 +19,7 @@ async fn main() {
// The formats to run against (vs the baseline)
let formats = [
Format::Arrow,
Format::Csv,
// Format::Csv,
Format::Vortex {
disable_pushdown: false,
},
Expand All @@ -40,57 +41,104 @@ async fn main() {

// Set up a results table
let mut table = Table::new();
let mut cells = vec![Cell::new("Query")];
cells.extend(formats.iter().map(|f| Cell::new(&format!("{:?}", f))));
table.add_row(Row::new(cells));
{
let mut cells = vec![Cell::new("Query")];
cells.extend(formats.iter().map(|f| Cell::new(&format!("{:?}", f))));
table.add_row(Row::new(cells));
}

// Setup a progress bar
let progress = ProgressBar::new(22 * formats.len() as u64);
let progress = ProgressBar::new(21 * formats.len() as u64);

// Send back a channel with the results of Row.
let (rows_tx, rows_rx) = sync::mpsc::channel();
for i in 1..=22 {
// Skip query 15 as it is not supported by DataFusion
if i == 15 {
continue;
}
let _ctxs = ctxs.clone();
let _tx = rows_tx.clone();
let _progress = progress.clone();
rayon::spawn_fifo(move || {
let query = tpch_query(i);
let mut cells = Vec::with_capacity(formats.len());
cells.push(Cell::new(&format!("Q{}", i)));

let query = tpch_query(i);
let mut cells = Vec::with_capacity(formats.len());
cells.push(Cell::new(&format!("Q{}", i)));

let mut elapsed_us = Vec::new();
for (ctx, format) in ctxs.iter().zip(formats.iter()) {
let start = SystemTime::now();
ctx.sql(&query)
.await
.map_err(|e| println!("Failed to run {} {:?}: {}", i, format, e))
.unwrap()
.collect()
.await
.map_err(|e| println!("Failed to collect {} {:?}: {}", i, format, e))
let mut elapsed_us = Vec::new();
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let elapsed = start.elapsed().unwrap();
elapsed_us.push(elapsed);
progress.inc(1);
}
for (ctx, format) in _ctxs.iter().zip(formats.iter()) {
for _ in 0..3 {
// warmup
rt.block_on(async {
ctx.sql(&query)
.await
.map_err(|e| println!("Failed to run {} {:?}: {}", i, format, e))
.unwrap()
.collect()
.await
.map_err(|e| println!("Failed to collect {} {:?}: {}", i, format, e))
.unwrap();
})
}
let mut measure = Vec::new();
for _ in 0..20 {
let start = SystemTime::now();
rt.block_on(async {
ctx.sql(&query)
.await
.map_err(|e| println!("Failed to run {} {:?}: {}", i, format, e))
.unwrap()
.collect()
.await
.map_err(|e| println!("Failed to collect {} {:?}: {}", i, format, e))
.unwrap();
});
let elapsed = start.elapsed().unwrap();
measure.push(elapsed);
}
let fastest = measure.iter().cloned().min().unwrap();
elapsed_us.push(fastest);

let baseline = elapsed_us.first().unwrap();
// yellow: 10% slower than baseline
let yellow = baseline.as_micros() + (baseline.as_micros() / 10);
// red: 50% slower than baseline
let red = baseline.as_micros() + (baseline.as_micros() / 50);
cells.push(Cell::new(&format!("{} us", baseline.as_micros())).style_spec("b"));
for measure in elapsed_us.iter().skip(1) {
let style_spec = if measure.as_micros() > red {
"bBr"
} else if measure.as_micros() > yellow {
"bFdBy"
} else {
"bFdBG"
};
cells.push(Cell::new(&format!("{} us", measure.as_micros())).style_spec(style_spec));
}
table.add_row(Row::new(cells));
_progress.inc(1);
}

let baseline = elapsed_us.first().unwrap();
// yellow: 10% slower than baseline
let yellow = baseline.as_micros() + (baseline.as_micros() / 10);
// red: 50% slower than baseline
let red = baseline.as_micros() + (baseline.as_micros() / 2);
cells.push(Cell::new(&format!("{} us", baseline.as_micros())).style_spec("b"));
for measure in elapsed_us.iter().skip(1) {
let style_spec = if measure.as_micros() > red {
"bBr"
} else if measure.as_micros() > yellow {
"bFdBy"
} else {
"bFdBG"
};
cells
.push(Cell::new(&format!("{} us", measure.as_micros())).style_spec(style_spec));
}

_tx.send((i, Row::new(cells))).unwrap();
});
}

// delete parent handle to tx
drop(rows_tx);

let mut rows = vec![];
while let Ok((idx, row)) = rows_rx.recv() {
rows.push((idx, row));
}
rows.sort_by(|(idx0, _), (idx1, _)| idx0.cmp(idx1));
for (_, row) in rows {
table.add_row(row);
}
progress.clone().finish();

progress.finish();
table.printstd();
}
4 changes: 4 additions & 0 deletions vortex-datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,10 @@ impl Stream for VortexRecordBatchStream {

Poll::Ready(Some(Ok(batch)))
}

fn size_hint(&self) -> (usize, Option<usize>) {
(self.num_chunks, Some(self.num_chunks))
}
}

impl RecordBatchStream for VortexRecordBatchStream {
Expand Down

0 comments on commit d56ef59

Please sign in to comment.