Skip to content

Commit

Permalink
Rust ALP (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn authored Mar 6, 2024
1 parent 013c43b commit 5a0c91f
Show file tree
Hide file tree
Showing 11 changed files with 627 additions and 361 deletions.
242 changes: 150 additions & 92 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 10 additions & 4 deletions vortex-alp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,19 @@ include = { workspace = true }
edition = { workspace = true }
rust-version = { workspace = true }

[lints]
workspace = true

[dependencies]
arrow = { version = "50.0.0" }
vortex-array = { path = "../vortex-array" }
linkme = "0.3.22"
itertools = "0.12.1"
codecz = { path = "../codecz" }
num-traits = "0.2.18"
log = { version = "0.4.20", features = [] }

[lints]
workspace = true
[dev-dependencies]
divan = "0.1.14"

[[bench]]
name = "alp_compress"
harness = false
20 changes: 20 additions & 0 deletions vortex-alp/benches/alp_compress.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use vortex::array::primitive::PrimitiveArray;
use vortex::array::ArrayRef;
use vortex_alp::{ALPArray, ALPFloat, Exponents};

fn main() {
divan::main();
}

#[divan::bench(types = [f32, f64], args = [100_000, 10_000_000])]
fn alp_compress<T: ALPFloat>(n: usize) -> (Exponents, Vec<T::ALPInt>, Vec<u64>, Vec<T>) {
let values: Vec<T> = vec![T::from(1.234).unwrap(); n];
T::encode(values.as_slice(), None)
}

// TODO(ngates): remove this
#[divan::bench(args = [100_000, 10_000_000])]
fn alp_compress_array(n: usize) -> ArrayRef {
let array = PrimitiveArray::from_vec(vec![1.234f64; n]);
ALPArray::encode(&array).unwrap()
}
332 changes: 180 additions & 152 deletions vortex-alp/src/alp.rs
Original file line number Diff line number Diff line change
@@ -1,171 +1,199 @@
use std::any::Any;
use std::sync::{Arc, RwLock};

pub use codecz::alp::ALPExponents;
use vortex::array::{Array, ArrayKind, ArrayRef, ArrowIterator, Encoding, EncodingId, EncodingRef};
use vortex::compress::EncodingCompression;
use vortex::dtype::{DType, IntWidth, Signedness};
use vortex::error::{VortexError, VortexResult};
use vortex::formatter::{ArrayDisplay, ArrayFormatter};
use vortex::serde::{ArraySerde, EncodingSerde};
use vortex::stats::{Stats, StatsSet};

use crate::compress::alp_encode;

#[derive(Debug, Clone)]
pub struct ALPArray {
encoded: ArrayRef,
exponents: ALPExponents,
patches: Option<ArrayRef>,
dtype: DType,
stats: Arc<RwLock<StatsSet>>,
}

impl ALPArray {
pub fn new(encoded: ArrayRef, exponents: ALPExponents, patches: Option<ArrayRef>) -> Self {
Self::try_new(encoded, exponents, patches).unwrap()
}

pub fn try_new(
encoded: ArrayRef,
exponents: ALPExponents,
patches: Option<ArrayRef>,
) -> VortexResult<Self> {
let dtype = match encoded.dtype() {
d @ DType::Int(width, Signedness::Signed, nullability) => match width {
IntWidth::_32 => DType::Float(32.into(), *nullability),
IntWidth::_64 => DType::Float(64.into(), *nullability),
_ => return Err(VortexError::InvalidDType(d.clone())),
},
d => return Err(VortexError::InvalidDType(d.clone())),
};
Ok(Self {
encoded,
exponents,
patches,
dtype,
stats: Arc::new(RwLock::new(StatsSet::new())),
})
}

pub fn encode(array: &dyn Array) -> VortexResult<ArrayRef> {
match ArrayKind::from(array) {
ArrayKind::Primitive(p) => Ok(alp_encode(p).boxed()),
_ => Err(VortexError::InvalidEncoding(array.encoding().id().clone())),
}
}

pub fn encoded(&self) -> &dyn Array {
self.encoded.as_ref()
}
use itertools::Itertools;
use num_traits::{Float, NumCast, PrimInt, Zero};
use std::mem::size_of;

pub fn exponents(&self) -> ALPExponents {
self.exponents
}
const SAMPLE_SIZE: usize = 32;

pub fn patches(&self) -> Option<&dyn Array> {
self.patches.as_deref()
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Exponents {
pub e: u8,
pub f: u8,
}

impl Array for ALPArray {
#[inline]
fn as_any(&self) -> &dyn Any {
self
}

#[inline]
fn boxed(self) -> ArrayRef {
Box::new(self)
}

#[inline]
fn into_any(self: Box<Self>) -> Box<dyn Any> {
self
}

#[inline]
fn len(&self) -> usize {
self.encoded.len()
}

#[inline]
fn is_empty(&self) -> bool {
self.encoded.is_empty()
}
pub trait ALPFloat: Float + 'static {
type ALPInt: PrimInt;

const FRACTIONAL_BITS: u8;
const MAX_EXPONENT: u8;
const SWEET: Self;
const F10: &'static [Self];
const IF10: &'static [Self];

/// Round to the nearest floating integer by shifting in and out of the low precision range.
fn fast_round(self) -> Self {
(self + Self::SWEET) - Self::SWEET
}

fn as_int(self) -> Option<Self::ALPInt> {
<Self::ALPInt as NumCast>::from(self)
}

fn find_best_exponents(values: &[Self]) -> Exponents {
let mut best_exp = Exponents { e: 0, f: 0 };
let mut best_nbytes: usize = usize::MAX;

let sample = (values.len() > SAMPLE_SIZE).then(|| {
values
.iter()
.step_by(values.len() / SAMPLE_SIZE)
.cloned()
.collect_vec()
});

// TODO(wmanning): idea, start with highest e, then find the best f
// after that, try e's in descending order, with a gap no larger than the original e - f
for e in 0..Self::MAX_EXPONENT {
for f in 0..e {
let (_, encoded, exc_pos, exc_patches) = Self::encode(
sample.as_deref().unwrap_or(values),
Some(&Exponents { e, f }),
);
let size =
(encoded.len() + exc_patches.len()) * size_of::<Self>() + (exc_pos.len() * 4);
if size < best_nbytes {
best_nbytes = size;
best_exp = Exponents { e, f };
} else if size == best_nbytes && e - f < best_exp.e - best_exp.f {
best_exp = Exponents { e, f };
}
}
}

#[inline]
fn dtype(&self) -> &DType {
&self.dtype
best_exp
}

#[inline]
fn stats(&self) -> Stats {
Stats::new(&self.stats, self)
}
fn encode(
values: &[Self],
exponents: Option<&Exponents>,
) -> (Exponents, Vec<Self::ALPInt>, Vec<u64>, Vec<Self>) {
let exp = exponents.map_or_else(|| Self::find_best_exponents(values), Exponents::clone);

fn iter_arrow(&self) -> Box<ArrowIterator> {
todo!()
}
let mut exc_pos = Vec::new();
let mut exc_value = Vec::new();
let mut prev = Self::ALPInt::zero();
let encoded = values
.iter()
.enumerate()
.map(|(i, v)| {
let encoded =
(*v * Self::F10[exp.e as usize] * Self::IF10[exp.f as usize]).fast_round();
let decoded = encoded * Self::F10[exp.f as usize] * Self::IF10[exp.e as usize];

fn slice(&self, start: usize, stop: usize) -> VortexResult<ArrayRef> {
Ok(Self::try_new(
self.encoded().slice(start, stop)?,
self.exponents(),
self.patches().map(|p| p.slice(start, stop)).transpose()?,
)?
.boxed())
}
if decoded == *v {
if let Some(e) = encoded.as_int() {
prev = e;
return e;
}
}

#[inline]
fn encoding(&self) -> EncodingRef {
&ALPEncoding
}
exc_pos.push(i as u64);
exc_value.push(*v);
// Emit the last known good value. This helps with run-end encoding.
prev
})
.collect_vec();

#[inline]
fn nbytes(&self) -> usize {
self.encoded().nbytes() + self.patches().map(|p| p.nbytes()).unwrap_or(0)
(exp, encoded, exc_pos, exc_value)
}

fn serde(&self) -> &dyn ArraySerde {
self
fn decode_single(encoded: Self::ALPInt, exponents: &Exponents) -> Self {
let encoded_float: Self = Self::from(encoded).unwrap();
encoded_float * Self::F10[exponents.f as usize] * Self::IF10[exponents.e as usize]
}
}

impl<'arr> AsRef<(dyn Array + 'arr)> for ALPArray {
fn as_ref(&self) -> &(dyn Array + 'arr) {
self
}
impl ALPFloat for f32 {
type ALPInt = i32;
const FRACTIONAL_BITS: u8 = 23;
const MAX_EXPONENT: u8 = 10;
const SWEET: Self =
(1 << Self::FRACTIONAL_BITS) as Self + (1 << (Self::FRACTIONAL_BITS - 1)) as Self;

const F10: &'static [Self] = &[
1.0,
10.0,
100.0,
1000.0,
10000.0,
100000.0,
1000000.0,
10000000.0,
100000000.0,
1000000000.0,
10000000000.0,
];
const IF10: &'static [Self] = &[
1.0,
0.1,
0.01,
0.001,
0.0001,
0.00001,
0.000001,
0.0000001,
0.00000001,
0.000000001,
0.0000000001,
];
}

impl ArrayDisplay for ALPArray {
fn fmt(&self, f: &mut ArrayFormatter) -> std::fmt::Result {
f.writeln(format!("exponents: {}", self.exponents()))?;
if let Some(p) = self.patches() {
f.writeln("patches:")?;
f.indent(|indent| indent.array(p.as_ref()))?;
}
f.indent(|indent| indent.array(self.encoded()))
}
}

#[derive(Debug)]
pub struct ALPEncoding;

impl ALPEncoding {
pub const ID: EncodingId = EncodingId::new("vortex.alp");
}

impl Encoding for ALPEncoding {
fn id(&self) -> &EncodingId {
&Self::ID
}

fn compression(&self) -> Option<&dyn EncodingCompression> {
Some(self)
}

fn serde(&self) -> Option<&dyn EncodingSerde> {
Some(self)
}
impl ALPFloat for f64 {
type ALPInt = i64;
const FRACTIONAL_BITS: u8 = 52;
const MAX_EXPONENT: u8 = 18; // 10^18 is the maximum i64
const SWEET: Self =
(1u64 << Self::FRACTIONAL_BITS) as Self + (1u64 << (Self::FRACTIONAL_BITS - 1)) as Self;
const F10: &'static [Self] = &[
1.0,
10.0,
100.0,
1000.0,
10000.0,
100000.0,
1000000.0,
10000000.0,
100000000.0,
1000000000.0,
10000000000.0,
100000000000.0,
1000000000000.0,
10000000000000.0,
100000000000000.0,
1000000000000000.0,
10000000000000000.0,
100000000000000000.0,
1000000000000000000.0,
10000000000000000000.0,
100000000000000000000.0,
1000000000000000000000.0,
10000000000000000000000.0,
100000000000000000000000.0,
];

const IF10: &'static [Self] = &[
1.0,
0.1,
0.01,
0.001,
0.0001,
0.00001,
0.000001,
0.0000001,
0.00000001,
0.000000001,
0.0000000001,
0.00000000001,
0.000000000001,
0.0000000000001,
0.00000000000001,
0.000000000000001,
0.0000000000000001,
0.00000000000000001,
0.000000000000000001,
0.0000000000000000001,
0.00000000000000000001,
0.000000000000000000001,
0.0000000000000000000001,
0.00000000000000000000001,
];
}
Loading

0 comments on commit 5a0c91f

Please sign in to comment.