
Commit

Refactor (#237)
Co-authored-by: Robert Kruszewski <[email protected]>
Co-authored-by: Josh Casale <[email protected]>
3 people authored Apr 23, 2024
1 parent 1175d8a commit 88d40c1
Showing 283 changed files with 5,257 additions and 14,672 deletions.
300 changes: 138 additions & 162 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 3 additions & 4 deletions Cargo.toml
@@ -7,8 +7,7 @@ members = [
"vortex-alloc",
"vortex-alp",
"vortex-array",
"vortex-array2",
"vortex-datetime",
"vortex-datetime-parts",
"vortex-dict",
"vortex-error",
"vortex-fastlanes",
@@ -17,7 +16,7 @@ members = [
"vortex-ree",
"vortex-roaring",
"vortex-schema",
"vortex-zigzag",
#"vortex-zigzag",
]
resolver = "2"

@@ -31,7 +30,7 @@ keywords = ["vortex"]
include = [
"benches/*.rs",
"src/**/*.rs",
"Carsgo.toml",
"Cargo.toml",
]
edition = "2021"
rust-version = "1.76"
10 changes: 5 additions & 5 deletions bench-vortex/Cargo.toml
@@ -15,10 +15,13 @@ rust-version = { workspace = true }
workspace = true

[dependencies]
#vortex-alp = { path = "../vortex-alp" }
vortex-roaring = { path = "../vortex-roaring" }
#vortex-zigzag = { path = "../vortex-zigzag" }
arrow = { workspace = true }
arrow-array = { workspace = true }
arrow-data = { workspace = true }
arrow-csv = { workspace = true }
arrow-data = { workspace = true }
arrow-select = { workspace = true }
bzip2 = { workspace = true }
csv = { workspace = true }
@@ -39,16 +42,13 @@ tokio = { workspace = true }
uuid = { workspace = true }
vortex-alp = { path = "../vortex-alp" }
vortex-array = { path = "../vortex-array" }
vortex-array2 = { path = "../vortex-array2" }
vortex-datetime = { path = "../vortex-datetime" }
vortex-datetime-parts = { path = "../vortex-datetime-parts" }
vortex-dict = { path = "../vortex-dict" }
vortex-error = { path = "../vortex-error", features = ["parquet"] }
vortex-fastlanes = { path = "../vortex-fastlanes" }
vortex-ipc = { path = "../vortex-ipc" }
vortex-ree = { path = "../vortex-ree" }
vortex-roaring = { path = "../vortex-roaring" }
vortex-schema = { path = "../vortex-schema" }
vortex-zigzag = { path = "../vortex-zigzag" }

[dev-dependencies]
criterion = { workspace = true }
5 changes: 3 additions & 2 deletions bench-vortex/src/bin/compress.rs
@@ -9,11 +9,10 @@ use bench_vortex::reader::{open_vortex, rewrite_parquet_as_vortex};
use bench_vortex::taxi_data::taxi_data_parquet;
use bench_vortex::{setup_logger, IdempotentPath};
use log::{info, LevelFilter};
use vortex::array::Array;

pub fn main() {
setup_logger(LevelFilter::Info);
compress_pbi(PBIDataset::Medicare1);
// compress_pbi(PBIDataset::Medicare1);
compress_taxi();
}

@@ -25,6 +24,7 @@ fn compress_taxi() {
}

let taxi_vortex = open_vortex(&path).unwrap();
info!("{}", taxi_vortex.tree_display());

let pq_size = taxi_data_parquet().metadata().unwrap().size();
let vx_size = taxi_vortex.nbytes();
@@ -33,6 +33,7 @@
info!("Compression ratio: {}", vx_size as f32 / pq_size as f32);
}

#[allow(dead_code)]
fn compress_pbi(which_pbi: PBIDataset) {
let dataset = PBI(which_pbi);
dataset.write_as_vortex();
29 changes: 14 additions & 15 deletions bench-vortex/src/data_downloads.rs
@@ -5,18 +5,16 @@ use std::path::{Path, PathBuf};

use arrow_array::RecordBatchReader;
use bzip2::read::BzDecoder;
use itertools::Itertools;
use lance::dataset::WriteParams;
use lance::Dataset;
use lance_parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder as LanceParquetRecordBatchReaderBuilder;
use log::info;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use tokio::runtime::Runtime;
use vortex::array::chunked::ChunkedArray;
use vortex::array::IntoArray;
use vortex::arrow::FromArrowType;
use vortex::serde::WriteCtx;
use vortex::{IntoArray, SerdeContext, ToArrayData};
use vortex_error::{VortexError, VortexResult};
use vortex_ipc::writer::StreamWriter;
use vortex_schema::DType;

use crate::idempotent;
@@ -37,7 +35,7 @@ pub fn download_data(fname: PathBuf, data_url: &str) -> PathBuf {

pub fn parquet_to_lance(lance_fname: &Path, parquet_file: &Path) -> VortexResult<PathBuf> {
let write_params = WriteParams::default();
let read = File::open(parquet_file).unwrap();
let read = File::open(parquet_file)?;
let reader = LanceParquetRecordBatchReaderBuilder::try_new(read)
.unwrap()
.build()
@@ -62,18 +60,19 @@ pub fn data_vortex_uncompressed(fname_out: &str, downloaded_data: PathBuf) -> Pa
// FIXME(ngates): #157 the compressor should handle batch size.
let reader = builder.with_batch_size(BATCH_SIZE).build().unwrap();

let dtype = DType::from_arrow(reader.schema());
let ctx = SerdeContext::default();
let mut write = File::create(path).unwrap();
let mut writer = StreamWriter::try_new(&mut write, ctx).unwrap();

let chunks = reader
.map(|batch_result| batch_result.unwrap())
.map(|record_batch| record_batch.into_array())
.collect_vec();
let chunked = ChunkedArray::new(chunks, dtype.clone());
let dtype = DType::from_arrow(reader.schema());
writer.write_schema(&dtype).unwrap();
for batch_result in reader {
writer
.write_batch(&batch_result.unwrap().to_array_data().into_array())
.unwrap();
}

let mut write = File::create(path).unwrap();
let mut write_ctx = WriteCtx::new(&mut write);
write_ctx.dtype(&dtype)?;
write_ctx.write(&chunked)
Ok::<(), VortexError>(())
})
.unwrap()
}
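The data_downloads.rs hunk above replaces the old WriteCtx serde path, which collected every record batch into a single ChunkedArray before serializing, with the vortex-ipc StreamWriter, which writes the schema once and then streams each batch. A minimal sketch of the new write path, condensed from the hunk; the free-standing function name and the unwrap-based error handling are illustrative rather than part of the commit:

```rust
use std::fs::File;

use arrow_array::RecordBatchReader;
use vortex::arrow::FromArrowType;
use vortex::{IntoArray, SerdeContext, ToArrayData};
use vortex_ipc::writer::StreamWriter;
use vortex_schema::DType;

// Write the schema once, then stream each record batch through the IPC
// writer instead of buffering everything into a ChunkedArray first.
fn write_reader_as_vortex<R: RecordBatchReader>(reader: R, path: &str) {
    let dtype = DType::from_arrow(reader.schema());
    let mut write = File::create(path).unwrap();
    let mut writer = StreamWriter::try_new(&mut write, SerdeContext::default()).unwrap();
    writer.write_schema(&dtype).unwrap();
    for batch in reader {
        writer
            .write_batch(&batch.unwrap().to_array_data().into_array())
            .unwrap();
    }
}
```

The apparent upside is that batches no longer have to be held in memory as one uncompressed ChunkedArray before anything is written.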
55 changes: 31 additions & 24 deletions bench-vortex/src/lib.rs
@@ -11,13 +11,12 @@ use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ProjectionMask;
use simplelog::{ColorChoice, Config, TermLogger, TerminalMode};
use vortex::array::chunked::ChunkedArray;
use vortex::array::IntoArray;
use vortex::array::{Array, ArrayRef};
use vortex::arrow::FromArrowType;
use vortex::compress::{CompressConfig, CompressCtx};
use vortex::encoding::{EncodingRef, ENCODINGS};
use vortex::encoding::{EncodingRef, VORTEX_ENCODINGS};
use vortex::{IntoArray, OwnedArray, ToArrayData};
use vortex_alp::ALPEncoding;
use vortex_datetime::DateTimeEncoding;
use vortex_datetime_parts::DateTimePartsEncoding;
use vortex_dict::DictEncoding;
use vortex_fastlanes::{BitPackedEncoding, FoREncoding};
use vortex_ree::REEEncoding;
@@ -104,17 +103,20 @@ pub fn setup_logger(level: LevelFilter) {
}

pub fn enumerate_arrays() -> Vec<EncodingRef> {
println!("FOUND {:?}", ENCODINGS.iter().map(|e| e.id()).collect_vec());
println!(
"FOUND {:?}",
VORTEX_ENCODINGS.iter().map(|e| e.id()).collect_vec()
);
vec![
&ALPEncoding,
&DictEncoding,
&BitPackedEncoding,
&FoREncoding,
&DateTimeEncoding,
&DateTimePartsEncoding,
// &DeltaEncoding, Blows up the search space too much.
&REEEncoding,
&RoaringBoolEncoding,
// RoaringIntEncoding,
// &RoaringIntEncoding,
// Doesn't offer anything more than FoR really
// ZigZagEncoding,
]
@@ -126,10 +128,10 @@ pub fn compress_ctx() -> CompressCtx {
CompressCtx::new(Arc::new(cfg))
}

pub fn compress_taxi_data() -> ArrayRef {
pub fn compress_taxi_data() -> OwnedArray {
let file = File::open(taxi_data_parquet()).unwrap();
let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
let _mask = ProjectionMask::roots(builder.parquet_schema(), [1]);
let _mask = ProjectionMask::roots(builder.parquet_schema(), [6]);
let _no_datetime_mask = ProjectionMask::roots(
builder.parquet_schema(),
[0, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
@@ -149,14 +151,16 @@ pub fn compress_taxi_data() -> ArrayRef {
let chunks = reader
.into_iter()
.map(|batch_result| batch_result.unwrap())
.map(|batch| batch.into_array())
.map(|batch| batch.to_array_data().into_array())
.map(|array| {
uncompressed_size += array.nbytes();
ctx.clone().compress(&array, None).unwrap()
})
.collect_vec();

let compressed = ChunkedArray::new(chunks.clone(), DType::from_arrow(schema)).into_array();
let compressed = ChunkedArray::try_new(chunks.clone(), DType::from_arrow(schema))
.unwrap()
.into_array();

info!(
"{}, Bytes: {}, Ratio {}",
@@ -219,10 +223,11 @@ mod test {
use arrow_array::{ArrayRef as ArrowArrayRef, StructArray as ArrowStructArray};
use log::LevelFilter;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use vortex::array::ArrayRef;
use vortex::arrow::FromArrowArray;
use vortex::compute::as_arrow::as_arrow;
use vortex::encode::FromArrowArray;
use vortex::serde::{ReadCtx, WriteCtx};
use vortex::{ArrayData, IntoArray};
use vortex_ipc::reader::StreamReader;
use vortex_ipc::writer::StreamWriter;

use crate::taxi_data::taxi_data_parquet;
use crate::{compress_ctx, compress_taxi_data, setup_logger};
@@ -244,15 +249,17 @@ mod test {
for record_batch in reader.map(|batch_result| batch_result.unwrap()) {
let struct_arrow: ArrowStructArray = record_batch.into();
let arrow_array: ArrowArrayRef = Arc::new(struct_arrow);
let vortex_array = ArrayRef::from_arrow(arrow_array.clone(), false);
let vortex_array = ArrayData::from_arrow(arrow_array.clone(), false).into_array();

let mut buf = Vec::<u8>::new();
let mut write_ctx = WriteCtx::new(&mut buf);
write_ctx.write(vortex_array.as_ref()).unwrap();
{
let mut writer = StreamWriter::try_new(&mut buf, Default::default()).unwrap();
writer.write_array(&vortex_array).unwrap();
}

let mut read = buf.as_slice();
let mut read_ctx = ReadCtx::new(vortex_array.dtype(), &mut read);
read_ctx.read().unwrap();
let mut reader = StreamReader::try_new(&mut read).unwrap();
reader.read_array().unwrap();
}
}

@@ -266,8 +273,8 @@ mod test {
for record_batch in reader.map(|batch_result| batch_result.unwrap()) {
let struct_arrow: ArrowStructArray = record_batch.into();
let arrow_array: ArrowArrayRef = Arc::new(struct_arrow);
let vortex_array = ArrayRef::from_arrow(arrow_array.clone(), false);
let vortex_as_arrow = as_arrow(vortex_array.as_ref()).unwrap();
let vortex_array = ArrayData::from_arrow(arrow_array.clone(), false).into_array();
let vortex_as_arrow = as_arrow(&vortex_array).unwrap();
assert_eq!(vortex_as_arrow.deref(), arrow_array.deref());
}
}
@@ -285,10 +292,10 @@ mod test {
for record_batch in reader.map(|batch_result| batch_result.unwrap()) {
let struct_arrow: ArrowStructArray = record_batch.into();
let arrow_array: ArrowArrayRef = Arc::new(struct_arrow);
let vortex_array = ArrayRef::from_arrow(arrow_array.clone(), false);
let vortex_array = ArrayData::from_arrow(arrow_array.clone(), false).into_array();

let compressed = ctx.clone().compress(vortex_array.as_ref(), None).unwrap();
let compressed_as_arrow = as_arrow(compressed.as_ref()).unwrap();
let compressed = ctx.clone().compress(&vortex_array, None).unwrap();
let compressed_as_arrow = as_arrow(&compressed).unwrap();
assert_eq!(compressed_as_arrow.deref(), arrow_array.deref());
}
}
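The test hunks above swap the removed ReadCtx/WriteCtx serde round-trip for vortex-ipc's StreamWriter and StreamReader, and now build Vortex arrays from Arrow via ArrayData::from_arrow(...).into_array(). The same sequence, gathered into one self-contained sketch (the function name is illustrative; the individual calls mirror the diff):

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef as ArrowArrayRef, RecordBatch, StructArray as ArrowStructArray};
use vortex::encode::FromArrowArray;
use vortex::{ArrayData, IntoArray};
use vortex_ipc::reader::StreamReader;
use vortex_ipc::writer::StreamWriter;

// Round-trip one Arrow record batch through the new IPC writer and reader.
fn roundtrip_ipc(record_batch: RecordBatch) {
    let struct_arrow: ArrowStructArray = record_batch.into();
    let arrow_array: ArrowArrayRef = Arc::new(struct_arrow);
    let vortex_array = ArrayData::from_arrow(arrow_array, false).into_array();

    let mut buf = Vec::<u8>::new();
    {
        // Scope the writer so its borrow of `buf` ends before we read it back.
        let mut writer = StreamWriter::try_new(&mut buf, Default::default()).unwrap();
        writer.write_array(&vortex_array).unwrap();
    }

    let mut read = buf.as_slice();
    let mut reader = StreamReader::try_new(&mut read).unwrap();
    reader.read_array().unwrap();
}
```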
2 changes: 1 addition & 1 deletion bench-vortex/src/public_bi_data.rs
@@ -10,7 +10,7 @@ use humansize::{format_size, DECIMAL};
use itertools::Itertools;
use log::info;
use reqwest::Url;
use vortex::array::Array;
use vortex::ArrayTrait;
use vortex_error::VortexResult;

use crate::data_downloads::{
37 changes: 20 additions & 17 deletions bench-vortex/src/reader.rs
@@ -19,26 +19,30 @@ use log::info;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use tokio::runtime::Runtime;
use vortex::array::chunked::ChunkedArray;
use vortex::array::primitive::PrimitiveArray;
use vortex::array::{Array, ArrayRef, IntoArray};
use vortex::arrow::FromArrowType;
use vortex::compute::flatten::flatten;
use vortex::compute::take::take;
use vortex::ptype::PType;
use vortex::serde::{ReadCtx, WriteCtx};
use vortex::{IntoArray, OwnedArray, SerdeContext, ToArrayData, ToStatic};
use vortex_error::VortexResult;
use vortex_ipc::iter::FallibleLendingIterator;
use vortex_ipc::reader::StreamReader;
use vortex_ipc::writer::StreamWriter;
use vortex_schema::DType;

use crate::compress_ctx;

pub const BATCH_SIZE: usize = 65_536;

pub fn open_vortex(path: &Path) -> VortexResult<ArrayRef> {
pub fn open_vortex(path: &Path) -> VortexResult<OwnedArray> {
let mut file = File::open(path)?;
let dummy_dtype: DType = PType::U8.into();
let mut read_ctx = ReadCtx::new(&dummy_dtype, &mut file);
let dtype = read_ctx.dtype()?;
read_ctx.with_schema(&dtype).read()

let mut reader = StreamReader::try_new(&mut file).unwrap();
let mut reader = reader.next()?.unwrap();
let dtype = reader.dtype().clone();
let mut chunks = vec![];
while let Some(chunk) = reader.next()? {
chunks.push(chunk.to_static())
}
Ok(ChunkedArray::try_new(chunks, dtype)?.into_array())
}

pub fn rewrite_parquet_as_vortex<W: Write>(
@@ -47,9 +51,8 @@ ) -> VortexResult<()> {
) -> VortexResult<()> {
let chunked = compress_parquet_to_vortex(parquet_path.as_path())?;

let mut write_ctx = WriteCtx::new(write);
write_ctx.dtype(chunked.dtype()).unwrap();
write_ctx.write(&chunked).unwrap();
let mut writer = StreamWriter::try_new(write, SerdeContext::default()).unwrap();
writer.write_array(&chunked.into_array()).unwrap();
Ok(())
}

@@ -66,7 +69,7 @@ pub fn compress_parquet_to_vortex(parquet_path: &Path) -> VortexResult<ChunkedAr
let chunks = reader
.map(|batch_result| batch_result.unwrap())
.map(|record_batch| {
let vortex_array = record_batch.into_array();
let vortex_array = record_batch.to_array_data().into_array();
ctx.compress(&vortex_array, None).unwrap()
})
.collect_vec();
@@ -94,11 +97,11 @@ pub fn write_csv_as_parquet(csv_path: PathBuf, output_path: &Path) -> VortexResu
Ok(())
}

pub fn take_vortex(path: &Path, indices: &[u64]) -> VortexResult<ArrayRef> {
pub fn take_vortex(path: &Path, indices: &[u64]) -> VortexResult<OwnedArray> {
let array = open_vortex(path)?;
let taken = take(&array, &PrimitiveArray::from(indices.to_vec()))?;
let taken = take(&array, &indices.to_vec().into_array())?;
// For equivalence.... we flatten to make sure we're not cheating too much.
flatten(&taken).map(|x| x.into_array())
taken.flatten().map(|x| x.into_array())
}

pub fn take_parquet(path: &Path, indices: &[u64]) -> VortexResult<RecordBatch> {
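open_vortex above now reads through the IPC stream instead of ReadCtx: the StreamReader's first next() yields a per-array reader, its dtype() supplies the schema, and iterating it yields the chunks that get reassembled into a ChunkedArray. The same logic as the hunk, collected in one place for readability; the variable names and use of ? instead of unwrap are cosmetic:

```rust
use std::fs::File;
use std::path::Path;

use vortex::array::chunked::ChunkedArray;
use vortex::{IntoArray, OwnedArray, ToStatic};
use vortex_error::VortexResult;
use vortex_ipc::iter::FallibleLendingIterator;
use vortex_ipc::reader::StreamReader;

// Read every chunk of the first serialized array back into a ChunkedArray.
fn read_vortex_file(path: &Path) -> VortexResult<OwnedArray> {
    let mut file = File::open(path)?;
    let mut stream = StreamReader::try_new(&mut file)?;
    let mut array_reader = stream.next()?.expect("stream contains no array");
    let dtype = array_reader.dtype().clone();
    let mut chunks = vec![];
    while let Some(chunk) = array_reader.next()? {
        chunks.push(chunk.to_static());
    }
    Ok(ChunkedArray::try_new(chunks, dtype)?.into_array())
}
```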
