Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix FOR bug, also fix bench to compile #341

Merged
merged 16 commits into from
Jun 11, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bench-vortex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ tokio = { workspace = true }
uuid = { workspace = true }
vortex-alp = { path = "../vortex-alp" }
vortex-array = { path = "../vortex-array" }
vortex-buffer = { path = "../vortex-buffer" }
vortex-datetime-parts = { path = "../vortex-datetime-parts" }
vortex-dict = { path = "../vortex-dict" }
vortex-dtype = { path = "../vortex-dtype" }
Expand Down
4 changes: 2 additions & 2 deletions bench-vortex/benches/compress_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion};

fn vortex_compress_taxi(c: &mut Criterion) {
taxi_data_parquet();
let mut group = c.benchmark_group("end to end");
let mut group = c.benchmark_group("end to end - taxi");
group.sample_size(10);
group.bench_function("compress", |b| b.iter(|| black_box(compress_taxi_data())));
group.finish()
Expand All @@ -16,7 +16,7 @@ fn vortex_compress_taxi(c: &mut Criterion) {
fn vortex_compress_medicare1(c: &mut Criterion) {
let dataset = BenchmarkDatasets::PBI(Medicare1);
dataset.as_uncompressed();
let mut group = c.benchmark_group("end to end");
let mut group = c.benchmark_group("end to end - medicare");
group.sample_size(10);
group.bench_function("compress", |b| {
b.iter(|| black_box(dataset.compress_to_vortex()))
Expand Down
1 change: 1 addition & 0 deletions bench-vortex/benches/random_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ fn random_access(c: &mut Criterion) {
});

let dataset = BenchmarkDatasets::PBI(Medicare1);
dataset.write_as_parquet();
robert3005 marked this conversation as resolved.
Show resolved Hide resolved
dataset.write_as_lance();
// NB: our parquet benchmarks read from a single file, and we (currently) write each
// file to an individual lance dataset for comparison parity.
Expand Down
22 changes: 11 additions & 11 deletions bench-vortex/src/bin/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,35 @@ use std::path::PathBuf;
use bench_vortex::data_downloads::BenchmarkDataset;
use bench_vortex::public_bi_data::BenchmarkDatasets::PBI;
use bench_vortex::public_bi_data::PBIDataset;
use bench_vortex::reader::{open_vortex, rewrite_parquet_as_vortex};
use bench_vortex::reader::{open_vortex_async, rewrite_parquet_as_vortex};
use bench_vortex::taxi_data::taxi_data_parquet;
use bench_vortex::{setup_logger, IdempotentPath};
use futures::executor::block_on;
use log::{info, LevelFilter};
use tokio::fs::File;
use vortex_error::VortexResult;

pub fn main() {
#[tokio::main]
pub async fn main() {
setup_logger(LevelFilter::Info);
// compress_pbi(PBIDataset::Medicare1);
compress_taxi();
compress_taxi().await.unwrap();
}

fn compress_taxi() {
async fn compress_taxi() -> VortexResult<()> {
let path: PathBuf = "taxi_data.vortex".to_data_path();
block_on(async {
let output_file = File::create(&path).await?;
rewrite_parquet_as_vortex(taxi_data_parquet(), output_file).await
})
.unwrap();
let output_file = File::create(&path).await?;
rewrite_parquet_as_vortex(taxi_data_parquet(), output_file).await?;

let taxi_vortex = open_vortex(&path).unwrap();
let taxi_vortex = open_vortex_async(&path).await?;
info!("{}", taxi_vortex.tree_display());

let pq_size = taxi_data_parquet().metadata().unwrap().size();
let vx_size = taxi_vortex.nbytes();

info!("Parquet size: {}, Vortex size: {}", pq_size, vx_size);
info!("Compression ratio: {}", vx_size as f32 / pq_size as f32);

Ok(())
}

#[allow(dead_code)]
Expand Down
13 changes: 12 additions & 1 deletion bench-vortex/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ pub fn open_vortex(path: &Path) -> VortexResult<Array> {
.map(|a| a.into_array())
}

pub async fn open_vortex_async(path: &Path) -> VortexResult<Array> {
robert3005 marked this conversation as resolved.
Show resolved Hide resolved
let file = tokio::fs::File::open(path).await.unwrap();
let mut msgs = MessageReader::try_new(TokioAdapter(file)).await.unwrap();
msgs.array_stream_from_messages(&CTX)
.await
.unwrap()
.collect_chunked()
.await
.map(|a| a.into_array())
}

pub async fn rewrite_parquet_as_vortex<W: VortexWrite>(
parquet_path: PathBuf,
write: W,
Expand Down Expand Up @@ -103,7 +114,7 @@ pub fn take_vortex(path: &Path, indices: &[u64]) -> VortexResult<Array> {
let array = open_vortex(path)?;
let taken = take(&array, &indices.to_vec().into_array())?;
// For equivalence.... we flatten to make sure we're not cheating too much.
taken.flatten().map(|x| x.into_array())
Ok(taken.flatten()?.into_array())
}

pub fn take_parquet(path: &Path, indices: &[u64]) -> VortexResult<RecordBatch> {
Expand Down
31 changes: 29 additions & 2 deletions bench-vortex/src/taxi_data.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::future::{ready, Future};
use std::io::Write;
use std::path::PathBuf;

use futures::executor::block_on;
use tokio::fs::File;
use vortex_buffer::io_buf::IoBuf;
use vortex_error::VortexError;
use vortex_ipc::io::VortexWrite;

use crate::data_downloads::{data_vortex_uncompressed, download_data, parquet_to_lance};
use crate::reader::rewrite_parquet_as_vortex;
Expand Down Expand Up @@ -33,10 +36,34 @@ pub fn taxi_data_vortex_uncompressed() -> PathBuf {
pub fn taxi_data_vortex() -> PathBuf {
idempotent("taxi.vortex", |output_fname| {
block_on(async {
let output_file = File::create(output_fname).await?;
let output_file = std::fs::File::create(output_fname)?;
let output_file = StdFile(output_file);
rewrite_parquet_as_vortex(taxi_data_parquet(), output_file).await?;
Ok::<PathBuf, VortexError>(output_fname.to_path_buf())
})
})
.unwrap()
}

//
// Test code uses futures_executor with a local pool, and nothing in VortexWrite ties us to Tokio,
// so this is a simple bridge to allow us to use a `std::fs::File` as a `VortexWrite`.
//

struct StdFile(std::fs::File);

impl VortexWrite for StdFile {
async fn write_all<B: IoBuf>(&mut self, buffer: B) -> std::io::Result<B> {
self.0.write_all(buffer.as_slice())?;
Ok(buffer)
}

async fn flush(&mut self) -> std::io::Result<()> {
self.0.flush()?;
Ok(())
}

fn shutdown(&mut self) -> impl Future<Output = std::io::Result<()>> {
ready(Ok(()))
}
}
27 changes: 0 additions & 27 deletions vortex-array/src/array/bool/compute/as_contiguous.rs

This file was deleted.

6 changes: 0 additions & 6 deletions vortex-array/src/array/bool/compute/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::array::bool::BoolArray;
use crate::compute::as_arrow::AsArrowArray;
use crate::compute::as_contiguous::AsContiguousFn;
use crate::compute::compare::CompareFn;
use crate::compute::fill::FillForwardFn;
use crate::compute::scalar_at::ScalarAtFn;
Expand All @@ -9,7 +8,6 @@ use crate::compute::take::TakeFn;
use crate::compute::ArrayCompute;

mod as_arrow;
mod as_contiguous;
mod compare;
mod fill;
mod flatten;
Expand All @@ -22,10 +20,6 @@ impl ArrayCompute for BoolArray {
Some(self)
}

fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> {
Some(self)
}

fn compare(&self) -> Option<&dyn CompareFn> {
Some(self)
}
Expand Down
23 changes: 2 additions & 21 deletions vortex-array/src/array/chunked/compute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,21 @@ use vortex_error::VortexResult;
use vortex_scalar::Scalar;

use crate::array::chunked::ChunkedArray;
use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn};
use crate::compute::scalar_at::{scalar_at, ScalarAtFn};
use crate::compute::scalar_subtract::SubtractScalarFn;
use crate::compute::slice::SliceFn;
use crate::compute::take::TakeFn;
use crate::compute::ArrayCompute;
use crate::Array;

mod slice;
mod take;

impl ArrayCompute for ChunkedArray {
fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> {
fn scalar_at(&self) -> Option<&dyn ScalarAtFn> {
Some(self)
}

fn scalar_at(&self) -> Option<&dyn ScalarAtFn> {
fn subtract_scalar(&self) -> Option<&dyn SubtractScalarFn> {
Some(self)
}

Expand All @@ -29,23 +27,6 @@ impl ArrayCompute for ChunkedArray {
fn take(&self) -> Option<&dyn TakeFn> {
Some(self)
}

fn subtract_scalar(&self) -> Option<&dyn SubtractScalarFn> {
Some(self)
}
}

impl AsContiguousFn for ChunkedArray {
fn as_contiguous(&self, arrays: &[Array]) -> VortexResult<Array> {
// Combine all the chunks into one, then call as_contiguous again.
let mut chunks = Vec::with_capacity(self.nchunks());
for array in arrays {
for chunk in Self::try_from(array).unwrap().chunks() {
chunks.push(chunk);
}
}
as_contiguous(&chunks)
}
}

impl ScalarAtFn for ChunkedArray {
Expand Down
16 changes: 5 additions & 11 deletions vortex-array/src/array/chunked/compute/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,7 @@ impl TakeFn for ChunkedArray {

#[cfg(test)]
mod test {
use itertools::Itertools;

use crate::array::chunked::ChunkedArray;
use crate::compute::as_contiguous::as_contiguous;
use crate::compute::take::take;
use crate::{ArrayDType, ArrayTrait, AsArray, IntoArray};

Expand All @@ -68,14 +65,11 @@ mod test {
assert_eq!(arr.len(), 9);
let indices = vec![0, 0, 6, 4].into_array();

let result = as_contiguous(
&ChunkedArray::try_from(take(arr.as_array_ref(), &indices).unwrap())
.unwrap()
.chunks()
.collect_vec(),
)
.unwrap()
.into_primitive();
let result = &ChunkedArray::try_from(take(arr.as_array_ref(), &indices).unwrap())
.unwrap()
.into_array()
.flatten_primitive()
.unwrap();
assert_eq!(result.typed_data::<i32>(), &[1, 1, 1, 2]);
}
}
Loading