diff --git a/vortex-ipc/flatbuffers/message.fbs b/vortex-ipc/flatbuffers/message.fbs index 8836537d23..cfd6947843 100644 --- a/vortex-ipc/flatbuffers/message.fbs +++ b/vortex-ipc/flatbuffers/message.fbs @@ -4,7 +4,7 @@ include "vortex-dtype/flatbuffers/dtype.fbs"; namespace vortex.ipc; enum Version: uint8 { - V0 = 0, + V0 = 0, } table Schema { @@ -17,11 +17,11 @@ enum Compression: uint8 { struct Buffer { offset: uint64; - length: uint64; + padding: uint16; compression: Compression; } -table Chunk { +table Batch { array: vortex.array.Array; length: uint64; buffers: [Buffer]; @@ -34,14 +34,14 @@ table Page { } union MessageHeader { - Schema, - Chunk, - Page, + Schema, + Batch, + Page, } table Message { - version: Version = V0; - header: MessageHeader; + version: Version = V0; + header: MessageHeader; } root_type Message; diff --git a/vortex-ipc/src/message_reader.rs b/vortex-ipc/src/message_reader.rs index df07f1642e..64dcd9ae19 100644 --- a/vortex-ipc/src/message_reader.rs +++ b/vortex-ipc/src/message_reader.rs @@ -98,39 +98,45 @@ impl MessageReader { /// Fetch the buffers associated with this message. async fn read_buffers(&mut self) -> VortexResult> { - let Some(chunk_msg) = self.peek().and_then(|m| m.header_as_chunk()) else { + let Some(chunk_msg) = self.peek().and_then(|m| m.header_as_batch()) else { // We could return an error here? return Ok(Vec::new()); }; // Issue a single read to grab all buffers - let mut all_buffers = BytesMut::with_capacity(chunk_msg.buffer_size() as usize); - unsafe { all_buffers.set_len(chunk_msg.buffer_size() as usize) }; + let all_buffers_size = chunk_msg.buffer_size(); + let mut all_buffers = BytesMut::with_capacity(all_buffers_size as usize); + unsafe { all_buffers.set_len(all_buffers_size as usize) }; let mut all_buffers = self.read.read_into(all_buffers).await?; // Split out into individual buffers // Initialize the column's buffers for a vectored read. // To start with, we include the padding and then truncate the buffers after. - // TODO(ngates): improve the flatbuffer format instead of storing offset/len per buffer. - let buffers = self + let ipc_buffers = self .peek() .expect("Checked above in peek") - .header_as_chunk() + .header_as_batch() .expect("Checked above in peek") .buffers() - .unwrap_or_default() + .unwrap_or_default(); + let buffers = ipc_buffers .iter() - .scan(0, |offset, buffer| { - let len = buffer.length() as usize; - let padding_len = buffer.offset() as usize - *offset; + .zip( + ipc_buffers + .iter() + .map(|b| b.offset()) + .skip(1) + .chain([all_buffers_size]), + ) + .map(|(buffer, next_offset)| { + let len = next_offset - buffer.offset() - buffer.padding() as u64; - // Strip off any padding from the previous buffer - all_buffers.advance(padding_len); // Grab the buffer - let buffer = all_buffers.split_to(len); + let data_buffer = all_buffers.split_to(len as usize); + // Strip off any padding from the previous buffer + all_buffers.advance(buffer.padding() as usize); - *offset += padding_len + len; - Some(Buffer::from(buffer.freeze())) + Buffer::from(data_buffer.freeze()) }) .collect_vec(); @@ -159,7 +165,7 @@ impl MessageReader { ctx: Arc, dtype: DType, ) -> VortexResult> { - let length = match self.peek().and_then(|m| m.header_as_chunk()) { + let length = match self.peek().and_then(|m| m.header_as_batch()) { None => return Ok(None), Some(chunk) => chunk.length() as usize, }; @@ -174,7 +180,7 @@ impl MessageReader { flatbuffer, |flatbuffer| { unsafe { root_unchecked::(flatbuffer) } - .header_as_chunk() + .header_as_batch() .unwrap() .array() .ok_or_else(|| vortex_err!("Chunk missing Array")) diff --git a/vortex-ipc/src/message_writer.rs b/vortex-ipc/src/message_writer.rs index c057e8a9b6..f3a17cdd53 100644 --- a/vortex-ipc/src/message_writer.rs +++ b/vortex-ipc/src/message_writer.rs @@ -9,7 +9,7 @@ use vortex_dtype::DType; use vortex_flatbuffers::WriteFlatBuffer; use crate::io::VortexWrite; -use crate::messages::{IPCChunk, IPCMessage, IPCPage, IPCSchema}; +use crate::messages::{IPCBatch, IPCMessage, IPCPage, IPCSchema}; use crate::ALIGNMENT; const ZEROS: [u8; 512] = [0u8; 512]; @@ -52,7 +52,7 @@ impl MessageWriter { let buffer_offsets = chunk.all_buffer_offsets(self.alignment); // Serialize the Chunk message. - self.write_message(IPCMessage::Chunk(IPCChunk(&chunk))) + self.write_message(IPCMessage::Batch(IPCBatch(&chunk))) .await?; // Keep track of the offset to add padding after each buffer. diff --git a/vortex-ipc/src/messages.rs b/vortex-ipc/src/messages.rs index 5462b823da..e56bdba912 100644 --- a/vortex-ipc/src/messages.rs +++ b/vortex-ipc/src/messages.rs @@ -12,12 +12,12 @@ use crate::ALIGNMENT; pub enum IPCMessage<'a> { Schema(IPCSchema<'a>), - Chunk(IPCChunk<'a>), + Batch(IPCBatch<'a>), Page(IPCPage<'a>), } pub struct IPCSchema<'a>(pub &'a DType); -pub struct IPCChunk<'a>(pub &'a Array); +pub struct IPCBatch<'a>(pub &'a Array); pub struct IPCArray<'a>(pub &'a Array); pub struct IPCPage<'a>(pub &'a Buffer); @@ -32,7 +32,7 @@ impl WriteFlatBuffer for IPCMessage<'_> { ) -> WIPOffset> { let header = match self { Self::Schema(f) => f.write_flatbuffer(fbb).as_union_value(), - Self::Chunk(f) => f.write_flatbuffer(fbb).as_union_value(), + Self::Batch(f) => f.write_flatbuffer(fbb).as_union_value(), Self::Page(f) => f.write_flatbuffer(fbb).as_union_value(), }; @@ -40,7 +40,7 @@ impl WriteFlatBuffer for IPCMessage<'_> { msg.add_version(Default::default()); msg.add_header_type(match self { Self::Schema(_) => fb::MessageHeader::Schema, - Self::Chunk(_) => fb::MessageHeader::Chunk, + Self::Batch(_) => fb::MessageHeader::Batch, Self::Page(_) => fb::MessageHeader::Page, }); msg.add_header(header); @@ -60,8 +60,8 @@ impl<'a> WriteFlatBuffer for IPCSchema<'a> { } } -impl<'a> WriteFlatBuffer for IPCChunk<'a> { - type Target<'t> = fb::Chunk<'t>; +impl<'a> WriteFlatBuffer for IPCBatch<'a> { + type Target<'t> = fb::Batch<'t>; fn write_flatbuffer<'fb>( &self, @@ -77,20 +77,20 @@ impl<'a> WriteFlatBuffer for IPCChunk<'a> { let mut offset = 0; for array_data in array_data.depth_first_traversal() { if let Some(buffer) = array_data.buffer() { + let aligned_size = (buffer.len() + (ALIGNMENT - 1)) & !(ALIGNMENT - 1); buffers.push(fb::Buffer::new( offset as u64, - buffer.len() as u64, + (aligned_size - buffer.len()) as u16, Compression::None, )); - let aligned_size = (buffer.len() + (ALIGNMENT - 1)) & !(ALIGNMENT - 1); offset += aligned_size; } } let buffers = Some(fbb.create_vector(&buffers)); - fb::Chunk::create( + fb::Batch::create( fbb, - &fb::ChunkArgs { + &fb::BatchArgs { array, length, buffers,