diff --git a/vortex-ipc/src/lib.rs b/vortex-ipc/src/lib.rs index ff8c44e3c0..8b13c35bc7 100644 --- a/vortex-ipc/src/lib.rs +++ b/vortex-ipc/src/lib.rs @@ -26,7 +26,6 @@ pub mod flatbuffers { pub mod iter; mod messages; -mod read_ext; pub mod reader; pub mod writer; diff --git a/vortex-ipc/src/read_ext.rs b/vortex-ipc/src/read_ext.rs deleted file mode 100644 index 7e555087e3..0000000000 --- a/vortex-ipc/src/read_ext.rs +++ /dev/null @@ -1,53 +0,0 @@ -use std::future::Future; -use std::io; - -use monoio::io::AsyncReadRent; -use vortex_error::{vortex_bail, vortex_err, VortexResult}; - -const BUF_SIZE: usize = 8192; - -pub trait AsyncReadRentSkip: AsyncReadRent { - /// Skip n bytes in the stream. - fn skip(&mut self, nbytes: usize) -> impl Future>; -} - -impl AsyncReadRentSkip for R { - async fn skip(&mut self, nbytes: usize) -> VortexResult<()> { - if nbytes < BUF_SIZE { - let buf = Vec::with_capacity(nbytes); - let (res_len, _) = self.read(buf).await; - return res_len.map(|_| ()).map_err(|e| vortex_err!(IOError: e)); - } - - let mut buf: Vec = Vec::with_capacity(BUF_SIZE); - let mut remaining: usize = nbytes; - - while remaining >= BUF_SIZE { - buf.clear(); - let (read_res, buf_read) = self.read(buf).await; - match read_res { - Err(ref e) if e.kind() == io::ErrorKind::Interrupted => { - // retry - buf = buf_read; - continue; - } - Err(e) => { - // should return error - vortex_bail!(IOError: e); - } - Ok(n) => { - remaining -= n; - buf = buf_read; - } - } - } - - if remaining > 0 { - let buf = Vec::with_capacity(remaining); - let (res_len, _) = self.read(buf).await; - return res_len.map(|_| ()).map_err(|e| vortex_err!(IOError: e)); - } - - Ok(()) - } -} diff --git a/vortex-ipc/src/reader.rs b/vortex-ipc/src/reader.rs index 10cfc8c2da..5ddc79685a 100644 --- a/vortex-ipc/src/reader.rs +++ b/vortex-ipc/src/reader.rs @@ -25,7 +25,6 @@ use vortex_scalar::Scalar; use crate::flatbuffers::ipc::Message; use crate::iter::{FallibleIterator, FallibleLendingIterator, FallibleLendingIteratorāļžItem}; use crate::messages::SerdeContextDeserializer; -use crate::read_ext::AsyncReadRentSkip; pub struct StreamReader { read: R, @@ -258,27 +257,30 @@ impl<'iter, R: AsyncReadRent> FallibleLendingIterator for StreamArrayReader<'ite // Read all the column's buffers self.buffers.clear(); let mut offset: usize = 0; - for buffer in chunk_msg.buffers().unwrap_or_default().iter() { - let _skip = buffer.offset() as usize - offset; - self.read.skip(buffer.offset() as usize - offset).await?; + let buffers = chunk_msg.buffers().unwrap_or_default(); + for i in 0..buffers.len() { + let buffer = buffers.get(i); + let next_offset = if i == buffers.len() - 1 { + chunk_msg.buffer_size() as usize + } else { + buffers.get(i + 1).offset() as usize + }; + let buf_len = buffer.length() as usize; + let padding = next_offset - offset - buf_len; // TODO(ngates): read into a single buffer, then Arc::clone and slice - let bytes = Vec::with_capacity(buffer.length() as usize); - let (len_res, bytes_read) = self.read.read_exact(bytes).await; - if len_res? != buffer.length() as usize { + let bytes = Vec::with_capacity(buf_len + padding); + let (len_res, mut bytes_read) = self.read.read_exact(bytes).await; + if len_res? != buf_len + padding { vortex_bail!("Mismatched length read from buffer"); } + bytes_read.truncate(buf_len); let arrow_buffer = ArrowBuffer::from_vec(bytes_read); self.buffers.push(Buffer::Owned(arrow_buffer)); - offset = (buffer.offset() + buffer.length()) as usize; + offset = next_offset; } - // Consume any remaining padding after the final buffer. - self.read - .skip(chunk_msg.buffer_size() as usize - offset) - .await?; - // After reading the buffers we're now able to load the next message. let col_array = self .messages