Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use vortex-buffer over bytes::Bytes #1713

Merged
merged 43 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
ff69266
Message cleanup
gatesn Dec 14, 2024
d1cd362
Move expr
gatesn Dec 14, 2024
54ddfd8
Clean up
gatesn Dec 14, 2024
b368ec0
Clean up
gatesn Dec 14, 2024
34edbf5
Clean up
gatesn Dec 14, 2024
4d02906
message reader
gatesn Dec 14, 2024
cc65637
Message codec
gatesn Dec 16, 2024
21ce1b9
Message codec
gatesn Dec 16, 2024
831d41e
Message codec
gatesn Dec 16, 2024
0010f00
Message codec
gatesn Dec 16, 2024
8c55100
Message codec
gatesn Dec 16, 2024
29a78a1
Message codec
gatesn Dec 16, 2024
bbda9e6
Message Codecs
gatesn Dec 16, 2024
5a7c8ab
Message Codecs
gatesn Dec 16, 2024
75fd1e0
Message Codecs
gatesn Dec 16, 2024
0c333b9
merge
gatesn Dec 16, 2024
92a7d6e
merge
gatesn Dec 16, 2024
b474b4b
Merge
gatesn Dec 16, 2024
7e1c80f
Merge
gatesn Dec 16, 2024
92062d8
Fix docs
gatesn Dec 16, 2024
87ba232
Assert alignment
gatesn Dec 16, 2024
f4d6fdf
Merge branch 'develop' into ngates/message-reader
gatesn Dec 17, 2024
1b285e6
Address comments
gatesn Dec 17, 2024
e532fa6
Address comments
gatesn Dec 17, 2024
ca4e632
Address comments
gatesn Dec 17, 2024
d7c38be
Address comments
gatesn Dec 17, 2024
fe46f61
Address comments
gatesn Dec 17, 2024
80c6fcc
Address comments
gatesn Dec 17, 2024
20f8b18
Address comments
gatesn Dec 17, 2024
78d9f0d
Fix decoder
gatesn Dec 18, 2024
94ae305
Fix decoder
gatesn Dec 18, 2024
5b21ae7
Remove unpin bounds
gatesn Dec 18, 2024
c5277d0
Remove unpin bounds
gatesn Dec 18, 2024
24648c9
Remove bytes
gatesn Dec 18, 2024
6313dca
Add PrimInt bound to VarBinOffset
gatesn Dec 18, 2024
2040b0d
Add PrimInt bound to VarBinOffset
gatesn Dec 18, 2024
641d1c7
Add PrimInt bound to VarBinOffset
gatesn Dec 18, 2024
5633dd3
Merge branch 'develop' into ngates/message-reader
gatesn Dec 18, 2024
e73ef79
Merge branch 'ngates/message-reader' into ngates/read-at-alignment
gatesn Dec 18, 2024
bc128b0
Some fixes
gatesn Dec 18, 2024
124441a
Some fixes
gatesn Dec 18, 2024
e77ff30
Buffer alignment
gatesn Dec 18, 2024
ddbd714
Buffer alignment
gatesn Dec 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion vortex-array/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ arrow-schema = { workspace = true }
arrow-select = { workspace = true }
arrow-string = { workspace = true }
backtrace = { workspace = true }
bytes = { workspace = true }
enum-iterator = { workspace = true }
flatbuffers = { workspace = true }
flexbuffers = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/array/constant/canonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ fn canonical_byte_view(
if scalar_bytes.len() >= BinaryView::MAX_INLINED_SIZE {
buffers.push(
PrimitiveArray::new(
Buffer::from(scalar_bytes),
Buffer::from(scalar_bytes.to_vec()),
PType::U8,
Validity::NonNullable,
)
Expand Down
8 changes: 0 additions & 8 deletions vortex-array/src/array/primitive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::sync::Arc;
mod accessor;

use arrow_buffer::{ArrowNativeType, BooleanBufferBuilder, Buffer as ArrowBuffer};
use bytes::Bytes;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use vortex_buffer::Buffer;
Expand Down Expand Up @@ -86,13 +85,6 @@ impl PrimitiveArray {
Self::from_vec(elems, validity)
}

/// Creates a new array of type U8
pub fn from_bytes(bytes: Bytes, validity: Validity) -> Self {
let buffer = Buffer::from(bytes);

PrimitiveArray::new(buffer, PType::U8, validity)
}

pub fn validity(&self) -> Validity {
self.metadata().validity.to_validity(|| {
self.as_ref()
Expand Down
18 changes: 9 additions & 9 deletions vortex-array/src/array/varbin/builder.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
use arrow_buffer::NullBufferBuilder;
use bytes::BytesMut;
use num_traits::AsPrimitive;
use vortex_dtype::{DType, NativePType};
use num_traits::{AsPrimitive, PrimInt};
use vortex_buffer::Buffer;
use vortex_dtype::{DType, NativePType, PType};
use vortex_error::{vortex_panic, VortexExpect as _};

use crate::array::primitive::PrimitiveArray;
use crate::array::varbin::VarBinArray;
use crate::validity::Validity;
use crate::IntoArrayData;

pub struct VarBinBuilder<O: NativePType> {
pub struct VarBinBuilder<O> {
offsets: Vec<O>,
data: BytesMut,
data: Vec<u8>,
validity: NullBufferBuilder,
}

impl<O: NativePType> Default for VarBinBuilder<O> {
impl<O: NativePType + PrimInt> Default for VarBinBuilder<O> {
fn default() -> Self {
Self::new()
}
}

impl<O: NativePType> VarBinBuilder<O> {
impl<O: NativePType + PrimInt> VarBinBuilder<O> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this ever anything except u8...?

pub fn new() -> Self {
Self::with_capacity(0)
}
Expand All @@ -31,7 +31,7 @@ impl<O: NativePType> VarBinBuilder<O> {
offsets.push(O::zero());
Self {
offsets,
data: BytesMut::new(),
data: Vec::new(),
validity: NullBufferBuilder::new(len),
}
}
Expand Down Expand Up @@ -80,7 +80,7 @@ impl<O: NativePType> VarBinBuilder<O> {

pub fn finish(mut self, dtype: DType) -> VarBinArray {
let offsets = PrimitiveArray::from(self.offsets);
let data = PrimitiveArray::from_bytes(self.data.freeze(), Validity::NonNullable);
let data = PrimitiveArray::new(Buffer::from(self.data), PType::U8, Validity::NonNullable);
let nulls = self.validity.finish();

let validity = if dtype.is_nullable() {
Expand Down
8 changes: 4 additions & 4 deletions vortex-array/src/array/varbin/compute/filter.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use itertools::Itertools;
use num_traits::{AsPrimitive, Zero};
use num_traits::{AsPrimitive, PrimInt, Zero};
use vortex_dtype::{match_each_integer_ptype, DType, NativePType};
use vortex_error::{vortex_err, vortex_panic, VortexResult};

Expand Down Expand Up @@ -54,7 +54,7 @@ fn filter_select_var_bin_by_slice_primitive_offset<O>(
selection_count: usize,
) -> VortexResult<VarBinArray>
where
O: NativePType + 'static + Zero,
O: NativePType + PrimInt + 'static + Zero,
usize: AsPrimitive<O>,
{
let logical_validity = validity.to_logical(offsets.len() - 1);
Expand Down Expand Up @@ -107,7 +107,7 @@ fn update_non_nullable_slice<O>(
start: usize,
end: usize,
) where
O: NativePType + 'static + Zero + Copy,
O: NativePType + PrimInt + 'static + Zero + Copy,
usize: AsPrimitive<O>,
{
let new_data = {
Expand Down Expand Up @@ -145,7 +145,7 @@ fn filter_select_var_bin_by_index(
}

#[allow(deprecated)]
fn filter_select_var_bin_by_index_primitive_offset<O: NativePType>(
fn filter_select_var_bin_by_index_primitive_offset<O: NativePType + PrimInt>(
dtype: DType,
offsets: &[O],
data: &[u8],
Expand Down
5 changes: 3 additions & 2 deletions vortex-array/src/array/varbin/compute/take.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use arrow_buffer::NullBuffer;
use num_traits::PrimInt;
use vortex_dtype::{match_each_integer_ptype, DType, NativePType};
use vortex_error::{vortex_err, vortex_panic, VortexResult};

Expand Down Expand Up @@ -29,7 +30,7 @@ impl TakeFn<VarBinArray> for VarBinEncoding {
}
}

fn take<I: NativePType, O: NativePType>(
fn take<I: NativePType, O: NativePType + PrimInt>(
dtype: DType,
offsets: &[O],
data: &[u8],
Expand Down Expand Up @@ -57,7 +58,7 @@ fn take<I: NativePType, O: NativePType>(
Ok(builder.finish(dtype))
}

fn take_nullable<I: NativePType, O: NativePType>(
fn take_nullable<I: NativePType, O: NativePType + PrimInt>(
dtype: DType,
offsets: &[O],
data: &[u8],
Expand Down
8 changes: 4 additions & 4 deletions vortex-array/src/array/varbin/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt::{Debug, Display};

use num_traits::AsPrimitive;
use num_traits::{AsPrimitive, PrimInt};
use serde::{Deserialize, Serialize};
pub use stats::compute_varbin_statistics;
use vortex_buffer::Buffer;
Expand Down Expand Up @@ -150,12 +150,12 @@ impl VarBinArray {
}
}

fn from_vec_sized<K, T>(vec: Vec<T>, dtype: DType) -> Self
fn from_vec_sized<O, T>(vec: Vec<T>, dtype: DType) -> Self
where
K: NativePType,
O: NativePType + PrimInt,
T: AsRef<[u8]>,
{
let mut builder = VarBinBuilder::<K>::with_capacity(vec.len());
let mut builder = VarBinBuilder::<O>::with_capacity(vec.len());
for v in vec {
builder.push_value(v.as_ref());
}
Expand Down
6 changes: 3 additions & 3 deletions vortex-array/src/array/varbin/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,12 @@ fn compute_min_max<T: ArrayTrait + ArrayAccessor<[u8]>>(array: &T) -> VortexResu
let minmax = array.with_iterator(|iter| match iter.flatten().minmax() {
MinMaxResult::NoElements => None,
MinMaxResult::OneElement(value) => {
let scalar = varbin_scalar(Buffer::from(value), array.dtype());
let scalar = varbin_scalar(Buffer::from(value.to_vec()), array.dtype());
Some((scalar.clone(), scalar))
}
MinMaxResult::MinMax(min, max) => Some((
varbin_scalar(Buffer::from(min), array.dtype()),
varbin_scalar(Buffer::from(max), array.dtype()),
varbin_scalar(Buffer::from(min.to_vec()), array.dtype()),
varbin_scalar(Buffer::from(max.to_vec()), array.dtype()),
)),
})?;
let Some((min, max)) = minmax else {
Expand Down
41 changes: 26 additions & 15 deletions vortex-buffer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,20 @@ impl Buffer {
}
}

impl PartialEq for Buffer {
fn eq(&self, other: &Self) -> bool {
self.as_slice().eq(other.as_slice())
}
}

impl Eq for Buffer {}

impl PartialOrd for Buffer {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
self.as_slice().partial_cmp(other.as_slice())
}
}

impl Deref for Buffer {
type Target = [u8];

Expand All @@ -134,10 +148,15 @@ impl AsRef<[u8]> for Buffer {
}
}

impl From<&[u8]> for Buffer {
fn from(value: &[u8]) -> Self {
// We prefer Arrow since it retains mutability
Buffer(Inner::Arrow(ArrowBuffer::from(value)))
impl From<&'static [u8]> for Buffer {
gatesn marked this conversation as resolved.
Show resolved Hide resolved
fn from(value: &'static [u8]) -> Self {
Buffer(Inner::Bytes(bytes::Bytes::from_static(value)))
}
}

impl From<&'static str> for Buffer {
fn from(slice: &'static str) -> Buffer {
Buffer(Inner::Bytes(bytes::Bytes::from_static(slice.as_bytes())))
}
}

Expand Down Expand Up @@ -166,16 +185,8 @@ impl From<ArrowMutableBuffer> for Buffer {
}
}

impl PartialEq for Buffer {
fn eq(&self, other: &Self) -> bool {
self.as_ref().eq(other.as_ref())
}
}

impl Eq for Buffer {}

impl PartialOrd for Buffer {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
self.as_ref().partial_cmp(other.as_ref())
impl FromIterator<u8> for Buffer {
fn from_iter<T: IntoIterator<Item = u8>>(iter: T) -> Self {
Buffer(Inner::Arrow(ArrowBuffer::from_iter(iter)))
}
}
2 changes: 0 additions & 2 deletions vortex-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ path = "src/lib.rs"
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
datafusion = { workspace = true }
datafusion-common = { workspace = true }
Expand All @@ -37,7 +36,6 @@ object_store = { workspace = true }
pin-project = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "fs"] }
vortex-array = { workspace = true }
vortex-datetime-dtype = { workspace = true }
vortex-dtype = { workspace = true }
vortex-error = { workspace = true, features = ["datafusion"] }
vortex-expr = { workspace = true, features = ["datafusion"] }
Expand Down
1 change: 0 additions & 1 deletion vortex-file/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ readme = "README.md"
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-schema = { workspace = true }
bytes = { workspace = true }
flatbuffers = { workspace = true }
futures = { workspace = true, features = ["std"] }
futures-executor = { workspace = true }
Expand Down
6 changes: 3 additions & 3 deletions vortex-file/src/read/builder/initial_read.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use core::ops::Range;

use bytes::Bytes;
use flatbuffers::{root, root_unchecked};
use vortex_buffer::Buffer;
use vortex_error::{vortex_bail, vortex_err, VortexResult, VortexUnwrap};
use vortex_flatbuffers::{dtype as fbd, footer};
use vortex_io::VortexReadAt;
Expand All @@ -12,7 +12,7 @@ use crate::{LazyDType, EOF_SIZE, INITIAL_READ_SIZE, MAGIC_BYTES, VERSION};
pub struct InitialRead {
/// The bytes from the initial read of the file, which is assumed (for now) to be sufficiently
/// large to contain the schema and layout.
pub buf: Bytes,
pub buf: Buffer,
/// The absolute byte offset representing the start of the initial read within the file.
pub initial_read_offset: u64,
/// The byte range within `buf` representing the Postscript flatbuffer.
Expand Down Expand Up @@ -68,7 +68,7 @@ pub async fn read_initial_bytes<R: VortexReadAt>(
let read_size = INITIAL_READ_SIZE.min(file_size as usize);

let initial_read_offset = file_size - read_size as u64;
let buf: Bytes = read
let buf: Buffer = read
.read_byte_range(initial_read_offset, read_size as u64)
.await?;

Expand Down
Loading
Loading