Skip to content

Commit

Permalink
Array buffers only store offset and padding (#489)
Browse files Browse the repository at this point in the history
Buffers are now 16 instead of 24 bytes
  • Loading branch information
robert3005 authored Jul 22, 2024
1 parent 964a2b5 commit c360035
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 37 deletions.
16 changes: 8 additions & 8 deletions vortex-ipc/flatbuffers/message.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ include "vortex-dtype/flatbuffers/dtype.fbs";
namespace vortex.ipc;

enum Version: uint8 {
V0 = 0,
V0 = 0,
}

table Schema {
Expand All @@ -17,11 +17,11 @@ enum Compression: uint8 {

struct Buffer {
offset: uint64;
length: uint64;
padding: uint16;
compression: Compression;
}

table Chunk {
table Batch {
array: vortex.array.Array;
length: uint64;
buffers: [Buffer];
Expand All @@ -34,14 +34,14 @@ table Page {
}

union MessageHeader {
Schema,
Chunk,
Page,
Schema,
Batch,
Page,
}

table Message {
version: Version = V0;
header: MessageHeader;
version: Version = V0;
header: MessageHeader;
}

root_type Message;
40 changes: 23 additions & 17 deletions vortex-ipc/src/message_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,39 +98,45 @@ impl<R: VortexRead> MessageReader<R> {

/// Fetch the buffers associated with this message.
async fn read_buffers(&mut self) -> VortexResult<Vec<Buffer>> {
let Some(chunk_msg) = self.peek().and_then(|m| m.header_as_chunk()) else {
let Some(chunk_msg) = self.peek().and_then(|m| m.header_as_batch()) else {
// We could return an error here?
return Ok(Vec::new());
};

// Issue a single read to grab all buffers
let mut all_buffers = BytesMut::with_capacity(chunk_msg.buffer_size() as usize);
unsafe { all_buffers.set_len(chunk_msg.buffer_size() as usize) };
let all_buffers_size = chunk_msg.buffer_size();
let mut all_buffers = BytesMut::with_capacity(all_buffers_size as usize);
unsafe { all_buffers.set_len(all_buffers_size as usize) };
let mut all_buffers = self.read.read_into(all_buffers).await?;

// Split out into individual buffers
// Initialize the column's buffers for a vectored read.
// To start with, we include the padding and then truncate the buffers after.
// TODO(ngates): improve the flatbuffer format instead of storing offset/len per buffer.
let buffers = self
let ipc_buffers = self
.peek()
.expect("Checked above in peek")
.header_as_chunk()
.header_as_batch()
.expect("Checked above in peek")
.buffers()
.unwrap_or_default()
.unwrap_or_default();
let buffers = ipc_buffers
.iter()
.scan(0, |offset, buffer| {
let len = buffer.length() as usize;
let padding_len = buffer.offset() as usize - *offset;
.zip(
ipc_buffers
.iter()
.map(|b| b.offset())
.skip(1)
.chain([all_buffers_size]),
)
.map(|(buffer, next_offset)| {
let len = next_offset - buffer.offset() - buffer.padding() as u64;

// Strip off any padding from the previous buffer
all_buffers.advance(padding_len);
// Grab the buffer
let buffer = all_buffers.split_to(len);
let data_buffer = all_buffers.split_to(len as usize);
// Skip the alignment padding that trails the buffer we just split off
all_buffers.advance(buffer.padding() as usize);

*offset += padding_len + len;
Some(Buffer::from(buffer.freeze()))
Buffer::from(data_buffer.freeze())
})
.collect_vec();

Expand Down Expand Up @@ -159,7 +165,7 @@ impl<R: VortexRead> MessageReader<R> {
ctx: Arc<Context>,
dtype: DType,
) -> VortexResult<Option<Array>> {
let length = match self.peek().and_then(|m| m.header_as_chunk()) {
let length = match self.peek().and_then(|m| m.header_as_batch()) {
None => return Ok(None),
Some(chunk) => chunk.length() as usize,
};
Expand All @@ -174,7 +180,7 @@ impl<R: VortexRead> MessageReader<R> {
flatbuffer,
|flatbuffer| {
unsafe { root_unchecked::<fb::Message>(flatbuffer) }
.header_as_chunk()
.header_as_batch()
.unwrap()
.array()
.ok_or_else(|| vortex_err!("Chunk missing Array"))
Expand Down
4 changes: 2 additions & 2 deletions vortex-ipc/src/message_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use vortex_dtype::DType;
use vortex_flatbuffers::WriteFlatBuffer;

use crate::io::VortexWrite;
use crate::messages::{IPCChunk, IPCMessage, IPCPage, IPCSchema};
use crate::messages::{IPCBatch, IPCMessage, IPCPage, IPCSchema};
use crate::ALIGNMENT;

const ZEROS: [u8; 512] = [0u8; 512];
Expand Down Expand Up @@ -52,7 +52,7 @@ impl<W: VortexWrite> MessageWriter<W> {
let buffer_offsets = chunk.all_buffer_offsets(self.alignment);

// Serialize the Batch message.
self.write_message(IPCMessage::Chunk(IPCChunk(&chunk)))
self.write_message(IPCMessage::Batch(IPCBatch(&chunk)))
.await?;

// Keep track of the offset to add padding after each buffer.
Expand Down
20 changes: 10 additions & 10 deletions vortex-ipc/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ use crate::ALIGNMENT;

pub enum IPCMessage<'a> {
Schema(IPCSchema<'a>),
Chunk(IPCChunk<'a>),
Batch(IPCBatch<'a>),
Page(IPCPage<'a>),
}

pub struct IPCSchema<'a>(pub &'a DType);
pub struct IPCChunk<'a>(pub &'a Array);
pub struct IPCBatch<'a>(pub &'a Array);
pub struct IPCArray<'a>(pub &'a Array);
pub struct IPCPage<'a>(pub &'a Buffer);

Expand All @@ -32,15 +32,15 @@ impl WriteFlatBuffer for IPCMessage<'_> {
) -> WIPOffset<Self::Target<'fb>> {
let header = match self {
Self::Schema(f) => f.write_flatbuffer(fbb).as_union_value(),
Self::Chunk(f) => f.write_flatbuffer(fbb).as_union_value(),
Self::Batch(f) => f.write_flatbuffer(fbb).as_union_value(),
Self::Page(f) => f.write_flatbuffer(fbb).as_union_value(),
};

let mut msg = fb::MessageBuilder::new(fbb);
msg.add_version(Default::default());
msg.add_header_type(match self {
Self::Schema(_) => fb::MessageHeader::Schema,
Self::Chunk(_) => fb::MessageHeader::Chunk,
Self::Batch(_) => fb::MessageHeader::Batch,
Self::Page(_) => fb::MessageHeader::Page,
});
msg.add_header(header);
Expand All @@ -60,8 +60,8 @@ impl<'a> WriteFlatBuffer for IPCSchema<'a> {
}
}

impl<'a> WriteFlatBuffer for IPCChunk<'a> {
type Target<'t> = fb::Chunk<'t>;
impl<'a> WriteFlatBuffer for IPCBatch<'a> {
type Target<'t> = fb::Batch<'t>;

fn write_flatbuffer<'fb>(
&self,
Expand All @@ -77,20 +77,20 @@ impl<'a> WriteFlatBuffer for IPCChunk<'a> {
let mut offset = 0;
for array_data in array_data.depth_first_traversal() {
if let Some(buffer) = array_data.buffer() {
let aligned_size = (buffer.len() + (ALIGNMENT - 1)) & !(ALIGNMENT - 1);
buffers.push(fb::Buffer::new(
offset as u64,
buffer.len() as u64,
(aligned_size - buffer.len()) as u16,
Compression::None,
));
let aligned_size = (buffer.len() + (ALIGNMENT - 1)) & !(ALIGNMENT - 1);
offset += aligned_size;
}
}
let buffers = Some(fbb.create_vector(&buffers));

fb::Chunk::create(
fb::Batch::create(
fbb,
&fb::ChunkArgs {
&fb::BatchArgs {
array,
length,
buffers,
Expand Down

0 comments on commit c360035

Please sign in to comment.