diff --git a/Cargo.lock b/Cargo.lock
index 78898413a5..c55210ea94 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -961,6 +961,8 @@ dependencies = [
  "criterion",
  "csv",
  "enum-iterator",
+ "fs_extra",
+ "humansize",
  "itertools 0.12.1",
  "lance",
  "lazy_static",
@@ -1994,6 +1996,12 @@ dependencies = [
  "percent-encoding",
 ]
 
+[[package]]
+name = "fs_extra"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
+
 [[package]]
 name = "futures"
 version = "0.3.30"
diff --git a/Cargo.toml b/Cargo.toml
index f34bdf4231..3eafabf689 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -79,6 +79,7 @@ csv = "1.3.0"
 arrow-csv = "51.0.0"
 lazy_static = "1.4.0"
 enum-iterator = "2.0.0"
+fs_extra = "1.3.0"
 
 [workspace.lints.rust]
 warnings = "deny"
diff --git a/bench-vortex/Cargo.toml b/bench-vortex/Cargo.toml
index a464f8cfa6..6e570a0ff4 100644
--- a/bench-vortex/Cargo.toml
+++ b/bench-vortex/Cargo.toml
@@ -43,6 +43,8 @@ arrow-csv = { workspace = true }
 arrow = { workspace = true }
 lazy_static = { workspace = true }
 enum-iterator = { workspace = true }
+fs_extra = { workspace = true }
+humansize = { workspace = true }
 
 [dev-dependencies]
 criterion = { workspace = true }
diff --git a/bench-vortex/benches/compress_benchmark.rs b/bench-vortex/benches/compress_benchmark.rs
index 1c0e98e57e..b41bda0499 100644
--- a/bench-vortex/benches/compress_benchmark.rs
+++ b/bench-vortex/benches/compress_benchmark.rs
@@ -1,6 +1,8 @@
-use bench_vortex::medicare_data::medicare_data_csv;
+use bench_vortex::compress_taxi_data;
+use bench_vortex::data_downloads::BenchmarkDataset;
+use bench_vortex::public_bi_data::BenchmarkDatasets;
+use bench_vortex::public_bi_data::PBIDataset::Medicare1;
 use bench_vortex::taxi_data::taxi_data_parquet;
-use bench_vortex::{compress_medicare_data, compress_taxi_data};
 use criterion::{black_box, criterion_group, criterion_main, Criterion};
 
 fn vortex_compress_taxi(c: &mut Criterion) {
@@ -11,15 +13,16 @@ fn vortex_compress_taxi(c: &mut Criterion) {
     group.finish()
 }
 
-fn vortex_compress_medicare(c: &mut Criterion) {
-    medicare_data_csv();
+fn vortex_compress_medicare1(c: &mut Criterion) {
+    let dataset = BenchmarkDatasets::PBI(Medicare1);
+    dataset.as_uncompressed();
     let mut group = c.benchmark_group("end to end");
     group.sample_size(10);
     group.bench_function("compress", |b| {
-        b.iter(|| black_box(compress_medicare_data()))
+        b.iter(|| black_box(dataset.compress_to_vortex()))
     });
     group.finish()
 }
 
-criterion_group!(benches, vortex_compress_taxi, vortex_compress_medicare);
+criterion_group!(benches, vortex_compress_taxi, vortex_compress_medicare1);
 criterion_main!(benches);
diff --git a/bench-vortex/benches/random_access.rs b/bench-vortex/benches/random_access.rs
index 819758b6a4..f472683366 100644
--- a/bench-vortex/benches/random_access.rs
+++ b/bench-vortex/benches/random_access.rs
@@ -1,4 +1,6 @@
-use bench_vortex::medicare_data::medicare_data_lance;
+use bench_vortex::data_downloads::{BenchmarkDataset, FileType};
+use bench_vortex::public_bi_data::BenchmarkDatasets;
+use bench_vortex::public_bi_data::PBIDataset::Medicare1;
 use bench_vortex::reader::{take_lance, take_parquet, take_vortex};
 use bench_vortex::taxi_data::{taxi_data_lance, taxi_data_parquet, taxi_data_vortex};
 use criterion::{black_box, criterion_group, criterion_main, Criterion};
@@ -20,13 +22,19 @@ fn random_access(c: &mut Criterion) {
     });
 
     let taxi_lance = taxi_data_lance();
-    group.bench_function("lance", |b| {
group.bench_function("taxi_lance", |b| { b.iter(|| black_box(take_lance(&taxi_lance, &indices))) }); - let medicare_lance = medicare_data_lance(); - group.bench_function("lance", |b| { - b.iter(|| black_box(take_lance(&medicare_lance, &indices))) + let dataset = BenchmarkDatasets::PBI(Medicare1); + dataset.write_as_lance(); + // NB: our parquet benchmarks read from a single file, and we (currently) write each + // file to an individual lance dataset for comparison parity. + // TODO(@jcasale): use datafusion for parquet random-access benchmarks and modify + // lance-writing code to write a single dataset. + let medicare_lance = dataset.list_files(FileType::Lance); + group.bench_function("medic_lance2", |b| { + b.iter(|| black_box(take_lance(medicare_lance.first().unwrap(), &indices))) }); } diff --git a/bench-vortex/src/bin/compress.rs b/bench-vortex/src/bin/compress.rs index 0dcbe52e0c..871020c628 100644 --- a/bench-vortex/src/bin/compress.rs +++ b/bench-vortex/src/bin/compress.rs @@ -37,7 +37,8 @@ fn compress_taxi() { fn compress_pbi(which_pbi: PBIDataset) { let dataset = PBI(which_pbi); - dataset.uncompressed(); + dataset.as_uncompressed(); dataset.write_as_vortex(); dataset.write_as_parquet(); + dataset.write_as_lance(); } diff --git a/bench-vortex/src/data_downloads.rs b/bench-vortex/src/data_downloads.rs index 4afc2b0c75..4989b9e64a 100644 --- a/bench-vortex/src/data_downloads.rs +++ b/bench-vortex/src/data_downloads.rs @@ -12,10 +12,10 @@ use log::info; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use tokio::runtime::Runtime; use vortex::array::chunked::ChunkedArray; -use vortex::array::IntoArray; +use vortex::array::{ArrayRef, IntoArray}; use vortex::arrow::FromArrowType; use vortex::serde::WriteCtx; -use vortex_error::VortexError; +use vortex_error::{VortexError, VortexResult}; use vortex_schema::DType; use crate::idempotent; @@ -30,8 +30,9 @@ pub fn download_data(fname: PathBuf, data_url: &str) -> PathBuf { .unwrap() } -pub fn parquet_to_lance(lance_fname: &Path, read: File) -> PathBuf { +pub fn parquet_to_lance(lance_fname: &Path, parquet_file: &Path) -> VortexResult { let write_params = WriteParams::default(); + let read = File::open(parquet_file).unwrap(); let reader = LanceParquetRecordBatchReaderBuilder::try_new(read) .unwrap() .build() @@ -45,7 +46,7 @@ pub fn parquet_to_lance(lance_fname: &Path, read: File) -> PathBuf { Some(write_params), )) .unwrap(); - PathBuf::from(lance_fname) + Ok(PathBuf::from(lance_fname)) } pub fn data_vortex_uncompressed(fname_out: &str, downloaded_data: PathBuf) -> PathBuf { @@ -93,9 +94,19 @@ pub fn decompress_bz2(input_path: PathBuf, output_path: PathBuf) -> PathBuf { } pub trait BenchmarkDataset { - fn uncompressed(&self); + fn as_uncompressed(&self); + fn compress_to_vortex(&self) -> Vec; fn write_as_parquet(&self); fn write_as_vortex(&self); - fn list_files(&self) -> Vec; + fn write_as_lance(&self); + fn list_files(&self, file_type: FileType) -> Vec; fn directory_location(&self) -> PathBuf; } + +#[derive(Clone, Copy)] +pub enum FileType { + Csv, + Parquet, + Vortex, + Lance, +} diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index 7cc959ffc2..e2bb47f65c 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -1,11 +1,10 @@ use std::fs::{create_dir_all, File}; -use std::io::BufReader; use std::path::{Path, PathBuf}; use std::sync::Arc; use arrow::datatypes::SchemaRef; use arrow_array::RecordBatchReader; -use arrow_csv::reader::Format; +use humansize::DECIMAL; use itertools::Itertools; use 
 use log::{info, warn, LevelFilter};
 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
@@ -27,12 +26,10 @@ use vortex_ree::REEEncoding;
 use vortex_roaring::RoaringBoolEncoding;
 use vortex_schema::DType;
 
-use crate::medicare_data::medicare_data_csv;
 use crate::reader::BATCH_SIZE;
 use crate::taxi_data::taxi_data_parquet;
 
 pub mod data_downloads;
-pub mod medicare_data;
 pub mod public_bi_data;
 pub mod reader;
 pub mod taxi_data;
@@ -141,34 +138,6 @@ pub fn compress_taxi_data() -> ArrayRef {
     chunks_to_array(schema, uncompressed_size, chunks)
 }
 
-pub fn compress_medicare_data() -> ArrayRef {
-    let csv_file = File::open(medicare_data_csv()).unwrap();
-    let reader = BufReader::new(csv_file.try_clone().unwrap());
-
-    let (schema, _) = Format::default()
-        .with_delimiter(u8::try_from('|').unwrap())
-        .infer_schema(&mut csv_file.try_clone().unwrap(), None)
-        .unwrap();
-
-    let csv_reader = arrow::csv::ReaderBuilder::new(Arc::new(schema.clone()))
-        .with_batch_size(BATCH_SIZE * 10)
-        .build(reader)
-        .unwrap();
-
-    let ctx = compress_ctx();
-    let mut uncompressed_size: usize = 0;
-    let chunks = csv_reader
-        .into_iter()
-        .map(|batch_result| batch_result.unwrap())
-        .map(|batch| batch.into_array())
-        .map(|array| {
-            uncompressed_size += array.nbytes();
-            ctx.clone().compress(&array, None).unwrap()
-        })
-        .collect_vec();
-    chunks_to_array(SchemaRef::new(schema), uncompressed_size, chunks)
-}
-
 fn chunks_to_array(schema: SchemaRef, uncompressed_size: usize, chunks: Vec<ArrayRef>) -> ArrayRef {
     let dtype = DType::from_arrow(schema.clone());
     let compressed = ChunkedArray::new(chunks.clone(), dtype).into_array();
@@ -186,7 +155,8 @@ fn chunks_to_array(schema: SchemaRef, uncompressed_size: usize, chunks: Vec<ArrayRef>) -> ArrayRef {
diff --git a/bench-vortex/src/medicare_data.rs b/bench-vortex/src/medicare_data.rs
deleted file mode 100644
--- a/bench-vortex/src/medicare_data.rs
+++ /dev/null
-pub fn medicare_data_csv() -> PathBuf {
-    let fname = "Medicare1_1.csv.bz2";
-    download_data(
-        fname.to_idempotent_path(),
-        "http://www.cwi.nl/~boncz/PublicBIbenchmark/Medicare1/Medicare1_1.csv.bz2",
-    );
-    decompress_bz2(
-        fname.to_idempotent_path(),
-        "Medicare1_1.csv".to_idempotent_path(),
-    )
-}
-
-pub fn medicare_data_lance() -> PathBuf {
-    let taxi_data = File::open(medicare_data_parquet()).unwrap();
-    idempotent("medicare.lance", |path| {
-        Ok::<PathBuf, VortexError>(parquet_to_lance(path, taxi_data))
-    })
-    .unwrap()
-}
-
-pub fn medicare_data_vortex_uncompressed() -> PathBuf {
-    idempotent("medicare-uncompressed.vortex", |path| {
-        let csv_file = File::open(medicare_data_csv()).unwrap();
-        let reader = BufReader::new(csv_file.try_clone().unwrap());
-
-        let (schema, _) = Format::default()
-            .infer_schema(&mut csv_file.try_clone().unwrap(), None)
-            .unwrap();
-
-        let csv_reader = arrow::csv::ReaderBuilder::new(Arc::new(schema.clone()))
-            .with_batch_size(crate::reader::BATCH_SIZE)
-            .build(reader)?;
-
-        let dtype = DType::from_arrow(SchemaRef::new(schema.clone()));
-
-        let chunks = csv_reader
-            .map(|batch_result| batch_result.unwrap())
-            .map(|record_batch| record_batch.into_array())
-            .collect_vec();
-        let chunked = ChunkedArray::new(chunks, dtype.clone());
-
-        let mut write = File::create(path).unwrap();
-        let mut write_ctx = WriteCtx::new(&mut write);
-        write_ctx.dtype(&dtype)?;
-        write_ctx.write(&chunked)
-    })
-    .unwrap()
-}
-
-pub fn medicare_data_vortex() -> PathBuf {
-    idempotent("medicare.vortex", |path| {
-        let mut write = File::create(path).unwrap();
-        let delimiter = u8::try_from('|').unwrap();
-        compress_csv_to_vortex(
-            medicare_data_csv(),
-            default_csv_format().with_delimiter(delimiter),
-            &mut write,
-        )
-    })
-    .unwrap()
-}
-
-pub fn medicare_data_parquet() -> PathBuf {
-    idempotent("medicare.parquet", |path| {
-        let delimiter = u8::try_from('|').unwrap();
-        let format = default_csv_format().with_delimiter(delimiter);
-        let file = File::create(path).unwrap();
-        write_csv_as_parquet(medicare_data_csv(), format, file)
-    })
-    .unwrap()
-}
diff --git a/bench-vortex/src/public_bi_data.rs b/bench-vortex/src/public_bi_data.rs
index a3b68a0b0e..3f184d122a 100644
--- a/bench-vortex/src/public_bi_data.rs
+++ b/bench-vortex/src/public_bi_data.rs
@@ -5,15 +5,21 @@ use std::os::unix::fs::MetadataExt;
 use std::path::PathBuf;
 
 use enum_iterator::Sequence;
+use fs_extra::dir::get_size;
+use humansize::{format_size, DECIMAL};
 use itertools::Itertools;
 use log::info;
 use reqwest::Url;
+use vortex::array::ArrayRef;
 use vortex::formatter::display_tree;
 
-use crate::data_downloads::{decompress_bz2, download_data, BenchmarkDataset};
+use crate::data_downloads::{
+    decompress_bz2, download_data, parquet_to_lance, BenchmarkDataset, FileType,
+};
 use crate::public_bi_data::PBIDataset::*;
 use crate::reader::{
     compress_csv_to_vortex, default_csv_format, open_vortex, write_csv_as_parquet,
+    write_csv_to_vortex,
 };
 use crate::{idempotent, IdempotentPath};
@@ -284,18 +290,27 @@ impl PBIDataset {
         url.first().unwrap().dataset_name
     }
 
-    fn csv_files(&self) -> Vec<PathBuf> {
+    fn list_files(&self, file_type: FileType) -> Vec<PathBuf> {
         let urls = URLS.get(self).unwrap();
-        self.dataset_name();
-        urls.iter().map(|url| self.get_csv_path(url)).collect_vec()
+        urls.iter()
+            .map(|url| self.get_file_path(url, file_type))
+            .collect_vec()
     }
 
-    fn get_csv_path(&self, url: &PBIUrl) -> PathBuf {
+    fn get_file_path(&self, url: &PBIUrl, file_type: FileType) -> PathBuf {
+        let extension = match file_type {
+            FileType::Csv => "csv",
+            FileType::Parquet => "parquet",
+            FileType::Vortex => "vortex",
+            FileType::Lance => "lance",
+        };
+
         "PBI"
             .to_idempotent_path()
             .join(self.dataset_name())
-            .join("csv")
-            .join(url.file_name.strip_suffix(".bz2").unwrap())
+            .join(extension)
+            .join(url.file_name.strip_suffix(".csv.bz2").unwrap())
+            .with_extension(extension)
     }
 
     fn download_bzip(&self) {
@@ -318,7 +333,7 @@ impl PBIDataset {
     fn unzip(&self) {
         for url in URLS.get(self).unwrap() {
             let bzipped = self.get_bzip_path(url);
-            let unzipped_csv = self.get_csv_path(url);
+            let unzipped_csv = self.get_file_path(url, FileType::Csv);
             decompress_bz2(bzipped, unzipped_csv);
         }
     }
@@ -403,7 +418,7 @@ pub enum BenchmarkDatasets {
 }
 
 impl BenchmarkDataset for BenchmarkDatasets {
-    fn uncompressed(&self) {
+    fn as_uncompressed(&self) {
         match self {
             BenchmarkDatasets::PBI(dataset) => {
                 dataset.download_bzip();
@@ -413,7 +428,7 @@
     }
 
     fn write_as_parquet(&self) {
-        for f in self.list_files() {
+        for f in self.list_files(FileType::Csv) {
             let output_fname = f
                 .file_name()
                 .unwrap()
@@ -436,13 +451,33 @@
             )
             .expect("Failed to compress to parquet");
             let pq_size = compressed.metadata().unwrap().size();
-            info!("Parquet size: {}", pq_size);
+            info!(
+                "Parquet size: {}, {}B",
+                format_size(pq_size, DECIMAL),
+                pq_size
+            );
         }
     }
 
+    /// Compresses the CSV files to Vortex format. Does NOT write any data to disk.
+    /// Used for benchmarking.
+    fn compress_to_vortex(&self) -> Vec<ArrayRef> {
+        self.list_files(FileType::Csv)
+            .into_iter()
+            .map(|csv_input| {
+                info!("Compressing {} to vortex", csv_input.to_str().unwrap());
+                compress_csv_to_vortex(
+                    csv_input,
+                    default_csv_format().with_delimiter(u8::try_from('|').unwrap()),
+                )
+                .1
+            })
+            .collect_vec()
+    }
+
     fn write_as_vortex(&self) {
-        for f in self.list_files() {
-            info!("Compressing {} to vortex", f.to_str().unwrap());
+        for f in self.list_files(FileType::Csv) {
+            info!("Compressing and writing {} to vortex", f.to_str().unwrap());
             let output_fname = f
                 .file_name()
                 .unwrap()
@@ -457,7 +492,7 @@
             let mut write = File::create(output_path).unwrap();
             let delimiter = u8::try_from('|').unwrap();
             let csv_input = f;
-            compress_csv_to_vortex(
+            write_csv_to_vortex(
                 csv_input,
                 default_csv_format().with_delimiter(delimiter),
                 &mut write,
@@ -468,14 +503,49 @@
 
             let from_vortex = open_vortex(&compressed).unwrap();
             let vx_size = from_vortex.nbytes();
-            info!("Vortex size: {}", vx_size);
+            info!(
+                "Vortex size: {}, {}B",
+                format_size(vx_size as u64, DECIMAL),
+                vx_size
+            );
             info!("{}\n\n", display_tree(from_vortex.as_ref()));
         }
     }
 
-    fn list_files(&self) -> Vec<PathBuf> {
+    fn write_as_lance(&self) {
+        for f in self.list_files(FileType::Csv) {
+            info!("Compressing {} to lance", f.to_str().unwrap());
+            let output_fname = f
+                .file_name()
+                .unwrap()
+                .to_str()
+                .unwrap()
+                .strip_suffix(".csv")
+                .unwrap();
+            let compressed = idempotent(
+                &path_for_file_type(self, output_fname, "lance"),
+                |output_path| {
+                    parquet_to_lance(
+                        output_path,
+                        path_for_file_type(self, output_fname, "parquet").as_path(),
+                    )
+                },
+            )
+            .expect("Failed to compress to lance");
+
+            let lance_dir_bytes_exact = get_size(compressed).unwrap();
+            let lance_dir_size = humansize::format_size(lance_dir_bytes_exact, DECIMAL);
+
+            info!(
+                "Lance directory aggregate size: {}, {}B",
+                lance_dir_size, lance_dir_bytes_exact
+            );
+        }
+    }
+
+    fn list_files(&self, types: FileType) -> Vec<PathBuf> {
         match self {
-            BenchmarkDatasets::PBI(dataset) => dataset.csv_files(),
+            BenchmarkDatasets::PBI(dataset) => dataset.list_files(types),
         }
     }
diff --git a/bench-vortex/src/reader.rs b/bench-vortex/src/reader.rs
index 66247481b3..775747ff57 100644
--- a/bench-vortex/src/reader.rs
+++ b/bench-vortex/src/reader.rs
@@ -34,10 +34,10 @@ use vortex::serde::{ReadCtx, WriteCtx};
 use vortex_error::{VortexError, VortexResult};
 use vortex_schema::DType;
 
-use crate::compress_ctx;
+use crate::{chunks_to_array, compress_ctx};
 
 pub const BATCH_SIZE: usize = 65_536;
-const CSV_SCHEMA_SAMPLE_ROWS: usize = 10_000_000;
+pub const CSV_SCHEMA_SAMPLE_ROWS: usize = 10_000_000;
 const DEFAULT_DELIMITER: u8 = b',';
 
 pub fn open_vortex(path: &Path) -> VortexResult<ArrayRef> {
@@ -88,36 +88,47 @@ pub fn default_csv_format() -> Format {
         .with_null_regex("null".parse().unwrap())
 }
 
-pub fn compress_csv_to_vortex<W: Write>(
-    csv_path: PathBuf,
-    format: Format,
-    write: &mut W,
-) -> VortexResult<()> {
-    let csv_file_for_schema = File::open(csv_path.clone()).unwrap();
+pub fn compress_csv_to_vortex(csv_path: PathBuf, format: Format) -> (DType, ArrayRef) {
+    let csv_file = File::open(csv_path.clone()).unwrap();
+    let (schema, _) = format
+        .infer_schema(
+            &mut csv_file.try_clone().unwrap(),
+            Some(CSV_SCHEMA_SAMPLE_ROWS),
+        )
+        .unwrap();
 
-    // Infer the schema of the CSV file
-    let (schema, _) = format.infer_schema(
-        csv_file_for_schema.try_clone().unwrap(),
-        Some(CSV_SCHEMA_SAMPLE_ROWS),
-    )?;
-    let csv_file_for_read = File::open(csv_path.clone()).unwrap();
-    let file_reader = BufReader::new(csv_file_for_read.try_clone().unwrap());
+    let csv_file2 = File::open(csv_path.clone()).unwrap();
+    let reader = BufReader::new(csv_file2.try_clone().unwrap());
 
-    let csv_reader = csv::ReaderBuilder::new(SchemaRef::new(schema.clone()))
-        .with_format(format.clone())
+    let csv_reader = arrow::csv::ReaderBuilder::new(Arc::new(schema.clone()))
+        .with_format(format)
         .with_batch_size(BATCH_SIZE)
-        .build(file_reader)?;
-    let dtype = DType::from_arrow(SchemaRef::from(schema.clone()));
-    let ctx = compress_ctx();
+        .build(reader)
+        .unwrap();
 
+    let ctx = compress_ctx();
+    let mut uncompressed_size: usize = 0;
     let chunks = csv_reader
+        .into_iter()
         .map(|batch_result| batch_result.unwrap())
-        .map(|record_batch| {
-            let vortex_array = record_batch.into_array();
-            ctx.compress(&vortex_array, None).unwrap()
+        .map(|batch| batch.into_array())
+        .map(|array| {
+            uncompressed_size += array.nbytes();
+            ctx.clone().compress(&array, None).unwrap()
         })
         .collect_vec();
-    let chunked = ChunkedArray::new(chunks, dtype.clone());
+    (
+        DType::from_arrow(SchemaRef::new(schema.clone())),
+        chunks_to_array(SchemaRef::new(schema), uncompressed_size, chunks),
+    )
+}
+
+pub fn write_csv_to_vortex<W: Write>(
+    csv_path: PathBuf,
+    format: Format,
+    write: &mut W,
+) -> VortexResult<()> {
+    let (dtype, chunked) = compress_csv_to_vortex(csv_path, format);
 
     let mut write_ctx = WriteCtx::new(write);
     write_ctx.dtype(&dtype).unwrap();
diff --git a/bench-vortex/src/taxi_data.rs b/bench-vortex/src/taxi_data.rs
index 67afd149ea..bc5b703643 100644
--- a/bench-vortex/src/taxi_data.rs
+++ b/bench-vortex/src/taxi_data.rs
@@ -20,8 +20,7 @@ pub fn taxi_data_parquet() -> PathBuf {
 
 pub fn taxi_data_lance() -> PathBuf {
     idempotent("taxi_lance", |output_fname| {
-        let taxi_data = File::open(taxi_data_parquet()).unwrap();
-        Ok::<PathBuf, VortexError>(parquet_to_lance(output_fname, taxi_data))
+        parquet_to_lance(output_fname, taxi_data_parquet().as_path())
     })
     .unwrap()
 }
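For orientation, a minimal sketch (not part of the patch) of how the refactored BenchmarkDataset API above is meant to be driven; it mirrors the usage in bin/compress.rs and the two benchmarks, and the standalone main wrapper is purely illustrative:

    use bench_vortex::data_downloads::{BenchmarkDataset, FileType};
    use bench_vortex::public_bi_data::BenchmarkDatasets;
    use bench_vortex::public_bi_data::PBIDataset::Medicare1;

    fn main() {
        // Download and unzip the Medicare1 CSVs (idempotent).
        let dataset = BenchmarkDatasets::PBI(Medicare1);
        dataset.as_uncompressed();

        // Persist per-format copies for the on-disk size comparisons that get logged.
        dataset.write_as_parquet();
        dataset.write_as_vortex();
        dataset.write_as_lance();

        // In-memory compression only; this is what the criterion benchmark measures.
        let compressed = dataset.compress_to_vortex();
        println!("compressed {} file(s) to vortex in memory", compressed.len());

        // Enumerate the Lance outputs, as the random-access benchmark does.
        for path in dataset.list_files(FileType::Lance) {
            println!("{}", path.display());
        }
    }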