diff --git a/bench-vortex/src/bin/compress.rs b/bench-vortex/src/bin/compress.rs
index 68f157572a..9f572bfd80 100644
--- a/bench-vortex/src/bin/compress.rs
+++ b/bench-vortex/src/bin/compress.rs
@@ -1,4 +1,4 @@
-use bench_vortex::reader::{compress_vortex, open_vortex};
+use bench_vortex::reader::{compress_parquet_to_vortex, open_vortex};
 use bench_vortex::setup_logger;
 use bench_vortex::taxi_data::taxi_data_parquet;
 use log::LevelFilter;
@@ -14,7 +14,7 @@ pub fn main() {
     let path: PathBuf = "taxi_data.vortex".into();
     {
         let mut write = File::create(&path).unwrap();
-        compress_vortex(&taxi_data_parquet(), &mut write).unwrap();
+        compress_parquet_to_vortex(&taxi_data_parquet(), &mut write).unwrap();
     }
 
     let taxi_vortex = open_vortex(&path).unwrap();
diff --git a/bench-vortex/src/data_downloads.rs b/bench-vortex/src/data_downloads.rs
new file mode 100644
index 0000000000..90c5bcfbeb
--- /dev/null
+++ b/bench-vortex/src/data_downloads.rs
@@ -0,0 +1,73 @@
+use crate::idempotent;
+use crate::reader::{compress_parquet_to_vortex, BATCH_SIZE};
+use arrow_array::RecordBatchReader;
+use itertools::Itertools;
+use lance::dataset::WriteParams;
+use lance::Dataset;
+use lance_parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder as LanceParquetRecordBatchReaderBuilder;
+use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+use std::fs::File;
+use std::path::{Path, PathBuf};
+use tokio::runtime::Runtime;
+use vortex::array::chunked::ChunkedArray;
+use vortex::array::IntoArray;
+use vortex::arrow::FromArrowType;
+use vortex::serde::WriteCtx;
+use vortex_schema::DType;
+
+pub fn download_data(fname: &str, data_url: &str) -> PathBuf {
+    idempotent(fname, |path| {
+        let mut file = File::create(path).unwrap();
+
+        reqwest::blocking::get(data_url).unwrap().copy_to(&mut file)
+    })
+    .unwrap()
+}
+
+pub fn parquet_to_lance(lance_fname: &Path, read: File) -> PathBuf {
+    let write_params = WriteParams::default();
+    let reader = LanceParquetRecordBatchReaderBuilder::try_new(read)
+        .unwrap()
+        .build()
+        .unwrap();
+
+    Runtime::new()
+        .unwrap()
+        .block_on(Dataset::write(
+            reader,
+            lance_fname.to_str().unwrap(),
+            Some(write_params),
+        ))
+        .unwrap();
+    PathBuf::from(lance_fname)
+}
+
+pub fn parquet_to_vortex(output_fname: &Path, data_to_compress: PathBuf) -> PathBuf {
+    let mut write = File::create(output_fname).unwrap();
+    compress_parquet_to_vortex(&data_to_compress, &mut write).unwrap();
+    output_fname.to_path_buf()
+}
+
+pub fn data_vortex_uncompressed(fname_out: &str, downloaded_data: PathBuf) -> PathBuf {
+    idempotent(fname_out, |path| {
+        let taxi_pq = File::open(downloaded_data).unwrap();
+        let builder = ParquetRecordBatchReaderBuilder::try_new(taxi_pq).unwrap();
+
+        // FIXME(ngates): #157 the compressor should handle batch size.
+        let reader = builder.with_batch_size(BATCH_SIZE).build().unwrap();
+
+        let dtype = DType::from_arrow(reader.schema());
+
+        let chunks = reader
+            .map(|batch_result| batch_result.unwrap())
+            .map(|record_batch| record_batch.into_array())
+            .collect_vec();
+        let chunked = ChunkedArray::new(chunks, dtype.clone());
+
+        let mut write = File::create(path).unwrap();
+        let mut write_ctx = WriteCtx::new(&mut write);
+        write_ctx.dtype(&dtype)?;
+        write_ctx.write(&chunked)
+    })
+    .unwrap()
+}
diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs
index cca6d9af8f..add7cdd759 100644
--- a/bench-vortex/src/lib.rs
+++ b/bench-vortex/src/lib.rs
@@ -26,6 +26,7 @@ use vortex_ree::REEEncoding;
 use vortex_roaring::RoaringBoolEncoding;
 use vortex_schema::DType;
 
+mod data_downloads;
 pub mod reader;
 pub mod taxi_data;
 
diff --git a/bench-vortex/src/reader.rs b/bench-vortex/src/reader.rs
index 698d3b500e..8f1d5cae37 100644
--- a/bench-vortex/src/reader.rs
+++ b/bench-vortex/src/reader.rs
@@ -27,6 +27,8 @@ use vortex::serde::{ReadCtx, WriteCtx};
 use vortex_error::VortexResult;
 use vortex_schema::DType;
 
+pub const BATCH_SIZE: usize = 65_536;
+
 pub fn open_vortex(path: &Path) -> VortexResult<ArrayRef> {
     let mut file = File::open(path)?;
     let dummy_dtype: DType = PType::U8.into();
@@ -35,12 +37,15 @@ pub fn open_vortex(path: &Path) -> VortexResult<ArrayRef> {
     read_ctx.with_schema(&dtype).read()
 }
 
-pub fn compress_vortex<W: Write>(parquet_path: &Path, write: &mut W) -> VortexResult<()> {
+pub fn compress_parquet_to_vortex<W: Write>(
+    parquet_path: &Path,
+    write: &mut W,
+) -> VortexResult<()> {
     let taxi_pq = File::open(parquet_path)?;
     let builder = ParquetRecordBatchReaderBuilder::try_new(taxi_pq)?;
 
     // FIXME(ngates): #157 the compressor should handle batch size.
-    let reader = builder.with_batch_size(65_536).build()?;
+    let reader = builder.with_batch_size(BATCH_SIZE).build()?;
 
     let dtype = DType::from_arrow(reader.schema());
     let ctx = compress_ctx();
diff --git a/bench-vortex/src/taxi_data.rs b/bench-vortex/src/taxi_data.rs
index 8b22b26651..ee355db274 100644
--- a/bench-vortex/src/taxi_data.rs
+++ b/bench-vortex/src/taxi_data.rs
@@ -1,32 +1,16 @@
-use itertools::Itertools;
-use lance::dataset::WriteParams;
-use lance::Dataset;
-use lance_parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder as LanceParquetRecordBatchReaderBuilder;
-use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
-
-use arrow_array::RecordBatchReader;
+use crate::data_downloads::{
+    data_vortex_uncompressed, download_data, parquet_to_lance, parquet_to_vortex,
+};
+use crate::idempotent;
 use std::fs::File;
 use std::path::PathBuf;
-use tokio::runtime::Runtime;
-use vortex::array::chunked::ChunkedArray;
-use vortex::array::IntoArray;
-use vortex::arrow::FromArrowType;
-use vortex::serde::WriteCtx;
-use vortex_schema::DType;
-
-use crate::idempotent;
-use crate::reader::compress_vortex;
+use vortex_error::VortexError;
 
 fn download_taxi_data() -> PathBuf {
-    idempotent("yellow-tripdata-2023-11.parquet", |path| {
-        let mut file = File::create(path).unwrap();
-        reqwest::blocking::get(
-            "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-11.parquet",
-        )
-        .unwrap()
-        .copy_to(&mut file)
-    })
-    .unwrap()
+    let taxi_parquet_fname = "yellow-tripdata-2023-11.parquet";
+    let taxi_data_url =
+        "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-11.parquet";
+    download_data(taxi_parquet_fname, taxi_data_url)
 }
 
 pub fn taxi_data_parquet() -> PathBuf {
@@ -34,52 +18,20 @@ pub fn taxi_data_parquet() -> PathBuf {
 }
 
 pub fn taxi_data_lance() -> PathBuf {
-    idempotent("taxi.lance", |path| {
-        let write_params = WriteParams::default();
-
-        let read = File::open(taxi_data_parquet()).unwrap();
-        let reader = LanceParquetRecordBatchReaderBuilder::try_new(read)
-            .unwrap()
-            .build()
-            .unwrap();
-
-        Runtime::new().unwrap().block_on(Dataset::write(
-            reader,
-            path.to_str().unwrap(),
-            Some(write_params),
-        ))
+    idempotent("taxi_lance", |output_fname| {
+        let taxi_data = File::open(taxi_data_parquet()).unwrap();
+        Ok::<PathBuf, VortexError>(parquet_to_lance(output_fname, taxi_data))
     })
     .unwrap()
 }
 
 pub fn taxi_data_vortex_uncompressed() -> PathBuf {
-    idempotent("taxi-uncompressed.vortex", |path| {
-        let taxi_pq = File::open(download_taxi_data()).unwrap();
-        let builder = ParquetRecordBatchReaderBuilder::try_new(taxi_pq).unwrap();
-
-        // FIXME(ngates): #157 the compressor should handle batch size.
-        let reader = builder.with_batch_size(65_536).build().unwrap();
-
-        let dtype = DType::from_arrow(reader.schema());
-
-        let chunks = reader
-            .map(|batch_result| batch_result.unwrap())
-            .map(|record_batch| record_batch.into_array())
-            .collect_vec();
-        let chunked = ChunkedArray::new(chunks, dtype.clone());
-
-        let mut write = File::create(path).unwrap();
-        let mut write_ctx = WriteCtx::new(&mut write);
-        write_ctx.dtype(&dtype)?;
-        write_ctx.write(&chunked)
-    })
-    .unwrap()
+    data_vortex_uncompressed("taxi-uncompressed.vortex", download_taxi_data())
 }
 
 pub fn taxi_data_vortex() -> PathBuf {
-    idempotent("taxi.vortex", |path| {
-        let mut write = File::create(path).unwrap();
-        compress_vortex(&taxi_data_parquet(), &mut write)
+    idempotent("taxi.vortex", |output_fname| {
+        Ok::<PathBuf, VortexError>(parquet_to_vortex(output_fname, taxi_data_parquet()))
    })
     .unwrap()
 }
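As a rough illustration of how the extracted helpers compose after this refactor, a new dataset module could be wired up the same way taxi_data.rs now is. This is only a sketch, not part of the patch: the module, file names, and URL below are placeholder assumptions, and it relies on the idempotent, download_data, parquet_to_lance, and parquet_to_vortex signatures shown above.

// Hypothetical sibling module (e.g. bench-vortex/src/example_data.rs), sketched
// against the helpers introduced in data_downloads.rs. File names and the URL
// are placeholders, not real datasets.
use std::fs::File;
use std::path::PathBuf;

use vortex_error::VortexError;

use crate::data_downloads::{download_data, parquet_to_lance, parquet_to_vortex};
use crate::idempotent;

fn download_example_data() -> PathBuf {
    // download_data wraps idempotent(), so the file is only fetched if it is missing.
    download_data(
        "example-data.parquet",
        "https://example.com/datasets/example-data.parquet",
    )
}

pub fn example_data_lance() -> PathBuf {
    idempotent("example_lance", |output_fname| {
        let parquet = File::open(download_example_data()).unwrap();
        Ok::<PathBuf, VortexError>(parquet_to_lance(output_fname, parquet))
    })
    .unwrap()
}

pub fn example_data_vortex() -> PathBuf {
    idempotent("example.vortex", |output_fname| {
        Ok::<PathBuf, VortexError>(parquet_to_vortex(output_fname, download_example_data()))
    })
    .unwrap()
}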