Skip to content

Commit

Permalink
Merge branch 'develop' into runend-primitive-move
Browse files Browse the repository at this point in the history
# Conflicts:
#	encodings/runend/src/default/compute.rs
  • Loading branch information
joseph-isaacs committed Jul 17, 2024
2 parents 34de230 + 4996820 commit cc86a3e
Show file tree
Hide file tree
Showing 26 changed files with 240 additions and 268 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ jobs:
run: cargo fmt --all --check
- name: Rust Lint - Clippy
run: cargo clippy --all-features --all-targets
- name: Docs
run: cargo doc --no-deps
- name: Rust Test
run: cargo test --workspace --all-features

Expand Down
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 24 additions & 23 deletions bench-vortex/benches/tpch_benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
use bench_vortex::tpch::{load_datasets, Format};
use bench_vortex::tpch::{load_datasets, tpch_queries, Format};
use criterion::{criterion_group, criterion_main, Criterion};
use tokio::runtime::Builder;

Expand All @@ -9,42 +9,36 @@ fn benchmark(c: &mut Criterion) {
// Run TPC-H data gen.
let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap();

let vortex_no_pushdown_ctx = runtime
let vortex_pushdown_disabled_ctx = runtime
.block_on(load_datasets(
&data_dir,
Format::Vortex {
disable_pushdown: false,
disable_pushdown: true,
},
))
.unwrap();
let vortex_ctx = runtime
.block_on(load_datasets(
&data_dir,
Format::Vortex {
disable_pushdown: true,
disable_pushdown: false,
},
))
.unwrap();
let csv_ctx = runtime
.block_on(load_datasets(&data_dir, Format::Csv))
.unwrap();
let arrow_ctx = runtime
.block_on(load_datasets(&data_dir, Format::Arrow))
.unwrap();
let parquet_ctx = runtime
.block_on(load_datasets(&data_dir, Format::Parquet))
.unwrap();

for q in 1..=22 {
if q == 15 {
// DataFusion does not support query 15 since it has multiple SQL statements.
}

let query = bench_vortex::tpch::tpch_query(q);

for (q, query) in tpch_queries() {
let mut group = c.benchmark_group(format!("tpch_q{q}"));
group.sample_size(10);

group.bench_function("vortex-pushdown", |b| {
group.bench_function("vortex-pushdown-disabled", |b| {
b.to_async(&runtime).iter(|| async {
vortex_ctx
vortex_pushdown_disabled_ctx
.sql(&query)
.await
.unwrap()
Expand All @@ -54,9 +48,9 @@ fn benchmark(c: &mut Criterion) {
})
});

group.bench_function("vortex-nopushdown", |b| {
group.bench_function("vortex-pushdown-enabled", |b| {
b.to_async(&runtime).iter(|| async {
vortex_no_pushdown_ctx
vortex_ctx
.sql(&query)
.await
.unwrap()
Expand All @@ -66,11 +60,6 @@ fn benchmark(c: &mut Criterion) {
})
});

group.bench_function("csv", |b| {
b.to_async(&runtime)
.iter(|| async { csv_ctx.sql(&query).await.unwrap().collect().await.unwrap() })
});

group.bench_function("arrow", |b| {
b.to_async(&runtime).iter(|| async {
arrow_ctx
Expand All @@ -82,6 +71,18 @@ fn benchmark(c: &mut Criterion) {
.unwrap()
})
});

group.bench_function("parquet", |b| {
b.to_async(&runtime).iter(|| async {
parquet_ctx
.sql(&query)
.await
.unwrap()
.collect()
.await
.unwrap()
})
});
}
}

Expand Down
144 changes: 97 additions & 47 deletions bench-vortex/src/bin/tpch_benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::sync;
use std::time::SystemTime;

use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
use bench_vortex::tpch::{load_datasets, tpch_query, Format};
use bench_vortex::tpch::{load_datasets, tpch_queries, Format};
use futures::future::join_all;
use indicatif::ProgressBar;
use itertools::Itertools;
Expand All @@ -18,7 +19,7 @@ async fn main() {
// The formats to run against (vs the baseline)
let formats = [
Format::Arrow,
Format::Csv,
Format::Parquet,
Format::Vortex {
disable_pushdown: false,
},
Expand All @@ -40,57 +41,106 @@ async fn main() {

// Set up a results table
let mut table = Table::new();
let mut cells = vec![Cell::new("Query")];
cells.extend(formats.iter().map(|f| Cell::new(&format!("{:?}", f))));
table.add_row(Row::new(cells));
{
let mut cells = vec![Cell::new("Query")];
cells.extend(formats.iter().map(|f| Cell::new(&format!("{:?}", f))));
table.add_row(Row::new(cells));
}

// Setup a progress bar
let progress = ProgressBar::new(22 * formats.len() as u64);

for i in 1..=22 {
// Skip query 15 as it is not supported by DataFusion
if i == 15 {
continue;
}
let progress = ProgressBar::new(21 * formats.len() as u64);

let query = tpch_query(i);
let mut cells = Vec::with_capacity(formats.len());
cells.push(Cell::new(&format!("Q{}", i)));
// Send back a channel with the results of Row.
let (rows_tx, rows_rx) = sync::mpsc::channel();
for (q, query) in tpch_queries() {
let _ctxs = ctxs.clone();
let _tx = rows_tx.clone();
let _progress = progress.clone();
rayon::spawn_fifo(move || {
let mut cells = Vec::with_capacity(formats.len());
cells.push(Cell::new(&format!("Q{}", q)));

let mut elapsed_us = Vec::new();
for (ctx, format) in ctxs.iter().zip(formats.iter()) {
let start = SystemTime::now();
ctx.sql(&query)
.await
.map_err(|e| println!("Failed to run {} {:?}: {}", i, format, e))
.unwrap()
.collect()
.await
.map_err(|e| println!("Failed to collect {} {:?}: {}", i, format, e))
let mut elapsed_us = Vec::new();
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let elapsed = start.elapsed().unwrap();
elapsed_us.push(elapsed);
progress.inc(1);
}
for (ctx, format) in _ctxs.iter().zip(formats.iter()) {
for _ in 0..3 {
// warmup
rt.block_on(async {
ctx.sql(&query)
.await
.map_err(|e| println!("Failed to run {} {:?}: {}", q, format, e))
.unwrap()
.collect()
.await
.map_err(|e| println!("Failed to collect {} {:?}: {}", q, format, e))
.unwrap();
})
}
let mut measure = Vec::new();
for _ in 0..10 {
let start = SystemTime::now();
rt.block_on(async {
ctx.sql(&query)
.await
.map_err(|e| println!("Failed to run {} {:?}: {}", q, format, e))
.unwrap()
.collect()
.await
.map_err(|e| println!("Failed to collect {} {:?}: {}", q, format, e))
.unwrap();
});
let elapsed = start.elapsed().unwrap();
measure.push(elapsed);
}
let fastest = measure.iter().cloned().min().unwrap();
elapsed_us.push(fastest);

let baseline = elapsed_us.first().unwrap();
// yellow: 10% slower than baseline
let yellow = baseline.as_micros() + (baseline.as_micros() / 10);
// red: 50% slower than baseline
let red = baseline.as_micros() + (baseline.as_micros() / 50);
cells.push(Cell::new(&format!("{} us", baseline.as_micros())).style_spec("b"));
for measure in elapsed_us.iter().skip(1) {
let style_spec = if measure.as_micros() > red {
"bBr"
} else if measure.as_micros() > yellow {
"bFdBy"
} else {
"bFdBG"
};
cells.push(Cell::new(&format!("{} us", measure.as_micros())).style_spec(style_spec));
}
table.add_row(Row::new(cells));
_progress.inc(1);
}

let baseline = elapsed_us.first().unwrap();
// yellow: 10% slower than baseline
let yellow = baseline.as_micros() + (baseline.as_micros() / 10);
// red: 50% slower than baseline
let red = baseline.as_micros() + (baseline.as_micros() / 2);
cells.push(Cell::new(&format!("{} us", baseline.as_micros())).style_spec("b"));
for measure in elapsed_us.iter().skip(1) {
let style_spec = if measure.as_micros() > red {
"bBr"
} else if measure.as_micros() > yellow {
"bFdBy"
} else {
"bFdBG"
};
cells.push(
Cell::new(&format!(
"{} us ({:.2})",
measure.as_micros(),
measure.as_micros() as f64 / baseline.as_micros() as f64
))
.style_spec(style_spec),
);
}

_tx.send((q, Row::new(cells))).unwrap();
});
}
progress.clone().finish();

// delete parent handle to tx
drop(rows_tx);

let mut rows = vec![];
while let Ok((idx, row)) = rows_rx.recv() {
rows.push((idx, row));
}
rows.sort_by(|(idx0, _), (idx1, _)| idx0.cmp(idx1));
for (_, row) in rows {
table.add_row(row);
}

progress.finish();
table.printstd();
}
59 changes: 58 additions & 1 deletion bench-vortex/src/tpch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,25 @@ use std::sync::Arc;

use arrow_array::StructArray;
use arrow_schema::Schema;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::datasource::MemTable;
use datafusion::prelude::{CsvReadOptions, SessionContext};
use datafusion::prelude::{CsvReadOptions, ParquetReadOptions, SessionContext};
use futures::executor::block_on;
use vortex::array::chunked::ChunkedArray;
use vortex::arrow::FromArrowArray;
use vortex::{Array, ArrayDType, ArrayData, IntoArray};
use vortex_datafusion::{SessionContextExt, VortexMemTableOptions};

use crate::idempotent;

pub mod dbgen;
pub mod schema;

#[derive(Clone, Copy, Debug)]
pub enum Format {
Csv,
Arrow,
Parquet,
Vortex { disable_pushdown: bool },
}

Expand All @@ -43,6 +48,9 @@ pub async fn load_datasets<P: AsRef<Path>>(
match format {
Format::Csv => register_csv(&context, stringify!($name), &$name, $schema).await,
Format::Arrow => register_arrow(&context, stringify!($name), &$name, $schema).await,
Format::Parquet => {
register_parquet(&context, stringify!($name), &$name, $schema).await
}
Format::Vortex {
disable_pushdown, ..
} => {
Expand Down Expand Up @@ -118,6 +126,46 @@ async fn register_arrow(
Ok(())
}

async fn register_parquet(
session: &SessionContext,
name: &str,
file: &Path,
schema: &Schema,
) -> anyhow::Result<()> {
// Idempotent conversion from TPCH CSV to Parquet.
let pq_file = idempotent(
&file.with_extension("").with_extension("parquet"),
|pq_file| {
let df = block_on(
session.read_csv(
file.to_str().unwrap(),
CsvReadOptions::default()
.delimiter(b'|')
.has_header(false)
.file_extension("tbl")
.schema(schema),
),
)
.unwrap();

block_on(df.write_parquet(
pq_file.as_os_str().to_str().unwrap(),
DataFrameWriteOptions::default(),
None,
))
},
)
.unwrap();

Ok(session
.register_parquet(
name,
pq_file.as_os_str().to_str().unwrap(),
ParquetReadOptions::default(),
)
.await?)
}

async fn register_vortex(
session: &SessionContext,
name: &str,
Expand Down Expand Up @@ -158,6 +206,15 @@ async fn register_vortex(
Ok(())
}

pub fn tpch_queries() -> impl Iterator<Item = (usize, String)> {
(1..=22)
.filter(|q| {
// Query 15 has multiple SQL statements so doesn't yet run in DataFusion.
*q != 15
})
.map(|q| (q, tpch_query(q)))
}

pub fn tpch_query(query_idx: usize) -> String {
let manifest_dir = Path::new(env!("CARGO_MANIFEST_DIR"))
.join("tpch")
Expand Down
Loading

0 comments on commit cc86a3e

Please sign in to comment.