From 26e01669795c8c93fc3ac47207437d531738c1df Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Mon, 8 Jul 2024 12:34:56 +0100 Subject: [PATCH] Remove ViewContext and assign stable ids to encodings --- bench-vortex/src/data_downloads.rs | 9 +-- bench-vortex/src/lib.rs | 5 +- bench-vortex/src/reader.rs | 30 +++---- encodings/alp/src/array.rs | 2 +- encodings/byte_bool/src/lib.rs | 2 +- encodings/datetime-parts/src/array.rs | 2 +- encodings/dict/src/dict.rs | 2 +- encodings/fastlanes/src/bitpacking/mod.rs | 2 +- encodings/fastlanes/src/delta/mod.rs | 2 +- encodings/fastlanes/src/for/mod.rs | 2 +- encodings/roaring/src/boolean/mod.rs | 2 +- encodings/roaring/src/integer/mod.rs | 2 +- encodings/runend/src/runend.rs | 7 +- encodings/zigzag/src/zigzag.rs | 2 +- vortex-array/src/array/bool/mod.rs | 2 +- vortex-array/src/array/chunked/mod.rs | 2 +- vortex-array/src/array/constant/mod.rs | 3 +- vortex-array/src/array/extension/mod.rs | 2 +- vortex-array/src/array/null/mod.rs | 2 +- vortex-array/src/array/primitive/mod.rs | 2 +- vortex-array/src/array/sparse/mod.rs | 2 +- vortex-array/src/array/struct_/mod.rs | 2 +- vortex-array/src/array/varbin/mod.rs | 2 +- vortex-array/src/array/varbinview/mod.rs | 2 +- vortex-array/src/context.rs | 12 +-- vortex-array/src/encoding.rs | 11 ++- vortex-array/src/implementation.rs | 4 +- vortex-array/src/view.rs | 48 ++---------- vortex-ipc/benches/ipc_array_reader_take.rs | 20 ++--- vortex-ipc/benches/ipc_take.rs | 18 ++--- vortex-ipc/flatbuffers/message.fbs | 9 --- vortex-ipc/src/chunked_reader/mod.rs | 8 +- vortex-ipc/src/chunked_reader/take_rows.rs | 17 ++-- vortex-ipc/src/lib.rs | 36 +++------ vortex-ipc/src/message_reader.rs | 53 ++++--------- vortex-ipc/src/message_writer.rs | 18 ++--- vortex-ipc/src/messages.rs | 86 +++------------------ vortex-ipc/src/stream_reader/mod.rs | 43 ++--------- vortex-ipc/src/writer.rs | 30 +------ 39 files changed, 136 insertions(+), 369 deletions(-) diff --git a/bench-vortex/src/data_downloads.rs b/bench-vortex/src/data_downloads.rs index cef25307e4..d1bdf017b7 100644 --- a/bench-vortex/src/data_downloads.rs +++ b/bench-vortex/src/data_downloads.rs @@ -11,14 +11,14 @@ use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use tokio::runtime::Runtime; use vortex::array::chunked::ChunkedArray; use vortex::arrow::FromArrowType; -use vortex::{IntoArray, ToArrayData, ViewContext}; +use vortex::{IntoArray, ToArrayData}; use vortex_dtype::DType; use vortex_error::{VortexError, VortexResult}; use vortex_ipc::io::TokioAdapter; use vortex_ipc::writer::ArrayWriter; +use crate::idempotent; use crate::reader::BATCH_SIZE; -use crate::{idempotent, CTX}; pub fn download_data(fname: PathBuf, data_url: &str) -> PathBuf { idempotent(&fname, |path| { @@ -57,10 +57,7 @@ pub fn data_vortex_uncompressed(fname_out: &str, downloaded_data: PathBuf) -> Pa .unwrap() .block_on(async move { let write = tokio::fs::File::create(path).await.unwrap(); - ArrayWriter::new(TokioAdapter(write), ViewContext::from(&CTX.clone())) - .write_context() - .await - .unwrap() + ArrayWriter::new(TokioAdapter(write)) .write_array(array) .await .unwrap(); diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index 09b0294d6b..6a0c64e9e7 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -4,6 +4,7 @@ use std::collections::HashSet; use std::env::temp_dir; use std::fs::{create_dir_all, File}; use std::path::{Path, PathBuf}; +use std::sync::Arc; use arrow_array::RecordBatchReader; use humansize::DECIMAL; @@ -48,7 +49,7 @@ pub mod taxi_data; pub mod vortex_utils; lazy_static! { - pub static ref CTX: Context = Context::default().with_encodings([ + pub static ref CTX: Arc = Arc::new(Context::default().with_encodings([ &ALPEncoding as EncodingRef, &DictEncoding, &BitPackedEncoding, @@ -60,7 +61,7 @@ lazy_static! { // &RoaringIntEncoding, // Doesn't offer anything more than FoR really // &ZigZagEncoding, - ]); + ])); } lazy_static! { diff --git a/bench-vortex/src/reader.rs b/bench-vortex/src/reader.rs index 234de63fd5..bb458758b8 100644 --- a/bench-vortex/src/reader.rs +++ b/bench-vortex/src/reader.rs @@ -5,11 +5,11 @@ use std::path::{Path, PathBuf}; use std::process::Command; use std::sync::Arc; -use arrow_array::types::Int64Type; use arrow_array::{ ArrayRef as ArrowArrayRef, PrimitiveArray as ArrowPrimitiveArray, RecordBatch, RecordBatchReader, }; +use arrow_array::types::Int64Type; use arrow_select::concat::concat_batches; use arrow_select::take::take_record_batch; use bytes::{Bytes, BytesMut}; @@ -22,20 +22,21 @@ use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; use parquet::arrow::ParquetRecordBatchStreamBuilder; use serde::{Deserialize, Serialize}; use stream::StreamExt; + +use vortex::{Array, IntoArray, IntoCanonical, ToArrayData}; use vortex::array::chunked::ChunkedArray; use vortex::array::primitive::PrimitiveArray; use vortex::arrow::FromArrowType; use vortex::compress::CompressionStrategy; use vortex::stream::ArrayStreamExt; -use vortex::{Array, IntoArray, IntoCanonical, ToArrayData, ViewContext}; use vortex_buffer::Buffer; use vortex_dtype::DType; use vortex_error::{vortex_err, VortexResult}; use vortex_ipc::chunked_reader::ChunkedArrayReader; -use vortex_ipc::io::ObjectStoreExt; use vortex_ipc::io::{TokioAdapter, VortexReadAt, VortexWrite}; -use vortex_ipc::writer::ArrayWriter; +use vortex_ipc::io::ObjectStoreExt; use vortex_ipc::MessageReader; +use vortex_ipc::writer::ArrayWriter; use vortex_sampling_compressor::SamplingCompressor; use crate::{COMPRESSORS, CTX}; @@ -46,13 +47,13 @@ pub const BATCH_SIZE: usize = 65_536; pub struct VortexFooter { pub byte_offsets: Vec, pub row_offsets: Vec, - pub view_context_dtype_range: Range, + pub dtype_range: Range, } pub async fn open_vortex(path: &Path) -> VortexResult { let file = tokio::fs::File::open(path).await.unwrap(); let mut msgs = MessageReader::try_new(TokioAdapter(file)).await.unwrap(); - msgs.array_stream_from_messages(&CTX) + msgs.array_stream_from_messages(CTX.clone()) .await .unwrap() .collect_chunked() @@ -66,20 +67,17 @@ pub async fn rewrite_parquet_as_vortex( ) -> VortexResult<()> { let chunked = compress_parquet_to_vortex(parquet_path.as_path())?; - let written = ArrayWriter::new(write, ViewContext::from(&CTX.clone())) - .write_context() - .await? + let written = ArrayWriter::new(write) .write_array_stream(chunked.array_stream()) .await?; - let view_ctx_range = written.view_context_range().unwrap(); let layout = written.array_layouts()[0].clone(); let mut w = written.into_inner(); let mut s = flexbuffers::FlexbufferSerializer::new(); VortexFooter { byte_offsets: layout.chunks.byte_offsets, row_offsets: layout.chunks.row_offsets, - view_context_dtype_range: view_ctx_range.begin..layout.dtype.end, + dtype_range: layout.dtype.begin..layout.dtype.end, } .serialize(&mut s)?; let footer_bytes = Buffer::Bytes(Bytes::from(s.take_buffer())); @@ -147,20 +145,16 @@ pub async fn read_vortex_footer_format( flexbuffers::Reader::get_root(buf.as_ref()).map_err(|e| vortex_err!("{}", e))?, )?; - let header_len = - (footer.view_context_dtype_range.end - footer.view_context_dtype_range.start) as usize; + let header_len = (footer.dtype_range.end - footer.dtype_range.start) as usize; buf.reserve(header_len - buf.len()); unsafe { buf.set_len(header_len) } - buf = reader - .read_at_into(footer.view_context_dtype_range.start, buf) - .await?; + buf = reader.read_at_into(footer.dtype_range.start, buf).await?; let mut header_reader = MessageReader::try_new(buf).await?; - let view_ctx = header_reader.read_view_context(&CTX).await?; let dtype = header_reader.read_dtype().await?; ChunkedArrayReader::try_new( reader, - view_ctx, + CTX.clone(), dtype, PrimitiveArray::from(footer.byte_offsets).into_array(), PrimitiveArray::from(footer.row_offsets).into_array(), diff --git a/encodings/alp/src/array.rs b/encodings/alp/src/array.rs index 0ea3e223aa..95fc44f5be 100644 --- a/encodings/alp/src/array.rs +++ b/encodings/alp/src/array.rs @@ -10,7 +10,7 @@ use vortex_error::vortex_bail; use crate::alp::Exponents; use crate::compress::{alp_encode, decompress}; -impl_encoding!("vortex.alp", ALP); +impl_encoding!("vortex.alp", 13u16, ALP); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ALPMetadata { diff --git a/encodings/byte_bool/src/lib.rs b/encodings/byte_bool/src/lib.rs index 93518c4602..ce20fa86a0 100644 --- a/encodings/byte_bool/src/lib.rs +++ b/encodings/byte_bool/src/lib.rs @@ -14,7 +14,7 @@ use vortex_buffer::Buffer; mod compute; mod stats; -impl_encoding!("vortex.byte_bool", ByteBool); +impl_encoding!("vortex.byte_bool", 12u16, ByteBool); #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ByteBoolMetadata { diff --git a/encodings/datetime-parts/src/array.rs b/encodings/datetime-parts/src/array.rs index cc2f043970..2ca4e8fc35 100644 --- a/encodings/datetime-parts/src/array.rs +++ b/encodings/datetime-parts/src/array.rs @@ -7,7 +7,7 @@ use vortex_error::vortex_bail; use crate::compute::decode_to_localdatetime; -impl_encoding!("vortex.datetimeparts", DateTimeParts); +impl_encoding!("vortex.datetimeparts", 20u16, DateTimeParts); #[derive(Clone, Debug, Serialize, Deserialize)] pub struct DateTimePartsMetadata { diff --git a/encodings/dict/src/dict.rs b/encodings/dict/src/dict.rs index 5de3eea392..b9cf1a453a 100644 --- a/encodings/dict/src/dict.rs +++ b/encodings/dict/src/dict.rs @@ -9,7 +9,7 @@ use vortex::{impl_encoding, ArrayDType, Canonical, IntoCanonical}; use vortex_dtype::match_each_integer_ptype; use vortex_error::vortex_bail; -impl_encoding!("vortex.dict", Dict); +impl_encoding!("vortex.dict", 20u16, Dict); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DictMetadata { diff --git a/encodings/fastlanes/src/bitpacking/mod.rs b/encodings/fastlanes/src/bitpacking/mod.rs index 2c02471e6b..1bc874457a 100644 --- a/encodings/fastlanes/src/bitpacking/mod.rs +++ b/encodings/fastlanes/src/bitpacking/mod.rs @@ -11,7 +11,7 @@ use vortex_error::{vortex_bail, vortex_err}; mod compress; mod compute; -impl_encoding!("fastlanes.bitpacked", BitPacked); +impl_encoding!("fastlanes.bitpacked", 14u16, BitPacked); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct BitPackedMetadata { diff --git a/encodings/fastlanes/src/delta/mod.rs b/encodings/fastlanes/src/delta/mod.rs index 48165e4e5e..eb91b45949 100644 --- a/encodings/fastlanes/src/delta/mod.rs +++ b/encodings/fastlanes/src/delta/mod.rs @@ -11,7 +11,7 @@ use vortex_error::vortex_bail; mod compress; mod compute; -impl_encoding!("fastlanes.delta", Delta); +impl_encoding!("fastlanes.delta", 16u16, Delta); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DeltaMetadata { diff --git a/encodings/fastlanes/src/for/mod.rs b/encodings/fastlanes/src/for/mod.rs index 9117d395ba..70d8902067 100644 --- a/encodings/fastlanes/src/for/mod.rs +++ b/encodings/fastlanes/src/for/mod.rs @@ -11,7 +11,7 @@ use vortex_scalar::Scalar; mod compress; mod compute; -impl_encoding!("fastlanes.for", FoR); +impl_encoding!("fastlanes.for", 15u16, FoR); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FoRMetadata { diff --git a/encodings/roaring/src/boolean/mod.rs b/encodings/roaring/src/boolean/mod.rs index 5a72299b93..b2f40d2463 100644 --- a/encodings/roaring/src/boolean/mod.rs +++ b/encodings/roaring/src/boolean/mod.rs @@ -16,7 +16,7 @@ use vortex_error::{vortex_bail, vortex_err}; mod compress; mod compute; -impl_encoding!("vortex.roaring_bool", RoaringBool); +impl_encoding!("vortex.roaring_bool", 17u16, RoaringBool); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RoaringBoolMetadata { diff --git a/encodings/roaring/src/integer/mod.rs b/encodings/roaring/src/integer/mod.rs index 7a3f365559..e454d99c4e 100644 --- a/encodings/roaring/src/integer/mod.rs +++ b/encodings/roaring/src/integer/mod.rs @@ -14,7 +14,7 @@ use vortex_error::{vortex_bail, vortex_err}; mod compress; mod compute; -impl_encoding!("vortex.roaring_int", RoaringInt); +impl_encoding!("vortex.roaring_int", 18u16, RoaringInt); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RoaringIntMetadata { diff --git a/encodings/runend/src/runend.rs b/encodings/runend/src/runend.rs index 61c1819c77..dd4df3bb18 100644 --- a/encodings/runend/src/runend.rs +++ b/encodings/runend/src/runend.rs @@ -1,16 +1,17 @@ use serde::{Deserialize, Serialize}; + +use vortex::{ArrayDType, Canonical, impl_encoding, IntoArrayVariant, IntoCanonical}; use vortex::array::primitive::{Primitive, PrimitiveArray}; use vortex::compute::search_sorted::{search_sorted, SearchSortedSide}; use vortex::compute::unary::scalar_at::scalar_at; use vortex::stats::{ArrayStatistics, ArrayStatisticsCompute}; use vortex::validity::{ArrayValidity, LogicalValidity, Validity, ValidityMetadata}; use vortex::visitor::{AcceptArrayVisitor, ArrayVisitor}; -use vortex::{impl_encoding, ArrayDType, Canonical, IntoArrayVariant, IntoCanonical}; use vortex_error::vortex_bail; use crate::compress::{runend_decode, runend_encode}; -impl_encoding!("vortex.runend", RunEnd); +impl_encoding!("vortex.runend", 19u16, RunEnd); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RunEndMetadata { @@ -138,10 +139,10 @@ impl ArrayTrait for RunEndArray { #[cfg(test)] mod test { + use vortex::{ArrayDType, ArrayTrait, IntoArray, IntoCanonical}; use vortex::compute::slice::slice; use vortex::compute::unary::scalar_at::scalar_at; use vortex::validity::Validity; - use vortex::{ArrayDType, ArrayTrait, IntoArray, IntoCanonical}; use vortex_dtype::{DType, Nullability, PType}; use crate::RunEndArray; diff --git a/encodings/zigzag/src/zigzag.rs b/encodings/zigzag/src/zigzag.rs index 0a83fc5698..6f90f8b779 100644 --- a/encodings/zigzag/src/zigzag.rs +++ b/encodings/zigzag/src/zigzag.rs @@ -9,7 +9,7 @@ use vortex_error::{vortex_bail, vortex_err}; use crate::compress::zigzag_encode; -impl_encoding!("vortex.zigzag", ZigZag); +impl_encoding!("vortex.zigzag", 21u16, ZigZag); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ZigZagMetadata; diff --git a/vortex-array/src/array/bool/mod.rs b/vortex-array/src/array/bool/mod.rs index ae62800ce5..927f731a06 100644 --- a/vortex-array/src/array/bool/mod.rs +++ b/vortex-array/src/array/bool/mod.rs @@ -12,7 +12,7 @@ mod accessors; mod compute; mod stats; -impl_encoding!("vortex.bool", Bool); +impl_encoding!("vortex.bool", 2u16, Bool); #[derive(Clone, Debug, Serialize, Deserialize)] pub struct BoolMetadata { diff --git a/vortex-array/src/array/chunked/mod.rs b/vortex-array/src/array/chunked/mod.rs index 1fa590a43d..7077e47149 100644 --- a/vortex-array/src/array/chunked/mod.rs +++ b/vortex-array/src/array/chunked/mod.rs @@ -23,7 +23,7 @@ mod canonical; mod compute; mod stats; -impl_encoding!("vortex.chunked", Chunked); +impl_encoding!("vortex.chunked", 11u16, Chunked); #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ChunkedMetadata; diff --git a/vortex-array/src/array/constant/mod.rs b/vortex-array/src/array/constant/mod.rs index b681941aa2..b5bf1aa565 100644 --- a/vortex-array/src/array/constant/mod.rs +++ b/vortex-array/src/array/constant/mod.rs @@ -7,11 +7,12 @@ use crate::impl_encoding; use crate::stats::Stat; use crate::validity::{ArrayValidity, LogicalValidity}; use crate::visitor::{AcceptArrayVisitor, ArrayVisitor}; + mod canonical; mod compute; mod stats; -impl_encoding!("vortex.constant", Constant); +impl_encoding!("vortex.constant", 10u16, Constant); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ConstantMetadata { diff --git a/vortex-array/src/array/extension/mod.rs b/vortex-array/src/array/extension/mod.rs index 706b187b5f..170aab6d32 100644 --- a/vortex-array/src/array/extension/mod.rs +++ b/vortex-array/src/array/extension/mod.rs @@ -8,7 +8,7 @@ use crate::{impl_encoding, ArrayDType, Canonical, IntoCanonical}; mod compute; -impl_encoding!("vortex.ext", Extension); +impl_encoding!("vortex.ext", 16u16, Extension); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ExtensionMetadata { diff --git a/vortex-array/src/array/null/mod.rs b/vortex-array/src/array/null/mod.rs index 98359ae522..15b6f16b40 100644 --- a/vortex-array/src/array/null/mod.rs +++ b/vortex-array/src/array/null/mod.rs @@ -7,7 +7,7 @@ use crate::{impl_encoding, Canonical, IntoCanonical}; mod compute; -impl_encoding!("vortex.null", Null); +impl_encoding!("vortex.null", 1u16, Null); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct NullMetadata { diff --git a/vortex-array/src/array/primitive/mod.rs b/vortex-array/src/array/primitive/mod.rs index 85372d0e5f..7d30f42d38 100644 --- a/vortex-array/src/array/primitive/mod.rs +++ b/vortex-array/src/array/primitive/mod.rs @@ -15,7 +15,7 @@ mod accessor; mod compute; mod stats; -impl_encoding!("vortex.primitive", Primitive); +impl_encoding!("vortex.primitive", 3u16, Primitive); #[derive(Clone, Debug, Serialize, Deserialize)] pub struct PrimitiveMetadata { diff --git a/vortex-array/src/array/sparse/mod.rs b/vortex-array/src/array/sparse/mod.rs index 8d5d7deb5d..b16ac0015a 100644 --- a/vortex-array/src/array/sparse/mod.rs +++ b/vortex-array/src/array/sparse/mod.rs @@ -14,7 +14,7 @@ use crate::{impl_encoding, ArrayDType, IntoCanonical}; mod compute; mod flatten; -impl_encoding!("vortex.sparse", Sparse); +impl_encoding!("vortex.sparse", 9u16, Sparse); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SparseMetadata { diff --git a/vortex-array/src/array/struct_/mod.rs b/vortex-array/src/array/struct_/mod.rs index ad4e1c21d4..68333ec6a9 100644 --- a/vortex-array/src/array/struct_/mod.rs +++ b/vortex-array/src/array/struct_/mod.rs @@ -10,7 +10,7 @@ use crate::{Canonical, IntoCanonical}; mod compute; -impl_encoding!("vortex.struct", Struct); +impl_encoding!("vortex.struct", 8u16, Struct); #[derive(Clone, Debug, Serialize, Deserialize)] pub struct StructMetadata { diff --git a/vortex-array/src/array/varbin/mod.rs b/vortex-array/src/array/varbin/mod.rs index 2b0728404d..5069bb7303 100644 --- a/vortex-array/src/array/varbin/mod.rs +++ b/vortex-array/src/array/varbin/mod.rs @@ -21,7 +21,7 @@ mod compute; mod flatten; mod stats; -impl_encoding!("vortex.varbin", VarBin); +impl_encoding!("vortex.varbin", 4u16, VarBin); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct VarBinMetadata { diff --git a/vortex-array/src/array/varbinview/mod.rs b/vortex-array/src/array/varbinview/mod.rs index 7f28a43708..339f05b9b3 100644 --- a/vortex-array/src/array/varbinview/mod.rs +++ b/vortex-array/src/array/varbinview/mod.rs @@ -103,7 +103,7 @@ impl Debug for BinaryView { pub const VIEW_SIZE: usize = mem::size_of::(); -impl_encoding!("vortex.varbinview", VarBinView); +impl_encoding!("vortex.varbinview", 5u16, VarBinView); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct VarBinViewMetadata { diff --git a/vortex-array/src/context.rs b/vortex-array/src/context.rs index c34351c705..9ca3c5e0d7 100644 --- a/vortex-array/src/context.rs +++ b/vortex-array/src/context.rs @@ -13,18 +13,18 @@ use crate::encoding::EncodingRef; #[derive(Debug, Clone)] pub struct Context { - encodings: HashMap, + encodings: HashMap, } impl Context { pub fn with_encoding(mut self, encoding: EncodingRef) -> Self { - self.encodings.insert(encoding.id().to_string(), encoding); + self.encodings.insert(encoding.id().code(), encoding); self } pub fn with_encodings>(mut self, encodings: E) -> Self { self.encodings - .extend(encodings.into_iter().map(|e| (e.id().to_string(), e))); + .extend(encodings.into_iter().map(|e| (e.id().code(), e))); self } @@ -32,8 +32,8 @@ impl Context { self.encodings.values().cloned() } - pub fn lookup_encoding(&self, encoding_id: &str) -> Option { - self.encodings.get(encoding_id).cloned() + pub fn lookup_encoding(&self, encoding_code: u16) -> Option { + self.encodings.get(&encoding_code).cloned() } } @@ -53,7 +53,7 @@ impl Default for Context { &VarBinViewEncoding, ] .into_iter() - .map(|e| (e.id().to_string(), e)), + .map(|e| (e.id().code(), e)), ), } } diff --git a/vortex-array/src/encoding.rs b/vortex-array/src/encoding.rs index 8ff7160c8c..440019dc22 100644 --- a/vortex-array/src/encoding.rs +++ b/vortex-array/src/encoding.rs @@ -8,11 +8,15 @@ use crate::ArrayDef; use crate::{Array, ArrayTrait}; #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] -pub struct EncodingId(&'static str); +pub struct EncodingId(&'static str, u16); impl EncodingId { - pub const fn new(id: &'static str) -> Self { - Self(id) + pub const fn new(id: &'static str, code: u16) -> Self { + Self(id, code) + } + + pub const fn code(&self) -> u16 { + self.1 } } @@ -66,7 +70,6 @@ pub trait ArrayEncodingExt { IntoCanonical::into_canonical(typed) } - #[inline] fn with_dyn(array: &Array, mut f: F) -> R where F: for<'b> FnMut(&'b (dyn ArrayTrait + 'b)) -> R, diff --git a/vortex-array/src/implementation.rs b/vortex-array/src/implementation.rs index 92d789e8c4..6dca0708a0 100644 --- a/vortex-array/src/implementation.rs +++ b/vortex-array/src/implementation.rs @@ -25,7 +25,7 @@ pub trait ArrayDef { #[macro_export] macro_rules! impl_encoding { - ($id:literal, $Name:ident) => { + ($id:literal, $code:literal, $Name:ident) => { use $crate::vendored::paste::paste; paste! { @@ -58,7 +58,7 @@ macro_rules! impl_encoding { #[derive(Debug, Clone)] pub struct $Name; impl ArrayDef for $Name { - const ID: EncodingId = EncodingId::new($id); + const ID: EncodingId = EncodingId::new($id, $code); const ENCODING: EncodingRef = &[<$Name Encoding>]; type Array = [<$Name Array>]; type Metadata = [<$Name Metadata>]; diff --git a/vortex-array/src/view.rs b/vortex-array/src/view.rs index ffd971db52..d1765c0f58 100644 --- a/vortex-array/src/view.rs +++ b/vortex-array/src/view.rs @@ -9,7 +9,7 @@ use vortex_dtype::{DType, Nullability}; use vortex_error::{vortex_bail, vortex_err, VortexError, VortexResult}; use vortex_scalar::{PValue, Scalar, ScalarValue}; -use crate::encoding::{EncodingId, EncodingRef}; +use crate::encoding::EncodingRef; use crate::flatbuffers as fb; use crate::stats::{Stat, Statistics, StatsSet}; use crate::visitor::ArrayVisitor; @@ -24,7 +24,7 @@ pub struct ArrayView { flatbuffer_loc: usize, // TODO(ngates): create an RC'd vector that can be lazily sliced. buffers: Vec, - ctx: Arc, + ctx: Arc, // TODO(ngates): a store a Projection. A projected ArrayView contains the full fb::Array // metadata, but only the buffers from the selected columns. Therefore we need to know // which fb:Array children to skip when calculating how to slice into buffers. @@ -43,7 +43,7 @@ impl Debug for ArrayView { impl ArrayView { pub fn try_new( - ctx: Arc, + ctx: Arc, dtype: DType, flatbuffer: Buffer, flatbuffer_init: F, @@ -56,7 +56,7 @@ impl ArrayView { let flatbuffer_loc = array._tab.loc(); let encoding = ctx - .find_encoding(array.encoding()) + .lookup_encoding(array.encoding()) .ok_or_else(|| vortex_err!(InvalidSerde: "Encoding ID out of bounds"))?; if buffers.len() != Self::cumulative_nbuffers(array) { @@ -107,7 +107,7 @@ impl ArrayView { let child = self.array_child(idx)?; let flatbuffer_loc = child._tab.loc(); - let encoding = self.ctx.find_encoding(child.encoding())?; + let encoding = self.ctx.lookup_encoding(child.encoding())?; // Figure out how many buffers to skip... // We store them depth-first. @@ -277,41 +277,3 @@ impl IntoArray for ArrayView { Array::View(self) } } - -#[derive(Debug, Clone)] -pub struct ViewContext { - encodings: Vec, -} - -impl ViewContext { - pub fn new(encodings: Vec) -> Self { - Self { encodings } - } - - pub fn encodings(&self) -> &[EncodingRef] { - self.encodings.as_ref() - } - - pub fn find_encoding(&self, encoding_id: u16) -> Option { - self.encodings.get(encoding_id as usize).cloned() - } - - pub fn encoding_idx(&self, encoding_id: EncodingId) -> Option { - self.encodings - .iter() - .position(|e| e.id() == encoding_id) - .map(|i| i as u16) - } -} - -impl Default for ViewContext { - fn default() -> Self { - Self::from(&Context::default()) - } -} - -impl From<&Context> for ViewContext { - fn from(value: &Context) -> Self { - Self::new(value.encodings().collect_vec()) - } -} diff --git a/vortex-ipc/benches/ipc_array_reader_take.rs b/vortex-ipc/benches/ipc_array_reader_take.rs index c316988ffe..886a26d245 100644 --- a/vortex-ipc/benches/ipc_array_reader_take.rs +++ b/vortex-ipc/benches/ipc_array_reader_take.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use criterion::async_executor::FuturesExecutor; use criterion::{black_box, criterion_group, criterion_main, Criterion}; use futures_executor::block_on; @@ -7,7 +9,7 @@ use vortex::array::chunked::ChunkedArray; use vortex::array::primitive::PrimitiveArray; use vortex::stream::ArrayStreamExt; use vortex::validity::Validity; -use vortex::{Context, IntoArray, ViewContext}; +use vortex::{Context, IntoArray}; use vortex_ipc::io::FuturesAdapter; use vortex_ipc::writer::ArrayWriter; use vortex_ipc::MessageReader; @@ -16,7 +18,7 @@ use vortex_ipc::MessageReader; // take from the first 20 batches and last batch // compare with arrow fn ipc_array_reader_take(c: &mut Criterion) { - let ctx = Context::default(); + let ctx = Arc::new(Context::default()); let indices = (0..20) .map(|i| i * 100_000 + 1) @@ -32,15 +34,9 @@ fn ipc_array_reader_take(c: &mut Criterion) { ) .into_array(); - let buffer = block_on(async { - ArrayWriter::new(vec![], ViewContext::from(&ctx)) - .write_context() - .await? - .write_array(array) - .await - }) - .unwrap() - .into_inner(); + let buffer = block_on(async { ArrayWriter::new(vec![]).write_array(array).await }) + .unwrap() + .into_inner(); let indices = indices.clone().into_array(); @@ -50,7 +46,7 @@ fn ipc_array_reader_take(c: &mut Criterion) { .await .unwrap(); let stream = msgs - .array_stream_from_messages(&ctx) + .array_stream_from_messages(ctx.clone()) .await .unwrap() .take_rows(indices.clone()) diff --git a/vortex-ipc/benches/ipc_take.rs b/vortex-ipc/benches/ipc_take.rs index a1b3ffd366..da3c304a19 100644 --- a/vortex-ipc/benches/ipc_take.rs +++ b/vortex-ipc/benches/ipc_take.rs @@ -14,7 +14,7 @@ use itertools::Itertools; use vortex::array::primitive::PrimitiveArray; use vortex::compress::CompressionStrategy; use vortex::compute::take::take; -use vortex::{Context, IntoArray, ViewContext}; +use vortex::{Context, IntoArray}; use vortex_ipc::io::FuturesAdapter; use vortex_ipc::writer::ArrayWriter; use vortex_ipc::MessageReader; @@ -61,23 +61,17 @@ fn ipc_take(c: &mut Criterion) { let compressed = compressor.compress(&uncompressed).unwrap(); // Try running take over an ArrayView. - let buffer = block_on(async { - ArrayWriter::new(vec![], ViewContext::from(&ctx)) - .write_context() - .await? - .write_array(compressed) - .await - }) - .unwrap() - .into_inner(); + let buffer = block_on(async { ArrayWriter::new(vec![]).write_array(compressed).await }) + .unwrap() + .into_inner(); - let ctx_ref = &ctx; + let ctx_ref = &Arc::new(ctx); let ro_buffer = buffer.as_slice(); let indices_ref = &indices; b.to_async(FuturesExecutor).iter(|| async move { let mut msgs = MessageReader::try_new(FuturesAdapter(Cursor::new(ro_buffer))).await?; - let reader = msgs.array_stream_from_messages(ctx_ref).await?; + let reader = msgs.array_stream_from_messages(ctx_ref.clone()).await?; pin_mut!(reader); let array_view = reader.try_next().await?.unwrap(); black_box(take(&array_view, indices_ref)) diff --git a/vortex-ipc/flatbuffers/message.fbs b/vortex-ipc/flatbuffers/message.fbs index 418b3e959e..01ca8fef2f 100644 --- a/vortex-ipc/flatbuffers/message.fbs +++ b/vortex-ipc/flatbuffers/message.fbs @@ -7,14 +7,6 @@ enum Version: uint8 { V0 = 0, } -table Context { - encodings: [Encoding]; -} - -table Encoding { - id: string; -} - table Schema { dtype: vortex.dtype.DType; } @@ -41,7 +33,6 @@ table Page { } union MessageHeader { - Context, Schema, Chunk, Page, diff --git a/vortex-ipc/src/chunked_reader/mod.rs b/vortex-ipc/src/chunked_reader/mod.rs index 654c2f66df..0c987400d0 100644 --- a/vortex-ipc/src/chunked_reader/mod.rs +++ b/vortex-ipc/src/chunked_reader/mod.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use vortex::{Array, ViewContext}; +use vortex::{Array, Context}; use vortex_dtype::DType; use vortex_error::{vortex_bail, VortexResult}; @@ -11,7 +11,7 @@ mod take_rows; /// A reader for a chunked array. pub struct ChunkedArrayReader { read: R, - view_context: Arc, + context: Arc, dtype: DType, // One row per chunk + 1 row for the end of the last chunk. @@ -22,7 +22,7 @@ pub struct ChunkedArrayReader { impl ChunkedArrayReader { pub fn try_new( read: R, - view_context: Arc, + context: Arc, dtype: DType, byte_offsets: Array, row_offsets: Array, @@ -30,7 +30,7 @@ impl ChunkedArrayReader { Self::validate(&byte_offsets, &row_offsets)?; Ok(Self { read, - view_context, + context, dtype, byte_offsets, row_offsets, diff --git a/vortex-ipc/src/chunked_reader/take_rows.rs b/vortex-ipc/src/chunked_reader/take_rows.rs index f920294e48..93d9a33f48 100644 --- a/vortex-ipc/src/chunked_reader/take_rows.rs +++ b/vortex-ipc/src/chunked_reader/take_rows.rs @@ -1,5 +1,5 @@ use std::collections::HashMap; -use std::ops::{Deref, Range}; +use std::ops::Range; use bytes::BytesMut; use futures_util::{stream, StreamExt, TryStreamExt}; @@ -148,9 +148,8 @@ impl ChunkedArrayReader { // MesssageReader. let buffer = self.read.read_at_into(byte_range.start, buffer).await?; - let reader = StreamArrayReader::try_new(buffer) + let reader = StreamArrayReader::try_new(buffer, self.context.clone()) .await? - .with_view_context(self.view_context.deref().clone()) .with_dtype(self.dtype.clone()); // Take the indices from the stream. @@ -218,11 +217,12 @@ struct ChunkIndices { #[cfg(test)] mod test { use std::io::Cursor; + use std::sync::Arc; use itertools::Itertools; use vortex::array::chunked::ChunkedArray; use vortex::array::primitive::PrimitiveArray; - use vortex::{ArrayTrait, IntoArray, IntoCanonical, ViewContext}; + use vortex::{ArrayTrait, Context, IntoArray, IntoCanonical}; use vortex_buffer::Buffer; use vortex_dtype::PType; use vortex_error::VortexResult; @@ -238,11 +238,7 @@ mod test { )? .into_array(); - ArrayWriter::new(vec![], ViewContext::default()) - .write_context() - .await? - .write_array(c) - .await + ArrayWriter::new(vec![]).write_array(c).await } #[tokio::test] @@ -256,12 +252,11 @@ mod test { let buffer = Buffer::from(writer.into_inner()); let mut msgs = MessageReader::try_new(Cursor::new(buffer.clone())).await?; - let view_ctx = msgs.read_view_context(&Default::default()).await?; let dtype = msgs.read_dtype().await?; let mut reader = ChunkedArrayReader::try_new( buffer, - view_ctx, + Arc::new(Context::default()), dtype, byte_offsets.into_array(), row_offsets.into_array(), diff --git a/vortex-ipc/src/lib.rs b/vortex-ipc/src/lib.rs index 2600b3893c..95841a239b 100644 --- a/vortex-ipc/src/lib.rs +++ b/vortex-ipc/src/lib.rs @@ -1,6 +1,6 @@ pub use message_reader::*; pub use message_writer::*; -use vortex_error::{vortex_err, VortexError}; + pub mod chunked_reader; pub mod io; mod message_reader; @@ -38,45 +38,32 @@ pub mod flatbuffers { } } -pub(crate) const fn missing(field: &'static str) -> impl FnOnce() -> VortexError { - move || vortex_err!(InvalidSerde: "missing field: {}", field) -} - #[cfg(test)] pub mod test { + use std::sync::Arc; + use futures_util::io::Cursor; use futures_util::{pin_mut, StreamExt, TryStreamExt}; use itertools::Itertools; use vortex::array::chunked::ChunkedArray; use vortex::array::primitive::{PrimitiveArray, PrimitiveEncoding}; use vortex::encoding::ArrayEncoding; - use vortex::encoding::EncodingRef; use vortex::stream::ArrayStreamExt; - use vortex::{ArrayDType, Context, IntoArray, ViewContext}; - use vortex_alp::ALPEncoding; + use vortex::{ArrayDType, Context, IntoArray}; use vortex_error::VortexResult; - use vortex_fastlanes::BitPackedEncoding; use crate::io::FuturesAdapter; use crate::writer::ArrayWriter; use crate::MessageReader; pub async fn create_stream() -> Vec { - let ctx = Context::default().with_encodings([ - &ALPEncoding as EncodingRef, - &BitPackedEncoding as EncodingRef, - ]); - let array = PrimitiveArray::from(vec![0, 1, 2]).into_array(); let chunked_array = ChunkedArray::try_new(vec![array.clone(), array.clone()], array.dtype().clone()) .unwrap() .into_array(); - ArrayWriter::new(vec![], ViewContext::from(&ctx)) - .write_context() - .await - .unwrap() + ArrayWriter::new(vec![]) .write_array(array) .await .unwrap() @@ -87,10 +74,7 @@ pub mod test { } async fn write_ipc(array: A) -> Vec { - ArrayWriter::new(vec![], ViewContext::from(&Context::default())) - .write_context() - .await - .unwrap() + ArrayWriter::new(vec![]) .write_array(array.into_array()) .await .unwrap() @@ -104,11 +88,11 @@ pub mod test { let indices = PrimitiveArray::from(vec![1, 2, 10]).into_array(); - let ctx = Context::default(); + let ctx = Arc::new(Context::default()); let mut messages = MessageReader::try_new(FuturesAdapter(Cursor::new(buffer))) .await .unwrap(); - let reader = messages.array_stream_from_messages(&ctx).await?; + let reader = messages.array_stream_from_messages(ctx).await?; let result_iter = reader.take_rows(indices).unwrap(); pin_mut!(result_iter); @@ -134,9 +118,9 @@ pub mod test { let mut messages = MessageReader::try_new(FuturesAdapter(Cursor::new(buffer))).await?; - let ctx = Context::default(); + let ctx = Arc::new(Context::default()); let take_iter = messages - .array_stream_from_messages(&ctx) + .array_stream_from_messages(ctx) .await? .take_rows(indices)?; pin_mut!(take_iter); diff --git a/vortex-ipc/src/message_reader.rs b/vortex-ipc/src/message_reader.rs index cce349d25b..1b1273470a 100644 --- a/vortex-ipc/src/message_reader.rs +++ b/vortex-ipc/src/message_reader.rs @@ -6,14 +6,13 @@ use flatbuffers::{root, root_unchecked}; use futures_util::stream::try_unfold; use itertools::Itertools; use vortex::stream::{ArrayStream, ArrayStreamAdapter}; -use vortex::{Array, ArrayView, Context, IntoArray, ToArray, ViewContext}; +use vortex::{Array, ArrayView, Context, IntoArray, ToArray}; use vortex_buffer::Buffer; use vortex_dtype::DType; use vortex_error::{vortex_bail, vortex_err, VortexError, VortexResult}; use crate::flatbuffers::ipc as fb; use crate::io::VortexRead; -use crate::messages::SerdeContextDeserializer; pub struct MessageReader { read: R, @@ -138,23 +137,6 @@ impl MessageReader { Ok(buffers) } - pub async fn read_view_context<'a>( - &'a mut self, - ctx: &'a Context, - ) -> VortexResult> { - if self.peek().and_then(|m| m.header_as_context()).is_none() { - vortex_bail!("Expected context message"); - } - - let view_ctx: ViewContext = SerdeContextDeserializer { - fb: self.next().await?.header_as_context().unwrap(), - ctx, - } - .try_into()?; - - Ok(view_ctx.into()) - } - pub async fn read_dtype(&mut self) -> VortexResult { if self.peek().and_then(|m| m.header_as_schema()).is_none() { vortex_bail!("Expected schema message") @@ -174,7 +156,7 @@ impl MessageReader { pub async fn maybe_read_chunk( &mut self, - view_ctx: Arc, + ctx: Arc, dtype: DType, ) -> VortexResult> { if self.peek().and_then(|m| m.header_as_chunk()).is_none() { @@ -185,7 +167,7 @@ impl MessageReader { let flatbuffer = self.next_raw().await?; let view = ArrayView::try_new( - view_ctx, + ctx, dtype, flatbuffer, |flatbuffer| { @@ -207,27 +189,22 @@ impl MessageReader { /// Construct an ArrayStream pulling the ViewContext and DType from the stream. pub async fn array_stream_from_messages( &mut self, - ctx: &Context, + ctx: Arc, ) -> VortexResult { - let view_context = self.read_view_context(ctx).await?; let dtype = self.read_dtype().await?; - Ok(self.array_stream(view_context, dtype)) + Ok(self.array_stream(ctx, dtype)) } - pub fn array_stream( - &mut self, - view_context: Arc, - dtype: DType, - ) -> impl ArrayStream + '_ { + pub fn array_stream(&mut self, ctx: Arc, dtype: DType) -> impl ArrayStream + '_ { struct State<'a, R: VortexRead> { msgs: &'a mut MessageReader, - view_context: Arc, + ctx: Arc, dtype: DType, } let init = State { msgs: self, - view_context, + ctx, dtype: dtype.clone(), }; @@ -236,7 +213,7 @@ impl MessageReader { try_unfold(init, |state| async move { match state .msgs - .maybe_read_chunk(state.view_context.clone(), state.dtype.clone()) + .maybe_read_chunk(state.ctx.clone(), state.dtype.clone()) .await? { None => Ok(None), @@ -246,20 +223,16 @@ impl MessageReader { ) } - pub fn into_array_stream( - self, - view_context: Arc, - dtype: DType, - ) -> impl ArrayStream { + pub fn into_array_stream(self, ctx: Arc, dtype: DType) -> impl ArrayStream { struct State { msgs: MessageReader, - view_context: Arc, + ctx: Arc, dtype: DType, } let init = State { msgs: self, - view_context, + ctx, dtype: dtype.clone(), }; @@ -268,7 +241,7 @@ impl MessageReader { try_unfold(init, |mut state| async move { match state .msgs - .maybe_read_chunk(state.view_context.clone(), state.dtype.clone()) + .maybe_read_chunk(state.ctx.clone(), state.dtype.clone()) .await? { None => Ok(None), diff --git a/vortex-ipc/src/message_writer.rs b/vortex-ipc/src/message_writer.rs index d5a3bd7aa0..8406c351ed 100644 --- a/vortex-ipc/src/message_writer.rs +++ b/vortex-ipc/src/message_writer.rs @@ -2,15 +2,16 @@ use std::io; use flatbuffers::FlatBufferBuilder; use itertools::Itertools; -use vortex::{Array, ViewContext}; -use vortex_buffer::io_buf::IoBuf; + +use vortex::Array; use vortex_buffer::Buffer; +use vortex_buffer::io_buf::IoBuf; use vortex_dtype::DType; use vortex_flatbuffers::WriteFlatBuffer; -use crate::io::VortexWrite; -use crate::messages::{IPCChunk, IPCContext, IPCMessage, IPCPage, IPCSchema}; use crate::ALIGNMENT; +use crate::io::VortexWrite; +use crate::messages::{IPCChunk, IPCMessage, IPCPage, IPCSchema}; const ZEROS: [u8; 512] = [0u8; 512]; @@ -43,21 +44,16 @@ impl MessageWriter { self.pos } - pub async fn write_view_context(&mut self, view_ctx: &ViewContext) -> io::Result<()> { - self.write_message(IPCMessage::Context(IPCContext(view_ctx))) - .await - } - pub async fn write_dtype(&mut self, dtype: &DType) -> io::Result<()> { self.write_message(IPCMessage::Schema(IPCSchema(dtype))) .await } - pub async fn write_chunk(&mut self, view_ctx: &ViewContext, chunk: Array) -> io::Result<()> { + pub async fn write_chunk(&mut self, chunk: Array) -> io::Result<()> { let buffer_offsets = chunk.all_buffer_offsets(self.alignment); // Serialize the Chunk message. - self.write_message(IPCMessage::Chunk(IPCChunk(view_ctx, &chunk))) + self.write_message(IPCMessage::Chunk(IPCChunk(&chunk))) .await?; // Keep track of the offset to add padding after each buffer. diff --git a/vortex-ipc/src/messages.rs b/vortex-ipc/src/messages.rs index 40b89f9080..8cbc25dfb9 100644 --- a/vortex-ipc/src/messages.rs +++ b/vortex-ipc/src/messages.rs @@ -2,27 +2,23 @@ use flatbuffers::{FlatBufferBuilder, WIPOffset}; use itertools::Itertools; use vortex::stats::ArrayStatistics; use vortex::{flatbuffers as fba, Array}; -use vortex::{Context, ViewContext}; use vortex_buffer::Buffer; use vortex_dtype::DType; -use vortex_error::{vortex_err, VortexError}; use vortex_flatbuffers::{FlatBufferRoot, WriteFlatBuffer}; use crate::flatbuffers::ipc as fb; use crate::flatbuffers::ipc::Compression; -use crate::{missing, ALIGNMENT}; +use crate::ALIGNMENT; pub enum IPCMessage<'a> { - Context(IPCContext<'a>), Schema(IPCSchema<'a>), Chunk(IPCChunk<'a>), Page(IPCPage<'a>), } -pub struct IPCContext<'a>(pub &'a ViewContext); pub struct IPCSchema<'a>(pub &'a DType); -pub struct IPCChunk<'a>(pub &'a ViewContext, pub &'a Array); -pub struct IPCArray<'a>(pub &'a ViewContext, pub &'a Array); +pub struct IPCChunk<'a>(pub &'a Array); +pub struct IPCArray<'a>(pub &'a Array); pub struct IPCPage<'a>(pub &'a Buffer); impl FlatBufferRoot for IPCMessage<'_> {} @@ -35,7 +31,6 @@ impl WriteFlatBuffer for IPCMessage<'_> { fbb: &mut FlatBufferBuilder<'fb>, ) -> WIPOffset> { let header = match self { - Self::Context(f) => f.write_flatbuffer(fbb).as_union_value(), Self::Schema(f) => f.write_flatbuffer(fbb).as_union_value(), Self::Chunk(f) => f.write_flatbuffer(fbb).as_union_value(), Self::Page(f) => f.write_flatbuffer(fbb).as_union_value(), @@ -44,7 +39,6 @@ impl WriteFlatBuffer for IPCMessage<'_> { let mut msg = fb::MessageBuilder::new(fbb); msg.add_version(Default::default()); msg.add_header_type(match self { - Self::Context(_) => fb::MessageHeader::Context, Self::Schema(_) => fb::MessageHeader::Schema, Self::Chunk(_) => fb::MessageHeader::Chunk, Self::Page(_) => fb::MessageHeader::Page, @@ -54,63 +48,6 @@ impl WriteFlatBuffer for IPCMessage<'_> { } } -impl<'a> WriteFlatBuffer for IPCContext<'a> { - type Target<'t> = fb::Context<'t>; - - fn write_flatbuffer<'fb>( - &self, - fbb: &mut FlatBufferBuilder<'fb>, - ) -> WIPOffset> { - let fb_encodings = self - .0 - .encodings() - .iter() - .map(|e| e.id()) - .map(|id| { - let encoding_id = fbb.create_string(id.as_ref()); - fb::Encoding::create( - fbb, - &fb::EncodingArgs { - id: Some(encoding_id), - }, - ) - }) - .collect_vec(); - let fb_encodings = fbb.create_vector(fb_encodings.as_slice()); - - fb::Context::create( - fbb, - &fb::ContextArgs { - encodings: Some(fb_encodings), - }, - ) - } -} - -pub struct SerdeContextDeserializer<'a> { - pub(crate) fb: fb::Context<'a>, - pub(crate) ctx: &'a Context, -} - -impl<'a> TryFrom> for ViewContext { - type Error = VortexError; - - fn try_from(deser: SerdeContextDeserializer<'a>) -> Result { - let fb_encodings = deser.fb.encodings().ok_or_else(missing("encodings"))?; - let mut encodings = Vec::with_capacity(fb_encodings.len()); - for fb_encoding in fb_encodings { - let encoding_id = fb_encoding.id().ok_or_else(missing("encoding.id"))?; - encodings.push( - deser - .ctx - .lookup_encoding(encoding_id) - .ok_or_else(|| vortex_err!("Stream uses unknown encoding {}", encoding_id))?, - ); - } - Ok(Self::new(encodings)) - } -} - impl<'a> WriteFlatBuffer for IPCSchema<'a> { type Target<'t> = fb::Schema<'t>; @@ -130,8 +67,8 @@ impl<'a> WriteFlatBuffer for IPCChunk<'a> { &self, fbb: &mut FlatBufferBuilder<'fb>, ) -> WIPOffset> { - let array_data = self.1; - let array = Some(IPCArray(self.0, array_data).write_flatbuffer(fbb)); + let array_data = self.0; + let array = Some(IPCArray(array_data).write_flatbuffer(fbb)); // Walk the ColumnData depth-first to compute the buffer offsets. let mut buffers = vec![]; @@ -167,14 +104,9 @@ impl<'a> WriteFlatBuffer for IPCArray<'a> { &self, fbb: &mut FlatBufferBuilder<'fb>, ) -> WIPOffset> { - let ctx = self.0; - let column_data = self.1; - - let encoding = ctx - .encoding_idx(column_data.encoding().id()) - // FIXME(ngates): return result from this writer? - .unwrap_or_else(|| panic!("Encoding not found: {:?}", column_data.encoding())); + let column_data = self.0; + let encoding = column_data.encoding().id().code(); let metadata = match column_data { Array::Data(d) => { let metadata = d @@ -190,11 +122,11 @@ impl<'a> WriteFlatBuffer for IPCArray<'a> { let children = column_data .children() .iter() - .map(|child| IPCArray(self.0, child).write_flatbuffer(fbb)) + .map(|child| IPCArray(child).write_flatbuffer(fbb)) .collect_vec(); let children = Some(fbb.create_vector(&children)); - let stats = Some(self.1.statistics().write_flatbuffer(fbb)); + let stats = Some(column_data.statistics().write_flatbuffer(fbb)); fba::Array::create( fbb, diff --git a/vortex-ipc/src/stream_reader/mod.rs b/vortex-ipc/src/stream_reader/mod.rs index 620d958a80..6368f310c1 100644 --- a/vortex-ipc/src/stream_reader/mod.rs +++ b/vortex-ipc/src/stream_reader/mod.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use futures_util::stream::try_unfold; use futures_util::Stream; use vortex::stream::ArrayStream; -use vortex::{Context, ViewContext}; +use vortex::Context; use vortex_buffer::Buffer; use vortex_dtype::DType; use vortex_error::VortexResult; @@ -14,40 +14,19 @@ use crate::MessageReader; pub struct StreamArrayReader { msgs: MessageReader, - view_context: Option>, + ctx: Arc, dtype: Option>, } impl StreamArrayReader { - pub async fn try_new(read: R) -> VortexResult { + pub async fn try_new(read: R, ctx: Arc) -> VortexResult { Ok(Self { msgs: MessageReader::try_new(read).await?, - view_context: None, + ctx, dtype: None, }) } - /// Manually configure the view context. - pub fn with_view_context(self, view_context: ViewContext) -> Self { - assert!(self.view_context.is_none(), "View context already set"); - Self { - view_context: Some(Arc::new(view_context)), - ..self - } - } - - /// Load the view context from the stream. - pub async fn load_view_context(mut self, ctx: &Context) -> VortexResult { - assert!(self.view_context.is_none(), "View context already set"); - self.view_context = Some(self.msgs.read_view_context(ctx).await?); - Ok(self) - } - - /// Retrieve the loaded view_context - pub fn view_context(&self) -> Option> { - self.view_context.clone() - } - pub fn with_dtype(self, dtype: DType) -> Self { assert!(self.dtype.is_none(), "DType already set"); Self { @@ -64,23 +43,13 @@ impl StreamArrayReader { /// Reads a single array from the stream. pub fn array_stream(&mut self) -> impl ArrayStream + '_ { - let view_context = self - .view_context - .as_ref() - .expect("View context not set") - .clone(); let dtype = self.dtype.as_ref().expect("DType not set").deref().clone(); - self.msgs.array_stream(view_context, dtype) + self.msgs.array_stream(self.ctx.clone(), dtype) } pub fn into_array_stream(self) -> impl ArrayStream { - let view_context = self - .view_context - .as_ref() - .expect("View context not set") - .clone(); let dtype = self.dtype.as_ref().expect("DType not set").deref().clone(); - self.msgs.into_array_stream(view_context, dtype) + self.msgs.into_array_stream(self.ctx.clone(), dtype) } /// Reads a single page from the stream. diff --git a/vortex-ipc/src/writer.rs b/vortex-ipc/src/writer.rs index e5ec84f0df..d6084f4f7f 100644 --- a/vortex-ipc/src/writer.rs +++ b/vortex-ipc/src/writer.rs @@ -1,38 +1,30 @@ use futures_util::{Stream, TryStreamExt}; use vortex::array::chunked::ChunkedArray; use vortex::stream::ArrayStream; -use vortex::{Array, ViewContext}; +use vortex::Array; use vortex_buffer::Buffer; use vortex_dtype::DType; -use vortex_error::{vortex_bail, VortexResult}; +use vortex_error::VortexResult; use crate::io::VortexWrite; use crate::MessageWriter; pub struct ArrayWriter { msgs: MessageWriter, - view_ctx: ViewContext, - view_ctx_range: Option, array_layouts: Vec, page_ranges: Vec, } impl ArrayWriter { - pub fn new(write: W, view_ctx: ViewContext) -> Self { + pub fn new(write: W) -> Self { Self { msgs: MessageWriter::new(write), - view_ctx, - view_ctx_range: None, array_layouts: vec![], page_ranges: vec![], } } - pub fn view_context_range(&self) -> Option { - self.view_ctx_range - } - pub fn array_layouts(&self) -> &[ArrayLayout] { &self.array_layouts } @@ -45,20 +37,6 @@ impl ArrayWriter { self.msgs.into_inner() } - pub async fn write_context(mut self) -> VortexResult { - if self.view_ctx_range.is_some() { - vortex_bail!("View context already written"); - } - - let begin = self.msgs.tell(); - self.msgs.write_view_context(&self.view_ctx).await?; - let end = self.msgs.tell(); - - self.view_ctx_range = Some(ByteRange { begin, end }); - - Ok(self) - } - async fn write_dtype(&mut self, dtype: &DType) -> VortexResult { let begin = self.msgs.tell(); self.msgs.write_dtype(dtype).await?; @@ -77,7 +55,7 @@ impl ArrayWriter { while let Some(chunk) = stream.try_next().await? { row_offset += chunk.len() as u64; row_offsets.push(row_offset); - self.msgs.write_chunk(&self.view_ctx, chunk).await?; + self.msgs.write_chunk(chunk).await?; byte_offsets.push(self.msgs.tell()); }