benchmarks, fixes found while doing benchmarks
a10y committed Jun 26, 2024
1 parent 36e714d commit 17dcb03
Showing 5 changed files with 256 additions and 26 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Generated file; diff not rendered.

8 changes: 8 additions & 0 deletions bench-vortex/Cargo.toml
@@ -16,10 +16,12 @@ workspace = true
 
 [dependencies]
 arrow-array = { workspace = true }
+arrow-schema = { workspace = true }
 arrow-select = { workspace = true }
 bytes = { workspace = true }
 bzip2 = { workspace = true }
 csv = { workspace = true }
+datafusion = { workspace = true }
 enum-iterator = { workspace = true }
 flexbuffers = { workspace = true }
 futures = { workspace = true }
@@ -29,6 +31,7 @@ lazy_static = { workspace = true }
 log = { workspace = true }
 mimalloc = { workspace = true }
 parquet = { workspace = true, features = [] }
+rand = { workspace = true }
 reqwest = { workspace = true }
 serde = { workspace = true }
 simplelog = { workspace = true }
@@ -37,6 +40,7 @@ uuid = { workspace = true, features = ["v4"] }
 vortex-alp = { path = "../encodings/alp" }
 vortex-array = { path = "../vortex-array" }
 vortex-buffer = { path = "../vortex-buffer" }
+vortex-datafusion = { path = "../vortex-datafusion" }
 vortex-datetime-parts = { path = "../encodings/datetime-parts" }
 vortex-dict = { path = "../encodings/dict" }
 vortex-dtype = { path = "../vortex-dtype" }
@@ -56,3 +60,7 @@ harness = false
 [[bench]]
 name = "random_access"
 harness = false
+
+[[bench]]
+name = "datafusion_benchmark"
+harness = false
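
With harness = false, Cargo skips the default libtest harness and Criterion's criterion_main! macro supplies the entry point, so the new target can be run on its own with:

    cargo bench -p bench-vortex --bench datafusion_benchmark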
160 changes: 160 additions & 0 deletions bench-vortex/benches/datafusion_benchmark.rs
@@ -0,0 +1,160 @@
use std::sync::Arc;

use arrow_array::builder::{StringBuilder, UInt32Builder};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use datafusion::datasource::MemTable;
use datafusion::functions_aggregate::expr_fn::sum;
use datafusion::logical_expr::lit;
use datafusion::prelude::{col, SessionContext};
use lazy_static::lazy_static;
use vortex::compress::Compressor;
use vortex::encoding::EncodingRef;
use vortex::{Array, Context, IntoArray, ToArrayData};
use vortex_datafusion::{VortexMemTable, VortexMemTableOptions};
use vortex_fastlanes::{BitPackedEncoding, DeltaEncoding, FoREncoding};

// Encodings the Vortex `Compressor` is allowed to choose from when
// compressing the toy dataset below.
lazy_static! {
    pub static ref CTX: Context = Context::default().with_encodings([
        &BitPackedEncoding as EncodingRef,
        // &DictEncoding,
        &FoREncoding,
        &DeltaEncoding,
    ]);
}

fn toy_dataset_arrow() -> RecordBatch {
    // 64,000 rows of string and numeric data. The eight names cycle
    // row-by-row, so each name appears 8,000 times.

    let names = vec![
        "Alexander",
        "Anastasia",
        "Archibald",
        "Bartholomew",
        "Benjamin",
        "Christopher",
        "Elizabeth",
        "Gabriella",
    ];

    let mut col1 = StringBuilder::with_capacity(64_000, 64_000_000);
    let mut col2 = UInt32Builder::with_capacity(64_000);
    for i in 0..64_000 {
        col1.append_value(names[i % 8]);
        col2.append_value(u32::try_from(i).unwrap());
    }

    let col1 = col1.finish();
    let col2 = col2.finish();

    RecordBatch::try_new(
        Arc::new(Schema::new(vec![
            Field::new("names", DataType::Utf8, false),
            Field::new("scores", DataType::UInt32, false),
        ])),
        vec![Arc::new(col1), Arc::new(col2)],
    )
    .unwrap()
}

fn toy_dataset_vortex() -> Array {
    let uncompressed = toy_dataset_arrow().to_array_data().into_array();
    println!("uncompressed vortex size: {} B", uncompressed.nbytes());

    let compressor = Compressor::new(&CTX);
    let compressed = compressor.compress(&uncompressed, None).unwrap();
    println!("compressed vortex size: {} B", compressed.nbytes());
    compressed
}

// Benchmarks the same filter + aggregate query against three table providers:
// DataFusion's built-in Arrow `MemTable`, a `VortexMemTable` with filter
// pushdown (the default), and a `VortexMemTable` with pushdown disabled.
fn bench_datafusion(c: &mut Criterion) {
    let mut group = c.benchmark_group("datafusion");

    let session = SessionContext::new();

    let arrow_dataset = toy_dataset_arrow();
    let arrow_table =
        Arc::new(MemTable::try_new(arrow_dataset.schema(), vec![vec![arrow_dataset]]).unwrap());

    group.bench_function("arrow", |b| {
        let arrow_table = arrow_table.clone();
        b.to_async(
            tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap(),
        )
        .iter(|| async {
            black_box(session.read_table(arrow_table.clone()).unwrap())
                .filter(col("scores").gt_eq(lit(3_000)))
                .unwrap()
                .filter(col("scores").lt_eq(lit(4_000)))
                .unwrap()
                .aggregate(vec![], vec![sum(col("scores"))])
                .unwrap()
                .collect()
                .await
                .unwrap();
        })
    });

    let vortex_dataset = toy_dataset_vortex();
    let vortex_table_pushdown = Arc::new(
        VortexMemTable::try_new(vortex_dataset, VortexMemTableOptions::default()).unwrap(),
    );
    group.bench_function("vortex_pushdown", |b| {
        let vortex_table_pushdown = vortex_table_pushdown.clone();
        // Use the same single-threaded runtime as the other two benches so
        // the three cases are directly comparable.
        b.to_async(
            tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap(),
        )
        .iter(|| async {
            black_box(session.read_table(vortex_table_pushdown.clone()).unwrap())
                .filter(col("scores").gt_eq(lit(3_000)))
                .unwrap()
                .filter(col("scores").lt_eq(lit(4_000)))
                .unwrap()
                .aggregate(vec![], vec![sum(col("scores"))])
                .unwrap()
                .collect()
                .await
                .unwrap();
        })
    });

    let vortex_dataset = toy_dataset_vortex();
    let vortex_table_no_pushdown = Arc::new(
        VortexMemTable::try_new(
            vortex_dataset,
            VortexMemTableOptions::default().with_disable_pushdown(true),
        )
        .unwrap(),
    );
    group.bench_function("vortex_no_pushdown", |b| {
        let vortex_table_no_pushdown = vortex_table_no_pushdown.clone();
        b.to_async(
            tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap(),
        )
        .iter(|| async {
            black_box(
                session
                    .read_table(vortex_table_no_pushdown.clone())
                    .unwrap(),
            )
            .filter(col("scores").gt_eq(lit(3_000)))
            .unwrap()
            .filter(col("scores").lt_eq(lit(4_000)))
            .unwrap()
            .aggregate(vec![], vec![sum(col("scores"))])
            .unwrap()
            .collect()
            .await
            .unwrap();
        })
    });
}

criterion_group!(benches, bench_datafusion);
criterion_main!(benches);
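
Each benchmark above drives an async DataFusion query to completion once per iteration via Criterion's to_async adapter. A minimal standalone sketch of that pattern (assuming Criterion's async_tokio feature, which the benchmarks above also require):

use criterion::{black_box, criterion_group, criterion_main, Criterion};

fn bench_async_pattern(c: &mut Criterion) {
    c.bench_function("async_pattern", |b| {
        // Build a single-threaded Tokio runtime; `iter` then runs the async
        // block to completion once per measured iteration.
        b.to_async(
            tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap(),
        )
        .iter(|| async { black_box(1 + 1) });
    });
}

criterion_group!(benches, bench_async_pattern);
criterion_main!(benches);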
61 changes: 52 additions & 9 deletions vortex-datafusion/src/lib.rs
@@ -45,7 +45,7 @@ pub struct VortexMemTableOptions {
 }
 
 impl VortexMemTableOptions {
-    pub fn with_disable_pushdown(&mut self, disable_pushdown: bool) -> &mut Self {
+    pub fn with_disable_pushdown(mut self, disable_pushdown: bool) -> Self {
         self.disable_pushdown = disable_pushdown;
         self
     }
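
This turns with_disable_pushdown into a consuming builder: taking self by value means the chained call VortexMemTableOptions::default().with_disable_pushdown(true), used by the new benchmark and test below, evaluates to an owned value. A standalone sketch with a stand-in Options struct (not the real type) showing why the old &mut self form could not support that call:

#[derive(Default)]
struct Options {
    disable_pushdown: bool,
}

impl Options {
    // Consuming builder, as adopted here: takes and returns `self` by value,
    // so the whole chain evaluates to an owned `Options`.
    fn with_disable_pushdown(mut self, disable_pushdown: bool) -> Self {
        self.disable_pushdown = disable_pushdown;
        self
    }
}

fn main() {
    // Compiles: `default()` yields a temporary that the builder consumes.
    let opts = Options::default().with_disable_pushdown(true);
    assert!(opts.disable_pushdown);
    // With the previous `&mut self -> &mut Self` form, the same chain would
    // borrow from the `default()` temporary and fail to compile
    // ("temporary value dropped while borrowed"), so it could never produce
    // the owned value that a constructor like `try_new` takes.
}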
@@ -83,7 +83,7 @@ impl SessionContextExt for SessionContext {
 /// Only arrays that have a top-level [struct type](vortex_dtype::StructDType) can be exposed as
 /// a table to DataFusion.
 #[derive(Debug, Clone)]
-pub(crate) struct VortexMemTable {
+pub struct VortexMemTable {
     array: Array,
     schema_ref: SchemaRef,
     options: VortexMemTableOptions,
@@ -297,6 +297,8 @@ fn can_be_pushed_down(expr: &Expr, schema_columns: &HashSet<String>) -> DFResult
         | Expr::IsFalse(_)
         | Expr::IsNotTrue(_)
         | Expr::IsNotFalse(_)
+        | Expr::Column(_)
+        | Expr::Literal(_)
         // TODO(aduffy): ensure that cast can be pushed down.
         | Expr::Cast(_) => true,
         _ => false,
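
The new Column and Literal arms matter because those variants are the leaves of typical predicates: a filter such as col("scores").gt_eq(lit(3_000)) is a BinaryExpr whose children are a Column and a Literal, so rejecting the leaves rejects the whole tree. A simplified sketch of the shape of such a check (the real can_be_pushed_down returns DFResult<bool> and also validates column names against schema_columns):

use datafusion::logical_expr::{lit, Expr};
use datafusion::prelude::col;

/// Simplified pushdown check: accept trees built from columns, literals,
/// and binary expressions; reject everything else.
fn pushdown_ok(expr: &Expr) -> bool {
    match expr {
        Expr::Column(_) | Expr::Literal(_) => true,
        Expr::BinaryExpr(e) => pushdown_ok(&e.left) && pushdown_ok(&e.right),
        _ => false,
    }
}

fn main() {
    // A BinaryExpr over a Column and a Literal: with both leaf variants
    // accepted, the predicate as a whole qualifies for pushdown.
    assert!(pushdown_ok(&col("scores").gt_eq(lit(3_000))));
}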
@@ -374,17 +376,20 @@ fn execute_unfiltered(
     array: &Array,
     projection: &Vec<usize>,
 ) -> DFResult<SendableRecordBatchStream> {
+    println!("EXECUTE_UNFILTERED");
     // Construct the RecordBatch by flattening each struct field and transmuting to an ArrayRef.
     let struct_array = array
         .clone()
         .into_struct()
         .map_err(|vortex_error| DataFusionError::Execution(format!("{}", vortex_error)))?;
 
+    println!("PROJECTION: {:?}", projection);
     let projected_struct = struct_array
         .project(projection.as_slice())
         .map_err(|vortex_err| {
             exec_datafusion_err!("projection pushdown to Vortex failed: {vortex_err}")
         })?;
+    println!("PROJECTED SCHEMA: {:?}", projected_struct.dtype());
     let batch = RecordBatch::from(
         projected_struct
             .into_canonical()
@@ -463,6 +468,7 @@ impl ExecutionPlan for VortexScanExec {
         partition: usize,
         _context: Arc<TaskContext>,
     ) -> DFResult<SendableRecordBatchStream> {
+        println!("EXECUTE VortexScanExec");
         let chunk = if let Ok(chunked_array) = ChunkedArray::try_from(&self.array) {
             chunked_array
                 .chunk(partition)
@@ -485,13 +491,12 @@ mod test {
     use vortex::array::struct_::StructArray;
     use vortex::array::varbin::VarBinArray;
     use vortex::validity::Validity;
-    use vortex::IntoArray;
+    use vortex::{Array, IntoArray};
     use vortex_dtype::{DType, Nullability};
 
-    use crate::SessionContextExt;
+    use crate::{SessionContextExt, VortexMemTableOptions};
 
-    #[tokio::test]
-    async fn test_datafusion_simple() {
+    fn presidents_array() -> Array {
         let names = VarBinArray::from_vec(
             vec![
                 "Washington",
@@ -508,15 +513,53 @@
             Validity::NonNullable,
         );
 
-        let presidents = StructArray::from_fields(&[
+        StructArray::from_fields(&[
             ("president", names.into_array()),
             ("term_start", term_start.into_array()),
         ])
-        .into_array();
+        .into_array()
+    }
+
+    #[tokio::test]
+    async fn test_datafusion_pushdown() {
+        let ctx = SessionContext::new();
+
+        let df = ctx.read_vortex(presidents_array()).unwrap();
+
+        let distinct_names = df
+            .filter(col("term_start").gt_eq(lit(1795)))
+            .unwrap()
+            .aggregate(vec![], vec![count_distinct(col("president"))])
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+
+        assert_eq!(distinct_names.len(), 1);
+
+        assert_eq!(
+            *distinct_names[0]
+                .column(0)
+                .as_primitive::<Int64Type>()
+                .values()
+                .first()
+                .unwrap(),
+            4i64
+        );
+    }
+
+    #[tokio::test]
+    async fn test_datafusion_no_pushdown() {
         let ctx = SessionContext::new();
 
-        let df = ctx.read_vortex(presidents).unwrap();
+        let df = ctx
+            .read_vortex_opts(
+                presidents_array(),
+                // Disable pushdown. We run this test to make sure that the naive codepath also
+                // produces correct results and does not panic anywhere.
+                VortexMemTableOptions::default().with_disable_pushdown(true),
+            )
+            .unwrap();
 
         let distinct_names = df
            .filter(col("term_start").gt_eq(lit(1795)))
(hunk truncated; the remaining lines and the fifth changed file are not shown)
