Skip to content

Commit

Permalink
Assert expected row count in tpch_benchmark binary (#620)
Browse files Browse the repository at this point in the history
Will add similar logic to the benchmark in a follow-up.
  • Loading branch information
robert3005 authored Aug 14, 2024
1 parent 664ffb6 commit b650874
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 23 deletions.
54 changes: 46 additions & 8 deletions bench-vortex/src/bin/tpch_benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
#![allow(clippy::use_debug)]

use std::collections::HashMap;
use std::process::ExitCode;
use std::sync;
use std::time::SystemTime;

use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
use bench_vortex::tpch::{load_datasets, tpch_queries, Format};
use bench_vortex::tpch::{load_datasets, tpch_queries, Format, EXPECTED_ROW_COUNTS};
use clap::Parser;
use futures::future::try_join_all;
use indicatif::ProgressBar;
use itertools::Itertools;
use prettytable::{Cell, Row, Table};

#[derive(Parser, Debug)]
Expand All @@ -19,7 +22,7 @@ struct Args {
threads: Option<usize>,
}

fn main() {
fn main() -> ExitCode {
let args = Args::parse();

let runtime = match args.threads {
Expand All @@ -37,10 +40,10 @@ fn main() {
}
.expect("Failed building the Runtime");

runtime.block_on(bench_main(args.queries));
runtime.block_on(bench_main(args.queries))
}

async fn bench_main(queries: Option<Vec<usize>>) {
async fn bench_main(queries: Option<Vec<usize>>) -> ExitCode {
// uncomment the below to enable trace logging of datafusion execution
// setup_logger(LevelFilter::Trace);

Expand Down Expand Up @@ -82,6 +85,7 @@ async fn bench_main(queries: Option<Vec<usize>>) {

// Send back a channel with the results of Row.
let (rows_tx, rows_rx) = sync::mpsc::channel();
let (row_count_tx, row_count_rx) = sync::mpsc::channel();
for (q, query) in tpch_queries() {
if let Some(queries) = queries.as_ref() {
if !queries.contains(&q) {
Expand All @@ -90,6 +94,7 @@ async fn bench_main(queries: Option<Vec<usize>>) {
}
let ctxs = ctxs.clone();
let tx = rows_tx.clone();
let count_tx = row_count_tx.clone();
let progress = progress.clone();
rayon::spawn_fifo(move || {
let mut cells = Vec::with_capacity(formats.len());
Expand All @@ -101,18 +106,24 @@ async fn bench_main(queries: Option<Vec<usize>>) {
.build()
.unwrap();
for (ctx, format) in ctxs.iter().zip(formats.iter()) {
for _ in 0..3 {
for i in 0..3 {
// warmup
rt.block_on(async {
let row_count: usize = rt.block_on(async {
ctx.sql(&query)
.await
.map_err(|e| println!("Failed to run {} {:?}: {}", q, format, e))
.unwrap()
.collect()
.await
.map_err(|e| println!("Failed to collect {} {:?}: {}", q, format, e))
.unwrap();
})
.unwrap()
.iter()
.map(|r| r.num_rows())
.sum()
});
if i == 0 {
count_tx.send((q, *format, row_count)).unwrap();
}
}
let mut measure = Vec::new();
for _ in 0..10 {
Expand Down Expand Up @@ -166,6 +177,28 @@ async fn bench_main(queries: Option<Vec<usize>>) {

// delete parent handle to tx
drop(rows_tx);
drop(row_count_tx);

let mut format_row_counts: HashMap<Format, Vec<usize>> = HashMap::new();
while let Ok((idx, format, row_count)) = row_count_rx.recv() {
format_row_counts
.entry(format)
.or_insert_with(|| vec![0; EXPECTED_ROW_COUNTS.len()])[idx] = row_count;
}

let mut mismatched = false;
for (format, row_counts) in format_row_counts {
row_counts
.into_iter()
.zip_eq(EXPECTED_ROW_COUNTS)
.enumerate()
.for_each(|(idx, (row_count, expected_row_count))| {
if row_count != expected_row_count {
println!("Mismatched row count {row_count} instead of {expected_row_count} in query {idx} for format {format:?}");
mismatched = true;
}
})
}

let mut rows = vec![];
while let Ok((idx, row)) = rows_rx.recv() {
Expand All @@ -178,4 +211,9 @@ async fn bench_main(queries: Option<Vec<usize>>) {

progress.finish();
table.printstd();
if mismatched {
ExitCode::FAILURE
} else {
ExitCode::SUCCESS
}
}
8 changes: 6 additions & 2 deletions bench-vortex/src/tpch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ use crate::idempotent_async;
pub mod dbgen;
pub mod schema;

#[derive(Clone, Copy, Debug)]
/// Expected result-set row counts for the TPC-H queries, indexed by query number.
///
/// The array has 23 slots so that query `q` can be looked up directly at index `q`;
/// index 0 appears to be an unused placeholder since TPC-H queries are 1-indexed
/// (the benchmark iterates `tpch_queries()` and records counts keyed by `q`) —
/// NOTE(review): confirm queries are numbered 1..=22 here.
/// NOTE(review): these counts presumably correspond to the scale factor used by
/// the benchmark's `DBGenOptions` default — verify before changing the generator
/// configuration, as a different scale factor would change these values.
pub const EXPECTED_ROW_COUNTS: [usize; 23] = [
0, 4, 460, 11620, 5, 5, 1, 4, 2, 175, 37967, 1048, 2, 42, 1, 0, 18314, 1, 57, 1, 186, 411, 7,
];

#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub enum Format {
Csv,
Arrow,
Expand Down Expand Up @@ -362,7 +366,7 @@ pub fn tpch_queries() -> impl Iterator<Item = (usize, String)> {
.map(|q| (q, tpch_query(q)))
}

pub fn tpch_query(query_idx: usize) -> String {
fn tpch_query(query_idx: usize) -> String {
let manifest_dir = Path::new(env!("CARGO_MANIFEST_DIR"))
.join("tpch")
.join(format!("q{}.sql", query_idx));
Expand Down
7 changes: 7 additions & 0 deletions encodings/runend/src/runend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ impl RunEndArray {
);
}

if offset != 0 && !ends.is_empty() {
let first_run_end: usize = scalar_at(&ends, 0)?.as_ref().try_into()?;
if first_run_end <= offset {
vortex_bail!("First run end {first_run_end} must be bigger than offset {offset}");
}
}

if !ends.statistics().compute_is_strict_sorted().unwrap_or(true) {
vortex_bail!("Ends array must be strictly sorted",);
}
Expand Down
7 changes: 6 additions & 1 deletion vortex-datafusion/src/expr.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![allow(dead_code)]

use std::fmt::Debug;
use std::sync::Arc;

use arrow_schema::{Schema, SchemaRef};
Expand Down Expand Up @@ -46,18 +47,21 @@ pub(crate) fn simplify_expr(expr: &Expr, schema: SchemaRef) -> DFResult<Expr> {
simplifier.simplify(expr.clone())
}

pub trait VortexPhysicalExpr: Send + Sync {
pub trait VortexPhysicalExpr: Debug + Send + Sync {
fn evaluate(&self, array: &Array) -> VortexResult<Array>;
}

#[derive(Debug)]
pub struct NoOp;

#[derive(Debug)]
pub struct BinaryExpr {
left: Arc<dyn VortexPhysicalExpr>,
right: Arc<dyn VortexPhysicalExpr>,
operator: DFOperator,
}

#[derive(Debug)]
pub struct Column {
name: String,
index: usize,
Expand All @@ -76,6 +80,7 @@ impl VortexPhysicalExpr for Column {
}
}

#[derive(Debug)]
pub struct Literal {
scalar_value: Scalar,
}
Expand Down
1 change: 0 additions & 1 deletion vortex-datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ const SUPPORTED_BINARY_OPS: &[Operator] = &[

fn supported_data_types(dt: DataType) -> bool {
dt.is_integer()
|| dt.is_signed_integer()
|| dt.is_floating()
|| dt.is_null()
|| dt == DataType::Boolean
Expand Down
2 changes: 1 addition & 1 deletion vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl FileOpener for VortexFileOpener {
array
};

VortexResult::Ok(RecordBatch::from(array))
Ok(RecordBatch::from(array))
}
})
.map_err(|e| e.into());
Expand Down
14 changes: 4 additions & 10 deletions vortex-serde/src/layouts/reader/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,10 @@ impl<R: VortexReadAt> VortexLayoutReaderBuilder<R> {
vortex_bail!("Malformed file, invalid magic bytes, got {magic_number:?}")
}

let footer_offset = u64::from_le_bytes(
buf[magic_bytes_loc - 8..magic_bytes_loc]
.try_into()
.unwrap(),
);
let schema_offset = u64::from_le_bytes(
buf[magic_bytes_loc - 16..magic_bytes_loc - 8]
.try_into()
.unwrap(),
);
let footer_offset =
u64::from_le_bytes(buf[magic_bytes_loc - 8..magic_bytes_loc].try_into()?);
let schema_offset =
u64::from_le_bytes(buf[magic_bytes_loc - 16..magic_bytes_loc - 8].try_into()?);

Ok(Footer {
schema_offset,
Expand Down

0 comments on commit b650874

Please sign in to comment.