From 6181d443e65cceb37ad30539aa9951a6cc9adc87 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Mon, 20 May 2024 10:44:09 +0100 Subject: [PATCH] Add Page message to Vortex IPC (#333) A page holds arbitrary bytes. This can be used by consumers to add custom data into a Vortex stream / file. --- vortex-ipc/flatbuffers/message.fbs | 6 ++++++ vortex-ipc/src/lib.rs | 15 +++++++-------- vortex-ipc/src/message_reader.rs | 17 +++++++++++++++++ vortex-ipc/src/message_writer.rs | 16 +++++++++++++++- vortex-ipc/src/messages.rs | 29 ++++++++++++++++++++++++++--- vortex-ipc/src/stream_reader/mod.rs | 18 ++++++++++++++++++ 6 files changed, 89 insertions(+), 12 deletions(-) diff --git a/vortex-ipc/flatbuffers/message.fbs b/vortex-ipc/flatbuffers/message.fbs index 9b95e1c3d6..418b3e959e 100644 --- a/vortex-ipc/flatbuffers/message.fbs +++ b/vortex-ipc/flatbuffers/message.fbs @@ -35,10 +35,16 @@ table Chunk { buffer_size: uint64; } +table Page { + buffer_size: uint32; + padding: uint16; +} + union MessageHeader { Context, Schema, Chunk, + Page, } table Message { diff --git a/vortex-ipc/src/lib.rs b/vortex-ipc/src/lib.rs index 039855aad9..2e2001006f 100644 --- a/vortex-ipc/src/lib.rs +++ b/vortex-ipc/src/lib.rs @@ -3,6 +3,13 @@ extern crate core; pub use message_reader::*; pub use message_writer::*; use vortex_error::{vortex_err, VortexError}; +pub mod chunked_reader; +pub mod io; +mod message_reader; +mod message_writer; +mod messages; +pub mod stream_reader; +pub mod writer; pub const ALIGNMENT: usize = 64; @@ -33,14 +40,6 @@ pub mod flatbuffers { } } -pub mod chunked_reader; -pub mod io; -mod message_reader; -mod message_writer; -mod messages; -pub mod stream_reader; -pub mod writer; - pub(crate) const fn missing(field: &'static str) -> impl FnOnce() -> VortexError { move || vortex_err!(InvalidSerde: "missing field: {}", field) } diff --git a/vortex-ipc/src/message_reader.rs b/vortex-ipc/src/message_reader.rs index 20879fed05..9c04d47cda 100644 --- a/vortex-ipc/src/message_reader.rs +++ b/vortex-ipc/src/message_reader.rs @@ -271,4 +271,21 @@ impl MessageReader { }), ) } + + pub async fn maybe_read_page(&mut self) -> VortexResult> { + if self.peek().and_then(|m| m.header_as_page()).is_none() { + return Ok(None); + } + let page_msg = self.next().await?.header_as_page().unwrap(); + + let buffer_len = page_msg.buffer_size() as usize; + let total_len = buffer_len + (page_msg.padding() as usize); + + let mut buffer = self + .read + .read_into(BytesMut::with_capacity(total_len)) + .await?; + buffer.truncate(buffer_len); + Ok(Some(Buffer::from(buffer.freeze()))) + } } diff --git a/vortex-ipc/src/message_writer.rs b/vortex-ipc/src/message_writer.rs index b458ea62be..883c6dcac9 100644 --- a/vortex-ipc/src/message_writer.rs +++ b/vortex-ipc/src/message_writer.rs @@ -4,11 +4,12 @@ use flatbuffers::FlatBufferBuilder; use itertools::Itertools; use vortex::{ArrayData, ViewContext}; use vortex_buffer::io_buf::IoBuf; +use vortex_buffer::Buffer; use vortex_dtype::DType; use vortex_flatbuffers::WriteFlatBuffer; use crate::io::VortexWrite; -use crate::messages::{IPCChunk, IPCContext, IPCMessage, IPCSchema}; +use crate::messages::{IPCChunk, IPCContext, IPCMessage, IPCPage, IPCSchema}; use crate::ALIGNMENT; const ZEROS: [u8; 512] = [0u8; 512]; @@ -81,6 +82,19 @@ impl MessageWriter { Ok(()) } + pub async fn write_page(&mut self, buffer: Buffer) -> io::Result<()> { + self.write_message(IPCMessage::Page(IPCPage(&buffer))) + .await?; + let buffer_len = buffer.len(); + self.write_all(buffer).await?; + + let aligned_size = (buffer_len + (self.alignment - 1)) & !(self.alignment - 1); + let padding = aligned_size - buffer_len; + self.write_all(&ZEROS[0..padding]).await?; + + Ok(()) + } + async fn write_message(&mut self, flatbuffer: F) -> io::Result<()> { // We reuse the scratch buffer each time and then replace it at the end. // The scratch buffer may be missing if a previous write failed. We could use scopeguard diff --git a/vortex-ipc/src/messages.rs b/vortex-ipc/src/messages.rs index 8ac451484e..57c492ade9 100644 --- a/vortex-ipc/src/messages.rs +++ b/vortex-ipc/src/messages.rs @@ -2,6 +2,7 @@ use flatbuffers::{FlatBufferBuilder, WIPOffset}; use itertools::Itertools; use vortex::flatbuffers as fba; use vortex::{ArrayData, Context, ViewContext}; +use vortex_buffer::Buffer; use vortex_dtype::DType; use vortex_error::{vortex_err, VortexError}; use vortex_flatbuffers::{FlatBufferRoot, WriteFlatBuffer}; @@ -14,15 +15,14 @@ pub enum IPCMessage<'a> { Context(IPCContext<'a>), Schema(IPCSchema<'a>), Chunk(IPCChunk<'a>), + Page(IPCPage<'a>), } pub struct IPCContext<'a>(pub &'a ViewContext); - pub struct IPCSchema<'a>(pub &'a DType); - pub struct IPCChunk<'a>(pub &'a ViewContext, pub &'a ArrayData); - pub struct IPCArray<'a>(pub &'a ViewContext, pub &'a ArrayData); +pub struct IPCPage<'a>(pub &'a Buffer); impl FlatBufferRoot for IPCMessage<'_> {} @@ -37,6 +37,7 @@ impl WriteFlatBuffer for IPCMessage<'_> { Self::Context(f) => f.write_flatbuffer(fbb).as_union_value(), Self::Schema(f) => f.write_flatbuffer(fbb).as_union_value(), Self::Chunk(f) => f.write_flatbuffer(fbb).as_union_value(), + Self::Page(f) => f.write_flatbuffer(fbb).as_union_value(), }; let mut msg = fb::MessageBuilder::new(fbb); @@ -45,6 +46,7 @@ impl WriteFlatBuffer for IPCMessage<'_> { Self::Context(_) => fb::MessageHeader::Context, Self::Schema(_) => fb::MessageHeader::Schema, Self::Chunk(_) => fb::MessageHeader::Chunk, + Self::Page(_) => fb::MessageHeader::Page, }); msg.add_header(header); msg.finish() @@ -205,3 +207,24 @@ impl<'a> WriteFlatBuffer for IPCArray<'a> { ) } } + +impl<'a> WriteFlatBuffer for IPCPage<'a> { + type Target<'t> = fb::Page<'t>; + + fn write_flatbuffer<'fb>( + &self, + fbb: &mut FlatBufferBuilder<'fb>, + ) -> WIPOffset> { + let buffer_size = self.0.len(); + let aligned_size = (buffer_size + (ALIGNMENT - 1)) & !(ALIGNMENT - 1); + let padding_size = aligned_size - buffer_size; + + fb::Page::create( + fbb, + &fb::PageArgs { + buffer_size: buffer_size as u32, + padding: padding_size as u16, + }, + ) + } +} diff --git a/vortex-ipc/src/stream_reader/mod.rs b/vortex-ipc/src/stream_reader/mod.rs index 59471c7843..a9ebd8b2fa 100644 --- a/vortex-ipc/src/stream_reader/mod.rs +++ b/vortex-ipc/src/stream_reader/mod.rs @@ -1,8 +1,11 @@ use std::ops::Deref; use std::sync::Arc; +use futures_util::stream::try_unfold; +use futures_util::Stream; use vortex::stream::ArrayStream; use vortex::{Context, ViewContext}; +use vortex_buffer::Buffer; use vortex_dtype::DType; use vortex_error::VortexResult; @@ -64,4 +67,19 @@ impl StreamArrayReader { let dtype = self.dtype.as_ref().expect("DType not set").deref().clone(); self.msgs.array_stream(view_context, dtype) } + + /// Reads a single page from the stream. + pub async fn next_page(&mut self) -> VortexResult> { + self.msgs.maybe_read_page().await + } + + /// Reads consecutive pages from the stream until the message type changes. + pub async fn page_stream(&mut self) -> impl Stream> + '_ { + try_unfold(self, |reader| async { + match reader.next_page().await? { + Some(page) => Ok(Some((page, reader))), + None => Ok(None), + } + }) + } }