Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Page message to Vortex IPC #333

Merged
merged 4 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions vortex-ipc/flatbuffers/message.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,16 @@ table Chunk {
buffer_size: uint64;
}

table Page {
buffer_size: uint32;
padding: uint16;
}

union MessageHeader {
Context,
Schema,
Chunk,
Page,
}

table Message {
Expand Down
15 changes: 7 additions & 8 deletions vortex-ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@ extern crate core;
pub use message_reader::*;
pub use message_writer::*;
use vortex_error::{vortex_err, VortexError};
pub mod chunked_reader;
pub mod io;
mod message_reader;
mod message_writer;
mod messages;
pub mod stream_reader;
pub mod writer;

pub const ALIGNMENT: usize = 64;

Expand Down Expand Up @@ -33,14 +40,6 @@ pub mod flatbuffers {
}
}

pub mod chunked_reader;
pub mod io;
mod message_reader;
mod message_writer;
mod messages;
pub mod stream_reader;
pub mod writer;

pub(crate) const fn missing(field: &'static str) -> impl FnOnce() -> VortexError {
move || vortex_err!(InvalidSerde: "missing field: {}", field)
}
Expand Down
17 changes: 17 additions & 0 deletions vortex-ipc/src/message_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,4 +271,21 @@ impl<R: VortexRead> MessageReader<R> {
}),
)
}

pub async fn maybe_read_page(&mut self) -> VortexResult<Option<Buffer>> {
if self.peek().and_then(|m| m.header_as_page()).is_none() {
return Ok(None);
}
let page_msg = self.next().await?.header_as_page().unwrap();

let buffer_len = page_msg.buffer_size() as usize;
let total_len = buffer_len + (page_msg.padding() as usize);

let mut buffer = self
.read
.read_into(BytesMut::with_capacity(total_len))
.await?;
buffer.truncate(buffer_len);
Ok(Some(Buffer::from(buffer.freeze())))
}
}
16 changes: 15 additions & 1 deletion vortex-ipc/src/message_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ use flatbuffers::FlatBufferBuilder;
use itertools::Itertools;
use vortex::{ArrayData, ViewContext};
use vortex_buffer::io_buf::IoBuf;
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_flatbuffers::WriteFlatBuffer;

use crate::io::VortexWrite;
use crate::messages::{IPCChunk, IPCContext, IPCMessage, IPCSchema};
use crate::messages::{IPCChunk, IPCContext, IPCMessage, IPCPage, IPCSchema};
use crate::ALIGNMENT;

const ZEROS: [u8; 512] = [0u8; 512];
Expand Down Expand Up @@ -81,6 +82,19 @@ impl<W: VortexWrite> MessageWriter<W> {
Ok(())
}

pub async fn write_page(&mut self, buffer: Buffer) -> io::Result<()> {
self.write_message(IPCMessage::Page(IPCPage(&buffer)))
.await?;
let buffer_len = buffer.len();
self.write_all(buffer).await?;

let aligned_size = (buffer_len + (self.alignment - 1)) & !(self.alignment - 1);
let padding = aligned_size - buffer_len;
self.write_all(&ZEROS[0..padding]).await?;

Ok(())
}

async fn write_message<F: WriteFlatBuffer>(&mut self, flatbuffer: F) -> io::Result<()> {
// We reuse the scratch buffer each time and then replace it at the end.
// The scratch buffer may be missing if a previous write failed. We could use scopeguard
Expand Down
29 changes: 26 additions & 3 deletions vortex-ipc/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use flatbuffers::{FlatBufferBuilder, WIPOffset};
use itertools::Itertools;
use vortex::flatbuffers as fba;
use vortex::{ArrayData, Context, ViewContext};
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_error::{vortex_err, VortexError};
use vortex_flatbuffers::{FlatBufferRoot, WriteFlatBuffer};
Expand All @@ -14,15 +15,14 @@ pub enum IPCMessage<'a> {
Context(IPCContext<'a>),
Schema(IPCSchema<'a>),
Chunk(IPCChunk<'a>),
Page(IPCPage<'a>),
}

pub struct IPCContext<'a>(pub &'a ViewContext);

pub struct IPCSchema<'a>(pub &'a DType);

pub struct IPCChunk<'a>(pub &'a ViewContext, pub &'a ArrayData);

pub struct IPCArray<'a>(pub &'a ViewContext, pub &'a ArrayData);
pub struct IPCPage<'a>(pub &'a Buffer);

impl FlatBufferRoot for IPCMessage<'_> {}

Expand All @@ -37,6 +37,7 @@ impl WriteFlatBuffer for IPCMessage<'_> {
Self::Context(f) => f.write_flatbuffer(fbb).as_union_value(),
Self::Schema(f) => f.write_flatbuffer(fbb).as_union_value(),
Self::Chunk(f) => f.write_flatbuffer(fbb).as_union_value(),
Self::Page(f) => f.write_flatbuffer(fbb).as_union_value(),
};

let mut msg = fb::MessageBuilder::new(fbb);
Expand All @@ -45,6 +46,7 @@ impl WriteFlatBuffer for IPCMessage<'_> {
Self::Context(_) => fb::MessageHeader::Context,
Self::Schema(_) => fb::MessageHeader::Schema,
Self::Chunk(_) => fb::MessageHeader::Chunk,
Self::Page(_) => fb::MessageHeader::Page,
});
msg.add_header(header);
msg.finish()
Expand Down Expand Up @@ -205,3 +207,24 @@ impl<'a> WriteFlatBuffer for IPCArray<'a> {
)
}
}

impl<'a> WriteFlatBuffer for IPCPage<'a> {
type Target<'t> = fb::Page<'t>;

fn write_flatbuffer<'fb>(
&self,
fbb: &mut FlatBufferBuilder<'fb>,
) -> WIPOffset<Self::Target<'fb>> {
let buffer_size = self.0.len();
let aligned_size = (buffer_size + (ALIGNMENT - 1)) & !(ALIGNMENT - 1);
let padding_size = aligned_size - buffer_size;

fb::Page::create(
fbb,
&fb::PageArgs {
buffer_size: buffer_size as u32,
padding: padding_size as u16,
},
)
}
}
18 changes: 18 additions & 0 deletions vortex-ipc/src/stream_reader/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::ops::Deref;
use std::sync::Arc;

use futures_util::stream::try_unfold;
use futures_util::Stream;
use vortex::stream::ArrayStream;
use vortex::{Context, ViewContext};
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_error::VortexResult;

Expand Down Expand Up @@ -64,4 +67,19 @@ impl<R: VortexRead> StreamArrayReader<R> {
let dtype = self.dtype.as_ref().expect("DType not set").deref().clone();
self.msgs.array_stream(view_context, dtype)
}

/// Reads a single page from the stream.
pub async fn next_page(&mut self) -> VortexResult<Option<Buffer>> {
self.msgs.maybe_read_page().await
}

/// Reads consecutive pages from the stream until the message type changes.
pub async fn page_stream(&mut self) -> impl Stream<Item = VortexResult<Buffer>> + '_ {
try_unfold(self, |reader| async {
match reader.next_page().await? {
Some(page) => Ok(Some((page, reader))),
None => Ok(None),
}
})
}
}