
Commit 076be97
draw the rest of the f*cking owl
a10y committed Jul 12, 2024
1 parent cc4d32f commit 076be97
Showing 5 changed files with 202 additions and 191 deletions.
7 changes: 5 additions & 2 deletions bench-vortex/src/bin/tpch_benchmark.rs
@@ -55,10 +55,13 @@ async fn q1_vortex(base_dir: &PathBuf) -> anyhow::Result<()> {

 #[tokio::main(flavor = "current_thread")]
 async fn main() {
+    // uncomment the below to enable trace logging of datafusion execution
+    // setup_logger(LevelFilter::Trace);
+
     // Run TPC-H data gen.
     let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap();

-    // q1_csv(&data_dir).await.unwrap();
+    q1_csv(&data_dir).await.unwrap();
     q1_arrow(&data_dir).await.unwrap();
-    // q1_vortex(&data_dir).await.unwrap();
+    q1_vortex(&data_dir).await.unwrap();
 }
12 changes: 3 additions & 9 deletions bench-vortex/src/tpch/mod.rs
@@ -7,7 +7,7 @@ use datafusion::datasource::MemTable;
 use datafusion::prelude::{CsvReadOptions, SessionContext};
 use vortex::array::chunked::ChunkedArray;
 use vortex::arrow::FromArrowArray;
-use vortex::{Array, ArrayDType, ArrayData, IntoArray, IntoCanonical};
+use vortex::{Array, ArrayDType, ArrayData, IntoArray};
 use vortex_datafusion::{SessionContextExt, VortexMemTableOptions};

 pub mod dbgen;
@@ -146,17 +146,11 @@ async fn register_vortex(
         .collect();

     let dtype = chunks[0].dtype().clone();
-    let chunked_array = ChunkedArray::try_new(chunks, dtype)?
-        .into_canonical()?
-        .into_array();
-
-    // Convert to Arrow to concat all chunks, then flip back to Vortex for processing.
-    let arrow = chunked_array.into_canonical()?.into_arrow();
-    let array = ArrayData::from_arrow(arrow, false).into_array();
+    let chunked_array = ChunkedArray::try_new(chunks, dtype)?.into_array();

     session.register_vortex_opts(
         name,
-        array,
+        chunked_array,
         VortexMemTableOptions::default().with_disable_pushdown(disable_pushdown),
     )?;

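The upshot of this hunk: `register_vortex` no longer eagerly canonicalizes through Arrow to concatenate chunks; the lazily-chunked array goes straight to DataFusion, and per-chunk conversion is deferred to scan time. A minimal sketch of the resulting registration path, using only the calls visible in this diff (`session`, `name`, `chunks`, and the hard-coded pushdown flag are stand-ins for the surrounding function's arguments):

    use vortex::array::chunked::ChunkedArray;
    use vortex::{Array, ArrayDType, IntoArray};
    use vortex_datafusion::{SessionContextExt, VortexMemTableOptions};

    fn register(
        session: &datafusion::prelude::SessionContext,
        name: &str,
        chunks: Vec<Array>,
    ) -> anyhow::Result<()> {
        // All chunks share one dtype; build the ChunkedArray without flattening it.
        let dtype = chunks[0].dtype().clone();
        let chunked_array = ChunkedArray::try_new(chunks, dtype)?.into_array();

        // Hand the chunked array to DataFusion as-is; Arrow conversion now
        // happens chunk-by-chunk during query execution.
        session.register_vortex_opts(
            name,
            chunked_array,
            VortexMemTableOptions::default().with_disable_pushdown(true),
        )?;
        Ok(())
    }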
2 changes: 1 addition & 1 deletion vortex-array/src/canonical.rs
@@ -294,7 +294,7 @@ fn local_date_time_to_arrow(local_date_time_array: LocalDateTimeArray) -> ArrayRef
         .to_null_buffer()
         .expect("null buffer");
     let timestamps_len = timestamps.len();
-    let buffer = ScalarBuffer::<i64>::new(timestamps.into_buffer().into(), 0, timestamps_len);
+    let buffer = ScalarBuffer::<i64>::new(timestamps.into_buffer().into_arrow(), 0, timestamps_len);

     match local_date_time_array.time_unit() {
         TimeUnit::Ns => Arc::new(TimestampNanosecondArray::new(buffer, validity)),
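For context on the one-line change above: `ScalarBuffer::<i64>::new` takes an Arrow byte `Buffer` plus an element offset and length, so the Vortex buffer is converted explicitly via `into_arrow()` rather than a generic `into()`. A self-contained arrow-rs sketch of that constructor (the timestamp values are made up for illustration):

    use std::sync::Arc;

    use arrow_array::{Array, TimestampNanosecondArray};
    use arrow_buffer::{Buffer, ScalarBuffer};

    fn main() {
        // Raw i64 timestamps, standing in for the values a Vortex buffer hands to Arrow.
        let timestamps: Vec<i64> = vec![1_700_000_000_000_000_000, 1_700_000_000_000_000_001];
        let timestamps_len = timestamps.len();

        // ScalarBuffer::new(buffer, offset, len): offset and len are in elements, not bytes.
        let buffer = ScalarBuffer::<i64>::new(Buffer::from_vec(timestamps), 0, timestamps_len);

        // The diff passes the validity buffer computed earlier; None here for brevity.
        let array = Arc::new(TimestampNanosecondArray::new(buffer, None));
        assert_eq!(array.len(), 2);
    }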
174 changes: 85 additions & 89 deletions vortex-datafusion/src/lib.rs
@@ -22,11 +22,9 @@ use datafusion_physical_expr::EquivalenceProperties;
 use datafusion_physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
 };
-use futures::{Stream, StreamExt};
+use futures::Stream;
 use itertools::Itertools;
-use pin_project::pin_project;
 use vortex::array::chunked::ChunkedArray;
-use vortex::array::struct_::StructArray;
 use vortex::{Array, ArrayDType, IntoArrayVariant, IntoCanonical};
 use vortex_dtype::DType;

@@ -109,7 +107,7 @@ impl SessionContextExt for SessionContext {
 /// a table to DataFusion.
 #[derive(Debug, Clone)]
 pub struct VortexMemTable {
-    array: Array,
+    array: ChunkedArray,
     schema_ref: SchemaRef,
     options: VortexMemTableOptions,
 }
@@ -124,6 +122,14 @@ impl VortexMemTable {
         let arrow_schema = infer_schema(array.dtype());
         let schema_ref = SchemaRef::new(arrow_schema);

+        let array = match ChunkedArray::try_from(&array) {
+            Ok(a) => a,
+            _ => {
+                let dtype = array.dtype().clone();
+                ChunkedArray::try_new(vec![array], dtype).unwrap()
+            }
+        };
+
         Self {
             array,
             schema_ref,
@@ -176,12 +182,6 @@ impl TableProvider for VortexMemTable {
             Some(filters)
         };

-        let partitioning = if let Ok(chunked_array) = ChunkedArray::try_from(&self.array) {
-            Partitioning::RoundRobinBatch(chunked_array.nchunks())
-        } else {
-            Partitioning::UnknownPartitioning(1)
-        };
-
         let output_projection: Vec<usize> = match projection {
             None => (0..self.schema_ref.fields().len()).collect(),
             Some(proj) => proj.clone(),
@@ -215,7 +215,9 @@
         );
         let plan_properties = PlanProperties::new(
             EquivalenceProperties::new(output_schema),
-            partitioning,
+            // non-pushdown scans execute in single partition, where the partition
+            // yields one RecordBatch per chunk in the input ChunkedArray
+            Partitioning::UnknownPartitioning(1),
             ExecutionMode::Bounded,
         );

@@ -265,23 +267,21 @@ fn make_filter_then_take_plan(
     schema: SchemaRef,
     filter_exprs: &[Expr],
     filter_projection: Vec<usize>,
-    array: Array,
+    chunked_array: ChunkedArray,
     output_projection: Vec<usize>,
     _session_state: &SessionState,
 ) -> Arc<dyn ExecutionPlan> {
-    let struct_array = StructArray::try_from(array).unwrap();
-
-    let filter_struct = struct_array
-        .project(filter_projection.as_slice())
-        .expect("projecting filter struct");
-
-    let row_selector_op = Arc::new(RowSelectorExec::new(filter_exprs, &filter_struct));
+    let row_selector_op = Arc::new(RowSelectorExec::new(
+        filter_exprs,
+        filter_projection,
+        &chunked_array,
+    ));

     Arc::new(TakeRowsExec::new(
         schema.clone(),
         &output_projection,
         row_selector_op.clone(),
-        &struct_array,
+        &chunked_array,
     ))
 }

@@ -372,86 +372,83 @@ fn get_column_references(expr: &Expr) -> HashSet<String> {
 }

 /// Physical plan node for scans against an in-memory, possibly chunked Vortex Array.
-#[derive(Debug, Clone)]
+#[derive(Clone)]
 struct VortexScanExec {
-    array: Array,
+    array: ChunkedArray,
     scan_projection: Vec<usize>,
     plan_properties: PlanProperties,
 }

+impl Debug for VortexScanExec {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("VortexScanExec")
+            .field("array_length", &self.array.len())
+            .field("array_dtype", &self.array.dtype())
+            .field("scan_projection", &self.scan_projection)
+            .field("plan_properties", &self.plan_properties)
+            .finish_non_exhaustive()
+    }
+}
+
 impl DisplayAs for VortexScanExec {
     fn fmt_as(&self, _display_type: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
         write!(f, "{:?}", self)
     }
 }

-/// Read a single array chunk from the source as a RecordBatch.
-///
-/// # Errors
-/// This function will return an Error if `array` is not struct-typed. It will also return an
-/// error if the projection references columns that do not exist in the array.
-fn execute_unfiltered(
-    array: Array,
-    projection: &Vec<usize>,
-) -> DFResult<SendableRecordBatchStream> {
-    // Construct the RecordBatch by flattening each struct field and transmuting to an ArrayRef.
-    let struct_array = array
-        .clone()
-        .into_struct()
-        .map_err(|vortex_error| DataFusionError::Execution(format!("{}", vortex_error)))?;
-
-    let projected_struct = struct_array
-        .project(projection.as_slice())
-        .map_err(|vortex_err| {
-            exec_datafusion_err!("projection pushdown to Vortex failed: {vortex_err}")
-        })?;
-    let batch = RecordBatch::from(
-        projected_struct
-            .into_canonical()
-            .expect("struct arrays must canonicalize")
-            .into_arrow()
-            .as_any()
-            .downcast_ref::<ArrowStructArray>()
-            .expect("vortex StructArray must convert to arrow StructArray"),
-    );
-    Ok(Box::pin(VortexRecordBatchStream {
-        schema_ref: batch.schema(),
-        inner: futures::stream::iter(vec![batch]),
-    }))
-}
-
 // Row selector stream.
 // I.e., send a stream of RowSelector which allows us to pass in a bunch of binary arrays
 // back down to the other systems here instead.

-#[pin_project]
-pub(crate) struct VortexRecordBatchStream<I> {
+pub(crate) struct VortexRecordBatchStream {
     schema_ref: SchemaRef,

-    #[pin]
-    inner: I,
+    idx: usize,
+    num_chunks: usize,
+    chunks: ChunkedArray,
+
+    projection: Vec<usize>,
 }

-impl<I> Stream for VortexRecordBatchStream<I>
-where
-    I: Stream<Item = RecordBatch>,
-{
+impl Stream for VortexRecordBatchStream {
     type Item = DFResult<RecordBatch>;

-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        let mut this = self.project();
-        match this.inner.poll_next_unpin(cx) {
-            Poll::Ready(Some(batch)) => Poll::Ready(Some(Ok(batch))),
-            Poll::Ready(None) => Poll::Ready(None),
-            Poll::Pending => Poll::Pending,
-        }
-    }
+    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+        if this.idx >= this.num_chunks {
+            return Poll::Ready(None);
+        }
+
+        // Grab next chunk, project and convert to Arrow.
+        let chunk = this
+            .chunks
+            .chunk(this.idx)
+            .expect("nchunks should match precomputed");
+        this.idx += 1;
+
+        let struct_array = chunk
+            .clone()
+            .into_struct()
+            .map_err(|vortex_error| DataFusionError::Execution(format!("{}", vortex_error)))?;
+
+        let projected_struct = struct_array
+            .project(this.projection.as_slice())
+            .map_err(|vortex_err| {
+                exec_datafusion_err!("projection pushdown to Vortex failed: {vortex_err}")
+            })?;
+
+        let batch = RecordBatch::from(
+            projected_struct
+                .into_canonical()
+                .expect("struct arrays must canonicalize")
+                .into_arrow()
+                .as_any()
+                .downcast_ref::<ArrowStructArray>()
+                .expect("vortex StructArray must convert to arrow StructArray"),
+        );
+
+        Poll::Ready(Some(Ok(batch)))
+    }
 }

-impl<I> RecordBatchStream for VortexRecordBatchStream<I>
-where
-    I: Stream<Item = RecordBatch>,
-{
+impl RecordBatchStream for VortexRecordBatchStream {
     fn schema(&self) -> SchemaRef {
         Arc::clone(&self.schema_ref)
     }
@@ -480,18 +477,17 @@ impl ExecutionPlan for VortexScanExec {

     fn execute(
         &self,
-        partition: usize,
+        _partition: usize,
         _context: Arc<TaskContext>,
     ) -> DFResult<SendableRecordBatchStream> {
-        let chunk = if let Ok(chunked_array) = ChunkedArray::try_from(&self.array) {
-            chunked_array
-                .chunk(partition)
-                .ok_or_else(|| exec_datafusion_err!("partition not found"))?
-        } else {
-            self.array.clone()
-        };
-
-        execute_unfiltered(chunk, &self.scan_projection)
+        // Send back a stream of RecordBatch that returns the next element of the chunk each time.
+        Ok(Box::pin(VortexRecordBatchStream {
+            schema_ref: self.schema().clone(),
+            idx: 0,
+            num_chunks: self.array.nchunks(),
+            chunks: self.array.clone(),
+            projection: self.scan_projection.clone(),
+        }))
     }
 }

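After this change, `VortexScanExec::execute` ignores its partition index and returns a single `VortexRecordBatchStream` that lazily converts one chunk per poll. Consuming it follows the standard DataFusion pattern; a sketch assuming DataFusion's usual `ExecutionPlan` API (the `plan` and `ctx` values are hypothetical):

    use std::sync::Arc;

    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::error::Result;
    use datafusion::execution::TaskContext;
    use datafusion::physical_plan::ExecutionPlan;
    use futures::TryStreamExt;

    // Drive the single-partition scan to completion; each yielded RecordBatch
    // corresponds to one chunk of the underlying ChunkedArray.
    async fn collect_scan(
        plan: Arc<dyn ExecutionPlan>,
        ctx: Arc<TaskContext>,
    ) -> Result<Vec<RecordBatch>> {
        let stream = plan.execute(0, ctx)?;
        stream.try_collect().await
    }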
[diff for the remaining changed file did not load and is not shown]
