Skip to content

Commit

Permalink
something
Browse files Browse the repository at this point in the history
  • Loading branch information
robert3005 committed May 23, 2024
1 parent 6181d44 commit 51116cb
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 5 deletions.
14 changes: 12 additions & 2 deletions vortex-ipc/src/io/offset.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::future::Future;
use std::io;

use bytes::BytesMut;

use crate::io::VortexReadAt;
use crate::io::{VortexRead, VortexReadAt};

/// An adapter that offsets all reads by a fixed amount.
pub struct OffsetReadAt<R> {
Expand All @@ -21,11 +22,20 @@ impl<R: VortexReadAt> VortexReadAt for OffsetReadAt<R> {
&mut self,
pos: u64,
buffer: BytesMut,
) -> impl Future<Output = std::io::Result<BytesMut>> {
) -> impl Future<Output = io::Result<BytesMut>> {
self.read.read_at_into(pos + self.offset, buffer)
}

fn performance_hint(&self) -> usize {
self.read.performance_hint()
}
}

impl<R: VortexReadAt> VortexRead for OffsetReadAt<R> {
async fn read_into(&mut self, buffer: BytesMut) -> io::Result<BytesMut> {
let buffer_len = buffer.len() as u64;
let res = self.read.read_at_into(self.offset, buffer).await?;
self.offset += buffer_len;
Ok(res)
}
}
22 changes: 19 additions & 3 deletions vortex-ipc/src/writer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use futures_util::{Stream, TryStreamExt};

use vortex::{Array, IntoArrayData, ViewContext};
use vortex::array::chunked::ChunkedArray;
use vortex::stream::ArrayStream;
use vortex::{Array, IntoArrayData, ViewContext};
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_error::{vortex_bail, VortexResult};

Expand All @@ -14,6 +16,7 @@ pub struct ArrayWriter<W: VortexWrite> {

view_ctx_range: Option<ByteRange>,
array_layouts: Vec<ArrayLayout>,
page_ranges: Vec<ByteRange>,
}

impl<W: VortexWrite> ArrayWriter<W> {
Expand All @@ -23,6 +26,7 @@ impl<W: VortexWrite> ArrayWriter<W> {
view_ctx,
view_ctx_range: None,
array_layouts: vec![],
page_ranges: vec![],
}
}

Expand All @@ -34,6 +38,10 @@ impl<W: VortexWrite> ArrayWriter<W> {
&self.array_layouts
}

pub fn page_ranges(&self) -> &[ByteRange] {
&self.page_ranges
}

pub fn into_inner(self) -> W {
self.msgs.into_inner()
}
Expand All @@ -60,8 +68,8 @@ impl<W: VortexWrite> ArrayWriter<W> {
}

async fn write_array_chunks<S>(&mut self, mut stream: S) -> VortexResult<ChunkLayout>
where
S: Stream<Item = VortexResult<Array>> + Unpin,
where
S: Stream<Item=VortexResult<Array>> + Unpin,
{
let mut byte_offsets = vec![self.msgs.tell()];
let mut row_offsets = vec![0];
Expand Down Expand Up @@ -102,6 +110,14 @@ impl<W: VortexWrite> ArrayWriter<W> {
self.write_array_stream(array.into_array_stream()).await
}
}

pub async fn write_page(mut self, buffer: Buffer) -> VortexResult<Self> {
let begin = self.msgs.tell();
self.msgs.write_page(buffer).await?;
let end = self.msgs.tell();
self.page_ranges.push(ByteRange { begin, end });
Ok(self)
}
}

#[derive(Copy, Clone, Debug)]
Expand Down

0 comments on commit 51116cb

Please sign in to comment.