From 681e7b3afd337c0f546ece4495954eb05d66ca06 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Tue, 4 Jun 2024 07:41:11 -0400 Subject: [PATCH] Expose ViewContext and page byte ranges in array reader (#339) --- Cargo.toml | 2 +- vortex-fastlanes/src/bitpacking/compress.rs | 2 +- vortex-ipc/src/stream_reader/mod.rs | 5 +++++ vortex-ipc/src/writer.rs | 15 +++++++++++++++ 4 files changed, 22 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f04c450fbc..080a89a81a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -107,5 +107,5 @@ zigzag = "0.1.0" warnings = "deny" [workspace.lints.clippy] -all = "deny" +all = { level = "deny", priority = -1 } or_fun_call = "deny" diff --git a/vortex-fastlanes/src/bitpacking/compress.rs b/vortex-fastlanes/src/bitpacking/compress.rs index 63ed4dba55..62c2cd7fed 100644 --- a/vortex-fastlanes/src/bitpacking/compress.rs +++ b/vortex-fastlanes/src/bitpacking/compress.rs @@ -300,7 +300,7 @@ pub fn unpack_single(array: &BitPackedArray, index: usize) -> VortexResult( packed: &[u8], bit_width: usize, diff --git a/vortex-ipc/src/stream_reader/mod.rs b/vortex-ipc/src/stream_reader/mod.rs index a9ebd8b2fa..bba9759655 100644 --- a/vortex-ipc/src/stream_reader/mod.rs +++ b/vortex-ipc/src/stream_reader/mod.rs @@ -43,6 +43,11 @@ impl StreamArrayReader { Ok(self) } + /// Retrieve the loaded view_context + pub fn view_context(&self) -> Option> { + self.view_context.clone() + } + pub fn with_dtype(self, dtype: DType) -> Self { assert!(self.dtype.is_none(), "DType already set"); Self { diff --git a/vortex-ipc/src/writer.rs b/vortex-ipc/src/writer.rs index 19d7f02277..4670106fb3 100644 --- a/vortex-ipc/src/writer.rs +++ b/vortex-ipc/src/writer.rs @@ -2,6 +2,7 @@ use futures_util::{Stream, TryStreamExt}; use vortex::array::chunked::ChunkedArray; use vortex::stream::ArrayStream; use vortex::{Array, IntoArrayData, ViewContext}; +use vortex_buffer::Buffer; use vortex_dtype::DType; use vortex_error::{vortex_bail, VortexResult}; @@ -14,6 +15,7 @@ pub struct ArrayWriter { view_ctx_range: Option, array_layouts: Vec, + page_ranges: Vec, } impl ArrayWriter { @@ -23,6 +25,7 @@ impl ArrayWriter { view_ctx, view_ctx_range: None, array_layouts: vec![], + page_ranges: vec![], } } @@ -34,6 +37,10 @@ impl ArrayWriter { &self.array_layouts } + pub fn page_ranges(&self) -> &[ByteRange] { + &self.page_ranges + } + pub fn into_inner(self) -> W { self.msgs.into_inner() } @@ -102,6 +109,14 @@ impl ArrayWriter { self.write_array_stream(array.into_array_stream()).await } } + + pub async fn write_page(mut self, buffer: Buffer) -> VortexResult { + let begin = self.msgs.tell(); + self.msgs.write_page(buffer).await?; + let end = self.msgs.tell(); + self.page_ranges.push(ByteRange { begin, end }); + Ok(self) + } } #[derive(Copy, Clone, Debug)]