Arrays optionally have a buffer
gatesn committed Apr 18, 2024
1 parent 2bcaa2e commit 6ff8878
Showing 19 changed files with 67 additions and 93 deletions.
2 changes: 1 addition & 1 deletion vortex-array/flatbuffers/array.fbs
@@ -6,10 +6,10 @@ enum Version: uint8 {

table Array {
version: Version = V0;
has_buffer: bool;
encoding: uint16;
metadata: [ubyte];
children: [Array];
nbuffers: uint16;
}

root_type Array;
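At the schema level, the change is that an array node no longer serializes a buffer count; it records a single flag. A rough sketch of the corresponding in-memory shapes (illustrative structs only, not the actual Vortex types):

```rust
// Illustrative only: the data-model shift this commit makes. An array node used
// to carry a list of buffers (serialized as `nbuffers: uint16`); it now carries
// at most one (serialized as `has_buffer: bool`), plus its child arrays.
struct OldNode {
    buffers: Vec<Vec<u8>>,
    children: Vec<OldNode>,
}

struct NewNode {
    buffer: Option<Vec<u8>>,
    children: Vec<NewNode>,
}
```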
24 changes: 13 additions & 11 deletions vortex-array/src/array/bool/mod.rs
@@ -22,7 +22,7 @@ pub struct BoolMetadata {

impl BoolArray<'_> {
pub fn buffer(&self) -> &Buffer {
self.array().buffer(0).expect("missing buffer")
self.array().buffer().expect("missing buffer")
}

pub fn boolean_buffer(&self) -> BooleanBuffer {
@@ -38,16 +38,18 @@ impl BoolArray<'_> {

impl BoolArray<'_> {
pub fn try_new(buffer: BooleanBuffer, validity: Validity) -> VortexResult<Self> {
Self::try_from_parts(
DType::Bool(validity.nullability()),
BoolMetadata {
validity: validity.to_metadata(buffer.len())?,
length: buffer.len(),
},
vec![Buffer::Owned(buffer.into_inner())].into(),
validity.into_array_data().into_iter().collect_vec().into(),
HashMap::default(),
)
Ok(Self {
typed: TypedArray::try_from_parts(
DType::Bool(validity.nullability()),
BoolMetadata {
validity: validity.to_metadata(buffer.len())?,
length: buffer.len(),
},
Some(Buffer::Owned(buffer.into_inner())),
validity.into_array_data().into_iter().collect_vec().into(),
HashMap::default(),
)?,
})
}

pub fn from_vec(bools: Vec<bool>, validity: Validity) -> Self {
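Because the macro-generated `Self::try_from_parts` no longer takes a buffer argument (it always passes `None`; see implementation.rs further down), encodings that do own a buffer now call `TypedArray::try_from_parts` directly with `Some(buffer)`. A minimal sketch of that split, using stand-in types rather than the real Vortex ones:

```rust
// Stand-in types; the real signatures are in typed.rs and implementation.rs in this diff.
struct TypedArray {
    buffer: Option<Vec<u8>>,
    children: Vec<TypedArray>,
}

impl TypedArray {
    fn try_from_parts(buffer: Option<Vec<u8>>, children: Vec<TypedArray>) -> Result<Self, String> {
        Ok(Self { buffer, children })
    }
}

// What impl_encoding! now generates for every encoding: a buffer-less constructor.
fn encoding_try_from_parts(children: Vec<TypedArray>) -> Result<TypedArray, String> {
    TypedArray::try_from_parts(None, children)
}

// What buffer-backed encodings such as BoolArray and PrimitiveArray do by hand.
fn try_new_with_buffer(bytes: Vec<u8>, children: Vec<TypedArray>) -> Result<TypedArray, String> {
    TypedArray::try_from_parts(Some(bytes), children)
}
```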
8 changes: 1 addition & 7 deletions vortex-array/src/array/chunked/mod.rs
@@ -48,13 +48,7 @@ impl ChunkedArray<'_> {
let mut children = vec![chunk_ends.into_array_data()];
children.extend(chunks.iter().map(|a| a.to_array_data()));

Self::try_from_parts(
dtype,
ChunkedMetadata,
vec![].into(),
children.into(),
HashMap::default(),
)
Self::try_from_parts(dtype, ChunkedMetadata, children.into(), HashMap::default())
}

#[inline]
1 change: 0 additions & 1 deletion vortex-array/src/array/composite/array.rs
@@ -87,7 +87,6 @@ impl<'a> CompositeArray<'a> {
underlying_dtype: underlying.dtype().clone(),
underlying_metadata: metadata,
},
vec![].into(),
vec![underlying.into_array_data()].into(),
HashMap::default(),
)
1 change: 0 additions & 1 deletion vortex-array/src/array/constant/mod.rs
@@ -34,7 +34,6 @@ impl ConstantArray<'_> {
scalar.dtype().clone(),
ConstantMetadata { scalar, length },
vec![].into(),
vec![].into(),
stats,
)
.unwrap()
22 changes: 12 additions & 10 deletions vortex-array/src/array/primitive/mod.rs
@@ -34,7 +34,7 @@ impl PrimitiveArray<'_> {
}

pub fn buffer(&self) -> &Buffer {
self.array().buffer(0).expect("missing buffer")
self.array().buffer().expect("missing buffer")
}

pub fn scalar_buffer<T: NativePType>(&self) -> ScalarBuffer<T> {
@@ -51,15 +51,17 @@ impl PrimitiveArray<'_> {
buffer: ScalarBuffer<T>,
validity: Validity,
) -> VortexResult<Self> {
Self::try_from_parts(
DType::from(T::PTYPE).with_nullability(validity.nullability()),
PrimitiveMetadata {
validity: validity.to_metadata(buffer.len())?,
},
vec![Buffer::Owned(buffer.into_inner())].into(),
validity.into_array_data().into_iter().collect_vec().into(),
HashMap::default(),
)
Ok(Self {
typed: TypedArray::try_from_parts(
DType::from(T::PTYPE).with_nullability(validity.nullability()),
PrimitiveMetadata {
validity: validity.to_metadata(buffer.len())?,
},
Some(Buffer::Owned(buffer.into_inner())),
validity.into_array_data().into_iter().collect_vec().into(),
HashMap::default(),
)?,
})
}

pub fn from_vec<T: NativePType + ArrowNativeType>(values: Vec<T>, validity: Validity) -> Self {
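The primitive array stores its values in that single buffer as raw bytes; `scalar_buffer::<T>()` is a typed view over the same bytes. A self-contained sketch of that reinterpretation (plain std, not the Arrow `ScalarBuffer` used here):

```rust
// Reinterpret a byte buffer as fixed-width values, the way scalar_buffer views
// the array's single buffer. Illustrative only; meant for plain-old-data types
// like u32 or f64 where any bit pattern is valid.
fn typed_view<T: Copy>(bytes: &[u8]) -> &[T] {
    let size = std::mem::size_of::<T>();
    assert_eq!(bytes.len() % size, 0, "length must be a multiple of the element size");
    assert_eq!(bytes.as_ptr() as usize % std::mem::align_of::<T>(), 0, "buffer must be aligned for T");
    // Safety: length and alignment are checked above.
    unsafe { std::slice::from_raw_parts(bytes.as_ptr().cast::<T>(), bytes.len() / size) }
}
```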
1 change: 0 additions & 1 deletion vortex-array/src/array/sparse/mod.rs
@@ -56,7 +56,6 @@ impl<'a> SparseArray<'a> {
len,
fill_value,
},
vec![].into(),
vec![indices.to_array_data(), values.to_array_data()].into(),
HashMap::default(),
)
1 change: 0 additions & 1 deletion vortex-array/src/array/struct/mod.rs
@@ -67,7 +67,6 @@ impl StructArray<'_> {
Self::try_from_parts(
DType::Struct(names, field_dtypes),
StructMetadata { length },
vec![].into(),
fields.into(),
HashMap::default(),
)
8 changes: 1 addition & 7 deletions vortex-array/src/array/varbin/mod.rs
@@ -64,13 +64,7 @@ impl VarBinArray<'_> {
children.push(a)
}

Self::try_from_parts(
dtype,
metadata,
vec![].into(),
children.into(),
HashMap::default(),
)
Self::try_from_parts(dtype, metadata, children.into(), HashMap::default())
}

#[inline]
8 changes: 1 addition & 7 deletions vortex-array/src/array/varbinview/mod.rs
@@ -147,13 +147,7 @@ impl VarBinViewArray<'_> {
children.push(a)
}

Self::try_from_parts(
dtype,
metadata,
vec![].into(),
children.into(),
HashMap::default(),
)
Self::try_from_parts(dtype, metadata, children.into(), HashMap::default())
}

fn view_slice(&self) -> &[BinaryView] {
14 changes: 7 additions & 7 deletions vortex-array/src/data.rs
@@ -16,7 +16,7 @@ pub struct ArrayData {
encoding: EncodingRef,
dtype: DType,
metadata: Arc<dyn ArrayMetadata>,
buffers: Arc<[OwnedBuffer]>, // Should this just be an Option, not an Arc? How many multi-buffer arrays are there?
buffer: Option<OwnedBuffer>,
children: Arc<[ArrayData]>,
stats_map: Arc<RwLock<HashMap<Stat, Scalar>>>,
}
@@ -26,15 +26,15 @@ impl ArrayData {
encoding: EncodingRef,
dtype: DType,
metadata: Arc<dyn ArrayMetadata>,
buffers: Arc<[OwnedBuffer]>,
buffer: Option<OwnedBuffer>,
children: Arc<[ArrayData]>,
statistics: HashMap<Stat, Scalar>,
) -> VortexResult<Self> {
let data = Self {
encoding,
dtype,
metadata,
buffers,
buffer,
children,
stats_map: Arc::new(RwLock::new(statistics)),
};
@@ -59,8 +59,8 @@ impl ArrayData {
&self.metadata
}

pub fn buffers(&self) -> &[Buffer] {
&self.buffers
pub fn buffer(&self) -> Option<&Buffer> {
self.buffer.as_ref()
}

pub fn child(&self, index: usize, dtype: &DType) -> Option<&ArrayData> {
@@ -88,11 +88,11 @@ impl ArrayData {
/// Return the buffer offsets and the total length of all buffers, assuming the given alignment.
/// This includes all child buffers.
pub fn all_buffer_offsets(&self, alignment: usize) -> Vec<u64> {
let mut offsets = Vec::with_capacity(self.buffers.len() + 1);
let mut offsets = vec![];
let mut offset = 0;

for col_data in self.depth_first_traversal() {
for buffer in col_data.buffers() {
if let Some(buffer) = col_data.buffer() {
offsets.push(offset as u64);

let buffer_size = buffer.len();
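The hunk above is truncated before the alignment arithmetic, but the documented contract is one offset per buffer plus the total length at the end, with buffers padded to the requested alignment. A small worked sketch under those assumptions (padding every buffer uniformly, power-of-two alignment; names are illustrative):

```rust
// Offsets for a depth-first list of buffer lengths, each padded out to `alignment`
// (assumed to be a power of two). The final entry is the total padded length.
fn aligned_offsets(buffer_lens: &[usize], alignment: usize) -> Vec<u64> {
    assert!(alignment.is_power_of_two());
    let mut offsets = Vec::with_capacity(buffer_lens.len() + 1);
    let mut offset = 0usize;
    for &len in buffer_lens {
        offsets.push(offset as u64);
        // round this buffer's end up to the next alignment boundary
        offset = (offset + len + alignment - 1) & !(alignment - 1);
    }
    offsets.push(offset as u64);
    offsets
}

// aligned_offsets(&[3, 8, 5], 8) == vec![0, 8, 16, 24]
```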
20 changes: 11 additions & 9 deletions vortex-array/src/implementation.rs
@@ -1,4 +1,4 @@
use vortex_error::{VortexError, VortexResult};
use vortex_error::{vortex_bail, VortexError, VortexResult};
use vortex_schema::DType;

use crate::buffer::{Buffer, OwnedBuffer};
@@ -50,7 +50,6 @@ macro_rules! impl_encoding {
EncodingRef,
VORTEX_ENCODINGS,
};
use $crate::buffer::OwnedBuffer;
use $crate::stats::Stat;
use $crate::scalar::Scalar;
use std::any::Any;
@@ -88,10 +87,10 @@ macro_rules! impl_encoding {
pub fn try_from_parts(
dtype: DType,
metadata: [<$Name Metadata>],
buffers: Arc<[OwnedBuffer]>,
children: Arc<[ArrayData]>,
stats: HashMap<Stat, Scalar>) -> VortexResult<Self> {
Ok(Self { typed: TypedArray::try_from_parts(dtype, metadata, buffers, children, stats)? })
stats: HashMap<Stat, Scalar>,
) -> VortexResult<Self> {
Ok(Self { typed: TypedArray::try_from_parts(dtype, metadata, None, children, stats)? })
}
}
impl<'a> GetArrayMetadata for [<$Name Array>]<'a> {
@@ -229,7 +228,7 @@ impl<'a, T: IntoArray<'a> + ArrayEncodingRef + ArrayStatistics + GetArrayMetadat
Array::DataRef(d) => d.clone(),
Array::View(_) => {
struct Visitor {
buffers: Vec<OwnedBuffer>,
buffer: Option<OwnedBuffer>,
children: Vec<ArrayData>,
}
impl ArrayVisitor for Visitor {
@@ -239,20 +238,23 @@
}

fn visit_buffer(&mut self, buffer: &Buffer) -> VortexResult<()> {
self.buffers.push(buffer.to_static());
if self.buffer.is_some() {
vortex_bail!("Multiple buffers found in view")
}
self.buffer = Some(buffer.to_static());
Ok(())
}
}
let mut visitor = Visitor {
buffers: vec![],
buffer: None,
children: vec![],
};
array.with_dyn(|a| a.accept(&mut visitor).unwrap());
ArrayData::try_new(
encoding,
array.dtype().clone(),
metadata,
visitor.buffers.into(),
visitor.buffer,
visitor.children.into(),
stats,
)
8 changes: 4 additions & 4 deletions vortex-array/src/lib.rs
@@ -106,11 +106,11 @@ impl Array<'_> {
}
}

pub fn buffer(&self, idx: usize) -> Option<&Buffer> {
pub fn buffer(&self) -> Option<&Buffer> {
match self {
Array::Data(d) => d.buffers().get(idx),
Array::DataRef(d) => d.buffers().get(idx),
Array::View(v) => v.buffers().get(idx),
Array::Data(d) => d.buffer(),
Array::DataRef(d) => d.buffer(),
Array::View(v) => v.buffer(),
}
}
}
4 changes: 2 additions & 2 deletions vortex-array/src/typed.rs
@@ -19,15 +19,15 @@ impl<D: ArrayDef> TypedArray<'_, D> {
pub fn try_from_parts(
dtype: DType,
metadata: D::Metadata,
buffers: Arc<[OwnedBuffer]>,
buffer: Option<OwnedBuffer>,
children: Arc<[ArrayData]>,
stats: HashMap<Stat, Scalar>,
) -> VortexResult<Self> {
let array = Array::Data(ArrayData::try_new(
D::ENCODING,
dtype,
Arc::new(metadata.clone()),
buffers,
buffer,
children,
stats,
)?);
13 changes: 6 additions & 7 deletions vortex-array/src/view.rs
@@ -116,23 +116,22 @@ impl<'v> ArrayView<'v> {
}
}

/// The number of buffers used by the current Array.
pub fn nbuffers(&self) -> usize {
self.array.nbuffers() as usize
/// Whether the current Array makes use of a buffer
pub fn has_buffer(&self) -> bool {
self.array.has_buffer()
}

/// The number of buffers used by the current Array and all its children.
fn cumulative_nbuffers(array: fb::Array) -> usize {
let mut nbuffers = array.nbuffers() as usize;
let mut nbuffers = if array.has_buffer() { 1 } else { 0 };
for child in array.children().unwrap_or_default() {
nbuffers += Self::cumulative_nbuffers(child)
}
nbuffers
}

pub fn buffers(&self) -> &'v [Buffer] {
// This is only true for the immediate current node?
self.buffers[0..self.nbuffers()].as_ref()
pub fn buffer(&self) -> Option<&'v Buffer<'v>> {
self.has_buffer().then(|| &self.buffers[0])
}

pub fn statistics(&self) -> &dyn Statistics {
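With at most one buffer per node, a view resolves its own buffer as the first entry of its slice, and the flat, depth-first buffer list of a message can be partitioned among children by counting. A sketch of that bookkeeping with stand-in types (not the actual `ArrayView` code):

```rust
// Stand-in for the flatbuffer Array node: whether this node owns a buffer, plus children.
struct FbNode {
    has_buffer: bool,
    children: Vec<FbNode>,
}

// Buffers used by this node and all of its children (mirrors cumulative_nbuffers above).
fn cumulative_nbuffers(node: &FbNode) -> usize {
    usize::from(node.has_buffer)
        + node.children.iter().map(cumulative_nbuffers).sum::<usize>()
}

// The slice of the flat, depth-first buffer list that belongs to children[child_idx]:
// skip this node's own buffer (if any) and everything owned by earlier siblings.
fn child_buffer_range(node: &FbNode, child_idx: usize) -> std::ops::Range<usize> {
    let start = usize::from(node.has_buffer)
        + node.children[..child_idx].iter().map(cumulative_nbuffers).sum::<usize>();
    start..start + cumulative_nbuffers(&node.children[child_idx])
}
```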
1 change: 0 additions & 1 deletion vortex-dict/src/dict.rs
@@ -27,7 +27,6 @@ impl DictArray<'_> {
DictMetadata {
codes_dtype: codes.dtype().clone(),
},
vec![].into(),
vec![values.to_array_data(), codes.to_array_data()].into(),
HashMap::new(),
)
14 changes: 6 additions & 8 deletions vortex-ipc/src/messages.rs
@@ -117,14 +117,14 @@ impl<'a> WriteFlatBuffer for IPCChunk<'a> {
&self,
fbb: &mut FlatBufferBuilder<'fb>,
) -> WIPOffset<Self::Target<'fb>> {
let col_data = self.1;
let array = Some(IPCArray(self.0, col_data).write_flatbuffer(fbb));
let array_data = self.1;
let array = Some(IPCArray(self.0, array_data).write_flatbuffer(fbb));

// Walk the ColumnData depth-first to compute the buffer offsets.
let mut buffers = Vec::with_capacity(col_data.buffers().len());
let mut buffers = vec![];
let mut offset = 0;
for col_data in col_data.depth_first_traversal() {
for buffer in col_data.buffers() {
for array_data in array_data.depth_first_traversal() {
if let Some(buffer) = array_data.buffer() {
buffers.push(fb::Buffer::new(
offset as u64,
buffer.len() as u64,
@@ -180,16 +180,14 @@ impl<'a> WriteFlatBuffer for IPCArray<'a> {
.collect_vec();
let children = Some(fbb.create_vector(&children));

let nbuffers = column_data.buffers().len() as u16; // TODO(ngates): checked cast

fba::Array::create(
fbb,
&fba::ArrayArgs {
version: Default::default(),
has_buffer: column_data.buffer().is_some(),
encoding,
metadata,
children,
nbuffers,
},
)
}
2 changes: 1 addition & 1 deletion vortex-ipc/src/writer.rs
@@ -61,7 +61,7 @@ impl<W: Write> StreamWriter<W> {
let mut current_offset = 0;
for (buffer, &buffer_end) in data
.depth_first_traversal()
.flat_map(|data| data.buffers().iter())
.flat_map(|data| data.buffer().into_iter())
.zip_eq(buffer_offsets.iter().skip(1))
{
self.write.write_all(buffer.as_slice())?;
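The writer zips each buffer (now at most one per array, in depth-first order) with its precomputed end offset; the truncated part of the hunk presumably pads up to that offset so the reader's byte offsets line up. A self-contained sketch under that assumption (names are illustrative):

```rust
use std::io::Write;

// Write each buffer followed by zero padding up to its precomputed end offset.
// `buffer_offsets` is assumed to come from something like all_buffer_offsets
// above: one entry per buffer plus the total length at the end.
fn write_padded<W: Write>(mut write: W, buffers: &[&[u8]], buffer_offsets: &[u64]) -> std::io::Result<()> {
    let mut current_offset = 0u64;
    for (buffer, &buffer_end) in buffers.iter().zip(buffer_offsets.iter().skip(1)) {
        write.write_all(buffer)?;
        current_offset += buffer.len() as u64;
        // zero-fill the gap introduced by alignment padding
        write.write_all(&vec![0u8; (buffer_end - current_offset) as usize])?;
        current_offset = buffer_end;
    }
    Ok(())
}
```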