Skip to content

Commit

Permalink
IPC Terminator (#267)
Browse files Browse the repository at this point in the history
This PR adds an end-of-stream marker to the IPC format such that the
format itself can be embedded within other streams without consuming any
further bytes.

It also fixes a bug whereby an ArrayStreamReader would return an error
if it saw anything but a ChunkMessage, which would occur whenever we
wrote multiple arrays into a single stream. There is now a
StreamMessageReader that essentially allows peeking at the next message
in order to check if it should be processed.
  • Loading branch information
gatesn authored Apr 26, 2024
1 parent 06d5ba1 commit c12db77
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 96 deletions.
34 changes: 2 additions & 32 deletions vortex-flatbuffers/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use std::io;
use std::io::{Read, Write};
use std::io::Write;

use flatbuffers::{root, FlatBufferBuilder, Follow, Verifiable, WIPOffset};
use vortex_error::{vortex_err, VortexResult};
use flatbuffers::{FlatBufferBuilder, WIPOffset};

pub trait FlatBufferRoot {}

Expand Down Expand Up @@ -35,35 +34,6 @@ impl<F: WriteFlatBuffer + FlatBufferRoot> FlatBufferToBytes for F {
}
}

pub trait FlatBufferReader {
/// Returns Ok(None) if the reader has reached EOF.
fn read_message<'a, F>(&mut self, buffer: &'a mut Vec<u8>) -> VortexResult<Option<F>>
where
F: 'a + Follow<'a, Inner = F> + Verifiable;
}

impl<R: Read> FlatBufferReader for R {
fn read_message<'a, F>(&mut self, buffer: &'a mut Vec<u8>) -> VortexResult<Option<F>>
where
F: 'a + Follow<'a, Inner = F> + Verifiable,
{
let mut msg_size: [u8; 4] = [0; 4];
if let Err(e) = self.read_exact(&mut msg_size) {
return match e.kind() {
io::ErrorKind::UnexpectedEof => Ok(None),
_ => Err(vortex_err!(IOError: e)),
};
}
let msg_size = u32::from_le_bytes(msg_size) as u64;
if msg_size == 0 {
// FIXME(ngates): I think this is wrong.
return Ok(None);
}
self.take(msg_size).read_to_end(buffer)?;
Ok(Some(root::<F>(buffer)?))
}
}

pub trait FlatBufferWriter {
// Write the given FlatBuffer message, appending padding until the total bytes written
// are a multiple of `alignment`.
Expand Down
6 changes: 4 additions & 2 deletions vortex-ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,10 @@ mod tests {

let mut cursor = Cursor::new(Vec::new());
let ctx = SerdeContext::default();
let mut writer = StreamWriter::try_new_unbuffered(&mut cursor, ctx).unwrap();
writer.write_array(&arr).unwrap();
{
let mut writer = StreamWriter::try_new_unbuffered(&mut cursor, ctx).unwrap();
writer.write_array(&arr).unwrap();
}
cursor.flush().unwrap();
cursor.set_position(0);

Expand Down
Loading

0 comments on commit c12db77

Please sign in to comment.