Small refactor to make adding new data sources easier (#177)
De-specialize the download and conversion code so that it can be used with
datasets other than ny-taxi.
jdcasale authored Apr 3, 2024
1 parent 538d089 commit 395d77b
Showing 5 changed files with 98 additions and 67 deletions.
4 changes: 2 additions & 2 deletions bench-vortex/src/bin/compress.rs
@@ -1,4 +1,4 @@
-use bench_vortex::reader::{compress_vortex, open_vortex};
+use bench_vortex::reader::{compress_parquet_to_vortex, open_vortex};
 use bench_vortex::setup_logger;
 use bench_vortex::taxi_data::taxi_data_parquet;
 use log::LevelFilter;
@@ -14,7 +14,7 @@ pub fn main() {
     let path: PathBuf = "taxi_data.vortex".into();
     {
         let mut write = File::create(&path).unwrap();
-        compress_vortex(&taxi_data_parquet(), &mut write).unwrap();
+        compress_parquet_to_vortex(&taxi_data_parquet(), &mut write).unwrap();
     }

     let taxi_vortex = open_vortex(&path).unwrap();
73 changes: 73 additions & 0 deletions bench-vortex/src/data_downloads.rs
@@ -0,0 +1,73 @@
+use crate::idempotent;
+use crate::reader::{compress_parquet_to_vortex, BATCH_SIZE};
+use arrow_array::RecordBatchReader;
+use itertools::Itertools;
+use lance::dataset::WriteParams;
+use lance::Dataset;
+use lance_parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder as LanceParquetRecordBatchReaderBuilder;
+use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+use std::fs::File;
+use std::path::{Path, PathBuf};
+use tokio::runtime::Runtime;
+use vortex::array::chunked::ChunkedArray;
+use vortex::array::IntoArray;
+use vortex::arrow::FromArrowType;
+use vortex::serde::WriteCtx;
+use vortex_schema::DType;
+
+pub fn download_data(fname: &str, data_url: &str) -> PathBuf {
+    idempotent(fname, |path| {
+        let mut file = File::create(path).unwrap();
+
+        reqwest::blocking::get(data_url).unwrap().copy_to(&mut file)
+    })
+    .unwrap()
+}
+
+pub fn parquet_to_lance(lance_fname: &Path, read: File) -> PathBuf {
+    let write_params = WriteParams::default();
+    let reader = LanceParquetRecordBatchReaderBuilder::try_new(read)
+        .unwrap()
+        .build()
+        .unwrap();
+
+    Runtime::new()
+        .unwrap()
+        .block_on(Dataset::write(
+            reader,
+            lance_fname.to_str().unwrap(),
+            Some(write_params),
+        ))
+        .unwrap();
+    PathBuf::from(lance_fname)
+}
+
+pub fn parquet_to_vortex(output_fname: &Path, data_to_compress: PathBuf) -> PathBuf {
+    let mut write = File::create(output_fname).unwrap();
+    compress_parquet_to_vortex(&data_to_compress, &mut write).unwrap();
+    output_fname.to_path_buf()
+}
+
+pub fn data_vortex_uncompressed(fname_out: &str, downloaded_data: PathBuf) -> PathBuf {
+    idempotent(fname_out, |path| {
+        let taxi_pq = File::open(downloaded_data).unwrap();
+        let builder = ParquetRecordBatchReaderBuilder::try_new(taxi_pq).unwrap();
+
+        // FIXME(ngates): #157 the compressor should handle batch size.
+        let reader = builder.with_batch_size(BATCH_SIZE).build().unwrap();
+
+        let dtype = DType::from_arrow(reader.schema());
+
+        let chunks = reader
+            .map(|batch_result| batch_result.unwrap())
+            .map(|record_batch| record_batch.into_array())
+            .collect_vec();
+        let chunked = ChunkedArray::new(chunks, dtype.clone());
+
+        let mut write = File::create(path).unwrap();
+        let mut write_ctx = WriteCtx::new(&mut write);
+        write_ctx.dtype(&dtype)?;
+        write_ctx.write(&chunked)
+    })
+    .unwrap()
+}
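
Taken together, these helpers form a small reusable pipeline: download_data fetches a Parquet file once via idempotent (which presumably skips the download when the file already exists), and parquet_to_lance / parquet_to_vortex / data_vortex_uncompressed handle conversion. A minimal sketch of a caller for some other dataset might look like the following; the file names, URL, and function name are invented for illustration and are not part of this change:

    use std::path::PathBuf;

    use crate::data_downloads::{data_vortex_uncompressed, download_data};

    // Hypothetical: materialize an uncompressed Vortex copy of any Parquet
    // file reachable over HTTP, mirroring taxi_data_vortex_uncompressed below.
    fn example_vortex_uncompressed() -> PathBuf {
        // download_data wraps idempotent, so repeated calls reuse the file.
        let parquet = download_data("example.parquet", "https://example.com/example.parquet");
        data_vortex_uncompressed("example-uncompressed.vortex", parquet)
    }
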
1 change: 1 addition & 0 deletions bench-vortex/src/lib.rs
@@ -26,6 +26,7 @@ use vortex_ree::REEEncoding;
 use vortex_roaring::RoaringBoolEncoding;
 use vortex_schema::DType;

+mod data_downloads;
 pub mod reader;
 pub mod taxi_data;

9 changes: 7 additions & 2 deletions bench-vortex/src/reader.rs
@@ -27,6 +27,8 @@ use vortex::serde::{ReadCtx, WriteCtx};
 use vortex_error::VortexResult;
 use vortex_schema::DType;

+pub const BATCH_SIZE: usize = 65_536;
+
 pub fn open_vortex(path: &Path) -> VortexResult<ArrayRef> {
     let mut file = File::open(path)?;
     let dummy_dtype: DType = PType::U8.into();
@@ -35,12 +37,15 @@ pub fn open_vortex(path: &Path) -> VortexResult<ArrayRef> {
     read_ctx.with_schema(&dtype).read()
 }

-pub fn compress_vortex<W: Write>(parquet_path: &Path, write: &mut W) -> VortexResult<()> {
+pub fn compress_parquet_to_vortex<W: Write>(
+    parquet_path: &Path,
+    write: &mut W,
+) -> VortexResult<()> {
     let taxi_pq = File::open(parquet_path)?;
     let builder = ParquetRecordBatchReaderBuilder::try_new(taxi_pq)?;

     // FIXME(ngates): #157 the compressor should handle batch size.
-    let reader = builder.with_batch_size(65_536).build()?;
+    let reader = builder.with_batch_size(BATCH_SIZE).build()?;

     let dtype = DType::from_arrow(reader.schema());
     let ctx = compress_ctx();
78 changes: 15 additions & 63 deletions bench-vortex/src/taxi_data.rs
@@ -1,85 +1,37 @@
-use itertools::Itertools;
-use lance::dataset::WriteParams;
-use lance::Dataset;
-use lance_parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder as LanceParquetRecordBatchReaderBuilder;
-use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
-
-use arrow_array::RecordBatchReader;
+use crate::data_downloads::{
+    data_vortex_uncompressed, download_data, parquet_to_lance, parquet_to_vortex,
+};
+use crate::idempotent;
 use std::fs::File;
 use std::path::PathBuf;
-use tokio::runtime::Runtime;
-use vortex::array::chunked::ChunkedArray;
-use vortex::array::IntoArray;
-use vortex::arrow::FromArrowType;
-use vortex::serde::WriteCtx;
-use vortex_schema::DType;
-
-use crate::idempotent;
-use crate::reader::compress_vortex;
+use vortex_error::VortexError;

 fn download_taxi_data() -> PathBuf {
-    idempotent("yellow-tripdata-2023-11.parquet", |path| {
-        let mut file = File::create(path).unwrap();
-        reqwest::blocking::get(
-            "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-11.parquet",
-        )
-        .unwrap()
-        .copy_to(&mut file)
-    })
-    .unwrap()
+    let taxi_parquet_fname = "yellow-tripdata-2023-11.parquet";
+    let taxi_data_url =
+        "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-11.parquet";
+    download_data(taxi_parquet_fname, taxi_data_url)
 }

 pub fn taxi_data_parquet() -> PathBuf {
     download_taxi_data()
 }

 pub fn taxi_data_lance() -> PathBuf {
-    idempotent("taxi.lance", |path| {
-        let write_params = WriteParams::default();
-
-        let read = File::open(taxi_data_parquet()).unwrap();
-        let reader = LanceParquetRecordBatchReaderBuilder::try_new(read)
-            .unwrap()
-            .build()
-            .unwrap();
-
-        Runtime::new().unwrap().block_on(Dataset::write(
-            reader,
-            path.to_str().unwrap(),
-            Some(write_params),
-        ))
+    idempotent("taxi_lance", |output_fname| {
+        let taxi_data = File::open(taxi_data_parquet()).unwrap();
+        Ok::<PathBuf, VortexError>(parquet_to_lance(output_fname, taxi_data))
     })
     .unwrap()
 }

 pub fn taxi_data_vortex_uncompressed() -> PathBuf {
-    idempotent("taxi-uncompressed.vortex", |path| {
-        let taxi_pq = File::open(download_taxi_data()).unwrap();
-        let builder = ParquetRecordBatchReaderBuilder::try_new(taxi_pq).unwrap();
-
-        // FIXME(ngates): #157 the compressor should handle batch size.
-        let reader = builder.with_batch_size(65_536).build().unwrap();
-
-        let dtype = DType::from_arrow(reader.schema());
-
-        let chunks = reader
-            .map(|batch_result| batch_result.unwrap())
-            .map(|record_batch| record_batch.into_array())
-            .collect_vec();
-        let chunked = ChunkedArray::new(chunks, dtype.clone());
-
-        let mut write = File::create(path).unwrap();
-        let mut write_ctx = WriteCtx::new(&mut write);
-        write_ctx.dtype(&dtype)?;
-        write_ctx.write(&chunked)
-    })
-    .unwrap()
+    data_vortex_uncompressed("taxi-uncompressed.vortex", download_taxi_data())
 }

 pub fn taxi_data_vortex() -> PathBuf {
-    idempotent("taxi.vortex", |path| {
-        let mut write = File::create(path).unwrap();
-        compress_vortex(&taxi_data_parquet(), &mut write)
+    idempotent("taxi.vortex", |output_fname| {
+        Ok::<PathBuf, VortexError>(parquet_to_vortex(output_fname, taxi_data_parquet()))
    })
     .unwrap()
 }
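
After this refactor, taxi_data.rs is a thin wrapper over the shared helpers, so a new data source needs only the same few lines. A sketch of what a hypothetical sibling module might look like, with an invented dataset name and URL (only the helper functions shown in this diff are assumed):

    use crate::data_downloads::{download_data, parquet_to_vortex};
    use crate::idempotent;
    use std::path::PathBuf;
    use vortex_error::VortexError;

    // Hypothetical module mirroring taxi_data.rs for another Parquet dataset.
    fn download_example_data() -> PathBuf {
        // Invented file name and URL, for illustration only.
        download_data(
            "example-tripdata.parquet",
            "https://example.com/example-tripdata.parquet",
        )
    }

    pub fn example_data_vortex() -> PathBuf {
        idempotent("example.vortex", |output_fname| {
            Ok::<PathBuf, VortexError>(parquet_to_vortex(output_fname, download_example_data()))
        })
        .unwrap()
    }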
