Skip to content

Commit

Permalink
Compressor recursion (#73)
Browse files: browse the repository at this point in the history
  • Loading branch information
gatesn authored Mar 6, 2024
1 parent 5be41eb commit cf03b37
Show file tree
Hide file tree
Showing 33 changed files with 271 additions and 358 deletions.
1 change: 1 addition & 0 deletions bench-vortex/benches/compress_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ fn download_taxi_data() -> &'static Path {
/// Compresses `array` with a default-constructed `CompressCtx`, passing `None`
/// as the second argument, and returns `nbytes()` of the compressed result.
///
/// # Panics
///
/// Panics if `compress` returns an error, because the result is unwrapped.
fn compress(array: ArrayRef) -> usize {
    let ctx = CompressCtx::default();
    let compressed = ctx.compress(array.as_ref(), None).unwrap();
    compressed.nbytes()
}

Expand Down
5 changes: 4 additions & 1 deletion bench-vortex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ mod test {
.unwrap();
}

#[ignore]
#[test]
fn compression_ratio() {
setup_logger();
Expand Down Expand Up @@ -118,7 +119,9 @@ mod test {
HashSet::default(),
);
println!("Compression config {cfg:?}");
let compressed = CompressCtx::new(&cfg).compress(array.as_ref(), None);
let compressed = CompressCtx::new(&cfg)
.compress(array.as_ref(), None)
.unwrap();
println!("Compressed array {compressed}");
println!(
"NBytes {}, Ratio {}",
Expand Down
5 changes: 4 additions & 1 deletion pyvortex/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use pyo3::{pyclass, pyfunction, pymethods, Py, PyResult, Python};
use vortex::compress::{CompressConfig, CompressCtx};

use crate::array::PyArray;
use crate::error::PyVortexError;

#[derive(Clone)]
#[pyclass(name = "CompressConfig", module = "vortex")]
Expand Down Expand Up @@ -33,6 +34,8 @@ pub fn compress(
) -> PyResult<Py<PyArray>> {
let compress_opts = opts.map(|o| o.inner).unwrap_or_default();
let ctx = CompressCtx::new(&compress_opts);
let compressed = py.allow_threads(|| ctx.compress(arr.unwrap(), None));
let compressed = py
.allow_threads(|| ctx.compress(arr.unwrap(), None))
.map_err(PyVortexError::map_err)?;
PyArray::wrap(py, compressed)
}
20 changes: 13 additions & 7 deletions vortex-alp/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ impl EncodingCompression for ALPEncoding {
}
}

fn alp_compressor(array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx) -> ArrayRef {
fn alp_compressor(
array: &dyn Array,
like: Option<&dyn Array>,
ctx: CompressCtx,
) -> VortexResult<ArrayRef> {
let like_alp = like.map(|like_array| like_array.as_alp());

// TODO(ngates): fill forward nulls
Expand All @@ -49,14 +53,16 @@ fn alp_compressor(array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx)

let compressed_encoded = ctx
.next_level()
.compress(encoded.as_ref(), like_alp.map(|a| a.encoded()));
.compress(encoded.as_ref(), like_alp.map(|a| a.encoded()))?;

let compressed_patches = patches.map(|p| {
ctx.next_level()
.compress(p.as_ref(), like_alp.and_then(|a| a.patches()))
});
let compressed_patches = patches
.map(|p| {
ctx.next_level()
.compress(p.as_ref(), like_alp.and_then(|a| a.patches()))
})
.transpose()?;

ALPArray::new(compressed_encoded, exponents, compressed_patches).boxed()
Ok(ALPArray::new(compressed_encoded, exponents, compressed_patches).boxed())
}

fn encode_to_array<T>(
Expand Down
23 changes: 0 additions & 23 deletions vortex-array/src/array/bool/compress.rs

This file was deleted.

6 changes: 0 additions & 6 deletions vortex-array/src/array/bool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use arrow::buffer::{BooleanBuffer, NullBuffer};
use linkme::distributed_slice;

use crate::arrow::CombineChunks;
use crate::compress::EncodingCompression;
use crate::compute::scalar_at::scalar_at;
use crate::dtype::{DType, Nullability};
use crate::error::VortexResult;
Expand All @@ -20,7 +19,6 @@ use super::{
EncodingId, EncodingRef, ENCODINGS,
};

mod compress;
mod compute;
mod serde;
mod stats;
Expand Down Expand Up @@ -166,10 +164,6 @@ impl Encoding for BoolEncoding {
&Self::ID
}

fn compression(&self) -> Option<&dyn EncodingCompression> {
Some(self)
}

fn serde(&self) -> Option<&dyn EncodingSerde> {
Some(self)
}
Expand Down
44 changes: 21 additions & 23 deletions vortex-array/src/array/chunked/compress.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,41 @@
use itertools::Itertools;
use std::ops::Deref;

use crate::array::chunked::{ChunkedArray, ChunkedEncoding};
use crate::array::downcast::DowncastArrayBuiltin;
use crate::array::{Array, ArrayRef};
use crate::compress::{CompressConfig, CompressCtx, Compressor, EncodingCompression};
use itertools::Itertools;
use crate::error::VortexResult;

impl EncodingCompression for ChunkedEncoding {
fn compressor(
&self,
array: &dyn Array,
_config: &CompressConfig,
) -> Option<&'static Compressor> {
if array.encoding().id() == &Self::ID {
Some(&(chunked_compressor as Compressor))
} else {
None
}
(array.encoding().id() == &Self::ID).then_some(&(chunked_compressor as Compressor))
}
}

fn chunked_compressor(array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx) -> ArrayRef {
fn chunked_compressor(
array: &dyn Array,
like: Option<&dyn Array>,
ctx: CompressCtx,
) -> VortexResult<ArrayRef> {
let chunked_array = array.as_chunked();
let chunked_like = like.map(|like_array| like_array.as_chunked());

let compressed_chunks = chunked_like
.map(|c_like| {
chunked_array
.chunks()
.iter()
.zip_eq(c_like.chunks())
.map(|(chunk, chunk_like)| ctx.compress(chunk.as_ref(), Some(chunk_like.as_ref())))
.collect()
let compressed_chunks = chunked_array
.chunks()
.iter()
.enumerate()
.map(|(i, chunk)| {
let like_chunk = chunked_like
.and_then(|c_like| c_like.chunks().get(i))
.map(Deref::deref);
ctx.compress(chunk.deref(), like_chunk)
})
.unwrap_or_else(|| {
chunked_array
.chunks()
.iter()
.map(|chunk| ctx.compress(chunk.as_ref(), None))
.collect()
});
.try_collect()?;

ChunkedArray::new(compressed_chunks, array.dtype().clone()).boxed()
Ok(ChunkedArray::new(compressed_chunks, array.dtype().clone()).boxed())
}
27 changes: 0 additions & 27 deletions vortex-array/src/array/constant/compress.rs

This file was deleted.

6 changes: 0 additions & 6 deletions vortex-array/src/array/constant/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@ use crate::array::{
ENCODINGS,
};
use crate::arrow::compute::repeat;
use crate::compress::EncodingCompression;
use crate::dtype::DType;
use crate::error::VortexResult;
use crate::formatter::{ArrayDisplay, ArrayFormatter};
use crate::scalar::{Scalar, ScalarRef};
use crate::serde::{ArraySerde, EncodingSerde};
use crate::stats::{Stats, StatsSet};

mod compress;
mod compute;
mod serde;
mod stats;
Expand Down Expand Up @@ -132,10 +130,6 @@ impl Encoding for ConstantEncoding {
&Self::ID
}

fn compression(&self) -> Option<&dyn EncodingCompression> {
Some(self)
}

fn serde(&self) -> Option<&dyn EncodingSerde> {
Some(self)
}
Expand Down
8 changes: 6 additions & 2 deletions vortex-array/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,14 @@ pub trait Encoding: Debug + Send + Sync + 'static {
fn id(&self) -> &EncodingId;

/// Implementation of the array compression trait
fn compression(&self) -> Option<&dyn EncodingCompression>;
fn compression(&self) -> Option<&dyn EncodingCompression> {
None
}

/// Array serialization
fn serde(&self) -> Option<&dyn EncodingSerde>;
fn serde(&self) -> Option<&dyn EncodingSerde> {
None
}
}

pub type EncodingRef = &'static dyn Encoding;
Expand Down
44 changes: 0 additions & 44 deletions vortex-array/src/array/primitive/compress.rs

This file was deleted.

6 changes: 0 additions & 6 deletions vortex-array/src/array/primitive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use crate::array::{
EncodingId, EncodingRef, ENCODINGS,
};
use crate::arrow::CombineChunks;
use crate::compress::EncodingCompression;
use crate::compute::scalar_at::scalar_at;
use crate::dtype::DType;
use crate::error::VortexResult;
Expand All @@ -27,7 +26,6 @@ use crate::ptype::{match_each_native_ptype, NativePType, PType};
use crate::serde::{ArraySerde, EncodingSerde};
use crate::stats::{Stats, StatsSet};

mod compress;
mod compute;
mod serde;
mod stats;
Expand Down Expand Up @@ -247,10 +245,6 @@ impl Encoding for PrimitiveEncoding {
&Self::ID
}

fn compression(&self) -> Option<&dyn EncodingCompression> {
Some(self)
}

fn serde(&self) -> Option<&dyn EncodingSerde> {
Some(self)
}
Expand Down
21 changes: 11 additions & 10 deletions vortex-array/src/array/sparse/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,29 @@ use crate::array::downcast::DowncastArrayBuiltin;
use crate::array::sparse::{SparseArray, SparseEncoding};
use crate::array::{Array, ArrayRef};
use crate::compress::{CompressConfig, CompressCtx, Compressor, EncodingCompression};
use crate::error::VortexResult;

impl EncodingCompression for SparseEncoding {
fn compressor(
&self,
array: &dyn Array,
_config: &CompressConfig,
) -> Option<&'static Compressor> {
if array.encoding().id() == &Self::ID {
Some(&(sparse_compressor as Compressor))
} else {
None
}
(array.encoding().id() == &Self::ID).then_some(&(sparse_compressor as Compressor))
}
}

fn sparse_compressor(array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx) -> ArrayRef {
fn sparse_compressor(
array: &dyn Array,
like: Option<&dyn Array>,
ctx: CompressCtx,
) -> VortexResult<ArrayRef> {
let sparse_array = array.as_sparse();
let sparse_like = like.map(|la| la.as_sparse());
SparseArray::new(
ctx.compress(sparse_array.indices(), sparse_like.map(|sa| sa.indices())),
ctx.compress(sparse_array.values(), sparse_like.map(|sa| sa.values())),
Ok(SparseArray::new(
ctx.compress(sparse_array.indices(), sparse_like.map(|sa| sa.indices()))?,
ctx.compress(sparse_array.values(), sparse_like.map(|sa| sa.values()))?,
sparse_array.len(),
)
.boxed()
.boxed())
}
Loading

0 comments on commit cf03b37

Please sign in to comment.