Skip to content

Commit

Permalink
OwnedBuffer (#300)
Browse files Browse the repository at this point in the history
Use bytes package for zero-copy owned immutable buffers instead of a
reference to a slice
  • Loading branch information
gatesn authored May 6, 2024
1 parent 19ad296 commit 5143c3f
Show file tree
Hide file tree
Showing 21 changed files with 97 additions and 79 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ arrow-ipc = "51.0.0"
arrow-schema = "51.0.0"
arrow-select = "51.0.0"
bindgen = "0.69.4"
bytes = "1.6.0"
bzip2 = "0.4.4"
criterion = { version = "0.5.1", features = ["html_reports"] }
croaring = "1.0.1"
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/array/bool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl BoolArray<'_> {
validity: validity.to_metadata(buffer.len())?,
length: buffer.len(),
},
Some(Buffer::Owned(buffer.into_inner())),
Some(Buffer::from(buffer.into_inner())),
validity.into_array_data().into_iter().collect_vec().into(),
StatsSet::new(),
)?,
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/array/primitive/compute/as_contiguous.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl AsContiguousFn for PrimitiveArray<'_> {
arrays.iter().map(|a| a.len()).sum::<usize>() * self.ptype().byte_width(),
);
for array in arrays {
buffer.extend_from_slice(array.as_primitive().buffer().as_slice())
buffer.extend_from_slice(array.as_primitive().buffer())
}
match_each_native_ptype!(self.ptype(), |$T| {
Ok(PrimitiveArray::try_new(ScalarBuffer::<$T>::from(buffer), validity)
Expand Down
10 changes: 5 additions & 5 deletions vortex-array/src/array/primitive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl PrimitiveArray<'_> {
PrimitiveMetadata {
validity: validity.to_metadata(buffer.len())?,
},
Some(Buffer::Owned(buffer.into_inner())),
Some(Buffer::from(buffer.into_inner())),
validity.into_array_data().into_iter().collect_vec().into(),
StatsSet::new(),
)?,
Expand Down Expand Up @@ -134,11 +134,11 @@ impl PrimitiveArray<'_> {
}
Ok(Self::from_vec(own_values, validity))
}
}

impl<'a> PrimitiveArray<'a> {
pub fn into_buffer(self) -> Buffer<'a> {
self.into_array().into_buffer().unwrap()
pub fn into_buffer(self) -> Buffer {
self.into_array()
.into_buffer()
.expect("PrimitiveArray must have a buffer")
}
}

Expand Down
9 changes: 7 additions & 2 deletions vortex-array/src/array/varbin/compute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,13 @@ impl AsArrowArray for VarBinArray<'_> {
impl ScalarAtFn for VarBinArray<'_> {
fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
if self.is_valid(index) {
self.bytes_at(index)
.map(|bytes| varbin_scalar(bytes, self.dtype()))
Ok(varbin_scalar(
self.bytes_at(index)?
// TODO(ngates): update to use buffer when we refactor scalars.
.into_vec::<u8>()
.unwrap_or_else(|b| b.as_ref().to_vec()),
self.dtype(),
))
} else {
Ok(Scalar::null(self.dtype()))
}
Expand Down
5 changes: 3 additions & 2 deletions vortex-array/src/array/varbin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod flatten;
mod stats;

pub use stats::compute_stats;
use vortex_buffer::Buffer;

use crate::array::primitive::PrimitiveArray;

Expand Down Expand Up @@ -147,11 +148,11 @@ impl VarBinArray<'_> {
})
}

pub fn bytes_at(&self, index: usize) -> VortexResult<Vec<u8>> {
pub fn bytes_at(&self, index: usize) -> VortexResult<Buffer> {
let start = self.offset_at(index);
let end = self.offset_at(index + 1);
let sliced = slice(&self.bytes(), start, end)?;
Ok(sliced.flatten_primitive()?.buffer().as_slice().to_vec())
Ok(sliced.flatten_primitive()?.buffer().clone())
}
}

Expand Down
8 changes: 4 additions & 4 deletions vortex-array/src/data.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::{Arc, RwLock};

use vortex_buffer::{Buffer, OwnedBuffer};
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_error::{vortex_err, VortexResult};
use vortex_scalar::Scalar;
Expand All @@ -14,7 +14,7 @@ pub struct ArrayData {
encoding: EncodingRef,
dtype: DType, // FIXME(ngates): Arc?
metadata: Arc<dyn ArrayMetadata>,
buffer: Option<OwnedBuffer>,
buffer: Option<Buffer>,
children: Arc<[ArrayData]>,
stats_map: Arc<RwLock<StatsSet>>,
}
Expand All @@ -24,7 +24,7 @@ impl ArrayData {
encoding: EncodingRef,
dtype: DType,
metadata: Arc<dyn ArrayMetadata>,
buffer: Option<OwnedBuffer>,
buffer: Option<Buffer>,
children: Arc<[ArrayData]>,
statistics: StatsSet,
) -> VortexResult<Self> {
Expand Down Expand Up @@ -61,7 +61,7 @@ impl ArrayData {
self.buffer.as_ref()
}

pub fn into_buffer(self) -> Option<OwnedBuffer> {
pub fn into_buffer(self) -> Option<Buffer> {
self.buffer
}

Expand Down
6 changes: 3 additions & 3 deletions vortex-array/src/implementation.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use vortex_buffer::{Buffer, OwnedBuffer};
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_error::{vortex_bail, VortexError, VortexResult};

Expand Down Expand Up @@ -221,7 +221,7 @@ impl<'a, T: IntoArray<'a> + ArrayEncodingRef + ArrayStatistics + GetArrayMetadat
Array::Data(d) => d,
Array::View(_) => {
struct Visitor {
buffer: Option<OwnedBuffer>,
buffer: Option<Buffer>,
children: Vec<ArrayData>,
}
impl ArrayVisitor for Visitor {
Expand All @@ -234,7 +234,7 @@ impl<'a, T: IntoArray<'a> + ArrayEncodingRef + ArrayStatistics + GetArrayMetadat
if self.buffer.is_some() {
vortex_bail!("Multiple buffers found in view")
}
self.buffer = Some(buffer.to_static());
self.buffer = Some(buffer.clone());
Ok(())
}
}
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@ impl Array<'_> {
}

impl<'a> Array<'a> {
pub fn into_buffer(self) -> Option<Buffer<'a>> {
pub fn into_buffer(self) -> Option<Buffer> {
match self {
Array::Data(d) => d.into_buffer(),
Array::View(v) => v.buffer().map(|b| b.to_static()),
Array::View(v) => v.buffer().cloned(),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/typed.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use vortex_buffer::OwnedBuffer;
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_error::{vortex_err, VortexError, VortexResult};

Expand All @@ -17,7 +17,7 @@ impl<D: ArrayDef> TypedArray<'_, D> {
pub fn try_from_parts(
dtype: DType,
metadata: D::Metadata,
buffer: Option<OwnedBuffer>,
buffer: Option<Buffer>,
children: Arc<[ArrayData]>,
stats: StatsSet,
) -> VortexResult<Self> {
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub struct ArrayView<'v> {
encoding: EncodingRef,
dtype: &'v DType,
array: fb::Array<'v>,
buffers: &'v [Buffer<'v>],
buffers: &'v [Buffer],
ctx: &'v ViewContext,
// TODO(ngates): store a Projection. A projected ArrayView contains the full fb::Array
// metadata, but only the buffers from the selected columns. Therefore we need to know
Expand Down Expand Up @@ -131,7 +131,7 @@ impl<'v> ArrayView<'v> {
nbuffers
}

pub fn buffer(&self) -> Option<&'v Buffer<'v>> {
pub fn buffer(&self) -> Option<&'v Buffer> {
self.has_buffer().then(|| &self.buffers[0])
}

Expand Down
1 change: 1 addition & 0 deletions vortex-buffer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ rust-version.workspace = true

[dependencies]
arrow-buffer = { workspace = true }
bytes = { workspace = true }
vortex-dtype = { path = "../vortex-dtype" }

[lints]
Expand Down
5 changes: 5 additions & 0 deletions vortex-buffer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Vortex Buffer

For now, a Vortex buffer is implemented as a very thin wrapper around the Tokio `bytes` crate.
In the future, we may re-implement this ourselves to have more control over alignment
(see https://github.com/tokio-rs/bytes/issues/437).
94 changes: 47 additions & 47 deletions vortex-buffer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,100 +1,100 @@
use std::ops::Deref;

use arrow_buffer::Buffer as ArrowBuffer;
use vortex_dtype::{match_each_native_ptype, NativePType};

#[derive(Debug, Clone)]
pub enum Buffer<'a> {
Owned(ArrowBuffer),
View(&'a [u8]),
pub enum Buffer {
// TODO(ngates): we could add Aligned(Arc<AVec>) from aligned-vec package
Arrow(ArrowBuffer),
Bytes(bytes::Bytes),
}

pub type OwnedBuffer = Buffer<'static>;
// NOTE(review): no SAFETY justification is given for these manual impls. Both
// variants wrap library buffer types (`ArrowBuffer`, `bytes::Bytes`) that
// presumably already implement `Send + Sync`, in which case these `unsafe impl`s
// are redundant — TODO confirm and remove, or document the invariant that makes
// them sound.
unsafe impl Send for Buffer {}
unsafe impl Sync for Buffer {}

impl Buffer<'_> {
impl Buffer {
pub fn len(&self) -> usize {
match self {
Buffer::Owned(buffer) => buffer.len(),
Buffer::View(slice) => slice.len(),
Buffer::Arrow(b) => b.len(),
Buffer::Bytes(b) => b.len(),
}
}

pub fn is_empty(&self) -> bool {
self.len() == 0
}

pub fn as_slice(&self) -> &[u8] {
match self {
Buffer::Owned(buffer) => buffer.as_slice(),
Buffer::View(slice) => slice,
Buffer::Arrow(b) => b.is_empty(),
Buffer::Bytes(b) => b.is_empty(),
}
}

pub fn typed_data<T: NativePType>(&self) -> &[T] {
match self {
Buffer::Owned(buffer) => unsafe {
Buffer::Arrow(buffer) => unsafe {
match_each_native_ptype!(T::PTYPE, |$T| {
std::mem::transmute(buffer.typed_data::<$T>())
})
},
Buffer::View(slice) => {
Buffer::Bytes(bytes) => {
// From ArrowBuffer::typed_data
let (prefix, offsets, suffix) = unsafe { slice.align_to::<T>() };
let (prefix, offsets, suffix) = unsafe { bytes.align_to::<T>() };
assert!(prefix.is_empty() && suffix.is_empty());
offsets
}
}
}

pub fn to_static(&self) -> OwnedBuffer {
pub fn into_vec<T: NativePType>(self) -> Result<Vec<T>, Buffer> {
match self {
Buffer::Owned(d) => Buffer::Owned(d.clone()),
Buffer::View(_) => Buffer::Owned(self.into()),
}
}
}

impl<'a> Buffer<'a> {
pub fn into_vec<T: NativePType>(self) -> Result<Vec<T>, Buffer<'a>> {
match self {
Buffer::Owned(buffer) => match_each_native_ptype!(T::PTYPE, |$T| {
Buffer::Arrow(buffer) => match_each_native_ptype!(T::PTYPE, |$T| {
buffer
.into_vec()
.map(|vec| unsafe { std::mem::transmute::<Vec<$T>, Vec<T>>(vec) })
.map_err(Buffer::Owned)
.map_err(Buffer::Arrow)
}),
Buffer::View(_) => Err(self),
// Cannot always convert bytes into a mutable vec
Buffer::Bytes(_) => Err(self),
}
}
}

impl From<ArrowBuffer> for OwnedBuffer {
fn from(value: ArrowBuffer) -> Self {
Buffer::Owned(value)
/// Lets a `Buffer` be used wherever a `&[u8]` is expected: it dereferences to
/// the byte slice of whichever variant it holds.
impl Deref for Buffer {
type Target = [u8];

// Each variant delegates to the wrapped buffer's own `deref`.
fn deref(&self) -> &Self::Target {
match self {
Buffer::Arrow(b) => b.deref(),
Buffer::Bytes(b) => b.deref(),
}
}
}

impl From<Buffer<'_>> for ArrowBuffer {
fn from(value: Buffer<'_>) -> Self {
match value {
Buffer::Owned(b) => b,
Buffer::View(_) => ArrowBuffer::from(&value),
/// Borrows the buffer's contents as a byte slice, whichever variant it holds.
impl AsRef<[u8]> for Buffer {
fn as_ref(&self) -> &[u8] {
match self {
// Each variant forwards to the wrapped buffer's own `as_ref`.
Buffer::Arrow(b) => b.as_ref(),
Buffer::Bytes(b) => b.as_ref(),
}
}
}

impl From<&Buffer<'_>> for ArrowBuffer {
fn from(value: &Buffer<'_>) -> Self {
match value {
Buffer::Owned(b) => b.clone(),
// FIXME(ngates): this conversion loses alignment information since go via u8.
Buffer::View(v) => ArrowBuffer::from_vec(v.to_vec()),
}
/// Builds a `Buffer` from an owned byte vector by passing it to
/// `ArrowBuffer::from_vec`, producing the `Arrow` variant.
impl From<Vec<u8>> for Buffer {
fn from(value: Vec<u8>) -> Self {
// We prefer Arrow since it retains mutability: `Buffer::into_vec` only
// attempts the conversion back to a `Vec` for the `Arrow` variant and
// returns `Err` for `Bytes`.
Buffer::Arrow(ArrowBuffer::from_vec(value))
}
}

/// Wraps an existing Arrow buffer in the `Arrow` variant as-is.
impl From<ArrowBuffer> for Buffer {
fn from(value: ArrowBuffer) -> Self {
Buffer::Arrow(value)
}
}

impl PartialEq for Buffer<'_> {
impl PartialEq for Buffer {
fn eq(&self, other: &Self) -> bool {
self.as_slice().eq(other.as_slice())
self.as_ref().eq(other.as_ref())
}
}

impl Eq for Buffer<'_> {}
impl Eq for Buffer {}
2 changes: 1 addition & 1 deletion vortex-dict/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ mod test {
codes.buffer().typed_data::<u64>(),
&[1, 0, 2, 1, 0, 3, 2, 0]
);
assert_eq!(String::from_utf8(values.bytes_at(0).unwrap()).unwrap(), "");
assert_eq!(str::from_utf8(&values.bytes_at(0).unwrap()).unwrap(), "");
values
.with_iterator(|iter| {
assert_eq!(
Expand Down
Loading

0 comments on commit 5143c3f

Please sign in to comment.