diff --git a/Cargo.lock b/Cargo.lock index 328978b6d..ba562e550 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4909,7 +4909,6 @@ dependencies = [ "arrow-select", "arrow-string", "backtrace", - "bytes", "criterion", "enum-iterator", "flatbuffers", @@ -4967,7 +4966,6 @@ dependencies = [ "arrow-array", "arrow-schema", "async-trait", - "bytes", "chrono", "datafusion", "datafusion-common", @@ -4985,7 +4983,6 @@ dependencies = [ "tokio", "url", "vortex-array", - "vortex-datetime-dtype", "vortex-dtype", "vortex-error", "vortex-expr", @@ -5111,7 +5108,6 @@ dependencies = [ "arrow-array", "arrow-buffer", "arrow-schema", - "bytes", "flatbuffers", "futures", "futures-executor", @@ -5172,7 +5168,6 @@ dependencies = [ name = "vortex-io" version = "0.21.1" dependencies = [ - "bytes", "cfg-if", "compio", "flume", @@ -5294,7 +5289,6 @@ version = "0.21.1" dependencies = [ "arbitrary", "arrow-array", - "bytes", "datafusion-common", "flatbuffers", "flexbuffers", diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index 6a6160675..aefe26b02 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -32,7 +32,6 @@ arrow-schema = { workspace = true } arrow-select = { workspace = true } arrow-string = { workspace = true } backtrace = { workspace = true } -bytes = { workspace = true } enum-iterator = { workspace = true } flatbuffers = { workspace = true } flexbuffers = { workspace = true } diff --git a/vortex-array/src/array/constant/canonical.rs b/vortex-array/src/array/constant/canonical.rs index b36b0e9c0..fa51caa9b 100644 --- a/vortex-array/src/array/constant/canonical.rs +++ b/vortex-array/src/array/constant/canonical.rs @@ -90,7 +90,7 @@ fn canonical_byte_view( if scalar_bytes.len() >= BinaryView::MAX_INLINED_SIZE { buffers.push( PrimitiveArray::new( - Buffer::from(scalar_bytes), + Buffer::from(scalar_bytes.to_vec()), PType::U8, Validity::NonNullable, ) diff --git a/vortex-array/src/array/primitive/mod.rs b/vortex-array/src/array/primitive/mod.rs index b8dd2adfc..0467653e9 100644 --- a/vortex-array/src/array/primitive/mod.rs +++ b/vortex-array/src/array/primitive/mod.rs @@ -4,7 +4,6 @@ use std::sync::Arc; mod accessor; use arrow_buffer::{ArrowNativeType, BooleanBufferBuilder, Buffer as ArrowBuffer}; -use bytes::Bytes; use itertools::Itertools; use serde::{Deserialize, Serialize}; use vortex_buffer::Buffer; @@ -86,13 +85,6 @@ impl PrimitiveArray { Self::from_vec(elems, validity) } - /// Creates a new array of type U8 - pub fn from_bytes(bytes: Bytes, validity: Validity) -> Self { - let buffer = Buffer::from(bytes); - - PrimitiveArray::new(buffer, PType::U8, validity) - } - pub fn validity(&self) -> Validity { self.metadata().validity.to_validity(|| { self.as_ref() diff --git a/vortex-array/src/array/varbin/builder.rs b/vortex-array/src/array/varbin/builder.rs index 30a7087aa..82ef32f26 100644 --- a/vortex-array/src/array/varbin/builder.rs +++ b/vortex-array/src/array/varbin/builder.rs @@ -1,7 +1,7 @@ use arrow_buffer::NullBufferBuilder; -use bytes::BytesMut; -use num_traits::AsPrimitive; -use vortex_dtype::{DType, NativePType}; +use num_traits::{AsPrimitive, PrimInt}; +use vortex_buffer::Buffer; +use vortex_dtype::{DType, NativePType, PType}; use vortex_error::{vortex_panic, VortexExpect as _}; use crate::array::primitive::PrimitiveArray; @@ -9,19 +9,19 @@ use crate::array::varbin::VarBinArray; use crate::validity::Validity; use crate::IntoArrayData; -pub struct VarBinBuilder { +pub struct VarBinBuilder { offsets: Vec, - data: BytesMut, + data: Vec, validity: NullBufferBuilder, } -impl 
Default for VarBinBuilder { +impl Default for VarBinBuilder { fn default() -> Self { Self::new() } } -impl VarBinBuilder { +impl VarBinBuilder { pub fn new() -> Self { Self::with_capacity(0) } @@ -31,7 +31,7 @@ impl VarBinBuilder { offsets.push(O::zero()); Self { offsets, - data: BytesMut::new(), + data: Vec::new(), validity: NullBufferBuilder::new(len), } } @@ -80,7 +80,7 @@ impl VarBinBuilder { pub fn finish(mut self, dtype: DType) -> VarBinArray { let offsets = PrimitiveArray::from(self.offsets); - let data = PrimitiveArray::from_bytes(self.data.freeze(), Validity::NonNullable); + let data = PrimitiveArray::new(Buffer::from(self.data), PType::U8, Validity::NonNullable); let nulls = self.validity.finish(); let validity = if dtype.is_nullable() { diff --git a/vortex-array/src/array/varbin/compute/filter.rs b/vortex-array/src/array/varbin/compute/filter.rs index d785e8910..fd009233b 100644 --- a/vortex-array/src/array/varbin/compute/filter.rs +++ b/vortex-array/src/array/varbin/compute/filter.rs @@ -1,5 +1,5 @@ use itertools::Itertools; -use num_traits::{AsPrimitive, Zero}; +use num_traits::{AsPrimitive, PrimInt, Zero}; use vortex_dtype::{match_each_integer_ptype, DType, NativePType}; use vortex_error::{vortex_err, vortex_panic, VortexResult}; @@ -54,7 +54,7 @@ fn filter_select_var_bin_by_slice_primitive_offset( selection_count: usize, ) -> VortexResult where - O: NativePType + 'static + Zero, + O: NativePType + PrimInt + 'static + Zero, usize: AsPrimitive, { let logical_validity = validity.to_logical(offsets.len() - 1); @@ -107,7 +107,7 @@ fn update_non_nullable_slice( start: usize, end: usize, ) where - O: NativePType + 'static + Zero + Copy, + O: NativePType + PrimInt + 'static + Zero + Copy, usize: AsPrimitive, { let new_data = { @@ -145,7 +145,7 @@ fn filter_select_var_bin_by_index( } #[allow(deprecated)] -fn filter_select_var_bin_by_index_primitive_offset( +fn filter_select_var_bin_by_index_primitive_offset( dtype: DType, offsets: &[O], data: &[u8], diff --git a/vortex-array/src/array/varbin/compute/take.rs b/vortex-array/src/array/varbin/compute/take.rs index c3788e6c2..f5bd55522 100644 --- a/vortex-array/src/array/varbin/compute/take.rs +++ b/vortex-array/src/array/varbin/compute/take.rs @@ -1,4 +1,5 @@ use arrow_buffer::NullBuffer; +use num_traits::PrimInt; use vortex_dtype::{match_each_integer_ptype, DType, NativePType}; use vortex_error::{vortex_err, vortex_panic, VortexResult}; @@ -29,7 +30,7 @@ impl TakeFn for VarBinEncoding { } } -fn take( +fn take( dtype: DType, offsets: &[O], data: &[u8], @@ -57,7 +58,7 @@ fn take( Ok(builder.finish(dtype)) } -fn take_nullable( +fn take_nullable( dtype: DType, offsets: &[O], data: &[u8], diff --git a/vortex-array/src/array/varbin/mod.rs b/vortex-array/src/array/varbin/mod.rs index 5c5726b65..2fec54bf8 100644 --- a/vortex-array/src/array/varbin/mod.rs +++ b/vortex-array/src/array/varbin/mod.rs @@ -1,6 +1,6 @@ use std::fmt::{Debug, Display}; -use num_traits::AsPrimitive; +use num_traits::{AsPrimitive, PrimInt}; use serde::{Deserialize, Serialize}; pub use stats::compute_varbin_statistics; use vortex_buffer::Buffer; @@ -150,12 +150,12 @@ impl VarBinArray { } } - fn from_vec_sized(vec: Vec, dtype: DType) -> Self + fn from_vec_sized(vec: Vec, dtype: DType) -> Self where - K: NativePType, + O: NativePType + PrimInt, T: AsRef<[u8]>, { - let mut builder = VarBinBuilder::::with_capacity(vec.len()); + let mut builder = VarBinBuilder::::with_capacity(vec.len()); for v in vec { builder.push_value(v.as_ref()); } diff --git 
a/vortex-array/src/array/varbin/stats.rs b/vortex-array/src/array/varbin/stats.rs index 543391c56..f22d1fc6e 100644 --- a/vortex-array/src/array/varbin/stats.rs +++ b/vortex-array/src/array/varbin/stats.rs @@ -112,12 +112,12 @@ fn compute_min_max>(array: &T) -> VortexResu let minmax = array.with_iterator(|iter| match iter.flatten().minmax() { MinMaxResult::NoElements => None, MinMaxResult::OneElement(value) => { - let scalar = varbin_scalar(Buffer::from(value), array.dtype()); + let scalar = varbin_scalar(Buffer::from(value.to_vec()), array.dtype()); Some((scalar.clone(), scalar)) } MinMaxResult::MinMax(min, max) => Some(( - varbin_scalar(Buffer::from(min), array.dtype()), - varbin_scalar(Buffer::from(max), array.dtype()), + varbin_scalar(Buffer::from(min.to_vec()), array.dtype()), + varbin_scalar(Buffer::from(max.to_vec()), array.dtype()), )), })?; let Some((min, max)) = minmax else { diff --git a/vortex-buffer/src/lib.rs b/vortex-buffer/src/lib.rs index e22fa7d5b..ec6166332 100644 --- a/vortex-buffer/src/lib.rs +++ b/vortex-buffer/src/lib.rs @@ -120,6 +120,20 @@ impl Buffer { } } +impl PartialEq for Buffer { + fn eq(&self, other: &Self) -> bool { + self.as_slice().eq(other.as_slice()) + } +} + +impl Eq for Buffer {} + +impl PartialOrd for Buffer { + fn partial_cmp(&self, other: &Self) -> Option { + self.as_slice().partial_cmp(other.as_slice()) + } +} + impl Deref for Buffer { type Target = [u8]; @@ -134,10 +148,15 @@ impl AsRef<[u8]> for Buffer { } } -impl From<&[u8]> for Buffer { - fn from(value: &[u8]) -> Self { - // We prefer Arrow since it retains mutability - Buffer(Inner::Arrow(ArrowBuffer::from(value))) +impl From<&'static [u8]> for Buffer { + fn from(value: &'static [u8]) -> Self { + Buffer(Inner::Bytes(bytes::Bytes::from_static(value))) + } +} + +impl From<&'static str> for Buffer { + fn from(slice: &'static str) -> Buffer { + Buffer(Inner::Bytes(bytes::Bytes::from_static(slice.as_bytes()))) } } @@ -166,16 +185,8 @@ impl From for Buffer { } } -impl PartialEq for Buffer { - fn eq(&self, other: &Self) -> bool { - self.as_ref().eq(other.as_ref()) - } -} - -impl Eq for Buffer {} - -impl PartialOrd for Buffer { - fn partial_cmp(&self, other: &Self) -> Option { - self.as_ref().partial_cmp(other.as_ref()) +impl FromIterator for Buffer { + fn from_iter>(iter: T) -> Self { + Buffer(Inner::Arrow(ArrowBuffer::from_iter(iter))) } } diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml index b8a85e138..3b21b98d5 100644 --- a/vortex-datafusion/Cargo.toml +++ b/vortex-datafusion/Cargo.toml @@ -21,7 +21,6 @@ path = "src/lib.rs" arrow-array = { workspace = true } arrow-schema = { workspace = true } async-trait = { workspace = true } -bytes = { workspace = true } chrono = { workspace = true } datafusion = { workspace = true } datafusion-common = { workspace = true } @@ -37,7 +36,6 @@ object_store = { workspace = true } pin-project = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "fs"] } vortex-array = { workspace = true } -vortex-datetime-dtype = { workspace = true } vortex-dtype = { workspace = true } vortex-error = { workspace = true, features = ["datafusion"] } vortex-expr = { workspace = true, features = ["datafusion"] } diff --git a/vortex-file/Cargo.toml b/vortex-file/Cargo.toml index 48f07f46e..a00919dda 100644 --- a/vortex-file/Cargo.toml +++ b/vortex-file/Cargo.toml @@ -17,7 +17,6 @@ readme = "README.md" arrow-array = { workspace = true } arrow-buffer = { workspace = true } arrow-schema = { workspace = true } -bytes = { workspace = true 
} flatbuffers = { workspace = true } futures = { workspace = true, features = ["std"] } futures-executor = { workspace = true } diff --git a/vortex-file/src/read/builder/initial_read.rs b/vortex-file/src/read/builder/initial_read.rs index c3d531d66..005481f32 100644 --- a/vortex-file/src/read/builder/initial_read.rs +++ b/vortex-file/src/read/builder/initial_read.rs @@ -1,7 +1,7 @@ use core::ops::Range; -use bytes::Bytes; use flatbuffers::{root, root_unchecked}; +use vortex_buffer::Buffer; use vortex_error::{vortex_bail, vortex_err, VortexResult, VortexUnwrap}; use vortex_flatbuffers::{dtype as fbd, footer}; use vortex_io::VortexReadAt; @@ -12,7 +12,7 @@ use crate::{LazyDType, EOF_SIZE, INITIAL_READ_SIZE, MAGIC_BYTES, VERSION}; pub struct InitialRead { /// The bytes from the initial read of the file, which is assumed (for now) to be sufficiently /// large to contain the schema and layout. - pub buf: Bytes, + pub buf: Buffer, /// The absolute byte offset representing the start of the initial read within the file. pub initial_read_offset: u64, /// The byte range within `buf` representing the Postscript flatbuffer. @@ -68,7 +68,7 @@ pub async fn read_initial_bytes( let read_size = INITIAL_READ_SIZE.min(file_size as usize); let initial_read_offset = file_size - read_size as u64; - let buf: Bytes = read + let buf: Buffer = read .read_byte_range(initial_read_offset, read_size as u64) .await?; diff --git a/vortex-file/src/read/cache.rs b/vortex-file/src/read/cache.rs index 8e978ec3b..c2a46ab30 100644 --- a/vortex-file/src/read/cache.rs +++ b/vortex-file/src/read/cache.rs @@ -1,10 +1,10 @@ use std::fmt::Debug; use std::sync::{Arc, RwLock}; -use bytes::Bytes; use flatbuffers::root_unchecked; use once_cell::sync::OnceCell; use vortex_array::aliases::hash_map::HashMap; +use vortex_buffer::Buffer; use vortex_dtype::field::Field; use vortex_dtype::flatbuffers::{extract_field, project_and_deserialize, resolve_field}; use vortex_dtype::{DType, FieldNames}; @@ -16,7 +16,7 @@ use crate::read::{LayoutPartId, MessageId}; #[derive(Default, Debug)] pub struct LayoutMessageCache { - cache: HashMap, + cache: HashMap, } impl LayoutMessageCache { @@ -26,15 +26,15 @@ impl LayoutMessageCache { } } - pub fn get(&self, path: &[LayoutPartId]) -> Option { + pub fn get(&self, path: &[LayoutPartId]) -> Option { self.cache.get(path).cloned() } - pub fn remove(&mut self, path: &[LayoutPartId]) -> Option { + pub fn remove(&mut self, path: &[LayoutPartId]) -> Option { self.cache.remove(path) } - pub fn set(&mut self, path: MessageId, value: Bytes) { + pub fn set(&mut self, path: MessageId, value: Buffer) { self.cache.insert(path, value); } } @@ -86,7 +86,7 @@ impl SerializedDTypeField { #[derive(Debug)] enum LazyDTypeState { DType(DType), - Serialized(Bytes, OnceCell, SerializedDTypeField), + Serialized(Buffer, OnceCell, SerializedDTypeField), Unknown, } @@ -102,7 +102,7 @@ impl LazyDType { /// # Safety /// This function is unsafe because it trusts the caller to pass in a valid flatbuffer /// representing a message::Schema. 
- pub unsafe fn from_schema_bytes(dtype_bytes: Bytes) -> Self { + pub unsafe fn from_schema_bytes(dtype_bytes: Buffer) -> Self { Self { inner: LazyDTypeState::Serialized( dtype_bytes, @@ -288,7 +288,7 @@ impl RelativeLayoutCache { self.relative(id, Arc::new(LazyDType::unknown())) } - pub fn get(&self, path: &[LayoutPartId]) -> Option { + pub fn get(&self, path: &[LayoutPartId]) -> Option { self.root .read() .unwrap_or_else(|poison| { @@ -301,7 +301,7 @@ impl RelativeLayoutCache { .get(&self.absolute_id(path)) } - pub fn remove(&mut self, path: &[LayoutPartId]) -> Option { + pub fn remove(&mut self, path: &[LayoutPartId]) -> Option { self.root .write() .unwrap_or_else(|poison| { diff --git a/vortex-file/src/read/layouts/chunked.rs b/vortex-file/src/read/layouts/chunked.rs index fe49fee7e..18dfcd9bf 100644 --- a/vortex-file/src/read/layouts/chunked.rs +++ b/vortex-file/src/read/layouts/chunked.rs @@ -398,13 +398,13 @@ mod tests { use std::sync::{Arc, RwLock}; use arrow_buffer::BooleanBufferBuilder; - use bytes::Bytes; use flatbuffers::{root, FlatBufferBuilder}; use futures_util::io::Cursor; use futures_util::TryStreamExt; use vortex_array::array::{ChunkedArray, PrimitiveArray}; use vortex_array::compute::FilterMask; use vortex_array::{ArrayDType, ArrayLen, IntoArrayData, IntoArrayVariant}; + use vortex_buffer::Buffer; use vortex_dtype::PType; use vortex_expr::{BinaryExpr, Identity, Literal, Operator}; use vortex_flatbuffers::{footer, WriteFlatBuffer}; @@ -420,7 +420,7 @@ mod tests { async fn layout_and_bytes( cache: Arc>, scan: Scan, - ) -> (ChunkedLayoutReader, ChunkedLayoutReader, Bytes, usize) { + ) -> (ChunkedLayoutReader, ChunkedLayoutReader, Buffer, usize) { let mut writer = Cursor::new(Vec::new()); let array = PrimitiveArray::from((0..100).collect::>()).into_array(); let array_dtype = array.dtype().clone(); @@ -462,7 +462,7 @@ mod tests { let chunked_layout = write::LayoutSpec::chunked(flat_layouts.into(), len as u64, None); let flat_buf = chunked_layout.write_flatbuffer(&mut fb); fb.finish_minimal(flat_buf); - let fb_bytes = Bytes::copy_from_slice(fb.finished_data()); + let fb_bytes = Buffer::from(fb.finished_data().to_vec()); let layout = root::(&fb_bytes).unwrap(); let dtype = Arc::new(LazyDType::from_dtype(PType::I32.into())); @@ -484,7 +484,7 @@ mod tests { } .build() .unwrap(), - Bytes::from(written), + Buffer::from(written), len, ) } diff --git a/vortex-file/src/read/layouts/columnar.rs b/vortex-file/src/read/layouts/columnar.rs index c893afeb8..44f0d7a08 100644 --- a/vortex-file/src/read/layouts/columnar.rs +++ b/vortex-file/src/read/layouts/columnar.rs @@ -399,7 +399,6 @@ mod tests { use std::iter; use std::sync::{Arc, RwLock}; - use bytes::Bytes; use vortex_array::accessor::ArrayAccessor; use vortex_array::array::{ChunkedArray, PrimitiveArray, StructArray, VarBinArray}; use vortex_array::validity::Validity; @@ -419,7 +418,7 @@ mod tests { async fn layout_and_bytes( cache: Arc>, scan: Scan, - ) -> (Box, Box, Bytes, usize) { + ) -> (Box, Box, Buffer, usize) { let int_array = PrimitiveArray::from((0..100).collect::>()).into_array(); let int2_array = PrimitiveArray::from((100..200).collect::>()).into_array(); let int_dtype = int_array.dtype().clone(); @@ -475,7 +474,7 @@ mod tests { RelativeLayoutCache::new(cache.clone(), dtype), ) .unwrap(), - Bytes::copy_from_slice(&written), + written, len, ) } diff --git a/vortex-file/src/read/layouts/flat.rs b/vortex-file/src/read/layouts/flat.rs index 32d818bef..d01838efb 100644 --- a/vortex-file/src/read/layouts/flat.rs +++ 
b/vortex-file/src/read/layouts/flat.rs @@ -2,8 +2,8 @@ use std::collections::BTreeSet; use std::io::Cursor; use std::sync::Arc; -use bytes::Bytes; use vortex_array::{ArrayData, Context}; +use vortex_buffer::Buffer; use vortex_error::{vortex_bail, VortexResult}; use vortex_flatbuffers::footer; use vortex_ipc::messages::{DecoderMessage, SyncMessageReader}; @@ -73,7 +73,7 @@ impl FlatLayoutReader { MessageLocator(self.message_cache.absolute_id(&[]), self.range) } - fn array_from_bytes(&self, buf: Bytes) -> VortexResult { + fn array_from_bytes(&self, buf: Buffer) -> VortexResult { let mut reader = SyncMessageReader::new(Cursor::new(buf)); match reader.next().transpose()? { Some(DecoderMessage::Array(array_parts)) => array_parts.into_array_data( @@ -118,9 +118,9 @@ impl LayoutReader for FlatLayoutReader { mod tests { use std::sync::{Arc, RwLock}; - use bytes::Bytes; use vortex_array::array::PrimitiveArray; use vortex_array::{Context, IntoArrayData, IntoArrayVariant, ToArrayData}; + use vortex_buffer::Buffer; use vortex_dtype::PType; use vortex_expr::{BinaryExpr, Identity, Literal, Operator}; use vortex_ipc::messages::{EncoderMessage, SyncMessageWriter}; @@ -133,7 +133,7 @@ mod tests { async fn read_only_layout( cache: Arc>, - ) -> (FlatLayoutReader, Bytes, usize, Arc) { + ) -> (FlatLayoutReader, Buffer, usize, Arc) { let array = PrimitiveArray::from((0..100).collect::>()).into_array(); let mut written = vec![]; @@ -151,7 +151,7 @@ mod tests { Arc::new(Context::default()), RelativeLayoutCache::new(cache, dtype.clone()), ), - Bytes::from(written), + Buffer::from(written), array.len(), dtype, ) @@ -160,7 +160,7 @@ mod tests { async fn layout_and_bytes( cache: Arc>, scan: Scan, - ) -> (FlatLayoutReader, FlatLayoutReader, Bytes, usize) { + ) -> (FlatLayoutReader, FlatLayoutReader, Buffer, usize) { let (read_layout, bytes, len, dtype) = read_only_layout(cache.clone()).await; ( diff --git a/vortex-file/src/read/layouts/test_read.rs b/vortex-file/src/read/layouts/test_read.rs index 416da248e..1e28833d8 100644 --- a/vortex-file/src/read/layouts/test_read.rs +++ b/vortex-file/src/read/layouts/test_read.rs @@ -1,8 +1,8 @@ use std::collections::{BTreeSet, VecDeque}; use std::sync::{Arc, RwLock}; -use bytes::Bytes; use vortex_array::ArrayData; +use vortex_buffer::Buffer; use vortex_error::VortexUnwrap; use crate::read::mask::RowMask; @@ -25,7 +25,7 @@ fn layout_splits( pub fn read_layout_data( layout: &mut dyn LayoutReader, cache: Arc>, - buf: &Bytes, + buf: &Buffer, selector: &RowMask, ) -> Option { while let Some(rr) = layout.poll_read(selector).unwrap() { @@ -45,7 +45,7 @@ pub fn read_layout_data( pub fn read_filters( layout: &mut dyn LayoutReader, cache: Arc>, - buf: &Bytes, + buf: &Buffer, selector: &RowMask, ) -> Option { while let Some(rr) = layout.poll_read(selector).unwrap() { @@ -71,7 +71,7 @@ pub fn filter_read_layout( filter_layout: &mut dyn LayoutReader, layout: &mut dyn LayoutReader, cache: Arc>, - buf: &Bytes, + buf: &Buffer, length: usize, ) -> VecDeque { layout_splits(&[filter_layout, layout], length) @@ -83,7 +83,7 @@ pub fn filter_read_layout( pub fn read_layout( layout: &mut dyn LayoutReader, cache: Arc>, - buf: &Bytes, + buf: &Buffer, length: usize, ) -> VecDeque { layout_splits(&[layout], length) diff --git a/vortex-file/src/read/mod.rs b/vortex-file/src/read/mod.rs index d89aaff81..f03519e6b 100644 --- a/vortex-file/src/read/mod.rs +++ b/vortex-file/src/read/mod.rs @@ -1,7 +1,6 @@ use std::collections::BTreeSet; use std::fmt::Debug; -use bytes::Bytes; use vortex_array::ArrayData; use 
vortex_error::VortexResult; @@ -28,6 +27,7 @@ pub use filtering::RowFilter; pub use projection::Projection; pub use recordbatchreader::{AsyncRuntime, VortexRecordBatchReader}; pub use stream::VortexFileArrayStream; +use vortex_buffer::Buffer; use vortex_expr::ExprRef; use crate::byte_range::ByteRange; @@ -69,7 +69,7 @@ pub type MessageId = Vec; pub struct MessageLocator(pub MessageId, pub ByteRange); /// A message that has had its bytes materialized onto the heap. #[derive(Debug, Clone)] -pub struct Message(pub MessageId, pub Bytes); +pub struct Message(pub MessageId, pub Buffer); /// A polling interface for reading a value from a [`LayoutReader`]. #[derive(Debug)] diff --git a/vortex-file/src/write/layout.rs b/vortex-file/src/write/layout.rs index b86066a1d..e041d3f80 100644 --- a/vortex-file/src/write/layout.rs +++ b/vortex-file/src/write/layout.rs @@ -1,5 +1,5 @@ -use bytes::Bytes; use flatbuffers::{FlatBufferBuilder, WIPOffset}; +use vortex_buffer::Buffer; use vortex_flatbuffers::{footer as fb, FlatBufferRoot, WriteFlatBuffer}; use crate::byte_range::ByteRange; @@ -11,7 +11,7 @@ pub struct LayoutSpec { buffers: Option>, children: Option>, row_count: u64, - metadata: Option, + metadata: Option, } impl LayoutSpec { @@ -28,7 +28,7 @@ impl LayoutSpec { /// Create a chunked layout with children. /// /// has_metadata indicates whether first child is a layout containing metadata about other children. - pub fn chunked(children: Vec, row_count: u64, metadata: Option) -> Self { + pub fn chunked(children: Vec, row_count: u64, metadata: Option) -> Self { Self { id: CHUNKED_LAYOUT_ID, buffers: None, diff --git a/vortex-file/src/write/writer.rs b/vortex-file/src/write/writer.rs index 53a31570d..558dc14f3 100644 --- a/vortex-file/src/write/writer.rs +++ b/vortex-file/src/write/writer.rs @@ -2,7 +2,6 @@ use std::{io, iter, mem}; -use bytes::Bytes; use futures::TryStreamExt; use futures_util::io::Cursor; use itertools::Itertools; @@ -10,6 +9,7 @@ use vortex_array::array::{ChunkedArray, StructArray}; use vortex_array::stats::{as_stat_bitset_bytes, ArrayStatistics, Stat}; use vortex_array::stream::ArrayStream; use vortex_array::{ArrayData, ArrayLen}; +use vortex_buffer::Buffer; use vortex_dtype::DType; use vortex_error::{vortex_bail, vortex_err, VortexExpect as _, VortexResult}; use vortex_flatbuffers::{FlatBufferRoot, WriteFlatBuffer, WriteFlatBufferExt}; @@ -295,7 +295,7 @@ impl ColumnWriter { Ok(LayoutSpec::chunked( layouts, row_count, - Some(Bytes::from(stat_bitset)), + Some(Buffer::from(stat_bitset)), )) } else { Ok(LayoutSpec::chunked(data_chunks.collect(), row_count, None)) diff --git a/vortex-io/Cargo.toml b/vortex-io/Cargo.toml index 4b29ad6f3..444db8579 100644 --- a/vortex-io/Cargo.toml +++ b/vortex-io/Cargo.toml @@ -14,7 +14,6 @@ rust-version.workspace = true categories.workspace = true [dependencies] -bytes = { workspace = true } cfg-if = { workspace = true } compio = { workspace = true, features = ["bytes", "macros"], optional = true } pin-project = { workspace = true } diff --git a/vortex-io/src/aligned.rs b/vortex-io/src/aligned.rs deleted file mode 100644 index c9b28b403..000000000 --- a/vortex-io/src/aligned.rs +++ /dev/null @@ -1,173 +0,0 @@ -#![allow(dead_code)] -use std::ops::{Deref, DerefMut}; - -use bytes::Bytes; - -pub trait PowerOfTwo {} -impl PowerOfTwo for usize where usize: sealed::Sealed {} - -mod sealed { - pub trait Sealed {} - - impl Sealed<1> for usize {} - impl Sealed<2> for usize {} - impl Sealed<4> for usize {} - impl Sealed<8> for usize {} - impl Sealed<16> for usize 
{} - impl Sealed<32> for usize {} - impl Sealed<64> for usize {} - impl Sealed<128> for usize {} - impl Sealed<256> for usize {} - impl Sealed<512> for usize {} -} - -/// A variant of [`BytesMut`][bytes::BytesMut] that freezes into a [`Bytes`] that is guaranteed -/// to begin at a multiple of a target byte-alignment. -/// -/// Internally, it accomplishes this by over-allocating by up to the alignment size, padding the -/// front as necessary. Reads and writes will only be able to access the region after the padding. -/// -/// It is required for the alignment to be a valid power of 2 <= 512, any other value will be -/// a compile-time failure. -pub(crate) struct AlignedBytesMut { - buf: Vec, - padding: usize, - capacity: usize, -} - -impl AlignedBytesMut -where - usize: PowerOfTwo, -{ - /// Allocate a new mutable buffer with capacity to hold at least `capacity` bytes. - /// - /// The mutable buffer may allocate more than the requested amount to pad the memory for - /// alignment. - pub fn with_capacity(capacity: usize) -> Self { - // Allocate up to `ALIGN` extra bytes, in case we need to pad the returned pointer. - let allocation_size = (capacity + ALIGN - 1).next_multiple_of(ALIGN); - let mut buf = Vec::::with_capacity(allocation_size); - let padding = buf.as_ptr().align_offset(ALIGN); - unsafe { - buf.set_len(padding); - } - - Self { - buf, - padding, - capacity, - } - } - - /// Usable capacity of this buffer. - pub fn capacity(&self) -> usize { - self.capacity - } - - /// Set the length of the mutable buffer directly. - /// - /// # Safety - /// - /// The caller is responsible for ensuring that the provided length fits within the original - /// capacity request. - /// - /// Failure to do so could cause uninitialized memory to be readable. - pub unsafe fn set_len(&mut self, len: usize) { - assert!( - len <= self.capacity, - "set_len call out of bounds: {} > {}", - len, - self.capacity - ); - unsafe { self.buf.set_len(len + self.padding) } - } - - /// Extend this mutable buffer with the contents of the provided slice. - pub fn extend_from_slice(&mut self, slice: &[u8]) { - let bytes_remaining = self.capacity - self.len(); - assert!( - slice.len() <= bytes_remaining, - "extend_from_slice cannot reallocate" - ); - - // The internal `buf` is padded, so appends will land after the padded region. - self.buf.extend_from_slice(slice) - } - - /// Freeze the existing allocation into a readonly [`Bytes`], guaranteed to be aligned to - /// the target [`ALIGN`] size. - pub fn freeze(self) -> Bytes { - // bytes_unaligned will contain the entire allocation, so that on Drop the entire buf - // is freed. - // - // bytes_aligned is a sliced view on top of bytes_unaligned. - // - // bytes_aligned - // | parent \ *ptr - // v | - // bytes_unaligned | - // | | - // | *ptr | - // v v - // +------------+------------------+----------------+ - // | padding | content | spare capacity | - // +------------+------------------+----------------+ - let bytes_unaligned = Bytes::from(self.buf); - let bytes_aligned = bytes_unaligned.slice(self.padding..); - - assert_eq!( - bytes_aligned.as_ptr().align_offset(ALIGN), - 0, - "bytes_aligned must be aligned to {}", - ALIGN - ); - - bytes_aligned - } -} - -impl Deref for AlignedBytesMut -where - usize: PowerOfTwo, -{ - type Target = [u8]; - - fn deref(&self) -> &Self::Target { - &self.buf[self.padding..] - } -} - -impl DerefMut for AlignedBytesMut -where - usize: PowerOfTwo, -{ - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.buf[self.padding..] 
- } -} - -#[cfg(test)] -mod tests { - use crate::aligned::AlignedBytesMut; - - #[test] - fn test_align() { - let mut buf = AlignedBytesMut::<128>::with_capacity(1); - buf.extend_from_slice(b"a"); - - let data = buf.freeze(); - - assert_eq!(data.as_ref(), b"a"); - assert_eq!(data.as_ptr().align_offset(128), 0); - } - - #[test] - fn test_extend() { - let mut buf = AlignedBytesMut::<128>::with_capacity(256); - buf.extend_from_slice(b"a"); - buf.extend_from_slice(b"bcdefgh"); - - let data = buf.freeze(); - assert_eq!(data.as_ref(), b"abcdefgh"); - } -} diff --git a/vortex-io/src/buf.rs b/vortex-io/src/buf.rs index 19040b7b5..95070ce22 100644 --- a/vortex-io/src/buf.rs +++ b/vortex-io/src/buf.rs @@ -1,6 +1,6 @@ use std::io; -use bytes::Bytes; +use vortex_buffer::Buffer; use crate::VortexReadAt; @@ -37,7 +37,7 @@ impl VortexBufReader { /// [`UnexpectedEof`][std::io::ErrorKind::UnexpectedEof] error is returned. /// /// See also [`VortexReadAt::read_byte_range`]. - pub async fn read_bytes(&mut self, len: u64) -> io::Result { + pub async fn read_bytes(&mut self, len: u64) -> io::Result { let result = self.inner.read_byte_range(self.pos, len).await?; self.pos += len; Ok(result) @@ -48,13 +48,13 @@ impl VortexBufReader { mod tests { use std::io; - use bytes::Bytes; + use vortex_buffer::Buffer; use crate::VortexBufReader; #[tokio::test] async fn test_buf_reader() { - let reader = Bytes::from("0123456789".as_bytes()); + let reader = Buffer::from("0123456789".as_bytes()); let mut buf_reader = VortexBufReader::new(reader); let first2 = buf_reader.read_bytes(2).await.unwrap(); @@ -67,7 +67,7 @@ mod tests { #[tokio::test] async fn test_eof() { - let reader = Bytes::from("0123456789".as_bytes()); + let reader = Buffer::from("0123456789".as_bytes()); let mut buf_reader = VortexBufReader::new(reader); // Read past end of internal reader diff --git a/vortex-io/src/compio.rs b/vortex-io/src/compio.rs index 74bcbdc20..5f9df5cd6 100644 --- a/vortex-io/src/compio.rs +++ b/vortex-io/src/compio.rs @@ -1,69 +1,26 @@ use std::future::Future; use std::io; -use bytes::Bytes; -use compio::buf::{IoBuf, IoBufMut, SetBufInit}; use compio::fs::File; use compio::io::AsyncReadAtExt; use compio::BufResult; +use vortex_buffer::Buffer; use vortex_error::VortexUnwrap; -use crate::aligned::{AlignedBytesMut, PowerOfTwo}; -use crate::{VortexReadAt, ALIGNMENT}; - -unsafe impl IoBuf for AlignedBytesMut -where - usize: PowerOfTwo, -{ - fn as_buf_ptr(&self) -> *const u8 { - self.as_ptr() - } - - fn buf_len(&self) -> usize { - self.len() - } - - fn buf_capacity(&self) -> usize { - self.capacity() - } -} - -impl SetBufInit for AlignedBytesMut -where - usize: PowerOfTwo, -{ - unsafe fn set_buf_init(&mut self, len: usize) { - // The contract of this trait specifies that providing a `len` <= the current len should - // do nothing. AlignedBytesMut by default will set the len directly without checking this. - if self.len() < len { - unsafe { - self.set_len(len); - } - } - } -} - -unsafe impl IoBufMut for AlignedBytesMut -where - usize: PowerOfTwo, -{ - fn as_buf_mut_ptr(&mut self) -> *mut u8 { - self.as_mut_ptr() - } -} +use crate::VortexReadAt; impl VortexReadAt for File { fn read_byte_range( &self, pos: u64, len: u64, - ) -> impl Future> + 'static { + ) -> impl Future> + 'static { let this = self.clone(); - let buffer = AlignedBytesMut::::with_capacity(len.try_into().vortex_unwrap()); + let buffer = Vec::with_capacity(len.try_into().vortex_unwrap()); async move { // Turn the buffer into a static slice. 
let BufResult(res, buffer) = this.read_exact_at(buffer, pos).await; - res.map(|_| buffer.freeze()) + res.map(|_| Buffer::from(buffer)) } } @@ -93,6 +50,6 @@ mod tests { // Use the file as a VortexReadAt instance. let read = file.read_byte_range(2, 4).await.unwrap(); - assert_eq!(&read, "2345".as_bytes()); + assert_eq!(read.as_ref(), "2345".as_bytes()); } } diff --git a/vortex-io/src/io_buf.rs b/vortex-io/src/io_buf.rs index 71c433493..6bcbaa909 100644 --- a/vortex-io/src/io_buf.rs +++ b/vortex-io/src/io_buf.rs @@ -2,7 +2,6 @@ use std::ops::Range; -use bytes::Bytes; use vortex_buffer::Buffer; /// Trait for types that can provide a readonly byte buffer interface to I/O frameworks. @@ -98,20 +97,6 @@ unsafe impl IoBuf for Vec { } } -unsafe impl IoBuf for Bytes { - fn read_ptr(&self) -> *const u8 { - self.as_ptr() - } - - fn bytes_init(&self) -> usize { - self.len() - } - - fn as_slice(&self) -> &[u8] { - self.as_ref() - } -} - unsafe impl IoBuf for Slice { #[inline] fn read_ptr(&self) -> *const u8 { diff --git a/vortex-io/src/lib.rs b/vortex-io/src/lib.rs index 4f6d37d90..7dae15d83 100644 --- a/vortex-io/src/lib.rs +++ b/vortex-io/src/lib.rs @@ -19,7 +19,6 @@ pub use read_ranges::*; pub use tokio::*; pub use write::*; -mod aligned; mod buf; #[cfg(feature = "compio")] mod compio; diff --git a/vortex-io/src/limit.rs b/vortex-io/src/limit.rs index 3307fa7d5..7a9082cc5 100644 --- a/vortex-io/src/limit.rs +++ b/vortex-io/src/limit.rs @@ -139,13 +139,13 @@ where mod tests { use std::{future, io}; - use bytes::Bytes; use futures_util::future::BoxFuture; use futures_util::{FutureExt, StreamExt}; + use vortex_buffer::Buffer; use crate::limit::SizeLimitedStream; - async fn make_future(len: usize) -> Bytes { + async fn make_future(len: usize) -> Buffer { "a".as_bytes().iter().copied().cycle().take(len).collect() } @@ -165,11 +165,11 @@ mod tests { #[tokio::test] async fn test_does_not_leak_permits() { - let bad_fut: BoxFuture<'static, io::Result> = + let bad_fut: BoxFuture<'static, io::Result> = future::ready(Err(io::Error::new(io::ErrorKind::Other, "badness"))).boxed(); - let good_fut: BoxFuture<'static, io::Result> = - future::ready(Ok(Bytes::from_static("aaaaa".as_bytes()))).boxed(); + let good_fut: BoxFuture<'static, io::Result> = + future::ready(Ok(Buffer::from("aaaaa".as_bytes()))).boxed(); let mut size_limited = SizeLimitedStream::new(10); size_limited.push(bad_fut, 10).await; diff --git a/vortex-io/src/object_store.rs b/vortex-io/src/object_store.rs index dbf5dacc8..635f75f42 100644 --- a/vortex-io/src/object_store.rs +++ b/vortex-io/src/object_store.rs @@ -4,15 +4,13 @@ use std::os::unix::fs::FileExt; use std::sync::Arc; use std::{io, mem}; -use bytes::Bytes; use futures_util::StreamExt; use object_store::path::Path; use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore, WriteMultipart}; use vortex_buffer::Buffer; -use vortex_error::{VortexExpect, VortexResult, VortexUnwrap}; +use vortex_error::{VortexExpect, VortexResult}; -use crate::aligned::AlignedBytesMut; -use crate::{IoBuf, VortexBufReader, VortexReadAt, VortexWrite, ALIGNMENT}; +use crate::{IoBuf, VortexBufReader, VortexReadAt, VortexWrite}; pub trait ObjectStoreExt { fn vortex_read( @@ -72,16 +70,13 @@ impl VortexReadAt for ObjectStoreReadAt { &self, pos: u64, len: u64, - ) -> impl Future> + 'static { + ) -> impl Future> + 'static { let object_store = self.object_store.clone(); let location = self.location.clone(); - Box::pin(async move { let read_start: usize = pos.try_into().vortex_expect("pos"); let read_end: usize 
= (pos + len).try_into().vortex_expect("pos + len"); - - let mut buf = - AlignedBytesMut::::with_capacity(len.try_into().vortex_unwrap()); + let len: usize = len.try_into().vortex_expect("len does not fit into usize"); let response = object_store .get_opts( @@ -97,21 +92,19 @@ impl VortexReadAt for ObjectStoreReadAt { // it's coming from a network stream. Internally they optimize the File implementation // to only perform a single allocation when calling `.bytes().await`, which we // replicate here by emitting the contents directly into our aligned buffer. + let mut buffer = Vec::with_capacity(len); match response.payload { GetResultPayload::File(file, _) => { - unsafe { - buf.set_len(len.try_into().vortex_unwrap()); - } - file.read_exact_at(&mut buf, pos)?; + unsafe { buffer.set_len(len) }; + file.read_exact_at(&mut buffer, pos)?; } GetResultPayload::Stream(mut byte_stream) => { while let Some(bytes) = byte_stream.next().await { - buf.extend_from_slice(&bytes?); + buffer.extend_from_slice(&bytes?); } } } - - Ok(buf.freeze()) + Ok(Buffer::from(buffer)) }) } diff --git a/vortex-io/src/offset.rs b/vortex-io/src/offset.rs index f63525ec3..e21de0016 100644 --- a/vortex-io/src/offset.rs +++ b/vortex-io/src/offset.rs @@ -1,8 +1,8 @@ use std::future::Future; use std::io; -use bytes::Bytes; use futures::FutureExt; +use vortex_buffer::Buffer; use crate::VortexReadAt; @@ -35,7 +35,7 @@ impl VortexReadAt for OffsetReadAt { &self, pos: u64, len: u64, - ) -> impl Future> + 'static { + ) -> impl Future> + 'static { self.read.read_byte_range(pos + self.offset, len) } diff --git a/vortex-io/src/read.rs b/vortex-io/src/read.rs index 7f81485b0..93abf0161 100644 --- a/vortex-io/src/read.rs +++ b/vortex-io/src/read.rs @@ -2,13 +2,9 @@ use std::future::{self, Future}; use std::io; use std::sync::Arc; -use bytes::Bytes; use vortex_buffer::Buffer; use vortex_error::{vortex_err, VortexUnwrap}; -use crate::aligned::AlignedBytesMut; -use crate::ALIGNMENT; - /// A trait for types that support asynchronous reads. /// /// References to the type must be safe to [share across threads][Send], but spawned @@ -16,7 +12,7 @@ use crate::ALIGNMENT; /// /// Readers must be cheaply cloneable to allow for easy sharing across tasks or threads. pub trait VortexReadAt: Send + Sync + Clone + 'static { - /// Request an asynchronous positional read. Results will be returned as an owned [`Bytes`]. + /// Request an asynchronous positional read. Results will be returned as a [`Buffer`]. /// /// If the reader does not have the requested number of bytes, the returned Future will complete /// with an [`UnexpectedEof`][std::io::ErrorKind::UnexpectedEof]. @@ -29,7 +25,7 @@ pub trait VortexReadAt: Send + Sync + Clone + 'static { &self, pos: u64, len: u64, - ) -> impl Future> + 'static; + ) -> impl Future> + 'static; // TODO(ngates): the read implementation should be able to hint at its latency/throughput // allowing the caller to make better decisions about how to coalesce reads. 
@@ -49,7 +45,7 @@ impl VortexReadAt for Arc { &self, pos: u64, len: u64, - ) -> impl Future> + 'static { + ) -> impl Future> + 'static { T::read_byte_range(self, pos, len) } @@ -67,52 +63,19 @@ impl VortexReadAt for Buffer { &self, pos: u64, len: u64, - ) -> impl Future> + 'static { + ) -> impl Future> + 'static { let read_start: usize = pos.try_into().vortex_unwrap(); let read_end: usize = (len + pos).try_into().vortex_unwrap(); if read_end > self.len() { - future::ready(Err(io::Error::new( + return future::ready(Err(io::Error::new( io::ErrorKind::UnexpectedEof, vortex_err!("unexpected eof"), - ))) - } else { - let mut buffer = - AlignedBytesMut::::with_capacity(len.try_into().vortex_unwrap()); - unsafe { - buffer.set_len(len.try_into().vortex_unwrap()); - } - buffer.copy_from_slice(self.slice(read_start..read_end).as_slice()); - future::ready(Ok(buffer.freeze())) + ))); } + future::ready(Ok(self.slice(read_start..read_end))) } fn size(&self) -> impl Future> + 'static { future::ready(Ok(self.len() as u64)) } } - -impl VortexReadAt for Bytes { - fn read_byte_range( - &self, - pos: u64, - len: u64, - ) -> impl Future> + 'static { - let read_start: usize = pos.try_into().vortex_unwrap(); - let read_end: usize = (pos + len).try_into().vortex_unwrap(); - - if read_end > self.len() { - future::ready(Err(io::Error::new( - io::ErrorKind::UnexpectedEof, - vortex_err!("unexpected eof"), - ))) - } else { - let sliced = self.slice(read_start..read_end); - future::ready(Ok(sliced)) - } - } - - fn size(&self) -> impl Future> + 'static { - let len = Ok(self.len() as u64); - future::ready(len) - } -} diff --git a/vortex-io/src/read_ranges.rs b/vortex-io/src/read_ranges.rs index dc1f41536..ae7d95516 100644 --- a/vortex-io/src/read_ranges.rs +++ b/vortex-io/src/read_ranges.rs @@ -5,8 +5,8 @@ use std::io::ErrorKind; use std::ops::Range; use std::sync::Arc; -use bytes::Bytes; use futures_util::{stream, FutureExt, StreamExt, TryStreamExt}; +use vortex_buffer::Buffer; use vortex_error::VortexExpect; use crate::{Dispatch, IoDispatcher, VortexReadAt}; @@ -34,7 +34,7 @@ impl VortexReadRanges { pub fn read_byte_ranges( &self, ranges: Vec>, - ) -> impl Future>> + Send + 'static { + ) -> impl Future>> + Send + 'static { let dispatcher = self.dispatcher.clone(); let reader = self.read.clone(); let max_gap = self.max_gap; @@ -115,7 +115,7 @@ fn merge_ranges(mut ranges: Vec>, max_gap: usize) -> Vec(pub IO); @@ -66,16 +65,12 @@ impl VortexReadAt for TokioFile { &self, pos: u64, len: u64, - ) -> impl Future> + 'static { + ) -> impl Future> + 'static { let this = self.clone(); - let mut buffer = - AlignedBytesMut::::with_capacity(len.try_into().vortex_unwrap()); - unsafe { - buffer.set_len(len.try_into().vortex_unwrap()); - } + let mut buffer = vec![0u8; len.try_into().vortex_unwrap()]; match this.read_exact_at(&mut buffer, pos) { - Ok(()) => future::ready(Ok(buffer.freeze())), + Ok(()) => future::ready(Ok(Buffer::from(buffer))), Err(e) => future::ready(Err(e)), } } @@ -125,8 +120,8 @@ mod tests { let second_half = shared_file.read_byte_range(5, 5).await.unwrap(); - assert_eq!(&first_half, "01234".as_bytes()); - assert_eq!(&second_half, "56789".as_bytes()); + assert_eq!(first_half.as_ref(), "01234".as_bytes()); + assert_eq!(second_half.as_ref(), "56789".as_bytes()); } #[test] diff --git a/vortex-scalar/Cargo.toml b/vortex-scalar/Cargo.toml index 0fa9bf883..ef62c6509 100644 --- a/vortex-scalar/Cargo.toml +++ b/vortex-scalar/Cargo.toml @@ -16,7 +16,6 @@ readme = { workspace = true } [dependencies] arbitrary = { workspace = 
true, optional = true } arrow-array = { workspace = true } -bytes = { workspace = true } datafusion-common = { workspace = true, optional = true } flatbuffers = { workspace = true, optional = true } flexbuffers = { workspace = true, optional = true } diff --git a/vortex-scalar/src/binary.rs b/vortex-scalar/src/binary.rs index 8aa9c4741..1b3745954 100644 --- a/vortex-scalar/src/binary.rs +++ b/vortex-scalar/src/binary.rs @@ -91,13 +91,7 @@ impl TryFrom for Option { impl From<&[u8]> for Scalar { fn from(value: &[u8]) -> Self { - Scalar::from(Buffer::from(value)) - } -} - -impl From for Scalar { - fn from(value: bytes::Bytes) -> Self { - Scalar::from(Buffer::from(value)) + Scalar::from(Buffer::from(value.to_vec())) } } diff --git a/vortex-scalar/src/lib.rs b/vortex-scalar/src/lib.rs index 9b398b1be..7514d8375 100644 --- a/vortex-scalar/src/lib.rs +++ b/vortex-scalar/src/lib.rs @@ -274,5 +274,4 @@ from_vec_for_scalar!(f32); from_vec_for_scalar!(f64); from_vec_for_scalar!(String); from_vec_for_scalar!(BufferString); -from_vec_for_scalar!(bytes::Bytes); from_vec_for_scalar!(Buffer); diff --git a/vortex-scalar/src/scalar_type.rs b/vortex-scalar/src/scalar_type.rs index 2c27a7398..304e2e0bd 100644 --- a/vortex-scalar/src/scalar_type.rs +++ b/vortex-scalar/src/scalar_type.rs @@ -81,14 +81,6 @@ impl ScalarType for BufferString { scalar_type_for_vec!(BufferString); -impl ScalarType for bytes::Bytes { - fn dtype() -> DType { - DType::Binary(Nullability::NonNullable) - } -} - -scalar_type_for_vec!(bytes::Bytes); - impl ScalarType for Buffer { fn dtype() -> DType { DType::Binary(Nullability::NonNullable) diff --git a/vortex-scalar/src/serde/proto.rs b/vortex-scalar/src/serde/proto.rs index c236b3cd1..6e0586f79 100644 --- a/vortex-scalar/src/serde/proto.rs +++ b/vortex-scalar/src/serde/proto.rs @@ -146,7 +146,7 @@ fn deserialize_scalar_value(dtype: &DType, value: &pb::ScalarValue) -> VortexRes BufferString::from(v.clone()), ))), Kind::BytesValue(v) => Ok(ScalarValue(InnerScalarValue::Buffer(Buffer::from( - v.as_slice(), + v.clone(), )))), Kind::ListValue(v) => { let mut values = Vec::with_capacity(v.values.len());
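
The hunks above all apply the same substitution: `bytes::Bytes`/`BytesMut` call sites now accumulate into a plain `Vec<u8>` and convert into `vortex_buffer::Buffer` once, borrowed slices are either copied explicitly with `.to_vec()` or wrapped zero-copy when they are `'static`, and slicing/collection move to `Buffer::slice` and `FromIterator<u8>`. The snippet below is a minimal sketch of those patterns, not code from this PR; it assumes only the `Buffer` APIs exercised in the diff (`From<Vec<u8>>`, `From<&'static [u8]>`, `FromIterator<u8>`, `slice`, `AsRef<[u8]>`), and the `main` function and variable names are invented for illustration.

use vortex_buffer::Buffer;

fn main() {
    // Owned bytes: call sites that previously built a `BytesMut` and froze it
    // (e.g. `VarBinBuilder::finish`) now accumulate into a `Vec<u8>` and
    // convert once at the end.
    let mut data: Vec<u8> = Vec::new();
    data.extend_from_slice(b"hello");
    data.extend_from_slice(b"world");
    let owned = Buffer::from(data);
    assert_eq!(owned.as_ref(), "helloworld".as_bytes());

    // Borrowed bytes: `From<&[u8]>` is narrowed to `&'static [u8]`, so
    // short-lived slices are copied explicitly, as in `canonical_byte_view`
    // and `compute_min_max`.
    let transient: &[u8] = &[1u8, 2, 3];
    let copied = Buffer::from(transient.to_vec());
    assert_eq!(copied.as_ref(), [1u8, 2, 3].as_slice());

    // Static data can still be wrapped without a copy.
    let static_buf = Buffer::from("0123456789".as_bytes());

    // Zero-copy slicing replaces `Bytes::slice`, mirroring the new
    // `VortexReadAt for Buffer` implementation.
    let tail = static_buf.slice(5..10);
    assert_eq!(tail.as_ref(), "56789".as_bytes());

    // `FromIterator<u8>` covers the collect-based paths, as in the
    // `vortex-io` size-limit tests.
    let cycled: Buffer = b"a".iter().copied().cycle().take(4).collect();
    assert_eq!(cycled.as_ref(), "aaaa".as_bytes());
}

One visible consequence of narrowing the borrowed-slice conversion to `'static` is that the copy which previously happened implicitly inside `From<&[u8]>` (via `ArrowBuffer::from`) now shows up as an explicit `.to_vec()` at each non-static call site in the diff.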