Compression 2
gatesn committed Mar 8, 2024
2 parents 9085962 + d4a67a1 commit 7905769
Showing 31 changed files with 586 additions and 412 deletions.
14 changes: 7 additions & 7 deletions .github/actions/setup-rust/action.yml
@@ -1,25 +1,25 @@
 name: 'Setup Rust'
 description: 'Toolchain setup and Initial compilation'
-inputs:
-  rust-toolchain: # id of input
-    description: 'Rust toolchain version to use'
-    required: true
-    default: stable
 runs:
   using: "composite"
   steps:
+    - name: Rust Version
+      id: rust-version
+      shell: bash
+      run: echo "version=$(cat rust-toolchain.toml | grep channel | awk -F'\"' '{print $2}')" >> $GITHUB_OUTPUT
+
     - name: Rust Toolchain Cache
       id: rustup-cache
       uses: actions/cache@v4
       with:
         path: ~/.rustup
-        key: "rustup-${{ runner.os }}-${{ inputs.rust-toolchain }}"
+        key: "rustup-${{ runner.os }}-${{ steps.rust-version.outputs.version }}"

     - name: Rust Toolchain
       uses: dtolnay/rust-toolchain@stable
       if: steps.rustup-cache.outputs.cache-hit != 'true'
       with:
-        toolchain: "${{ inputs.rust-toolchain }}"
+        toolchain: "${{ steps.rust-version.outputs.version }}"
         components: clippy, rustfmt
     - name: Rust Dependency Cache
       uses: Swatinem/rust-cache@v2
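What changed here: the action no longer takes a rust-toolchain input; it reads the pinned channel out of rust-toolchain.toml, so the rustup cache key tracks the repository's toolchain automatically. The same extraction the grep/awk pipeline performs, sketched as a Rust helper (hypothetical, not part of this commit):

use std::fs;

// Read the `channel` value from rust-toolchain.toml, mirroring
// `grep channel | awk -F'"' '{print $2}'` in the action above.
fn toolchain_channel() -> Option<String> {
    let contents = fs::read_to_string("rust-toolchain.toml").ok()?;
    contents
        .lines()
        .find(|line| line.trim_start().starts_with("channel"))
        .and_then(|line| line.split('"').nth(1)) // value between the first pair of quotes
        .map(str::to_owned)
}

fn main() {
    // Prints `nightly` for the rust-toolchain.toml later in this diff.
    println!("{}", toolchain_channel().unwrap_or_default());
}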
35 changes: 19 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion bench-vortex/Cargo.toml
@@ -31,7 +31,6 @@ log = "0.4.20"
 [dev-dependencies]
 criterion = { version = "0.5.1", features = ["html_reports"] }
 simplelog = { version = "0.12.1", features = ["paris"] }
-rand = "0.8.5"

 [[bench]]
 name = "compress_benchmark"
28 changes: 5 additions & 23 deletions bench-vortex/benches/compress_benchmark.rs
@@ -1,31 +1,13 @@
-use std::fs::{create_dir_all, File};
-use std::path::Path;

-use bench_vortex::compress_taxi_data;
 use criterion::{black_box, criterion_group, criterion_main, Criterion};

-fn download_taxi_data() -> &'static Path {
-    let download_path = Path::new("data/yellow-tripdata-2023-11.parquet");
-    if download_path.exists() {
-        return download_path;
-    }
-
-    create_dir_all(download_path.parent().unwrap()).unwrap();
-    let mut download_file = File::create(download_path).unwrap();
-    reqwest::blocking::get(
-        "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-11.parquet",
-    )
-    .unwrap()
-    .copy_to(&mut download_file)
-    .unwrap();
-
-    download_path
-}
+use bench_vortex::{compress_taxi_data, download_taxi_data};

 fn enc_compress(c: &mut Criterion) {
     download_taxi_data();

-    c.bench_function("compress", |b| b.iter(|| black_box(compress_taxi_data())));
+    let mut group = c.benchmark_group("end to end");
+    group.sample_size(10);
+    group.bench_function("compress", |b| b.iter(|| black_box(compress_taxi_data())));
+    group.finish()
 }

 criterion_group!(benches, enc_compress);
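The benchmark now runs inside a Criterion benchmark group so the sample count can be lowered to ten (Criterion's minimum); compressing the whole taxi file per iteration is far too slow for the default of 100 samples. A self-contained sketch of the same pattern, with a hypothetical expensive_op standing in for compress_taxi_data:

use criterion::{black_box, criterion_group, criterion_main, Criterion};

// Hypothetical stand-in for an expensive end-to-end operation.
fn expensive_op() -> u64 {
    (0..10_000_000u64).sum()
}

fn bench(c: &mut Criterion) {
    // A group allows per-group settings; sample_size(10) is the minimum
    // Criterion accepts and keeps slow end-to-end benchmarks tractable.
    let mut group = c.benchmark_group("end to end");
    group.sample_size(10);
    group.bench_function("expensive_op", |b| b.iter(|| black_box(expensive_op())));
    group.finish();
}

criterion_group!(benches, bench);
criterion_main!(benches);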
43 changes: 27 additions & 16 deletions bench-vortex/src/lib.rs
@@ -6,6 +6,7 @@ use parquet::arrow::ProjectionMask;
 use std::collections::HashSet;
 use std::fs::{create_dir_all, File};
 use std::path::{Path, PathBuf};
+use std::sync::Arc;
 use vortex::array::bool::BoolEncoding;
 use vortex::array::chunked::{ChunkedArray, ChunkedEncoding};
 use vortex::array::constant::ConstantEncoding;
@@ -23,7 +24,7 @@ use vortex::dtype::DType;
 use vortex::formatter::display_tree;
 use vortex_alp::ALPEncoding;
 use vortex_dict::DictEncoding;
-use vortex_fastlanes::{BitPackedEncoding, DeltaEncoding, FoREncoding};
+use vortex_fastlanes::{BitPackedEncoding, FoREncoding};
 use vortex_ree::REEEncoding;
 use vortex_roaring::RoaringBoolEncoding;
@@ -44,13 +45,14 @@ pub fn enumerate_arrays() -> Vec<&'static dyn Encoding> {
         &ALPEncoding,
         &DictEncoding,
         &BitPackedEncoding,
-        &DeltaEncoding,
         &FoREncoding,
+        // &DeltaEncoding,
+        // &FFoREncoding,
         &REEEncoding,
         &RoaringBoolEncoding,
         // &RoaringIntEncoding,
         // Doesn't offer anything more than FoR really
-        //&ZigZagEncoding,
+        // &ZigZagEncoding,
     ]
 }

@@ -76,11 +78,17 @@ pub fn download_taxi_data() -> PathBuf {
 pub fn compress_taxi_data() -> ArrayRef {
     let file = File::open(download_taxi_data()).unwrap();
     let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
-    let _mask = ProjectionMask::roots(builder.parquet_schema(), [10]);
+    let _mask = ProjectionMask::roots(builder.parquet_schema(), [6]);
+    let _no_datetime_mask = ProjectionMask::roots(
+        builder.parquet_schema(),
+        [0, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
+    );
     let reader = builder
-        // .with_projection(mask)
-        // .with_batch_size(5_000_000)
-        // .with_limit(100_000)
+        //.with_projection(mask)
+        //.with_projection(no_datetime_mask)
+        .with_batch_size(65_536)
+        //.with_limit(100_000)
         .build()
         .unwrap();
@@ -89,30 +97,27 @@ pub fn compress_taxi_data() -> ArrayRef {
         HashSet::from_iter(enumerate_arrays().iter().map(|e| (*e).id())),
         HashSet::default(),
     );
-    println!("Compression config {cfg:?}");
-    let ctx = CompressCtx::new(&cfg);
+    info!("Compression config {cfg:?}");
+    let ctx = CompressCtx::new(Arc::new(cfg));

     let schema = reader.schema();
     let mut uncompressed_size = 0;
     let chunks = reader
         .into_iter()
+        //.skip(39)
+        //.take(1)
         .map(|batch_result| batch_result.unwrap())
         .map(ArrayRef::from)
         .map(|array| {
             uncompressed_size += array.nbytes();
-            ctx.compress(array.as_ref(), None).unwrap()
+            ctx.clone().compress(array.as_ref(), None).unwrap()
         })
         .collect_vec();

     let dtype: DType = schema.clone().try_into().unwrap();
     let compressed = ChunkedArray::new(chunks.clone(), dtype).boxed();

     info!("Compressed array {}", display_tree(compressed.as_ref()));
-    info!(
-        "NBytes {}, Ratio {}",
-        compressed.nbytes(),
-        compressed.nbytes() as f32 / uncompressed_size as f32
-    );

     let mut field_bytes = vec![0; schema.fields().len()];
     for chunk in chunks {
@@ -122,8 +127,13 @@ pub fn compress_taxi_data() -> ArrayRef {
         }
     }
     field_bytes.iter().enumerate().for_each(|(i, &nbytes)| {
-        info!("{},{}", schema.field(i).name(), nbytes);
+        println!("{},{}", schema.field(i).name(), nbytes);
     });
+    println!(
+        "NBytes {}, Ratio {}",
+        compressed.nbytes(),
+        compressed.nbytes() as f32 / uncompressed_size as f32
+    );

     compressed
 }
@@ -146,9 +156,10 @@ mod test {
             .unwrap();
     }

+    #[ignore]
     #[test]
     fn compression_ratio() {
-        setup_logger(LevelFilter::Info);
+        setup_logger(LevelFilter::Warn);
         _ = compress_taxi_data();
     }
 }
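Behind the lib.rs changes above: CompressCtx::new now takes Arc<CompressConfig> rather than &CompressConfig, so the context owns a shared config and can be cloned cheaply per record batch (hence ctx.clone().compress(...)). A minimal sketch of that ownership pattern, using hypothetical stand-in types rather than the real vortex API:

use std::sync::Arc;

#[derive(Debug, Default)]
struct Config {
    max_depth: u8,
}

#[derive(Clone)]
struct Ctx {
    cfg: Arc<Config>,
}

impl Ctx {
    fn new(cfg: Arc<Config>) -> Self {
        Self { cfg }
    }

    fn compress(&self, data: &[u8]) -> Vec<u8> {
        // Cloning Ctx only bumps the Arc refcount; the config is shared.
        let _ = self.cfg.max_depth;
        data.to_vec()
    }
}

fn main() {
    let ctx = Ctx::new(Arc::new(Config::default()));
    for chunk in [&b"a"[..], &b"bb"[..]] {
        // Mirrors the per-chunk `ctx.clone().compress(...)` in the diff above.
        let _ = ctx.clone().compress(chunk);
    }
}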
3 changes: 2 additions & 1 deletion pyvortex/src/compress.rs
@@ -1,5 +1,6 @@
 use pyo3::types::PyType;
 use pyo3::{pyclass, pyfunction, pymethods, Py, PyResult, Python};
+use std::sync::Arc;

 use vortex::compress::{CompressConfig, CompressCtx};

@@ -33,7 +34,7 @@ pub fn compress(
     opts: Option<PyCompressConfig>,
 ) -> PyResult<Py<PyArray>> {
     let compress_opts = opts.map(|o| o.inner).unwrap_or_default();
-    let ctx = CompressCtx::new(&compress_opts);
+    let ctx = CompressCtx::new(Arc::new(compress_opts));
     let compressed = py
         .allow_threads(|| ctx.compress(arr.unwrap(), None))
         .map_err(PyVortexError::map_err)?;
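One detail worth flagging in this binding: compression runs inside py.allow_threads, which releases the GIL for the duration of the Rust call so other Python threads can make progress. A minimal sketch of that pattern with a hypothetical function (not part of this commit):

use pyo3::prelude::*;

/// Sum a list without holding the GIL while the hot loop runs.
#[pyfunction]
fn sum_released(py: Python<'_>, data: Vec<u64>) -> PyResult<u64> {
    // allow_threads releases the GIL; the closure must not touch Python objects.
    Ok(py.allow_threads(move || data.iter().sum()))
}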
1 change: 0 additions & 1 deletion rust-toolchain.toml
@@ -2,4 +2,3 @@
 channel = "nightly"
 components = ["rust-src", "rustfmt", "clippy"]
 profile = "minimal"
-
10 changes: 6 additions & 4 deletions vortex-alp/src/compress.rs
@@ -41,21 +41,23 @@ impl EncodingCompression for ALPEncoding {

         let (exponents, encoded, patches) = match parray.ptype() {
             PType::F32 => {
-                encode_to_array(parray.typed_data::<f32>(), like_alp.map(|a| a.exponents()))
+                encode_to_array(parray.typed_data::<f32>(), like_alp.map(|l| l.exponents()))
             }
             PType::F64 => {
-                encode_to_array(parray.typed_data::<f64>(), like_alp.map(|a| a.exponents()))
+                encode_to_array(parray.typed_data::<f64>(), like_alp.map(|l| l.exponents()))
             }
             _ => panic!("Unsupported ptype"),
         };

         let compressed_encoded = ctx
-            .next_level()
+            .named("packed")
+            .excluding(&ALPEncoding::ID)
             .compress(encoded.as_ref(), like_alp.map(|a| a.encoded()))?;

         let compressed_patches = patches
             .map(|p| {
-                ctx.next_level()
+                ctx.auxiliary("patches")
+                    .excluding(&ALPEncoding::ID)
                     .compress(p.as_ref(), like_alp.and_then(|a| a.patches()))
             })
             .transpose()?;
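The ALP children are now compressed through labelled sub-contexts (named("packed"), auxiliary("patches")) that exclude ALPEncoding, so the compressor cannot pick ALP again for ALP's own outputs. A toy sketch of the exclusion idea, with hypothetical types rather than the real CompressCtx:

use std::collections::HashSet;

#[derive(Clone, Default)]
struct Ctx {
    path: Vec<String>,
    excluded: HashSet<&'static str>,
}

impl Ctx {
    // Label the sub-context (useful for debug output of the compression tree).
    fn named(mut self, name: &str) -> Self {
        self.path.push(name.to_string());
        self
    }

    // Forbid an encoding for everything compressed under this sub-context.
    fn excluding(mut self, id: &'static str) -> Self {
        self.excluded.insert(id);
        self
    }

    fn allows(&self, id: &str) -> bool {
        !self.excluded.contains(id)
    }
}

fn main() {
    let ctx = Ctx::default().named("packed").excluding("vortex.alp");
    assert!(!ctx.allows("vortex.alp")); // ALP may not re-encode its own output
}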
2 changes: 1 addition & 1 deletion vortex-array/src/array/chunked/mod.rs
@@ -192,7 +192,7 @@ impl<'arr> AsRef<(dyn Array + 'arr)> for ChunkedArray {
 impl ArrayDisplay for ChunkedArray {
     fn fmt(&self, f: &mut ArrayFormatter) -> std::fmt::Result {
         for (i, c) in self.chunks().iter().enumerate() {
-            f.child(&format!("[{}]", i), c.as_ref())?
+            f.new_total_size(c.nbytes(), |f| f.child(&format!("[{}]", i), c.as_ref()))?;
         }
         Ok(())
     }
4 changes: 2 additions & 2 deletions vortex-array/src/array/primitive/compute/cast.rs
@@ -26,8 +26,8 @@ fn cast<T: NativePType>(array: &PrimitiveArray) -> VortexResult<Vec<T>> {
         .typed_data::<$E>()
         .iter()
         // TODO(ngates): allow configurable checked/unchecked casting
-        .map(|v| {
-            T::from(*v).ok_or_else(|| {
+        .map(|&v| {
+            T::from(v).ok_or_else(|| {
                 VortexError::ComputeError(format!("Failed to cast {} to {:?}", v, T::PTYPE).into())
             })
         })
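The cast change is purely stylistic: binding the closure argument by pattern (|&v|) copies the element up front and avoids the explicit *v dereference. A minimal illustration:

fn main() {
    let xs = [1u8, 2, 3];
    // Dereference inside the body...
    let a: Vec<u16> = xs.iter().map(|v| u16::from(*v)).collect();
    // ...or destructure the reference in the pattern; identical result.
    let b: Vec<u16> = xs.iter().map(|&v| u16::from(v)).collect();
    assert_eq!(a, b);
}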
9 changes: 1 addition & 8 deletions vortex-array/src/array/primitive/compute/scalar_at.rs
@@ -8,14 +8,7 @@ use crate::scalar::{NullableScalar, Scalar, ScalarRef};
 impl ScalarAtFn for PrimitiveArray {
     fn scalar_at(&self, index: usize) -> VortexResult<ScalarRef> {
         if self.is_valid(index) {
-            Ok(
-                match_each_native_ptype!(self.ptype, |$T| self.buffer.typed_data::<$T>()
-                    .get(index)
-                    .unwrap()
-                    .clone()
-                    .into()
-                ),
-            )
+            Ok(match_each_native_ptype!(self.ptype, |$T| self.typed_data::<$T>()[index].into()))
         } else {
             Ok(NullableScalar::none(self.dtype().clone()).boxed())
         }