Skip to content

Commit

Permalink
Add Page message to Vortex IPC (#333)
Browse files Browse the repository at this point in the history
A page holds arbitrary bytes. This can be used by consumers to add
custom data into a Vortex stream / file.
  • Loading branch information
gatesn authored May 20, 2024
1 parent f515492 commit 6181d44
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 12 deletions.
6 changes: 6 additions & 0 deletions vortex-ipc/flatbuffers/message.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,16 @@ table Chunk {
buffer_size: uint64;
}

table Page {
buffer_size: uint32;
padding: uint16;
}

union MessageHeader {
Context,
Schema,
Chunk,
Page,
}

table Message {
Expand Down
15 changes: 7 additions & 8 deletions vortex-ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@ extern crate core;
pub use message_reader::*;
pub use message_writer::*;
use vortex_error::{vortex_err, VortexError};
pub mod chunked_reader;
pub mod io;
mod message_reader;
mod message_writer;
mod messages;
pub mod stream_reader;
pub mod writer;

pub const ALIGNMENT: usize = 64;

Expand Down Expand Up @@ -33,14 +40,6 @@ pub mod flatbuffers {
}
}

pub mod chunked_reader;
pub mod io;
mod message_reader;
mod message_writer;
mod messages;
pub mod stream_reader;
pub mod writer;

pub(crate) const fn missing(field: &'static str) -> impl FnOnce() -> VortexError {
move || vortex_err!(InvalidSerde: "missing field: {}", field)
}
Expand Down
17 changes: 17 additions & 0 deletions vortex-ipc/src/message_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,4 +271,21 @@ impl<R: VortexRead> MessageReader<R> {
}),
)
}

pub async fn maybe_read_page(&mut self) -> VortexResult<Option<Buffer>> {
if self.peek().and_then(|m| m.header_as_page()).is_none() {
return Ok(None);
}
let page_msg = self.next().await?.header_as_page().unwrap();

let buffer_len = page_msg.buffer_size() as usize;
let total_len = buffer_len + (page_msg.padding() as usize);

let mut buffer = self
.read
.read_into(BytesMut::with_capacity(total_len))
.await?;
buffer.truncate(buffer_len);
Ok(Some(Buffer::from(buffer.freeze())))
}
}
16 changes: 15 additions & 1 deletion vortex-ipc/src/message_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ use flatbuffers::FlatBufferBuilder;
use itertools::Itertools;
use vortex::{ArrayData, ViewContext};
use vortex_buffer::io_buf::IoBuf;
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_flatbuffers::WriteFlatBuffer;

use crate::io::VortexWrite;
use crate::messages::{IPCChunk, IPCContext, IPCMessage, IPCSchema};
use crate::messages::{IPCChunk, IPCContext, IPCMessage, IPCPage, IPCSchema};
use crate::ALIGNMENT;

const ZEROS: [u8; 512] = [0u8; 512];
Expand Down Expand Up @@ -81,6 +82,19 @@ impl<W: VortexWrite> MessageWriter<W> {
Ok(())
}

pub async fn write_page(&mut self, buffer: Buffer) -> io::Result<()> {
self.write_message(IPCMessage::Page(IPCPage(&buffer)))
.await?;
let buffer_len = buffer.len();
self.write_all(buffer).await?;

let aligned_size = (buffer_len + (self.alignment - 1)) & !(self.alignment - 1);
let padding = aligned_size - buffer_len;
self.write_all(&ZEROS[0..padding]).await?;

Ok(())
}

async fn write_message<F: WriteFlatBuffer>(&mut self, flatbuffer: F) -> io::Result<()> {
// We reuse the scratch buffer each time and then replace it at the end.
// The scratch buffer may be missing if a previous write failed. We could use scopeguard
Expand Down
29 changes: 26 additions & 3 deletions vortex-ipc/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use flatbuffers::{FlatBufferBuilder, WIPOffset};
use itertools::Itertools;
use vortex::flatbuffers as fba;
use vortex::{ArrayData, Context, ViewContext};
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_error::{vortex_err, VortexError};
use vortex_flatbuffers::{FlatBufferRoot, WriteFlatBuffer};
Expand All @@ -14,15 +15,14 @@ pub enum IPCMessage<'a> {
Context(IPCContext<'a>),
Schema(IPCSchema<'a>),
Chunk(IPCChunk<'a>),
Page(IPCPage<'a>),
}

pub struct IPCContext<'a>(pub &'a ViewContext);

pub struct IPCSchema<'a>(pub &'a DType);

pub struct IPCChunk<'a>(pub &'a ViewContext, pub &'a ArrayData);

pub struct IPCArray<'a>(pub &'a ViewContext, pub &'a ArrayData);
pub struct IPCPage<'a>(pub &'a Buffer);

impl FlatBufferRoot for IPCMessage<'_> {}

Expand All @@ -37,6 +37,7 @@ impl WriteFlatBuffer for IPCMessage<'_> {
Self::Context(f) => f.write_flatbuffer(fbb).as_union_value(),
Self::Schema(f) => f.write_flatbuffer(fbb).as_union_value(),
Self::Chunk(f) => f.write_flatbuffer(fbb).as_union_value(),
Self::Page(f) => f.write_flatbuffer(fbb).as_union_value(),
};

let mut msg = fb::MessageBuilder::new(fbb);
Expand All @@ -45,6 +46,7 @@ impl WriteFlatBuffer for IPCMessage<'_> {
Self::Context(_) => fb::MessageHeader::Context,
Self::Schema(_) => fb::MessageHeader::Schema,
Self::Chunk(_) => fb::MessageHeader::Chunk,
Self::Page(_) => fb::MessageHeader::Page,
});
msg.add_header(header);
msg.finish()
Expand Down Expand Up @@ -205,3 +207,24 @@ impl<'a> WriteFlatBuffer for IPCArray<'a> {
)
}
}

impl<'a> WriteFlatBuffer for IPCPage<'a> {
type Target<'t> = fb::Page<'t>;

fn write_flatbuffer<'fb>(
&self,
fbb: &mut FlatBufferBuilder<'fb>,
) -> WIPOffset<Self::Target<'fb>> {
let buffer_size = self.0.len();
let aligned_size = (buffer_size + (ALIGNMENT - 1)) & !(ALIGNMENT - 1);
let padding_size = aligned_size - buffer_size;

fb::Page::create(
fbb,
&fb::PageArgs {
buffer_size: buffer_size as u32,
padding: padding_size as u16,
},
)
}
}
18 changes: 18 additions & 0 deletions vortex-ipc/src/stream_reader/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::ops::Deref;
use std::sync::Arc;

use futures_util::stream::try_unfold;
use futures_util::Stream;
use vortex::stream::ArrayStream;
use vortex::{Context, ViewContext};
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_error::VortexResult;

Expand Down Expand Up @@ -64,4 +67,19 @@ impl<R: VortexRead> StreamArrayReader<R> {
let dtype = self.dtype.as_ref().expect("DType not set").deref().clone();
self.msgs.array_stream(view_context, dtype)
}

/// Reads a single page from the stream.
pub async fn next_page(&mut self) -> VortexResult<Option<Buffer>> {
self.msgs.maybe_read_page().await
}

/// Reads consecutive pages from the stream until the message type changes.
pub async fn page_stream(&mut self) -> impl Stream<Item = VortexResult<Buffer>> + '_ {
try_unfold(self, |reader| async {
match reader.next_page().await? {
Some(page) => Ok(Some((page, reader))),
None => Ok(None),
}
})
}
}

0 comments on commit 6181d44

Please sign in to comment.