From d56ef59ed9d33acc4191d3c52b8badd8531fc318 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 16 Jul 2024 17:43:09 +0100 Subject: [PATCH] Run tpch_benchmark queries single-threaded in rayon pool (#463) In #459 , we used an 8-thread pool and a single tokio runtime to run all of the queries in parallel. There was huge variability on the values returned by those benchmarks. This PR * Switches from using tokio global runtime for parallelism to using a Rayon pool * Each query executes in a current-thread tokio runtime, which should be a bit more apples-to-apples * Removed CSV measurements, and now have each query+format pair do 3 warmup runs, followed by 20 measured runs, then take the min over measurements * Fixed the coloration logic for table (oops) * A few small drive-bys --- bench-vortex/benches/tpch_benchmark.rs | 23 ++--- bench-vortex/src/bin/tpch_benchmark.rs | 132 +++++++++++++++++-------- vortex-datafusion/src/lib.rs | 4 + 3 files changed, 102 insertions(+), 57 deletions(-) diff --git a/bench-vortex/benches/tpch_benchmark.rs b/bench-vortex/benches/tpch_benchmark.rs index d49e76504a..20dd5224a3 100644 --- a/bench-vortex/benches/tpch_benchmark.rs +++ b/bench-vortex/benches/tpch_benchmark.rs @@ -9,11 +9,11 @@ fn benchmark(c: &mut Criterion) { // Run TPC-H data gen. let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap(); - let vortex_no_pushdown_ctx = runtime + let vortex_pushdown_disabled_ctx = runtime .block_on(load_datasets( &data_dir, Format::Vortex { - disable_pushdown: false, + disable_pushdown: true, }, )) .unwrap(); @@ -21,13 +21,10 @@ fn benchmark(c: &mut Criterion) { .block_on(load_datasets( &data_dir, Format::Vortex { - disable_pushdown: true, + disable_pushdown: false, }, )) .unwrap(); - let csv_ctx = runtime - .block_on(load_datasets(&data_dir, Format::Csv)) - .unwrap(); let arrow_ctx = runtime .block_on(load_datasets(&data_dir, Format::Arrow)) .unwrap(); @@ -35,6 +32,7 @@ fn benchmark(c: &mut Criterion) { for q in 1..=22 { if q == 15 { // DataFusion does not support query 15 since it has multiple SQL statements. + continue; } let query = bench_vortex::tpch::tpch_query(q); @@ -42,9 +40,9 @@ fn benchmark(c: &mut Criterion) { let mut group = c.benchmark_group(format!("tpch_q{q}")); group.sample_size(10); - group.bench_function("vortex-pushdown", |b| { + group.bench_function("vortex-pushdown-disabled", |b| { b.to_async(&runtime).iter(|| async { - vortex_ctx + vortex_pushdown_disabled_ctx .sql(&query) .await .unwrap() @@ -54,9 +52,9 @@ fn benchmark(c: &mut Criterion) { }) }); - group.bench_function("vortex-nopushdown", |b| { + group.bench_function("vortex-pushdown-enabled", |b| { b.to_async(&runtime).iter(|| async { - vortex_no_pushdown_ctx + vortex_ctx .sql(&query) .await .unwrap() @@ -66,11 +64,6 @@ fn benchmark(c: &mut Criterion) { }) }); - group.bench_function("csv", |b| { - b.to_async(&runtime) - .iter(|| async { csv_ctx.sql(&query).await.unwrap().collect().await.unwrap() }) - }); - group.bench_function("arrow", |b| { b.to_async(&runtime).iter(|| async { arrow_ctx diff --git a/bench-vortex/src/bin/tpch_benchmark.rs b/bench-vortex/src/bin/tpch_benchmark.rs index e250a100cc..b443bf6898 100644 --- a/bench-vortex/src/bin/tpch_benchmark.rs +++ b/bench-vortex/src/bin/tpch_benchmark.rs @@ -1,3 +1,4 @@ +use std::sync; use std::time::SystemTime; use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions}; @@ -18,7 +19,7 @@ async fn main() { // The formats to run against (vs the baseline) let formats = [ Format::Arrow, - Format::Csv, + // Format::Csv, Format::Vortex { disable_pushdown: false, }, @@ -40,57 +41,104 @@ async fn main() { // Set up a results table let mut table = Table::new(); - let mut cells = vec![Cell::new("Query")]; - cells.extend(formats.iter().map(|f| Cell::new(&format!("{:?}", f)))); - table.add_row(Row::new(cells)); + { + let mut cells = vec![Cell::new("Query")]; + cells.extend(formats.iter().map(|f| Cell::new(&format!("{:?}", f)))); + table.add_row(Row::new(cells)); + } // Setup a progress bar - let progress = ProgressBar::new(22 * formats.len() as u64); + let progress = ProgressBar::new(21 * formats.len() as u64); + // Send back a channel with the results of Row. + let (rows_tx, rows_rx) = sync::mpsc::channel(); for i in 1..=22 { - // Skip query 15 as it is not supported by DataFusion if i == 15 { continue; } + let _ctxs = ctxs.clone(); + let _tx = rows_tx.clone(); + let _progress = progress.clone(); + rayon::spawn_fifo(move || { + let query = tpch_query(i); + let mut cells = Vec::with_capacity(formats.len()); + cells.push(Cell::new(&format!("Q{}", i))); - let query = tpch_query(i); - let mut cells = Vec::with_capacity(formats.len()); - cells.push(Cell::new(&format!("Q{}", i))); - - let mut elapsed_us = Vec::new(); - for (ctx, format) in ctxs.iter().zip(formats.iter()) { - let start = SystemTime::now(); - ctx.sql(&query) - .await - .map_err(|e| println!("Failed to run {} {:?}: {}", i, format, e)) - .unwrap() - .collect() - .await - .map_err(|e| println!("Failed to collect {} {:?}: {}", i, format, e)) + let mut elapsed_us = Vec::new(); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() .unwrap(); - let elapsed = start.elapsed().unwrap(); - elapsed_us.push(elapsed); - progress.inc(1); - } + for (ctx, format) in _ctxs.iter().zip(formats.iter()) { + for _ in 0..3 { + // warmup + rt.block_on(async { + ctx.sql(&query) + .await + .map_err(|e| println!("Failed to run {} {:?}: {}", i, format, e)) + .unwrap() + .collect() + .await + .map_err(|e| println!("Failed to collect {} {:?}: {}", i, format, e)) + .unwrap(); + }) + } + let mut measure = Vec::new(); + for _ in 0..20 { + let start = SystemTime::now(); + rt.block_on(async { + ctx.sql(&query) + .await + .map_err(|e| println!("Failed to run {} {:?}: {}", i, format, e)) + .unwrap() + .collect() + .await + .map_err(|e| println!("Failed to collect {} {:?}: {}", i, format, e)) + .unwrap(); + }); + let elapsed = start.elapsed().unwrap(); + measure.push(elapsed); + } + let fastest = measure.iter().cloned().min().unwrap(); + elapsed_us.push(fastest); - let baseline = elapsed_us.first().unwrap(); - // yellow: 10% slower than baseline - let yellow = baseline.as_micros() + (baseline.as_micros() / 10); - // red: 50% slower than baseline - let red = baseline.as_micros() + (baseline.as_micros() / 50); - cells.push(Cell::new(&format!("{} us", baseline.as_micros())).style_spec("b")); - for measure in elapsed_us.iter().skip(1) { - let style_spec = if measure.as_micros() > red { - "bBr" - } else if measure.as_micros() > yellow { - "bFdBy" - } else { - "bFdBG" - }; - cells.push(Cell::new(&format!("{} us", measure.as_micros())).style_spec(style_spec)); - } - table.add_row(Row::new(cells)); + _progress.inc(1); + } + + let baseline = elapsed_us.first().unwrap(); + // yellow: 10% slower than baseline + let yellow = baseline.as_micros() + (baseline.as_micros() / 10); + // red: 50% slower than baseline + let red = baseline.as_micros() + (baseline.as_micros() / 2); + cells.push(Cell::new(&format!("{} us", baseline.as_micros())).style_spec("b")); + for measure in elapsed_us.iter().skip(1) { + let style_spec = if measure.as_micros() > red { + "bBr" + } else if measure.as_micros() > yellow { + "bFdBy" + } else { + "bFdBG" + }; + cells + .push(Cell::new(&format!("{} us", measure.as_micros())).style_spec(style_spec)); + } + + _tx.send((i, Row::new(cells))).unwrap(); + }); + } + + // delete parent handle to tx + drop(rows_tx); + + let mut rows = vec![]; + while let Ok((idx, row)) = rows_rx.recv() { + rows.push((idx, row)); + } + rows.sort_by(|(idx0, _), (idx1, _)| idx0.cmp(idx1)); + for (_, row) in rows { + table.add_row(row); } - progress.clone().finish(); + + progress.finish(); table.printstd(); } diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs index 9a6aeb0007..49ea4e404a 100644 --- a/vortex-datafusion/src/lib.rs +++ b/vortex-datafusion/src/lib.rs @@ -446,6 +446,10 @@ impl Stream for VortexRecordBatchStream { Poll::Ready(Some(Ok(batch))) } + + fn size_hint(&self) -> (usize, Option) { + (self.num_chunks, Some(self.num_chunks)) + } } impl RecordBatchStream for VortexRecordBatchStream {