Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OwnedBuffer #300

Merged
merged 7 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ arrow-ipc = "51.0.0"
arrow-schema = "51.0.0"
arrow-select = "51.0.0"
bindgen = "0.69.4"
bytes = "1.6.0"
bzip2 = "0.4.4"
criterion = { version = "0.5.1", features = ["html_reports"] }
croaring = "1.0.1"
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/array/bool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl BoolArray<'_> {
validity: validity.to_metadata(buffer.len())?,
length: buffer.len(),
},
Some(Buffer::Owned(buffer.into_inner())),
Some(Buffer::from(buffer.into_inner())),
validity.into_array_data().into_iter().collect_vec().into(),
StatsSet::new(),
)?,
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/array/primitive/compute/as_contiguous.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl AsContiguousFn for PrimitiveArray<'_> {
arrays.iter().map(|a| a.len()).sum::<usize>() * self.ptype().byte_width(),
);
for array in arrays {
buffer.extend_from_slice(array.as_primitive().buffer().as_slice())
buffer.extend_from_slice(array.as_primitive().buffer().as_ref())
}
match_each_native_ptype!(self.ptype(), |$T| {
Ok(PrimitiveArray::try_new(ScalarBuffer::<$T>::from(buffer), validity)
Expand Down
10 changes: 5 additions & 5 deletions vortex-array/src/array/primitive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl PrimitiveArray<'_> {
PrimitiveMetadata {
validity: validity.to_metadata(buffer.len())?,
},
Some(Buffer::Owned(buffer.into_inner())),
Some(Buffer::from(buffer.into_inner())),
validity.into_array_data().into_iter().collect_vec().into(),
StatsSet::new(),
)?,
Expand Down Expand Up @@ -134,11 +134,11 @@ impl PrimitiveArray<'_> {
}
Ok(Self::from_vec(own_values, validity))
}
}

impl<'a> PrimitiveArray<'a> {
pub fn into_buffer(self) -> Buffer<'a> {
self.into_array().into_buffer().unwrap()
pub fn into_buffer(self) -> Buffer {
self.into_array()
.into_buffer()
.expect("PrimitiveArray must have a buffer")
}
}

Expand Down
9 changes: 7 additions & 2 deletions vortex-array/src/array/varbin/compute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,13 @@ impl AsArrowArray for VarBinArray<'_> {
impl ScalarAtFn for VarBinArray<'_> {
fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
if self.is_valid(index) {
self.bytes_at(index)
.map(|bytes| varbin_scalar(bytes, self.dtype()))
Ok(varbin_scalar(
self.bytes_at(index)?
// TODO(ngates): update to use buffer when we refactor scalars.
.into_vec::<u8>()
.unwrap_or_else(|b| b.as_ref().to_vec()),
self.dtype(),
))
} else {
Ok(Scalar::null(self.dtype()))
}
Expand Down
5 changes: 3 additions & 2 deletions vortex-array/src/array/varbin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod flatten;
mod stats;

pub use stats::compute_stats;
use vortex_buffer::Buffer;

use crate::array::primitive::PrimitiveArray;

Expand Down Expand Up @@ -147,11 +148,11 @@ impl VarBinArray<'_> {
})
}

pub fn bytes_at(&self, index: usize) -> VortexResult<Vec<u8>> {
pub fn bytes_at(&self, index: usize) -> VortexResult<Buffer> {
let start = self.offset_at(index);
let end = self.offset_at(index + 1);
let sliced = slice(&self.bytes(), start, end)?;
Ok(sliced.flatten_primitive()?.buffer().as_slice().to_vec())
Ok(sliced.flatten_primitive()?.buffer().clone())
}
}

Expand Down
8 changes: 4 additions & 4 deletions vortex-array/src/data.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::{Arc, RwLock};

use vortex_buffer::{Buffer, OwnedBuffer};
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_error::{vortex_err, VortexResult};
use vortex_scalar::Scalar;
Expand All @@ -14,7 +14,7 @@ pub struct ArrayData {
encoding: EncodingRef,
dtype: DType, // FIXME(ngates): Arc?
metadata: Arc<dyn ArrayMetadata>,
buffer: Option<OwnedBuffer>,
buffer: Option<Buffer>,
children: Arc<[ArrayData]>,
stats_map: Arc<RwLock<StatsSet>>,
}
Expand All @@ -24,7 +24,7 @@ impl ArrayData {
encoding: EncodingRef,
dtype: DType,
metadata: Arc<dyn ArrayMetadata>,
buffer: Option<OwnedBuffer>,
buffer: Option<Buffer>,
children: Arc<[ArrayData]>,
statistics: StatsSet,
) -> VortexResult<Self> {
Expand Down Expand Up @@ -61,7 +61,7 @@ impl ArrayData {
self.buffer.as_ref()
}

pub fn into_buffer(self) -> Option<OwnedBuffer> {
pub fn into_buffer(self) -> Option<Buffer> {
self.buffer
}

Expand Down
6 changes: 3 additions & 3 deletions vortex-array/src/implementation.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use vortex_buffer::{Buffer, OwnedBuffer};
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_error::{vortex_bail, VortexError, VortexResult};

Expand Down Expand Up @@ -221,7 +221,7 @@ impl<'a, T: IntoArray<'a> + ArrayEncodingRef + ArrayStatistics + GetArrayMetadat
Array::Data(d) => d,
Array::View(_) => {
struct Visitor {
buffer: Option<OwnedBuffer>,
buffer: Option<Buffer>,
children: Vec<ArrayData>,
}
impl ArrayVisitor for Visitor {
Expand All @@ -234,7 +234,7 @@ impl<'a, T: IntoArray<'a> + ArrayEncodingRef + ArrayStatistics + GetArrayMetadat
if self.buffer.is_some() {
vortex_bail!("Multiple buffers found in view")
}
self.buffer = Some(buffer.to_static());
self.buffer = Some(buffer.clone());
Ok(())
}
}
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@ impl Array<'_> {
}

impl<'a> Array<'a> {
pub fn into_buffer(self) -> Option<Buffer<'a>> {
pub fn into_buffer(self) -> Option<Buffer> {
match self {
Array::Data(d) => d.into_buffer(),
Array::View(v) => v.buffer().map(|b| b.to_static()),
Array::View(v) => v.buffer().cloned(),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/typed.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use vortex_buffer::OwnedBuffer;
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_error::{vortex_err, VortexError, VortexResult};

Expand All @@ -17,7 +17,7 @@ impl<D: ArrayDef> TypedArray<'_, D> {
pub fn try_from_parts(
dtype: DType,
metadata: D::Metadata,
buffer: Option<OwnedBuffer>,
buffer: Option<Buffer>,
children: Arc<[ArrayData]>,
stats: StatsSet,
) -> VortexResult<Self> {
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub struct ArrayView<'v> {
encoding: EncodingRef,
dtype: &'v DType,
array: fb::Array<'v>,
buffers: &'v [Buffer<'v>],
buffers: &'v [Buffer],
ctx: &'v ViewContext,
// TODO(ngates): a store a Projection. A projected ArrayView contains the full fb::Array
// metadata, but only the buffers from the selected columns. Therefore we need to know
Expand Down Expand Up @@ -131,7 +131,7 @@ impl<'v> ArrayView<'v> {
nbuffers
}

pub fn buffer(&self) -> Option<&'v Buffer<'v>> {
pub fn buffer(&self) -> Option<&'v Buffer> {
self.has_buffer().then(|| &self.buffers[0])
}

Expand Down
1 change: 1 addition & 0 deletions vortex-buffer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ rust-version.workspace = true

[dependencies]
arrow-buffer = { workspace = true }
bytes = { workspace = true }
vortex-dtype = { path = "../vortex-dtype" }

[lints]
Expand Down
5 changes: 5 additions & 0 deletions vortex-buffer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Vortex Buffer

For now, a Vortex buffer is implemented as a very thin wrapper around the Tokio bytes crate.
In the future, we may re-implement this ourselves to have more control over alignment
(see https://github.com/tokio-rs/bytes/issues/437)
83 changes: 35 additions & 48 deletions vortex-buffer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,99 +2,86 @@ use arrow_buffer::Buffer as ArrowBuffer;
use vortex_dtype::{match_each_native_ptype, NativePType};

#[derive(Debug, Clone)]
pub enum Buffer<'a> {
Owned(ArrowBuffer),
View(&'a [u8]),
pub enum Buffer {
// TODO(ngates): we could add Aligned(Arc<AVec>) from aligned-vec package
Arrow(ArrowBuffer),
Bytes(bytes::Bytes),
}

pub type OwnedBuffer = Buffer<'static>;
unsafe impl Send for Buffer {}
unsafe impl Sync for Buffer {}

impl Buffer<'_> {
impl Buffer {
pub fn len(&self) -> usize {
match self {
Buffer::Owned(buffer) => buffer.len(),
Buffer::View(slice) => slice.len(),
Buffer::Arrow(b) => b.len(),
Buffer::Bytes(b) => b.len(),
}
}

pub fn is_empty(&self) -> bool {
self.len() == 0
}

pub fn as_slice(&self) -> &[u8] {
match self {
Buffer::Owned(buffer) => buffer.as_slice(),
Buffer::View(slice) => slice,
Buffer::Arrow(b) => b.is_empty(),
Buffer::Bytes(b) => b.is_empty(),
}
}

pub fn typed_data<T: NativePType>(&self) -> &[T] {
match self {
Buffer::Owned(buffer) => unsafe {
Buffer::Arrow(buffer) => unsafe {
match_each_native_ptype!(T::PTYPE, |$T| {
std::mem::transmute(buffer.typed_data::<$T>())
})
},
Buffer::View(slice) => {
Buffer::Bytes(bytes) => {
// From ArrowBuffer::typed_data
let (prefix, offsets, suffix) = unsafe { slice.align_to::<T>() };
let (prefix, offsets, suffix) = unsafe { bytes.align_to::<T>() };
assert!(prefix.is_empty() && suffix.is_empty());
offsets
}
}
}

pub fn to_static(&self) -> OwnedBuffer {
match self {
Buffer::Owned(d) => Buffer::Owned(d.clone()),
Buffer::View(_) => Buffer::Owned(self.into()),
}
}
}

impl<'a> Buffer<'a> {
pub fn into_vec<T: NativePType>(self) -> Result<Vec<T>, Buffer<'a>> {
pub fn into_vec<T: NativePType>(self) -> Result<Vec<T>, Buffer> {
match self {
Buffer::Owned(buffer) => match_each_native_ptype!(T::PTYPE, |$T| {
Buffer::Arrow(buffer) => match_each_native_ptype!(T::PTYPE, |$T| {
buffer
.into_vec()
.map(|vec| unsafe { std::mem::transmute::<Vec<$T>, Vec<T>>(vec) })
.map_err(Buffer::Owned)
.map_err(Buffer::Arrow)
}),
Buffer::View(_) => Err(self),
// Cannot always convert bytes into a mutable vec
Buffer::Bytes(_) => Err(self),
}
}
}

impl From<ArrowBuffer> for OwnedBuffer {
fn from(value: ArrowBuffer) -> Self {
Buffer::Owned(value)
impl AsRef<[u8]> for Buffer {
robert3005 marked this conversation as resolved.
Show resolved Hide resolved
fn as_ref(&self) -> &[u8] {
match self {
Buffer::Arrow(b) => b.as_ref(),
Buffer::Bytes(b) => b.as_ref(),
}
}
}

impl From<Buffer<'_>> for ArrowBuffer {
fn from(value: Buffer<'_>) -> Self {
match value {
Buffer::Owned(b) => b,
Buffer::View(_) => ArrowBuffer::from(&value),
}
impl From<Vec<u8>> for Buffer {
fn from(value: Vec<u8>) -> Self {
// We prefer Arrow since it retains mutability
Buffer::Arrow(ArrowBuffer::from_vec(value))
}
}

impl From<&Buffer<'_>> for ArrowBuffer {
fn from(value: &Buffer<'_>) -> Self {
match value {
Buffer::Owned(b) => b.clone(),
// FIXME(ngates): this conversion loses alignment information since go via u8.
Buffer::View(v) => ArrowBuffer::from_vec(v.to_vec()),
}
impl From<ArrowBuffer> for Buffer {
fn from(value: ArrowBuffer) -> Self {
Buffer::Arrow(value)
}
}

impl PartialEq for Buffer<'_> {
impl PartialEq for Buffer {
fn eq(&self, other: &Self) -> bool {
self.as_slice().eq(other.as_slice())
self.as_ref().eq(other.as_ref())
}
}

impl Eq for Buffer<'_> {}
impl Eq for Buffer {}
5 changes: 4 additions & 1 deletion vortex-dict/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,10 @@ mod test {
codes.buffer().typed_data::<u64>(),
&[1, 0, 2, 1, 0, 3, 2, 0]
);
assert_eq!(String::from_utf8(values.bytes_at(0).unwrap()).unwrap(), "");
assert_eq!(
str::from_utf8(values.bytes_at(0).unwrap().as_ref()).unwrap(),
""
);
values
.with_iterator(|iter| {
assert_eq!(
Expand Down
4 changes: 2 additions & 2 deletions vortex-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ pub struct StreamArrayReader<'a, R: Read> {
read: &'a mut R,
messages: &'a mut StreamMessageReader<R>,
dtype: DType,
buffers: Vec<Buffer<'a>>,
buffers: Vec<Buffer>,
row_offset: usize,
}

Expand Down Expand Up @@ -263,7 +263,7 @@ impl<'iter, R: Read> FallibleLendingIterator for StreamArrayReader<'iter, R> {
let mut bytes = Vec::with_capacity(buffer.length() as usize);
self.read.read_into(buffer.length(), &mut bytes)?;
let arrow_buffer = ArrowBuffer::from_vec(bytes);
self.buffers.push(Buffer::Owned(arrow_buffer));
self.buffers.push(Buffer::from(arrow_buffer));

offset = buffer.offset() + buffer.length();
}
Expand Down
Loading