Skip to content

Commit

Permalink
Remove buffer -> dtype dependency (#309)
Browse files Browse the repository at this point in the history
Part of my refactor to turn vortex-buffer into our equivalent of the
"bytes" crate

TODO: remove match_integers_by_width which previously was used for
unchecked transmute
  • Loading branch information
gatesn authored May 11, 2024
1 parent f1242b5 commit fd3a72b
Show file tree
Hide file tree
Showing 14 changed files with 77 additions and 134 deletions.
49 changes: 0 additions & 49 deletions .github/workflows/bench-pr.yml

This file was deleted.

1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 32 additions & 5 deletions vortex-array/src/array/primitive/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::mem::size_of;

use arrow_buffer::{ArrowNativeType, ScalarBuffer};
use itertools::Itertools;
use num_traits::AsPrimitive;
Expand Down Expand Up @@ -90,7 +92,35 @@ impl PrimitiveArray<'_> {
T::PTYPE,
self.ptype(),
);
self.buffer().typed_data::<T>()

let (prefix, offsets, suffix) = unsafe { self.buffer().as_ref().align_to::<T>() };
assert!(prefix.is_empty() && suffix.is_empty());
offsets
}

/// Convert the array into an owned, mutable `Vec` of the given type.
///
/// The underlying buffer is first turned into a `Vec<u8>` (reusing the
/// allocation when the buffer permits it, copying the bytes otherwise).
/// That byte vector is re-used as-is only when `T` is a single-byte type;
/// for every wider type the bytes are copied into a freshly allocated
/// `Vec<T>`, because a `Vec<u8>` allocation is only 1-byte aligned and
/// re-typing it with `Vec::from_raw_parts` would be undefined behaviour.
///
/// # Panics
///
/// Panics if `T::PTYPE` differs from the array's ptype, or if the buffer's
/// byte length is not a whole multiple of `size_of::<T>()`.
pub fn into_typed_data<T: NativePType>(self) -> Vec<T> {
    assert_eq!(
        T::PTYPE,
        self.ptype(),
        "Attempted to get typed_data of type {} from array of type {}",
        T::PTYPE,
        self.ptype(),
    );
    let bytes = self
        .into_buffer()
        .into_vec()
        .unwrap_or_else(|b| Vec::from(b.as_ref()));

    let width = size_of::<T>();
    assert_eq!(
        bytes.len() % width,
        0,
        "Buffer of {} bytes is not a whole number of {}-byte elements",
        bytes.len(),
        width,
    );

    if width == 1 && std::mem::align_of::<T>() == 1 {
        let mut bytes = std::mem::ManuallyDrop::new(bytes);
        // SAFETY: `T` has the same size and alignment as `u8`, so the
        // allocation's layout, length and capacity are all valid for
        // `Vec<T>`. `ManuallyDrop` prevents a double free of the buffer.
        // Assumes every bit pattern is a valid `T`, which holds for the
        // primitive numeric types — TODO confirm for all `NativePType` impls.
        return unsafe {
            Vec::from_raw_parts(bytes.as_mut_ptr().cast::<T>(), bytes.len(), bytes.capacity())
        };
    }

    // Wider types: copy into an allocation that is correctly aligned for `T`.
    let len = bytes.len() / width;
    let mut values = Vec::<T>::with_capacity(len);
    // SAFETY: `values` owns at least `len * width` writable bytes, `bytes`
    // holds exactly that many initialised bytes, and the two allocations are
    // distinct. A byte-wise copy has no alignment requirement on the source.
    // The ptype assertion above establishes that the bytes encode `T` values.
    unsafe {
        std::ptr::copy_nonoverlapping(bytes.as_ptr(), values.as_mut_ptr().cast::<u8>(), len * width);
        values.set_len(len);
    }
    values
}

pub fn reinterpret_cast(&self, ptype: PType) -> Self {
Expand Down Expand Up @@ -124,10 +154,7 @@ impl PrimitiveArray<'_> {

let validity = self.validity().to_static();

let mut own_values = self
.into_buffer()
.into_vec::<T>()
.unwrap_or_else(|b| Vec::from(b.typed_data::<T>()));
let mut own_values = self.into_typed_data();
// TODO(robert): Also patch validity
for (idx, value) in positions.iter().zip_eq(values.iter()) {
own_values[(*idx).as_()] = *value;
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/array/varbin/compute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl ScalarAtFn for VarBinArray<'_> {
Ok(varbin_scalar(
self.bytes_at(index)?
// TODO(ngates): update to use buffer when we refactor scalars.
.into_vec::<u8>()
.into_vec()
.unwrap_or_else(|b| b.as_ref().to_vec()),
self.dtype(),
))
Expand Down
1 change: 0 additions & 1 deletion vortex-buffer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ rust-version.workspace = true
arrow-buffer = { workspace = true }
bytes = { workspace = true }
flexbuffers = { workspace = true, optional = true }
vortex-dtype = { path = "../vortex-dtype" }

[lints]
workspace = true
28 changes: 3 additions & 25 deletions vortex-buffer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::ops::{Deref, Range};

use arrow_buffer::Buffer as ArrowBuffer;
pub use string::*;
use vortex_dtype::{match_each_native_ptype, NativePType};

#[derive(Debug, Clone)]
pub enum Buffer {
Expand Down Expand Up @@ -42,31 +41,10 @@ impl Buffer {
}
}

pub fn typed_data<T: NativePType>(&self) -> &[T] {
pub fn into_vec(self) -> Result<Vec<u8>, Buffer> {
match self {
Buffer::Arrow(buffer) => unsafe {
match_each_native_ptype!(T::PTYPE, |$T| {
std::mem::transmute(buffer.typed_data::<$T>())
})
},
Buffer::Bytes(bytes) => {
// From ArrowBuffer::typed_data
let (prefix, offsets, suffix) = unsafe { bytes.align_to::<T>() };
assert!(prefix.is_empty() && suffix.is_empty());
offsets
}
}
}

pub fn into_vec<T: NativePType>(self) -> Result<Vec<T>, Buffer> {
match self {
Buffer::Arrow(buffer) => match_each_native_ptype!(T::PTYPE, |$T| {
buffer
.into_vec()
.map(|vec| unsafe { std::mem::transmute::<Vec<$T>, Vec<T>>(vec) })
.map_err(Buffer::Arrow)
}),
// Cannot always convert bytes into a mutable vec
Buffer::Arrow(buffer) => buffer.into_vec::<u8>().map_err(Buffer::Arrow),
// Cannot convert bytes into a mutable vec
Buffer::Bytes(_) => Err(self),
}
}
Expand Down
16 changes: 5 additions & 11 deletions vortex-dict/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ mod test {
fn encode_primitive() {
let arr = PrimitiveArray::from(vec![1, 1, 3, 3, 3]);
let (codes, values) = dict_encode_typed_primitive::<i32>(&arr);
assert_eq!(codes.buffer().typed_data::<u64>(), &[0, 0, 1, 1, 1]);
assert_eq!(values.buffer().typed_data::<i32>(), &[1, 3]);
assert_eq!(codes.typed_data::<u64>(), &[0, 0, 1, 1, 1]);
assert_eq!(values.typed_data::<i32>(), &[1, 3]);
}

#[test]
Expand All @@ -281,10 +281,7 @@ mod test {
None,
]);
let (codes, values) = dict_encode_typed_primitive::<i32>(&arr);
assert_eq!(
codes.buffer().typed_data::<u64>(),
&[1, 1, 0, 2, 2, 0, 2, 0]
);
assert_eq!(codes.typed_data::<u64>(), &[1, 1, 0, 2, 2, 0, 2, 0]);
assert_eq!(
scalar_at(&values.to_array(), 0).unwrap(),
Scalar::null(DType::Primitive(PType::I32, Nullable))
Expand All @@ -303,7 +300,7 @@ mod test {
fn encode_varbin() {
let arr = VarBinArray::from(vec!["hello", "world", "hello", "again", "world"]);
let (codes, values) = dict_encode_varbin(&arr);
assert_eq!(codes.buffer().typed_data::<u64>(), &[0, 1, 0, 2, 1]);
assert_eq!(codes.typed_data::<u64>(), &[0, 1, 0, 2, 1]);
values
.with_iterator(|iter| {
assert_eq!(
Expand Down Expand Up @@ -331,10 +328,7 @@ mod test {
.into_iter()
.collect();
let (codes, values) = dict_encode_varbin(&arr);
assert_eq!(
codes.buffer().typed_data::<u64>(),
&[1, 0, 2, 1, 0, 3, 2, 0]
);
assert_eq!(codes.typed_data::<u64>(), &[1, 0, 2, 1, 0, 3, 2, 0]);
assert_eq!(str::from_utf8(&values.bytes_at(0).unwrap()).unwrap(), "");
values
.with_iterator(|iter| {
Expand Down
15 changes: 15 additions & 0 deletions vortex-dtype/src/ptype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,21 @@ macro_rules! match_each_integer_ptype {
})
}

/// Dispatch on a `PType` value over the unsigned integer types only.
///
/// Invoked as `match_each_unsigned_integer_ptype!(ptype, |$T| { body })`.
/// The body is expanded once per arm with the chosen identifier standing for
/// the matching Rust type (`u8`, `u16`, `u32` or `u64`).
///
/// # Panics
///
/// Any `PType` other than `U8`, `U16`, `U32` or `U64` hits the fallback arm
/// and panics with `"Unsupported ptype ..."`.
#[macro_export]
macro_rules! match_each_unsigned_integer_ptype {
($self:expr, | $_:tt $enc:ident | $($body:tt)*) => ({
// Local helper macro: binds the caller's `$name` to a concrete type and
// expands the caller's body with it. `$_` carries the literal `$` token.
macro_rules! __with__ {( $_ $enc:ident ) => ( $($body)* )}
use $crate::PType;
// NOTE: `$self` is expanded both here and in the panic message below.
match $self {
PType::U8 => __with__! { u8 },
PType::U16 => __with__! { u16 },
PType::U32 => __with__! { u32 },
PType::U64 => __with__! { u64 },
_ => panic!("Unsupported ptype {}", $self),
}
})
}

#[macro_export]
macro_rules! match_each_float_ptype {
($self:expr, | $_:tt $enc:ident | $($body:tt)*) => ({
Expand Down
18 changes: 10 additions & 8 deletions vortex-fastlanes/src/bitpacking/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ use vortex::stats::ArrayStatistics;
use vortex::validity::Validity;
use vortex::{Array, ArrayDType, ArrayDef, ArrayTrait, IntoArray, OwnedArray, ToStatic};
use vortex_dtype::PType::U8;
use vortex_dtype::{match_each_integer_ptype, NativePType, PType};
use vortex_dtype::{
match_each_integer_ptype, match_each_unsigned_integer_ptype, NativePType, PType,
};
use vortex_error::{vortex_bail, vortex_err, VortexResult};
use vortex_scalar::Scalar;

use crate::{match_integers_by_width, BitPackedArray, BitPackedEncoding, OwnedBitPackedArray};
use crate::{BitPackedArray, BitPackedEncoding, OwnedBitPackedArray};

impl EncodingCompression for BitPackedEncoding {
fn cost(&self) -> u8 {
Expand Down Expand Up @@ -118,9 +120,9 @@ pub(crate) fn bitpack_encode(

pub(crate) fn bitpack(parray: &PrimitiveArray, bit_width: usize) -> VortexResult<OwnedArray> {
// We know the min is > 0, so it's safe to re-interpret signed integers as unsigned.
// TODO(ngates): we should implement this using a vortex cast to centralize this hack.
let bytes = match_integers_by_width!(parray.ptype(), |$P| {
bitpack_primitive(parray.buffer().typed_data::<$P>(), bit_width)
let parray = parray.reinterpret_cast(parray.ptype().to_unsigned());
let bytes = match_each_unsigned_integer_ptype!(parray.ptype(), |$P| {
bitpack_primitive(parray.typed_data::<$P>(), bit_width)
});
Ok(PrimitiveArray::from(bytes).into_array())
}
Expand Down Expand Up @@ -163,7 +165,7 @@ fn bitpack_patches(
match_each_integer_ptype!(parray.ptype(), |$T| {
let mut indices: Vec<u64> = Vec::with_capacity(num_exceptions_hint);
let mut values: Vec<$T> = Vec::with_capacity(num_exceptions_hint);
for (i, v) in parray.buffer().typed_data::<$T>().iter().enumerate() {
for (i, v) in parray.typed_data::<$T>().iter().enumerate() {
if (v.leading_zeros() as usize) < parray.ptype().bit_width() - bit_width {
indices.push(i as u64);
values.push(*v);
Expand All @@ -185,7 +187,7 @@ pub fn unpack<'a>(array: BitPackedArray) -> VortexResult<PrimitiveArray<'a>> {
let encoded = cast(&array.packed(), U8.into())?.flatten_primitive()?;
let ptype: PType = array.dtype().try_into()?;

let mut unpacked = match_integers_by_width!(ptype, |$P| {
let mut unpacked = match_each_unsigned_integer_ptype!(ptype, |$P| {
PrimitiveArray::from_vec(
unpack_primitive::<$P>(encoded.typed_data::<u8>(), bit_width, offset, length),
array.validity(),
Expand Down Expand Up @@ -287,7 +289,7 @@ pub(crate) fn unpack_single(array: &BitPackedArray, index: usize) -> VortexResul
let ptype: PType = array.dtype().try_into()?;
let index_in_encoded = index + array.offset();

let scalar: Scalar = match_integers_by_width!(ptype, |$P| {
let scalar: Scalar = match_each_unsigned_integer_ptype!(ptype, |$P| {
unsafe {
unpack_single_primitive::<$P>(encoded.typed_data::<u8>(), bit_width, index_in_encoded).map(|v| v.into())
}
Expand Down
8 changes: 4 additions & 4 deletions vortex-fastlanes/src/bitpacking/compute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ use vortex::compute::slice::{slice, SliceFn};
use vortex::compute::take::{take, TakeFn};
use vortex::compute::ArrayCompute;
use vortex::{Array, ArrayDType, ArrayTrait, IntoArray, OwnedArray};
use vortex_dtype::{match_each_integer_ptype, NativePType};
use vortex_error::{vortex_bail, vortex_err, VortexResult};
use vortex_dtype::{match_each_integer_ptype, match_each_unsigned_integer_ptype, NativePType};
use vortex_error::{vortex_err, VortexResult};
use vortex_scalar::Scalar;

use crate::bitpacking::compress::unpack_single;
use crate::{match_integers_by_width, unpack_single_primitive, BitPackedArray};
use crate::{unpack_single_primitive, BitPackedArray};

mod slice;

Expand Down Expand Up @@ -66,7 +66,7 @@ impl TakeFn for BitPackedArray<'_> {
}

let indices = indices.clone().flatten_primitive()?;
let taken = match_integers_by_width!(ptype, |$T| {
let taken = match_each_unsigned_integer_ptype!(ptype, |$T| {
PrimitiveArray::from_vec(take_primitive::<$T>(self, &indices)?, taken_validity)
});
Ok(taken.reinterpret_cast(ptype).into_array())
Expand Down
16 changes: 0 additions & 16 deletions vortex-fastlanes/src/bitpacking/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,22 +174,6 @@ impl ArrayTrait for BitPackedArray<'_> {
}
}

/// Dispatch on a `PType` by integer bit width, ignoring signedness.
///
/// Invoked as `match_integers_by_width!(ptype, |$T| { body })`. Signed and
/// unsigned ptypes of the same width share one arm, and the body is always
/// expanded with the *unsigned* Rust type of that width (`u8`, `u16`, `u32`
/// or `u64`).
///
/// Any other `PType` reaches the fallback arm, which invokes `vortex_bail!`.
/// NOTE(review): presumably that returns an error from the enclosing function,
/// so the macro would only be usable where a `VortexResult` is returned — confirm.
///
/// The expansion imports `vortex_dtype::PType` and `vortex_error::vortex_bail`
/// by crate name, so both crates must be resolvable at the call site.
#[macro_export]
macro_rules! match_integers_by_width {
($self:expr, | $_:tt $enc:ident | $($body:tt)*) => ({
// Local helper macro: binds the caller's `$name` to a concrete type and
// expands the caller's body with it. `$_` carries the literal `$` token.
macro_rules! __with__ {( $_ $enc:ident ) => ( $($body)* )}
use vortex_dtype::PType;
use vortex_error::vortex_bail;
match $self {
PType::I8 | PType::U8 => __with__! { u8 },
PType::I16 | PType::U16 => __with__! { u16 },
PType::I32 | PType::U32 => __with__! { u32 },
PType::I64 | PType::U64 => __with__! { u64 },
_ => vortex_bail!("Unsupported ptype {}", $self),
}
})
}

#[cfg(test)]
mod test {
use vortex::array::primitive::PrimitiveArray;
Expand Down
4 changes: 2 additions & 2 deletions vortex-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,10 +563,10 @@ mod tests {
#[test]
fn test_write_read_bitpacked() {
// NB: the order is reversed here to ensure we aren't grabbing indexes instead of values
let uncompressed = PrimitiveArray::from((0i64..3_000).rev().collect_vec());
let uncompressed = PrimitiveArray::from((0u64..3_000).rev().collect_vec());
let packed = BitPackedArray::encode(uncompressed.array(), 5).unwrap();

let expected = &[2989i64, 2988, 2987, 2986];
let expected = &[2989u64, 2988, 2987, 2986];
test_base_case(&packed.into_array(), expected, PrimitiveEncoding.id());
}

Expand Down
2 changes: 1 addition & 1 deletion vortex-ree/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ mod test {
.unwrap();

assert_eq!(
decoded.buffer().typed_data::<i32>(),
decoded.typed_data::<i32>(),
vec![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3].as_slice()
);
assert_eq!(
Expand Down
14 changes: 4 additions & 10 deletions vortex-zigzag/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,10 @@ where
#[allow(dead_code)]
pub fn zigzag_decode<'a>(parray: &'a PrimitiveArray<'a>) -> PrimitiveArray<'a> {
match parray.ptype() {
PType::U8 => zigzag_decode_primitive::<i8>(parray.buffer().typed_data(), parray.validity()),
PType::U16 => {
zigzag_decode_primitive::<i16>(parray.buffer().typed_data(), parray.validity())
}
PType::U32 => {
zigzag_decode_primitive::<i32>(parray.buffer().typed_data(), parray.validity())
}
PType::U64 => {
zigzag_decode_primitive::<i64>(parray.buffer().typed_data(), parray.validity())
}
PType::U8 => zigzag_decode_primitive::<i8>(parray.typed_data(), parray.validity()),
PType::U16 => zigzag_decode_primitive::<i16>(parray.typed_data(), parray.validity()),
PType::U32 => zigzag_decode_primitive::<i32>(parray.typed_data(), parray.validity()),
PType::U64 => zigzag_decode_primitive::<i64>(parray.typed_data(), parray.validity()),
_ => panic!("Unsupported ptype {}", parray.ptype()),
}
}
Expand Down

0 comments on commit fd3a72b

Please sign in to comment.