Skip to content

Commit

Permalink
More Compression (#87)
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn authored Mar 8, 2024
1 parent 2976d3d commit 1da315e
Show file tree
Hide file tree
Showing 25 changed files with 337 additions and 197 deletions.
14 changes: 7 additions & 7 deletions .github/actions/setup-rust/action.yml
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
name: 'Setup Rust'
description: 'Toolchain setup and Initial compilation'
inputs:
rust-toolchain: # id of input
description: 'Rust toolchain version to use'
required: true
default: stable
runs:
using: "composite"
steps:
- name: Rust Version
id: rust-version
shell: bash
run: echo "version=$(cat rust-toolchain.toml | grep channel | awk -F'\"' '{print $2}')" >> $GITHUB_OUTPUT

- name: Rust Toolchain Cache
id: rustup-cache
uses: actions/cache@v4
with:
path: ~/.rustup
key: "rustup-${{ runner.os }}-${{ inputs.rust-toolchain }}"
key: "rustup-${{ runner.os }}-${{ steps.rust-version.outputs.version }}"

- name: Rust Toolchain
uses: dtolnay/rust-toolchain@stable
if: steps.rustup-cache.outputs.cache-hit != 'true'
with:
toolchain: "${{ inputs.rust-toolchain }}"
toolchain: "${{ steps.rust-version.outputs.version }}"
components: clippy, rustfmt
- name: Rust Dependency Cache
uses: Swatinem/rust-cache@v2
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 5 additions & 23 deletions bench-vortex/benches/compress_benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,13 @@
use std::fs::{create_dir_all, File};
use std::path::Path;

use bench_vortex::compress_taxi_data;
use criterion::{black_box, criterion_group, criterion_main, Criterion};

/// Ensure the NYC yellow-taxi benchmark dataset exists locally and return its path.
///
/// The file is fetched once from the public CloudFront mirror and cached at
/// `data/yellow-tripdata-2023-11.parquet`; subsequent calls return immediately
/// when the file already exists.
///
/// # Panics
/// Panics on any filesystem or network failure (`create_dir_all`, `File::create`,
/// the HTTP GET, or the body copy) — acceptable for a benchmark harness, where
/// failing fast is preferable to benchmarking partial data.
fn download_taxi_data() -> &'static Path {
    let download_path = Path::new("data/yellow-tripdata-2023-11.parquet");
    // Cache hit: the dataset was downloaded on a previous run.
    if download_path.exists() {
        return download_path;
    }

    create_dir_all(download_path.parent().unwrap()).unwrap();
    let mut download_file = File::create(download_path).unwrap();
    // NOTE(review): blocking download of a multi-hundred-MB parquet file;
    // assumes this runs before timing starts so it does not skew benchmarks.
    reqwest::blocking::get(
        "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-11.parquet",
    )
    .unwrap()
    .copy_to(&mut download_file)
    .unwrap();

    download_path
}
use bench_vortex::{compress_taxi_data, download_taxi_data};

fn enc_compress(c: &mut Criterion) {
download_taxi_data();

c.bench_function("compress", |b| b.iter(|| black_box(compress_taxi_data())));
let mut group = c.benchmark_group("end to end");
group.sample_size(10);
group.bench_function("compress", |b| b.iter(|| black_box(compress_taxi_data())));
group.finish()
}

criterion_group!(benches, enc_compress);
Expand Down
41 changes: 26 additions & 15 deletions bench-vortex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use parquet::arrow::ProjectionMask;
use std::collections::HashSet;
use std::fs::{create_dir_all, File};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use vortex::array::bool::BoolEncoding;
use vortex::array::chunked::{ChunkedArray, ChunkedEncoding};
use vortex::array::constant::ConstantEncoding;
Expand Down Expand Up @@ -44,13 +45,14 @@ pub fn enumerate_arrays() -> Vec<&'static dyn Encoding> {
&ALPEncoding,
&DictEncoding,
&BitPackedEncoding,
// &DeltaEncoding,
&FoREncoding,
// &DeltaEncoding,
// &FFoREncoding,
&REEEncoding,
&RoaringBoolEncoding,
// &RoaringIntEncoding,
// Doesn't offer anything more than FoR really
//&ZigZagEncoding,
// &ZigZagEncoding,
]
}

Expand All @@ -76,11 +78,17 @@ pub fn download_taxi_data() -> PathBuf {
pub fn compress_taxi_data() -> ArrayRef {
let file = File::open(download_taxi_data()).unwrap();
let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
let _mask = ProjectionMask::roots(builder.parquet_schema(), [10]);
let _mask = ProjectionMask::roots(builder.parquet_schema(), [6]);
let _no_datetime_mask = ProjectionMask::roots(
builder.parquet_schema(),
[0, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
);
let reader = builder
// .with_projection(mask)
//.with_projection(mask)
//.with_projection(no_datetime_mask)
.with_batch_size(65_536)
//.with_limit(100_000)
// .with_batch_size(5_000_000)
// .with_limit(100_000)
.build()
.unwrap();

Expand All @@ -89,30 +97,27 @@ pub fn compress_taxi_data() -> ArrayRef {
HashSet::from_iter(enumerate_arrays().iter().map(|e| (*e).id())),
HashSet::default(),
);
println!("Compression config {cfg:?}");
let ctx = CompressCtx::new(&cfg);
info!("Compression config {cfg:?}");
let ctx = CompressCtx::new(Arc::new(cfg));

let schema = reader.schema();
let mut uncompressed_size = 0;
let chunks = reader
.into_iter()
//.skip(39)
//.take(1)
.map(|batch_result| batch_result.unwrap())
.map(ArrayRef::from)
.map(|array| {
uncompressed_size += array.nbytes();
ctx.compress(array.as_ref(), None).unwrap()
ctx.clone().compress(array.as_ref(), None).unwrap()
})
.collect_vec();

let dtype: DType = schema.clone().try_into().unwrap();
let compressed = ChunkedArray::new(chunks.clone(), dtype).boxed();

info!("Compressed array {}", display_tree(compressed.as_ref()));
info!(
"NBytes {}, Ratio {}",
compressed.nbytes(),
compressed.nbytes() as f32 / uncompressed_size as f32
);

let mut field_bytes = vec![0; schema.fields().len()];
for chunk in chunks {
Expand All @@ -122,8 +127,13 @@ pub fn compress_taxi_data() -> ArrayRef {
}
}
field_bytes.iter().enumerate().for_each(|(i, &nbytes)| {
info!("{},{}", schema.field(i).name(), nbytes);
println!("{},{}", schema.field(i).name(), nbytes);
});
println!(
"NBytes {}, Ratio {}",
compressed.nbytes(),
compressed.nbytes() as f32 / uncompressed_size as f32
);

compressed
}
Expand All @@ -146,9 +156,10 @@ mod test {
.unwrap();
}

#[ignore]
#[test]
fn compression_ratio() {
setup_logger(LevelFilter::Info);
setup_logger(LevelFilter::Warn);
_ = compress_taxi_data();
}
}
3 changes: 2 additions & 1 deletion pyvortex/src/compress.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use pyo3::types::PyType;
use pyo3::{pyclass, pyfunction, pymethods, Py, PyResult, Python};
use std::sync::Arc;

use vortex::compress::{CompressConfig, CompressCtx};

Expand Down Expand Up @@ -33,7 +34,7 @@ pub fn compress(
opts: Option<PyCompressConfig>,
) -> PyResult<Py<PyArray>> {
let compress_opts = opts.map(|o| o.inner).unwrap_or_default();
let ctx = CompressCtx::new(&compress_opts);
let ctx = CompressCtx::new(Arc::new(compress_opts));
let compressed = py
.allow_threads(|| ctx.compress(arr.unwrap(), None))
.map_err(PyVortexError::map_err)?;
Expand Down
1 change: 0 additions & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,3 @@
channel = "nightly"
components = ["rust-src", "rustfmt", "clippy"]
profile = "minimal"

10 changes: 6 additions & 4 deletions vortex-alp/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,23 @@ impl EncodingCompression for ALPEncoding {

let (exponents, encoded, patches) = match parray.ptype() {
PType::F32 => {
encode_to_array(parray.typed_data::<f32>(), like_alp.map(|a| a.exponents()))
encode_to_array(parray.typed_data::<f32>(), like_alp.map(|l| l.exponents()))
}
PType::F64 => {
encode_to_array(parray.typed_data::<f64>(), like_alp.map(|a| a.exponents()))
encode_to_array(parray.typed_data::<f64>(), like_alp.map(|l| l.exponents()))
}
_ => panic!("Unsupported ptype"),
};

let compressed_encoded = ctx
.next_level()
.named("packed")
.excluding(&ALPEncoding::ID)
.compress(encoded.as_ref(), like_alp.map(|a| a.encoded()))?;

let compressed_patches = patches
.map(|p| {
ctx.next_level()
ctx.auxiliary("patches")
.excluding(&ALPEncoding::ID)
.compress(p.as_ref(), like_alp.and_then(|a| a.patches()))
})
.transpose()?;
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/array/chunked/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl<'arr> AsRef<(dyn Array + 'arr)> for ChunkedArray {
impl ArrayDisplay for ChunkedArray {
fn fmt(&self, f: &mut ArrayFormatter) -> std::fmt::Result {
for (i, c) in self.chunks().iter().enumerate() {
f.child(&format!("[{}]", i), c.as_ref())?
f.new_total_size(c.nbytes(), |f| f.child(&format!("[{}]", i), c.as_ref()))?;
}
Ok(())
}
Expand Down
10 changes: 8 additions & 2 deletions vortex-array/src/array/sparse/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ use crate::compress::{CompressConfig, CompressCtx, EncodingCompression};
use crate::error::VortexResult;

impl EncodingCompression for SparseEncoding {
fn cost(&self) -> u8 {
0
}

fn can_compress(
&self,
array: &dyn Array,
Expand All @@ -22,8 +26,10 @@ impl EncodingCompression for SparseEncoding {
let sparse_array = array.as_sparse();
let sparse_like = like.map(|la| la.as_sparse());
Ok(SparseArray::new(
ctx.compress(sparse_array.indices(), sparse_like.map(|sa| sa.indices()))?,
ctx.compress(sparse_array.values(), sparse_like.map(|sa| sa.values()))?,
ctx.auxiliary("indices")
.compress(sparse_array.indices(), sparse_like.map(|sa| sa.indices()))?,
ctx.named("values")
.compress(sparse_array.values(), sparse_like.map(|sa| sa.values()))?,
sparse_array.len(),
)
.boxed())
Expand Down
3 changes: 2 additions & 1 deletion vortex-array/src/array/struct_/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ impl EncodingCompression for StructEncoding {
let like_chunk = struct_like
.and_then(|c_like| c_like.fields().get(i))
.map(Deref::deref);
ctx.compress(chunk.deref(), like_chunk)
ctx.auxiliary(&format!("[{}]", i))
.compress(chunk.deref(), like_chunk)
})
.try_collect()?;

Expand Down
4 changes: 4 additions & 0 deletions vortex-array/src/array/typed/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ use crate::compress::{CompressConfig, CompressCtx, EncodingCompression};
use crate::error::VortexResult;

impl EncodingCompression for TypedEncoding {
fn cost(&self) -> u8 {
0
}

fn can_compress(
&self,
array: &dyn Array,
Expand Down
42 changes: 42 additions & 0 deletions vortex-array/src/array/varbin/compress.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use crate::array::downcast::DowncastArrayBuiltin;
use crate::array::varbin::{VarBinArray, VarBinEncoding};
use crate::array::{Array, ArrayRef};
use crate::compress::{CompressConfig, CompressCtx, EncodingCompression};
use crate::error::VortexResult;

impl EncodingCompression for VarBinEncoding {
    /// VarBin compression only destructures the array into its children, so it
    /// contributes no cost of its own.
    fn cost(&self) -> u8 {
        0
    }

    /// Accept only arrays that are already VarBin-encoded.
    fn can_compress(
        &self,
        array: &dyn Array,
        _config: &CompressConfig,
    ) -> Option<&dyn EncodingCompression> {
        if array.encoding().id() == &Self::ID {
            Some(self)
        } else {
            None
        }
    }

    /// Compress a VarBin array by recursing into its offsets and validity
    /// children; the raw bytes buffer is carried over unchanged.
    fn compress(
        &self,
        array: &dyn Array,
        like: Option<&dyn Array>,
        ctx: CompressCtx,
    ) -> VortexResult<ArrayRef> {
        let varbin = array.as_varbin();
        let like_varbin = like.map(|l| l.as_varbin());

        // Offsets are the main compressible child; reuse the "like" offsets as
        // a compression hint when available.
        let offsets = ctx
            .auxiliary("offsets")
            .compress(varbin.offsets(), like_varbin.map(|l| l.offsets()))?;

        // Validity is optional; compress it only when present, propagating any
        // child-compression error.
        let validity = match varbin.validity() {
            Some(v) => Some(
                ctx.auxiliary("validity")
                    .compress(v, like_varbin.and_then(|l| l.validity()))?,
            ),
            None => None,
        };

        Ok(VarBinArray::new(
            offsets,
            dyn_clone::clone_box(varbin.bytes()),
            varbin.dtype().clone(),
            validity,
        )
        .boxed())
    }
}
6 changes: 6 additions & 0 deletions vortex-array/src/array/varbin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::array::{
EncodingId, EncodingRef, ENCODINGS,
};
use crate::arrow::CombineChunks;
use crate::compress::EncodingCompression;
use crate::compute::scalar_at::scalar_at;
use crate::dtype::{DType, IntWidth, Nullability, Signedness};
use crate::error::{VortexError, VortexResult};
Expand All @@ -25,6 +26,7 @@ use crate::ptype::NativePType;
use crate::serde::{ArraySerde, EncodingSerde};
use crate::stats::{Stats, StatsSet};

mod compress;
mod compute;
mod serde;
mod stats;
Expand Down Expand Up @@ -323,6 +325,10 @@ impl Encoding for VarBinEncoding {
&Self::ID
}

fn compression(&self) -> Option<&dyn EncodingCompression> {
Some(self)
}

fn serde(&self) -> Option<&dyn EncodingSerde> {
Some(self)
}
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/array/varbin/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ where
for i in 0..self.len() {
let next_val = self.bytes_at(i).unwrap();
if next_val < min {
min = next_val.clone();
min.clone_from(&next_val);
}
if next_val > max {
max = next_val.clone();
max.clone_from(&next_val);
}
match next_val.cmp(&last_value) {
Ordering::Less => is_sorted = false,
Expand Down
Loading

0 comments on commit 1da315e

Please sign in to comment.