Skip to content

Commit

Permalink
Fix FOR bug, also fix bench to compile (#341)
Browse files Browse the repository at this point in the history
@robert3005 

1. Fix issue with FoR compression where nullability was getting
stripped, causing `compress` benchmark to fail
2. Fix tokio runtime misuse that was also causing the `compress` benchmark
to fail due to recursive runtime creation
  • Loading branch information
a10y authored Jun 11, 2024
1 parent 0e83666 commit d416f69
Show file tree
Hide file tree
Showing 39 changed files with 684 additions and 439 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bench-vortex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ tokio = { workspace = true }
uuid = { workspace = true }
vortex-alp = { path = "../vortex-alp" }
vortex-array = { path = "../vortex-array" }
vortex-buffer = { path = "../vortex-buffer" }
vortex-datetime-parts = { path = "../vortex-datetime-parts" }
vortex-dict = { path = "../vortex-dict" }
vortex-dtype = { path = "../vortex-dtype" }
Expand Down
4 changes: 2 additions & 2 deletions bench-vortex/benches/compress_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion};

fn vortex_compress_taxi(c: &mut Criterion) {
taxi_data_parquet();
let mut group = c.benchmark_group("end to end");
let mut group = c.benchmark_group("end to end - taxi");
group.sample_size(10);
group.bench_function("compress", |b| b.iter(|| black_box(compress_taxi_data())));
group.finish()
Expand All @@ -16,7 +16,7 @@ fn vortex_compress_taxi(c: &mut Criterion) {
fn vortex_compress_medicare1(c: &mut Criterion) {
let dataset = BenchmarkDatasets::PBI(Medicare1);
dataset.as_uncompressed();
let mut group = c.benchmark_group("end to end");
let mut group = c.benchmark_group("end to end - medicare");
group.sample_size(10);
group.bench_function("compress", |b| {
b.iter(|| black_box(dataset.compress_to_vortex()))
Expand Down
1 change: 1 addition & 0 deletions bench-vortex/benches/random_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ fn random_access(c: &mut Criterion) {
});

let dataset = BenchmarkDatasets::PBI(Medicare1);
dataset.write_as_parquet();
dataset.write_as_lance();
// NB: our parquet benchmarks read from a single file, and we (currently) write each
// file to an individual lance dataset for comparison parity.
Expand Down
22 changes: 11 additions & 11 deletions bench-vortex/src/bin/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,35 @@ use std::path::PathBuf;
use bench_vortex::data_downloads::BenchmarkDataset;
use bench_vortex::public_bi_data::BenchmarkDatasets::PBI;
use bench_vortex::public_bi_data::PBIDataset;
use bench_vortex::reader::{open_vortex, rewrite_parquet_as_vortex};
use bench_vortex::reader::{open_vortex_async, rewrite_parquet_as_vortex};
use bench_vortex::taxi_data::taxi_data_parquet;
use bench_vortex::{setup_logger, IdempotentPath};
use futures::executor::block_on;
use log::{info, LevelFilter};
use tokio::fs::File;
use vortex_error::VortexResult;

pub fn main() {
#[tokio::main]
pub async fn main() {
setup_logger(LevelFilter::Info);
// compress_pbi(PBIDataset::Medicare1);
compress_taxi();
compress_taxi().await.unwrap();
}

fn compress_taxi() {
async fn compress_taxi() -> VortexResult<()> {
let path: PathBuf = "taxi_data.vortex".to_data_path();
block_on(async {
let output_file = File::create(&path).await?;
rewrite_parquet_as_vortex(taxi_data_parquet(), output_file).await
})
.unwrap();
let output_file = File::create(&path).await?;
rewrite_parquet_as_vortex(taxi_data_parquet(), output_file).await?;

let taxi_vortex = open_vortex(&path).unwrap();
let taxi_vortex = open_vortex_async(&path).await?;
info!("{}", taxi_vortex.tree_display());

let pq_size = taxi_data_parquet().metadata().unwrap().size();
let vx_size = taxi_vortex.nbytes();

info!("Parquet size: {}, Vortex size: {}", pq_size, vx_size);
info!("Compression ratio: {}", vx_size as f32 / pq_size as f32);

Ok(())
}

#[allow(dead_code)]
Expand Down
13 changes: 12 additions & 1 deletion bench-vortex/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ pub fn open_vortex(path: &Path) -> VortexResult<Array> {
.map(|a| a.into_array())
}

/// Async counterpart of `open_vortex`: opens a Vortex IPC file at `path`,
/// streams its messages, and collects all chunks into a single `Array`.
///
/// # Errors
/// Returns a `VortexError` if the file cannot be opened, the message stream
/// is malformed, or chunk collection fails. Errors are propagated with `?`
/// rather than panicking, matching the `VortexResult` return type.
pub async fn open_vortex_async(path: &Path) -> VortexResult<Array> {
    let file = tokio::fs::File::open(path).await?;
    let mut msgs = MessageReader::try_new(TokioAdapter(file)).await?;
    msgs.array_stream_from_messages(&CTX)
        .await?
        .collect_chunked()
        .await
        .map(|a| a.into_array())
}

pub async fn rewrite_parquet_as_vortex<W: VortexWrite>(
parquet_path: PathBuf,
write: W,
Expand Down Expand Up @@ -103,7 +114,7 @@ pub fn take_vortex(path: &Path, indices: &[u64]) -> VortexResult<Array> {
let array = open_vortex(path)?;
let taken = take(&array, &indices.to_vec().into_array())?;
// For equivalence.... we flatten to make sure we're not cheating too much.
taken.flatten().map(|x| x.into_array())
Ok(taken.flatten()?.into_array())
}

pub fn take_parquet(path: &Path, indices: &[u64]) -> VortexResult<RecordBatch> {
Expand Down
31 changes: 29 additions & 2 deletions bench-vortex/src/taxi_data.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::future::{ready, Future};
use std::io::Write;
use std::path::PathBuf;

use futures::executor::block_on;
use tokio::fs::File;
use vortex_buffer::io_buf::IoBuf;
use vortex_error::VortexError;
use vortex_ipc::io::VortexWrite;

use crate::data_downloads::{data_vortex_uncompressed, download_data, parquet_to_lance};
use crate::reader::rewrite_parquet_as_vortex;
Expand Down Expand Up @@ -33,10 +36,34 @@ pub fn taxi_data_vortex_uncompressed() -> PathBuf {
pub fn taxi_data_vortex() -> PathBuf {
idempotent("taxi.vortex", |output_fname| {
block_on(async {
let output_file = File::create(output_fname).await?;
let output_file = std::fs::File::create(output_fname)?;
let output_file = StdFile(output_file);
rewrite_parquet_as_vortex(taxi_data_parquet(), output_file).await?;
Ok::<PathBuf, VortexError>(output_fname.to_path_buf())
})
})
.unwrap()
}

//
// Test code uses futures_executor with a local pool, and nothing in VortexWrite ties us to Tokio,
// so this is a simple bridge to allow us to use a `std::fs::File` as a `VortexWrite`.
//

/// Thin newtype bridging a blocking `std::fs::File` into the async
/// `VortexWrite` interface: each async method completes immediately after
/// performing the synchronous I/O.
struct StdFile(std::fs::File);

impl VortexWrite for StdFile {
    async fn write_all<B: IoBuf>(&mut self, buffer: B) -> std::io::Result<B> {
        // Push the entire buffer through the wrapped file synchronously,
        // then hand the buffer back to the caller as the trait contract requires.
        self.0.write_all(buffer.as_slice()).map(|()| buffer)
    }

    async fn flush(&mut self) -> std::io::Result<()> {
        // Delegate straight to the blocking flush.
        self.0.flush()
    }

    fn shutdown(&mut self) -> impl Future<Output = std::io::Result<()>> {
        // A plain std::fs::File has no connection-style teardown; resolve at once.
        ready(Ok(()))
    }
}
27 changes: 0 additions & 27 deletions vortex-array/src/array/bool/compute/as_contiguous.rs

This file was deleted.

6 changes: 0 additions & 6 deletions vortex-array/src/array/bool/compute/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::array::bool::BoolArray;
use crate::compute::as_arrow::AsArrowArray;
use crate::compute::as_contiguous::AsContiguousFn;
use crate::compute::compare::CompareFn;
use crate::compute::fill::FillForwardFn;
use crate::compute::scalar_at::ScalarAtFn;
Expand All @@ -9,7 +8,6 @@ use crate::compute::take::TakeFn;
use crate::compute::ArrayCompute;

mod as_arrow;
mod as_contiguous;
mod compare;
mod fill;
mod flatten;
Expand All @@ -22,10 +20,6 @@ impl ArrayCompute for BoolArray {
Some(self)
}

fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> {
Some(self)
}

fn compare(&self) -> Option<&dyn CompareFn> {
Some(self)
}
Expand Down
23 changes: 2 additions & 21 deletions vortex-array/src/array/chunked/compute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,21 @@ use vortex_error::VortexResult;
use vortex_scalar::Scalar;

use crate::array::chunked::ChunkedArray;
use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn};
use crate::compute::scalar_at::{scalar_at, ScalarAtFn};
use crate::compute::scalar_subtract::SubtractScalarFn;
use crate::compute::slice::SliceFn;
use crate::compute::take::TakeFn;
use crate::compute::ArrayCompute;
use crate::Array;

mod slice;
mod take;

impl ArrayCompute for ChunkedArray {
fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> {
fn scalar_at(&self) -> Option<&dyn ScalarAtFn> {
Some(self)
}

fn scalar_at(&self) -> Option<&dyn ScalarAtFn> {
fn subtract_scalar(&self) -> Option<&dyn SubtractScalarFn> {
Some(self)
}

Expand All @@ -29,23 +27,6 @@ impl ArrayCompute for ChunkedArray {
fn take(&self) -> Option<&dyn TakeFn> {
Some(self)
}

fn subtract_scalar(&self) -> Option<&dyn SubtractScalarFn> {
Some(self)
}
}

impl AsContiguousFn for ChunkedArray {
fn as_contiguous(&self, arrays: &[Array]) -> VortexResult<Array> {
// Combine all the chunks into one, then call as_contiguous again.
let mut chunks = Vec::with_capacity(self.nchunks());
for array in arrays {
for chunk in Self::try_from(array).unwrap().chunks() {
chunks.push(chunk);
}
}
as_contiguous(&chunks)
}
}

impl ScalarAtFn for ChunkedArray {
Expand Down
16 changes: 5 additions & 11 deletions vortex-array/src/array/chunked/compute/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,7 @@ impl TakeFn for ChunkedArray {

#[cfg(test)]
mod test {
use itertools::Itertools;

use crate::array::chunked::ChunkedArray;
use crate::compute::as_contiguous::as_contiguous;
use crate::compute::take::take;
use crate::{ArrayDType, ArrayTrait, AsArray, IntoArray};

Expand All @@ -68,14 +65,11 @@ mod test {
assert_eq!(arr.len(), 9);
let indices = vec![0, 0, 6, 4].into_array();

let result = as_contiguous(
&ChunkedArray::try_from(take(arr.as_array_ref(), &indices).unwrap())
.unwrap()
.chunks()
.collect_vec(),
)
.unwrap()
.into_primitive();
let result = &ChunkedArray::try_from(take(arr.as_array_ref(), &indices).unwrap())
.unwrap()
.into_array()
.flatten_primitive()
.unwrap();
assert_eq!(result.typed_data::<i32>(), &[1, 1, 1, 2]);
}
}
Loading

0 comments on commit d416f69

Please sign in to comment.