Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose ViewContext and page byte ranges in array reader #339

Merged
merged 6 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions vortex-ipc/src/io/offset.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::future::Future;
use std::io;

use bytes::BytesMut;

use crate::io::VortexReadAt;
use crate::io::{VortexRead, VortexReadAt};

/// An adapter that offsets all reads by a fixed amount.
pub struct OffsetReadAt<R> {
Expand All @@ -21,11 +22,20 @@ impl<R: VortexReadAt> VortexReadAt for OffsetReadAt<R> {
&mut self,
pos: u64,
buffer: BytesMut,
) -> impl Future<Output = std::io::Result<BytesMut>> {
) -> impl Future<Output = io::Result<BytesMut>> {
self.read.read_at_into(pos + self.offset, buffer)
}

fn performance_hint(&self) -> usize {
self.read.performance_hint()
}
}

impl<R: VortexReadAt> VortexRead for OffsetReadAt<R> {
robert3005 marked this conversation as resolved.
Show resolved Hide resolved
async fn read_into(&mut self, buffer: BytesMut) -> io::Result<BytesMut> {
let buffer_len = buffer.len() as u64;
let res = self.read.read_at_into(self.offset, buffer).await?;
self.offset += buffer_len;
Ok(res)
}
}
22 changes: 19 additions & 3 deletions vortex-ipc/src/writer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use futures_util::{Stream, TryStreamExt};

use vortex::{Array, IntoArrayData, ViewContext};
use vortex::array::chunked::ChunkedArray;
use vortex::stream::ArrayStream;
use vortex::{Array, IntoArrayData, ViewContext};
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_error::{vortex_bail, VortexResult};

Expand All @@ -14,6 +16,7 @@ pub struct ArrayWriter<W: VortexWrite> {

view_ctx_range: Option<ByteRange>,
array_layouts: Vec<ArrayLayout>,
page_ranges: Vec<ByteRange>,
}

impl<W: VortexWrite> ArrayWriter<W> {
Expand All @@ -23,6 +26,7 @@ impl<W: VortexWrite> ArrayWriter<W> {
view_ctx,
view_ctx_range: None,
array_layouts: vec![],
page_ranges: vec![],
}
}

Expand All @@ -34,6 +38,10 @@ impl<W: VortexWrite> ArrayWriter<W> {
&self.array_layouts
}

pub fn page_ranges(&self) -> &[ByteRange] {
&self.page_ranges
}

pub fn into_inner(self) -> W {
self.msgs.into_inner()
}
Expand All @@ -60,8 +68,8 @@ impl<W: VortexWrite> ArrayWriter<W> {
}

async fn write_array_chunks<S>(&mut self, mut stream: S) -> VortexResult<ChunkLayout>
where
S: Stream<Item = VortexResult<Array>> + Unpin,
where
S: Stream<Item=VortexResult<Array>> + Unpin,
{
let mut byte_offsets = vec![self.msgs.tell()];
let mut row_offsets = vec![0];
Expand Down Expand Up @@ -102,6 +110,14 @@ impl<W: VortexWrite> ArrayWriter<W> {
self.write_array_stream(array.into_array_stream()).await
}
}

pub async fn write_page(mut self, buffer: Buffer) -> VortexResult<Self> {
let begin = self.msgs.tell();
self.msgs.write_page(buffer).await?;
let end = self.msgs.tell();
self.page_ranges.push(ByteRange { begin, end });
Ok(self)
}
}

#[derive(Copy, Clone, Debug)]
Expand Down
Loading