diff --git a/vortex-ipc/src/io/offset.rs b/vortex-ipc/src/io/offset.rs index b9e2e94e9e..1c8cc18138 100644 --- a/vortex-ipc/src/io/offset.rs +++ b/vortex-ipc/src/io/offset.rs @@ -1,8 +1,9 @@ use std::future::Future; +use std::io; use bytes::BytesMut; -use crate::io::VortexReadAt; +use crate::io::{VortexRead, VortexReadAt}; /// An adapter that offsets all reads by a fixed amount. pub struct OffsetReadAt { @@ -21,7 +22,7 @@ impl VortexReadAt for OffsetReadAt { &mut self, pos: u64, buffer: BytesMut, - ) -> impl Future> { + ) -> impl Future> { self.read.read_at_into(pos + self.offset, buffer) } @@ -29,3 +30,12 @@ impl VortexReadAt for OffsetReadAt { self.read.performance_hint() } } + +impl VortexRead for OffsetReadAt { + async fn read_into(&mut self, buffer: BytesMut) -> io::Result { + let buffer_len = buffer.len() as u64; + let res = self.read.read_at_into(self.offset, buffer).await?; + self.offset += buffer_len; + Ok(res) + } +} diff --git a/vortex-ipc/src/writer.rs b/vortex-ipc/src/writer.rs index 19d7f02277..545f395e27 100644 --- a/vortex-ipc/src/writer.rs +++ b/vortex-ipc/src/writer.rs @@ -1,7 +1,9 @@ use futures_util::{Stream, TryStreamExt}; + +use vortex::{Array, IntoArrayData, ViewContext}; use vortex::array::chunked::ChunkedArray; use vortex::stream::ArrayStream; -use vortex::{Array, IntoArrayData, ViewContext}; +use vortex_buffer::Buffer; use vortex_dtype::DType; use vortex_error::{vortex_bail, VortexResult}; @@ -14,6 +16,7 @@ pub struct ArrayWriter { view_ctx_range: Option, array_layouts: Vec, + page_ranges: Vec, } impl ArrayWriter { @@ -23,6 +26,7 @@ impl ArrayWriter { view_ctx, view_ctx_range: None, array_layouts: vec![], + page_ranges: vec![], } } @@ -34,6 +38,10 @@ impl ArrayWriter { &self.array_layouts } + pub fn page_ranges(&self) -> &[ByteRange] { + &self.page_ranges + } + pub fn into_inner(self) -> W { self.msgs.into_inner() } @@ -60,8 +68,8 @@ impl ArrayWriter { } async fn write_array_chunks(&mut self, mut stream: S) -> VortexResult - where - S: Stream> + Unpin, + where + S: Stream> + Unpin, { let mut byte_offsets = vec![self.msgs.tell()]; let mut row_offsets = vec![0]; @@ -102,6 +110,14 @@ impl ArrayWriter { self.write_array_stream(array.into_array_stream()).await } } + + pub async fn write_page(mut self, buffer: Buffer) -> VortexResult { + let begin = self.msgs.tell(); + self.msgs.write_page(buffer).await?; + let end = self.msgs.tell(); + self.page_ranges.push(ByteRange { begin, end }); + Ok(self) + } } #[derive(Copy, Clone, Debug)]