Skip to content

Commit

Permalink
less
Browse files Browse the repository at this point in the history
  • Loading branch information
robert3005 committed May 3, 2024
1 parent ce63945 commit f7291bc
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 35 deletions.
30 changes: 25 additions & 5 deletions vortex-ipc/benches/ipc_array_reader_take.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,38 @@
use std::cell::RefCell;
use std::future::Future;
use std::io::Cursor;

use criterion::{black_box, Criterion, criterion_group, criterion_main};
use criterion::async_executor::AsyncExecutor;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use itertools::Itertools;
use monoio::{FusionDriver, RuntimeBuilder};

use vortex::{Array, Context, IntoArray};
use monoio::{Driver, FusionDriver, FusionRuntime, RuntimeBuilder};
use vortex::array::primitive::PrimitiveArray;
use vortex::{Array, Context, IntoArray};
use vortex_dtype::{DType, Nullability, PType};
use vortex_ipc::iter::{FallibleIterator, FallibleLendingIterator};
use vortex_ipc::MonoioExecutor;
use vortex_ipc::reader::StreamReader;
use vortex_ipc::writer::StreamWriter;

#[cfg(target_os = "linux")]
pub struct MonoioExecutor<L: Driver, R: Driver>(pub RefCell<FusionRuntime<L, R>>);

#[cfg(target_os = "linux")]
impl<L: Driver, R: Driver> AsyncExecutor for MonoioExecutor<L, R> {
fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
self.0.borrow_mut().block_on(future)
}
}

#[cfg(not(target_os = "linux"))]
pub struct MonoioExecutor<R: Driver>(pub RefCell<FusionRuntime<R>>);

#[cfg(not(target_os = "linux"))]
impl<R: Driver> AsyncExecutor for MonoioExecutor<R> {
fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
self.0.borrow_mut().block_on(future)
}
}

// 100 record batches, 100k rows each
// take from the first 20 batches and last batch
// compare with arrow
Expand Down
28 changes: 23 additions & 5 deletions vortex-ipc/benches/ipc_take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,42 @@ use std::sync::Arc;

use arrow::ipc::reader::StreamReader as ArrowStreamReader;
use arrow_array::{Int32Array, RecordBatch};
use arrow_ipc::{CompressionType, MetadataVersion};
use arrow_ipc::writer::{IpcWriteOptions, StreamWriter as ArrowStreamWriter};
use arrow_ipc::{CompressionType, MetadataVersion};
use arrow_schema::{DataType, Field, Schema};
use criterion::{black_box, Criterion, criterion_group, criterion_main};
use criterion::async_executor::AsyncExecutor;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use itertools::Itertools;
use monoio::{Driver, FusionDriver, FusionRuntime, RuntimeBuilder};

use vortex::{Array, Context, IntoArray, OwnedArray};
use vortex::array::primitive::PrimitiveArray;
use vortex::compress::Compressor;
use vortex::compute::take::take;
use vortex::{Array, Context, IntoArray, OwnedArray};
use vortex_error::VortexResult;
use vortex_ipc::iter::FallibleLendingIterator;
use vortex_ipc::MonoioExecutor;
use vortex_ipc::reader::StreamReader;
use vortex_ipc::writer::StreamWriter;

#[cfg(target_os = "linux")]
pub struct MonoioExecutor<L: Driver, R: Driver>(pub RefCell<FusionRuntime<L, R>>);

#[cfg(target_os = "linux")]
impl<L: Driver, R: Driver> AsyncExecutor for MonoioExecutor<L, R> {
fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
self.0.borrow_mut().block_on(future)
}
}

#[cfg(not(target_os = "linux"))]
pub struct MonoioExecutor<R: Driver>(pub RefCell<FusionRuntime<R>>);

#[cfg(not(target_os = "linux"))]
impl<R: Driver> AsyncExecutor for MonoioExecutor<R> {
fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
self.0.borrow_mut().block_on(future)
}
}

fn ipc_take(c: &mut Criterion) {
let mut group = c.benchmark_group("ipc_take");
let indices = Int32Array::from(vec![10, 11, 12, 13, 100_000, 2_999_999]);
Expand Down
25 changes: 0 additions & 25 deletions vortex-ipc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
use std::cell::RefCell;
use std::future::Future;
use criterion::async_executor::AsyncExecutor;
use monoio::{Driver, FusionRuntime};
use vortex_error::{vortex_err, VortexError};

pub const ALIGNMENT: usize = 64;
Expand Down Expand Up @@ -38,27 +34,6 @@ pub(crate) const fn missing(field: &'static str) -> impl FnOnce() -> VortexError
move || vortex_err!(InvalidSerde: "missing field: {}", field)
}


#[cfg(target_os = "linux")]
pub struct MonoioExecutor<L: Driver, R: Driver>(pub RefCell<FusionRuntime<L, R>>);

#[cfg(target_os = "linux")]
impl<L: Driver, R: Driver> AsyncExecutor for MonoioExecutor<L, R> {
fn block_on<T>(&self, future: impl Future<Output=T>) -> T {
self.0.borrow_mut().block_on(future)
}
}

#[cfg(not(target_os = "linux"))]
pub struct MonoioExecutor<R: Driver>(pub RefCell<FusionRuntime<R>>);

#[cfg(not(target_os = "linux"))]
impl<R: Driver> AsyncExecutor for MonoioExecutor<R> {
fn block_on<T>(&self, future: impl Future<Output=T>) -> T {
self.0.borrow_mut().block_on(future)
}
}

#[cfg(test)]
mod tests {
use std::io::{Cursor, Write};
Expand Down

0 comments on commit f7291bc

Please sign in to comment.