diff --git a/Cargo.lock b/Cargo.lock index ab7ec6131f..f145e032c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -372,6 +372,17 @@ dependencies = [ "num-traits", ] +[[package]] +name = "auto-const-array" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62f7df18977a1ee03650ee4b31b4aefed6d56bac188760b6e37610400fe8d4bb" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.61", +] + [[package]] name = "autocfg" version = "1.3.0" @@ -1903,6 +1914,15 @@ dependencies = [ "slab", ] +[[package]] +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +dependencies = [ + "byteorder", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -2285,6 +2305,16 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "io-uring" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "595a0399f411a508feb2ec1e970a4a30c249351e30208960d58298de8660b0e5" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "ipnet" version = "2.9.0" @@ -2895,6 +2925,15 @@ version = "2.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" +[[package]] +name = "memoffset" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4" +dependencies = [ + "autocfg", +] + [[package]] name = "memoffset" version = "0.9.1" @@ -2932,6 +2971,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" dependencies = [ "libc", + "log", "wasi", "windows-sys 0.48.0", ] @@ -2967,6 +3007,37 @@ dependencies = [ "uuid", ] +[[package]] +name = "monoio" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60824f1a372fa200937885a2d434610333fe47d6c6f3d0855bd4555b46316fe7" +dependencies = [ + "auto-const-array", + "bytes", + "fxhash", + "io-uring", + "libc", + "memchr", + "mio", + "monoio-macros", + "nix", + "pin-project-lite", + "socket2", + "windows-sys 0.48.0", +] + +[[package]] +name = "monoio-macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "176a5f5e69613d9e88337cf2a65e11135332b4efbcc628404a7c555e4452084c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.61", +] + [[package]] name = "multimap" version = "0.10.0" @@ -2991,6 +3062,19 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nix" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b" +dependencies = [ + "bitflags 1.3.2", + "cfg-if", + "libc", + "memoffset 0.7.1", + "pin-utils", +] + [[package]] name = "nom" version = "7.1.3" @@ -3627,7 +3711,7 @@ dependencies = [ "cfg-if", "indoc", "libc", - "memoffset", + "memoffset 0.9.1", "parking_lot", "portable-atomic", "pyo3-build-config", @@ -4260,6 +4344,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +dependencies = [ + "libc", +] + [[package]] name = "simplelog" version = "0.12.2" @@ -4626,7 +4719,9 @@ dependencies = [ "libc", "mio", "num_cpus", + "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.48.0", @@ -5054,15 +5149,20 @@ dependencies = [ "arrow-ipc", "arrow-schema", "arrow-select", + "bytes", "criterion", "fallible-iterator", "flatbuffers", "flatc", + "futures-util", "itertools 0.12.1", "log", + "monoio", "nougat", + "pin-project", "rand", "simplelog", + "tokio", "vortex-alp", "vortex-array", "vortex-buffer", diff --git a/Cargo.toml b/Cargo.toml index bf92f3ed1b..38b88ffa01 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,6 +63,7 @@ flatbuffers = "23.5.26" flatc = "0.2.2" flexbuffers = "2.0.0" fs_extra = "1.3.0" +futures-util = "0.3.30" getrandom = "0.2.14" half = { version = "^2", features = ["std", "num-traits"] } hashbrown = "0.14.3" @@ -71,10 +72,12 @@ itertools = "0.12.1" lazy_static = "1.4.0" leb128 = "0.2.5" log = "0.4.21" +monoio = "0.2.3" num-traits = "0.2.18" num_enum = "0.7.2" parquet = "51.0.0" paste = "1.0.14" +pin-project = "1.1.5" prost = "0.12.4" prost-build = "0.12.4" prost-types = "0.12.4" diff --git a/vortex-array/src/view.rs b/vortex-array/src/view.rs index 96d4fbb036..676047e062 100644 --- a/vortex-array/src/view.rs +++ b/vortex-array/src/view.rs @@ -230,7 +230,7 @@ impl<'v> IntoArray<'v> for ArrayView<'v> { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ViewContext { encodings: Vec, } diff --git a/vortex-buffer/src/lib.rs b/vortex-buffer/src/lib.rs index 57fe4536fb..efe72248aa 100644 --- a/vortex-buffer/src/lib.rs +++ b/vortex-buffer/src/lib.rs @@ -84,6 +84,12 @@ impl From> for Buffer { } } +impl From for Buffer { + fn from(value: bytes::Bytes) -> Self { + Buffer::Bytes(value) + } +} + impl From for Buffer { fn from(value: ArrowBuffer) -> Self { Buffer::Arrow(value) diff --git a/vortex-ipc/Cargo.toml b/vortex-ipc/Cargo.toml index a7782dae80..9653f3e426 100644 --- a/vortex-ipc/Cargo.toml +++ b/vortex-ipc/Cargo.toml @@ -13,11 +13,16 @@ rust-version = { workspace = true } [dependencies] arrow-buffer = { workspace = true } +bytes = { workspace = true } fallible-iterator = { workspace = true } flatbuffers = { workspace = true } +futures-util = { workspace = true, features = ["io"] } itertools = { workspace = true } log = { workspace = true } +monoio = { workspace = true, optional = true, features = ["bytes"] } nougat = "0.2.4" +pin-project = { workspace = true } +tokio = { workspace = true, optional = true } vortex-array = { path = "../vortex-array" } vortex-buffer = { path = "../vortex-buffer" } vortex-error = { path = "../vortex-error" } @@ -33,6 +38,7 @@ walkdir = { workspace = true } criterion = { workspace = true } rand = { workspace = true } simplelog = { workspace = true } +tokio = { workspace = true, features = ["full"] } vortex-alp = { path = "../vortex-alp" } vortex-fastlanes = { path = "../vortex-fastlanes" } arrow = { workspace = true } @@ -44,6 +50,12 @@ arrow-select = { workspace = true } [lints] workspace = true +[features] +default = ["futures", "monoio", "tokio"] +futures = [] +monoio = ["dep:monoio"] +tokio = ["dep:tokio"] + [[bench]] name = "ipc_take" harness = false diff --git a/vortex-ipc/src/codecs/array_reader.rs b/vortex-ipc/src/codecs/array_reader.rs new file mode 100644 index 0000000000..e328fdefc4 --- /dev/null +++ b/vortex-ipc/src/codecs/array_reader.rs @@ -0,0 +1,125 @@ +use std::pin::Pin; +use std::task::Poll; + +use futures_util::Stream; +use pin_project::pin_project; +use vortex::{Array, ArrayView, IntoArray, OwnedArray, ToArray, ToStatic, ViewContext}; +use vortex_buffer::Buffer; +use vortex_dtype::DType; +use vortex_error::{VortexError, VortexResult}; + +use crate::codecs::message_reader::MessageReader; + +/// A stream of array chunks along with a DType. +/// +/// Can be thought of as equivalent to Arrow's RecordBatchReader. +pub trait ArrayReader: Stream> { + fn dtype(&self) -> &DType; +} + +/// An adapter for a stream of array chunks to implement an ArrayReader. +#[pin_project] +struct ArrayReaderAdapter { + dtype: DType, + #[pin] + inner: S, +} + +impl ArrayReader for ArrayReaderAdapter +where + S: Stream>, +{ + fn dtype(&self) -> &DType { + &self.dtype + } +} + +impl Stream for ArrayReaderAdapter +where + S: Stream>, +{ + type Item = VortexResult; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + self.project().inner.poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() + } +} + +pub(crate) struct MessageArrayReader<'a, M: MessageReader> { + ctx: ViewContext, + dtype: DType, + messages: &'a mut M, + + // State + buffers: Vec, + row_offset: usize, +} + +impl<'m, M: MessageReader> MessageArrayReader<'m, M> { + /// Construct an ArrayReader with a message stream containing chunk messages. + pub fn new(ctx: ViewContext, dtype: DType, messages: &'m mut M) -> Self { + Self { + ctx, + dtype, + messages, + buffers: Vec::new(), + row_offset: 0, + } + } + + pub fn into_reader(self) -> impl ArrayReader + 'm { + let dtype = self.dtype.clone(); + + let inner = futures_util::stream::unfold(self, move |mut reader| async move { + match reader.next().await { + Ok(Some(array)) => Some((Ok(array.to_static()), reader)), + Ok(None) => None, + Err(e) => Some((Err(e), reader)), + } + }); + + ArrayReaderAdapter { dtype, inner } + } +} + +impl MessageArrayReader<'_, M> { + pub async fn next(&mut self) -> VortexResult> { + if self + .messages + .peek() + .and_then(|msg| msg.header_as_chunk()) + .is_none() + { + return Ok(None); + } + + // TODO(ngates): can we reuse our existing buffers? + self.buffers = self.messages.buffers().await?; + + // After reading the buffers we're now able to load the next message. + let col_array = self + .messages + .next() + .await? + .header_as_chunk() + .unwrap() + .array() + .unwrap(); + + let view = ArrayView::try_new(&self.ctx, &self.dtype, col_array, self.buffers.as_slice())?; + + // Validate it + view.to_array().with_dyn(|_| Ok::<(), VortexError>(()))?; + + let array = view.into_array(); + self.row_offset += array.len(); + Ok(Some(array)) + } +} diff --git a/vortex-ipc/src/codecs/ipc_reader.rs b/vortex-ipc/src/codecs/ipc_reader.rs new file mode 100644 index 0000000000..ffd6b56148 --- /dev/null +++ b/vortex-ipc/src/codecs/ipc_reader.rs @@ -0,0 +1,66 @@ +use pin_project::pin_project; +use vortex::{Context, ViewContext}; +use vortex_dtype::DType; +use vortex_error::{vortex_bail, vortex_err, VortexResult}; + +use crate::codecs::array_reader::{ArrayReader, MessageArrayReader}; +use crate::codecs::message_reader::MessageReader; +use crate::messages::SerdeContextDeserializer; + +/// An IPC reader is used to emit arrays from a stream of Vortex IPC messages. +#[pin_project] +pub struct IPCReader<'m, M> { + view_ctx: ViewContext, + messages: &'m mut M, +} + +impl<'m, M: MessageReader> IPCReader<'m, M> { + /// Construct an IPC reader using an existing ViewContext. + pub fn new(view_ctx: ViewContext, messages: &'m mut M) -> Self { + Self { view_ctx, messages } + } + + /// Read a ViewContext message from the stream and use it to construct an IPCReader. + pub async fn try_from_messages(ctx: &Context, messages: &'m mut M) -> VortexResult { + match messages.peek() { + None => vortex_bail!("IPC stream is empty"), + Some(msg) => { + if msg.header_as_context().is_none() { + vortex_bail!(InvalidSerde: "Expected IPC Context as first message in stream") + } + } + } + + let view_ctx: ViewContext = SerdeContextDeserializer { + fb: messages.next().await?.header_as_context().unwrap(), + ctx, + } + .try_into()?; + + Ok(Self { messages, view_ctx }) + } + + pub async fn next<'a>(&'a mut self) -> VortexResult> { + if self + .messages + .peek() + .and_then(|msg| msg.header_as_schema()) + .is_none() + { + return Ok(None); + } + + let schema_msg = self.messages.next().await?.header_as_schema().unwrap(); + + let dtype = DType::try_from( + schema_msg + .dtype() + .ok_or_else(|| vortex_err!(InvalidSerde: "Schema missing DType"))?, + ) + .map_err(|e| vortex_err!(InvalidSerde: "Failed to parse DType: {}", e))?; + + Ok(Some( + MessageArrayReader::new(self.view_ctx.clone(), dtype, self.messages).into_reader(), + )) + } +} diff --git a/vortex-ipc/src/codecs/message_reader/futures.rs b/vortex-ipc/src/codecs/message_reader/futures.rs new file mode 100644 index 0000000000..15d800cf38 --- /dev/null +++ b/vortex-ipc/src/codecs/message_reader/futures.rs @@ -0,0 +1,157 @@ +#![cfg(feature = "futures")] + +use std::io; + +use bytes::BytesMut; +use flatbuffers::{root, root_unchecked}; +use futures_util::{AsyncRead, AsyncReadExt}; +use vortex_error::{vortex_bail, vortex_err, VortexResult}; + +use crate::codecs::message_reader::MessageReader; +use crate::flatbuffers::ipc::Message; + +pub struct AsyncReadMessageReader { + // TODO(ngates): swap this for our own mutable aligned buffer so we can support direct reads. + read: R, + message: BytesMut, + prev_message: BytesMut, + finished: bool, +} + +impl AsyncReadMessageReader { + pub async fn try_new(read: R) -> VortexResult { + let mut reader = Self { + read, + message: BytesMut::new(), + prev_message: BytesMut::new(), + finished: false, + }; + reader.load_next_message().await?; + Ok(reader) + } + + async fn load_next_message(&mut self) -> VortexResult { + let mut len_buf = [0u8; 4]; + match self.read.read_exact(&mut len_buf).await { + Ok(()) => {} + Err(e) => { + return match e.kind() { + io::ErrorKind::UnexpectedEof => Ok(false), + _ => Err(e.into()), + }; + } + } + + let len = u32::from_le_bytes(len_buf); + if len == u32::MAX { + // Marker for no more messages. + return Ok(false); + } else if len == 0 { + vortex_bail!(InvalidSerde: "Invalid IPC stream") + } + + let mut message = BytesMut::zeroed(len as usize); + self.read.read_exact(message.as_mut()).await?; + + self.message = message; + + // Validate that the message is valid a flatbuffer. + root::(&self.message).map_err( + |e| vortex_err!(InvalidSerde: "Failed to parse flatbuffer message: {:?}", e), + )?; + + Ok(true) + } +} + +impl MessageReader for AsyncReadMessageReader { + fn peek(&self) -> Option { + if self.finished { + return None; + } + // The message has been validated by the next() call. + Some(unsafe { root_unchecked::(&self.message) }) + } + + async fn next(&mut self) -> VortexResult { + if self.finished { + panic!("StreamMessageReader is finished - should've checked peek!"); + } + self.prev_message = self.message.split(); + if !self.load_next_message().await? { + self.finished = true; + } + Ok(unsafe { root_unchecked::(&self.prev_message) }) + } + + async fn read_into(&mut self, mut buffers: Vec>) -> VortexResult>> { + // TODO(ngates): there is no read_vectored_exact for AsyncRead, so for now we'll + // just read one-by-one + for buffer in buffers.iter_mut() { + self.read.read_exact(buffer).await?; + } + Ok(buffers) + } +} + +#[cfg(test)] +mod tests { + use bytes::Bytes; + use futures_util::TryStreamExt; + use vortex::encoding::EncodingRef; + use vortex::Context; + use vortex_alp::ALPEncoding; + use vortex_fastlanes::BitPackedEncoding; + + use super::*; + use crate::codecs::array_reader::ArrayReader; + use crate::codecs::ipc_reader::IPCReader; + use crate::codecs::message_reader::test::create_stream; + + #[tokio::test] + async fn test_something() -> VortexResult<()> { + let buffer = create_stream(); + + let ctx = + Context::default().with_encodings([&ALPEncoding as EncodingRef, &BitPackedEncoding]); + let mut messages = AsyncReadMessageReader::try_new(buffer.as_slice()).await?; + + let mut reader = IPCReader::try_from_messages(&ctx, &mut messages).await?; + while let Some(array) = reader.next().await? { + futures_util::pin_mut!(array); + println!("ARRAY: {}", array.dtype()); + while let Some(chunk) = array.try_next().await? { + println!("chunk {:?}", chunk); + } + } + + Ok(()) + } + + #[tokio::test] + async fn test_stream() -> VortexResult<()> { + let buffer = create_stream(); + + let stream = futures_util::stream::iter( + buffer + .chunks(64) + .map(|chunk| Ok(Bytes::from(chunk.to_vec()))), + ); + let reader = stream.into_async_read(); + + let ctx = + Context::default().with_encodings([&ALPEncoding as EncodingRef, &BitPackedEncoding]); + let mut messages = AsyncReadMessageReader::try_new(reader).await?; + + let mut reader = IPCReader::try_from_messages(&ctx, &mut messages).await?; + while let Some(array) = reader.next().await? { + futures_util::pin_mut!(array); + println!("ARRAY {}", array.dtype()); + while let Some(chunk) = array.try_next().await? { + println!("chunk {:?}", chunk); + } + } + + Ok(()) + } +} diff --git a/vortex-ipc/src/codecs/message_reader/mod.rs b/vortex-ipc/src/codecs/message_reader/mod.rs new file mode 100644 index 0000000000..67909748d4 --- /dev/null +++ b/vortex-ipc/src/codecs/message_reader/mod.rs @@ -0,0 +1,117 @@ +use std::future::Future; + +use itertools::Itertools; +use vortex_buffer::Buffer; +use vortex_error::VortexResult; + +use crate::ALIGNMENT; + +pub mod futures; +pub mod monoio; +use crate::flatbuffers::ipc::Message; + +pub trait MessageReader { + fn peek(&self) -> Option; + fn next(&mut self) -> impl Future>; + fn read_into( + &mut self, + buffers: Vec>, + ) -> impl Future>>>; + + /// Fetch the buffers associated with this message. + fn buffers(&mut self) -> impl Future>> { + async { + let Some(chunk_msg) = self.peek().and_then(|m| m.header_as_chunk()) else { + // We could return an error here? + return Ok(Vec::new()); + }; + + // Initialize the column's buffers for a vectored read. + // To start with, we include the padding and then truncate the buffers after. + // TODO(ngates): improve the flatbuffer format instead of storing offset/len per buffer. + let buffers = chunk_msg + .buffers() + .unwrap_or_default() + .iter() + .map(|buffer| { + // FIXME(ngates): this assumes the next buffer offset == the aligned length of + // the previous buffer. I will fix this by improving the flatbuffer format instead + // of fiddling with the logic here. + let len_width_padding = + (buffer.length() as usize + (ALIGNMENT - 1)) & !(ALIGNMENT - 1); + // TODO(ngates): switch to use uninitialized + vec![0u8; len_width_padding] + }) + .collect_vec(); + + // Just sanity check the above + assert_eq!( + buffers.iter().map(|b| b.len()).sum::(), + chunk_msg.buffer_size() as usize + ); + + // Issue a vectored read to fill all buffers + let buffers: Vec> = self.read_into(buffers).await?; + + // Truncate each buffer to strip the padding. + let buffers = buffers + .into_iter() + .zip( + self.peek() + .unwrap() + .header_as_chunk() + .unwrap() + .buffers() + .unwrap_or_default() + .iter(), + ) + .map(|(mut vec, buf)| { + vec.truncate(buf.length() as usize); + Buffer::from(vec) + }) + .collect_vec(); + + Ok(buffers) + } + } +} + +#[cfg(test)] +pub mod test { + use std::io::Cursor; + + use vortex::array::chunked::ChunkedArray; + use vortex::array::primitive::PrimitiveArray; + use vortex::encoding::EncodingRef; + use vortex::{ArrayDType, Context, IntoArray}; + use vortex_alp::ALPEncoding; + use vortex_fastlanes::BitPackedEncoding; + + use crate::writer::StreamWriter; + + pub fn create_stream() -> Vec { + let ctx = Context::default().with_encodings([ + &ALPEncoding as EncodingRef, + &BitPackedEncoding as EncodingRef, + ]); + let array = PrimitiveArray::from(vec![0, 1, 2]).into_array(); + let chunked_array = + ChunkedArray::try_new(vec![array.clone(), array.clone()], array.dtype().clone()) + .unwrap() + .into_array(); + + let mut buffer = vec![]; + let mut cursor = Cursor::new(&mut buffer); + { + let mut writer = StreamWriter::try_new(&mut cursor, &ctx).unwrap(); + writer.write_array(&array).unwrap(); + writer.write_array(&chunked_array).unwrap(); + } + + // Push some extra bytes to test that the reader is well-behaved and doesn't read past the + // end of the stream. + // let _ = cursor.write(b"hello").unwrap(); + + buffer + } +} diff --git a/vortex-ipc/src/codecs/message_reader/monoio.rs b/vortex-ipc/src/codecs/message_reader/monoio.rs new file mode 100644 index 0000000000..8338d143d3 --- /dev/null +++ b/vortex-ipc/src/codecs/message_reader/monoio.rs @@ -0,0 +1,159 @@ +#![cfg(feature = "monoio")] + +use bytes::BytesMut; +use flatbuffers::{root, root_unchecked}; +use monoio::buf::{IoBufMut, IoVecBufMut, VecBuf}; +use monoio::io::{AsyncReadRent, AsyncReadRentExt}; +use vortex_error::VortexResult; + +use crate::codecs::message_reader::MessageReader; +use crate::flatbuffers::ipc::Message; + +pub struct MonoIoMessageReader { + // TODO(ngates): swap this for our own mutable aligned buffer so we can support direct reads. + read: R, + message: BytesMut, + prev_message: BytesMut, + finished: bool, +} + +impl MonoIoMessageReader { + pub async fn try_new(read: R) -> VortexResult { + let mut reader = Self { + read, + message: BytesMut::new(), + prev_message: BytesMut::new(), + finished: false, + }; + reader.load_next_message().await?; + Ok(reader) + } + + async fn load_next_message(&mut self) -> VortexResult { + // FIXME(ngates): how do we read into a stack allocated thing? + let len_buf = self.read.read_exact_into(Vec::with_capacity(4)).await?; + + let len = u32::from_le_bytes(len_buf.as_slice().try_into()?); + if len == u32::MAX { + // Marker for no more messages. + return Ok(false); + } + + // TODO(ngates): we may be able to use self.message.split() and then swap back after. + + let message = self + .read + .read_exact_into(BytesMut::with_capacity(len as usize)) + .await?; + + // Validate that the message is valid a flatbuffer. + let _ = root::(message.as_ref())?; + + self.message = message; + + Ok(true) + } +} + +impl MessageReader for MonoIoMessageReader { + fn peek(&self) -> Option { + if self.finished { + return None; + } + // The message has been validated by the next() call. + Some(unsafe { root_unchecked::(&self.message) }) + } + + async fn next(&mut self) -> VortexResult { + if self.finished { + panic!("StreamMessageReader is finished - should've checked peek!"); + } + self.prev_message = self.message.split(); + if !self.load_next_message().await? { + self.finished = true; + } + Ok(unsafe { root_unchecked::(&self.prev_message) }) + } + + async fn read_into(&mut self, buffers: Vec>) -> VortexResult>> { + Ok(self + .read + .readv_exact_into(VecBuf::from(buffers)) + .await? + .into()) + } +} + +trait AsyncReadRentMoreExt: AsyncReadRentExt { + /// Same as read_exact except unwraps the BufResult into a regular IO result. + async fn read_exact_into(&mut self, buf: B) -> std::io::Result { + match self.read_exact(buf).await { + (Ok(_), buf) => Ok(buf), + (Err(e), _) => Err(e), + } + } + + /// Same as read_vectored_exact except unwraps the BufResult into a regular IO result. + async fn readv_exact_into(&mut self, buf: B) -> std::io::Result { + match self.read_vectored_exact(buf).await { + (Ok(_), buf) => Ok(buf), + (Err(e), _) => Err(e), + } + } +} + +impl AsyncReadRentMoreExt for R {} + +#[cfg(test)] +mod tests { + use futures_util::TryStreamExt; + use vortex::encoding::EncodingRef; + use vortex::Context; + use vortex_alp::ALPEncoding; + use vortex_fastlanes::BitPackedEncoding; + + use super::*; + use crate::codecs::array_reader::ArrayReader; + use crate::codecs::ipc_reader::IPCReader; + use crate::codecs::message_reader::test::create_stream; + + #[monoio::test] + async fn test_something() -> VortexResult<()> { + let buffer = create_stream(); + + let ctx = + Context::default().with_encodings([&ALPEncoding as EncodingRef, &BitPackedEncoding]); + let mut messages = MonoIoMessageReader::try_new(buffer.as_slice()).await?; + + let mut reader = IPCReader::try_from_messages(&ctx, &mut messages).await?; + while let Some(array) = reader.next().await? { + futures_util::pin_mut!(array); + println!("ARRAY {}", array.dtype()); + + while let Some(chunk) = array.try_next().await? { + println!("chunk {:?}", chunk); + } + } + + Ok(()) + } + + #[monoio::test] + async fn test_array_stream() -> VortexResult<()> { + let buffer = create_stream(); + + let ctx = + Context::default().with_encodings([&ALPEncoding as EncodingRef, &BitPackedEncoding]); + let mut messages = MonoIoMessageReader::try_new(buffer.as_slice()).await?; + + let mut reader = IPCReader::try_from_messages(&ctx, &mut messages).await?; + while let Some(array) = reader.next().await? { + futures_util::pin_mut!(array); + while let Some(array) = array.try_next().await? { + println!("chunk {:?}", array); + } + } + + Ok(()) + } +} diff --git a/vortex-ipc/src/codecs/mod.rs b/vortex-ipc/src/codecs/mod.rs new file mode 100644 index 0000000000..2a26f114f7 --- /dev/null +++ b/vortex-ipc/src/codecs/mod.rs @@ -0,0 +1,7 @@ +mod array_reader; +mod ipc_reader; +mod message_reader; + +pub use array_reader::ArrayReader; +pub use ipc_reader::IPCReader; +pub use message_reader::*; diff --git a/vortex-ipc/src/lib.rs b/vortex-ipc/src/lib.rs index d15a0ff6dd..2702d57d4b 100644 --- a/vortex-ipc/src/lib.rs +++ b/vortex-ipc/src/lib.rs @@ -29,6 +29,7 @@ pub mod flatbuffers { } } +pub mod codecs; pub mod iter; mod messages; pub mod reader; diff --git a/vortex-ipc/src/reader.rs b/vortex-ipc/src/reader.rs index 6418254cef..5642a47e59 100644 --- a/vortex-ipc/src/reader.rs +++ b/vortex-ipc/src/reader.rs @@ -121,7 +121,6 @@ impl FallibleLendingIterator for StreamReader { } } -#[allow(dead_code)] pub struct StreamArrayReader<'a, R: Read> { ctx: &'a ViewContext, read: &'a mut R, diff --git a/vortex-ipc/src/writer.rs b/vortex-ipc/src/writer.rs index 9bfbdb78f4..cf1dd328c1 100644 --- a/vortex-ipc/src/writer.rs +++ b/vortex-ipc/src/writer.rs @@ -10,7 +10,6 @@ use vortex_flatbuffers::FlatBufferWriter; use crate::messages::{IPCChunk, IPCContext, IPCMessage, IPCSchema}; use crate::ALIGNMENT; -#[allow(dead_code)] pub struct StreamWriter { write: W, ctx: ViewContext,