Remove buffer -> dtype dependency #309

Merged (5 commits) on May 11, 2024
49 changes: 0 additions & 49 deletions .github/workflows/bench-pr.yml

This file was deleted.

1 change: 0 additions & 1 deletion Cargo.lock


37 changes: 32 additions & 5 deletions vortex-array/src/array/primitive/mod.rs
@@ -1,3 +1,5 @@
use std::mem::size_of;

use arrow_buffer::{ArrowNativeType, ScalarBuffer};
use itertools::Itertools;
use num_traits::AsPrimitive;
@@ -90,7 +92,35 @@ impl PrimitiveArray<'_> {
T::PTYPE,
self.ptype(),
);
self.buffer().typed_data::<T>()

let (prefix, offsets, suffix) = unsafe { self.buffer().as_ref().align_to::<T>() };
assert!(prefix.is_empty() && suffix.is_empty());
offsets
}

/// Convert the array into a mutable vec of the given type.
/// If possible, this will be zero-copy.
pub fn into_typed_data<T: NativePType>(self) -> Vec<T> {
assert_eq!(
T::PTYPE,
self.ptype(),
"Attempted to get typed_data of type {} from array of type {}",
T::PTYPE,
self.ptype(),
);
let bytes = self
.into_buffer()
.into_vec()
.unwrap_or_else(|b| Vec::from(b.as_ref()));

unsafe {
let mut bytes = std::mem::ManuallyDrop::new(bytes);
Vec::from_raw_parts(
bytes.as_mut_ptr() as *mut T,
bytes.len() / size_of::<T>(),
bytes.capacity() / size_of::<T>(),
)
}
}

pub fn reinterpret_cast(&self, ptype: PType) -> Self {
@@ -124,10 +154,7 @@ impl PrimitiveArray<'_> {

let validity = self.validity().to_static();

let mut own_values = self
.into_buffer()
.into_vec::<T>()
.unwrap_or_else(|b| Vec::from(b.typed_data::<T>()));
let mut own_values = self.into_typed_data();
// TODO(robert): Also patch validity
for (idx, value) in positions.iter().zip_eq(values.iter()) {
own_values[(*idx).as_()] = *value;
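
Reviewer note on the two typed accessors above: the standard library documents that `Vec::from_raw_parts` requires `T` to have the same alignment the pointer was allocated with, which a `Vec<u8>` allocation does not guarantee for wider types. The following is a minimal sketch, not the PR's code, of a checked borrow plus a copying fallback for `u32`. The names `try_as_u32_slice` and `copy_to_u32_vec` are hypothetical and chosen only for illustration.

```rust
use std::mem::size_of;

/// Borrow `bytes` as `&[u32]` when the slice is suitably aligned and a whole
/// number of words long; return `None` instead of asserting otherwise.
/// (Hypothetical helper, not part of vortex.)
fn try_as_u32_slice(bytes: &[u8]) -> Option<&[u32]> {
    // SAFETY: every bit pattern is a valid `u32`, so viewing correctly
    // aligned bytes as `u32` cannot produce an invalid value.
    let (prefix, body, suffix) = unsafe { bytes.align_to::<u32>() };
    (prefix.is_empty() && suffix.is_empty()).then_some(body)
}

/// Copy `bytes` into a freshly allocated `Vec<u32>` using native endianness.
/// Trailing bytes that do not fill a whole word are ignored.
/// The new allocation is made for `u32`, so its alignment is correct.
fn copy_to_u32_vec(bytes: &[u8]) -> Vec<u32> {
    bytes
        .chunks_exact(size_of::<u32>())
        .map(|c| u32::from_ne_bytes([c[0], c[1], c[2], c[3]]))
        .collect()
}
```

The copying path gives up the zero-copy property, so it is only a fallback for when ownership of a correctly aligned allocation cannot be established.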
2 changes: 1 addition & 1 deletion vortex-array/src/array/varbin/compute/mod.rs
@@ -139,7 +139,7 @@ impl ScalarAtFn for VarBinArray<'_> {
Ok(varbin_scalar(
self.bytes_at(index)?
// TODO(ngates): update to use buffer when we refactor scalars.
.into_vec::<u8>()
.into_vec()
.unwrap_or_else(|b| b.as_ref().to_vec()),
self.dtype(),
))
1 change: 0 additions & 1 deletion vortex-buffer/Cargo.toml
@@ -14,7 +14,6 @@ rust-version.workspace = true
arrow-buffer = { workspace = true }
bytes = { workspace = true }
flexbuffers = { workspace = true, optional = true }
vortex-dtype = { path = "../vortex-dtype" }

[lints]
workspace = true
28 changes: 3 additions & 25 deletions vortex-buffer/src/lib.rs
@@ -6,7 +6,6 @@ use std::ops::{Deref, Range};

use arrow_buffer::Buffer as ArrowBuffer;
pub use string::*;
use vortex_dtype::{match_each_native_ptype, NativePType};

#[derive(Debug, Clone)]
pub enum Buffer {
@@ -42,31 +41,10 @@ impl Buffer {
}
}

pub fn typed_data<T: NativePType>(&self) -> &[T] {
pub fn into_vec(self) -> Result<Vec<u8>, Buffer> {
match self {
Buffer::Arrow(buffer) => unsafe {
match_each_native_ptype!(T::PTYPE, |$T| {
std::mem::transmute(buffer.typed_data::<$T>())
})
},
Buffer::Bytes(bytes) => {
// From ArrowBuffer::typed_data
let (prefix, offsets, suffix) = unsafe { bytes.align_to::<T>() };
assert!(prefix.is_empty() && suffix.is_empty());
offsets
}
}
}

pub fn into_vec<T: NativePType>(self) -> Result<Vec<T>, Buffer> {
match self {
Buffer::Arrow(buffer) => match_each_native_ptype!(T::PTYPE, |$T| {
buffer
.into_vec()
.map(|vec| unsafe { std::mem::transmute::<Vec<$T>, Vec<T>>(vec) })
.map_err(Buffer::Arrow)
}),
// Cannot always convert bytes into a mutable vec
Buffer::Arrow(buffer) => buffer.into_vec::<u8>().map_err(Buffer::Arrow),
// Cannot convert bytes into a mutable vec
Buffer::Bytes(_) => Err(self),

Review comments on the Buffer::Bytes(_) => Err(self) line:

Member: Why is this an Err? If you are the only one holding the bytes then this should be possible.

Contributor Author: I don't think you can go from Bytes back to BytesMut.

Member: https://rustasync.github.io/runtime/bytes/struct.Bytes.html#method.try_mut seems to be the right signature, and then you can use that to construct a Vec from parts?

Contributor Author: 🤦 I missed that. I thought it seemed odd.

Contributor Author: Hmmm no, I'm not sure what docs those are, but they're not up to date for the bytes crate. See also tokio-rs/bytes#350.

Member: Looks like someone is hosting their own fork, or it's some outdated link that Google served. You're right. Maybe we need our own version as well?

Contributor Author: We do anyway, to have more control over alignment. This crate tries to abstract that away for now.

}
}
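
Reviewer note: the `Result<Vec<u8>, Buffer>` shape of `into_vec` (succeed only when the bytes can be taken without copying, otherwise hand the buffer back) can be illustrated with the standard library alone. The sketch below is an analogy built on `Arc::try_unwrap`, not the `bytes` crate API or the vortex implementation. `SharedBytes` and both methods are hypothetical names.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a shared, immutable byte buffer.
#[derive(Debug, Clone)]
struct SharedBytes(Arc<Vec<u8>>);

impl SharedBytes {
    fn new(data: Vec<u8>) -> Self {
        SharedBytes(Arc::new(data))
    }

    /// Take the bytes without copying if this is the only handle;
    /// otherwise return the buffer unchanged as the error value.
    fn into_vec(self) -> Result<Vec<u8>, SharedBytes> {
        Arc::try_unwrap(self.0).map_err(SharedBytes)
    }

    /// Always succeeds: zero-copy when unique, a copy otherwise.
    /// Mirrors the `into_vec().unwrap_or_else(|b| ...)` call sites in the diff.
    fn into_vec_or_copy(self) -> Vec<u8> {
        self.into_vec().unwrap_or_else(|shared| shared.0.to_vec())
    }
}
```

Returning the buffer in the `Err` variant lets the caller decide whether a copy is acceptable, which is the pattern the call sites in this PR rely on.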
16 changes: 5 additions & 11 deletions vortex-dict/src/compress.rs
@@ -264,8 +264,8 @@ mod test {
fn encode_primitive() {
let arr = PrimitiveArray::from(vec![1, 1, 3, 3, 3]);
let (codes, values) = dict_encode_typed_primitive::<i32>(&arr);
assert_eq!(codes.buffer().typed_data::<u64>(), &[0, 0, 1, 1, 1]);
assert_eq!(values.buffer().typed_data::<i32>(), &[1, 3]);
assert_eq!(codes.typed_data::<u64>(), &[0, 0, 1, 1, 1]);
assert_eq!(values.typed_data::<i32>(), &[1, 3]);
}

#[test]
@@ -281,10 +281,7 @@ mod test {
None,
]);
let (codes, values) = dict_encode_typed_primitive::<i32>(&arr);
assert_eq!(
codes.buffer().typed_data::<u64>(),
&[1, 1, 0, 2, 2, 0, 2, 0]
);
assert_eq!(codes.typed_data::<u64>(), &[1, 1, 0, 2, 2, 0, 2, 0]);
assert_eq!(
scalar_at(&values.to_array(), 0).unwrap(),
Scalar::null(DType::Primitive(PType::I32, Nullable))
@@ -303,7 +300,7 @@ mod test {
fn encode_varbin() {
let arr = VarBinArray::from(vec!["hello", "world", "hello", "again", "world"]);
let (codes, values) = dict_encode_varbin(&arr);
assert_eq!(codes.buffer().typed_data::<u64>(), &[0, 1, 0, 2, 1]);
assert_eq!(codes.typed_data::<u64>(), &[0, 1, 0, 2, 1]);
values
.with_iterator(|iter| {
assert_eq!(
@@ -331,10 +328,7 @@ mod test {
.into_iter()
.collect();
let (codes, values) = dict_encode_varbin(&arr);
assert_eq!(
codes.buffer().typed_data::<u64>(),
&[1, 0, 2, 1, 0, 3, 2, 0]
);
assert_eq!(codes.typed_data::<u64>(), &[1, 0, 2, 1, 0, 3, 2, 0]);
assert_eq!(str::from_utf8(&values.bytes_at(0).unwrap()).unwrap(), "");
values
.with_iterator(|iter| {
15 changes: 15 additions & 0 deletions vortex-dtype/src/ptype.rs
@@ -109,6 +109,21 @@ macro_rules! match_each_integer_ptype {
})
}

#[macro_export]
macro_rules! match_each_unsigned_integer_ptype {
($self:expr, | $_:tt $enc:ident | $($body:tt)*) => ({
macro_rules! __with__ {( $_ $enc:ident ) => ( $($body)* )}
use $crate::PType;
match $self {
PType::U8 => __with__! { u8 },
PType::U16 => __with__! { u16 },
PType::U32 => __with__! { u32 },
PType::U64 => __with__! { u64 },
_ => panic!("Unsupported ptype {}", $self),
}
})
}

#[macro_export]
macro_rules! match_each_float_ptype {
($self:expr, | $_:tt $enc:ident | $($body:tt)*) => ({
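
Reviewer note: the `$_:tt` capture in these macros is what lets a call site write `|$T| ...`. The outer macro captures the literal `$` token and re-emits it while defining an inner macro whose parameter is named by the caller. Below is a self-contained sketch of the same pattern, assuming a trimmed-down `PType` enum defined locally. `match_each_unsigned`, `byte_width`, and `max_value` are hypothetical names, not vortex items.

```rust
use std::mem::size_of;

/// Hypothetical, trimmed-down stand-in for `vortex_dtype::PType`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum PType {
    U8,
    U16,
    U32,
    U64,
    F32,
}

macro_rules! match_each_unsigned {
    ($self:expr, | $_:tt $enc:ident | $($body:tt)*) => ({
        // `$_` holds the caller's `$` token, so this defines an inner macro
        // with a single parameter named after the caller's identifier.
        macro_rules! __with__ {( $_ $enc:ident ) => ( $($body)* )}
        match $self {
            PType::U8 => __with__! { u8 },
            PType::U16 => __with__! { u16 },
            PType::U32 => __with__! { u32 },
            PType::U64 => __with__! { u64 },
            other => panic!("Unsupported ptype {:?}", other),
        }
    })
}

/// Size in bytes of the native type behind an unsigned `PType`.
fn byte_width(ptype: PType) -> usize {
    match_each_unsigned!(ptype, |$T| { size_of::<$T>() })
}

/// Largest value representable by the native type, widened to `u64`.
fn max_value(ptype: PType) -> u64 {
    match_each_unsigned!(ptype, |$T| { <$T>::MAX as u64 })
}
```

Each arm instantiates the body once per concrete type, so the body is monomorphized at compile time and only the outer `match` runs at run time. Any non-unsigned variant reaches the panicking arm, as in the macro added by this PR.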
18 changes: 10 additions & 8 deletions vortex-fastlanes/src/bitpacking/compress.rs
@@ -8,11 +8,13 @@ use vortex::stats::ArrayStatistics;
use vortex::validity::Validity;
use vortex::{Array, ArrayDType, ArrayDef, ArrayTrait, IntoArray, OwnedArray, ToStatic};
use vortex_dtype::PType::U8;
use vortex_dtype::{match_each_integer_ptype, NativePType, PType};
use vortex_dtype::{
match_each_integer_ptype, match_each_unsigned_integer_ptype, NativePType, PType,
};
use vortex_error::{vortex_bail, vortex_err, VortexResult};
use vortex_scalar::Scalar;

use crate::{match_integers_by_width, BitPackedArray, BitPackedEncoding, OwnedBitPackedArray};
use crate::{BitPackedArray, BitPackedEncoding, OwnedBitPackedArray};

impl EncodingCompression for BitPackedEncoding {
fn cost(&self) -> u8 {
@@ -118,9 +120,9 @@ pub(crate) fn bitpack_encode(

pub(crate) fn bitpack(parray: &PrimitiveArray, bit_width: usize) -> VortexResult<OwnedArray> {
// We know the min is > 0, so it's safe to re-interpret signed integers as unsigned.
// TODO(ngates): we should implement this using a vortex cast to centralize this hack.
let bytes = match_integers_by_width!(parray.ptype(), |$P| {
bitpack_primitive(parray.buffer().typed_data::<$P>(), bit_width)
let parray = parray.reinterpret_cast(parray.ptype().to_unsigned());
let bytes = match_each_unsigned_integer_ptype!(parray.ptype(), |$P| {
bitpack_primitive(parray.typed_data::<$P>(), bit_width)
});
Ok(PrimitiveArray::from(bytes).into_array())
}
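
Reviewer note: the comment in `bitpack` relies on the fact that a non-negative signed integer has the same bit pattern as the unsigned integer of equal value, so reinterpreting it leaves both the value and the number of significant bits unchanged. A small sketch of that reasoning for `i32`/`u32` follows. The function names are hypothetical, and the negative-value check is an addition for illustration, not something the PR does.

```rust
/// Reinterpret signed values as unsigned, bit for bit.
/// Returns `None` if any value is negative: a negative value has its top bit
/// set, so after reinterpretation it would need the full 32 bits.
fn reinterpret_non_negative(values: &[i32]) -> Option<Vec<u32>> {
    if values.iter().any(|v| *v < 0) {
        return None;
    }
    Some(values.iter().map(|v| *v as u32).collect())
}

/// Smallest number of bits that can represent every value in `values`
/// (0 for an empty slice or a slice of zeros).
fn required_bit_width(values: &[u32]) -> u32 {
    values
        .iter()
        .map(|v| u32::BITS - v.leading_zeros())
        .max()
        .unwrap_or(0)
}
```

For example, `-1i32 as u32` is `u32::MAX`, which is why the reinterpretation is only useful for packing once the minimum is known to be non-negative.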
@@ -163,7 +165,7 @@ fn bitpack_patches(
match_each_integer_ptype!(parray.ptype(), |$T| {
let mut indices: Vec<u64> = Vec::with_capacity(num_exceptions_hint);
let mut values: Vec<$T> = Vec::with_capacity(num_exceptions_hint);
for (i, v) in parray.buffer().typed_data::<$T>().iter().enumerate() {
for (i, v) in parray.typed_data::<$T>().iter().enumerate() {
if (v.leading_zeros() as usize) < parray.ptype().bit_width() - bit_width {
indices.push(i as u64);
values.push(*v);
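
Reviewer note: the `leading_zeros` test in this hunk selects the values that do not fit in `bit_width` bits. A value fits exactly when it has at least `type_width - bit_width` leading zero bits. The sketch below restates that check as a standalone function for `u32`. `find_exceptions` is a hypothetical name, and the real function is generic over the integer type via the macro.

```rust
/// Collect the positions and values of elements that need more than
/// `bit_width` bits. Returns `(indices, exceptions)`.
fn find_exceptions(values: &[u32], bit_width: usize) -> (Vec<u64>, Vec<u32>) {
    let type_width = u32::BITS as usize;
    assert!(bit_width <= type_width, "bit_width exceeds the type width");

    let mut indices = Vec::new();
    let mut exceptions = Vec::new();
    for (i, v) in values.iter().enumerate() {
        // Fewer leading zeros than the bits being dropped means some
        // significant bit would be lost by packing.
        if (v.leading_zeros() as usize) < type_width - bit_width {
            indices.push(i as u64);
            exceptions.push(*v);
        }
    }
    (indices, exceptions)
}
```

With `bit_width = 3`, the values 8 and 255 are reported as exceptions while 7 is not, since 7 is the largest value representable in three bits.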
@@ -185,7 +187,7 @@ pub fn unpack<'a>(array: BitPackedArray) -> VortexResult<PrimitiveArray<'a>> {
let encoded = cast(&array.packed(), U8.into())?.flatten_primitive()?;
let ptype: PType = array.dtype().try_into()?;

let mut unpacked = match_integers_by_width!(ptype, |$P| {
let mut unpacked = match_each_unsigned_integer_ptype!(ptype, |$P| {
PrimitiveArray::from_vec(
unpack_primitive::<$P>(encoded.typed_data::<u8>(), bit_width, offset, length),
array.validity(),
@@ -287,7 +289,7 @@ pub(crate) fn unpack_single(array: &BitPackedArray, index: usize) -> VortexResul
let ptype: PType = array.dtype().try_into()?;
let index_in_encoded = index + array.offset();

let scalar: Scalar = match_integers_by_width!(ptype, |$P| {
let scalar: Scalar = match_each_unsigned_integer_ptype!(ptype, |$P| {
unsafe {
unpack_single_primitive::<$P>(encoded.typed_data::<u8>(), bit_width, index_in_encoded).map(|v| v.into())
}
8 changes: 4 additions & 4 deletions vortex-fastlanes/src/bitpacking/compute/mod.rs
@@ -10,12 +10,12 @@ use vortex::compute::slice::{slice, SliceFn};
use vortex::compute::take::{take, TakeFn};
use vortex::compute::ArrayCompute;
use vortex::{Array, ArrayDType, ArrayTrait, IntoArray, OwnedArray};
use vortex_dtype::{match_each_integer_ptype, NativePType};
use vortex_error::{vortex_bail, vortex_err, VortexResult};
use vortex_dtype::{match_each_integer_ptype, match_each_unsigned_integer_ptype, NativePType};
use vortex_error::{vortex_err, VortexResult};
use vortex_scalar::Scalar;

use crate::bitpacking::compress::unpack_single;
use crate::{match_integers_by_width, unpack_single_primitive, BitPackedArray};
use crate::{unpack_single_primitive, BitPackedArray};

mod slice;

@@ -66,7 +66,7 @@ impl TakeFn for BitPackedArray<'_> {
}

let indices = indices.clone().flatten_primitive()?;
let taken = match_integers_by_width!(ptype, |$T| {
let taken = match_each_unsigned_integer_ptype!(ptype, |$T| {
PrimitiveArray::from_vec(take_primitive::<$T>(self, &indices)?, taken_validity)
});
Ok(taken.reinterpret_cast(ptype).into_array())
16 changes: 0 additions & 16 deletions vortex-fastlanes/src/bitpacking/mod.rs
@@ -174,22 +174,6 @@ impl ArrayTrait for BitPackedArray<'_> {
}
}

#[macro_export]
macro_rules! match_integers_by_width {
($self:expr, | $_:tt $enc:ident | $($body:tt)*) => ({
macro_rules! __with__ {( $_ $enc:ident ) => ( $($body)* )}
use vortex_dtype::PType;
use vortex_error::vortex_bail;
match $self {
PType::I8 | PType::U8 => __with__! { u8 },
PType::I16 | PType::U16 => __with__! { u16 },
PType::I32 | PType::U32 => __with__! { u32 },
PType::I64 | PType::U64 => __with__! { u64 },
_ => vortex_bail!("Unsupported ptype {}", $self),
}
})
}

#[cfg(test)]
mod test {
use vortex::array::primitive::PrimitiveArray;
4 changes: 2 additions & 2 deletions vortex-ipc/src/reader.rs
@@ -563,10 +563,10 @@ mod tests {
#[test]
fn test_write_read_bitpacked() {
// NB: the order is reversed here to ensure we aren't grabbing indexes instead of values
let uncompressed = PrimitiveArray::from((0i64..3_000).rev().collect_vec());
let uncompressed = PrimitiveArray::from((0u64..3_000).rev().collect_vec());
let packed = BitPackedArray::encode(uncompressed.array(), 5).unwrap();

let expected = &[2989i64, 2988, 2987, 2986];
let expected = &[2989u64, 2988, 2987, 2986];
test_base_case(&packed.into_array(), expected, PrimitiveEncoding.id());
}

2 changes: 1 addition & 1 deletion vortex-ree/src/compress.rs
@@ -216,7 +216,7 @@ mod test {
.unwrap();

assert_eq!(
decoded.buffer().typed_data::<i32>(),
decoded.typed_data::<i32>(),
vec![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3].as_slice()
);
assert_eq!(
14 changes: 4 additions & 10 deletions vortex-zigzag/src/compress.rs
@@ -76,16 +76,10 @@ where
#[allow(dead_code)]
pub fn zigzag_decode<'a>(parray: &'a PrimitiveArray<'a>) -> PrimitiveArray<'a> {
match parray.ptype() {
PType::U8 => zigzag_decode_primitive::<i8>(parray.buffer().typed_data(), parray.validity()),
PType::U16 => {
zigzag_decode_primitive::<i16>(parray.buffer().typed_data(), parray.validity())
}
PType::U32 => {
zigzag_decode_primitive::<i32>(parray.buffer().typed_data(), parray.validity())
}
PType::U64 => {
zigzag_decode_primitive::<i64>(parray.buffer().typed_data(), parray.validity())
}
PType::U8 => zigzag_decode_primitive::<i8>(parray.typed_data(), parray.validity()),
PType::U16 => zigzag_decode_primitive::<i16>(parray.typed_data(), parray.validity()),
PType::U32 => zigzag_decode_primitive::<i32>(parray.typed_data(), parray.validity()),
PType::U64 => zigzag_decode_primitive::<i64>(parray.typed_data(), parray.validity()),
_ => panic!("Unsupported ptype {}", parray.ptype()),
}
}
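
Reviewer note: the `match` above pairs each unsigned storage type with the signed type of the same width, because zigzag encoding stores signed values as unsigned ones with small magnitudes mapped to small codes. The body of `zigzag_decode_primitive` is not shown in this diff, so the following is a sketch of the standard zigzag mapping for 32-bit values, not necessarily the vortex implementation. The `_i32` and `_u32` function names are hypothetical.

```rust
/// Standard zigzag encoding: 0, -1, 1, -2, 2, ... map to 0, 1, 2, 3, 4, ...
fn zigzag_encode_i32(v: i32) -> u32 {
    // The arithmetic shift `v >> 31` yields all ones for negative values
    // and all zeros otherwise, which flips the shifted bits when needed.
    ((v << 1) ^ (v >> 31)) as u32
}

/// Inverse of `zigzag_encode_i32`.
fn zigzag_decode_u32(n: u32) -> i32 {
    // The low bit carries the sign; the remaining bits carry the magnitude.
    ((n >> 1) as i32) ^ -((n & 1) as i32)
}
```

Because small negative numbers become small unsigned codes, the encoded values have many leading zeros and pack into fewer bits afterwards.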