Skip to content

Commit

Permalink
drop skipping
Browse files Browse the repository at this point in the history
  • Loading branch information
robert3005 committed May 6, 2024
1 parent f7291bc commit dfb1ca6
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 67 deletions.
1 change: 0 additions & 1 deletion vortex-ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ pub mod flatbuffers {

pub mod iter;
mod messages;
mod read_ext;
pub mod reader;
pub mod writer;

Expand Down
53 changes: 0 additions & 53 deletions vortex-ipc/src/read_ext.rs

This file was deleted.

28 changes: 15 additions & 13 deletions vortex-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use vortex_scalar::Scalar;
use crate::flatbuffers::ipc::Message;
use crate::iter::{FallibleIterator, FallibleLendingIterator, FallibleLendingIteratorඞItem};
use crate::messages::SerdeContextDeserializer;
use crate::read_ext::AsyncReadRentSkip;

pub struct StreamReader<R: AsyncReadRent> {
read: R,
Expand Down Expand Up @@ -259,27 +258,30 @@ impl<'iter, R: AsyncReadRent> FallibleLendingIterator for StreamArrayReader<'ite
// Read all the column's buffers
self.buffers.clear();
let mut offset: usize = 0;
for buffer in chunk_msg.buffers().unwrap_or_default().iter() {
let _skip = buffer.offset() as usize - offset;
self.read.skip(buffer.offset() as usize - offset).await?;
let buffers = chunk_msg.buffers().unwrap_or_default();
for i in 0..buffers.len() {
let buffer = buffers.get(i);
let next_offset = if i == buffers.len() - 1 {
chunk_msg.buffer_size() as usize
} else {
buffers.get(i + 1).offset() as usize
};
let buf_len = buffer.length() as usize;
let padding = next_offset - offset - buf_len;

// TODO(ngates): read into a single buffer, then Arc::clone and slice
let bytes = Vec::with_capacity(buffer.length() as usize);
let (len_res, bytes_read) = self.read.read_exact(bytes).await;
if len_res? != buffer.length() as usize {
let bytes = Vec::with_capacity(buf_len + padding);
let (len_res, mut bytes_read) = self.read.read_exact(bytes).await;
if len_res? != buf_len + padding {
vortex_bail!("Mismatched length read from buffer");
}
bytes_read.truncate(buf_len);
let arrow_buffer = ArrowBuffer::from_vec(bytes_read);
self.buffers.push(Buffer::Owned(arrow_buffer));

offset = (buffer.offset() + buffer.length()) as usize;
offset = next_offset;
}

// Consume any remaining padding after the final buffer.
self.read
.skip(chunk_msg.buffer_size() as usize - offset)
.await?;

// After reading the buffers we're now able to load the next message.
let col_array = self
.messages
Expand Down

0 comments on commit dfb1ca6

Please sign in to comment.