Skip to content

Commit

Permalink
Use vortex-buffer over bytes::Bytes (#1713)
Browse files Browse the repository at this point in the history
Also removes the assumption that VortexReadAt returns aligned memory.
  • Loading branch information
gatesn authored Dec 18, 2024
1 parent d87001c commit 071b126
Show file tree
Hide file tree
Showing 38 changed files with 134 additions and 439 deletions.
6 changes: 0 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion vortex-array/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ arrow-schema = { workspace = true }
arrow-select = { workspace = true }
arrow-string = { workspace = true }
backtrace = { workspace = true }
bytes = { workspace = true }
enum-iterator = { workspace = true }
flatbuffers = { workspace = true }
flexbuffers = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/array/constant/canonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ fn canonical_byte_view(
if scalar_bytes.len() >= BinaryView::MAX_INLINED_SIZE {
buffers.push(
PrimitiveArray::new(
Buffer::from(scalar_bytes),
Buffer::from(scalar_bytes.to_vec()),
PType::U8,
Validity::NonNullable,
)
Expand Down
8 changes: 0 additions & 8 deletions vortex-array/src/array/primitive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::sync::Arc;
mod accessor;

use arrow_buffer::{ArrowNativeType, BooleanBufferBuilder, Buffer as ArrowBuffer};
use bytes::Bytes;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use vortex_buffer::Buffer;
Expand Down Expand Up @@ -86,13 +85,6 @@ impl PrimitiveArray {
Self::from_vec(elems, validity)
}

/// Creates a new array of type U8
pub fn from_bytes(bytes: Bytes, validity: Validity) -> Self {
let buffer = Buffer::from(bytes);

PrimitiveArray::new(buffer, PType::U8, validity)
}

pub fn validity(&self) -> Validity {
self.metadata().validity.to_validity(|| {
self.as_ref()
Expand Down
18 changes: 9 additions & 9 deletions vortex-array/src/array/varbin/builder.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
use arrow_buffer::NullBufferBuilder;
use bytes::BytesMut;
use num_traits::AsPrimitive;
use vortex_dtype::{DType, NativePType};
use num_traits::{AsPrimitive, PrimInt};
use vortex_buffer::Buffer;
use vortex_dtype::{DType, NativePType, PType};
use vortex_error::{vortex_panic, VortexExpect as _};

use crate::array::primitive::PrimitiveArray;
use crate::array::varbin::VarBinArray;
use crate::validity::Validity;
use crate::IntoArrayData;

pub struct VarBinBuilder<O: NativePType> {
pub struct VarBinBuilder<O> {
offsets: Vec<O>,
data: BytesMut,
data: Vec<u8>,
validity: NullBufferBuilder,
}

impl<O: NativePType> Default for VarBinBuilder<O> {
impl<O: NativePType + PrimInt> Default for VarBinBuilder<O> {
fn default() -> Self {
Self::new()
}
}

impl<O: NativePType> VarBinBuilder<O> {
impl<O: NativePType + PrimInt> VarBinBuilder<O> {
pub fn new() -> Self {
Self::with_capacity(0)
}
Expand All @@ -31,7 +31,7 @@ impl<O: NativePType> VarBinBuilder<O> {
offsets.push(O::zero());
Self {
offsets,
data: BytesMut::new(),
data: Vec::new(),
validity: NullBufferBuilder::new(len),
}
}
Expand Down Expand Up @@ -80,7 +80,7 @@ impl<O: NativePType> VarBinBuilder<O> {

pub fn finish(mut self, dtype: DType) -> VarBinArray {
let offsets = PrimitiveArray::from(self.offsets);
let data = PrimitiveArray::from_bytes(self.data.freeze(), Validity::NonNullable);
let data = PrimitiveArray::new(Buffer::from(self.data), PType::U8, Validity::NonNullable);
let nulls = self.validity.finish();

let validity = if dtype.is_nullable() {
Expand Down
8 changes: 4 additions & 4 deletions vortex-array/src/array/varbin/compute/filter.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use itertools::Itertools;
use num_traits::{AsPrimitive, Zero};
use num_traits::{AsPrimitive, PrimInt, Zero};
use vortex_dtype::{match_each_integer_ptype, DType, NativePType};
use vortex_error::{vortex_err, vortex_panic, VortexResult};

Expand Down Expand Up @@ -54,7 +54,7 @@ fn filter_select_var_bin_by_slice_primitive_offset<O>(
selection_count: usize,
) -> VortexResult<VarBinArray>
where
O: NativePType + 'static + Zero,
O: NativePType + PrimInt + 'static + Zero,
usize: AsPrimitive<O>,
{
let logical_validity = validity.to_logical(offsets.len() - 1);
Expand Down Expand Up @@ -107,7 +107,7 @@ fn update_non_nullable_slice<O>(
start: usize,
end: usize,
) where
O: NativePType + 'static + Zero + Copy,
O: NativePType + PrimInt + 'static + Zero + Copy,
usize: AsPrimitive<O>,
{
let new_data = {
Expand Down Expand Up @@ -145,7 +145,7 @@ fn filter_select_var_bin_by_index(
}

#[allow(deprecated)]
fn filter_select_var_bin_by_index_primitive_offset<O: NativePType>(
fn filter_select_var_bin_by_index_primitive_offset<O: NativePType + PrimInt>(
dtype: DType,
offsets: &[O],
data: &[u8],
Expand Down
5 changes: 3 additions & 2 deletions vortex-array/src/array/varbin/compute/take.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use arrow_buffer::NullBuffer;
use num_traits::PrimInt;
use vortex_dtype::{match_each_integer_ptype, DType, NativePType};
use vortex_error::{vortex_err, vortex_panic, VortexResult};

Expand Down Expand Up @@ -29,7 +30,7 @@ impl TakeFn<VarBinArray> for VarBinEncoding {
}
}

fn take<I: NativePType, O: NativePType>(
fn take<I: NativePType, O: NativePType + PrimInt>(
dtype: DType,
offsets: &[O],
data: &[u8],
Expand Down Expand Up @@ -57,7 +58,7 @@ fn take<I: NativePType, O: NativePType>(
Ok(builder.finish(dtype))
}

fn take_nullable<I: NativePType, O: NativePType>(
fn take_nullable<I: NativePType, O: NativePType + PrimInt>(
dtype: DType,
offsets: &[O],
data: &[u8],
Expand Down
8 changes: 4 additions & 4 deletions vortex-array/src/array/varbin/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt::{Debug, Display};

use num_traits::AsPrimitive;
use num_traits::{AsPrimitive, PrimInt};
use serde::{Deserialize, Serialize};
pub use stats::compute_varbin_statistics;
use vortex_buffer::Buffer;
Expand Down Expand Up @@ -150,12 +150,12 @@ impl VarBinArray {
}
}

fn from_vec_sized<K, T>(vec: Vec<T>, dtype: DType) -> Self
fn from_vec_sized<O, T>(vec: Vec<T>, dtype: DType) -> Self
where
K: NativePType,
O: NativePType + PrimInt,
T: AsRef<[u8]>,
{
let mut builder = VarBinBuilder::<K>::with_capacity(vec.len());
let mut builder = VarBinBuilder::<O>::with_capacity(vec.len());
for v in vec {
builder.push_value(v.as_ref());
}
Expand Down
6 changes: 3 additions & 3 deletions vortex-array/src/array/varbin/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,12 @@ fn compute_min_max<T: ArrayTrait + ArrayAccessor<[u8]>>(array: &T) -> VortexResu
let minmax = array.with_iterator(|iter| match iter.flatten().minmax() {
MinMaxResult::NoElements => None,
MinMaxResult::OneElement(value) => {
let scalar = varbin_scalar(Buffer::from(value), array.dtype());
let scalar = varbin_scalar(Buffer::from(value.to_vec()), array.dtype());
Some((scalar.clone(), scalar))
}
MinMaxResult::MinMax(min, max) => Some((
varbin_scalar(Buffer::from(min), array.dtype()),
varbin_scalar(Buffer::from(max), array.dtype()),
varbin_scalar(Buffer::from(min.to_vec()), array.dtype()),
varbin_scalar(Buffer::from(max.to_vec()), array.dtype()),
)),
})?;
let Some((min, max)) = minmax else {
Expand Down
41 changes: 26 additions & 15 deletions vortex-buffer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,20 @@ impl Buffer {
}
}

// Equality for `Buffer` is defined on contents: two buffers are equal when the
// values returned by their `as_slice()` calls are equal.
impl PartialEq for Buffer {
fn eq(&self, other: &Self) -> bool {
// Delegate to the equality of the two `as_slice()` results.
self.as_slice().eq(other.as_slice())
}
}

impl Eq for Buffer {}

// Ordering for `Buffer` delegates to the ordering of the `as_slice()` values.
// NOTE(review): presumably `as_slice()` yields `&[u8]`, in which case this is the
// standard library's lexicographic slice ordering — confirm against `as_slice`.
impl PartialOrd for Buffer {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
// Compare contents only; the result is whatever the slice comparison returns.
self.as_slice().partial_cmp(other.as_slice())
}
}

impl Deref for Buffer {
type Target = [u8];

Expand All @@ -134,10 +148,15 @@ impl AsRef<[u8]> for Buffer {
}
}

impl From<&[u8]> for Buffer {
fn from(value: &[u8]) -> Self {
// We prefer Arrow since it retains mutability
Buffer(Inner::Arrow(ArrowBuffer::from(value)))
impl From<&'static [u8]> for Buffer {
fn from(value: &'static [u8]) -> Self {
Buffer(Inner::Bytes(bytes::Bytes::from_static(value)))
}
}

// Builds a `Buffer` from a `'static` string by taking its bytes (`as_bytes`) and
// wrapping them with `bytes::Bytes::from_static` inside the `Inner::Bytes` variant.
// NOTE(review): the `'static` bound is what `Bytes::from_static` requires; per the
// `bytes` crate docs it references the data without copying — confirm.
impl From<&'static str> for Buffer {
fn from(slice: &'static str) -> Buffer {
Buffer(Inner::Bytes(bytes::Bytes::from_static(slice.as_bytes())))
}
}

Expand Down Expand Up @@ -166,16 +185,8 @@ impl From<ArrowMutableBuffer> for Buffer {
}
}

impl PartialEq for Buffer {
fn eq(&self, other: &Self) -> bool {
self.as_ref().eq(other.as_ref())
}
}

impl Eq for Buffer {}

impl PartialOrd for Buffer {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
self.as_ref().partial_cmp(other.as_ref())
// Allows collecting an iterator of bytes into a `Buffer`: the bytes are gathered
// with `ArrowBuffer::from_iter` and stored in the `Inner::Arrow` variant.
impl FromIterator<u8> for Buffer {
fn from_iter<T: IntoIterator<Item = u8>>(iter: T) -> Self {
Buffer(Inner::Arrow(ArrowBuffer::from_iter(iter)))
}
}
2 changes: 0 additions & 2 deletions vortex-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ path = "src/lib.rs"
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
datafusion = { workspace = true }
datafusion-common = { workspace = true }
Expand All @@ -37,7 +36,6 @@ object_store = { workspace = true }
pin-project = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "fs"] }
vortex-array = { workspace = true }
vortex-datetime-dtype = { workspace = true }
vortex-dtype = { workspace = true }
vortex-error = { workspace = true, features = ["datafusion"] }
vortex-expr = { workspace = true, features = ["datafusion"] }
Expand Down
1 change: 0 additions & 1 deletion vortex-file/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ readme = "README.md"
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-schema = { workspace = true }
bytes = { workspace = true }
flatbuffers = { workspace = true }
futures = { workspace = true, features = ["std"] }
futures-executor = { workspace = true }
Expand Down
6 changes: 3 additions & 3 deletions vortex-file/src/read/builder/initial_read.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use core::ops::Range;

use bytes::Bytes;
use flatbuffers::{root, root_unchecked};
use vortex_buffer::Buffer;
use vortex_error::{vortex_bail, vortex_err, VortexResult, VortexUnwrap};
use vortex_flatbuffers::{dtype as fbd, footer};
use vortex_io::VortexReadAt;
Expand All @@ -12,7 +12,7 @@ use crate::{LazyDType, EOF_SIZE, INITIAL_READ_SIZE, MAGIC_BYTES, VERSION};
pub struct InitialRead {
/// The bytes from the initial read of the file, which is assumed (for now) to be sufficiently
/// large to contain the schema and layout.
pub buf: Bytes,
pub buf: Buffer,
/// The absolute byte offset representing the start of the initial read within the file.
pub initial_read_offset: u64,
/// The byte range within `buf` representing the Postscript flatbuffer.
Expand Down Expand Up @@ -68,7 +68,7 @@ pub async fn read_initial_bytes<R: VortexReadAt>(
let read_size = INITIAL_READ_SIZE.min(file_size as usize);

let initial_read_offset = file_size - read_size as u64;
let buf: Bytes = read
let buf: Buffer = read
.read_byte_range(initial_read_offset, read_size as u64)
.await?;

Expand Down
Loading

0 comments on commit 071b126

Please sign in to comment.