
Commit

Add Context and remove linkme (#295)
Fixes #292
gatesn authored May 3, 2024
1 parent 318115c commit 1a84bac
Showing 43 changed files with 367 additions and 443 deletions.
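
In short: encodings were previously collected at link time through linkme distributed slices (which also forced codegen-units = 1 in dev builds), and IPC carried a separate SerdeContext. Both are replaced by one explicit Context that callers build once and pass by reference. A minimal sketch of the new calling convention, assembled from the hunks below — the signatures are inferred from this diff rather than from documented API, and write_compressed is an illustrative helper, not part of the change:

    use lazy_static::lazy_static;
    use vortex::compress::Compressor;
    use vortex::encoding::EncodingRef;
    use vortex::{Array, Context};
    use vortex_alp::ALPEncoding;
    use vortex_dict::DictEncoding;
    use vortex_ipc::writer::StreamWriter;

    lazy_static! {
        // One shared registry, built explicitly at startup instead of
        // assembled at link time by linkme.
        static ref CTX: Context =
            Context::default().with_encodings([&ALPEncoding as EncodingRef, &DictEncoding]);
    }

    fn write_compressed<W: std::io::Write>(write: &mut W, array: &Array) {
        // The compressor and the IPC writer now borrow the same Context.
        let compressed = Compressor::new(&CTX).compress(array, None).unwrap();
        let mut writer = StreamWriter::try_new(write, &CTX).unwrap();
        writer.write_array(&compressed).unwrap();
    }
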
28 changes: 0 additions & 28 deletions Cargo.lock

(Generated file; diff not rendered.)

5 changes: 0 additions & 5 deletions Cargo.toml
@@ -66,7 +66,6 @@ humansize = "2.1.3"
 itertools = "0.12.1"
 lazy_static = "1.4.0"
 leb128 = "0.2.5"
-linkme = "0.3.25"
 log = "0.4.21"
 num-traits = "0.2.18"
 num_enum = "0.7.2"
@@ -91,7 +90,3 @@ warnings = "deny"
 
 [workspace.lints.clippy]
 all = "deny"
-
-[profile.dev]
-# Need this for linkme crate to work. See https://github.com/dtolnay/linkme/issues/61
-codegen-units = 1
1 change: 0 additions & 1 deletion bench-vortex/Cargo.toml
@@ -48,7 +48,6 @@ vortex-roaring = { path = "../vortex-roaring" }
 [dev-dependencies]
 criterion = { workspace = true }
 
-
 [[bench]]
 name = "compress_benchmark"
 harness = false
7 changes: 3 additions & 4 deletions bench-vortex/src/data_downloads.rs
@@ -11,13 +11,13 @@ use log::info;
 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
 use tokio::runtime::Runtime;
 use vortex::arrow::FromArrowType;
-use vortex::{IntoArray, SerdeContext, ToArrayData};
+use vortex::{IntoArray, ToArrayData};
 use vortex_dtype::DType;
 use vortex_error::{VortexError, VortexResult};
 use vortex_ipc::writer::StreamWriter;
 
-use crate::idempotent;
 use crate::reader::BATCH_SIZE;
+use crate::{idempotent, CTX};
 
 pub fn download_data(fname: PathBuf, data_url: &str) -> PathBuf {
     idempotent(&fname, |path| {
@@ -59,9 +59,8 @@ pub fn data_vortex_uncompressed(fname_out: &str, downloaded_data: PathBuf) -> Pa
     // FIXME(ngates): #157 the compressor should handle batch size.
     let reader = builder.with_batch_size(BATCH_SIZE).build().unwrap();
 
-    let ctx = SerdeContext::default();
     let mut write = File::create(path).unwrap();
-    let mut writer = StreamWriter::try_new(&mut write, ctx).unwrap();
+    let mut writer = StreamWriter::try_new(&mut write, &CTX).unwrap();
 
     let dtype = DType::from_arrow(reader.schema());
     writer.write_schema(&dtype).unwrap();
63 changes: 26 additions & 37 deletions bench-vortex/src/lib.rs
@@ -3,20 +3,20 @@
 use std::env::temp_dir;
 use std::fs::{create_dir_all, File};
 use std::path::{Path, PathBuf};
-use std::sync::Arc;
 
 use arrow_array::RecordBatchReader;
 use humansize::DECIMAL;
 use itertools::Itertools;
+use lazy_static::lazy_static;
 use log::{info, LevelFilter};
 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
 use parquet::arrow::ProjectionMask;
 use simplelog::{ColorChoice, Config, TermLogger, TerminalMode};
 use vortex::array::chunked::ChunkedArray;
 use vortex::arrow::FromArrowType;
-use vortex::compress::{CompressConfig, CompressCtx};
-use vortex::encoding::{EncodingRef, VORTEX_ENCODINGS};
-use vortex::{IntoArray, OwnedArray, ToArrayData};
+use vortex::compress::Compressor;
+use vortex::encoding::EncodingRef;
+use vortex::{Context, IntoArray, OwnedArray, ToArrayData};
 use vortex_alp::ALPEncoding;
 use vortex_datetime_parts::DateTimePartsEncoding;
 use vortex_dict::DictEncoding;
@@ -36,6 +36,22 @@ pub mod reader;
 pub mod taxi_data;
 pub mod vortex_utils;
 
+lazy_static! {
+    pub static ref CTX: Context = Context::default().with_encodings([
+        &ALPEncoding as EncodingRef,
+        &DictEncoding,
+        &BitPackedEncoding,
+        &FoREncoding,
+        &DateTimePartsEncoding,
+        // &DeltaEncoding, Blows up the search space too much.
+        &REEEncoding,
+        &RoaringBoolEncoding,
+        // &RoaringIntEncoding,
+        // Doesn't offer anything more than FoR really
+        // &ZigZagEncoding,
+    ]);
+}
+
 /// Creates a file if it doesn't already exist.
 /// NB: Does NOT modify the given path to ensure that it resides in the data directory.
 pub fn idempotent<T, E, P: IdempotentPath + ?Sized>(
@@ -104,32 +120,6 @@ pub fn setup_logger(level: LevelFilter) {
         .unwrap();
 }
 
-pub fn enumerate_arrays() -> Vec<EncodingRef> {
-    println!(
-        "FOUND {:?}",
-        VORTEX_ENCODINGS.iter().map(|e| e.id()).collect_vec()
-    );
-    vec![
-        &ALPEncoding,
-        &DictEncoding,
-        &BitPackedEncoding,
-        &FoREncoding,
-        &DateTimePartsEncoding,
-        // &DeltaEncoding, Blows up the search space too much.
-        &REEEncoding,
-        &RoaringBoolEncoding,
-        // &RoaringIntEncoding,
-        // Doesn't offer anything more than FoR really
-        // &ZigZagEncoding,
-    ]
-}
-
-pub fn compress_ctx() -> CompressCtx {
-    let cfg = CompressConfig::new().with_enabled(enumerate_arrays());
-    info!("Compression config {cfg:?}");
-    CompressCtx::new(Arc::new(cfg))
-}
-
 pub fn compress_taxi_data() -> OwnedArray {
     let file = File::open(taxi_data_parquet()).unwrap();
     let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
@@ -147,7 +137,6 @@ pub fn compress_taxi_data() -> OwnedArray {
         .build()
         .unwrap();
 
-    let ctx = compress_ctx();
     let schema = reader.schema();
     let mut uncompressed_size: usize = 0;
     let chunks = reader
@@ -156,7 +145,7 @@
         .map(|batch| batch.to_array_data().into_array())
         .map(|array| {
             uncompressed_size += array.nbytes();
-            ctx.clone().compress(&array, None).unwrap()
+            Compressor::new(&CTX).compress(&array, None).unwrap()
         })
         .collect_vec();
 
@@ -226,13 +215,14 @@ mod test {
     use log::LevelFilter;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
     use vortex::arrow::FromArrowArray;
+    use vortex::compress::Compressor;
     use vortex::compute::as_arrow::as_arrow;
     use vortex::{ArrayData, IntoArray};
     use vortex_ipc::reader::StreamReader;
     use vortex_ipc::writer::StreamWriter;
 
     use crate::taxi_data::taxi_data_parquet;
-    use crate::{compress_ctx, compress_taxi_data, setup_logger};
+    use crate::{compress_taxi_data, setup_logger, CTX};
 
     #[ignore]
     #[test]
@@ -255,12 +245,12 @@
 
         let mut buf = Vec::<u8>::new();
         {
-            let mut writer = StreamWriter::try_new(&mut buf, Default::default()).unwrap();
+            let mut writer = StreamWriter::try_new(&mut buf, &CTX).unwrap();
             writer.write_array(&vortex_array).unwrap();
         }
 
         let mut read = buf.as_slice();
-        let mut reader = StreamReader::try_new(&mut read).unwrap();
+        let mut reader = StreamReader::try_new(&mut read, &CTX).unwrap();
         reader.read_array().unwrap();
     }
 }
@@ -290,13 +280,12 @@ mod test {
         let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
         let reader = builder.with_limit(1).build().unwrap();
 
-        let ctx = compress_ctx();
         for record_batch in reader.map(|batch_result| batch_result.unwrap()) {
             let struct_arrow: ArrowStructArray = record_batch.into();
             let arrow_array: ArrowArrayRef = Arc::new(struct_arrow);
             let vortex_array = ArrayData::from_arrow(arrow_array.clone(), false).into_array();
 
-            let compressed = ctx.clone().compress(&vortex_array, None).unwrap();
+            let compressed = Compressor::new(&CTX).compress(&vortex_array, None).unwrap();
             let compressed_as_arrow = as_arrow(&compressed).unwrap();
             assert_eq!(compressed_as_arrow.deref(), arrow_array.deref());
         }
14 changes: 7 additions & 7 deletions bench-vortex/src/reader.rs
@@ -19,22 +19,23 @@ use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
 use tokio::runtime::Runtime;
 use vortex::array::chunked::ChunkedArray;
 use vortex::arrow::FromArrowType;
+use vortex::compress::Compressor;
 use vortex::compute::take::take;
-use vortex::{IntoArray, OwnedArray, SerdeContext, ToArrayData, ToStatic};
+use vortex::{IntoArray, OwnedArray, ToArrayData, ToStatic};
 use vortex_dtype::DType;
 use vortex_error::VortexResult;
 use vortex_ipc::iter::FallibleLendingIterator;
 use vortex_ipc::reader::StreamReader;
 use vortex_ipc::writer::StreamWriter;
 
-use crate::compress_ctx;
+use crate::CTX;
 
 pub const BATCH_SIZE: usize = 65_536;
 
 pub fn open_vortex(path: &Path) -> VortexResult<OwnedArray> {
     let mut file = File::open(path)?;
 
-    let mut reader = StreamReader::try_new(&mut file)?;
+    let mut reader = StreamReader::try_new(&mut file, &CTX)?;
     let mut reader = reader.next()?.unwrap();
     let dtype = reader.dtype().clone();
     let mut chunks = vec![];
@@ -50,26 +51,25 @@ pub fn rewrite_parquet_as_vortex<W: Write>(
 ) -> VortexResult<()> {
     let chunked = compress_parquet_to_vortex(parquet_path.as_path())?;
 
-    let mut writer = StreamWriter::try_new(write, SerdeContext::default()).unwrap();
+    let mut writer = StreamWriter::try_new(write, &CTX).unwrap();
     writer.write_array(&chunked.into_array()).unwrap();
     Ok(())
 }
 
-pub fn compress_parquet_to_vortex(parquet_path: &Path) -> VortexResult<ChunkedArray> {
+pub fn compress_parquet_to_vortex(parquet_path: &Path) -> VortexResult<ChunkedArray<'static>> {
     let taxi_pq = File::open(parquet_path)?;
     let builder = ParquetRecordBatchReaderBuilder::try_new(taxi_pq)?;
 
     // FIXME(ngates): #157 the compressor should handle batch size.
     let reader = builder.with_batch_size(BATCH_SIZE).build()?;
 
     let dtype = DType::from_arrow(reader.schema());
-    let ctx = compress_ctx();
 
     let chunks = reader
         .map(|batch_result| batch_result.unwrap())
         .map(|record_batch| {
             let vortex_array = record_batch.to_array_data().into_array();
-            ctx.compress(&vortex_array, None).unwrap()
+            Compressor::new(&CTX).compress(&vortex_array, None).unwrap()
         })
         .collect_vec();
     ChunkedArray::try_new(chunks, dtype.clone())
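
One consequence visible in this file: StreamReader::try_new now takes the context too, so whoever reads a Vortex stream must register the same encodings that were used to write it — there is no global, link-time registry to fall back on. A hedged round-trip sketch using the two functions above (the argument shapes for rewrite_parquet_as_vortex are assumed from the hunk, and the paths are illustrative):

    use std::fs::File;
    use std::path::Path;

    use vortex_error::VortexResult;

    fn round_trip(parquet: &Path, vortex_out: &Path) -> VortexResult<()> {
        // Write: compress the Parquet data and stream it out under CTX.
        let mut out = File::create(vortex_out)?;
        rewrite_parquet_as_vortex(parquet.to_path_buf(), &mut out)?;

        // Read: open_vortex constructs StreamReader::try_new(&mut file, &CTX),
        // so the encodings registered at write time resolve again here.
        let array = open_vortex(vortex_out)?;
        println!("decoded {} bytes", array.nbytes());
        Ok(())
    }
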
10 changes: 0 additions & 10 deletions pyvortex/src/lib.rs
@@ -1,8 +1,6 @@
 use dtype::PyDType;
-use log::debug;
 use pyo3::exceptions::PyValueError;
 use pyo3::prelude::*;
-use vortex::encoding::VORTEX_ENCODINGS;
 use vortex_dtype::{DType, PType};
 
 use crate::array::*;
@@ -18,14 +16,6 @@ mod vortex_arrow;
 fn _lib(_py: Python, m: &PyModule) -> PyResult<()> {
     pyo3_log::init();
 
-    debug!(
-        "Discovered encodings: {:?}",
-        VORTEX_ENCODINGS
-            .iter()
-            .map(|e| e.id().to_string())
-            .collect::<Vec<String>>()
-    );
-
     m.add_function(wrap_pyfunction!(encode::encode, m)?)?;
     // m.add_function(wrap_pyfunction!(compress::compress, m)?)?;
 
1 change: 0 additions & 1 deletion vortex-alp/Cargo.toml
@@ -16,7 +16,6 @@ workspace = true
 
 [dependencies]
 itertools = { workspace = true }
-linkme = { workspace = true }
 num-traits = { workspace = true }
 serde = { workspace = true, features = ["derive"] }
 vortex-array = { path = "../vortex-array" }
4 changes: 2 additions & 2 deletions vortex-alp/src/compress.rs
@@ -1,7 +1,7 @@
 use itertools::Itertools;
 use vortex::array::primitive::PrimitiveArray;
 use vortex::array::sparse::{Sparse, SparseArray};
-use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression};
+use vortex::compress::{CompressConfig, Compressor, EncodingCompression};
 use vortex::validity::Validity;
 use vortex::{Array, ArrayDType, ArrayDef, AsArray, IntoArray, OwnedArray};
 use vortex_dtype::{NativePType, PType};
@@ -48,7 +48,7 @@ impl EncodingCompression for ALPEncoding {
         &self,
         array: &Array,
         like: Option<&Array>,
-        ctx: CompressCtx,
+        ctx: Compressor,
     ) -> VortexResult<Array<'static>> {
         let like_alp = like.map(|like_array| like_array.as_array_ref());
         let like_exponents = like
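
For downstream encoding crates the mechanical change is the same as in ALP above: drop the linkme dependency, and accept a Compressor in the EncodingCompression hook instead of a CompressCtx. A skeletal sketch for a hypothetical MyEncoding — the method name and trait shape are inferred from this hunk, and the real trait likely has more members than shown:

    use vortex::compress::{Compressor, EncodingCompression};
    use vortex::Array;
    use vortex_error::VortexResult;

    struct MyEncoding; // hypothetical; stands in for ALPEncoding etc.

    impl EncodingCompression for MyEncoding {
        fn compress(
            &self,
            array: &Array,
            like: Option<&Array>,
            ctx: Compressor, // was: ctx: CompressCtx
        ) -> VortexResult<Array<'static>> {
            // Recurse into child arrays through ctx, as the ALP
            // implementation does with its encoded parts; elided here.
            todo!()
        }
    }
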
(Diffs for the remaining changed files are not rendered here.)
