Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use vortex-buffer over bytes::Bytes #1713

Merged
merged 43 commits into from
Dec 18, 2024
Merged
Changes from 1 commit
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
ff69266
Message cleanup
gatesn Dec 14, 2024
d1cd362
Move expr
gatesn Dec 14, 2024
54ddfd8
Clean up
gatesn Dec 14, 2024
b368ec0
Clean up
gatesn Dec 14, 2024
34edbf5
Clean up
gatesn Dec 14, 2024
4d02906
message reader
gatesn Dec 14, 2024
cc65637
Message codec
gatesn Dec 16, 2024
21ce1b9
Message codec
gatesn Dec 16, 2024
831d41e
Message codec
gatesn Dec 16, 2024
0010f00
Message codec
gatesn Dec 16, 2024
8c55100
Message codec
gatesn Dec 16, 2024
29a78a1
Message codec
gatesn Dec 16, 2024
bbda9e6
Message Codecs
gatesn Dec 16, 2024
5a7c8ab
Message Codecs
gatesn Dec 16, 2024
75fd1e0
Message Codecs
gatesn Dec 16, 2024
0c333b9
merge
gatesn Dec 16, 2024
92a7d6e
merge
gatesn Dec 16, 2024
b474b4b
Merge
gatesn Dec 16, 2024
7e1c80f
Merge
gatesn Dec 16, 2024
92062d8
Fix docs
gatesn Dec 16, 2024
87ba232
Assert alignment
gatesn Dec 16, 2024
f4d6fdf
Merge branch 'develop' into ngates/message-reader
gatesn Dec 17, 2024
1b285e6
Address comments
gatesn Dec 17, 2024
e532fa6
Address comments
gatesn Dec 17, 2024
ca4e632
Address comments
gatesn Dec 17, 2024
d7c38be
Address comments
gatesn Dec 17, 2024
fe46f61
Address comments
gatesn Dec 17, 2024
80c6fcc
Address comments
gatesn Dec 17, 2024
20f8b18
Address comments
gatesn Dec 17, 2024
78d9f0d
Fix decoder
gatesn Dec 18, 2024
94ae305
Fix decoder
gatesn Dec 18, 2024
5b21ae7
Remove unpin bounds
gatesn Dec 18, 2024
c5277d0
Remove unpin bounds
gatesn Dec 18, 2024
24648c9
Remove bytes
gatesn Dec 18, 2024
6313dca
Add PrimInt bound to VarBinOffset
gatesn Dec 18, 2024
2040b0d
Add PrimInt bound to VarBinOffset
gatesn Dec 18, 2024
641d1c7
Add PrimInt bound to VarBinOffset
gatesn Dec 18, 2024
5633dd3
Merge branch 'develop' into ngates/message-reader
gatesn Dec 18, 2024
e73ef79
Merge branch 'ngates/message-reader' into ngates/read-at-alignment
gatesn Dec 18, 2024
bc128b0
Some fixes
gatesn Dec 18, 2024
124441a
Some fixes
gatesn Dec 18, 2024
e77ff30
Buffer alignment
gatesn Dec 18, 2024
ddbd714
Buffer alignment
gatesn Dec 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
message reader
gatesn committed Dec 14, 2024
commit 4d02906a9b36ab72e28728d1e9612f76e7401ab6
46 changes: 45 additions & 1 deletion vortex-flatbuffers/src/lib.rs
Original file line number Diff line number Diff line change
@@ -114,7 +114,11 @@ pub mod footer;
/// ```
pub mod message;

use flatbuffers::{root, FlatBufferBuilder, Follow, InvalidFlatbuffer, Verifiable, WIPOffset};
use std::marker::PhantomData;

use flatbuffers::{
root, FlatBufferBuilder, Follow, InvalidFlatbuffer, Table, Verifiable, WIPOffset,
};
use vortex_buffer::Buffer;

pub trait FlatBufferRoot {}
@@ -162,3 +166,43 @@ impl<F: WriteFlatBuffer + FlatBufferRoot> FlatBufferToBytes for F {
Buffer::from(vec).slice(start..end)
}
}

pub struct Flat<T> {
buffer: Buffer,
loc: usize,
_marker: PhantomData<T>,
}

impl<'buf, T> Flat<T>
where
T: for<'a> Follow<'a>,
{
pub fn new(buffer: Buffer) -> Result<Self, InvalidFlatbuffer>
where
T: Verifiable,
{
drop(root::<T>(buffer.as_slice())?);
Ok(Self {
buffer,
loc: 0,
_marker: PhantomData,
})
}

pub fn as_(&self) -> <T as Follow<'_>>::Inner {
// SAFETY: The validity of the buffer is checked during construction.
unsafe { <T as Follow<'_>>::follow(self.buffer.as_ref(), self.loc) }
}

pub fn map<R>(self, f: impl FnOnce(<T as Follow<'_>>::Inner) -> Table<'buf>) -> Flat<R>
where
R: Follow<'buf> + 'buf,
{
let result = f(self.as_());
Flat {
buffer: self.buffer,
loc: result.loc(),
_marker: Default::default(),
}
}
}
1 change: 1 addition & 0 deletions vortex-ipc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@
//! This crate provides both in-memory message representations for holding IPC messages
//! before/after serialization, as well as streaming readers and writers that sit on top
//! of any type implementing `VortexRead` or `VortexWrite` respectively.
#![allow(dead_code)]

pub mod messages;
pub mod stream_reader;
189 changes: 102 additions & 87 deletions vortex-ipc/src/messages/mod.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,35 @@
use flatbuffers::{FlatBufferBuilder, WIPOffset};
use itertools::Itertools;
use vortex_array::stats::ArrayStatistics;
use vortex_array::{flatbuffers as fba, ArrayData};
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_error::VortexExpect;
use vortex_flatbuffers::{message as fb, FlatBufferRoot, WriteFlatBuffer};
use vortex_flatbuffers::{message as fb, Flat, FlatBufferRoot, WriteFlatBuffer};

use crate::ALIGNMENT;

pub mod reader;
mod reader2;
pub mod writer;

pub enum IPCMessage {
Array(ArrayData),
Array(ArrayComponents),
Buffer(Buffer),
DType(DType),
}

/// The serializable components of an array.
/// This object can be read from a stream of IPCMessage without any additional context.
pub struct ArrayComponents {
array_data: Flat<fba::ArrayData<'static>>,
buffers: Vec<Buffer>,
}

impl From<ArrayData> for ArrayComponents {
fn from(_array: ArrayData) -> Self {
todo!()
}
}

impl FlatBufferRoot for IPCMessage {}

impl WriteFlatBuffer for IPCMessage {
@@ -28,7 +40,7 @@ impl WriteFlatBuffer for IPCMessage {
fbb: &mut FlatBufferBuilder<'fb>,
) -> WIPOffset<Self::Target<'fb>> {
let header = match self {
Self::Array(array) => ArrayDataWriter { array }
Self::Array(components) => ArrayDataWriter { components }
.write_flatbuffer(fbb)
.as_union_value(),
Self::Buffer(buffer) => {
@@ -59,7 +71,7 @@ impl WriteFlatBuffer for IPCMessage {
}

struct ArrayDataWriter<'a> {
array: &'a ArrayData,
components: &'a ArrayComponents,
}

impl WriteFlatBuffer for ArrayDataWriter<'_> {
@@ -69,44 +81,46 @@ impl WriteFlatBuffer for ArrayDataWriter<'_> {
&self,
fbb: &mut FlatBufferBuilder<'fb>,
) -> WIPOffset<Self::Target<'fb>> {
let array = Some(
let _array = Some(
ArrayWriter {
array: self.array,
components: self.components,
buffer_idx: 0,
}
.write_flatbuffer(fbb),
);

// Walk the ColumnData depth-first to compute the buffer lengths.
let mut buffers = vec![];
for array_data in self.array.depth_first_traversal() {
if let Some(buffer) = array_data.buffer() {
let aligned_size = buffer.len().next_multiple_of(ALIGNMENT);
let padding = aligned_size - buffer.len();
buffers.push(fba::Buffer::create(
fbb,
&fba::BufferArgs {
length: buffer.len() as u64,
padding: padding.try_into().vortex_expect("padding must fit in u16"),
},
));
}
}
let buffers = Some(fbb.create_vector(&buffers));

fba::ArrayData::create(
fbb,
&fba::ArrayDataArgs {
array,
row_count: self.array.len() as u64,
buffers,
},
)
todo!()
//
// // Walk the ColumnData depth-first to compute the buffer lengths.
// let mut buffers = vec![];
// for array_data in self.array.depth_first_traversal() {
// if let Some(buffer) = array_data.buffer() {
// let aligned_size = buffer.len().next_multiple_of(ALIGNMENT);
// let padding = aligned_size - buffer.len();
// buffers.push(fba::Buffer::create(
// fbb,
// &fba::BufferArgs {
// length: buffer.len() as u64,
// padding: padding.try_into().vortex_expect("padding must fit in u16"),
// },
// ));
// }
// }
// let buffers = Some(fbb.create_vector(&buffers));
//
// fba::ArrayData::create(
// fbb,
// &fba::ArrayDataArgs {
// array,
// row_count: self.array.len() as u64,
// buffers,
// },
// )
}
}

struct ArrayWriter<'a> {
array: &'a ArrayData,
components: &'a ArrayComponents,
buffer_idx: u16,
}

@@ -115,59 +129,60 @@ impl WriteFlatBuffer for ArrayWriter<'_> {

fn write_flatbuffer<'fb>(
&self,
fbb: &mut FlatBufferBuilder<'fb>,
_fbb: &mut FlatBufferBuilder<'fb>,
) -> WIPOffset<Self::Target<'fb>> {
let encoding = self.array.encoding().id().code();
let metadata = self
.array
.metadata_bytes()
.vortex_expect("IPCArray is missing metadata during serialization");
let metadata = Some(fbb.create_vector(metadata.as_ref()));

// Assign buffer indices for all child arrays.
// The second tuple element holds the buffer_index for this Array subtree. If this array
// has a buffer, that is its buffer index. If it does not, that buffer index belongs
// to one of the children.
let child_buffer_idx = self.buffer_idx + if self.array.buffer().is_some() { 1 } else { 0 };

let children = self
.array
.children()
.iter()
.scan(child_buffer_idx, |buffer_idx, child| {
// Update the number of buffers required.
let msg = ArrayWriter {
array: child,
buffer_idx: *buffer_idx,
}
.write_flatbuffer(fbb);
*buffer_idx = u16::try_from(child.cumulative_nbuffers())
.ok()
.and_then(|nbuffers| nbuffers.checked_add(*buffer_idx))
.vortex_expect("Too many buffers (u16) for ArrayData");
Some(msg)
})
.collect_vec();
let children = Some(fbb.create_vector(&children));

let buffers = self
.array
.buffer()
.is_some()
.then_some(self.buffer_idx)
.map(|buffer_idx| fbb.create_vector_from_iter(std::iter::once(buffer_idx)));

let stats = Some(self.array.statistics().write_flatbuffer(fbb));

fba::Array::create(
fbb,
&fba::ArrayArgs {
encoding,
metadata,
children,
buffers,
stats,
},
)
todo!()
// let encoding = self.array.encoding().id().code();
// let metadata = self
// .array
// .metadata_bytes()
// .vortex_expect("IPCArray is missing metadata during serialization");
// let metadata = Some(fbb.create_vector(metadata.as_ref()));
//
// // Assign buffer indices for all child arrays.
// // The second tuple element holds the buffer_index for this Array subtree. If this array
// // has a buffer, that is its buffer index. If it does not, that buffer index belongs
// // to one of the children.
// let child_buffer_idx = self.buffer_idx + if self.array.buffer().is_some() { 1 } else { 0 };
//
// let children = self
// .array
// .children()
// .iter()
// .scan(child_buffer_idx, |buffer_idx, child| {
// // Update the number of buffers required.
// let msg = ArrayWriter {
// array: child,
// buffer_idx: *buffer_idx,
// }
// .write_flatbuffer(fbb);
// *buffer_idx = u16::try_from(child.cumulative_nbuffers())
// .ok()
// .and_then(|nbuffers| nbuffers.checked_add(*buffer_idx))
// .vortex_expect("Too many buffers (u16) for ArrayData");
// Some(msg)
// })
// .collect_vec();
// let children = Some(fbb.create_vector(&children));
//
// let buffers = self
// .array
// .buffer()
// .is_some()
// .then_some(self.buffer_idx)
// .map(|buffer_idx| fbb.create_vector_from_iter(std::iter::once(buffer_idx)));
//
// let stats = Some(self.array.statistics().write_flatbuffer(fbb));
//
// fba::Array::create(
// fbb,
// &fba::ArrayArgs {
// encoding,
// metadata,
// children,
// buffers,
// stats,
// },
// )
}
}
152 changes: 152 additions & 0 deletions vortex-ipc/src/messages/reader2.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
use bytes::{Buf, Bytes};
use flatbuffers::{root, root_unchecked};
use itertools::Itertools;
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult};
use vortex_flatbuffers::message as fb;
use vortex_flatbuffers::message::{MessageHeader, MessageVersion};

use crate::messages::IPCMessage;

enum State {
ReadingLength,
ReadingHeader(usize),
ReadingArray(ReadingArray),
ReadingBuffer(ReadingBuffer),
}

struct ReadingArray {
header: Bytes,
buffers_length: usize,
}

struct ReadingBuffer {
length: usize,
length_with_padding: usize,
}

pub enum NextMessage {
Some(IPCMessage),
NeedMore(usize),
}

pub struct MessageReader {
state: State,
}

impl MessageReader {
/// Attempt to read the next message from the bytes object.
/// If the message is incomplete, the function will return `NeedMore` with the _total_ number
/// of bytes needed. The next call to read_next _should_ provide at least this number of bytes.
pub fn read_next(&mut self, mut bytes: Bytes) -> VortexResult<NextMessage> {
match &self.state {
State::ReadingLength => {
if bytes.len() < 4 {
return Ok(NextMessage::NeedMore(4));
}
let msg_length = bytes.get_u32_le();
self.state = State::ReadingHeader(msg_length as usize);
Ok(NextMessage::NeedMore(msg_length as usize))
}
State::ReadingHeader(msg_length) => {
if bytes.len() < *msg_length {
return Ok(NextMessage::NeedMore(*msg_length));
}
let msg_bytes = bytes.split_to(*msg_length);

let msg = root::<fb::Message>(msg_bytes.as_ref())?;
if msg.version() != MessageVersion::V0 {
vortex_bail!("Unsupported message version {:?}", msg.version());
}

match msg.header_type() {
MessageHeader::ArrayData => {
let array_data = msg
.header_as_array_data()
.vortex_expect("array data header");
let buffers_length: u64 = array_data
.buffers()
.unwrap_or_default()
.iter()
.map(|buffer| buffer.length() + (buffer.padding() as u64))
.sum();

let buffers_length = usize::try_from(buffers_length)
.map_err(|_| vortex_err!("buffers length is too large for usize"))?;

self.state = State::ReadingArray(ReadingArray {
header: msg_bytes,
buffers_length,
});
Ok(NextMessage::NeedMore(buffers_length))
}
MessageHeader::Buffer => {
let buffer = msg.header_as_buffer().vortex_expect("buffer header");
let length = buffer.length() as usize;
let length_with_padding = length + buffer.padding() as usize;

self.state = State::ReadingBuffer(ReadingBuffer {
length,
length_with_padding,
});
Ok(NextMessage::NeedMore(length_with_padding))
}
MessageHeader::DType => {
let dtype = msg.header_as_dtype().vortex_expect("dtype header");

self.state = State::ReadingLength;
Ok(NextMessage::Some(IPCMessage::DType(DType::try_from(
dtype,
)?)))
}
_ => {
vortex_bail!("Unsupported message header type {:?}", msg.header_type());
}
}
}
State::ReadingBuffer(ReadingBuffer {
length,
length_with_padding,
}) => {
if bytes.len() < *length_with_padding {
return Ok(NextMessage::NeedMore(*length_with_padding));
}

let buffer = bytes.split_to(*length);
let msg = IPCMessage::Buffer(Buffer::from(buffer));
let _padding = bytes.split_to(length_with_padding - length);
self.state = State::ReadingLength;
Ok(NextMessage::Some(msg))
}
State::ReadingArray(ReadingArray {
header,
buffers_length,
}) => {
if bytes.len() < *buffers_length {
return Ok(NextMessage::NeedMore(*buffers_length));
}

// SAFETY: we've already validated the header
let msg = unsafe { root_unchecked::<fb::Message>(header.as_ref()) };
let array_data_msg = msg
.header_as_array_data()
.vortex_expect("array data header");

let mut all_buffers = bytes.split_to(*buffers_length);
let _buffers = array_data_msg
.buffers()
.unwrap_or_default()
.iter()
.map(|buffer_msg| {
let buffer = all_buffers.split_to(buffer_msg.length() as usize);
let _padding = all_buffers.split_to(buffer_msg.padding() as usize);
buffer
})
.collect_vec();

todo!()
}
}
}
}
3 changes: 2 additions & 1 deletion vortex-ipc/src/messages/writer.rs
Original file line number Diff line number Diff line change
@@ -63,7 +63,8 @@ impl<W: VortexWrite> MessageWriter<W> {
}

pub async fn write_array(&mut self, array: ArrayData) -> io::Result<()> {
self.write_message(IPCMessage::Array(array.clone())).await?;
self.write_message(IPCMessage::Array(array.clone().into()))
.await?;

for array in array.depth_first_traversal() {
if let Some(buffer) = array.buffer() {