Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make into_arrow truly zero-copy, rewrite DataFusion operators #451

Merged
merged 3 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions bench-vortex/benches/tpch_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
use bench_vortex::tpch::query::Q1;
use bench_vortex::tpch::{load_datasets, Format};
use criterion::{criterion_group, criterion_main, Criterion};
use tokio::runtime::Runtime;
use tokio::runtime::Builder;

fn benchmark(c: &mut Criterion) {
let runtime = Runtime::new().unwrap();
let runtime = Builder::new_current_thread().enable_all().build().unwrap();

// Run TPC-H data gen.
let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap();
Expand All @@ -14,9 +14,27 @@ fn benchmark(c: &mut Criterion) {
group.sample_size(10);

let ctx = runtime
.block_on(load_datasets(&data_dir, Format::VortexUncompressed))
.block_on(load_datasets(
&data_dir,
Format::Vortex {
disable_pushdown: false,
},
))
.unwrap();
group.bench_function("vortex", |b| {
group.bench_function("vortex-pushdown", |b| {
b.to_async(&runtime)
.iter(|| async { ctx.sql(Q1).await.unwrap().collect().await.unwrap() })
});

let ctx = runtime
.block_on(load_datasets(
&data_dir,
Format::Vortex {
disable_pushdown: true,
},
))
.unwrap();
group.bench_function("vortex-nopushdown", |b| {
b.to_async(&runtime)
.iter(|| async { ctx.sql(Q1).await.unwrap().collect().await.unwrap() })
});
Expand Down
14 changes: 12 additions & 2 deletions bench-vortex/src/bin/tpch_benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![allow(dead_code)]
use std::path::PathBuf;
use std::time::SystemTime;

Expand Down Expand Up @@ -33,7 +34,13 @@ async fn q1_arrow(base_dir: &PathBuf) -> anyhow::Result<()> {
}

async fn q1_vortex(base_dir: &PathBuf) -> anyhow::Result<()> {
let ctx = load_datasets(base_dir, Format::VortexUncompressed).await?;
let ctx = load_datasets(
base_dir,
Format::Vortex {
disable_pushdown: true,
},
)
.await?;

println!("BEGIN: Q1(VORTEX)");
let start = SystemTime::now();
Expand All @@ -46,8 +53,11 @@ async fn q1_vortex(base_dir: &PathBuf) -> anyhow::Result<()> {
Ok(())
}

#[tokio::main]
#[tokio::main(flavor = "current_thread")]
async fn main() {
// uncomment the below to enable trace logging of datafusion execution
// setup_logger(LevelFilter::Trace);

// Run TPC-H data gen.
let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap();

Expand Down
31 changes: 21 additions & 10 deletions bench-vortex/src/tpch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use datafusion::datasource::MemTable;
use datafusion::prelude::{CsvReadOptions, SessionContext};
use vortex::array::chunked::ChunkedArray;
use vortex::arrow::FromArrowArray;
use vortex::{Array, ArrayDType, ArrayData, IntoArray, IntoCanonical};
use vortex_datafusion::SessionContextExt;
use vortex::{Array, ArrayDType, ArrayData, IntoArray};
use vortex_datafusion::{SessionContextExt, VortexMemTableOptions};

pub mod dbgen;
pub mod query;
Expand All @@ -17,7 +17,7 @@ pub mod schema;
pub enum Format {
Csv,
Arrow,
VortexUncompressed,
Vortex { disable_pushdown: bool },
}

// Generate table dataset.
Expand All @@ -42,8 +42,17 @@ pub async fn load_datasets<P: AsRef<Path>>(
match format {
Format::Csv => register_csv(&context, stringify!($name), &$name, $schema).await,
Format::Arrow => register_arrow(&context, stringify!($name), &$name, $schema).await,
Format::VortexUncompressed => {
register_vortex(&context, stringify!($name), &$name, $schema).await
Format::Vortex {
disable_pushdown, ..
} => {
register_vortex(
&context,
stringify!($name),
&$name,
$schema,
disable_pushdown,
)
.await
}
}
};
Expand Down Expand Up @@ -113,7 +122,7 @@ async fn register_vortex(
name: &str,
file: &Path,
schema: &Schema,
// TODO(aduffy): add compression option
disable_pushdown: bool,
) -> anyhow::Result<()> {
let record_batches = session
.read_csv(
Expand All @@ -137,11 +146,13 @@ async fn register_vortex(
.collect();

let dtype = chunks[0].dtype().clone();
let chunked_array = ChunkedArray::try_new(chunks, dtype)?
.into_canonical()?
.into_array();
let chunked_array = ChunkedArray::try_new(chunks, dtype)?.into_array();

session.register_vortex(name, chunked_array)?;
session.register_vortex_opts(
name,
chunked_array,
VortexMemTableOptions::default().with_disable_pushdown(disable_pushdown),
)?;

Ok(())
}
4 changes: 2 additions & 2 deletions vortex-array/src/arrow/wrappers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use arrow_buffer::{ArrowNativeType, Buffer as ArrowBuffer, OffsetBuffer, ScalarBuffer};
use arrow_buffer::{ArrowNativeType, OffsetBuffer, ScalarBuffer};
use vortex_dtype::NativePType;

use crate::array::primitive::PrimitiveArray;
Expand All @@ -7,7 +7,7 @@ pub fn as_scalar_buffer<T: NativePType + ArrowNativeType>(
array: PrimitiveArray,
) -> ScalarBuffer<T> {
assert_eq!(array.ptype(), T::PTYPE);
ScalarBuffer::from(ArrowBuffer::from(array.buffer()))
ScalarBuffer::from(array.buffer().clone().into_arrow())
}

pub fn as_offset_buffer<T: NativePType + ArrowNativeType>(
Expand Down
52 changes: 30 additions & 22 deletions vortex-array/src/canonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ fn primitive_to_arrow(primitive_array: PrimitiveArray) -> ArrayRef {
array: &PrimitiveArray,
) -> ArrowPrimitiveArray<T> {
ArrowPrimitiveArray::new(
ScalarBuffer::<T::Native>::new(array.buffer().clone().into(), 0, array.len()),
ScalarBuffer::<T::Native>::new(array.buffer().clone().into_arrow(), 0, array.len()),
array
.logical_validity()
.to_null_buffer()
Expand Down Expand Up @@ -242,29 +242,37 @@ fn varbin_to_arrow(varbin_array: VarBinArray) -> ArrayRef {
// Switch on Arrow DType.
match varbin_array.dtype() {
DType::Binary(_) => match offsets.ptype() {
PType::I32 => Arc::new(BinaryArray::new(
as_offset_buffer::<i32>(offsets),
data.into(),
nulls,
)),
PType::I64 => Arc::new(LargeBinaryArray::new(
as_offset_buffer::<i64>(offsets),
data.into(),
nulls,
)),
PType::I32 => Arc::new(unsafe {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how much this actually saves us, once I get to ~parity with arrow numbers I'll turn the validations back on and see what difference it makes

BinaryArray::new_unchecked(
as_offset_buffer::<i32>(offsets),
data.clone().into_arrow(),
nulls,
)
}),
PType::I64 => Arc::new(unsafe {
LargeBinaryArray::new_unchecked(
as_offset_buffer::<i64>(offsets),
data.clone().into_arrow(),
nulls,
)
}),
_ => panic!("Invalid offsets type"),
},
DType::Utf8(_) => match offsets.ptype() {
PType::I32 => Arc::new(StringArray::new(
as_offset_buffer::<i32>(offsets),
data.into(),
nulls,
)),
PType::I64 => Arc::new(LargeStringArray::new(
as_offset_buffer::<i64>(offsets),
data.into(),
nulls,
)),
PType::I32 => Arc::new(unsafe {
StringArray::new_unchecked(
as_offset_buffer::<i32>(offsets),
data.clone().into_arrow(),
nulls,
)
}),
PType::I64 => Arc::new(unsafe {
LargeStringArray::new_unchecked(
as_offset_buffer::<i64>(offsets),
data.clone().into_arrow(),
nulls,
)
}),
_ => panic!("Invalid offsets type"),
},
_ => panic!(
Expand All @@ -286,7 +294,7 @@ fn local_date_time_to_arrow(local_date_time_array: LocalDateTimeArray) -> ArrayR
.to_null_buffer()
.expect("null buffer");
let timestamps_len = timestamps.len();
let buffer = ScalarBuffer::<i64>::new(timestamps.into_buffer().into(), 0, timestamps_len);
let buffer = ScalarBuffer::<i64>::new(timestamps.into_buffer().into_arrow(), 0, timestamps_len);

match local_date_time_array.time_unit() {
TimeUnit::Ns => Arc::new(TimestampNanosecondArray::new(buffer, validity)),
Expand Down
20 changes: 16 additions & 4 deletions vortex-buffer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
mod flexbuffers;
pub mod io_buf;
mod string;

use std::cmp::Ordering;
use std::ops::{Deref, Range};

use arrow_buffer::{ArrowNativeType, Buffer as ArrowBuffer};
pub use string::*;

mod flexbuffers;
pub mod io_buf;
mod string;

#[derive(Debug, Clone)]
pub enum Buffer {
// TODO(ngates): we could add Aligned(Arc<AVec>) from aligned-vec package
Expand Down Expand Up @@ -56,6 +56,18 @@ impl Buffer {
Self::Bytes(_) => Err(self),
}
}

/// Convert a Buffer into an ArrowBuffer with no copying.
pub fn into_arrow(self) -> ArrowBuffer {
match self {
Buffer::Arrow(a) => a,
Buffer::Bytes(b) => {
let v: Vec<u8> = b.into();

ArrowBuffer::from_vec(v)
}
}
}
}

impl Deref for Buffer {
Expand Down
Loading
Loading