From 7f19f5661b5bda44916d60ee065b5e31736159e8 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Mon, 6 May 2024 17:22:02 +0100 Subject: [PATCH 1/7] Scalar2 --- Cargo.lock | 1 + Cargo.toml | 1 + vortex-buffer/Cargo.toml | 6 +++- vortex-buffer/README.md | 5 ++++ vortex-buffer/src/lib.rs | 54 +++++++++--------------------------- vortex-scalar/src/lib.rs | 1 + vortex-scalar/src/scalar2.rs | 8 ++++++ 7 files changed, 34 insertions(+), 42 deletions(-) create mode 100644 vortex-buffer/README.md create mode 100644 vortex-scalar/src/scalar2.rs diff --git a/Cargo.lock b/Cargo.lock index 42da118507..232c793729 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4941,6 +4941,7 @@ name = "vortex-buffer" version = "0.1.0" dependencies = [ "arrow-buffer", + "bytes", "vortex-dtype", ] diff --git a/Cargo.toml b/Cargo.toml index 3d9ed5a12d..22e8c20135 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,7 @@ arrow-ipc = "51.0.0" arrow-schema = "51.0.0" arrow-select = "51.0.0" bindgen = "0.69.4" +bytes = "1.6.0" bzip2 = "0.4.4" criterion = { version = "0.5.1", features = ["html_reports"] } croaring = "1.0.1" diff --git a/vortex-buffer/Cargo.toml b/vortex-buffer/Cargo.toml index b5a4256acd..28bbecc304 100644 --- a/vortex-buffer/Cargo.toml +++ b/vortex-buffer/Cargo.toml @@ -11,8 +11,12 @@ edition.workspace = true rust-version.workspace = true [dependencies] -arrow-buffer = { workspace = true } +arrow-buffer = { workspace = true, optional = true } +bytes = { workspace = true } vortex-dtype = { path = "../vortex-dtype" } [lints] workspace = true + +[features] +arrow = ['arrow-buffer'] \ No newline at end of file diff --git a/vortex-buffer/README.md b/vortex-buffer/README.md new file mode 100644 index 0000000000..428ee51084 --- /dev/null +++ b/vortex-buffer/README.md @@ -0,0 +1,5 @@ +# Vortex Buffer + +For now, a Vortex buffer is implemented as a very thin wrapper around the Tokio bytes crate. +In the future, we may re-implement this ourselves to have more control over alignment +(see https://github.com/tokio-rs/bytes/issues/437) diff --git a/vortex-buffer/src/lib.rs b/vortex-buffer/src/lib.rs index ed7b1b338e..ce015d42d6 100644 --- a/vortex-buffer/src/lib.rs +++ b/vortex-buffer/src/lib.rs @@ -1,59 +1,31 @@ -use arrow_buffer::Buffer as ArrowBuffer; +use bytes::Bytes; use vortex_dtype::{match_each_native_ptype, NativePType}; #[derive(Debug, Clone)] -pub enum Buffer<'a> { - Owned(ArrowBuffer), - View(&'a [u8]), -} +pub struct Buffer(Bytes); -pub type OwnedBuffer = Buffer<'static>; +unsafe impl Send for Buffer {} +unsafe impl Sync for Buffer {} -impl Buffer<'_> { +impl Buffer { + #[inline] pub fn len(&self) -> usize { - match self { - Buffer::Owned(buffer) => buffer.len(), - Buffer::View(slice) => slice.len(), - } + self.0.len() } pub fn is_empty(&self) -> bool { self.len() == 0 } - pub fn as_slice(&self) -> &[u8] { - match self { - Buffer::Owned(buffer) => buffer.as_slice(), - Buffer::View(slice) => slice, - } - } - pub fn typed_data(&self) -> &[T] { - match self { - Buffer::Owned(buffer) => unsafe { - match_each_native_ptype!(T::PTYPE, |$T| { - std::mem::transmute(buffer.typed_data::<$T>()) - }) - }, - Buffer::View(slice) => { - // From ArrowBuffer::typed_data - let (prefix, offsets, suffix) = unsafe { slice.align_to::() }; - assert!(prefix.is_empty() && suffix.is_empty()); - offsets - } - } + // Based on ArrowBuffer::typed_data + let (prefix, offsets, suffix) = unsafe { self.0.align_to::() }; + assert!(prefix.is_empty() && suffix.is_empty()); + offsets } - pub fn to_static(&self) -> OwnedBuffer { - match self { - Buffer::Owned(d) => Buffer::Owned(d.clone()), - Buffer::View(_) => Buffer::Owned(self.into()), - } - } -} - -impl<'a> Buffer<'a> { - pub fn into_vec(self) -> Result, Buffer<'a>> { + pub fn into_vec(self) -> Result, Buffer> { + let mut bytes: Bytes = self.0; match self { Buffer::Owned(buffer) => match_each_native_ptype!(T::PTYPE, |$T| { buffer diff --git a/vortex-scalar/src/lib.rs b/vortex-scalar/src/lib.rs index e6a4279169..f0b8cff8aa 100644 --- a/vortex-scalar/src/lib.rs +++ b/vortex-scalar/src/lib.rs @@ -18,6 +18,7 @@ mod extension; mod list; mod null; mod primitive; +mod scalar2; mod serde; mod struct_; mod utf8; diff --git a/vortex-scalar/src/scalar2.rs b/vortex-scalar/src/scalar2.rs new file mode 100644 index 0000000000..9e0a2d91e1 --- /dev/null +++ b/vortex-scalar/src/scalar2.rs @@ -0,0 +1,8 @@ +use vortex_dtype::DType; + +use crate::value::ScalarValue; + +pub struct Scalar<'a> { + dtype: DType, + value: ScalarValue<'a>, +} From 22df6c1484f98475f5fc11c39d0d88aa5cb0e787 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Mon, 6 May 2024 18:01:34 +0100 Subject: [PATCH 2/7] Owned buffer --- Cargo.lock | 2 + vortex-array/src/array/bool/mod.rs | 2 +- .../array/primitive/compute/as_contiguous.rs | 2 +- vortex-array/src/array/primitive/mod.rs | 6 +- vortex-array/src/array/varbin/mod.rs | 2 +- vortex-array/src/data.rs | 8 +- vortex-array/src/implementation.rs | 6 +- vortex-array/src/lib.rs | 4 +- vortex-array/src/typed.rs | 4 +- vortex-array/src/view.rs | 4 +- vortex-buffer/Cargo.toml | 5 +- vortex-buffer/src/lib.rs | 79 +++++++++++-------- vortex-ipc/src/reader.rs | 4 +- vortex-ipc/src/writer.rs | 2 +- vortex-roaring/src/boolean/mod.rs | 4 +- vortex-roaring/src/integer/mod.rs | 4 +- vortex-scalar/Cargo.toml | 2 + vortex-scalar/src/scalar2.rs | 34 +++++++- 18 files changed, 108 insertions(+), 66 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 232c793729..af8518321c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5091,9 +5091,11 @@ version = "0.1.0" dependencies = [ "flatbuffers", "flatc", + "flexbuffers", "itertools 0.12.1", "num-traits", "serde", + "vortex-buffer", "vortex-dtype", "vortex-error", "vortex-flatbuffers", diff --git a/vortex-array/src/array/bool/mod.rs b/vortex-array/src/array/bool/mod.rs index e878d6352f..55d8e4ebbc 100644 --- a/vortex-array/src/array/bool/mod.rs +++ b/vortex-array/src/array/bool/mod.rs @@ -44,7 +44,7 @@ impl BoolArray<'_> { validity: validity.to_metadata(buffer.len())?, length: buffer.len(), }, - Some(Buffer::Owned(buffer.into_inner())), + Some(Buffer::from(buffer.into_inner())), validity.into_array_data().into_iter().collect_vec().into(), StatsSet::new(), )?, diff --git a/vortex-array/src/array/primitive/compute/as_contiguous.rs b/vortex-array/src/array/primitive/compute/as_contiguous.rs index 583d652578..19d50b056e 100644 --- a/vortex-array/src/array/primitive/compute/as_contiguous.rs +++ b/vortex-array/src/array/primitive/compute/as_contiguous.rs @@ -20,7 +20,7 @@ impl AsContiguousFn for PrimitiveArray<'_> { arrays.iter().map(|a| a.len()).sum::() * self.ptype().byte_width(), ); for array in arrays { - buffer.extend_from_slice(array.as_primitive().buffer().as_slice()) + buffer.extend_from_slice(array.as_primitive().buffer().as_ref()) } match_each_native_ptype!(self.ptype(), |$T| { Ok(PrimitiveArray::try_new(ScalarBuffer::<$T>::from(buffer), validity) diff --git a/vortex-array/src/array/primitive/mod.rs b/vortex-array/src/array/primitive/mod.rs index 9768814b08..fc313926df 100644 --- a/vortex-array/src/array/primitive/mod.rs +++ b/vortex-array/src/array/primitive/mod.rs @@ -34,7 +34,7 @@ impl PrimitiveArray<'_> { PrimitiveMetadata { validity: validity.to_metadata(buffer.len())?, }, - Some(Buffer::Owned(buffer.into_inner())), + Some(Buffer::from(buffer.into_inner())), validity.into_array_data().into_iter().collect_vec().into(), StatsSet::new(), )?, @@ -134,10 +134,8 @@ impl PrimitiveArray<'_> { } Ok(Self::from_vec(own_values, validity)) } -} -impl<'a> PrimitiveArray<'a> { - pub fn into_buffer(self) -> Buffer<'a> { + pub fn into_buffer(self) -> Buffer { self.into_array().into_buffer().unwrap() } } diff --git a/vortex-array/src/array/varbin/mod.rs b/vortex-array/src/array/varbin/mod.rs index f97b128d46..a7239828c4 100644 --- a/vortex-array/src/array/varbin/mod.rs +++ b/vortex-array/src/array/varbin/mod.rs @@ -151,7 +151,7 @@ impl VarBinArray<'_> { let start = self.offset_at(index); let end = self.offset_at(index + 1); let sliced = slice(&self.bytes(), start, end)?; - Ok(sliced.flatten_primitive()?.buffer().as_slice().to_vec()) + Ok(sliced.flatten_primitive()?.buffer().as_ref().to_vec()) } } diff --git a/vortex-array/src/data.rs b/vortex-array/src/data.rs index fc944d4c0a..8b64dba74a 100644 --- a/vortex-array/src/data.rs +++ b/vortex-array/src/data.rs @@ -1,6 +1,6 @@ use std::sync::{Arc, RwLock}; -use vortex_buffer::{Buffer, OwnedBuffer}; +use vortex_buffer::Buffer; use vortex_dtype::DType; use vortex_error::{vortex_err, VortexResult}; use vortex_scalar::Scalar; @@ -14,7 +14,7 @@ pub struct ArrayData { encoding: EncodingRef, dtype: DType, // FIXME(ngates): Arc? metadata: Arc, - buffer: Option, + buffer: Option, children: Arc<[ArrayData]>, stats_map: Arc>, } @@ -24,7 +24,7 @@ impl ArrayData { encoding: EncodingRef, dtype: DType, metadata: Arc, - buffer: Option, + buffer: Option, children: Arc<[ArrayData]>, statistics: StatsSet, ) -> VortexResult { @@ -61,7 +61,7 @@ impl ArrayData { self.buffer.as_ref() } - pub fn into_buffer(self) -> Option { + pub fn into_buffer(self) -> Option { self.buffer } diff --git a/vortex-array/src/implementation.rs b/vortex-array/src/implementation.rs index c8dc73da20..6bf426dbbf 100644 --- a/vortex-array/src/implementation.rs +++ b/vortex-array/src/implementation.rs @@ -1,4 +1,4 @@ -use vortex_buffer::{Buffer, OwnedBuffer}; +use vortex_buffer::Buffer; use vortex_dtype::DType; use vortex_error::{vortex_bail, VortexError, VortexResult}; @@ -221,7 +221,7 @@ impl<'a, T: IntoArray<'a> + ArrayEncodingRef + ArrayStatistics + GetArrayMetadat Array::Data(d) => d, Array::View(_) => { struct Visitor { - buffer: Option, + buffer: Option, children: Vec, } impl ArrayVisitor for Visitor { @@ -234,7 +234,7 @@ impl<'a, T: IntoArray<'a> + ArrayEncodingRef + ArrayStatistics + GetArrayMetadat if self.buffer.is_some() { vortex_bail!("Multiple buffers found in view") } - self.buffer = Some(buffer.to_static()); + self.buffer = Some(buffer.clone()); Ok(()) } } diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs index e47a5c5d17..028038e860 100644 --- a/vortex-array/src/lib.rs +++ b/vortex-array/src/lib.rs @@ -105,10 +105,10 @@ impl Array<'_> { } impl<'a> Array<'a> { - pub fn into_buffer(self) -> Option> { + pub fn into_buffer(self) -> Option { match self { Array::Data(d) => d.into_buffer(), - Array::View(v) => v.buffer().map(|b| b.to_static()), + Array::View(v) => v.buffer().cloned(), } } } diff --git a/vortex-array/src/typed.rs b/vortex-array/src/typed.rs index 54f57d24ed..706cfbcbd4 100644 --- a/vortex-array/src/typed.rs +++ b/vortex-array/src/typed.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use vortex_buffer::OwnedBuffer; +use vortex_buffer::Buffer; use vortex_dtype::DType; use vortex_error::{vortex_err, VortexError, VortexResult}; @@ -17,7 +17,7 @@ impl TypedArray<'_, D> { pub fn try_from_parts( dtype: DType, metadata: D::Metadata, - buffer: Option, + buffer: Option, children: Arc<[ArrayData]>, stats: StatsSet, ) -> VortexResult { diff --git a/vortex-array/src/view.rs b/vortex-array/src/view.rs index 801c68ba17..6a5f7388f6 100644 --- a/vortex-array/src/view.rs +++ b/vortex-array/src/view.rs @@ -16,7 +16,7 @@ pub struct ArrayView<'v> { encoding: EncodingRef, dtype: &'v DType, array: fb::Array<'v>, - buffers: &'v [Buffer<'v>], + buffers: &'v [Buffer], ctx: &'v ViewContext, // TODO(ngates): a store a Projection. A projected ArrayView contains the full fb::Array // metadata, but only the buffers from the selected columns. Therefore we need to know @@ -131,7 +131,7 @@ impl<'v> ArrayView<'v> { nbuffers } - pub fn buffer(&self) -> Option<&'v Buffer<'v>> { + pub fn buffer(&self) -> Option<&'v Buffer> { self.has_buffer().then(|| &self.buffers[0]) } diff --git a/vortex-buffer/Cargo.toml b/vortex-buffer/Cargo.toml index 28bbecc304..b5f2f98033 100644 --- a/vortex-buffer/Cargo.toml +++ b/vortex-buffer/Cargo.toml @@ -11,12 +11,9 @@ edition.workspace = true rust-version.workspace = true [dependencies] -arrow-buffer = { workspace = true, optional = true } +arrow-buffer = { workspace = true } bytes = { workspace = true } vortex-dtype = { path = "../vortex-dtype" } [lints] workspace = true - -[features] -arrow = ['arrow-buffer'] \ No newline at end of file diff --git a/vortex-buffer/src/lib.rs b/vortex-buffer/src/lib.rs index ce015d42d6..cd036dad41 100644 --- a/vortex-buffer/src/lib.rs +++ b/vortex-buffer/src/lib.rs @@ -1,72 +1,87 @@ -use bytes::Bytes; +use arrow_buffer::Buffer as ArrowBuffer; use vortex_dtype::{match_each_native_ptype, NativePType}; #[derive(Debug, Clone)] -pub struct Buffer(Bytes); +pub enum Buffer { + // TODO(ngates): we could add Aligned(Arc) from aligned-vec package + Arrow(ArrowBuffer), + Bytes(bytes::Bytes), +} unsafe impl Send for Buffer {} unsafe impl Sync for Buffer {} impl Buffer { - #[inline] pub fn len(&self) -> usize { - self.0.len() + match self { + Buffer::Arrow(b) => b.len(), + Buffer::Bytes(b) => b.len(), + } } pub fn is_empty(&self) -> bool { - self.len() == 0 + match self { + Buffer::Arrow(b) => b.is_empty(), + Buffer::Bytes(b) => b.is_empty(), + } } pub fn typed_data(&self) -> &[T] { - // Based on ArrowBuffer::typed_data - let (prefix, offsets, suffix) = unsafe { self.0.align_to::() }; - assert!(prefix.is_empty() && suffix.is_empty()); - offsets + match self { + Buffer::Arrow(buffer) => unsafe { + match_each_native_ptype!(T::PTYPE, |$T| { + std::mem::transmute(buffer.typed_data::<$T>()) + }) + }, + Buffer::Bytes(bytes) => { + // From ArrowBuffer::typed_data + let (prefix, offsets, suffix) = unsafe { bytes.align_to::() }; + assert!(prefix.is_empty() && suffix.is_empty()); + offsets + } + } } pub fn into_vec(self) -> Result, Buffer> { - let mut bytes: Bytes = self.0; match self { - Buffer::Owned(buffer) => match_each_native_ptype!(T::PTYPE, |$T| { + Buffer::Arrow(buffer) => match_each_native_ptype!(T::PTYPE, |$T| { buffer .into_vec() .map(|vec| unsafe { std::mem::transmute::, Vec>(vec) }) - .map_err(Buffer::Owned) + .map_err(Buffer::Arrow) }), - Buffer::View(_) => Err(self), + // Cannot always convert bytes into a mutable vec + Buffer::Bytes(_) => Err(self), } } } -impl From for OwnedBuffer { - fn from(value: ArrowBuffer) -> Self { - Buffer::Owned(value) +impl AsRef<[u8]> for Buffer { + fn as_ref(&self) -> &[u8] { + match self { + Buffer::Arrow(b) => b.as_ref(), + Buffer::Bytes(b) => b.as_ref(), + } } } -impl From> for ArrowBuffer { - fn from(value: Buffer<'_>) -> Self { - match value { - Buffer::Owned(b) => b, - Buffer::View(_) => ArrowBuffer::from(&value), - } +impl From> for Buffer { + fn from(value: Vec) -> Self { + // We prefer Arrow since it retains mutability + Buffer::Arrow(ArrowBuffer::from_vec(value)) } } -impl From<&Buffer<'_>> for ArrowBuffer { - fn from(value: &Buffer<'_>) -> Self { - match value { - Buffer::Owned(b) => b.clone(), - // FIXME(ngates): this conversion loses alignment information since go via u8. - Buffer::View(v) => ArrowBuffer::from_vec(v.to_vec()), - } +impl From for Buffer { + fn from(value: ArrowBuffer) -> Self { + Buffer::Arrow(value) } } -impl PartialEq for Buffer<'_> { +impl PartialEq for Buffer { fn eq(&self, other: &Self) -> bool { - self.as_slice().eq(other.as_slice()) + self.as_ref().eq(other.as_ref()) } } -impl Eq for Buffer<'_> {} +impl Eq for Buffer {} diff --git a/vortex-ipc/src/reader.rs b/vortex-ipc/src/reader.rs index c278eb6b4c..7a5f2d6ab0 100644 --- a/vortex-ipc/src/reader.rs +++ b/vortex-ipc/src/reader.rs @@ -128,7 +128,7 @@ pub struct StreamArrayReader<'a, R: Read> { read: &'a mut R, messages: &'a mut StreamMessageReader, dtype: DType, - buffers: Vec>, + buffers: Vec, row_offset: usize, } @@ -263,7 +263,7 @@ impl<'iter, R: Read> FallibleLendingIterator for StreamArrayReader<'iter, R> { let mut bytes = Vec::with_capacity(buffer.length() as usize); self.read.read_into(buffer.length(), &mut bytes)?; let arrow_buffer = ArrowBuffer::from_vec(bytes); - self.buffers.push(Buffer::Owned(arrow_buffer)); + self.buffers.push(Buffer::from(arrow_buffer)); offset = buffer.offset() + buffer.length(); } diff --git a/vortex-ipc/src/writer.rs b/vortex-ipc/src/writer.rs index 799dadcb91..9bfbdb78f4 100644 --- a/vortex-ipc/src/writer.rs +++ b/vortex-ipc/src/writer.rs @@ -70,7 +70,7 @@ impl StreamWriter { .zip_eq(buffer_offsets.iter().skip(1)) { let buffer_len = buffer.len(); - self.write.write_all(buffer.as_slice())?; + self.write.write_all(buffer.as_ref())?; let padding = (buffer_end as usize) - current_offset - buffer_len; self.write.write_all(&vec![0; padding])?; current_offset = buffer_end as usize; diff --git a/vortex-roaring/src/boolean/mod.rs b/vortex-roaring/src/boolean/mod.rs index 3125bb4f1c..656688df7d 100644 --- a/vortex-roaring/src/boolean/mod.rs +++ b/vortex-roaring/src/boolean/mod.rs @@ -33,7 +33,7 @@ impl RoaringBoolArray<'_> { typed: TypedArray::try_from_parts( DType::Bool(NonNullable), RoaringBoolMetadata { length }, - Some(Buffer::Owned(bitmap.serialize::().into())), + Some(Buffer::from(bitmap.serialize::())), vec![].into(), StatsSet::new(), )?, @@ -47,7 +47,7 @@ impl RoaringBoolArray<'_> { self.array() .buffer() .expect("RoaringBoolArray buffer is missing") - .as_slice(), + .as_ref(), ) } diff --git a/vortex-roaring/src/integer/mod.rs b/vortex-roaring/src/integer/mod.rs index 01e3a7e96f..5d83aac4d1 100644 --- a/vortex-roaring/src/integer/mod.rs +++ b/vortex-roaring/src/integer/mod.rs @@ -40,7 +40,7 @@ impl RoaringIntArray<'_> { ptype, length: bitmap.statistics().cardinality as usize, }, - Some(Buffer::Owned(bitmap.serialize::().into())), + Some(Buffer::from(bitmap.serialize::())), vec![].into(), StatsSet::new(), )?, @@ -53,7 +53,7 @@ impl RoaringIntArray<'_> { self.array() .buffer() .expect("RoaringBoolArray buffer is missing") - .as_slice(), + .as_ref(), ) } diff --git a/vortex-scalar/Cargo.toml b/vortex-scalar/Cargo.toml index 079e7a5a8c..abc30ffc6a 100644 --- a/vortex-scalar/Cargo.toml +++ b/vortex-scalar/Cargo.toml @@ -13,9 +13,11 @@ rust-version = { workspace = true } [dependencies] flatbuffers = { workspace = true } +flexbuffers = { workspace = true } itertools = { workspace = true } num-traits = { workspace = true } serde = { workspace = true, optional = true } +vortex-buffer = { path = "../vortex-buffer" } vortex-dtype = { path = "../vortex-dtype" } vortex-error = { path = "../vortex-error" } vortex-flatbuffers = { path = "../vortex-flatbuffers" } diff --git a/vortex-scalar/src/scalar2.rs b/vortex-scalar/src/scalar2.rs index 9e0a2d91e1..b363bb87dd 100644 --- a/vortex-scalar/src/scalar2.rs +++ b/vortex-scalar/src/scalar2.rs @@ -1,8 +1,36 @@ +#![allow(dead_code)] + +use serde::Deserialize; +use vortex_buffer::Buffer; use vortex_dtype::DType; +use vortex_error::{vortex_bail, VortexError, VortexResult}; + +pub struct Scalar { + dtype: DType, + buffer: Buffer, +} -use crate::value::ScalarValue; +impl Scalar { + pub(crate) fn flexbuffer(&self) -> VortexResult> { + Ok(flexbuffers::Reader::get_root(self.buffer.as_ref())?) + } +} -pub struct Scalar<'a> { +pub struct BoolScalar { dtype: DType, - value: ScalarValue<'a>, + value: Option, +} + +impl TryFrom for BoolScalar { + type Error = VortexError; + + fn try_from(value: Scalar) -> Result { + if !matches!(&value.dtype, &DType::Bool(_)) { + vortex_bail!(MismatchedTypes: "bool", &value.dtype); + } + Ok(Self { + dtype: value.dtype.clone(), + value: Option::::deserialize(value.flexbuffer()?)?, + }) + } } From d1ba91972768389953f98d771a54db480ab82afb Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Mon, 6 May 2024 18:54:01 +0100 Subject: [PATCH 3/7] Buffer2 --- vortex-scalar/src/scalar2.rs | 36 ------------------------------------ 1 file changed, 36 deletions(-) delete mode 100644 vortex-scalar/src/scalar2.rs diff --git a/vortex-scalar/src/scalar2.rs b/vortex-scalar/src/scalar2.rs deleted file mode 100644 index b363bb87dd..0000000000 --- a/vortex-scalar/src/scalar2.rs +++ /dev/null @@ -1,36 +0,0 @@ -#![allow(dead_code)] - -use serde::Deserialize; -use vortex_buffer::Buffer; -use vortex_dtype::DType; -use vortex_error::{vortex_bail, VortexError, VortexResult}; - -pub struct Scalar { - dtype: DType, - buffer: Buffer, -} - -impl Scalar { - pub(crate) fn flexbuffer(&self) -> VortexResult> { - Ok(flexbuffers::Reader::get_root(self.buffer.as_ref())?) - } -} - -pub struct BoolScalar { - dtype: DType, - value: Option, -} - -impl TryFrom for BoolScalar { - type Error = VortexError; - - fn try_from(value: Scalar) -> Result { - if !matches!(&value.dtype, &DType::Bool(_)) { - vortex_bail!(MismatchedTypes: "bool", &value.dtype); - } - Ok(Self { - dtype: value.dtype.clone(), - value: Option::::deserialize(value.flexbuffer()?)?, - }) - } -} From 9015ce0638f98ec54f75fb9abb9cae34542e1782 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Mon, 6 May 2024 18:54:53 +0100 Subject: [PATCH 4/7] Buffer2 --- vortex-array/src/array/primitive/mod.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/vortex-array/src/array/primitive/mod.rs b/vortex-array/src/array/primitive/mod.rs index fc313926df..bc895cf6f5 100644 --- a/vortex-array/src/array/primitive/mod.rs +++ b/vortex-array/src/array/primitive/mod.rs @@ -136,7 +136,9 @@ impl PrimitiveArray<'_> { } pub fn into_buffer(self) -> Buffer { - self.into_array().into_buffer().unwrap() + self.into_array() + .into_buffer() + .expect("PrimitiveArray must have a buffer") } } From eeaf2d4ab028d2c2e0012be7143a64dec0629d94 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Mon, 6 May 2024 18:56:10 +0100 Subject: [PATCH 5/7] Buffer2 --- vortex-scalar/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/vortex-scalar/src/lib.rs b/vortex-scalar/src/lib.rs index f0b8cff8aa..e6a4279169 100644 --- a/vortex-scalar/src/lib.rs +++ b/vortex-scalar/src/lib.rs @@ -18,7 +18,6 @@ mod extension; mod list; mod null; mod primitive; -mod scalar2; mod serde; mod struct_; mod utf8; From f75d1ea233185d8bf30fb9c710d2973824231f99 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Mon, 6 May 2024 19:13:31 +0100 Subject: [PATCH 6/7] Buffer2 --- vortex-array/src/array/varbin/compute/mod.rs | 9 +++++++-- vortex-array/src/array/varbin/mod.rs | 5 +++-- vortex-dict/src/compress.rs | 5 ++++- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/vortex-array/src/array/varbin/compute/mod.rs b/vortex-array/src/array/varbin/compute/mod.rs index 9ac0df46f5..8a56aa9535 100644 --- a/vortex-array/src/array/varbin/compute/mod.rs +++ b/vortex-array/src/array/varbin/compute/mod.rs @@ -136,8 +136,13 @@ impl AsArrowArray for VarBinArray<'_> { impl ScalarAtFn for VarBinArray<'_> { fn scalar_at(&self, index: usize) -> VortexResult { if self.is_valid(index) { - self.bytes_at(index) - .map(|bytes| varbin_scalar(bytes, self.dtype())) + Ok(varbin_scalar( + self.bytes_at(index)? + // TODO(ngates): update to use buffer when we refactor scalars. + .into_vec::() + .unwrap_or_else(|b| b.as_ref().to_vec()), + self.dtype(), + )) } else { Ok(Scalar::null(self.dtype())) } diff --git a/vortex-array/src/array/varbin/mod.rs b/vortex-array/src/array/varbin/mod.rs index a7239828c4..20058f16b8 100644 --- a/vortex-array/src/array/varbin/mod.rs +++ b/vortex-array/src/array/varbin/mod.rs @@ -19,6 +19,7 @@ mod flatten; mod stats; pub use stats::compute_stats; +use vortex_buffer::Buffer; use crate::array::primitive::PrimitiveArray; @@ -147,11 +148,11 @@ impl VarBinArray<'_> { }) } - pub fn bytes_at(&self, index: usize) -> VortexResult> { + pub fn bytes_at(&self, index: usize) -> VortexResult { let start = self.offset_at(index); let end = self.offset_at(index + 1); let sliced = slice(&self.bytes(), start, end)?; - Ok(sliced.flatten_primitive()?.buffer().as_ref().to_vec()) + Ok(sliced.flatten_primitive()?.buffer().clone()) } } diff --git a/vortex-dict/src/compress.rs b/vortex-dict/src/compress.rs index 5a53c298bc..3d8657e2d0 100644 --- a/vortex-dict/src/compress.rs +++ b/vortex-dict/src/compress.rs @@ -334,7 +334,10 @@ mod test { codes.buffer().typed_data::(), &[1, 0, 2, 1, 0, 3, 2, 0] ); - assert_eq!(String::from_utf8(values.bytes_at(0).unwrap()).unwrap(), ""); + assert_eq!( + str::from_utf8(values.bytes_at(0).unwrap().as_ref()).unwrap(), + "" + ); values .with_iterator(|iter| { assert_eq!( From 5e25ee9a5c97a4554ba954c17594e2c2ba8e1352 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Mon, 6 May 2024 19:42:24 +0100 Subject: [PATCH 7/7] Buffer2 --- .../src/array/primitive/compute/as_contiguous.rs | 2 +- vortex-buffer/src/lib.rs | 13 +++++++++++++ vortex-dict/src/compress.rs | 5 +---- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/vortex-array/src/array/primitive/compute/as_contiguous.rs b/vortex-array/src/array/primitive/compute/as_contiguous.rs index 19d50b056e..ac2f0f6bd9 100644 --- a/vortex-array/src/array/primitive/compute/as_contiguous.rs +++ b/vortex-array/src/array/primitive/compute/as_contiguous.rs @@ -20,7 +20,7 @@ impl AsContiguousFn for PrimitiveArray<'_> { arrays.iter().map(|a| a.len()).sum::() * self.ptype().byte_width(), ); for array in arrays { - buffer.extend_from_slice(array.as_primitive().buffer().as_ref()) + buffer.extend_from_slice(array.as_primitive().buffer()) } match_each_native_ptype!(self.ptype(), |$T| { Ok(PrimitiveArray::try_new(ScalarBuffer::<$T>::from(buffer), validity) diff --git a/vortex-buffer/src/lib.rs b/vortex-buffer/src/lib.rs index cd036dad41..c0415e73d5 100644 --- a/vortex-buffer/src/lib.rs +++ b/vortex-buffer/src/lib.rs @@ -1,3 +1,5 @@ +use std::ops::Deref; + use arrow_buffer::Buffer as ArrowBuffer; use vortex_dtype::{match_each_native_ptype, NativePType}; @@ -56,6 +58,17 @@ impl Buffer { } } +impl Deref for Buffer { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + match self { + Buffer::Arrow(b) => b.deref(), + Buffer::Bytes(b) => b.deref(), + } + } +} + impl AsRef<[u8]> for Buffer { fn as_ref(&self) -> &[u8] { match self { diff --git a/vortex-dict/src/compress.rs b/vortex-dict/src/compress.rs index 3d8657e2d0..4f9059391c 100644 --- a/vortex-dict/src/compress.rs +++ b/vortex-dict/src/compress.rs @@ -334,10 +334,7 @@ mod test { codes.buffer().typed_data::(), &[1, 0, 2, 1, 0, 3, 2, 0] ); - assert_eq!( - str::from_utf8(values.bytes_at(0).unwrap().as_ref()).unwrap(), - "" - ); + assert_eq!(str::from_utf8(&values.bytes_at(0).unwrap()).unwrap(), ""); values .with_iterator(|iter| { assert_eq!(