Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rust ALP #72

Merged
merged 12 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 150 additions & 92 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 10 additions & 4 deletions vortex-alp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,19 @@ include = { workspace = true }
edition = { workspace = true }
rust-version = { workspace = true }

[lints]
workspace = true

[dependencies]
arrow = { version = "50.0.0" }
vortex-array = { path = "../vortex-array" }
linkme = "0.3.22"
itertools = "0.12.1"
codecz = { path = "../codecz" }
num-traits = "0.2.18"
log = { version = "0.4.20", features = [] }

[lints]
workspace = true
[dev-dependencies]
divan = "0.1.14"

[[bench]]
name = "alp_compress"
harness = false
20 changes: 20 additions & 0 deletions vortex-alp/benches/alp_compress.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use vortex::array::primitive::PrimitiveArray;
use vortex::array::ArrayRef;
use vortex_alp::{ALPArray, ALPFloat, Exponents};

fn main() {
divan::main();
}

#[divan::bench(types = [f32, f64], args = [100_000, 10_000_000])]
robert3005 marked this conversation as resolved.
Show resolved Hide resolved
fn alp_compress<T: ALPFloat>(n: usize) -> (Exponents, Vec<T::ALPInt>, Vec<u64>, Vec<T>) {
let values: Vec<T> = vec![T::from(1.234).unwrap(); n];
T::encode(values.as_slice(), None)
}

// TODO(ngates): remove this
#[divan::bench(args = [100_000, 10_000_000])]
fn alp_compress_array(n: usize) -> ArrayRef {
let array = PrimitiveArray::from_vec(vec![1.234f64; n]);
ALPArray::encode(&array).unwrap()
}
332 changes: 180 additions & 152 deletions vortex-alp/src/alp.rs
Original file line number Diff line number Diff line change
@@ -1,171 +1,199 @@
use std::any::Any;
use std::sync::{Arc, RwLock};

pub use codecz::alp::ALPExponents;
use vortex::array::{Array, ArrayKind, ArrayRef, ArrowIterator, Encoding, EncodingId, EncodingRef};
use vortex::compress::EncodingCompression;
use vortex::dtype::{DType, IntWidth, Signedness};
use vortex::error::{VortexError, VortexResult};
use vortex::formatter::{ArrayDisplay, ArrayFormatter};
use vortex::serde::{ArraySerde, EncodingSerde};
use vortex::stats::{Stats, StatsSet};

use crate::compress::alp_encode;

#[derive(Debug, Clone)]
pub struct ALPArray {
encoded: ArrayRef,
exponents: ALPExponents,
patches: Option<ArrayRef>,
dtype: DType,
stats: Arc<RwLock<StatsSet>>,
}

impl ALPArray {
pub fn new(encoded: ArrayRef, exponents: ALPExponents, patches: Option<ArrayRef>) -> Self {
Self::try_new(encoded, exponents, patches).unwrap()
}

pub fn try_new(
encoded: ArrayRef,
exponents: ALPExponents,
patches: Option<ArrayRef>,
) -> VortexResult<Self> {
let dtype = match encoded.dtype() {
d @ DType::Int(width, Signedness::Signed, nullability) => match width {
IntWidth::_32 => DType::Float(32.into(), *nullability),
IntWidth::_64 => DType::Float(64.into(), *nullability),
_ => return Err(VortexError::InvalidDType(d.clone())),
},
d => return Err(VortexError::InvalidDType(d.clone())),
};
Ok(Self {
encoded,
exponents,
patches,
dtype,
stats: Arc::new(RwLock::new(StatsSet::new())),
})
}

pub fn encode(array: &dyn Array) -> VortexResult<ArrayRef> {
match ArrayKind::from(array) {
ArrayKind::Primitive(p) => Ok(alp_encode(p).boxed()),
_ => Err(VortexError::InvalidEncoding(array.encoding().id().clone())),
}
}

pub fn encoded(&self) -> &dyn Array {
self.encoded.as_ref()
}
use itertools::Itertools;
use num_traits::{Float, NumCast, PrimInt, Zero};
use std::mem::size_of;

pub fn exponents(&self) -> ALPExponents {
self.exponents
}
const SAMPLE_SIZE: usize = 32;

pub fn patches(&self) -> Option<&dyn Array> {
self.patches.as_deref()
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Exponents {
pub e: u8,
pub f: u8,
}

impl Array for ALPArray {
#[inline]
fn as_any(&self) -> &dyn Any {
self
}

#[inline]
fn boxed(self) -> ArrayRef {
Box::new(self)
}

#[inline]
fn into_any(self: Box<Self>) -> Box<dyn Any> {
self
}

#[inline]
fn len(&self) -> usize {
self.encoded.len()
}

#[inline]
fn is_empty(&self) -> bool {
self.encoded.is_empty()
}
pub trait ALPFloat: Float + 'static {
type ALPInt: PrimInt;

const FRACTIONAL_BITS: u8;
const MAX_EXPONENT: u8;
const SWEET: Self;
const F10: &'static [Self];
const IF10: &'static [Self];

/// Round to the nearest floating integer by shifting in and out of the low precision range.
fn fast_round(self) -> Self {
(self + Self::SWEET) - Self::SWEET
}

fn as_int(self) -> Option<Self::ALPInt> {
<Self::ALPInt as NumCast>::from(self)
}

fn find_best_exponents(values: &[Self]) -> Exponents {
let mut best_exp = Exponents { e: 0, f: 0 };
let mut best_nbytes: usize = usize::MAX;

let sample = (values.len() > SAMPLE_SIZE).then(|| {
values
.iter()
.step_by(values.len() / SAMPLE_SIZE)
.cloned()
.collect_vec()
});

// TODO(wmanning): idea, start with highest e, then find the best f
// after that, try e's in descending order, with a gap no larger than the original e - f
for e in 0..Self::MAX_EXPONENT {
for f in 0..e {
let (_, encoded, exc_pos, exc_patches) = Self::encode(
sample.as_deref().unwrap_or(values),
Some(&Exponents { e, f }),
);
let size =
(encoded.len() + exc_patches.len()) * size_of::<Self>() + (exc_pos.len() * 4);
if size < best_nbytes {
best_nbytes = size;
best_exp = Exponents { e, f };
} else if size == best_nbytes && e - f < best_exp.e - best_exp.f {
best_exp = Exponents { e, f };
}
}
}

#[inline]
fn dtype(&self) -> &DType {
&self.dtype
best_exp
}

#[inline]
fn stats(&self) -> Stats {
Stats::new(&self.stats, self)
}
fn encode(
values: &[Self],
exponents: Option<&Exponents>,
) -> (Exponents, Vec<Self::ALPInt>, Vec<u64>, Vec<Self>) {
let exp = exponents.map_or_else(|| Self::find_best_exponents(values), Exponents::clone);

fn iter_arrow(&self) -> Box<ArrowIterator> {
todo!()
}
let mut exc_pos = Vec::new();
let mut exc_value = Vec::new();
let mut prev = Self::ALPInt::zero();
let encoded = values
.iter()
.enumerate()
.map(|(i, v)| {
let encoded =
(*v * Self::F10[exp.e as usize] * Self::IF10[exp.f as usize]).fast_round();
let decoded = encoded * Self::F10[exp.f as usize] * Self::IF10[exp.e as usize];

fn slice(&self, start: usize, stop: usize) -> VortexResult<ArrayRef> {
Ok(Self::try_new(
self.encoded().slice(start, stop)?,
self.exponents(),
self.patches().map(|p| p.slice(start, stop)).transpose()?,
)?
.boxed())
}
if decoded == *v {
if let Some(e) = encoded.as_int() {
prev = e;
return e;
}
}

#[inline]
fn encoding(&self) -> EncodingRef {
&ALPEncoding
}
exc_pos.push(i as u64);
exc_value.push(*v);
// Emit the last known good value. This helps with run-end encoding.
prev
})
.collect_vec();

#[inline]
fn nbytes(&self) -> usize {
self.encoded().nbytes() + self.patches().map(|p| p.nbytes()).unwrap_or(0)
(exp, encoded, exc_pos, exc_value)
}

fn serde(&self) -> &dyn ArraySerde {
self
fn decode_single(encoded: Self::ALPInt, exponents: &Exponents) -> Self {
let encoded_float: Self = Self::from(encoded).unwrap();
encoded_float * Self::F10[exponents.f as usize] * Self::IF10[exponents.e as usize]
}
}

impl<'arr> AsRef<(dyn Array + 'arr)> for ALPArray {
fn as_ref(&self) -> &(dyn Array + 'arr) {
self
}
impl ALPFloat for f32 {
type ALPInt = i32;
const FRACTIONAL_BITS: u8 = 23;
const MAX_EXPONENT: u8 = 10;
const SWEET: Self =
(1 << Self::FRACTIONAL_BITS) as Self + (1 << (Self::FRACTIONAL_BITS - 1)) as Self;

const F10: &'static [Self] = &[
1.0,
10.0,
100.0,
1000.0,
10000.0,
100000.0,
1000000.0,
10000000.0,
100000000.0,
1000000000.0,
10000000000.0,
];
const IF10: &'static [Self] = &[
1.0,
0.1,
0.01,
0.001,
0.0001,
0.00001,
0.000001,
0.0000001,
0.00000001,
0.000000001,
0.0000000001,
];
}

impl ArrayDisplay for ALPArray {
fn fmt(&self, f: &mut ArrayFormatter) -> std::fmt::Result {
f.writeln(format!("exponents: {}", self.exponents()))?;
if let Some(p) = self.patches() {
f.writeln("patches:")?;
f.indent(|indent| indent.array(p.as_ref()))?;
}
f.indent(|indent| indent.array(self.encoded()))
}
}

#[derive(Debug)]
pub struct ALPEncoding;

impl ALPEncoding {
pub const ID: EncodingId = EncodingId::new("vortex.alp");
}

impl Encoding for ALPEncoding {
fn id(&self) -> &EncodingId {
&Self::ID
}

fn compression(&self) -> Option<&dyn EncodingCompression> {
Some(self)
}

fn serde(&self) -> Option<&dyn EncodingSerde> {
Some(self)
}
impl ALPFloat for f64 {
type ALPInt = i64;
const FRACTIONAL_BITS: u8 = 52;
const MAX_EXPONENT: u8 = 18; // 10^18 is the maximum i64
const SWEET: Self =
(1u64 << Self::FRACTIONAL_BITS) as Self + (1u64 << (Self::FRACTIONAL_BITS - 1)) as Self;
const F10: &'static [Self] = &[
1.0,
10.0,
100.0,
1000.0,
10000.0,
100000.0,
1000000.0,
10000000.0,
100000000.0,
1000000000.0,
10000000000.0,
100000000000.0,
1000000000000.0,
10000000000000.0,
100000000000000.0,
1000000000000000.0,
10000000000000000.0,
100000000000000000.0,
1000000000000000000.0,
10000000000000000000.0,
100000000000000000000.0,
1000000000000000000000.0,
10000000000000000000000.0,
100000000000000000000000.0,
];

const IF10: &'static [Self] = &[
1.0,
0.1,
0.01,
0.001,
0.0001,
0.00001,
0.000001,
0.0000001,
0.00000001,
0.000000001,
0.0000000001,
0.00000000001,
0.000000000001,
0.0000000000001,
0.00000000000001,
0.000000000000001,
0.0000000000000001,
0.00000000000000001,
0.000000000000000001,
0.0000000000000000001,
0.00000000000000000001,
0.000000000000000000001,
0.0000000000000000000001,
0.00000000000000000000001,
];
}
Loading
Loading