From 6ff8878639e252dd4a245bf5c8d25e0b1e6b0770 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Thu, 18 Apr 2024 09:19:08 +0100 Subject: [PATCH] Arrays optionally have a buffer --- vortex-array/flatbuffers/array.fbs | 2 +- vortex-array/src/array/bool/mod.rs | 24 ++++++++++++----------- vortex-array/src/array/chunked/mod.rs | 8 +------- vortex-array/src/array/composite/array.rs | 1 - vortex-array/src/array/constant/mod.rs | 1 - vortex-array/src/array/primitive/mod.rs | 22 +++++++++++---------- vortex-array/src/array/sparse/mod.rs | 1 - vortex-array/src/array/struct/mod.rs | 1 - vortex-array/src/array/varbin/mod.rs | 8 +------- vortex-array/src/array/varbinview/mod.rs | 8 +------- vortex-array/src/data.rs | 14 ++++++------- vortex-array/src/implementation.rs | 20 ++++++++++--------- vortex-array/src/lib.rs | 8 ++++---- vortex-array/src/typed.rs | 4 ++-- vortex-array/src/view.rs | 13 ++++++------ vortex-dict/src/dict.rs | 1 - vortex-ipc/src/messages.rs | 14 ++++++------- vortex-ipc/src/writer.rs | 2 +- vortex-ree/src/ree.rs | 8 +------- 19 files changed, 67 insertions(+), 93 deletions(-) diff --git a/vortex-array/flatbuffers/array.fbs b/vortex-array/flatbuffers/array.fbs index 2dedd3927c..86961eedc6 100644 --- a/vortex-array/flatbuffers/array.fbs +++ b/vortex-array/flatbuffers/array.fbs @@ -6,10 +6,10 @@ enum Version: uint8 { table Array { version: Version = V0; + has_buffer: bool; encoding: uint16; metadata: [ubyte]; children: [Array]; - nbuffers: uint16; } root_type Array; diff --git a/vortex-array/src/array/bool/mod.rs b/vortex-array/src/array/bool/mod.rs index 30c73d3fe9..946fb3bea0 100644 --- a/vortex-array/src/array/bool/mod.rs +++ b/vortex-array/src/array/bool/mod.rs @@ -22,7 +22,7 @@ pub struct BoolMetadata { impl BoolArray<'_> { pub fn buffer(&self) -> &Buffer { - self.array().buffer(0).expect("missing buffer") + self.array().buffer().expect("missing buffer") } pub fn boolean_buffer(&self) -> BooleanBuffer { @@ -38,16 +38,18 @@ impl BoolArray<'_> { impl BoolArray<'_> { pub fn try_new(buffer: BooleanBuffer, validity: Validity) -> VortexResult { - Self::try_from_parts( - DType::Bool(validity.nullability()), - BoolMetadata { - validity: validity.to_metadata(buffer.len())?, - length: buffer.len(), - }, - vec![Buffer::Owned(buffer.into_inner())].into(), - validity.into_array_data().into_iter().collect_vec().into(), - HashMap::default(), - ) + Ok(Self { + typed: TypedArray::try_from_parts( + DType::Bool(validity.nullability()), + BoolMetadata { + validity: validity.to_metadata(buffer.len())?, + length: buffer.len(), + }, + Some(Buffer::Owned(buffer.into_inner())), + validity.into_array_data().into_iter().collect_vec().into(), + HashMap::default(), + )?, + }) } pub fn from_vec(bools: Vec, validity: Validity) -> Self { diff --git a/vortex-array/src/array/chunked/mod.rs b/vortex-array/src/array/chunked/mod.rs index 057212ae91..26d269a609 100644 --- a/vortex-array/src/array/chunked/mod.rs +++ b/vortex-array/src/array/chunked/mod.rs @@ -48,13 +48,7 @@ impl ChunkedArray<'_> { let mut children = vec![chunk_ends.into_array_data()]; children.extend(chunks.iter().map(|a| a.to_array_data())); - Self::try_from_parts( - dtype, - ChunkedMetadata, - vec![].into(), - children.into(), - HashMap::default(), - ) + Self::try_from_parts(dtype, ChunkedMetadata, children.into(), HashMap::default()) } #[inline] diff --git a/vortex-array/src/array/composite/array.rs b/vortex-array/src/array/composite/array.rs index d87b2f075b..7ba765fdde 100644 --- a/vortex-array/src/array/composite/array.rs +++ b/vortex-array/src/array/composite/array.rs @@ -87,7 +87,6 @@ impl<'a> CompositeArray<'a> { underlying_dtype: underlying.dtype().clone(), underlying_metadata: metadata, }, - vec![].into(), vec![underlying.into_array_data()].into(), HashMap::default(), ) diff --git a/vortex-array/src/array/constant/mod.rs b/vortex-array/src/array/constant/mod.rs index 45dea8e70d..c5519bab4f 100644 --- a/vortex-array/src/array/constant/mod.rs +++ b/vortex-array/src/array/constant/mod.rs @@ -34,7 +34,6 @@ impl ConstantArray<'_> { scalar.dtype().clone(), ConstantMetadata { scalar, length }, vec![].into(), - vec![].into(), stats, ) .unwrap() diff --git a/vortex-array/src/array/primitive/mod.rs b/vortex-array/src/array/primitive/mod.rs index 4d40ef41ff..d6f064098c 100644 --- a/vortex-array/src/array/primitive/mod.rs +++ b/vortex-array/src/array/primitive/mod.rs @@ -34,7 +34,7 @@ impl PrimitiveArray<'_> { } pub fn buffer(&self) -> &Buffer { - self.array().buffer(0).expect("missing buffer") + self.array().buffer().expect("missing buffer") } pub fn scalar_buffer(&self) -> ScalarBuffer { @@ -51,15 +51,17 @@ impl PrimitiveArray<'_> { buffer: ScalarBuffer, validity: Validity, ) -> VortexResult { - Self::try_from_parts( - DType::from(T::PTYPE).with_nullability(validity.nullability()), - PrimitiveMetadata { - validity: validity.to_metadata(buffer.len())?, - }, - vec![Buffer::Owned(buffer.into_inner())].into(), - validity.into_array_data().into_iter().collect_vec().into(), - HashMap::default(), - ) + Ok(Self { + typed: TypedArray::try_from_parts( + DType::from(T::PTYPE).with_nullability(validity.nullability()), + PrimitiveMetadata { + validity: validity.to_metadata(buffer.len())?, + }, + Some(Buffer::Owned(buffer.into_inner())), + validity.into_array_data().into_iter().collect_vec().into(), + HashMap::default(), + )?, + }) } pub fn from_vec(values: Vec, validity: Validity) -> Self { diff --git a/vortex-array/src/array/sparse/mod.rs b/vortex-array/src/array/sparse/mod.rs index 359b68eaf1..2b35cc18a9 100644 --- a/vortex-array/src/array/sparse/mod.rs +++ b/vortex-array/src/array/sparse/mod.rs @@ -56,7 +56,6 @@ impl<'a> SparseArray<'a> { len, fill_value, }, - vec![].into(), vec![indices.to_array_data(), values.to_array_data()].into(), HashMap::default(), ) diff --git a/vortex-array/src/array/struct/mod.rs b/vortex-array/src/array/struct/mod.rs index 0064492c29..aa174b83c9 100644 --- a/vortex-array/src/array/struct/mod.rs +++ b/vortex-array/src/array/struct/mod.rs @@ -67,7 +67,6 @@ impl StructArray<'_> { Self::try_from_parts( DType::Struct(names, field_dtypes), StructMetadata { length }, - vec![].into(), fields.into(), HashMap::default(), ) diff --git a/vortex-array/src/array/varbin/mod.rs b/vortex-array/src/array/varbin/mod.rs index dfd9ea7e26..ce5e8115dc 100644 --- a/vortex-array/src/array/varbin/mod.rs +++ b/vortex-array/src/array/varbin/mod.rs @@ -64,13 +64,7 @@ impl VarBinArray<'_> { children.push(a) } - Self::try_from_parts( - dtype, - metadata, - vec![].into(), - children.into(), - HashMap::default(), - ) + Self::try_from_parts(dtype, metadata, children.into(), HashMap::default()) } #[inline] diff --git a/vortex-array/src/array/varbinview/mod.rs b/vortex-array/src/array/varbinview/mod.rs index dd37ef7701..6ee34c976d 100644 --- a/vortex-array/src/array/varbinview/mod.rs +++ b/vortex-array/src/array/varbinview/mod.rs @@ -147,13 +147,7 @@ impl VarBinViewArray<'_> { children.push(a) } - Self::try_from_parts( - dtype, - metadata, - vec![].into(), - children.into(), - HashMap::default(), - ) + Self::try_from_parts(dtype, metadata, children.into(), HashMap::default()) } fn view_slice(&self) -> &[BinaryView] { diff --git a/vortex-array/src/data.rs b/vortex-array/src/data.rs index ab5bc61c9f..708d4853ce 100644 --- a/vortex-array/src/data.rs +++ b/vortex-array/src/data.rs @@ -16,7 +16,7 @@ pub struct ArrayData { encoding: EncodingRef, dtype: DType, metadata: Arc, - buffers: Arc<[OwnedBuffer]>, // Should this just be an Option, not an Arc? How many multi-buffer arrays are there? + buffer: Option, children: Arc<[ArrayData]>, stats_map: Arc>>, } @@ -26,7 +26,7 @@ impl ArrayData { encoding: EncodingRef, dtype: DType, metadata: Arc, - buffers: Arc<[OwnedBuffer]>, + buffer: Option, children: Arc<[ArrayData]>, statistics: HashMap, ) -> VortexResult { @@ -34,7 +34,7 @@ impl ArrayData { encoding, dtype, metadata, - buffers, + buffer, children, stats_map: Arc::new(RwLock::new(statistics)), }; @@ -59,8 +59,8 @@ impl ArrayData { &self.metadata } - pub fn buffers(&self) -> &[Buffer] { - &self.buffers + pub fn buffer(&self) -> Option<&Buffer> { + self.buffer.as_ref() } pub fn child(&self, index: usize, dtype: &DType) -> Option<&ArrayData> { @@ -88,11 +88,11 @@ impl ArrayData { /// Return the buffer offsets and the total length of all buffers, assuming the given alignment. /// This includes all child buffers. pub fn all_buffer_offsets(&self, alignment: usize) -> Vec { - let mut offsets = Vec::with_capacity(self.buffers.len() + 1); + let mut offsets = vec![]; let mut offset = 0; for col_data in self.depth_first_traversal() { - for buffer in col_data.buffers() { + if let Some(buffer) = col_data.buffer() { offsets.push(offset as u64); let buffer_size = buffer.len(); diff --git a/vortex-array/src/implementation.rs b/vortex-array/src/implementation.rs index 2ee0ad9a93..4e1e08ea18 100644 --- a/vortex-array/src/implementation.rs +++ b/vortex-array/src/implementation.rs @@ -1,4 +1,4 @@ -use vortex_error::{VortexError, VortexResult}; +use vortex_error::{vortex_bail, VortexError, VortexResult}; use vortex_schema::DType; use crate::buffer::{Buffer, OwnedBuffer}; @@ -50,7 +50,6 @@ macro_rules! impl_encoding { EncodingRef, VORTEX_ENCODINGS, }; - use $crate::buffer::OwnedBuffer; use $crate::stats::Stat; use $crate::scalar::Scalar; use std::any::Any; @@ -88,10 +87,10 @@ macro_rules! impl_encoding { pub fn try_from_parts( dtype: DType, metadata: [<$Name Metadata>], - buffers: Arc<[OwnedBuffer]>, children: Arc<[ArrayData]>, - stats: HashMap) -> VortexResult { - Ok(Self { typed: TypedArray::try_from_parts(dtype, metadata, buffers, children, stats)? }) + stats: HashMap, + ) -> VortexResult { + Ok(Self { typed: TypedArray::try_from_parts(dtype, metadata, None, children, stats)? }) } } impl<'a> GetArrayMetadata for [<$Name Array>]<'a> { @@ -229,7 +228,7 @@ impl<'a, T: IntoArray<'a> + ArrayEncodingRef + ArrayStatistics + GetArrayMetadat Array::DataRef(d) => d.clone(), Array::View(_) => { struct Visitor { - buffers: Vec, + buffer: Option, children: Vec, } impl ArrayVisitor for Visitor { @@ -239,12 +238,15 @@ impl<'a, T: IntoArray<'a> + ArrayEncodingRef + ArrayStatistics + GetArrayMetadat } fn visit_buffer(&mut self, buffer: &Buffer) -> VortexResult<()> { - self.buffers.push(buffer.to_static()); + if self.buffer.is_some() { + vortex_bail!("Multiple buffers found in view") + } + self.buffer = Some(buffer.to_static()); Ok(()) } } let mut visitor = Visitor { - buffers: vec![], + buffer: None, children: vec![], }; array.with_dyn(|a| a.accept(&mut visitor).unwrap()); @@ -252,7 +254,7 @@ impl<'a, T: IntoArray<'a> + ArrayEncodingRef + ArrayStatistics + GetArrayMetadat encoding, array.dtype().clone(), metadata, - visitor.buffers.into(), + visitor.buffer, visitor.children.into(), stats, ) diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs index a93daa593a..ea83bd18cf 100644 --- a/vortex-array/src/lib.rs +++ b/vortex-array/src/lib.rs @@ -106,11 +106,11 @@ impl Array<'_> { } } - pub fn buffer(&self, idx: usize) -> Option<&Buffer> { + pub fn buffer(&self) -> Option<&Buffer> { match self { - Array::Data(d) => d.buffers().get(idx), - Array::DataRef(d) => d.buffers().get(idx), - Array::View(v) => v.buffers().get(idx), + Array::Data(d) => d.buffer(), + Array::DataRef(d) => d.buffer(), + Array::View(v) => v.buffer(), } } } diff --git a/vortex-array/src/typed.rs b/vortex-array/src/typed.rs index bbbd6894d7..24bd78b559 100644 --- a/vortex-array/src/typed.rs +++ b/vortex-array/src/typed.rs @@ -19,7 +19,7 @@ impl TypedArray<'_, D> { pub fn try_from_parts( dtype: DType, metadata: D::Metadata, - buffers: Arc<[OwnedBuffer]>, + buffer: Option, children: Arc<[ArrayData]>, stats: HashMap, ) -> VortexResult { @@ -27,7 +27,7 @@ impl TypedArray<'_, D> { D::ENCODING, dtype, Arc::new(metadata.clone()), - buffers, + buffer, children, stats, )?); diff --git a/vortex-array/src/view.rs b/vortex-array/src/view.rs index be59ca64ed..eeec5cd974 100644 --- a/vortex-array/src/view.rs +++ b/vortex-array/src/view.rs @@ -116,23 +116,22 @@ impl<'v> ArrayView<'v> { } } - /// The number of buffers used by the current Array. - pub fn nbuffers(&self) -> usize { - self.array.nbuffers() as usize + /// Whether the current Array makes use of a buffer + pub fn has_buffer(&self) -> bool { + self.array.has_buffer() } /// The number of buffers used by the current Array and all its children. fn cumulative_nbuffers(array: fb::Array) -> usize { - let mut nbuffers = array.nbuffers() as usize; + let mut nbuffers = if array.has_buffer() { 1 } else { 0 }; for child in array.children().unwrap_or_default() { nbuffers += Self::cumulative_nbuffers(child) } nbuffers } - pub fn buffers(&self) -> &'v [Buffer] { - // This is only true for the immediate current node? - self.buffers[0..self.nbuffers()].as_ref() + pub fn buffer(&self) -> Option<&'v Buffer<'v>> { + self.has_buffer().then(|| &self.buffers[0]) } pub fn statistics(&self) -> &dyn Statistics { diff --git a/vortex-dict/src/dict.rs b/vortex-dict/src/dict.rs index 659d894b98..127113dddc 100644 --- a/vortex-dict/src/dict.rs +++ b/vortex-dict/src/dict.rs @@ -27,7 +27,6 @@ impl DictArray<'_> { DictMetadata { codes_dtype: codes.dtype().clone(), }, - vec![].into(), vec![values.to_array_data(), codes.to_array_data()].into(), HashMap::new(), ) diff --git a/vortex-ipc/src/messages.rs b/vortex-ipc/src/messages.rs index 4bbd5cead7..f3d8ae9620 100644 --- a/vortex-ipc/src/messages.rs +++ b/vortex-ipc/src/messages.rs @@ -117,14 +117,14 @@ impl<'a> WriteFlatBuffer for IPCChunk<'a> { &self, fbb: &mut FlatBufferBuilder<'fb>, ) -> WIPOffset> { - let col_data = self.1; - let array = Some(IPCArray(self.0, col_data).write_flatbuffer(fbb)); + let array_data = self.1; + let array = Some(IPCArray(self.0, array_data).write_flatbuffer(fbb)); // Walk the ColumnData depth-first to compute the buffer offsets. - let mut buffers = Vec::with_capacity(col_data.buffers().len()); + let mut buffers = vec![]; let mut offset = 0; - for col_data in col_data.depth_first_traversal() { - for buffer in col_data.buffers() { + for array_data in array_data.depth_first_traversal() { + if let Some(buffer) = array_data.buffer() { buffers.push(fb::Buffer::new( offset as u64, buffer.len() as u64, @@ -180,16 +180,14 @@ impl<'a> WriteFlatBuffer for IPCArray<'a> { .collect_vec(); let children = Some(fbb.create_vector(&children)); - let nbuffers = column_data.buffers().len() as u16; // TODO(ngates): checked cast - fba::Array::create( fbb, &fba::ArrayArgs { version: Default::default(), + has_buffer: column_data.buffer().is_some(), encoding, metadata, children, - nbuffers, }, ) } diff --git a/vortex-ipc/src/writer.rs b/vortex-ipc/src/writer.rs index ce3529f106..f99d724d18 100644 --- a/vortex-ipc/src/writer.rs +++ b/vortex-ipc/src/writer.rs @@ -61,7 +61,7 @@ impl StreamWriter { let mut current_offset = 0; for (buffer, &buffer_end) in data .depth_first_traversal() - .flat_map(|data| data.buffers().iter()) + .flat_map(|data| data.buffer().into_iter()) .zip_eq(buffer_offsets.iter().skip(1)) { self.write.write_all(buffer.as_slice())?; diff --git a/vortex-ree/src/ree.rs b/vortex-ree/src/ree.rs index 29ca2d6c8a..971b264da8 100644 --- a/vortex-ree/src/ree.rs +++ b/vortex-ree/src/ree.rs @@ -59,13 +59,7 @@ impl REEArray<'_> { children.push(a) } - Self::try_from_parts( - dtype, - metadata, - vec![].into(), - children.into(), - HashMap::new(), - ) + Self::try_from_parts(dtype, metadata, children.into(), HashMap::new()) } pub fn find_physical_index(&self, index: usize) -> VortexResult {