From ddc60589dedf6572fb6f910fd83683eb790194e4 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 8 May 2024 17:57:52 +0100 Subject: [PATCH 01/11] Async --- Cargo.lock | 2 + Cargo.toml | 2 + vortex-ipc/Cargo.toml | 4 +- vortex-ipc/src/async.rs | 89 ++++++++++++++++++++++++++++++++++++++ vortex-ipc/src/lib.rs | 1 + vortex-ipc/src/messages.rs | 10 ++--- 6 files changed, 102 insertions(+), 6 deletions(-) create mode 100644 vortex-ipc/src/async.rs diff --git a/Cargo.lock b/Cargo.lock index 8d8f187969..bdfd06486b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5059,9 +5059,11 @@ dependencies = [ "fallible-iterator", "flatbuffers", "flatc", + "futures", "itertools 0.12.1", "log", "nougat", + "pin-project", "rand", "simplelog", "vortex-alp", diff --git a/Cargo.toml b/Cargo.toml index bf92f3ed1b..c25e1135ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,6 +63,7 @@ flatbuffers = "23.5.26" flatc = "0.2.2" flexbuffers = "2.0.0" fs_extra = "1.3.0" +futures = "0.3.30" getrandom = "0.2.14" half = { version = "^2", features = ["std", "num-traits"] } hashbrown = "0.14.3" @@ -75,6 +76,7 @@ num-traits = "0.2.18" num_enum = "0.7.2" parquet = "51.0.0" paste = "1.0.14" +pin-project = "1.1.5" prost = "0.12.4" prost-build = "0.12.4" prost-types = "0.12.4" diff --git a/vortex-ipc/Cargo.toml b/vortex-ipc/Cargo.toml index a7782dae80..21390a9e23 100644 --- a/vortex-ipc/Cargo.toml +++ b/vortex-ipc/Cargo.toml @@ -15,9 +15,11 @@ rust-version = { workspace = true } arrow-buffer = { workspace = true } fallible-iterator = { workspace = true } flatbuffers = { workspace = true } +futures = { workspace = true } itertools = { workspace = true } log = { workspace = true } nougat = "0.2.4" +pin-project = { workspace = true } vortex-array = { path = "../vortex-array" } vortex-buffer = { path = "../vortex-buffer" } vortex-error = { path = "../vortex-error" } @@ -42,7 +44,7 @@ arrow-schema = { workspace = true } arrow-select = { workspace = true } [lints] -workspace = true +# workspace = true [[bench]] name = "ipc_take" diff --git a/vortex-ipc/src/async.rs b/vortex-ipc/src/async.rs new file mode 100644 index 0000000000..78ec42b013 --- /dev/null +++ b/vortex-ipc/src/async.rs @@ -0,0 +1,89 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures::stream::Stream; +use pin_project::pin_project; +use vortex::array::primitive::PrimitiveArray; +use vortex::{IntoArray, OwnedArray}; +use vortex_buffer::Buffer; +use vortex_dtype::DType; +use vortex_error::{vortex_err, VortexResult}; + +use crate::messages::IPCMessage; + +type ArrayChunk = OwnedArray; + +/// Iterate over many arrays in a single stream. +pub trait IPCReader: Stream>> {} + +/// Similar to Arrow RecordBatchReader. +pub trait ArrayReader: Stream> { + fn dtype(&self) -> &DType; +} + +#[pin_project] +struct MessageArrayReader { + dtype: DType, + #[pin] + message_reader: M, +} + +impl ArrayReader for MessageArrayReader { + fn dtype(&self) -> &DType { + &self.dtype + } +} + +/// Return a new ArrayChunk from a stream of messages. +impl Stream for MessageArrayReader { + type Item = VortexResult; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let msg = match self.project().message_reader.poll_next(cx) { + Poll::Ready(Some(msg)) => msg, + Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => return Poll::Pending, + }; + + // TODO(ngates): how expensive can this get? Should we offload? + let chunk = msg.and_then(|msg| { + if let IPCMessage::Chunk(_chunk) = msg { + Ok(PrimitiveArray::from(vec![0, 1, 2]).into_array()) + } else { + Err(vortex_err!("expected IPCChunk")) + } + }); + + Poll::Ready(Some(chunk)) + } +} + +// An abstraction for reading Vortex messages that can be implemented for several IO frameworks. +pub trait MessageReader: Stream>> {} + +///// Compatability with byte streams + +/// Wrap a stream of bytes into a `MessageReader`. +pub struct StreamMessageReader(pub S) +where + S: Stream>, + B: Into; + +impl MessageReader for StreamMessageReader +where + S: Stream>, + B: Into, +{ +} + +impl Stream for StreamMessageReader +where + S: Stream>, + B: Into, +{ + type Item = VortexResult>; + + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + todo!() + } +} diff --git a/vortex-ipc/src/lib.rs b/vortex-ipc/src/lib.rs index dbc4620191..e90501992b 100644 --- a/vortex-ipc/src/lib.rs +++ b/vortex-ipc/src/lib.rs @@ -23,6 +23,7 @@ pub mod flatbuffers { } } +pub mod r#async; pub mod iter; mod messages; pub mod reader; diff --git a/vortex-ipc/src/messages.rs b/vortex-ipc/src/messages.rs index 46a744b5c8..e0c1eb1c49 100644 --- a/vortex-ipc/src/messages.rs +++ b/vortex-ipc/src/messages.rs @@ -10,16 +10,16 @@ use crate::flatbuffers::ipc as fb; use crate::flatbuffers::ipc::Compression; use crate::{missing, ALIGNMENT}; -pub(crate) enum IPCMessage<'a> { +pub enum IPCMessage<'a> { Context(IPCContext<'a>), Schema(IPCSchema<'a>), Chunk(IPCChunk<'a>), } -pub(crate) struct IPCContext<'a>(pub &'a ViewContext); -pub(crate) struct IPCSchema<'a>(pub &'a DType); -pub(crate) struct IPCChunk<'a>(pub &'a ViewContext, pub &'a ArrayData); -pub(crate) struct IPCArray<'a>(pub &'a ViewContext, pub &'a ArrayData); +pub struct IPCContext<'a>(pub &'a ViewContext); +pub struct IPCSchema<'a>(pub &'a DType); +pub struct IPCChunk<'a>(pub &'a ViewContext, pub &'a ArrayData); +pub struct IPCArray<'a>(pub &'a ViewContext, pub &'a ArrayData); impl FlatBufferRoot for IPCMessage<'_> {} impl WriteFlatBuffer for IPCMessage<'_> { From 86ee2cef890eba58b777bb1b656bb3bac8f20fcf Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Thu, 9 May 2024 16:26:14 +0100 Subject: [PATCH 02/11] Async --- Cargo.lock | 118 +++++++++++++++++++++++- vortex-buffer/src/lib.rs | 6 ++ vortex-ipc/Cargo.toml | 14 ++- vortex-ipc/src/array_reader.rs | 40 ++++++++ vortex-ipc/src/async.rs | 89 ------------------ vortex-ipc/src/codecs/futures.rs | 62 +++++++++++++ vortex-ipc/src/codecs/message_reader.rs | 75 +++++++++++++++ vortex-ipc/src/codecs/message_stream.rs | 8 ++ vortex-ipc/src/codecs/mod.rs | 10 ++ vortex-ipc/src/lib.rs | 3 +- 10 files changed, 332 insertions(+), 93 deletions(-) create mode 100644 vortex-ipc/src/array_reader.rs delete mode 100644 vortex-ipc/src/async.rs create mode 100644 vortex-ipc/src/codecs/futures.rs create mode 100644 vortex-ipc/src/codecs/message_reader.rs create mode 100644 vortex-ipc/src/codecs/message_stream.rs create mode 100644 vortex-ipc/src/codecs/mod.rs diff --git a/Cargo.lock b/Cargo.lock index bdfd06486b..1330bec004 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -363,6 +363,19 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "834eee9ce518130a3b4d5af09ecc43e9d6b57ee76613f227a1ddd6b77c7a62bc" +[[package]] +name = "asynchronous-codec" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a860072022177f903e59730004fb5dc13db9275b79bb2aef7ba8ce831956c233" +dependencies = [ + "bytes", + "futures-sink", + "futures-util", + "memchr", + "pin-project-lite", +] + [[package]] name = "atoi" version = "2.0.0" @@ -372,6 +385,17 @@ dependencies = [ "num-traits", ] +[[package]] +name = "auto-const-array" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62f7df18977a1ee03650ee4b31b4aefed6d56bac188760b6e37610400fe8d4bb" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.61", +] + [[package]] name = "autocfg" version = "1.3.0" @@ -1903,6 +1927,15 @@ dependencies = [ "slab", ] +[[package]] +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +dependencies = [ + "byteorder", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -2285,6 +2318,16 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "io-uring" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "595a0399f411a508feb2ec1e970a4a30c249351e30208960d58298de8660b0e5" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "ipnet" version = "2.9.0" @@ -2895,6 +2938,15 @@ version = "2.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" +[[package]] +name = "memoffset" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4" +dependencies = [ + "autocfg", +] + [[package]] name = "memoffset" version = "0.9.1" @@ -2932,6 +2984,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" dependencies = [ "libc", + "log", "wasi", "windows-sys 0.48.0", ] @@ -2967,6 +3020,48 @@ dependencies = [ "uuid", ] +[[package]] +name = "monoio" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60824f1a372fa200937885a2d434610333fe47d6c6f3d0855bd4555b46316fe7" +dependencies = [ + "auto-const-array", + "bytes", + "fxhash", + "io-uring", + "libc", + "memchr", + "mio", + "monoio-macros", + "nix", + "pin-project-lite", + "socket2", + "windows-sys 0.48.0", +] + +[[package]] +name = "monoio-codec" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58c0e52577d2f766a87e7d60df9a635a35aecf37962e801ca07c73666b95b620" +dependencies = [ + "bytes", + "monoio", + "tokio-util", +] + +[[package]] +name = "monoio-macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "176a5f5e69613d9e88337cf2a65e11135332b4efbcc628404a7c555e4452084c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.61", +] + [[package]] name = "multimap" version = "0.10.0" @@ -2991,6 +3086,19 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nix" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b" +dependencies = [ + "bitflags 1.3.2", + "cfg-if", + "libc", + "memoffset 0.7.1", + "pin-utils", +] + [[package]] name = "nom" version = "7.1.3" @@ -3627,7 +3735,7 @@ dependencies = [ "cfg-if", "indoc", "libc", - "memoffset", + "memoffset 0.9.1", "parking_lot", "portable-atomic", "pyo3-build-config", @@ -5055,17 +5163,23 @@ dependencies = [ "arrow-ipc", "arrow-schema", "arrow-select", + "asynchronous-codec", + "bytes", "criterion", "fallible-iterator", "flatbuffers", "flatc", - "futures", + "futures-util", "itertools 0.12.1", "log", + "monoio", + "monoio-codec", "nougat", "pin-project", "rand", "simplelog", + "tokio", + "tokio-util", "vortex-alp", "vortex-array", "vortex-buffer", diff --git a/vortex-buffer/src/lib.rs b/vortex-buffer/src/lib.rs index 4c9f112d78..57cf71277d 100644 --- a/vortex-buffer/src/lib.rs +++ b/vortex-buffer/src/lib.rs @@ -106,6 +106,12 @@ impl From> for Buffer { } } +impl From for Buffer { + fn from(value: bytes::Bytes) -> Self { + Buffer::Bytes(value) + } +} + impl From for Buffer { fn from(value: ArrowBuffer) -> Self { Buffer::Arrow(value) diff --git a/vortex-ipc/Cargo.toml b/vortex-ipc/Cargo.toml index 21390a9e23..aac4dc7a52 100644 --- a/vortex-ipc/Cargo.toml +++ b/vortex-ipc/Cargo.toml @@ -13,13 +13,19 @@ rust-version = { workspace = true } [dependencies] arrow-buffer = { workspace = true } +asynchronous-codec = { version = "0.7.0", optional = true } +bytes = { workspace = true } fallible-iterator = { workspace = true } flatbuffers = { workspace = true } -futures = { workspace = true } +futures-util = { version = "0.3.30" } itertools = { workspace = true } log = { workspace = true } +monoio = { version = "0.2.3", optional = true } +monoio-codec = { version = "0.3.4", optional = true } nougat = "0.2.4" pin-project = { workspace = true } +tokio = { version = "1.37.0", optional = true } +tokio-util = { version = "0.7.11", default-features = false, features = ["codec"], optional = true } vortex-array = { path = "../vortex-array" } vortex-buffer = { path = "../vortex-buffer" } vortex-error = { path = "../vortex-error" } @@ -46,6 +52,12 @@ arrow-select = { workspace = true } [lints] # workspace = true +[features] +default = ["futures", "monoio", "tokio"] +futures = ["dep:asynchronous-codec"] +monoio = ["dep:monoio", "dep:monoio-codec"] +tokio = ["dep:tokio", "dep:tokio-util"] + [[bench]] name = "ipc_take" harness = false diff --git a/vortex-ipc/src/array_reader.rs b/vortex-ipc/src/array_reader.rs new file mode 100644 index 0000000000..1f533c255a --- /dev/null +++ b/vortex-ipc/src/array_reader.rs @@ -0,0 +1,40 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures_util::Stream; +use pin_project::pin_project; +use vortex::OwnedArray; +use vortex_dtype::DType; +use vortex_error::VortexResult; + +/// NOTE: similar to Arrow RecordBatchReader. +pub trait ArrayReader: Stream> { + fn dtype(&self) -> &DType; +} + +/// Wrap a DType with a stream of array chunks to implement an ArrayReader. +#[pin_project] +pub struct ArrayReaderImpl { + dtype: DType, + #[pin] + inner: S, +} + +impl>> ArrayReader for ArrayReaderImpl { + #[inline] + fn dtype(&self) -> &DType { + &self.dtype + } +} + +impl>> Stream for ArrayReaderImpl { + type Item = VortexResult; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() + } +} diff --git a/vortex-ipc/src/async.rs b/vortex-ipc/src/async.rs deleted file mode 100644 index 78ec42b013..0000000000 --- a/vortex-ipc/src/async.rs +++ /dev/null @@ -1,89 +0,0 @@ -use std::pin::Pin; -use std::task::{Context, Poll}; - -use futures::stream::Stream; -use pin_project::pin_project; -use vortex::array::primitive::PrimitiveArray; -use vortex::{IntoArray, OwnedArray}; -use vortex_buffer::Buffer; -use vortex_dtype::DType; -use vortex_error::{vortex_err, VortexResult}; - -use crate::messages::IPCMessage; - -type ArrayChunk = OwnedArray; - -/// Iterate over many arrays in a single stream. -pub trait IPCReader: Stream>> {} - -/// Similar to Arrow RecordBatchReader. -pub trait ArrayReader: Stream> { - fn dtype(&self) -> &DType; -} - -#[pin_project] -struct MessageArrayReader { - dtype: DType, - #[pin] - message_reader: M, -} - -impl ArrayReader for MessageArrayReader { - fn dtype(&self) -> &DType { - &self.dtype - } -} - -/// Return a new ArrayChunk from a stream of messages. -impl Stream for MessageArrayReader { - type Item = VortexResult; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let msg = match self.project().message_reader.poll_next(cx) { - Poll::Ready(Some(msg)) => msg, - Poll::Ready(None) => return Poll::Ready(None), - Poll::Pending => return Poll::Pending, - }; - - // TODO(ngates): how expensive can this get? Should we offload? - let chunk = msg.and_then(|msg| { - if let IPCMessage::Chunk(_chunk) = msg { - Ok(PrimitiveArray::from(vec![0, 1, 2]).into_array()) - } else { - Err(vortex_err!("expected IPCChunk")) - } - }); - - Poll::Ready(Some(chunk)) - } -} - -// An abstraction for reading Vortex messages that can be implemented for several IO frameworks. -pub trait MessageReader: Stream>> {} - -///// Compatability with byte streams - -/// Wrap a stream of bytes into a `MessageReader`. -pub struct StreamMessageReader(pub S) -where - S: Stream>, - B: Into; - -impl MessageReader for StreamMessageReader -where - S: Stream>, - B: Into, -{ -} - -impl Stream for StreamMessageReader -where - S: Stream>, - B: Into, -{ - type Item = VortexResult>; - - fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - todo!() - } -} diff --git a/vortex-ipc/src/codecs/futures.rs b/vortex-ipc/src/codecs/futures.rs new file mode 100644 index 0000000000..7674ab041d --- /dev/null +++ b/vortex-ipc/src/codecs/futures.rs @@ -0,0 +1,62 @@ +#![cfg(feature = "futures")] + +use std::pin::Pin; +use std::task::{Context, Poll}; + +use asynchronous_codec::{Decoder, FramedRead}; +use bytes::{Buf, BytesMut}; +use futures_util::{AsyncRead, Stream}; +use pin_project::pin_project; +use vortex_buffer::Buffer; +use vortex_error::{VortexError, VortexResult}; + +use crate::codecs::message_stream::MessageStream; + +/// The Vortex message codec implemented over streams of bytes. +struct StreamMessageCodec; + +impl Decoder for StreamMessageCodec { + type Item = Buffer; + type Error = VortexError; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + if src.len() < 4 { + return Ok(None); + } + + // Extract the length of the message. + let mut len_bytes = [0u8; 4]; + len_bytes.copy_from_slice(&src[..4]); + let len = u32::from_le_bytes(len_bytes) as usize; + + if src.len() - 4 >= len { + // Skip the length header we already read. + src.advance(4); + Ok(Some(src.split_to(len).freeze().into())) + } else { + Ok(None) + } + } +} + +#[pin_project] +pub struct AsyncReadMessageStream { + #[pin] + framed: FramedRead, +} + +impl AsyncReadMessageStream { + pub fn new(read: R) -> Self { + Self { + framed: FramedRead::new(read, StreamMessageCodec), + } + } +} + +impl Stream for AsyncReadMessageStream { + type Item = VortexResult; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Stream::poll_next(self.project().framed, cx) + } +} diff --git a/vortex-ipc/src/codecs/message_reader.rs b/vortex-ipc/src/codecs/message_reader.rs new file mode 100644 index 0000000000..0b82881f37 --- /dev/null +++ b/vortex-ipc/src/codecs/message_reader.rs @@ -0,0 +1,75 @@ +use std::io; + +use bytes::BytesMut; +use flatbuffers::{root, root_unchecked}; +use futures_util::{AsyncRead, AsyncReadExt}; +use vortex_error::VortexResult; + +use crate::flatbuffers::ipc::Message; + +struct AsyncReadMessageReader { + // TODO(ngates): swap this for our own mutable aligned buffer so we can support direct reads. + message: BytesMut, + prev_message: BytesMut, + finished: bool, +} + +impl AsyncReadMessageReader { + pub async fn try_new(read: &mut R) -> VortexResult { + let mut reader = Self { + message: BytesMut::new(), + prev_message: BytesMut::new(), + finished: false, + }; + reader.load_next_message(read).await?; + Ok(reader) + } + + pub fn peek(&self) -> Option { + if self.finished { + return None; + } + // The message has been validated by the next() call. + Some(unsafe { root_unchecked::(&self.message) }) + } + + pub async fn next(&mut self, read: &mut R) -> VortexResult { + if self.finished { + panic!("StreamMessageReader is finished - should've checked peek!"); + } + self.prev_message = self.message.split(); + if !self.load_next_message(read).await? { + self.finished = true; + } + Ok(unsafe { root_unchecked::(&self.prev_message) }) + } + + async fn load_next_message(&mut self, read: &mut R) -> VortexResult { + let mut len_buf = [0u8; 4]; + match read.read_exact(&mut len_buf).await { + Ok(()) => {} + Err(e) => { + return match e.kind() { + io::ErrorKind::UnexpectedEof => Ok(false), + _ => Err(e.into()), + }; + } + } + + let len = u32::from_le_bytes(len_buf); + if len == u32::MAX { + // Marker for no more messages. + return Ok(false); + } + + assert_eq!(self.message.len(), 0); + // self.message.clear(); + self.message.reserve(len as usize); + self.message.truncate(len as usize); + read.read_exact(&mut self.message).await?; + + /// Validate that the message is valid a flatbuffer. + let _ = root::(&self.message)?; + Ok(true) + } +} diff --git a/vortex-ipc/src/codecs/message_stream.rs b/vortex-ipc/src/codecs/message_stream.rs new file mode 100644 index 0000000000..054b907a31 --- /dev/null +++ b/vortex-ipc/src/codecs/message_stream.rs @@ -0,0 +1,8 @@ +use futures_util::Stream; +use vortex_buffer::Buffer; +use vortex_error::VortexResult; + +/// A message stream allows the caller to consume Vortex messages as well as arbitrary buffers. +pub trait MessageStream: Stream> { + fn read_exact(&mut self, buffer: B) -> VortexResult; +} diff --git a/vortex-ipc/src/codecs/mod.rs b/vortex-ipc/src/codecs/mod.rs new file mode 100644 index 0000000000..3b3a0ab3cd --- /dev/null +++ b/vortex-ipc/src/codecs/mod.rs @@ -0,0 +1,10 @@ +use vortex_buffer::Buffer; + +mod futures; +mod message_reader; +mod message_stream; + +pub enum Message { + FlatBuffer(Buffer), + ByteBuffer(Buffer), +} diff --git a/vortex-ipc/src/lib.rs b/vortex-ipc/src/lib.rs index e90501992b..3bab34f804 100644 --- a/vortex-ipc/src/lib.rs +++ b/vortex-ipc/src/lib.rs @@ -23,7 +23,8 @@ pub mod flatbuffers { } } -pub mod r#async; +mod array_reader; +mod codecs; pub mod iter; mod messages; pub mod reader; From 780ada0bcab5e70f99d6910aa0fc8d79fe44137a Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 10 May 2024 12:06:24 +0100 Subject: [PATCH 03/11] merge --- Cargo.lock | 11 ++ vortex-array/src/view.rs | 2 +- vortex-ipc/Cargo.toml | 4 +- vortex-ipc/src/array_reader.rs | 1 + vortex-ipc/src/codecs/array_reader.rs | 157 ++++++++++++++++++ vortex-ipc/src/codecs/async_message_reader.rs | 101 +++++++++++ vortex-ipc/src/codecs/futures.rs | 62 ------- vortex-ipc/src/codecs/message_reader.rs | 103 +++++------- vortex-ipc/src/codecs/message_stream.rs | 8 - vortex-ipc/src/codecs/mod.rs | 12 +- .../src/codecs/monoio_message_reader.rs | 111 +++++++++++++ vortex-ipc/src/lib.rs | 2 + vortex-ipc/src/reader2.rs | 10 ++ vortex-ipc/src/stream.rs | 14 ++ 14 files changed, 454 insertions(+), 144 deletions(-) create mode 100644 vortex-ipc/src/codecs/array_reader.rs create mode 100644 vortex-ipc/src/codecs/async_message_reader.rs delete mode 100644 vortex-ipc/src/codecs/futures.rs delete mode 100644 vortex-ipc/src/codecs/message_stream.rs create mode 100644 vortex-ipc/src/codecs/monoio_message_reader.rs create mode 100644 vortex-ipc/src/reader2.rs create mode 100644 vortex-ipc/src/stream.rs diff --git a/Cargo.lock b/Cargo.lock index 1330bec004..52061bbc00 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4368,6 +4368,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +dependencies = [ + "libc", +] + [[package]] name = "simplelog" version = "0.12.2" @@ -4734,7 +4743,9 @@ dependencies = [ "libc", "mio", "num_cpus", + "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.48.0", diff --git a/vortex-array/src/view.rs b/vortex-array/src/view.rs index 96d4fbb036..676047e062 100644 --- a/vortex-array/src/view.rs +++ b/vortex-array/src/view.rs @@ -230,7 +230,7 @@ impl<'v> IntoArray<'v> for ArrayView<'v> { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ViewContext { encodings: Vec, } diff --git a/vortex-ipc/Cargo.toml b/vortex-ipc/Cargo.toml index aac4dc7a52..50d41ec701 100644 --- a/vortex-ipc/Cargo.toml +++ b/vortex-ipc/Cargo.toml @@ -20,11 +20,11 @@ flatbuffers = { workspace = true } futures-util = { version = "0.3.30" } itertools = { workspace = true } log = { workspace = true } -monoio = { version = "0.2.3", optional = true } +monoio = { version = "0.2.3", optional = true, features = ["bytes"] } monoio-codec = { version = "0.3.4", optional = true } nougat = "0.2.4" pin-project = { workspace = true } -tokio = { version = "1.37.0", optional = true } +tokio = { version = "1.37.0", optional = true, features = ["full"] } tokio-util = { version = "0.7.11", default-features = false, features = ["codec"], optional = true } vortex-array = { path = "../vortex-array" } vortex-buffer = { path = "../vortex-buffer" } diff --git a/vortex-ipc/src/array_reader.rs b/vortex-ipc/src/array_reader.rs index 1f533c255a..74a360324a 100644 --- a/vortex-ipc/src/array_reader.rs +++ b/vortex-ipc/src/array_reader.rs @@ -9,6 +9,7 @@ use vortex_error::VortexResult; /// NOTE: similar to Arrow RecordBatchReader. pub trait ArrayReader: Stream> { + #[allow(dead_code)] fn dtype(&self) -> &DType; } diff --git a/vortex-ipc/src/codecs/array_reader.rs b/vortex-ipc/src/codecs/array_reader.rs new file mode 100644 index 0000000000..49d2451982 --- /dev/null +++ b/vortex-ipc/src/codecs/array_reader.rs @@ -0,0 +1,157 @@ +use std::pin::Pin; +use std::task::Poll; + +use bytes::BytesMut; +use futures_util::Stream; +use vortex::{ArrayView, Context, IntoArray, OwnedArray, ToArray, ViewContext}; +use vortex_buffer::Buffer; +use vortex_dtype::DType; +use vortex_error::{vortex_bail, vortex_err, VortexError, VortexResult}; + +use crate::codecs::message_reader::MessageReader; +use crate::messages::SerdeContextDeserializer; +use crate::reader2::ArrayReader; + +pub struct IPCReader<'a, M: MessageReader> { + view_ctx: ViewContext, + messages: &'a mut M, +} + +impl<'m, M: MessageReader> IPCReader<'m, M> { + pub fn new(view_ctx: ViewContext, messages: &'m mut M) -> Self { + Self { view_ctx, messages } + } + + pub async fn try_from_messages(ctx: &Context, messages: &'m mut M) -> VortexResult { + match messages.peek() { + None => vortex_bail!("IPC stream is empty"), + Some(msg) => { + if msg.header_as_context().is_none() { + vortex_bail!(InvalidSerde: "Expected IPC Context as first message in stream") + } + } + } + + let view_ctx: ViewContext = SerdeContextDeserializer { + fb: messages.next().await?.header_as_context().unwrap(), + ctx, + } + .try_into()?; + + Ok(Self { messages, view_ctx }) + } + + pub async fn next<'a>(&'a mut self) -> VortexResult>> { + if self + .messages + .peek() + .and_then(|msg| msg.header_as_schema()) + .is_none() + { + return Ok(None); + } + + let schema_msg = self.messages.next().await?.header_as_schema().unwrap(); + + let dtype = DType::try_from( + schema_msg + .dtype() + .ok_or_else(|| vortex_err!(InvalidSerde: "Schema missing DType"))?, + ) + .map_err(|e| vortex_err!(InvalidSerde: "Failed to parse DType: {}", e))?; + + Ok(Some(MessageArrayReader::new( + self.view_ctx.clone(), + dtype, + &mut self.messages, + ))) + } +} + +pub struct MessageArrayReader<'a, M: MessageReader> { + ctx: ViewContext, + dtype: DType, + messages: &'a mut M, + + // State + buffers: Vec, + row_offset: usize, +} + +impl MessageArrayReader<'_, M> { + /// Construct an ArrayReader with a message stream containing chunk messages. + pub fn new(ctx: ViewContext, dtype: DType, messages: &mut M) -> Self { + Self { + ctx, + dtype, + messages, + buffers: Vec::new(), + row_offset: 0, + } + } + + async fn next_chunk(&mut self) -> VortexResult> { + let Some(chunk_msg) = self.messages.peek().and_then(|msg| msg.header_as_chunk()) else { + return Ok(None); + }; + + // Read all the column's buffers + self.buffers.clear(); + let mut offset = 0; + for buffer in chunk_msg.buffers().unwrap_or_default().iter() { + let _skip = buffer.offset() - offset; + self.messages.skip(buffer.offset() - offset).await?; + + // TODO(ngates): read into a single buffer, then Arc::clone and slice + let bytes = BytesMut::zeroed(buffer.length() as usize); + let bytes = self.messages.read_into(bytes).await?; + self.buffers.push(Buffer::from(bytes.freeze())); + + offset = buffer.offset() + buffer.length(); + } + + // Consume any remaining padding after the final buffer. + self.messages.skip(chunk_msg.buffer_size() - offset).await?; + + // After reading the buffers we're now able to load the next message. + let col_array = self + .messages + .next() + .await? + .header_as_chunk() + .unwrap() + .array() + .unwrap(); + + let view = ArrayView::try_new(&self.ctx, &self.dtype, col_array, self.buffers.as_slice())?; + + // Validate it + view.to_array().with_dyn(|_| Ok::<(), VortexError>(()))?; + + let array = view.into_array(); + self.row_offset += array.len(); + Ok(Some(array)) + } +} + +pub struct ArrayReaderImpl { + dtype: DType, +} + +impl ArrayReader for ArrayReaderImpl { + #[inline] + fn dtype(&self) -> &DType { + &self.dtype + } +} + +impl Stream for ArrayReaderImpl { + type Item = VortexResult; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + self.project().inner.poll_next(cx) + } +} diff --git a/vortex-ipc/src/codecs/async_message_reader.rs b/vortex-ipc/src/codecs/async_message_reader.rs new file mode 100644 index 0000000000..23ccbfa006 --- /dev/null +++ b/vortex-ipc/src/codecs/async_message_reader.rs @@ -0,0 +1,101 @@ +use std::io; + +use bytes::BytesMut; +use flatbuffers::{root, root_unchecked}; +use futures_util::{AsyncRead, AsyncReadExt}; +use vortex_error::VortexResult; + +use crate::codecs::message_reader::test::create_stream; +use crate::codecs::message_reader::MessageReader; +use crate::flatbuffers::ipc::Message; + +pub struct AsyncReadMessageReader { + // TODO(ngates): swap this for our own mutable aligned buffer so we can support direct reads. + read: R, + message: BytesMut, + prev_message: BytesMut, + finished: bool, +} + +impl AsyncReadMessageReader { + pub async fn try_new(read: R) -> VortexResult { + let mut reader = Self { + read, + message: BytesMut::new(), + prev_message: BytesMut::new(), + finished: false, + }; + reader.load_next_message().await?; + Ok(reader) + } + + async fn load_next_message(&mut self) -> VortexResult { + let mut len_buf = [0u8; 4]; + match self.read.read_exact(&mut len_buf).await { + Ok(()) => {} + Err(e) => { + println!("ERROR READING LENGTH"); + return match e.kind() { + io::ErrorKind::UnexpectedEof => Ok(false), + _ => Err(e.into()), + }; + } + } + + let len = u32::from_le_bytes(len_buf); + if len == u32::MAX { + // Marker for no more messages. + return Ok(false); + } + + assert_eq!(self.message.len(), 0); + self.message.resize(len as usize, 0); + println!("MESSAGE {}", self.message.len()); + self.read.read_exact(&mut self.message).await?; + + // Validate that the message is valid a flatbuffer. + let _ = root::(&self.message)?; + Ok(true) + } +} + +impl MessageReader for AsyncReadMessageReader { + fn peek(&self) -> Option { + if self.finished { + return None; + } + // The message has been validated by the next() call. + Some(unsafe { root_unchecked::(&self.message) }) + } + + async fn next(&mut self) -> VortexResult { + if self.finished { + panic!("StreamMessageReader is finished - should've checked peek!"); + } + self.prev_message = self.message.split(); + if !self.load_next_message().await? { + self.finished = true; + } + Ok(unsafe { root_unchecked::(&self.prev_message) }) + } +} + +#[tokio::test] +async fn test_something() -> VortexResult<()> { + let buffer = create_stream(); + + // TODO(ngates): stream + // let _stream = buffer + // .into_iter() + // .chunks(64) + // .into_iter() + // .map(|chunk| chunk.collect::>()); + + let mut reader = AsyncReadMessageReader::try_new(buffer.as_slice()).await?; + while reader.peek().is_some() { + let msg = reader.next().await?; + println!("MSG {:?}", msg); + } + + Ok(()) +} diff --git a/vortex-ipc/src/codecs/futures.rs b/vortex-ipc/src/codecs/futures.rs deleted file mode 100644 index 7674ab041d..0000000000 --- a/vortex-ipc/src/codecs/futures.rs +++ /dev/null @@ -1,62 +0,0 @@ -#![cfg(feature = "futures")] - -use std::pin::Pin; -use std::task::{Context, Poll}; - -use asynchronous_codec::{Decoder, FramedRead}; -use bytes::{Buf, BytesMut}; -use futures_util::{AsyncRead, Stream}; -use pin_project::pin_project; -use vortex_buffer::Buffer; -use vortex_error::{VortexError, VortexResult}; - -use crate::codecs::message_stream::MessageStream; - -/// The Vortex message codec implemented over streams of bytes. -struct StreamMessageCodec; - -impl Decoder for StreamMessageCodec { - type Item = Buffer; - type Error = VortexError; - - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - if src.len() < 4 { - return Ok(None); - } - - // Extract the length of the message. - let mut len_bytes = [0u8; 4]; - len_bytes.copy_from_slice(&src[..4]); - let len = u32::from_le_bytes(len_bytes) as usize; - - if src.len() - 4 >= len { - // Skip the length header we already read. - src.advance(4); - Ok(Some(src.split_to(len).freeze().into())) - } else { - Ok(None) - } - } -} - -#[pin_project] -pub struct AsyncReadMessageStream { - #[pin] - framed: FramedRead, -} - -impl AsyncReadMessageStream { - pub fn new(read: R) -> Self { - Self { - framed: FramedRead::new(read, StreamMessageCodec), - } - } -} - -impl Stream for AsyncReadMessageStream { - type Item = VortexResult; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Stream::poll_next(self.project().framed, cx) - } -} diff --git a/vortex-ipc/src/codecs/message_reader.rs b/vortex-ipc/src/codecs/message_reader.rs index 0b82881f37..5f7bc4cca9 100644 --- a/vortex-ipc/src/codecs/message_reader.rs +++ b/vortex-ipc/src/codecs/message_reader.rs @@ -1,75 +1,54 @@ -use std::io; +use std::future::Future; use bytes::BytesMut; -use flatbuffers::{root, root_unchecked}; -use futures_util::{AsyncRead, AsyncReadExt}; use vortex_error::VortexResult; use crate::flatbuffers::ipc::Message; -struct AsyncReadMessageReader { - // TODO(ngates): swap this for our own mutable aligned buffer so we can support direct reads. - message: BytesMut, - prev_message: BytesMut, - finished: bool, +#[allow(dead_code)] +pub trait MessageReader { + fn peek(&self) -> Option; + fn next(&mut self) -> impl Future>; + fn skip(&mut self, nbytes: u64) -> impl Future>; + fn read_into(&mut self, buffer: BytesMut) -> impl Future>; } -impl AsyncReadMessageReader { - pub async fn try_new(read: &mut R) -> VortexResult { - let mut reader = Self { - message: BytesMut::new(), - prev_message: BytesMut::new(), - finished: false, - }; - reader.load_next_message(read).await?; - Ok(reader) - } - - pub fn peek(&self) -> Option { - if self.finished { - return None; - } - // The message has been validated by the next() call. - Some(unsafe { root_unchecked::(&self.message) }) - } - - pub async fn next(&mut self, read: &mut R) -> VortexResult { - if self.finished { - panic!("StreamMessageReader is finished - should've checked peek!"); - } - self.prev_message = self.message.split(); - if !self.load_next_message(read).await? { - self.finished = true; - } - Ok(unsafe { root_unchecked::(&self.prev_message) }) - } - - async fn load_next_message(&mut self, read: &mut R) -> VortexResult { - let mut len_buf = [0u8; 4]; - match read.read_exact(&mut len_buf).await { - Ok(()) => {} - Err(e) => { - return match e.kind() { - io::ErrorKind::UnexpectedEof => Ok(false), - _ => Err(e.into()), - }; - } - } - - let len = u32::from_le_bytes(len_buf); - if len == u32::MAX { - // Marker for no more messages. - return Ok(false); +#[cfg(test)] +pub mod test { + use std::io::Cursor; + + use vortex::array::chunked::ChunkedArray; + use vortex::array::primitive::PrimitiveArray; + use vortex::encoding::EncodingRef; + use vortex::{ArrayDType, Context, IntoArray}; + use vortex_alp::ALPEncoding; + use vortex_fastlanes::BitPackedEncoding; + + use crate::writer::StreamWriter; + + pub fn create_stream() -> Vec { + let ctx = Context::default().with_encodings([ + &ALPEncoding as EncodingRef, + &BitPackedEncoding as EncodingRef, + ]); + let array = PrimitiveArray::from(vec![0, 1, 2]).into_array(); + let chunked_array = + ChunkedArray::try_new(vec![array.clone(), array.clone()], array.dtype().clone()) + .unwrap() + .into_array(); + + let mut buffer = vec![]; + let mut cursor = Cursor::new(&mut buffer); + { + let mut writer = StreamWriter::try_new(&mut cursor, &ctx).unwrap(); + writer.write_array(&array).unwrap(); + writer.write_array(&chunked_array).unwrap(); } - assert_eq!(self.message.len(), 0); - // self.message.clear(); - self.message.reserve(len as usize); - self.message.truncate(len as usize); - read.read_exact(&mut self.message).await?; + // Push some extra bytes to test that the reader is well-behaved and doesn't read past the + // end of the stream. + // let _ = cursor.write(b"hello").unwrap(); - /// Validate that the message is valid a flatbuffer. - let _ = root::(&self.message)?; - Ok(true) + buffer } } diff --git a/vortex-ipc/src/codecs/message_stream.rs b/vortex-ipc/src/codecs/message_stream.rs deleted file mode 100644 index 054b907a31..0000000000 --- a/vortex-ipc/src/codecs/message_stream.rs +++ /dev/null @@ -1,8 +0,0 @@ -use futures_util::Stream; -use vortex_buffer::Buffer; -use vortex_error::VortexResult; - -/// A message stream allows the caller to consume Vortex messages as well as arbitrary buffers. -pub trait MessageStream: Stream> { - fn read_exact(&mut self, buffer: B) -> VortexResult; -} diff --git a/vortex-ipc/src/codecs/mod.rs b/vortex-ipc/src/codecs/mod.rs index 3b3a0ab3cd..98ec182784 100644 --- a/vortex-ipc/src/codecs/mod.rs +++ b/vortex-ipc/src/codecs/mod.rs @@ -1,10 +1,4 @@ -use vortex_buffer::Buffer; - -mod futures; +mod array_reader; +mod async_message_reader; mod message_reader; -mod message_stream; - -pub enum Message { - FlatBuffer(Buffer), - ByteBuffer(Buffer), -} +mod monoio_message_reader; diff --git a/vortex-ipc/src/codecs/monoio_message_reader.rs b/vortex-ipc/src/codecs/monoio_message_reader.rs new file mode 100644 index 0000000000..6f4bf1b3ed --- /dev/null +++ b/vortex-ipc/src/codecs/monoio_message_reader.rs @@ -0,0 +1,111 @@ +#![cfg(feature = "monoio")] +#![allow(dead_code)] + +use bytes::BytesMut; +use flatbuffers::{root, root_unchecked}; +use monoio::buf::IoBufMut; +use monoio::io::{AsyncReadRent, AsyncReadRentExt}; +use vortex_error::VortexResult; + +use crate::codecs::message_reader::test::create_stream; +use crate::codecs::message_reader::MessageReader; +use crate::flatbuffers::ipc::Message; + +struct MonoIoMessageReader { + // TODO(ngates): swap this for our own mutable aligned buffer so we can support direct reads. + read: R, + message: BytesMut, + prev_message: BytesMut, + finished: bool, +} + +impl MonoIoMessageReader { + pub async fn try_new(read: R) -> VortexResult { + let mut reader = Self { + read, + message: BytesMut::new(), + prev_message: BytesMut::new(), + finished: false, + }; + reader.load_next_message().await?; + Ok(reader) + } + + async fn load_next_message(&mut self) -> VortexResult { + // FIXME(ngates): how do we read into a stack allocated thing? + let len_buf = self.read.read_exact_into(Vec::with_capacity(4)).await?; + + let len = u32::from_le_bytes(len_buf.as_slice().try_into()?); + if len == u32::MAX { + // Marker for no more messages. + return Ok(false); + } + + // TODO(ngates): we may be able to use self.message.split() and then swap back after. + + let message = self + .read + .read_exact_into(BytesMut::with_capacity(len as usize)) + .await?; + + // Validate that the message is valid a flatbuffer. + let _ = root::(message.as_ref())?; + + self.message = message; + + Ok(true) + } +} + +impl MessageReader for MonoIoMessageReader { + fn peek(&self) -> Option { + if self.finished { + return None; + } + // The message has been validated by the next() call. + Some(unsafe { root_unchecked::(&self.message) }) + } + + async fn next(&mut self) -> VortexResult { + if self.finished { + panic!("StreamMessageReader is finished - should've checked peek!"); + } + self.prev_message = self.message.split(); + if !self.load_next_message().await? { + self.finished = true; + } + Ok(unsafe { root_unchecked::(&self.prev_message) }) + } +} + +trait AsyncReadRentMoreExt: AsyncReadRentExt { + /// Same as read_exact except unwraps the BufResult into a regular IO result. + async fn read_exact_into(&mut self, buf: B) -> std::io::Result { + match self.read_exact(buf).await { + (Ok(_), buf) => Ok(buf), + (Err(e), _) => Err(e), + } + } +} + +impl AsyncReadRentMoreExt for R {} + +#[monoio::test] +async fn test_something() -> VortexResult<()> { + let buffer = create_stream(); + + // TODO(ngates): stream + // let _stream = buffer + // .into_iter() + // .chunks(64) + // .into_iter() + // .map(|chunk| chunk.collect::>()); + + let mut reader = MonoIoMessageReader::try_new(buffer.as_slice()).await?; + while reader.peek().is_some() { + let msg = reader.next().await?; + println!("MSG {:?}", msg); + } + + Ok(()) +} diff --git a/vortex-ipc/src/lib.rs b/vortex-ipc/src/lib.rs index 17663f6a1e..7e3aed9867 100644 --- a/vortex-ipc/src/lib.rs +++ b/vortex-ipc/src/lib.rs @@ -34,6 +34,8 @@ mod codecs; pub mod iter; mod messages; pub mod reader; +mod reader2; +mod stream; pub mod writer; pub(crate) const fn missing(field: &'static str) -> impl FnOnce() -> VortexError { diff --git a/vortex-ipc/src/reader2.rs b/vortex-ipc/src/reader2.rs new file mode 100644 index 0000000000..4023712fa2 --- /dev/null +++ b/vortex-ipc/src/reader2.rs @@ -0,0 +1,10 @@ +use futures_util::Stream; +use vortex::OwnedArray; +use vortex_dtype::DType; +use vortex_error::VortexResult; + +/// A stream of array chunks along with a DType. +pub trait ArrayReader: Stream> { + #[allow(dead_code)] + fn dtype(&self) -> &DType; +} diff --git a/vortex-ipc/src/stream.rs b/vortex-ipc/src/stream.rs new file mode 100644 index 0000000000..39e4254d9f --- /dev/null +++ b/vortex-ipc/src/stream.rs @@ -0,0 +1,14 @@ +use std::future::Future; + +#[must_use = "streams must be polled"] +pub trait Stream { + type Item; + + fn next(&mut self) -> impl Future>; + + /// Returns the bounds on the remaining length of the stream. + #[inline] + fn size_hint(&self) -> (usize, Option) { + (0, None) + } +} From 839c19966113eb1c78d6339231a4d6dd75ea736d Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 10 May 2024 15:15:38 +0100 Subject: [PATCH 04/11] merge --- vortex-buffer/src/lib.rs | 1 + vortex-ipc/Cargo.toml | 2 +- vortex-ipc/src/codecs/array_reader.rs | 78 +++++--- vortex-ipc/src/codecs/async_message_reader.rs | 101 ---------- vortex-ipc/src/codecs/ipc_reader.rs | 0 .../src/codecs/message_reader/futures.rs | 179 ++++++++++++++++++ .../mod.rs} | 9 +- .../monoio.rs} | 81 ++++++-- vortex-ipc/src/codecs/mod.rs | 3 +- vortex-ipc/src/lending_stream.rs | 9 + vortex-ipc/src/lib.rs | 3 +- vortex-ipc/src/reader2.rs | 10 - vortex-ipc/src/stream.rs | 14 -- 13 files changed, 319 insertions(+), 171 deletions(-) delete mode 100644 vortex-ipc/src/codecs/async_message_reader.rs create mode 100644 vortex-ipc/src/codecs/ipc_reader.rs create mode 100644 vortex-ipc/src/codecs/message_reader/futures.rs rename vortex-ipc/src/codecs/{message_reader.rs => message_reader/mod.rs} (88%) rename vortex-ipc/src/codecs/{monoio_message_reader.rs => message_reader/monoio.rs} (54%) create mode 100644 vortex-ipc/src/lending_stream.rs delete mode 100644 vortex-ipc/src/reader2.rs delete mode 100644 vortex-ipc/src/stream.rs diff --git a/vortex-buffer/src/lib.rs b/vortex-buffer/src/lib.rs index 57cf71277d..f1e9403cd3 100644 --- a/vortex-buffer/src/lib.rs +++ b/vortex-buffer/src/lib.rs @@ -58,6 +58,7 @@ impl Buffer { } } + // TODO(ngates): make this more like `into_mut` and get back a Vortex BufferMut. pub fn into_vec(self) -> Result, Buffer> { match self { Buffer::Arrow(buffer) => match_each_native_ptype!(T::PTYPE, |$T| { diff --git a/vortex-ipc/Cargo.toml b/vortex-ipc/Cargo.toml index 50d41ec701..cb882e9efa 100644 --- a/vortex-ipc/Cargo.toml +++ b/vortex-ipc/Cargo.toml @@ -25,7 +25,7 @@ monoio-codec = { version = "0.3.4", optional = true } nougat = "0.2.4" pin-project = { workspace = true } tokio = { version = "1.37.0", optional = true, features = ["full"] } -tokio-util = { version = "0.7.11", default-features = false, features = ["codec"], optional = true } +tokio-util = { version = "0.7.11", default-features = false, features = ["codec", "io-util"], optional = true } vortex-array = { path = "../vortex-array" } vortex-buffer = { path = "../vortex-buffer" } vortex-error = { path = "../vortex-error" } diff --git a/vortex-ipc/src/codecs/array_reader.rs b/vortex-ipc/src/codecs/array_reader.rs index 49d2451982..daead31fd3 100644 --- a/vortex-ipc/src/codecs/array_reader.rs +++ b/vortex-ipc/src/codecs/array_reader.rs @@ -1,22 +1,22 @@ use std::pin::Pin; use std::task::Poll; -use bytes::BytesMut; use futures_util::Stream; -use vortex::{ArrayView, Context, IntoArray, OwnedArray, ToArray, ViewContext}; +use pin_project::pin_project; +use vortex::{Array, ArrayView, Context, IntoArray, OwnedArray, ToArray, ToStatic, ViewContext}; use vortex_buffer::Buffer; use vortex_dtype::DType; use vortex_error::{vortex_bail, vortex_err, VortexError, VortexResult}; use crate::codecs::message_reader::MessageReader; use crate::messages::SerdeContextDeserializer; -use crate::reader2::ArrayReader; pub struct IPCReader<'a, M: MessageReader> { view_ctx: ViewContext, messages: &'a mut M, } +#[allow(dead_code)] impl<'m, M: MessageReader> IPCReader<'m, M> { pub fn new(view_ctx: ViewContext, messages: &'m mut M) -> Self { Self { view_ctx, messages } @@ -78,9 +78,9 @@ pub struct MessageArrayReader<'a, M: MessageReader> { row_offset: usize, } -impl MessageArrayReader<'_, M> { +impl<'m, M: MessageReader> MessageArrayReader<'m, M> { /// Construct an ArrayReader with a message stream containing chunk messages. - pub fn new(ctx: ViewContext, dtype: DType, messages: &mut M) -> Self { + pub fn new(ctx: ViewContext, dtype: DType, messages: &'m mut M) -> Self { Self { ctx, dtype, @@ -90,28 +90,33 @@ impl MessageArrayReader<'_, M> { } } - async fn next_chunk(&mut self) -> VortexResult> { - let Some(chunk_msg) = self.messages.peek().and_then(|msg| msg.header_as_chunk()) else { - return Ok(None); - }; + pub fn into_reader(self) -> impl ArrayReader + 'm { + let dtype = self.dtype.clone(); - // Read all the column's buffers - self.buffers.clear(); - let mut offset = 0; - for buffer in chunk_msg.buffers().unwrap_or_default().iter() { - let _skip = buffer.offset() - offset; - self.messages.skip(buffer.offset() - offset).await?; + let inner = futures_util::stream::unfold(self, |mut reader| async move { + match reader.next().await { + Ok(Some(array)) => Some((Ok(array.to_static()), reader)), + Ok(None) => None, + Err(e) => Some((Err(e), reader)), + } + }); - // TODO(ngates): read into a single buffer, then Arc::clone and slice - let bytes = BytesMut::zeroed(buffer.length() as usize); - let bytes = self.messages.read_into(bytes).await?; - self.buffers.push(Buffer::from(bytes.freeze())); + ArrayReaderImpl { dtype, inner } + } +} - offset = buffer.offset() + buffer.length(); +impl MessageArrayReader<'_, M> { + pub async fn next(&mut self) -> VortexResult> { + if self + .messages + .peek() + .and_then(|msg| msg.header_as_chunk()) + .is_none() + { + return Ok(None); } - // Consume any remaining padding after the final buffer. - self.messages.skip(chunk_msg.buffer_size() - offset).await?; + self.buffers = self.messages.buffers().await?; // After reading the buffers we're now able to load the next message. let col_array = self @@ -134,18 +139,35 @@ impl MessageArrayReader<'_, M> { } } -pub struct ArrayReaderImpl { +/// A stream of array chunks along with a DType. +pub trait ArrayReader: Stream> { + #[allow(dead_code)] + fn dtype(&self) -> &DType; +} + +#[pin_project] +struct ArrayReaderImpl +where + S: Stream>, +{ dtype: DType, + #[pin] + inner: S, } -impl ArrayReader for ArrayReaderImpl { - #[inline] +impl ArrayReader for ArrayReaderImpl +where + S: Stream>, +{ fn dtype(&self) -> &DType { &self.dtype } } -impl Stream for ArrayReaderImpl { +impl Stream for ArrayReaderImpl +where + S: Stream>, +{ type Item = VortexResult; fn poll_next( @@ -154,4 +176,8 @@ impl Stream for ArrayReaderImpl { ) -> Poll> { self.project().inner.poll_next(cx) } + + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() + } } diff --git a/vortex-ipc/src/codecs/async_message_reader.rs b/vortex-ipc/src/codecs/async_message_reader.rs deleted file mode 100644 index 23ccbfa006..0000000000 --- a/vortex-ipc/src/codecs/async_message_reader.rs +++ /dev/null @@ -1,101 +0,0 @@ -use std::io; - -use bytes::BytesMut; -use flatbuffers::{root, root_unchecked}; -use futures_util::{AsyncRead, AsyncReadExt}; -use vortex_error::VortexResult; - -use crate::codecs::message_reader::test::create_stream; -use crate::codecs::message_reader::MessageReader; -use crate::flatbuffers::ipc::Message; - -pub struct AsyncReadMessageReader { - // TODO(ngates): swap this for our own mutable aligned buffer so we can support direct reads. - read: R, - message: BytesMut, - prev_message: BytesMut, - finished: bool, -} - -impl AsyncReadMessageReader { - pub async fn try_new(read: R) -> VortexResult { - let mut reader = Self { - read, - message: BytesMut::new(), - prev_message: BytesMut::new(), - finished: false, - }; - reader.load_next_message().await?; - Ok(reader) - } - - async fn load_next_message(&mut self) -> VortexResult { - let mut len_buf = [0u8; 4]; - match self.read.read_exact(&mut len_buf).await { - Ok(()) => {} - Err(e) => { - println!("ERROR READING LENGTH"); - return match e.kind() { - io::ErrorKind::UnexpectedEof => Ok(false), - _ => Err(e.into()), - }; - } - } - - let len = u32::from_le_bytes(len_buf); - if len == u32::MAX { - // Marker for no more messages. - return Ok(false); - } - - assert_eq!(self.message.len(), 0); - self.message.resize(len as usize, 0); - println!("MESSAGE {}", self.message.len()); - self.read.read_exact(&mut self.message).await?; - - // Validate that the message is valid a flatbuffer. - let _ = root::(&self.message)?; - Ok(true) - } -} - -impl MessageReader for AsyncReadMessageReader { - fn peek(&self) -> Option { - if self.finished { - return None; - } - // The message has been validated by the next() call. - Some(unsafe { root_unchecked::(&self.message) }) - } - - async fn next(&mut self) -> VortexResult { - if self.finished { - panic!("StreamMessageReader is finished - should've checked peek!"); - } - self.prev_message = self.message.split(); - if !self.load_next_message().await? { - self.finished = true; - } - Ok(unsafe { root_unchecked::(&self.prev_message) }) - } -} - -#[tokio::test] -async fn test_something() -> VortexResult<()> { - let buffer = create_stream(); - - // TODO(ngates): stream - // let _stream = buffer - // .into_iter() - // .chunks(64) - // .into_iter() - // .map(|chunk| chunk.collect::>()); - - let mut reader = AsyncReadMessageReader::try_new(buffer.as_slice()).await?; - while reader.peek().is_some() { - let msg = reader.next().await?; - println!("MSG {:?}", msg); - } - - Ok(()) -} diff --git a/vortex-ipc/src/codecs/ipc_reader.rs b/vortex-ipc/src/codecs/ipc_reader.rs new file mode 100644 index 0000000000..e69de29bb2 diff --git a/vortex-ipc/src/codecs/message_reader/futures.rs b/vortex-ipc/src/codecs/message_reader/futures.rs new file mode 100644 index 0000000000..ef9e48f5f0 --- /dev/null +++ b/vortex-ipc/src/codecs/message_reader/futures.rs @@ -0,0 +1,179 @@ +#![cfg(feature = "futures")] +use std::io; + +use bytes::{Bytes, BytesMut}; +use flatbuffers::{root, root_unchecked}; +use futures_util::{AsyncRead, AsyncReadExt, TryStreamExt}; +use vortex::encoding::EncodingRef; +use vortex::Context; +use vortex_alp::ALPEncoding; +use vortex_buffer::Buffer; +use vortex_error::{vortex_bail, vortex_err, VortexResult}; +use vortex_fastlanes::BitPackedEncoding; + +use crate::codecs::array_reader::IPCReader; +use crate::codecs::message_reader::test::create_stream; +use crate::codecs::message_reader::MessageReader; +use crate::flatbuffers::ipc::Message; + +pub struct AsyncReadMessageReader { + // TODO(ngates): swap this for our own mutable aligned buffer so we can support direct reads. + read: R, + message: BytesMut, + prev_message: BytesMut, + finished: bool, +} + +impl AsyncReadMessageReader { + pub async fn try_new(read: R) -> VortexResult { + let mut reader = Self { + read, + message: BytesMut::new(), + prev_message: BytesMut::new(), + finished: false, + }; + reader.load_next_message().await?; + Ok(reader) + } + + async fn load_next_message(&mut self) -> VortexResult { + let mut len_buf = [0u8; 4]; + match self.read.read_exact(&mut len_buf).await { + Ok(()) => {} + Err(e) => { + return match e.kind() { + io::ErrorKind::UnexpectedEof => Ok(false), + _ => Err(e.into()), + }; + } + } + + let len = u32::from_le_bytes(len_buf); + if len == u32::MAX { + // Marker for no more messages. + return Ok(false); + } else if len == 0 { + vortex_bail!(InvalidSerde: "Invalid IPC stream") + } + + let mut message = BytesMut::zeroed(len as usize); + self.read.read_exact(message.as_mut()).await?; + + self.message = message; + + // Validate that the message is valid a flatbuffer. + root::(&self.message).map_err( + |e| vortex_err!(InvalidSerde: "Failed to parse flatbuffer message: {:?}", e), + )?; + + Ok(true) + } +} + +trait AsyncReadMoreExt: AsyncReadExt { + async fn skip(&mut self, nbytes: u64) -> VortexResult<()> + where + Self: Unpin, + { + // TODO(ngates): can we grab dev/null? At the very least we should do this in small buffers. + let mut bytes = BytesMut::zeroed(nbytes as usize); + self.read_exact(bytes.as_mut()).await?; + Ok(()) + } +} +impl AsyncReadMoreExt for R {} + +impl MessageReader for AsyncReadMessageReader { + fn peek(&self) -> Option { + if self.finished { + return None; + } + // The message has been validated by the next() call. + Some(unsafe { root_unchecked::(&self.message) }) + } + + async fn next(&mut self) -> VortexResult { + if self.finished { + panic!("StreamMessageReader is finished - should've checked peek!"); + } + self.prev_message = self.message.split(); + if !self.load_next_message().await? { + self.finished = true; + } + Ok(unsafe { root_unchecked::(&self.prev_message) }) + } + + async fn buffers(&mut self) -> VortexResult> { + let Some(chunk_msg) = unsafe { root_unchecked::(&self.message) }.header_as_chunk() + else { + // We could return an error here? + return Ok(Vec::new()); + }; + + // Read all the column's buffers + let mut offset = 0; + let mut buffers = Vec::with_capacity(chunk_msg.buffers().unwrap_or_default().len()); + for buffer in chunk_msg.buffers().unwrap_or_default().iter() { + let _skip = buffer.offset() - offset; + self.read.skip(buffer.offset() - offset).await?; + + // TODO(ngates): read into a single buffer, then Arc::clone and slice + let mut bytes = BytesMut::zeroed(buffer.length() as usize); + self.read.read_exact(bytes.as_mut()).await?; + buffers.push(Buffer::from(bytes.freeze())); + + offset = buffer.offset() + buffer.length(); + } + + // Consume any remaining padding after the final buffer. + let _buffer_size = chunk_msg.buffer_size(); + self.read.skip(chunk_msg.buffer_size() - offset).await?; + + Ok(buffers) + } +} + +#[tokio::test] +async fn test_something() -> VortexResult<()> { + let buffer = create_stream(); + + let ctx = Context::default().with_encodings([&ALPEncoding as EncodingRef, &BitPackedEncoding]); + let mut messages = AsyncReadMessageReader::try_new(buffer.as_slice()).await?; + + let mut reader = IPCReader::try_from_messages(&ctx, &mut messages).await?; + while let Some(mut array) = reader.next().await? { + println!("ARRAY"); + + while let Some(chunk) = array.next().await? { + println!("chunk {:?}", chunk); + } + } + + Ok(()) +} + +#[tokio::test] +async fn test_stream() -> VortexResult<()> { + let buffer = create_stream(); + + let stream = futures_util::stream::iter( + buffer + .chunks(64) + .map(|chunk| Ok(Bytes::from(chunk.to_vec()))), + ); + let reader = stream.into_async_read(); + + let ctx = Context::default().with_encodings([&ALPEncoding as EncodingRef, &BitPackedEncoding]); + let mut messages = AsyncReadMessageReader::try_new(reader).await?; + + let mut reader = IPCReader::try_from_messages(&ctx, &mut messages).await?; + while let Some(mut array) = reader.next().await? { + println!("ARRAY"); + + while let Some(chunk) = array.next().await? { + println!("chunk {:?}", chunk); + } + } + + Ok(()) +} diff --git a/vortex-ipc/src/codecs/message_reader.rs b/vortex-ipc/src/codecs/message_reader/mod.rs similarity index 88% rename from vortex-ipc/src/codecs/message_reader.rs rename to vortex-ipc/src/codecs/message_reader/mod.rs index 5f7bc4cca9..aacee2cbfe 100644 --- a/vortex-ipc/src/codecs/message_reader.rs +++ b/vortex-ipc/src/codecs/message_reader/mod.rs @@ -1,16 +1,17 @@ use std::future::Future; -use bytes::BytesMut; +use vortex_buffer::Buffer; use vortex_error::VortexResult; - +mod futures; +mod monoio; use crate::flatbuffers::ipc::Message; #[allow(dead_code)] pub trait MessageReader { fn peek(&self) -> Option; fn next(&mut self) -> impl Future>; - fn skip(&mut self, nbytes: u64) -> impl Future>; - fn read_into(&mut self, buffer: BytesMut) -> impl Future>; + /// Fetch the buffers associated with this message. + fn buffers(&mut self) -> impl Future>>; } #[cfg(test)] diff --git a/vortex-ipc/src/codecs/monoio_message_reader.rs b/vortex-ipc/src/codecs/message_reader/monoio.rs similarity index 54% rename from vortex-ipc/src/codecs/monoio_message_reader.rs rename to vortex-ipc/src/codecs/message_reader/monoio.rs index 6f4bf1b3ed..2e13d35a0a 100644 --- a/vortex-ipc/src/codecs/monoio_message_reader.rs +++ b/vortex-ipc/src/codecs/message_reader/monoio.rs @@ -5,8 +5,14 @@ use bytes::BytesMut; use flatbuffers::{root, root_unchecked}; use monoio::buf::IoBufMut; use monoio::io::{AsyncReadRent, AsyncReadRentExt}; +use vortex::encoding::EncodingRef; +use vortex::Context; +use vortex_alp::ALPEncoding; +use vortex_buffer::Buffer; use vortex_error::VortexResult; +use vortex_fastlanes::BitPackedEncoding; +use crate::codecs::array_reader::IPCReader; use crate::codecs::message_reader::test::create_stream; use crate::codecs::message_reader::MessageReader; use crate::flatbuffers::ipc::Message; @@ -76,6 +82,34 @@ impl MessageReader for MonoIoMessageReader { } Ok(unsafe { root_unchecked::(&self.prev_message) }) } + + async fn buffers(&mut self) -> VortexResult> { + let Some(chunk_msg) = unsafe { root_unchecked::(&self.message) }.header_as_chunk() + else { + // We could return an error here? + return Ok(Vec::new()); + }; + + // Read all the column's buffers + let mut offset = 0; + let mut buffers = Vec::with_capacity(chunk_msg.buffers().unwrap_or_default().len()); + for buffer in chunk_msg.buffers().unwrap_or_default().iter() { + let _skip = buffer.offset() - offset; + self.read.skip(buffer.offset() - offset).await?; + + // TODO(ngates): read into a single buffer, then Arc::clone and slice + let bytes = BytesMut::zeroed(buffer.length() as usize); + let bytes = self.read.read_exact_into(bytes).await?; + buffers.push(Buffer::from(bytes.freeze())); + + offset = buffer.offset() + buffer.length(); + } + + // Consume any remaining padding after the final buffer. + self.read.skip(chunk_msg.buffer_size() - offset).await?; + + Ok(buffers) + } } trait AsyncReadRentMoreExt: AsyncReadRentExt { @@ -86,6 +120,13 @@ trait AsyncReadRentMoreExt: AsyncReadRentExt { (Err(e), _) => Err(e), } } + + async fn skip(&mut self, nbytes: u64) -> VortexResult<()> { + let _ = self + .read_exact_into(BytesMut::zeroed(nbytes as usize)) + .await?; + Ok(()) + } } impl AsyncReadRentMoreExt for R {} @@ -94,17 +135,35 @@ impl AsyncReadRentMoreExt for R {} async fn test_something() -> VortexResult<()> { let buffer = create_stream(); - // TODO(ngates): stream - // let _stream = buffer - // .into_iter() - // .chunks(64) - // .into_iter() - // .map(|chunk| chunk.collect::>()); - - let mut reader = MonoIoMessageReader::try_new(buffer.as_slice()).await?; - while reader.peek().is_some() { - let msg = reader.next().await?; - println!("MSG {:?}", msg); + let ctx = Context::default().with_encodings([&ALPEncoding as EncodingRef, &BitPackedEncoding]); + let mut messages = MonoIoMessageReader::try_new(buffer.as_slice()).await?; + + let mut reader = IPCReader::try_from_messages(&ctx, &mut messages).await?; + while let Some(mut array) = reader.next().await? { + println!("ARRAY"); + + while let Some(chunk) = array.next().await? { + println!("chunk {:?}", chunk); + } + } + + Ok(()) +} + +#[monoio::test] +async fn test_array_stream() -> VortexResult<()> { + let buffer = create_stream(); + + let ctx = Context::default().with_encodings([&ALPEncoding as EncodingRef, &BitPackedEncoding]); + let mut messages = MonoIoMessageReader::try_new(buffer.as_slice()).await?; + + let mut reader = IPCReader::try_from_messages(&ctx, &mut messages).await?; + while let Some(mut array) = reader.next().await? { + println!("ARRAY"); + + while let Some(chunk) = array.next().await? { + println!("chunk {:?}", chunk); + } } Ok(()) diff --git a/vortex-ipc/src/codecs/mod.rs b/vortex-ipc/src/codecs/mod.rs index 98ec182784..f3ef693ef4 100644 --- a/vortex-ipc/src/codecs/mod.rs +++ b/vortex-ipc/src/codecs/mod.rs @@ -1,4 +1,3 @@ mod array_reader; -mod async_message_reader; +mod ipc_reader; mod message_reader; -mod monoio_message_reader; diff --git a/vortex-ipc/src/lending_stream.rs b/vortex-ipc/src/lending_stream.rs new file mode 100644 index 0000000000..5212a8fc4c --- /dev/null +++ b/vortex-ipc/src/lending_stream.rs @@ -0,0 +1,9 @@ +use std::future::Future; + +pub trait LendingStream { + type Item<'next> + where + Self: 'next; + + fn next(&mut self) -> impl Future>>; +} diff --git a/vortex-ipc/src/lib.rs b/vortex-ipc/src/lib.rs index 7e3aed9867..492812a945 100644 --- a/vortex-ipc/src/lib.rs +++ b/vortex-ipc/src/lib.rs @@ -32,10 +32,9 @@ pub mod flatbuffers { mod array_reader; mod codecs; pub mod iter; +mod lending_stream; mod messages; pub mod reader; -mod reader2; -mod stream; pub mod writer; pub(crate) const fn missing(field: &'static str) -> impl FnOnce() -> VortexError { diff --git a/vortex-ipc/src/reader2.rs b/vortex-ipc/src/reader2.rs deleted file mode 100644 index 4023712fa2..0000000000 --- a/vortex-ipc/src/reader2.rs +++ /dev/null @@ -1,10 +0,0 @@ -use futures_util::Stream; -use vortex::OwnedArray; -use vortex_dtype::DType; -use vortex_error::VortexResult; - -/// A stream of array chunks along with a DType. -pub trait ArrayReader: Stream> { - #[allow(dead_code)] - fn dtype(&self) -> &DType; -} diff --git a/vortex-ipc/src/stream.rs b/vortex-ipc/src/stream.rs deleted file mode 100644 index 39e4254d9f..0000000000 --- a/vortex-ipc/src/stream.rs +++ /dev/null @@ -1,14 +0,0 @@ -use std::future::Future; - -#[must_use = "streams must be polled"] -pub trait Stream { - type Item; - - fn next(&mut self) -> impl Future>; - - /// Returns the bounds on the remaining length of the stream. - #[inline] - fn size_hint(&self) -> (usize, Option) { - (0, None) - } -} From b10eeafc3a4eb25ab0d2b23a750c344ac5a7e76f Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 10 May 2024 16:45:22 +0100 Subject: [PATCH 05/11] merge --- Cargo.lock | 11 ++ vortex-ipc/Cargo.toml | 1 + vortex-ipc/src/codecs/array_reader.rs | 138 ++++++------------ vortex-ipc/src/codecs/ipc_reader.rs | 68 +++++++++ .../src/codecs/message_reader/futures.rs | 2 +- .../src/codecs/message_reader/monoio.rs | 12 +- vortex-ipc/src/lending_stream.rs | 9 -- vortex-ipc/src/lib.rs | 4 +- 8 files changed, 134 insertions(+), 111 deletions(-) delete mode 100644 vortex-ipc/src/lending_stream.rs diff --git a/Cargo.lock b/Cargo.lock index 52061bbc00..474bdac10c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2764,6 +2764,16 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" +[[package]] +name = "lending-stream" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6df9b2ed75ba713c108ef896cf3f4cb07b4bbe42b074de3a2d0b0b0f874bac42" +dependencies = [ + "futures-core", + "pin-project", +] + [[package]] name = "lexical-core" version = "0.8.5" @@ -5182,6 +5192,7 @@ dependencies = [ "flatc", "futures-util", "itertools 0.12.1", + "lending-stream", "log", "monoio", "monoio-codec", diff --git a/vortex-ipc/Cargo.toml b/vortex-ipc/Cargo.toml index cb882e9efa..4d56314db1 100644 --- a/vortex-ipc/Cargo.toml +++ b/vortex-ipc/Cargo.toml @@ -19,6 +19,7 @@ fallible-iterator = { workspace = true } flatbuffers = { workspace = true } futures-util = { version = "0.3.30" } itertools = { workspace = true } +lending-stream = "1.0.0" log = { workspace = true } monoio = { version = "0.2.3", optional = true, features = ["bytes"] } monoio-codec = { version = "0.3.4", optional = true } diff --git a/vortex-ipc/src/codecs/array_reader.rs b/vortex-ipc/src/codecs/array_reader.rs index daead31fd3..ce27ec7f3b 100644 --- a/vortex-ipc/src/codecs/array_reader.rs +++ b/vortex-ipc/src/codecs/array_reader.rs @@ -3,72 +3,62 @@ use std::task::Poll; use futures_util::Stream; use pin_project::pin_project; -use vortex::{Array, ArrayView, Context, IntoArray, OwnedArray, ToArray, ToStatic, ViewContext}; +use vortex::{Array, ArrayView, IntoArray, OwnedArray, ToArray, ToStatic, ViewContext}; use vortex_buffer::Buffer; use vortex_dtype::DType; -use vortex_error::{vortex_bail, vortex_err, VortexError, VortexResult}; +use vortex_error::{VortexError, VortexResult}; use crate::codecs::message_reader::MessageReader; -use crate::messages::SerdeContextDeserializer; -pub struct IPCReader<'a, M: MessageReader> { - view_ctx: ViewContext, - messages: &'a mut M, +/// A stream of array chunks along with a DType. +/// +/// Can be thought of as equivalent to Arrow's RecordBatchReader. +pub trait ArrayReader: Stream> { + fn dtype(&self) -> &DType; } -#[allow(dead_code)] -impl<'m, M: MessageReader> IPCReader<'m, M> { - pub fn new(view_ctx: ViewContext, messages: &'m mut M) -> Self { - Self { view_ctx, messages } - } - - pub async fn try_from_messages(ctx: &Context, messages: &'m mut M) -> VortexResult { - match messages.peek() { - None => vortex_bail!("IPC stream is empty"), - Some(msg) => { - if msg.header_as_context().is_none() { - vortex_bail!(InvalidSerde: "Expected IPC Context as first message in stream") - } - } - } - - let view_ctx: ViewContext = SerdeContextDeserializer { - fb: messages.next().await?.header_as_context().unwrap(), - ctx, - } - .try_into()?; +/// An adapter for a stream of array chunks to implement an ArrayReader. +#[pin_project] +struct ArrayReaderAdapter { + dtype: DType, + #[pin] + inner: S, +} - Ok(Self { messages, view_ctx }) +impl ArrayReaderAdapter { + pub fn new(dtype: DType, inner: S) -> Self { + Self { dtype, inner } } +} - pub async fn next<'a>(&'a mut self) -> VortexResult>> { - if self - .messages - .peek() - .and_then(|msg| msg.header_as_schema()) - .is_none() - { - return Ok(None); - } +impl ArrayReader for ArrayReaderAdapter +where + S: Stream>, +{ + fn dtype(&self) -> &DType { + &self.dtype + } +} - let schema_msg = self.messages.next().await?.header_as_schema().unwrap(); +impl Stream for ArrayReaderAdapter +where + S: Stream>, +{ + type Item = VortexResult; - let dtype = DType::try_from( - schema_msg - .dtype() - .ok_or_else(|| vortex_err!(InvalidSerde: "Schema missing DType"))?, - ) - .map_err(|e| vortex_err!(InvalidSerde: "Failed to parse DType: {}", e))?; + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + self.project().inner.poll_next(cx) + } - Ok(Some(MessageArrayReader::new( - self.view_ctx.clone(), - dtype, - &mut self.messages, - ))) + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() } } -pub struct MessageArrayReader<'a, M: MessageReader> { +pub(crate) struct MessageArrayReader<'a, M: MessageReader> { ctx: ViewContext, dtype: DType, messages: &'a mut M, @@ -93,7 +83,7 @@ impl<'m, M: MessageReader> MessageArrayReader<'m, M> { pub fn into_reader(self) -> impl ArrayReader + 'm { let dtype = self.dtype.clone(); - let inner = futures_util::stream::unfold(self, |mut reader| async move { + let inner = futures_util::stream::unfold(self, move |mut reader| async move { match reader.next().await { Ok(Some(array)) => Some((Ok(array.to_static()), reader)), Ok(None) => None, @@ -101,7 +91,7 @@ impl<'m, M: MessageReader> MessageArrayReader<'m, M> { } }); - ArrayReaderImpl { dtype, inner } + ArrayReaderAdapter { dtype, inner } } } @@ -116,6 +106,7 @@ impl MessageArrayReader<'_, M> { return Ok(None); } + // TODO(ngates): can we reuse our existing buffers? self.buffers = self.messages.buffers().await?; // After reading the buffers we're now able to load the next message. @@ -138,46 +129,3 @@ impl MessageArrayReader<'_, M> { Ok(Some(array)) } } - -/// A stream of array chunks along with a DType. -pub trait ArrayReader: Stream> { - #[allow(dead_code)] - fn dtype(&self) -> &DType; -} - -#[pin_project] -struct ArrayReaderImpl -where - S: Stream>, -{ - dtype: DType, - #[pin] - inner: S, -} - -impl ArrayReader for ArrayReaderImpl -where - S: Stream>, -{ - fn dtype(&self) -> &DType { - &self.dtype - } -} - -impl Stream for ArrayReaderImpl -where - S: Stream>, -{ - type Item = VortexResult; - - fn poll_next( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - self.project().inner.poll_next(cx) - } - - fn size_hint(&self) -> (usize, Option) { - self.inner.size_hint() - } -} diff --git a/vortex-ipc/src/codecs/ipc_reader.rs b/vortex-ipc/src/codecs/ipc_reader.rs index e69de29bb2..2d9055e51a 100644 --- a/vortex-ipc/src/codecs/ipc_reader.rs +++ b/vortex-ipc/src/codecs/ipc_reader.rs @@ -0,0 +1,68 @@ +use pin_project::pin_project; +use vortex::{Context, ViewContext}; +use vortex_dtype::DType; +use vortex_error::{vortex_bail, vortex_err, VortexResult}; + +use crate::codecs::array_reader::MessageArrayReader; +use crate::codecs::message_reader::MessageReader; +use crate::messages::SerdeContextDeserializer; + +/// An IPC reader is used to emit arrays from a stream of Vortex IPC messages. +#[pin_project] +pub struct IPCReader<'m, M> { + view_ctx: ViewContext, + messages: &'m mut M, +} + +impl<'m, M: MessageReader> IPCReader<'m, M> { + /// Construct an IPC reader using an existing ViewContext. + pub fn new(view_ctx: ViewContext, messages: &'m mut M) -> Self { + Self { view_ctx, messages } + } + + /// Read a ViewContext message from the stream and use it to construct an IPCReader. + pub async fn try_from_messages(ctx: &Context, messages: &'m mut M) -> VortexResult { + match messages.peek() { + None => vortex_bail!("IPC stream is empty"), + Some(msg) => { + if msg.header_as_context().is_none() { + vortex_bail!(InvalidSerde: "Expected IPC Context as first message in stream") + } + } + } + + let view_ctx: ViewContext = SerdeContextDeserializer { + fb: messages.next().await?.header_as_context().unwrap(), + ctx, + } + .try_into()?; + + Ok(Self { messages, view_ctx }) + } + + pub async fn next<'a>(&'a mut self) -> VortexResult>> { + if self + .messages + .peek() + .and_then(|msg| msg.header_as_schema()) + .is_none() + { + return Ok(None); + } + + let schema_msg = self.messages.next().await?.header_as_schema().unwrap(); + + let dtype = DType::try_from( + schema_msg + .dtype() + .ok_or_else(|| vortex_err!(InvalidSerde: "Schema missing DType"))?, + ) + .map_err(|e| vortex_err!(InvalidSerde: "Failed to parse DType: {}", e))?; + + Ok(Some(MessageArrayReader::new( + self.view_ctx.clone(), + dtype, + &mut self.messages, + ))) + } +} diff --git a/vortex-ipc/src/codecs/message_reader/futures.rs b/vortex-ipc/src/codecs/message_reader/futures.rs index ef9e48f5f0..8728a7352b 100644 --- a/vortex-ipc/src/codecs/message_reader/futures.rs +++ b/vortex-ipc/src/codecs/message_reader/futures.rs @@ -11,7 +11,7 @@ use vortex_buffer::Buffer; use vortex_error::{vortex_bail, vortex_err, VortexResult}; use vortex_fastlanes::BitPackedEncoding; -use crate::codecs::array_reader::IPCReader; +use crate::codecs::ipc_reader::IPCReader; use crate::codecs::message_reader::test::create_stream; use crate::codecs::message_reader::MessageReader; use crate::flatbuffers::ipc::Message; diff --git a/vortex-ipc/src/codecs/message_reader/monoio.rs b/vortex-ipc/src/codecs/message_reader/monoio.rs index 2e13d35a0a..9bc6504819 100644 --- a/vortex-ipc/src/codecs/message_reader/monoio.rs +++ b/vortex-ipc/src/codecs/message_reader/monoio.rs @@ -3,6 +3,7 @@ use bytes::BytesMut; use flatbuffers::{root, root_unchecked}; +use futures_util::TryStreamExt; use monoio::buf::IoBufMut; use monoio::io::{AsyncReadRent, AsyncReadRentExt}; use vortex::encoding::EncodingRef; @@ -12,7 +13,7 @@ use vortex_buffer::Buffer; use vortex_error::VortexResult; use vortex_fastlanes::BitPackedEncoding; -use crate::codecs::array_reader::IPCReader; +use crate::codecs::ipc_reader::IPCReader; use crate::codecs::message_reader::test::create_stream; use crate::codecs::message_reader::MessageReader; use crate::flatbuffers::ipc::Message; @@ -158,11 +159,12 @@ async fn test_array_stream() -> VortexResult<()> { let mut messages = MonoIoMessageReader::try_new(buffer.as_slice()).await?; let mut reader = IPCReader::try_from_messages(&ctx, &mut messages).await?; - while let Some(mut array) = reader.next().await? { - println!("ARRAY"); + while let Some(array) = reader.next().await? { + let reader = array.into_reader(); + futures_util::pin_mut!(reader); - while let Some(chunk) = array.next().await? { - println!("chunk {:?}", chunk); + while let Some(array) = reader.try_next().await? { + println!("chunk {:?}", array); } } diff --git a/vortex-ipc/src/lending_stream.rs b/vortex-ipc/src/lending_stream.rs deleted file mode 100644 index 5212a8fc4c..0000000000 --- a/vortex-ipc/src/lending_stream.rs +++ /dev/null @@ -1,9 +0,0 @@ -use std::future::Future; - -pub trait LendingStream { - type Item<'next> - where - Self: 'next; - - fn next(&mut self) -> impl Future>>; -} diff --git a/vortex-ipc/src/lib.rs b/vortex-ipc/src/lib.rs index 492812a945..e0eb0682c2 100644 --- a/vortex-ipc/src/lib.rs +++ b/vortex-ipc/src/lib.rs @@ -1,3 +1,6 @@ +#![feature(impl_trait_in_assoc_type)] +#![allow(dead_code)] + use vortex_error::{vortex_err, VortexError}; pub const ALIGNMENT: usize = 64; @@ -32,7 +35,6 @@ pub mod flatbuffers { mod array_reader; mod codecs; pub mod iter; -mod lending_stream; mod messages; pub mod reader; pub mod writer; From 3d4142bbefba1872008c306a0abfc3e7b63a60a9 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 10 May 2024 16:53:47 +0100 Subject: [PATCH 06/11] merge --- vortex-ipc/src/array_reader.rs | 41 ------------------- vortex-ipc/src/codecs/ipc_reader.rs | 12 +++--- .../src/codecs/message_reader/futures.rs | 17 ++++---- .../src/codecs/message_reader/monoio.rs | 14 +++---- vortex-ipc/src/lib.rs | 1 - 5 files changed, 21 insertions(+), 64 deletions(-) delete mode 100644 vortex-ipc/src/array_reader.rs diff --git a/vortex-ipc/src/array_reader.rs b/vortex-ipc/src/array_reader.rs deleted file mode 100644 index 74a360324a..0000000000 --- a/vortex-ipc/src/array_reader.rs +++ /dev/null @@ -1,41 +0,0 @@ -use std::pin::Pin; -use std::task::{Context, Poll}; - -use futures_util::Stream; -use pin_project::pin_project; -use vortex::OwnedArray; -use vortex_dtype::DType; -use vortex_error::VortexResult; - -/// NOTE: similar to Arrow RecordBatchReader. -pub trait ArrayReader: Stream> { - #[allow(dead_code)] - fn dtype(&self) -> &DType; -} - -/// Wrap a DType with a stream of array chunks to implement an ArrayReader. -#[pin_project] -pub struct ArrayReaderImpl { - dtype: DType, - #[pin] - inner: S, -} - -impl>> ArrayReader for ArrayReaderImpl { - #[inline] - fn dtype(&self) -> &DType { - &self.dtype - } -} - -impl>> Stream for ArrayReaderImpl { - type Item = VortexResult; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().inner.poll_next(cx) - } - - fn size_hint(&self) -> (usize, Option) { - self.inner.size_hint() - } -} diff --git a/vortex-ipc/src/codecs/ipc_reader.rs b/vortex-ipc/src/codecs/ipc_reader.rs index 2d9055e51a..ffd6b56148 100644 --- a/vortex-ipc/src/codecs/ipc_reader.rs +++ b/vortex-ipc/src/codecs/ipc_reader.rs @@ -3,7 +3,7 @@ use vortex::{Context, ViewContext}; use vortex_dtype::DType; use vortex_error::{vortex_bail, vortex_err, VortexResult}; -use crate::codecs::array_reader::MessageArrayReader; +use crate::codecs::array_reader::{ArrayReader, MessageArrayReader}; use crate::codecs::message_reader::MessageReader; use crate::messages::SerdeContextDeserializer; @@ -40,7 +40,7 @@ impl<'m, M: MessageReader> IPCReader<'m, M> { Ok(Self { messages, view_ctx }) } - pub async fn next<'a>(&'a mut self) -> VortexResult>> { + pub async fn next<'a>(&'a mut self) -> VortexResult> { if self .messages .peek() @@ -59,10 +59,8 @@ impl<'m, M: MessageReader> IPCReader<'m, M> { ) .map_err(|e| vortex_err!(InvalidSerde: "Failed to parse DType: {}", e))?; - Ok(Some(MessageArrayReader::new( - self.view_ctx.clone(), - dtype, - &mut self.messages, - ))) + Ok(Some( + MessageArrayReader::new(self.view_ctx.clone(), dtype, self.messages).into_reader(), + )) } } diff --git a/vortex-ipc/src/codecs/message_reader/futures.rs b/vortex-ipc/src/codecs/message_reader/futures.rs index 8728a7352b..eaec82f2a9 100644 --- a/vortex-ipc/src/codecs/message_reader/futures.rs +++ b/vortex-ipc/src/codecs/message_reader/futures.rs @@ -11,6 +11,7 @@ use vortex_buffer::Buffer; use vortex_error::{vortex_bail, vortex_err, VortexResult}; use vortex_fastlanes::BitPackedEncoding; +use crate::codecs::array_reader::ArrayReader; use crate::codecs::ipc_reader::IPCReader; use crate::codecs::message_reader::test::create_stream; use crate::codecs::message_reader::MessageReader; @@ -141,10 +142,10 @@ async fn test_something() -> VortexResult<()> { let mut messages = AsyncReadMessageReader::try_new(buffer.as_slice()).await?; let mut reader = IPCReader::try_from_messages(&ctx, &mut messages).await?; - while let Some(mut array) = reader.next().await? { - println!("ARRAY"); - - while let Some(chunk) = array.next().await? { + while let Some(array) = reader.next().await? { + futures_util::pin_mut!(array); + println!("ARRAY: {}", array.dtype()); + while let Some(chunk) = array.try_next().await? { println!("chunk {:?}", chunk); } } @@ -167,10 +168,10 @@ async fn test_stream() -> VortexResult<()> { let mut messages = AsyncReadMessageReader::try_new(reader).await?; let mut reader = IPCReader::try_from_messages(&ctx, &mut messages).await?; - while let Some(mut array) = reader.next().await? { - println!("ARRAY"); - - while let Some(chunk) = array.next().await? { + while let Some(array) = reader.next().await? { + futures_util::pin_mut!(array); + println!("ARRAY {}", array.dtype()); + while let Some(chunk) = array.try_next().await? { println!("chunk {:?}", chunk); } } diff --git a/vortex-ipc/src/codecs/message_reader/monoio.rs b/vortex-ipc/src/codecs/message_reader/monoio.rs index 9bc6504819..c66bf777c3 100644 --- a/vortex-ipc/src/codecs/message_reader/monoio.rs +++ b/vortex-ipc/src/codecs/message_reader/monoio.rs @@ -13,6 +13,7 @@ use vortex_buffer::Buffer; use vortex_error::VortexResult; use vortex_fastlanes::BitPackedEncoding; +use crate::codecs::array_reader::ArrayReader; use crate::codecs::ipc_reader::IPCReader; use crate::codecs::message_reader::test::create_stream; use crate::codecs::message_reader::MessageReader; @@ -140,10 +141,11 @@ async fn test_something() -> VortexResult<()> { let mut messages = MonoIoMessageReader::try_new(buffer.as_slice()).await?; let mut reader = IPCReader::try_from_messages(&ctx, &mut messages).await?; - while let Some(mut array) = reader.next().await? { - println!("ARRAY"); + while let Some(array) = reader.next().await? { + futures_util::pin_mut!(array); + println!("ARRAY {}", array.dtype()); - while let Some(chunk) = array.next().await? { + while let Some(chunk) = array.try_next().await? { println!("chunk {:?}", chunk); } } @@ -160,10 +162,8 @@ async fn test_array_stream() -> VortexResult<()> { let mut reader = IPCReader::try_from_messages(&ctx, &mut messages).await?; while let Some(array) = reader.next().await? { - let reader = array.into_reader(); - futures_util::pin_mut!(reader); - - while let Some(array) = reader.try_next().await? { + futures_util::pin_mut!(array); + while let Some(array) = array.try_next().await? { println!("chunk {:?}", array); } } diff --git a/vortex-ipc/src/lib.rs b/vortex-ipc/src/lib.rs index e0eb0682c2..8ddeef9f8b 100644 --- a/vortex-ipc/src/lib.rs +++ b/vortex-ipc/src/lib.rs @@ -32,7 +32,6 @@ pub mod flatbuffers { } } -mod array_reader; mod codecs; pub mod iter; mod messages; From 9739035cf1c4bbd63b765bf0a99989b1195c8ce1 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 10 May 2024 17:06:40 +0100 Subject: [PATCH 07/11] merge --- Cargo.lock | 38 -------- Cargo.toml | 3 +- vortex-ipc/Cargo.toml | 17 ++-- .../src/codecs/message_reader/futures.rs | 96 ++++++++++--------- .../src/codecs/message_reader/monoio.rs | 66 +++++++------ 5 files changed, 98 insertions(+), 122 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 474bdac10c..8d95bf5d39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -363,19 +363,6 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "834eee9ce518130a3b4d5af09ecc43e9d6b57ee76613f227a1ddd6b77c7a62bc" -[[package]] -name = "asynchronous-codec" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a860072022177f903e59730004fb5dc13db9275b79bb2aef7ba8ce831956c233" -dependencies = [ - "bytes", - "futures-sink", - "futures-util", - "memchr", - "pin-project-lite", -] - [[package]] name = "atoi" version = "2.0.0" @@ -2764,16 +2751,6 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" -[[package]] -name = "lending-stream" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6df9b2ed75ba713c108ef896cf3f4cb07b4bbe42b074de3a2d0b0b0f874bac42" -dependencies = [ - "futures-core", - "pin-project", -] - [[package]] name = "lexical-core" version = "0.8.5" @@ -3050,17 +3027,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "monoio-codec" -version = "0.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58c0e52577d2f766a87e7d60df9a635a35aecf37962e801ca07c73666b95b620" -dependencies = [ - "bytes", - "monoio", - "tokio-util", -] - [[package]] name = "monoio-macros" version = "0.1.0" @@ -5184,7 +5150,6 @@ dependencies = [ "arrow-ipc", "arrow-schema", "arrow-select", - "asynchronous-codec", "bytes", "criterion", "fallible-iterator", @@ -5192,16 +5157,13 @@ dependencies = [ "flatc", "futures-util", "itertools 0.12.1", - "lending-stream", "log", "monoio", - "monoio-codec", "nougat", "pin-project", "rand", "simplelog", "tokio", - "tokio-util", "vortex-alp", "vortex-array", "vortex-buffer", diff --git a/Cargo.toml b/Cargo.toml index c25e1135ed..38b88ffa01 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,7 +63,7 @@ flatbuffers = "23.5.26" flatc = "0.2.2" flexbuffers = "2.0.0" fs_extra = "1.3.0" -futures = "0.3.30" +futures-util = "0.3.30" getrandom = "0.2.14" half = { version = "^2", features = ["std", "num-traits"] } hashbrown = "0.14.3" @@ -72,6 +72,7 @@ itertools = "0.12.1" lazy_static = "1.4.0" leb128 = "0.2.5" log = "0.4.21" +monoio = "0.2.3" num-traits = "0.2.18" num_enum = "0.7.2" parquet = "51.0.0" diff --git a/vortex-ipc/Cargo.toml b/vortex-ipc/Cargo.toml index 4d56314db1..a19fb45966 100644 --- a/vortex-ipc/Cargo.toml +++ b/vortex-ipc/Cargo.toml @@ -13,20 +13,16 @@ rust-version = { workspace = true } [dependencies] arrow-buffer = { workspace = true } -asynchronous-codec = { version = "0.7.0", optional = true } bytes = { workspace = true } fallible-iterator = { workspace = true } flatbuffers = { workspace = true } -futures-util = { version = "0.3.30" } +futures-util = { workspace = true, features = ["io"] } itertools = { workspace = true } -lending-stream = "1.0.0" log = { workspace = true } -monoio = { version = "0.2.3", optional = true, features = ["bytes"] } -monoio-codec = { version = "0.3.4", optional = true } +monoio = { workspace = true, optional = true, features = ["bytes"] } nougat = "0.2.4" pin-project = { workspace = true } -tokio = { version = "1.37.0", optional = true, features = ["full"] } -tokio-util = { version = "0.7.11", default-features = false, features = ["codec", "io-util"], optional = true } +tokio = { workspace = true, optional = true } vortex-array = { path = "../vortex-array" } vortex-buffer = { path = "../vortex-buffer" } vortex-error = { path = "../vortex-error" } @@ -42,6 +38,7 @@ walkdir = { workspace = true } criterion = { workspace = true } rand = { workspace = true } simplelog = { workspace = true } +tokio = { workspace = true, features = ["full"] } vortex-alp = { path = "../vortex-alp" } vortex-fastlanes = { path = "../vortex-fastlanes" } arrow = { workspace = true } @@ -55,9 +52,9 @@ arrow-select = { workspace = true } [features] default = ["futures", "monoio", "tokio"] -futures = ["dep:asynchronous-codec"] -monoio = ["dep:monoio", "dep:monoio-codec"] -tokio = ["dep:tokio", "dep:tokio-util"] +futures = [] +monoio = ["dep:monoio"] +tokio = ["dep:tokio"] [[bench]] name = "ipc_take" diff --git a/vortex-ipc/src/codecs/message_reader/futures.rs b/vortex-ipc/src/codecs/message_reader/futures.rs index eaec82f2a9..cc400c6de6 100644 --- a/vortex-ipc/src/codecs/message_reader/futures.rs +++ b/vortex-ipc/src/codecs/message_reader/futures.rs @@ -4,16 +4,10 @@ use std::io; use bytes::{Bytes, BytesMut}; use flatbuffers::{root, root_unchecked}; use futures_util::{AsyncRead, AsyncReadExt, TryStreamExt}; -use vortex::encoding::EncodingRef; -use vortex::Context; -use vortex_alp::ALPEncoding; use vortex_buffer::Buffer; use vortex_error::{vortex_bail, vortex_err, VortexResult}; -use vortex_fastlanes::BitPackedEncoding; use crate::codecs::array_reader::ArrayReader; -use crate::codecs::ipc_reader::IPCReader; -use crate::codecs::message_reader::test::create_stream; use crate::codecs::message_reader::MessageReader; use crate::flatbuffers::ipc::Message; @@ -134,47 +128,61 @@ impl MessageReader for AsyncReadMessageReader { } } -#[tokio::test] -async fn test_something() -> VortexResult<()> { - let buffer = create_stream(); - - let ctx = Context::default().with_encodings([&ALPEncoding as EncodingRef, &BitPackedEncoding]); - let mut messages = AsyncReadMessageReader::try_new(buffer.as_slice()).await?; - - let mut reader = IPCReader::try_from_messages(&ctx, &mut messages).await?; - while let Some(array) = reader.next().await? { - futures_util::pin_mut!(array); - println!("ARRAY: {}", array.dtype()); - while let Some(chunk) = array.try_next().await? { - println!("chunk {:?}", chunk); +#[cfg(test)] +mod tests { + use vortex::encoding::EncodingRef; + use vortex::Context; + use vortex_alp::ALPEncoding; + use vortex_fastlanes::BitPackedEncoding; + + use super::*; + use crate::codecs::ipc_reader::IPCReader; + use crate::codecs::message_reader::test::create_stream; + + #[tokio::test] + async fn test_something() -> VortexResult<()> { + let buffer = create_stream(); + + let ctx = + Context::default().with_encodings([&ALPEncoding as EncodingRef, &BitPackedEncoding]); + let mut messages = AsyncReadMessageReader::try_new(buffer.as_slice()).await?; + + let mut reader = IPCReader::try_from_messages(&ctx, &mut messages).await?; + while let Some(array) = reader.next().await? { + futures_util::pin_mut!(array); + println!("ARRAY: {}", array.dtype()); + while let Some(chunk) = array.try_next().await? { + println!("chunk {:?}", chunk); + } } - } - Ok(()) -} + Ok(()) + } -#[tokio::test] -async fn test_stream() -> VortexResult<()> { - let buffer = create_stream(); - - let stream = futures_util::stream::iter( - buffer - .chunks(64) - .map(|chunk| Ok(Bytes::from(chunk.to_vec()))), - ); - let reader = stream.into_async_read(); - - let ctx = Context::default().with_encodings([&ALPEncoding as EncodingRef, &BitPackedEncoding]); - let mut messages = AsyncReadMessageReader::try_new(reader).await?; - - let mut reader = IPCReader::try_from_messages(&ctx, &mut messages).await?; - while let Some(array) = reader.next().await? { - futures_util::pin_mut!(array); - println!("ARRAY {}", array.dtype()); - while let Some(chunk) = array.try_next().await? { - println!("chunk {:?}", chunk); + #[tokio::test] + async fn test_stream() -> VortexResult<()> { + let buffer = create_stream(); + + let stream = futures_util::stream::iter( + buffer + .chunks(64) + .map(|chunk| Ok(Bytes::from(chunk.to_vec()))), + ); + let reader = stream.into_async_read(); + + let ctx = + Context::default().with_encodings([&ALPEncoding as EncodingRef, &BitPackedEncoding]); + let mut messages = AsyncReadMessageReader::try_new(reader).await?; + + let mut reader = IPCReader::try_from_messages(&ctx, &mut messages).await?; + while let Some(array) = reader.next().await? { + futures_util::pin_mut!(array); + println!("ARRAY {}", array.dtype()); + while let Some(chunk) = array.try_next().await? { + println!("chunk {:?}", chunk); + } } - } - Ok(()) + Ok(()) + } } diff --git a/vortex-ipc/src/codecs/message_reader/monoio.rs b/vortex-ipc/src/codecs/message_reader/monoio.rs index c66bf777c3..49e4b24fbb 100644 --- a/vortex-ipc/src/codecs/message_reader/monoio.rs +++ b/vortex-ipc/src/codecs/message_reader/monoio.rs @@ -8,14 +8,11 @@ use monoio::buf::IoBufMut; use monoio::io::{AsyncReadRent, AsyncReadRentExt}; use vortex::encoding::EncodingRef; use vortex::Context; -use vortex_alp::ALPEncoding; use vortex_buffer::Buffer; use vortex_error::VortexResult; -use vortex_fastlanes::BitPackedEncoding; use crate::codecs::array_reader::ArrayReader; use crate::codecs::ipc_reader::IPCReader; -use crate::codecs::message_reader::test::create_stream; use crate::codecs::message_reader::MessageReader; use crate::flatbuffers::ipc::Message; @@ -133,40 +130,51 @@ trait AsyncReadRentMoreExt: AsyncReadRentExt { impl AsyncReadRentMoreExt for R {} -#[monoio::test] -async fn test_something() -> VortexResult<()> { - let buffer = create_stream(); +#[cfg(test)] +mod tests { + use vortex_alp::ALPEncoding; + use vortex_fastlanes::BitPackedEncoding; - let ctx = Context::default().with_encodings([&ALPEncoding as EncodingRef, &BitPackedEncoding]); - let mut messages = MonoIoMessageReader::try_new(buffer.as_slice()).await?; + use super::*; + use crate::codecs::message_reader::test::create_stream; - let mut reader = IPCReader::try_from_messages(&ctx, &mut messages).await?; - while let Some(array) = reader.next().await? { - futures_util::pin_mut!(array); - println!("ARRAY {}", array.dtype()); + #[monoio::test] + async fn test_something() -> VortexResult<()> { + let buffer = create_stream(); - while let Some(chunk) = array.try_next().await? { - println!("chunk {:?}", chunk); + let ctx = + Context::default().with_encodings([&ALPEncoding as EncodingRef, &BitPackedEncoding]); + let mut messages = MonoIoMessageReader::try_new(buffer.as_slice()).await?; + + let mut reader = IPCReader::try_from_messages(&ctx, &mut messages).await?; + while let Some(array) = reader.next().await? { + futures_util::pin_mut!(array); + println!("ARRAY {}", array.dtype()); + + while let Some(chunk) = array.try_next().await? { + println!("chunk {:?}", chunk); + } } - } - Ok(()) -} + Ok(()) + } -#[monoio::test] -async fn test_array_stream() -> VortexResult<()> { - let buffer = create_stream(); + #[monoio::test] + async fn test_array_stream() -> VortexResult<()> { + let buffer = create_stream(); - let ctx = Context::default().with_encodings([&ALPEncoding as EncodingRef, &BitPackedEncoding]); - let mut messages = MonoIoMessageReader::try_new(buffer.as_slice()).await?; + let ctx = + Context::default().with_encodings([&ALPEncoding as EncodingRef, &BitPackedEncoding]); + let mut messages = MonoIoMessageReader::try_new(buffer.as_slice()).await?; - let mut reader = IPCReader::try_from_messages(&ctx, &mut messages).await?; - while let Some(array) = reader.next().await? { - futures_util::pin_mut!(array); - while let Some(array) = array.try_next().await? { - println!("chunk {:?}", array); + let mut reader = IPCReader::try_from_messages(&ctx, &mut messages).await?; + while let Some(array) = reader.next().await? { + futures_util::pin_mut!(array); + while let Some(array) = array.try_next().await? { + println!("chunk {:?}", array); + } } - } - Ok(()) + Ok(()) + } } From 20701965d3d0bb0edd602b26ea9c9d66c51820c7 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 10 May 2024 17:09:42 +0100 Subject: [PATCH 08/11] merge --- vortex-ipc/Cargo.toml | 2 +- vortex-ipc/src/codecs/message_reader/futures.rs | 8 +++++--- vortex-ipc/src/codecs/message_reader/monoio.rs | 10 +++++----- vortex-ipc/src/lib.rs | 1 - 4 files changed, 11 insertions(+), 10 deletions(-) diff --git a/vortex-ipc/Cargo.toml b/vortex-ipc/Cargo.toml index a19fb45966..9653f3e426 100644 --- a/vortex-ipc/Cargo.toml +++ b/vortex-ipc/Cargo.toml @@ -48,7 +48,7 @@ arrow-schema = { workspace = true } arrow-select = { workspace = true } [lints] -# workspace = true +workspace = true [features] default = ["futures", "monoio", "tokio"] diff --git a/vortex-ipc/src/codecs/message_reader/futures.rs b/vortex-ipc/src/codecs/message_reader/futures.rs index cc400c6de6..eb23527561 100644 --- a/vortex-ipc/src/codecs/message_reader/futures.rs +++ b/vortex-ipc/src/codecs/message_reader/futures.rs @@ -1,13 +1,12 @@ #![cfg(feature = "futures")] use std::io; -use bytes::{Bytes, BytesMut}; +use bytes::BytesMut; use flatbuffers::{root, root_unchecked}; -use futures_util::{AsyncRead, AsyncReadExt, TryStreamExt}; +use futures_util::{AsyncRead, AsyncReadExt}; use vortex_buffer::Buffer; use vortex_error::{vortex_bail, vortex_err, VortexResult}; -use crate::codecs::array_reader::ArrayReader; use crate::codecs::message_reader::MessageReader; use crate::flatbuffers::ipc::Message; @@ -130,12 +129,15 @@ impl MessageReader for AsyncReadMessageReader { #[cfg(test)] mod tests { + use bytes::Bytes; + use futures_util::TryStreamExt; use vortex::encoding::EncodingRef; use vortex::Context; use vortex_alp::ALPEncoding; use vortex_fastlanes::BitPackedEncoding; use super::*; + use crate::codecs::array_reader::ArrayReader; use crate::codecs::ipc_reader::IPCReader; use crate::codecs::message_reader::test::create_stream; diff --git a/vortex-ipc/src/codecs/message_reader/monoio.rs b/vortex-ipc/src/codecs/message_reader/monoio.rs index 49e4b24fbb..99a47bf040 100644 --- a/vortex-ipc/src/codecs/message_reader/monoio.rs +++ b/vortex-ipc/src/codecs/message_reader/monoio.rs @@ -3,16 +3,11 @@ use bytes::BytesMut; use flatbuffers::{root, root_unchecked}; -use futures_util::TryStreamExt; use monoio::buf::IoBufMut; use monoio::io::{AsyncReadRent, AsyncReadRentExt}; -use vortex::encoding::EncodingRef; -use vortex::Context; use vortex_buffer::Buffer; use vortex_error::VortexResult; -use crate::codecs::array_reader::ArrayReader; -use crate::codecs::ipc_reader::IPCReader; use crate::codecs::message_reader::MessageReader; use crate::flatbuffers::ipc::Message; @@ -132,10 +127,15 @@ impl AsyncReadRentMoreExt for R {} #[cfg(test)] mod tests { + use futures_util::TryStreamExt; + use vortex::encoding::EncodingRef; + use vortex::Context; use vortex_alp::ALPEncoding; use vortex_fastlanes::BitPackedEncoding; use super::*; + use crate::codecs::array_reader::ArrayReader; + use crate::codecs::ipc_reader::IPCReader; use crate::codecs::message_reader::test::create_stream; #[monoio::test] diff --git a/vortex-ipc/src/lib.rs b/vortex-ipc/src/lib.rs index 8ddeef9f8b..7b39c96fa8 100644 --- a/vortex-ipc/src/lib.rs +++ b/vortex-ipc/src/lib.rs @@ -1,4 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] #![allow(dead_code)] use vortex_error::{vortex_err, VortexError}; From ce5cc556d8790a371d054118248bc43cc1e00137 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 10 May 2024 17:17:18 +0100 Subject: [PATCH 09/11] merge --- vortex-ipc/src/messages.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/vortex-ipc/src/messages.rs b/vortex-ipc/src/messages.rs index a0e70daf24..80b34f333e 100644 --- a/vortex-ipc/src/messages.rs +++ b/vortex-ipc/src/messages.rs @@ -10,16 +10,19 @@ use crate::flatbuffers::ipc as fb; use crate::flatbuffers::ipc::Compression; use crate::{missing, ALIGNMENT}; -pub enum IPCMessage<'a> { +pub(crate) enum IPCMessage<'a> { Context(IPCContext<'a>), Schema(IPCSchema<'a>), Chunk(IPCChunk<'a>), } -pub struct IPCContext<'a>(pub &'a ViewContext); -pub struct IPCSchema<'a>(pub &'a DType); -pub struct IPCChunk<'a>(pub &'a ViewContext, pub &'a ArrayData); -pub struct IPCArray<'a>(pub &'a ViewContext, pub &'a ArrayData); +pub(crate) struct IPCContext<'a>(pub &'a ViewContext); + +pub(crate) struct IPCSchema<'a>(pub &'a DType); + +pub(crate) struct IPCChunk<'a>(pub &'a ViewContext, pub &'a ArrayData); + +pub(crate) struct IPCArray<'a>(pub &'a ViewContext, pub &'a ArrayData); impl FlatBufferRoot for IPCMessage<'_> {} From a96c14d312f62cf40607ede5cbe7787974e2768a Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Sun, 12 May 2024 11:59:44 +0100 Subject: [PATCH 10/11] Remove skip, use read_vectored --- .../src/codecs/message_reader/futures.rs | 45 ++----------- vortex-ipc/src/codecs/message_reader/mod.rs | 63 ++++++++++++++++++- .../src/codecs/message_reader/monoio.rs | 46 ++++---------- 3 files changed, 81 insertions(+), 73 deletions(-) diff --git a/vortex-ipc/src/codecs/message_reader/futures.rs b/vortex-ipc/src/codecs/message_reader/futures.rs index eb23527561..15d800cf38 100644 --- a/vortex-ipc/src/codecs/message_reader/futures.rs +++ b/vortex-ipc/src/codecs/message_reader/futures.rs @@ -1,10 +1,10 @@ #![cfg(feature = "futures")] + use std::io; use bytes::BytesMut; use flatbuffers::{root, root_unchecked}; use futures_util::{AsyncRead, AsyncReadExt}; -use vortex_buffer::Buffer; use vortex_error::{vortex_bail, vortex_err, VortexResult}; use crate::codecs::message_reader::MessageReader; @@ -64,19 +64,6 @@ impl AsyncReadMessageReader { } } -trait AsyncReadMoreExt: AsyncReadExt { - async fn skip(&mut self, nbytes: u64) -> VortexResult<()> - where - Self: Unpin, - { - // TODO(ngates): can we grab dev/null? At the very least we should do this in small buffers. - let mut bytes = BytesMut::zeroed(nbytes as usize); - self.read_exact(bytes.as_mut()).await?; - Ok(()) - } -} -impl AsyncReadMoreExt for R {} - impl MessageReader for AsyncReadMessageReader { fn peek(&self) -> Option { if self.finished { @@ -97,32 +84,12 @@ impl MessageReader for AsyncReadMessageReader { Ok(unsafe { root_unchecked::(&self.prev_message) }) } - async fn buffers(&mut self) -> VortexResult> { - let Some(chunk_msg) = unsafe { root_unchecked::(&self.message) }.header_as_chunk() - else { - // We could return an error here? - return Ok(Vec::new()); - }; - - // Read all the column's buffers - let mut offset = 0; - let mut buffers = Vec::with_capacity(chunk_msg.buffers().unwrap_or_default().len()); - for buffer in chunk_msg.buffers().unwrap_or_default().iter() { - let _skip = buffer.offset() - offset; - self.read.skip(buffer.offset() - offset).await?; - - // TODO(ngates): read into a single buffer, then Arc::clone and slice - let mut bytes = BytesMut::zeroed(buffer.length() as usize); - self.read.read_exact(bytes.as_mut()).await?; - buffers.push(Buffer::from(bytes.freeze())); - - offset = buffer.offset() + buffer.length(); + async fn read_into(&mut self, mut buffers: Vec>) -> VortexResult>> { + // TODO(ngates): there is no read_vectored_exact for AsyncRead, so for now we'll + // just read one-by-one + for buffer in buffers.iter_mut() { + self.read.read_exact(buffer).await?; } - - // Consume any remaining padding after the final buffer. - let _buffer_size = chunk_msg.buffer_size(); - self.read.skip(chunk_msg.buffer_size() - offset).await?; - Ok(buffers) } } diff --git a/vortex-ipc/src/codecs/message_reader/mod.rs b/vortex-ipc/src/codecs/message_reader/mod.rs index aacee2cbfe..7dedbc9632 100644 --- a/vortex-ipc/src/codecs/message_reader/mod.rs +++ b/vortex-ipc/src/codecs/message_reader/mod.rs @@ -1,7 +1,11 @@ use std::future::Future; +use itertools::Itertools; use vortex_buffer::Buffer; use vortex_error::VortexResult; + +use crate::ALIGNMENT; + mod futures; mod monoio; use crate::flatbuffers::ipc::Message; @@ -10,8 +14,65 @@ use crate::flatbuffers::ipc::Message; pub trait MessageReader { fn peek(&self) -> Option; fn next(&mut self) -> impl Future>; + fn read_into( + &mut self, + buffers: Vec>, + ) -> impl Future>>>; + /// Fetch the buffers associated with this message. - fn buffers(&mut self) -> impl Future>>; + async fn buffers(&mut self) -> VortexResult> { + let Some(chunk_msg) = self.peek().and_then(|m| m.header_as_chunk()) else { + // We could return an error here? + return Ok(Vec::new()); + }; + + // Initialize the column's buffers for a vectored read. + // To start with, we include the padding and then truncate the buffers after. + // TODO(ngates): improve the flatbuffer format instead of storing offset/len per buffer. + let buffers = chunk_msg + .buffers() + .unwrap_or_default() + .iter() + .map(|buffer| { + // FIXME(ngates): this assumes the next buffer offset == the aligned length of + // the previous buffer. I will fix this by improving the flatbuffer format instead + // of fiddling with the logic here. + let len_width_padding = + (buffer.length() as usize + (ALIGNMENT - 1)) & !(ALIGNMENT - 1); + // TODO(ngates): switch to use uninitialized + vec![0u8; len_width_padding] + }) + .collect_vec(); + + // Just sanity check the above + assert_eq!( + buffers.iter().map(|b| b.len()).sum::(), + chunk_msg.buffer_size() as usize + ); + + // Issue a vectored read to fill all buffers + let buffers: Vec> = self.read_into(buffers).await?; + + // Truncate each buffer to strip the padding. + let buffers = buffers + .into_iter() + .zip( + self.peek() + .unwrap() + .header_as_chunk() + .unwrap() + .buffers() + .unwrap_or_default() + .iter(), + ) + .map(|(mut vec, buf)| { + vec.truncate(buf.length() as usize); + Buffer::from(vec) + }) + .collect_vec(); + + Ok(buffers) + } } #[cfg(test)] diff --git a/vortex-ipc/src/codecs/message_reader/monoio.rs b/vortex-ipc/src/codecs/message_reader/monoio.rs index 99a47bf040..c911447d08 100644 --- a/vortex-ipc/src/codecs/message_reader/monoio.rs +++ b/vortex-ipc/src/codecs/message_reader/monoio.rs @@ -3,9 +3,8 @@ use bytes::BytesMut; use flatbuffers::{root, root_unchecked}; -use monoio::buf::IoBufMut; +use monoio::buf::{IoBufMut, IoVecBufMut, VecBuf}; use monoio::io::{AsyncReadRent, AsyncReadRentExt}; -use vortex_buffer::Buffer; use vortex_error::VortexResult; use crate::codecs::message_reader::MessageReader; @@ -77,32 +76,12 @@ impl MessageReader for MonoIoMessageReader { Ok(unsafe { root_unchecked::(&self.prev_message) }) } - async fn buffers(&mut self) -> VortexResult> { - let Some(chunk_msg) = unsafe { root_unchecked::(&self.message) }.header_as_chunk() - else { - // We could return an error here? - return Ok(Vec::new()); - }; - - // Read all the column's buffers - let mut offset = 0; - let mut buffers = Vec::with_capacity(chunk_msg.buffers().unwrap_or_default().len()); - for buffer in chunk_msg.buffers().unwrap_or_default().iter() { - let _skip = buffer.offset() - offset; - self.read.skip(buffer.offset() - offset).await?; - - // TODO(ngates): read into a single buffer, then Arc::clone and slice - let bytes = BytesMut::zeroed(buffer.length() as usize); - let bytes = self.read.read_exact_into(bytes).await?; - buffers.push(Buffer::from(bytes.freeze())); - - offset = buffer.offset() + buffer.length(); - } - - // Consume any remaining padding after the final buffer. - self.read.skip(chunk_msg.buffer_size() - offset).await?; - - Ok(buffers) + async fn read_into(&mut self, buffers: Vec>) -> VortexResult>> { + Ok(self + .read + .readv_exact_into(VecBuf::from(buffers)) + .await? + .into()) } } @@ -115,11 +94,12 @@ trait AsyncReadRentMoreExt: AsyncReadRentExt { } } - async fn skip(&mut self, nbytes: u64) -> VortexResult<()> { - let _ = self - .read_exact_into(BytesMut::zeroed(nbytes as usize)) - .await?; - Ok(()) + /// Same as read_vectored_exact except unwraps the BufResult into a regular IO result. + async fn readv_exact_into(&mut self, buf: B) -> std::io::Result { + match self.read_vectored_exact(buf).await { + (Ok(_), buf) => Ok(buf), + (Err(e), _) => Err(e), + } } } From f9c29852944ef8233039b658ba70c920e3363777 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Sun, 12 May 2024 12:08:38 +0100 Subject: [PATCH 11/11] Remove skip, use read_vectored --- vortex-ipc/src/codecs/array_reader.rs | 6 - vortex-ipc/src/codecs/message_reader/mod.rs | 111 +++++++++--------- .../src/codecs/message_reader/monoio.rs | 3 +- vortex-ipc/src/codecs/mod.rs | 4 + vortex-ipc/src/lib.rs | 4 +- vortex-ipc/src/reader.rs | 1 - vortex-ipc/src/writer.rs | 1 - 7 files changed, 62 insertions(+), 68 deletions(-) diff --git a/vortex-ipc/src/codecs/array_reader.rs b/vortex-ipc/src/codecs/array_reader.rs index ce27ec7f3b..e328fdefc4 100644 --- a/vortex-ipc/src/codecs/array_reader.rs +++ b/vortex-ipc/src/codecs/array_reader.rs @@ -25,12 +25,6 @@ struct ArrayReaderAdapter { inner: S, } -impl ArrayReaderAdapter { - pub fn new(dtype: DType, inner: S) -> Self { - Self { dtype, inner } - } -} - impl ArrayReader for ArrayReaderAdapter where S: Stream>, diff --git a/vortex-ipc/src/codecs/message_reader/mod.rs b/vortex-ipc/src/codecs/message_reader/mod.rs index 7dedbc9632..67909748d4 100644 --- a/vortex-ipc/src/codecs/message_reader/mod.rs +++ b/vortex-ipc/src/codecs/message_reader/mod.rs @@ -6,11 +6,10 @@ use vortex_error::VortexResult; use crate::ALIGNMENT; -mod futures; -mod monoio; +pub mod futures; +pub mod monoio; use crate::flatbuffers::ipc::Message; -#[allow(dead_code)] pub trait MessageReader { fn peek(&self) -> Option; fn next(&mut self) -> impl Future>; @@ -20,58 +19,60 @@ pub trait MessageReader { ) -> impl Future>>>; /// Fetch the buffers associated with this message. - async fn buffers(&mut self) -> VortexResult> { - let Some(chunk_msg) = self.peek().and_then(|m| m.header_as_chunk()) else { - // We could return an error here? - return Ok(Vec::new()); - }; - - // Initialize the column's buffers for a vectored read. - // To start with, we include the padding and then truncate the buffers after. - // TODO(ngates): improve the flatbuffer format instead of storing offset/len per buffer. - let buffers = chunk_msg - .buffers() - .unwrap_or_default() - .iter() - .map(|buffer| { - // FIXME(ngates): this assumes the next buffer offset == the aligned length of - // the previous buffer. I will fix this by improving the flatbuffer format instead - // of fiddling with the logic here. - let len_width_padding = - (buffer.length() as usize + (ALIGNMENT - 1)) & !(ALIGNMENT - 1); - // TODO(ngates): switch to use uninitialized - vec![0u8; len_width_padding] - }) - .collect_vec(); - - // Just sanity check the above - assert_eq!( - buffers.iter().map(|b| b.len()).sum::(), - chunk_msg.buffer_size() as usize - ); - - // Issue a vectored read to fill all buffers - let buffers: Vec> = self.read_into(buffers).await?; - - // Truncate each buffer to strip the padding. - let buffers = buffers - .into_iter() - .zip( - self.peek() - .unwrap() - .header_as_chunk() - .unwrap() - .buffers() - .unwrap_or_default() - .iter(), - ) - .map(|(mut vec, buf)| { - vec.truncate(buf.length() as usize); - Buffer::from(vec) - }) - .collect_vec(); - - Ok(buffers) + fn buffers(&mut self) -> impl Future>> { + async { + let Some(chunk_msg) = self.peek().and_then(|m| m.header_as_chunk()) else { + // We could return an error here? + return Ok(Vec::new()); + }; + + // Initialize the column's buffers for a vectored read. + // To start with, we include the padding and then truncate the buffers after. + // TODO(ngates): improve the flatbuffer format instead of storing offset/len per buffer. + let buffers = chunk_msg + .buffers() + .unwrap_or_default() + .iter() + .map(|buffer| { + // FIXME(ngates): this assumes the next buffer offset == the aligned length of + // the previous buffer. I will fix this by improving the flatbuffer format instead + // of fiddling with the logic here. + let len_width_padding = + (buffer.length() as usize + (ALIGNMENT - 1)) & !(ALIGNMENT - 1); + // TODO(ngates): switch to use uninitialized + vec![0u8; len_width_padding] + }) + .collect_vec(); + + // Just sanity check the above + assert_eq!( + buffers.iter().map(|b| b.len()).sum::(), + chunk_msg.buffer_size() as usize + ); + + // Issue a vectored read to fill all buffers + let buffers: Vec> = self.read_into(buffers).await?; + + // Truncate each buffer to strip the padding. + let buffers = buffers + .into_iter() + .zip( + self.peek() + .unwrap() + .header_as_chunk() + .unwrap() + .buffers() + .unwrap_or_default() + .iter(), + ) + .map(|(mut vec, buf)| { + vec.truncate(buf.length() as usize); + Buffer::from(vec) + }) + .collect_vec(); + + Ok(buffers) + } } } diff --git a/vortex-ipc/src/codecs/message_reader/monoio.rs b/vortex-ipc/src/codecs/message_reader/monoio.rs index c911447d08..8338d143d3 100644 --- a/vortex-ipc/src/codecs/message_reader/monoio.rs +++ b/vortex-ipc/src/codecs/message_reader/monoio.rs @@ -1,5 +1,4 @@ #![cfg(feature = "monoio")] -#![allow(dead_code)] use bytes::BytesMut; use flatbuffers::{root, root_unchecked}; @@ -10,7 +9,7 @@ use vortex_error::VortexResult; use crate::codecs::message_reader::MessageReader; use crate::flatbuffers::ipc::Message; -struct MonoIoMessageReader { +pub struct MonoIoMessageReader { // TODO(ngates): swap this for our own mutable aligned buffer so we can support direct reads. read: R, message: BytesMut, diff --git a/vortex-ipc/src/codecs/mod.rs b/vortex-ipc/src/codecs/mod.rs index f3ef693ef4..2a26f114f7 100644 --- a/vortex-ipc/src/codecs/mod.rs +++ b/vortex-ipc/src/codecs/mod.rs @@ -1,3 +1,7 @@ mod array_reader; mod ipc_reader; mod message_reader; + +pub use array_reader::ArrayReader; +pub use ipc_reader::IPCReader; +pub use message_reader::*; diff --git a/vortex-ipc/src/lib.rs b/vortex-ipc/src/lib.rs index 7b39c96fa8..2702d57d4b 100644 --- a/vortex-ipc/src/lib.rs +++ b/vortex-ipc/src/lib.rs @@ -1,5 +1,3 @@ -#![allow(dead_code)] - use vortex_error::{vortex_err, VortexError}; pub const ALIGNMENT: usize = 64; @@ -31,7 +29,7 @@ pub mod flatbuffers { } } -mod codecs; +pub mod codecs; pub mod iter; mod messages; pub mod reader; diff --git a/vortex-ipc/src/reader.rs b/vortex-ipc/src/reader.rs index 6418254cef..5642a47e59 100644 --- a/vortex-ipc/src/reader.rs +++ b/vortex-ipc/src/reader.rs @@ -121,7 +121,6 @@ impl FallibleLendingIterator for StreamReader { } } -#[allow(dead_code)] pub struct StreamArrayReader<'a, R: Read> { ctx: &'a ViewContext, read: &'a mut R, diff --git a/vortex-ipc/src/writer.rs b/vortex-ipc/src/writer.rs index 9bfbdb78f4..cf1dd328c1 100644 --- a/vortex-ipc/src/writer.rs +++ b/vortex-ipc/src/writer.rs @@ -10,7 +10,6 @@ use vortex_flatbuffers::FlatBufferWriter; use crate::messages::{IPCChunk, IPCContext, IPCMessage, IPCSchema}; use crate::ALIGNMENT; -#[allow(dead_code)] pub struct StreamWriter { write: W, ctx: ViewContext,