From 68d1ebb422aa06cbc2f3430ce0fa2def6ffa484d Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 3 May 2024 10:19:02 +0100 Subject: [PATCH 01/10] EncodingCtx --- bench-vortex/src/data_downloads.rs | 5 +- bench-vortex/src/lib.rs | 12 ++-- bench-vortex/src/reader.rs | 6 +- pyvortex/src/lib.rs | 10 ---- vortex-array/src/compress.rs | 17 +++--- vortex-array/src/context.rs | 64 +++++++++++++-------- vortex-array/src/view.rs | 47 +++++++++++++-- vortex-ipc/benches/ipc_array_reader_take.rs | 4 +- vortex-ipc/benches/ipc_take.rs | 4 +- vortex-ipc/src/lib.rs | 9 ++- vortex-ipc/src/messages.rs | 24 +++++--- vortex-ipc/src/reader.rs | 63 ++++++++++---------- vortex-ipc/src/writer.rs | 17 ++++-- 13 files changed, 162 insertions(+), 120 deletions(-) diff --git a/bench-vortex/src/data_downloads.rs b/bench-vortex/src/data_downloads.rs index 8554739e0..6c9019bed 100644 --- a/bench-vortex/src/data_downloads.rs +++ b/bench-vortex/src/data_downloads.rs @@ -11,7 +11,7 @@ use log::info; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use tokio::runtime::Runtime; use vortex::arrow::FromArrowType; -use vortex::{IntoArray, SerdeContext, ToArrayData}; +use vortex::{Context, IntoArray, ToArrayData}; use vortex_dtype::DType; use vortex_error::{VortexError, VortexResult}; use vortex_ipc::writer::StreamWriter; @@ -59,9 +59,8 @@ pub fn data_vortex_uncompressed(fname_out: &str, downloaded_data: PathBuf) -> Pa // FIXME(ngates): #157 the compressor should handle batch size. 
let reader = builder.with_batch_size(BATCH_SIZE).build().unwrap(); - let ctx = SerdeContext::default(); let mut write = File::create(path).unwrap(); - let mut writer = StreamWriter::try_new(&mut write, ctx).unwrap(); + let mut writer = StreamWriter::try_new(&mut write, &Context::default()).unwrap(); let dtype = DType::from_arrow(reader.schema()); writer.write_schema(&dtype).unwrap(); diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index b90e91d4c..8904944a2 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -15,7 +15,7 @@ use simplelog::{ColorChoice, Config, TermLogger, TerminalMode}; use vortex::array::chunked::ChunkedArray; use vortex::arrow::FromArrowType; use vortex::compress::{CompressConfig, CompressCtx}; -use vortex::encoding::{EncodingRef, VORTEX_ENCODINGS}; +use vortex::encoding::EncodingRef; use vortex::{IntoArray, OwnedArray, ToArrayData}; use vortex_alp::ALPEncoding; use vortex_datetime_parts::DateTimePartsEncoding; @@ -105,10 +105,6 @@ pub fn setup_logger(level: LevelFilter) { } pub fn enumerate_arrays() -> Vec { - println!( - "FOUND {:?}", - VORTEX_ENCODINGS.iter().map(|e| e.id()).collect_vec() - ); vec![ &ALPEncoding, &DictEncoding, @@ -227,7 +223,7 @@ mod test { use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use vortex::arrow::FromArrowArray; use vortex::compute::as_arrow::as_arrow; - use vortex::{ArrayData, IntoArray}; + use vortex::{ArrayData, Context, IntoArray}; use vortex_ipc::reader::StreamReader; use vortex_ipc::writer::StreamWriter; @@ -255,12 +251,12 @@ mod test { let mut buf = Vec::::new(); { - let mut writer = StreamWriter::try_new(&mut buf, Default::default()).unwrap(); + let mut writer = StreamWriter::try_new(&mut buf, &Context::default()).unwrap(); writer.write_array(&vortex_array).unwrap(); } let mut read = buf.as_slice(); - let mut reader = StreamReader::try_new(&mut read).unwrap(); + let mut reader = StreamReader::try_new(&mut read, &Context::default()).unwrap(); 
reader.read_array().unwrap(); } } diff --git a/bench-vortex/src/reader.rs b/bench-vortex/src/reader.rs index e1687dd1d..aef9fad11 100644 --- a/bench-vortex/src/reader.rs +++ b/bench-vortex/src/reader.rs @@ -20,7 +20,7 @@ use tokio::runtime::Runtime; use vortex::array::chunked::ChunkedArray; use vortex::arrow::FromArrowType; use vortex::compute::take::take; -use vortex::{IntoArray, OwnedArray, SerdeContext, ToArrayData, ToStatic}; +use vortex::{Context, IntoArray, OwnedArray, ToArrayData, ToStatic}; use vortex_dtype::DType; use vortex_error::VortexResult; use vortex_ipc::iter::FallibleLendingIterator; @@ -34,7 +34,7 @@ pub const BATCH_SIZE: usize = 65_536; pub fn open_vortex(path: &Path) -> VortexResult { let mut file = File::open(path)?; - let mut reader = StreamReader::try_new(&mut file)?; + let mut reader = StreamReader::try_new(&mut file, &Context::default())?; let mut reader = reader.next()?.unwrap(); let dtype = reader.dtype().clone(); let mut chunks = vec![]; @@ -50,7 +50,7 @@ pub fn rewrite_parquet_as_vortex( ) -> VortexResult<()> { let chunked = compress_parquet_to_vortex(parquet_path.as_path())?; - let mut writer = StreamWriter::try_new(write, SerdeContext::default()).unwrap(); + let mut writer = StreamWriter::try_new(write, &Context::default()).unwrap(); writer.write_array(&chunked.into_array()).unwrap(); Ok(()) } diff --git a/pyvortex/src/lib.rs b/pyvortex/src/lib.rs index bc8fa8572..42a64fa3c 100644 --- a/pyvortex/src/lib.rs +++ b/pyvortex/src/lib.rs @@ -1,8 +1,6 @@ use dtype::PyDType; -use log::debug; use pyo3::exceptions::PyValueError; use pyo3::prelude::*; -use vortex::encoding::VORTEX_ENCODINGS; use vortex_dtype::{DType, PType}; use crate::array::*; @@ -18,14 +16,6 @@ mod vortex_arrow; fn _lib(_py: Python, m: &PyModule) -> PyResult<()> { pyo3_log::init(); - debug!( - "Discovered encodings: {:?}", - VORTEX_ENCODINGS - .iter() - .map(|e| e.id().to_string()) - .collect::>() - ); - m.add_function(wrap_pyfunction!(encode::encode, m)?)?; // 
m.add_function(wrap_pyfunction!(compress::compress, m)?)?; diff --git a/vortex-array/src/compress.rs b/vortex-array/src/compress.rs index 25e2b37c5..b3c19a6f0 100644 --- a/vortex-array/src/compress.rs +++ b/vortex-array/src/compress.rs @@ -12,7 +12,7 @@ use crate::array::sparse::SparseEncoding; use crate::array::varbin::VarBinEncoding; use crate::compute::scalar_at::scalar_at; use crate::compute::slice::slice; -use crate::encoding::{ArrayEncoding, EncodingRef, VORTEX_ENCODINGS}; +use crate::encoding::{ArrayEncoding, EncodingRef}; use crate::sampling::stratified_slices; use crate::stats::ArrayStatistics; use crate::validity::Validity; @@ -56,7 +56,6 @@ pub struct CompressConfig { // TODO(ngates): can each encoding define their own configs? pub ree_average_run_threshold: f32, encodings: HashSet, - disabled_encodings: HashSet, } impl Default for CompressConfig { @@ -75,7 +74,6 @@ impl Default for CompressConfig { &StructEncoding, &VarBinEncoding, ]), - disabled_encodings: HashSet::new(), } } } @@ -96,14 +94,13 @@ impl CompressConfig { pub fn with_disabled>(self, disabled_encodings: E) -> Self { let mut new_self = self.clone(); disabled_encodings.into_iter().for_each(|e| { - new_self.disabled_encodings.insert(e); + new_self.encodings.remove(e); }); new_self } - pub fn is_enabled(&self, kind: EncodingRef) -> bool { - (self.encodings.is_empty() || self.encodings.contains(&kind)) - && !self.disabled_encodings.contains(&kind) + pub fn is_enabled(&self, encoding: EncodingRef) -> bool { + self.encodings.contains(&encoding) } } @@ -113,6 +110,7 @@ pub struct CompressCtx { // TODO(ngates): put this back to a reference options: Arc, depth: u8, + /// A set of encodings disabled for this ctx. 
disabled_encodings: HashSet, } @@ -274,9 +272,10 @@ pub fn sampled_compression(array: &Array, ctx: &CompressCtx) -> VortexResult = VORTEX_ENCODINGS + let mut candidates: Vec<&dyn EncodingCompression> = ctx + .options + .encodings .iter() - .filter(|&encoding| ctx.options().is_enabled(*encoding)) .filter(|&encoding| !ctx.disabled_encodings.contains(encoding)) .map(|encoding| encoding.compression()) .filter(|compression| { diff --git a/vortex-array/src/context.rs b/vortex-array/src/context.rs index 57482b01f..10c4fb29a 100644 --- a/vortex-array/src/context.rs +++ b/vortex-array/src/context.rs @@ -1,40 +1,54 @@ -use std::sync::Arc; +use std::collections::HashMap; -use crate::encoding::EncodingId; -use crate::encoding::{EncodingRef, VORTEX_ENCODINGS}; +use crate::array::bool::BoolEncoding; +use crate::array::chunked::ChunkedEncoding; +use crate::array::constant::ConstantEncoding; +use crate::array::extension::ExtensionEncoding; +use crate::array::primitive::PrimitiveEncoding; +use crate::array::r#struct::StructEncoding; +use crate::array::sparse::SparseEncoding; +use crate::array::varbin::VarBinEncoding; +use crate::array::varbinview::VarBinViewEncoding; +use crate::encoding::EncodingRef; -/// TODO(ngates): I'm not too sure about this construct. Where it should live, or what scope it -/// should have. 
-#[derive(Debug)] -pub struct SerdeContext { - encodings: Arc<[EncodingRef]>, +#[derive(Debug, Clone)] +pub struct Context { + encodings: HashMap, } -impl SerdeContext { - pub fn new(encodings: Arc<[EncodingRef]>) -> Self { - Self { encodings } +impl Context { + pub fn with_encoding(mut self, encoding: EncodingRef) -> Self { + self.encodings.insert(encoding.id().to_string(), encoding); + self } - pub fn encodings(&self) -> &[EncodingRef] { - self.encodings.as_ref() + pub fn encodings(&self) -> impl Iterator + '_ { + self.encodings.iter().map(|(_, encoding)| encoding).cloned() } - pub fn find_encoding(&self, encoding_id: u16) -> Option { - self.encodings.get(encoding_id as usize).cloned() - } - - pub fn encoding_idx(&self, encoding_id: EncodingId) -> Option { - self.encodings - .iter() - .position(|e| e.id() == encoding_id) - .map(|i| i as u16) + pub fn lookup_encoding(&self, encoding_id: &str) -> Option { + self.encodings.get(encoding_id).cloned() } } -impl Default for SerdeContext { +impl Default for Context { fn default() -> Self { - Self { - encodings: VORTEX_ENCODINGS.iter().cloned().collect::>().into(), + Context { + encodings: HashMap::from_iter( + [ + &BoolEncoding as EncodingRef, + &ChunkedEncoding, + &ConstantEncoding, + &ExtensionEncoding, + &PrimitiveEncoding, + &SparseEncoding, + &StructEncoding, + &VarBinEncoding, + &VarBinViewEncoding, + ] + .iter() + .map(|e| (e.id().to_string(), *e)), + ), } } } diff --git a/vortex-array/src/view.rs b/vortex-array/src/view.rs index 925fe9739..69b034e3b 100644 --- a/vortex-array/src/view.rs +++ b/vortex-array/src/view.rs @@ -1,13 +1,14 @@ use std::fmt::{Debug, Formatter}; +use itertools::Itertools; use vortex_dtype::DType; use vortex_error::{vortex_bail, vortex_err, VortexError, VortexResult}; use crate::buffer::Buffer; -use crate::encoding::EncodingRef; +use crate::encoding::{EncodingId, EncodingRef}; use crate::flatbuffers::array as fb; use crate::stats::{EmptyStatistics, Statistics}; -use crate::SerdeContext; +use 
crate::Context; use crate::{Array, IntoArray, ToArray}; #[derive(Clone)] @@ -16,7 +17,7 @@ pub struct ArrayView<'v> { dtype: &'v DType, array: fb::Array<'v>, buffers: &'v [Buffer<'v>], - ctx: &'v SerdeContext, + ctx: &'v ViewContext, // TODO(ngates): a store a Projection. A projected ArrayView contains the full fb::Array // metadata, but only the buffers from the selected columns. Therefore we need to know // which fb:Array children to skip when calculating how to slice into buffers. @@ -36,7 +37,7 @@ impl<'a> Debug for ArrayView<'a> { impl<'v> ArrayView<'v> { pub fn try_new( - ctx: &'v SerdeContext, + ctx: &'v ViewContext, dtype: &'v DType, array: fb::Array<'v>, buffers: &'v [Buffer], @@ -151,3 +152,41 @@ impl<'v> IntoArray<'v> for ArrayView<'v> { Array::View(self) } } + +#[derive(Debug)] +pub struct ViewContext { + encodings: Vec, +} + +impl ViewContext { + pub fn new(encodings: Vec) -> Self { + Self { encodings } + } + + pub fn encodings(&self) -> &[EncodingRef] { + self.encodings.as_ref() + } + + pub fn find_encoding(&self, encoding_id: u16) -> Option { + self.encodings.get(encoding_id as usize).cloned() + } + + pub fn encoding_idx(&self, encoding_id: EncodingId) -> Option { + self.encodings + .iter() + .position(|e| e.id() == encoding_id) + .map(|i| i as u16) + } +} + +impl Default for ViewContext { + fn default() -> Self { + todo!("FIXME(ngates): which encodings to enable?") + } +} + +impl From<&Context> for ViewContext { + fn from(value: &Context) -> Self { + ViewContext::new(value.encodings().collect_vec()) + } +} diff --git a/vortex-ipc/benches/ipc_array_reader_take.rs b/vortex-ipc/benches/ipc_array_reader_take.rs index 3003cb41a..14d1d6ed6 100644 --- a/vortex-ipc/benches/ipc_array_reader_take.rs +++ b/vortex-ipc/benches/ipc_array_reader_take.rs @@ -4,7 +4,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; use fallible_iterator::FallibleIterator; use itertools::Itertools; use vortex::array::primitive::PrimitiveArray; -use 
vortex::{IntoArray, SerdeContext}; +use vortex::{IntoArray, ViewContext}; use vortex_dtype::{DType, Nullability, PType}; use vortex_ipc::iter::FallibleLendingIterator; use vortex_ipc::reader::StreamReader; @@ -24,7 +24,7 @@ fn ipc_array_reader_take(c: &mut Criterion) { let mut buffer = vec![]; { let mut cursor = Cursor::new(&mut buffer); - let mut writer = StreamWriter::try_new(&mut cursor, SerdeContext::default()).unwrap(); + let mut writer = StreamWriter::try_new(&mut cursor, ViewContext::default()).unwrap(); writer .write_schema(&DType::Primitive(PType::I32, Nullability::Nullable)) .unwrap(); diff --git a/vortex-ipc/benches/ipc_take.rs b/vortex-ipc/benches/ipc_take.rs index 14cc68ef2..a5e145b45 100644 --- a/vortex-ipc/benches/ipc_take.rs +++ b/vortex-ipc/benches/ipc_take.rs @@ -11,7 +11,7 @@ use itertools::Itertools; use vortex::array::primitive::PrimitiveArray; use vortex::compress::CompressCtx; use vortex::compute::take::take; -use vortex::{IntoArray, SerdeContext}; +use vortex::{IntoArray, ViewContext}; use vortex_ipc::iter::FallibleLendingIterator; use vortex_ipc::reader::StreamReader; use vortex_ipc::writer::StreamWriter; @@ -59,7 +59,7 @@ fn ipc_take(c: &mut Criterion) { let mut buffer = vec![]; { let mut cursor = Cursor::new(&mut buffer); - let mut writer = StreamWriter::try_new(&mut cursor, SerdeContext::default()).unwrap(); + let mut writer = StreamWriter::try_new(&mut cursor, ViewContext::default()).unwrap(); writer.write_array(&compressed).unwrap(); } b.iter(|| { diff --git a/vortex-ipc/src/lib.rs b/vortex-ipc/src/lib.rs index d0c8ed0a0..6d05aa324 100644 --- a/vortex-ipc/src/lib.rs +++ b/vortex-ipc/src/lib.rs @@ -41,7 +41,7 @@ mod tests { use vortex::array::primitive::PrimitiveArray; use vortex::array::r#struct::StructArray; use vortex::validity::Validity; - use vortex::SerdeContext; + use vortex::Context; use vortex::{IntoArray, IntoArrayData}; use crate::iter::FallibleLendingIterator; @@ -69,17 +69,16 @@ mod tests { .into_array(); // let batch = 
ColumnBatch::from(&arr.to_array()); - + let ctx = Context::default(); let mut cursor = Cursor::new(Vec::new()); - let ctx = SerdeContext::default(); { - let mut writer = StreamWriter::try_new_unbuffered(&mut cursor, ctx).unwrap(); + let mut writer = StreamWriter::try_new_unbuffered(&mut cursor, &ctx).unwrap(); writer.write_array(&arr).unwrap(); } cursor.flush().unwrap(); cursor.set_position(0); - let mut ipc_reader = StreamReader::try_new_unbuffered(cursor).unwrap(); + let mut ipc_reader = StreamReader::try_new_unbuffered(cursor, &ctx).unwrap(); // Read some number of arrays off the stream. while let Some(array_reader) = ipc_reader.next().unwrap() { diff --git a/vortex-ipc/src/messages.rs b/vortex-ipc/src/messages.rs index e9ec46dd7..1e1e224c2 100644 --- a/vortex-ipc/src/messages.rs +++ b/vortex-ipc/src/messages.rs @@ -1,8 +1,7 @@ use flatbuffers::{FlatBufferBuilder, WIPOffset}; use itertools::Itertools; -use vortex::encoding::find_encoding; use vortex::flatbuffers::array as fba; -use vortex::{ArrayData, SerdeContext}; +use vortex::{ArrayData, Context, ViewContext}; use vortex_dtype::DType; use vortex_error::{vortex_err, VortexError}; use vortex_flatbuffers::{FlatBufferRoot, WriteFlatBuffer}; @@ -17,10 +16,10 @@ pub(crate) enum IPCMessage<'a> { Chunk(IPCChunk<'a>), } -pub(crate) struct IPCContext<'a>(pub &'a SerdeContext); +pub(crate) struct IPCContext<'a>(pub &'a ViewContext); pub(crate) struct IPCSchema<'a>(pub &'a DType); -pub(crate) struct IPCChunk<'a>(pub &'a SerdeContext, pub &'a ArrayData); -pub(crate) struct IPCArray<'a>(pub &'a SerdeContext, pub &'a ArrayData); +pub(crate) struct IPCChunk<'a>(pub &'a ViewContext, pub &'a ArrayData); +pub(crate) struct IPCArray<'a>(pub &'a ViewContext, pub &'a ArrayData); impl FlatBufferRoot for IPCMessage<'_> {} impl WriteFlatBuffer for IPCMessage<'_> { @@ -81,16 +80,23 @@ impl<'a> WriteFlatBuffer for IPCContext<'a> { } } -impl<'a> TryFrom> for SerdeContext { +pub struct SerdeContextDeserializer<'a> { + pub(crate) fb: 
fb::Context<'a>, + pub(crate) ctx: &'a Context, +} + +impl<'a> TryFrom> for ViewContext { type Error = VortexError; - fn try_from(value: fb::Context<'a>) -> Result { - let fb_encodings = value.encodings().ok_or_else(missing("encodings"))?; + fn try_from(deser: SerdeContextDeserializer<'a>) -> Result { + let fb_encodings = deser.fb.encodings().ok_or_else(missing("encodings"))?; let mut encodings = Vec::with_capacity(fb_encodings.len()); for fb_encoding in fb_encodings { let encoding_id = fb_encoding.id().ok_or_else(missing("encoding.id"))?; encodings.push( - find_encoding(encoding_id) + deser + .ctx + .lookup_encoding(encoding_id) .ok_or_else(|| vortex_err!("Stream uses unknown encoding {}", encoding_id))?, ); } diff --git a/vortex-ipc/src/reader.rs b/vortex-ipc/src/reader.rs index 4cea5305a..2da577dc5 100644 --- a/vortex-ipc/src/reader.rs +++ b/vortex-ipc/src/reader.rs @@ -15,7 +15,7 @@ use vortex::compute::slice::slice; use vortex::compute::take::take; use vortex::stats::{ArrayStatistics, Stat}; use vortex::{ - Array, ArrayDType, ArrayView, IntoArray, OwnedArray, SerdeContext, ToArray, ToStatic, + Array, ArrayDType, ArrayView, Context, IntoArray, OwnedArray, ToArray, ToStatic, ViewContext, }; use vortex_dtype::{match_each_integer_ptype, DType}; use vortex_error::{vortex_bail, vortex_err, VortexError, VortexResult}; @@ -24,22 +24,22 @@ use vortex_scalar::Scalar; use crate::flatbuffers::ipc::Message; use crate::iter::{FallibleLendingIterator, FallibleLendingIteratorඞItem}; +use crate::messages::SerdeContextDeserializer; -#[allow(dead_code)] pub struct StreamReader { read: R, messages: StreamMessageReader, - ctx: SerdeContext, + ctx: ViewContext, } impl StreamReader> { - pub fn try_new(read: R) -> VortexResult { - Self::try_new_unbuffered(BufReader::new(read)) + pub fn try_new(read: R, ctx: &Context) -> VortexResult { + Self::try_new_unbuffered(BufReader::new(read), ctx) } } impl StreamReader { - pub fn try_new_unbuffered(mut read: R) -> VortexResult { + pub fn
try_new_unbuffered(mut read: R, ctx: &Context) -> VortexResult { let mut messages = StreamMessageReader::try_new(&mut read)?; match messages.peek() { None => vortex_bail!("IPC stream is empty"), @@ -50,16 +50,16 @@ impl StreamReader { } } - let ctx: SerdeContext = messages - .next(&mut read)? - .header_as_context() - .unwrap() - .try_into()?; + let view_ctx: ViewContext = SerdeContextDeserializer { + fb: messages.next(&mut read)?.header_as_context().unwrap(), + ctx: &ctx, + } + .try_into()?; Ok(Self { read, messages, - ctx, + ctx: view_ctx, }) } @@ -124,7 +124,7 @@ impl FallibleLendingIterator for StreamReader { #[allow(dead_code)] pub struct StreamArrayReader<'a, R: Read> { - ctx: &'a SerdeContext, + ctx: &'a ViewContext, read: &'a mut R, messages: &'a mut StreamMessageReader, dtype: DType, @@ -385,7 +385,7 @@ mod tests { use vortex::array::chunked::{Chunked, ChunkedArray}; use vortex::array::primitive::{Primitive, PrimitiveArray, PrimitiveEncoding}; use vortex::encoding::{ArrayEncoding, EncodingId}; - use vortex::{Array, ArrayDType, ArrayDef, IntoArray, OwnedArray, SerdeContext}; + use vortex::{Array, ArrayDType, ArrayDef, Context, IntoArray, OwnedArray}; use vortex_alp::{ALPArray, ALPEncoding}; use vortex_dtype::NativePType; use vortex_error::VortexResult; @@ -397,6 +397,7 @@ mod tests { #[test] fn test_read_write() { + let ctx = Context::default(); let array = PrimitiveArray::from(vec![0, 1, 2]).into_array(); let chunked_array = ChunkedArray::try_new(vec![array.clone(), array.clone()], array.dtype().clone()) @@ -406,7 +407,7 @@ mod tests { let mut buffer = vec![]; let mut cursor = Cursor::new(&mut buffer); { - let mut writer = StreamWriter::try_new(&mut cursor, SerdeContext::default()).unwrap(); + let mut writer = StreamWriter::try_new(&mut cursor, &ctx).unwrap(); writer.write_array(&array).unwrap(); writer.write_array(&chunked_array).unwrap(); } @@ -416,7 +417,7 @@ mod tests { cursor.set_position(0); { - let mut reader = StreamReader::try_new_unbuffered(&mut 
cursor).unwrap(); + let mut reader = StreamReader::try_new_unbuffered(&mut cursor, &ctx).unwrap(); let first = reader.read_array().unwrap(); assert_eq!(first.encoding().id(), Primitive::ID); let second = reader.read_array().unwrap(); @@ -475,14 +476,13 @@ mod tests { { let mut cursor = Cursor::new(&mut buffer); { - let mut writer = - StreamWriter::try_new(&mut cursor, SerdeContext::default()).unwrap(); + let mut writer = StreamWriter::try_new(&mut cursor, &Context::default()).unwrap(); writer.write_array(&data).unwrap(); } } let mut cursor = Cursor::new(&buffer); - let mut reader = StreamReader::try_new(&mut cursor).unwrap(); + let mut reader = StreamReader::try_new(&mut cursor, &Context::default()).unwrap(); let array_reader = reader.next().unwrap().unwrap(); let mut result_iter = array_reader.take(&indices).unwrap(); let result = result_iter.next().unwrap(); @@ -544,14 +544,13 @@ mod tests { { let mut cursor = Cursor::new(&mut buffer); { - let mut writer = - StreamWriter::try_new(&mut cursor, SerdeContext::default()).unwrap(); + let mut writer = StreamWriter::try_new(&mut cursor, &Context::default()).unwrap(); writer.write_array(&chunked).unwrap(); } } let mut cursor = Cursor::new(&buffer); - let mut reader = StreamReader::try_new(&mut cursor).unwrap(); + let mut reader = StreamReader::try_new(&mut cursor, &Context::default()).unwrap(); let array_reader = reader.next().unwrap().unwrap(); let mut take_iter = array_reader.take(&indices).unwrap(); let next = take_iter.next().unwrap().unwrap(); @@ -601,15 +600,14 @@ mod tests { { let mut cursor = Cursor::new(&mut buffer); { - let mut writer = - StreamWriter::try_new(&mut cursor, SerdeContext::default()).unwrap(); + let mut writer = StreamWriter::try_new(&mut cursor, &Context::default()).unwrap(); writer.write_array(&chunked).unwrap(); writer.write_array(&data).unwrap(); } } let mut cursor = Cursor::new(&buffer); - let mut reader = StreamReader::try_new(&mut cursor).unwrap(); + let mut reader = 
StreamReader::try_new(&mut cursor, &Context::default()).unwrap(); let array_reader = reader.next().unwrap().unwrap(); { @@ -663,14 +661,13 @@ mod tests { { let mut cursor = Cursor::new(&mut buffer); { - let mut writer = - StreamWriter::try_new(&mut cursor, SerdeContext::default()).unwrap(); + let mut writer = StreamWriter::try_new(&mut cursor, &Context::default()).unwrap(); writer.write_array(&chunked).unwrap(); } } let mut cursor = Cursor::new(&buffer); - let mut reader = StreamReader::try_new(&mut cursor).unwrap(); + let mut reader = StreamReader::try_new(&mut cursor, &Context::default()).unwrap(); let array_reader = reader.next().unwrap().unwrap(); let mut iter = array_reader.take(&indices).unwrap(); @@ -696,14 +693,13 @@ mod tests { { let mut cursor = Cursor::new(&mut buffer); { - let mut writer = - StreamWriter::try_new(&mut cursor, SerdeContext::default()).unwrap(); + let mut writer = StreamWriter::try_new(&mut cursor, &Context::default()).unwrap(); writer.write_array(data).unwrap(); } } let mut cursor = Cursor::new(&buffer); - let mut reader = StreamReader::try_new(&mut cursor).unwrap(); + let mut reader = StreamReader::try_new(&mut cursor, &Context::default()).unwrap(); let array_reader = reader.next().unwrap().unwrap(); let mut take_iter = array_reader.take(&indices).unwrap(); @@ -726,14 +722,13 @@ mod tests { { let mut cursor = Cursor::new(&mut buffer); { - let mut writer = - StreamWriter::try_new(&mut cursor, SerdeContext::default()).unwrap(); + let mut writer = StreamWriter::try_new(&mut cursor, &Context::default()).unwrap(); writer.write_array(data).unwrap(); } } let mut cursor = Cursor::new(&buffer); - let mut reader = StreamReader::try_new(&mut cursor).unwrap(); + let mut reader = StreamReader::try_new(&mut cursor, &Context::default()).unwrap(); let array_reader = reader.next().unwrap().unwrap(); let mut result_iter = array_reader.take(indices)?; let result = result_iter.next().unwrap(); diff --git a/vortex-ipc/src/writer.rs 
b/vortex-ipc/src/writer.rs index f8e6516a3..799dadcb9 100644 --- a/vortex-ipc/src/writer.rs +++ b/vortex-ipc/src/writer.rs @@ -2,7 +2,7 @@ use std::io::{BufWriter, Write}; use itertools::Itertools; use vortex::array::chunked::ChunkedArray; -use vortex::{Array, ArrayDType, SerdeContext, ToArrayData}; +use vortex::{Array, ArrayDType, Context, ToArrayData, ViewContext}; use vortex_dtype::DType; use vortex_error::VortexResult; use vortex_flatbuffers::FlatBufferWriter; @@ -13,20 +13,25 @@ use crate::ALIGNMENT; #[allow(dead_code)] pub struct StreamWriter { write: W, - ctx: SerdeContext, + ctx: ViewContext, } impl StreamWriter> { - pub fn try_new(write: W, ctx: SerdeContext) -> VortexResult { + pub fn try_new(write: W, ctx: &Context) -> VortexResult { Self::try_new_unbuffered(BufWriter::new(write), ctx) } } impl StreamWriter { - pub fn try_new_unbuffered(mut write: W, ctx: SerdeContext) -> VortexResult { + pub fn try_new_unbuffered(mut write: W, ctx: &Context) -> VortexResult { + let view_ctx = ViewContext::from(ctx); + // Write the IPC context to the stream - write.write_message(&IPCMessage::Context(IPCContext(&ctx)), ALIGNMENT)?; - Ok(Self { write, ctx }) + write.write_message(&IPCMessage::Context(IPCContext(&view_ctx)), ALIGNMENT)?; + Ok(Self { + write, + ctx: view_ctx, + }) } pub fn write_array(&mut self, array: &Array) -> VortexResult<()> { From f50fdeb2cf773c18cc0461f77a94896a5c8687cf Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 3 May 2024 10:21:52 +0100 Subject: [PATCH 02/10] EncodingCtx --- Cargo.lock | 28 ---------------------------- Cargo.toml | 5 ----- vortex-alp/Cargo.toml | 1 - vortex-array/Cargo.toml | 1 - vortex-array/src/context.rs | 2 +- vortex-array/src/encoding.rs | 11 ----------- vortex-array/src/implementation.rs | 4 ---- vortex-array/src/lib.rs | 1 - vortex-datetime-parts/Cargo.toml | 1 - vortex-dict/Cargo.toml | 1 - vortex-fastlanes/Cargo.toml | 1 - vortex-ree/Cargo.toml | 1 - vortex-roaring/Cargo.toml | 1 - vortex-zigzag/Cargo.toml | 1 
- 14 files changed, 1 insertion(+), 58 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5e6483b0a..38df597b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2808,26 +2808,6 @@ dependencies = [ "libc", ] -[[package]] -name = "linkme" -version = "0.3.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb2cfee0de9bd869589fb9a015e155946d1be5ff415cb844c2caccc6cc4b5db9" -dependencies = [ - "linkme-impl", -] - -[[package]] -name = "linkme-impl" -version = "0.3.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adf157a4dc5a29b7b464aa8fe7edeff30076e07e13646a1c3874f58477dc99f8" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.60", -] - [[package]] name = "linux-raw-sys" version = "0.4.13" @@ -4918,7 +4898,6 @@ version = "0.1.0" dependencies = [ "divan", "itertools 0.12.1", - "linkme", "num-traits", "serde", "vortex-array", @@ -4942,7 +4921,6 @@ dependencies = [ "humansize", "itertools 0.12.1", "lazy_static", - "linkme", "log", "num-traits", "num_enum 0.7.2", @@ -4960,7 +4938,6 @@ dependencies = [ name = "vortex-datetime-parts" version = "0.1.0" dependencies = [ - "linkme", "log", "serde", "vortex-array", @@ -4976,7 +4953,6 @@ dependencies = [ "ahash", "criterion", "hashbrown", - "linkme", "log", "num-traits", "rand", @@ -5022,7 +4998,6 @@ dependencies = [ "criterion", "fastlanez", "itertools 0.12.1", - "linkme", "num-traits", "rand", "serde", @@ -5074,7 +5049,6 @@ name = "vortex-ree" version = "0.1.0" dependencies = [ "itertools 0.12.1", - "linkme", "num-traits", "serde", "vortex-array", @@ -5089,7 +5063,6 @@ version = "0.1.0" dependencies = [ "arrow-buffer", "croaring", - "linkme", "log", "num-traits", "serde", @@ -5118,7 +5091,6 @@ dependencies = [ name = "vortex-zigzag" version = "0.1.0" dependencies = [ - "linkme", "serde", "vortex-array", "vortex-dtype", diff --git a/Cargo.toml b/Cargo.toml index ec5e2182a..b4246ee46 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,7 +66,6 @@ humansize = "2.1.3" 
itertools = "0.12.1" lazy_static = "1.4.0" leb128 = "0.2.5" -linkme = "0.3.25" log = "0.4.21" num-traits = "0.2.18" num_enum = "0.7.2" @@ -91,7 +90,3 @@ warnings = "deny" [workspace.lints.clippy] all = "deny" - -[profile.dev] -# Need this for linkme crate to work. See https://github.com/dtolnay/linkme/issues/61 -codegen-units = 1 diff --git a/vortex-alp/Cargo.toml b/vortex-alp/Cargo.toml index db1060e3e..9789ad5b5 100644 --- a/vortex-alp/Cargo.toml +++ b/vortex-alp/Cargo.toml @@ -16,7 +16,6 @@ workspace = true [dependencies] itertools = { workspace = true } -linkme = { workspace = true } num-traits = { workspace = true } serde = { workspace = true, features = ["derive"] } vortex-array = { path = "../vortex-array" } diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index a2ed8ebf1..8b561d827 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -28,7 +28,6 @@ flexbuffers = { workspace = true } humansize = { workspace = true } itertools = { workspace = true } lazy_static = { workspace = true } -linkme = { workspace = true } log = { workspace = true } num-traits = { workspace = true } num_enum = { workspace = true } diff --git a/vortex-array/src/context.rs b/vortex-array/src/context.rs index 10c4fb29a..c9d22d53a 100644 --- a/vortex-array/src/context.rs +++ b/vortex-array/src/context.rs @@ -23,7 +23,7 @@ impl Context { } pub fn encodings(&self) -> impl Iterator + '_ { - self.encodings.iter().map(|(_, encoding)| encoding).cloned() + self.encodings.values().cloned() } pub fn lookup_encoding(&self, encoding_id: &str) -> Option { diff --git a/vortex-array/src/encoding.rs b/vortex-array/src/encoding.rs index 7f2a9c42e..9fc6fbf6b 100644 --- a/vortex-array/src/encoding.rs +++ b/vortex-array/src/encoding.rs @@ -2,7 +2,6 @@ use std::any::Any; use std::fmt::{Debug, Display, Formatter}; use std::hash::{Hash, Hasher}; -use linkme::distributed_slice; use vortex_error::VortexResult; use crate::compress::EncodingCompression; @@ -31,18 +30,8 @@ impl AsRef 
for EncodingId { } } -#[distributed_slice] -pub static VORTEX_ENCODINGS: [EncodingRef] = [..]; - pub type EncodingRef = &'static dyn ArrayEncoding; -pub fn find_encoding(id: &str) -> Option { - VORTEX_ENCODINGS - .iter() - .find(|&x| x.id().as_ref() == id) - .cloned() -} - /// Object-safe encoding trait for an array. pub trait ArrayEncoding: 'static + Sync + Send + Debug { fn as_any(&self) -> &dyn Any; diff --git a/vortex-array/src/implementation.rs b/vortex-array/src/implementation.rs index d3491abac..3efa2a904 100644 --- a/vortex-array/src/implementation.rs +++ b/vortex-array/src/implementation.rs @@ -48,7 +48,6 @@ macro_rules! impl_encoding { ArrayEncodingExt, EncodingId, EncodingRef, - VORTEX_ENCODINGS, }; use $crate::stats::StatsSet; use std::any::Any; @@ -136,9 +135,6 @@ macro_rules! impl_encoding { /// The array encoding #[derive(Debug)] pub struct [<$Name Encoding>]; - #[$crate::linkme::distributed_slice(VORTEX_ENCODINGS)] - #[allow(non_upper_case_globals)] - static []: EncodingRef = &[<$Name Encoding>]; impl ArrayEncoding for [<$Name Encoding>] { fn as_any(&self) -> &dyn Any { self diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs index a12525873..ae04d6832 100644 --- a/vortex-array/src/lib.rs +++ b/vortex-array/src/lib.rs @@ -26,7 +26,6 @@ pub use context::*; pub use data::*; pub use flatten::*; pub use implementation::*; -pub use linkme; pub use metadata::*; pub use typed::*; pub use view::*; diff --git a/vortex-datetime-parts/Cargo.toml b/vortex-datetime-parts/Cargo.toml index cbcf57f4c..59b2dc73d 100644 --- a/vortex-datetime-parts/Cargo.toml +++ b/vortex-datetime-parts/Cargo.toml @@ -11,6 +11,5 @@ vortex-array = { "path" = "../vortex-array" } vortex-error = { path = "../vortex-error" } vortex-dtype = { "path" = "../vortex-dtype" } vortex-scalar = { path = "../vortex-scalar" } -linkme = { workspace = true } log = { workspace = true } serde = { workspace = true, features = ["derive"] } diff --git a/vortex-dict/Cargo.toml 
b/vortex-dict/Cargo.toml index b9857b82a..29c8b0acf 100644 --- a/vortex-dict/Cargo.toml +++ b/vortex-dict/Cargo.toml @@ -14,7 +14,6 @@ rust-version = { workspace = true } [dependencies] ahash = { workspace = true } hashbrown = { workspace = true } -linkme = { workspace = true } num-traits = { workspace = true } serde = { workspace = true } vortex-array = { path = "../vortex-array" } diff --git a/vortex-fastlanes/Cargo.toml b/vortex-fastlanes/Cargo.toml index 0187225dd..8b472a573 100644 --- a/vortex-fastlanes/Cargo.toml +++ b/vortex-fastlanes/Cargo.toml @@ -18,7 +18,6 @@ workspace = true arrayref = { workspace = true } fastlanez = { path = "../fastlanez" } itertools = { workspace = true } -linkme = { workspace = true } num-traits = { workspace = true } serde = { workspace = true } vortex-array = { path = "../vortex-array" } diff --git a/vortex-ree/Cargo.toml b/vortex-ree/Cargo.toml index 4996bfa46..7d4caff55 100644 --- a/vortex-ree/Cargo.toml +++ b/vortex-ree/Cargo.toml @@ -13,7 +13,6 @@ rust-version = { workspace = true } [dependencies] itertools = { workspace = true } -linkme = { workspace = true } num-traits = { workspace = true } serde = { workspace = true } vortex-array = { path = "../vortex-array" } diff --git a/vortex-roaring/Cargo.toml b/vortex-roaring/Cargo.toml index a31d9015a..3a7274121 100644 --- a/vortex-roaring/Cargo.toml +++ b/vortex-roaring/Cargo.toml @@ -17,7 +17,6 @@ vortex-array = { path = "../vortex-array" } vortex-error = { path = "../vortex-error" } vortex-dtype = { path = "../vortex-dtype" } vortex-scalar = { path = "../vortex-scalar" } -linkme = { workspace = true } croaring = { workspace = true } num-traits = { workspace = true } log = { workspace = true } diff --git a/vortex-zigzag/Cargo.toml b/vortex-zigzag/Cargo.toml index 43f5d3930..8c2ab6464 100644 --- a/vortex-zigzag/Cargo.toml +++ b/vortex-zigzag/Cargo.toml @@ -12,7 +12,6 @@ edition = { workspace = true } rust-version = { workspace = true } [dependencies] -linkme = { workspace = true 
} vortex-array = { path = "../vortex-array" } vortex-error = { path = "../vortex-error" } vortex-dtype = { path = "../vortex-dtype" } From aec53c7992bd440f61bad4dab93283f214ac27af Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 3 May 2024 10:49:26 +0100 Subject: [PATCH 03/10] EncodingCtx --- bench-vortex/src/lib.rs | 32 ++--- bench-vortex/src/reader.rs | 10 +- vortex-alp/src/compress.rs | 4 +- vortex-array/Cargo.toml | 2 +- vortex-array/src/array/sparse/compress.rs | 4 +- vortex-array/src/compress.rs | 127 +++++++----------- vortex-array/src/context.rs | 6 + vortex-array/src/implementation.rs | 2 +- vortex-datetime-parts/src/compress.rs | 6 +- vortex-dict/src/compress.rs | 11 +- vortex-fastlanes/benches/bitpacking_take.rs | 6 +- vortex-fastlanes/src/bitpacking/compress.rs | 33 ++--- .../src/bitpacking/compute/mod.rs | 34 ++--- vortex-fastlanes/src/bitpacking/mod.rs | 48 +++---- vortex-fastlanes/src/delta/compress.rs | 23 ++-- vortex-fastlanes/src/for/compress.rs | 36 ++--- vortex-fastlanes/src/for/compute.rs | 5 +- vortex-ipc/benches/ipc_take.rs | 8 +- vortex-ipc/src/messages.rs | 2 +- vortex-ipc/src/reader.rs | 4 +- vortex-ree/src/compress.rs | 13 +- vortex-ree/src/compute.rs | 2 +- vortex-roaring/src/boolean/compress.rs | 4 +- vortex-roaring/src/integer/compress.rs | 4 +- vortex-zigzag/src/compress.rs | 15 +-- 25 files changed, 191 insertions(+), 250 deletions(-) diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index 8904944a2..d0b031753 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -3,7 +3,6 @@ use std::env::temp_dir; use std::fs::{create_dir_all, File}; use std::path::{Path, PathBuf}; -use std::sync::Arc; use arrow_array::RecordBatchReader; use humansize::DECIMAL; @@ -14,9 +13,9 @@ use parquet::arrow::ProjectionMask; use simplelog::{ColorChoice, Config, TermLogger, TerminalMode}; use vortex::array::chunked::ChunkedArray; use vortex::arrow::FromArrowType; -use vortex::compress::{CompressConfig, CompressCtx}; +use 
vortex::compress::Compressor; use vortex::encoding::EncodingRef; -use vortex::{IntoArray, OwnedArray, ToArrayData}; +use vortex::{Context, IntoArray, OwnedArray, ToArrayData}; use vortex_alp::ALPEncoding; use vortex_datetime_parts::DateTimePartsEncoding; use vortex_dict::DictEncoding; @@ -104,9 +103,9 @@ pub fn setup_logger(level: LevelFilter) { .unwrap(); } -pub fn enumerate_arrays() -> Vec { - vec![ - &ALPEncoding, +pub fn ctx() -> Context { + Context::default().with_encodings([ + &ALPEncoding as EncodingRef, &DictEncoding, &BitPackedEncoding, &FoREncoding, @@ -117,13 +116,7 @@ pub fn enumerate_arrays() -> Vec { // &RoaringIntEncoding, // Doesn't offer anything more than FoR really // &ZigZagEncoding, - ] -} - -pub fn compress_ctx() -> CompressCtx { - let cfg = CompressConfig::new().with_enabled(enumerate_arrays()); - info!("Compression config {cfg:?}"); - CompressCtx::new(Arc::new(cfg)) + ]) } pub fn compress_taxi_data() -> OwnedArray { @@ -143,7 +136,6 @@ pub fn compress_taxi_data() -> OwnedArray { .build() .unwrap(); - let ctx = compress_ctx(); let schema = reader.schema(); let mut uncompressed_size: usize = 0; let chunks = reader @@ -152,7 +144,9 @@ pub fn compress_taxi_data() -> OwnedArray { .map(|batch| batch.to_array_data().into_array()) .map(|array| { uncompressed_size += array.nbytes(); - ctx.clone().compress(&array, None).unwrap() + Compressor::new(&ctx(), &Default::default()) + .compress(&array, None) + .unwrap() }) .collect_vec(); @@ -222,13 +216,14 @@ mod test { use log::LevelFilter; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use vortex::arrow::FromArrowArray; + use vortex::compress::Compressor; use vortex::compute::as_arrow::as_arrow; use vortex::{ArrayData, Context, IntoArray}; use vortex_ipc::reader::StreamReader; use vortex_ipc::writer::StreamWriter; use crate::taxi_data::taxi_data_parquet; - use crate::{compress_ctx, compress_taxi_data, setup_logger}; + use crate::{compress_taxi_data, ctx, setup_logger}; #[ignore] #[test] 
@@ -286,13 +281,14 @@ mod test { let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); let reader = builder.with_limit(1).build().unwrap(); - let ctx = compress_ctx(); for record_batch in reader.map(|batch_result| batch_result.unwrap()) { let struct_arrow: ArrowStructArray = record_batch.into(); let arrow_array: ArrowArrayRef = Arc::new(struct_arrow); let vortex_array = ArrayData::from_arrow(arrow_array.clone(), false).into_array(); - let compressed = ctx.clone().compress(&vortex_array, None).unwrap(); + let compressed = Compressor::new(&ctx(), &Default::default()) + .compress(&vortex_array, None) + .unwrap(); let compressed_as_arrow = as_arrow(&compressed).unwrap(); assert_eq!(compressed_as_arrow.deref(), arrow_array.deref()); } diff --git a/bench-vortex/src/reader.rs b/bench-vortex/src/reader.rs index aef9fad11..730bda210 100644 --- a/bench-vortex/src/reader.rs +++ b/bench-vortex/src/reader.rs @@ -19,6 +19,7 @@ use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use tokio::runtime::Runtime; use vortex::array::chunked::ChunkedArray; use vortex::arrow::FromArrowType; +use vortex::compress::Compressor; use vortex::compute::take::take; use vortex::{Context, IntoArray, OwnedArray, ToArrayData, ToStatic}; use vortex_dtype::DType; @@ -27,7 +28,7 @@ use vortex_ipc::iter::FallibleLendingIterator; use vortex_ipc::reader::StreamReader; use vortex_ipc::writer::StreamWriter; -use crate::compress_ctx; +use crate::ctx; pub const BATCH_SIZE: usize = 65_536; @@ -55,7 +56,7 @@ pub fn rewrite_parquet_as_vortex( Ok(()) } -pub fn compress_parquet_to_vortex(parquet_path: &Path) -> VortexResult { +pub fn compress_parquet_to_vortex(parquet_path: &Path) -> VortexResult> { let taxi_pq = File::open(parquet_path)?; let builder = ParquetRecordBatchReaderBuilder::try_new(taxi_pq)?; @@ -63,13 +64,14 @@ pub fn compress_parquet_to_vortex(parquet_path: &Path) -> VortexResult, - ctx: CompressCtx, + ctx: Compressor, ) -> VortexResult> { let like_alp = 
like.map(|like_array| like_array.as_array_ref()); let like_exponents = like diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index 8b561d827..afda03561 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -16,7 +16,7 @@ name = "vortex" path = "src/lib.rs" [lints] -workspace = true +# workspace = true [dependencies] arrow-array = { workspace = true } diff --git a/vortex-array/src/array/sparse/compress.rs b/vortex-array/src/array/sparse/compress.rs index 5e27edadc..414349dcf 100644 --- a/vortex-array/src/array/sparse/compress.rs +++ b/vortex-array/src/array/sparse/compress.rs @@ -1,7 +1,7 @@ use vortex_error::VortexResult; use crate::array::sparse::{Sparse, SparseArray, SparseEncoding}; -use crate::compress::{CompressConfig, CompressCtx, EncodingCompression}; +use crate::compress::{CompressConfig, Compressor, EncodingCompression}; use crate::{Array, ArrayDef, ArrayTrait, IntoArray, OwnedArray}; impl EncodingCompression for SparseEncoding { @@ -21,7 +21,7 @@ impl EncodingCompression for SparseEncoding { &self, array: &Array, like: Option<&Array>, - ctx: CompressCtx, + ctx: Compressor, ) -> VortexResult { let sparse_array = SparseArray::try_from(array)?; let sparse_like = like.map(|la| SparseArray::try_from(la).unwrap()); diff --git a/vortex-array/src/compress.rs b/vortex-array/src/compress.rs index b3c19a6f0..7ab1c65ec 100644 --- a/vortex-array/src/compress.rs +++ b/vortex-array/src/compress.rs @@ -1,22 +1,21 @@ use std::collections::HashSet; use std::fmt::{Debug, Display, Formatter}; -use std::sync::Arc; use log::{debug, info, warn}; use vortex_error::{vortex_bail, VortexResult}; -use crate::array::chunked::{Chunked, ChunkedArray, ChunkedEncoding}; +use crate::array::chunked::{Chunked, ChunkedArray}; use crate::array::constant::{Constant, ConstantArray}; -use crate::array::r#struct::{Struct, StructArray, StructEncoding}; -use crate::array::sparse::SparseEncoding; -use crate::array::varbin::VarBinEncoding; +use 
crate::array::r#struct::{Struct, StructArray}; use crate::compute::scalar_at::scalar_at; use crate::compute::slice::slice; use crate::encoding::{ArrayEncoding, EncodingRef}; use crate::sampling::stratified_slices; use crate::stats::ArrayStatistics; use crate::validity::Validity; -use crate::{compute, Array, ArrayDType, ArrayDef, ArrayTrait, IntoArray, OwnedArray, ToStatic}; +use crate::{ + compute, Array, ArrayDType, ArrayDef, ArrayTrait, Context, IntoArray, OwnedArray, ToStatic, +}; pub trait EncodingCompression: ArrayEncoding { fn cost(&self) -> u8 { @@ -35,7 +34,7 @@ pub trait EncodingCompression: ArrayEncoding { &self, _array: &Array, _like: Option<&Array>, - _ctx: CompressCtx, + _ctx: Compressor, ) -> VortexResult { vortex_bail!(NotImplemented: "compress", self.id()) } @@ -55,7 +54,6 @@ pub struct CompressConfig { max_depth: u8, // TODO(ngates): can each encoding define their own configs? pub ree_average_run_threshold: f32, - encodings: HashSet, } impl Default for CompressConfig { @@ -68,63 +66,33 @@ impl Default for CompressConfig { sample_count: 8, max_depth: 3, ree_average_run_threshold: 2.0, - encodings: HashSet::from([ - &ChunkedEncoding as EncodingRef, - &SparseEncoding, - &StructEncoding, - &VarBinEncoding, - ]), } } } -impl CompressConfig { - pub fn new() -> Self { - Self::default() - } - - pub fn with_enabled>(self, encodings: E) -> Self { - let mut new_self = self.clone(); - encodings.into_iter().for_each(|e| { - new_self.encodings.insert(e); - }); - new_self - } - - pub fn with_disabled>(self, disabled_encodings: E) -> Self { - let mut new_self = self.clone(); - disabled_encodings.into_iter().for_each(|e| { - new_self.encodings.remove(e); - }); - new_self - } - - pub fn is_enabled(&self, encoding: EncodingRef) -> bool { - self.encodings.contains(&encoding) - } -} - #[derive(Debug, Clone)] -pub struct CompressCtx { +pub struct Compressor<'a> { + ctx: &'a Context, + options: &'a CompressConfig, + path: Vec, - // TODO(ngates): put this back to a 
reference - options: Arc, depth: u8, /// A set of encodings disabled for this ctx. disabled_encodings: HashSet, } -impl Display for CompressCtx { +impl Display for Compressor<'_> { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "[{}|{}]", self.depth, self.path.join(".")) } } -impl CompressCtx { - pub fn new(options: Arc) -> Self { +impl<'a> Compressor<'a> { + pub fn new(ctx: &'a Context, options: &'a CompressConfig) -> Self { Self { - path: Vec::new(), + ctx, options, + path: Vec::new(), depth: 0, disabled_encodings: HashSet::new(), } @@ -152,8 +120,8 @@ impl CompressCtx { } #[inline] - pub fn options(&self) -> Arc { - self.options.clone() + pub fn options(&self) -> &CompressConfig { + &self.options } pub fn excluding(&self, encoding: EncodingRef) -> Self { @@ -162,8 +130,6 @@ impl CompressCtx { cloned } - // We don't take a reference to self to force the caller to think about whether to use - // an auxilliary ctx. pub fn compress(&self, arr: &Array, like: Option<&Array>) -> VortexResult { if arr.is_empty() { return Ok(arr.to_static()); @@ -174,7 +140,7 @@ impl CompressCtx { if let Some(compressed) = l .encoding() .compression() - .can_compress(arr, self.options().as_ref()) + .can_compress(arr, self.options()) .map(|c| c.compress(arr, Some(l), self.for_encoding(l.encoding().compression()))) { let compressed = compressed?; @@ -208,7 +174,7 @@ impl CompressCtx { Ok(compressed) } - pub fn compress_validity<'a>(&self, validity: Validity<'a>) -> VortexResult> { + pub fn compress_validity<'v>(&self, validity: Validity<'v>) -> VortexResult> { match validity { Validity::Array(a) => Ok(Validity::Array(self.compress(&a, None)?)), a => Ok(a), @@ -258,13 +224,10 @@ impl CompressCtx { } } -impl Default for CompressCtx { - fn default() -> Self { - Self::new(Arc::new(CompressConfig::default())) - } -} - -pub fn sampled_compression(array: &Array, ctx: &CompressCtx) -> VortexResult> { +pub fn sampled_compression( + array: &Array, + compressor: &Compressor, +) 
-> VortexResult> { // First, we try constant compression and shortcut any sampling. if !array.is_empty() && array.statistics().compute_is_constant().unwrap_or(false) { return Ok(Some( @@ -272,21 +235,20 @@ pub fn sampled_compression(array: &Array, ctx: &CompressCtx) -> VortexResult = ctx - .options - .encodings - .iter() - .filter(|&encoding| !ctx.disabled_encodings.contains(encoding)) + let mut candidates: Vec<&dyn EncodingCompression> = compressor + .ctx + .encodings() + .filter(|&encoding| !compressor.disabled_encodings.contains(encoding)) .map(|encoding| encoding.compression()) .filter(|compression| { if compression - .can_compress(array, ctx.options().as_ref()) + .can_compress(array, compressor.options()) .is_some() { - if ctx.depth + compression.cost() > ctx.options.max_depth { + if compressor.depth + compression.cost() > compressor.options.max_depth { debug!( "{} skipping encoding {} due to depth", - ctx, + compressor, compression.id() ); return false; @@ -297,12 +259,12 @@ pub fn sampled_compression(array: &Array, ctx: &CompressCtx) -> VortexResult VortexResult VortexResult>(), )?; - find_best_compression(candidates, &sample, ctx)? + find_best_compression(candidates, &sample, compressor)? 
.map(|(compression, best)| { - info!("{} compressing array {} like {}", ctx, array, best); - ctx.for_encoding(compression).compress(array, Some(&best)) + info!("{} compressing array {} like {}", compressor, array, best); + compressor + .for_encoding(compression) + .compress(array, Some(&best)) }) .transpose() } @@ -349,7 +315,7 @@ pub fn sampled_compression(array: &Array, ctx: &CompressCtx) -> VortexResult( candidates: Vec<&'a dyn EncodingCompression>, sample: &Array, - ctx: &CompressCtx, + ctx: &Compressor, ) -> VortexResult> { let mut best = None; let mut best_ratio = 1.0; @@ -360,10 +326,7 @@ fn find_best_compression<'a>( compression.id(), sample ); - if compression - .can_compress(sample, ctx.options.as_ref()) - .is_none() - { + if compression.can_compress(sample, ctx.options()).is_none() { continue; } let compressed_sample = diff --git a/vortex-array/src/context.rs b/vortex-array/src/context.rs index c9d22d53a..a876c3db8 100644 --- a/vortex-array/src/context.rs +++ b/vortex-array/src/context.rs @@ -22,6 +22,12 @@ impl Context { self } + pub fn with_encodings>(mut self, encodings: E) -> Self { + self.encodings + .extend(encodings.into_iter().map(|e| (e.id().to_string(), e))); + self + } + pub fn encodings(&self) -> impl Iterator + '_ { self.encodings.values().cloned() } diff --git a/vortex-array/src/implementation.rs b/vortex-array/src/implementation.rs index 3efa2a904..33e4631d5 100644 --- a/vortex-array/src/implementation.rs +++ b/vortex-array/src/implementation.rs @@ -78,7 +78,7 @@ macro_rules! 
impl_encoding { pub fn array(&'a self) -> &'a Array<'a> { self.typed.array() } - pub fn metadata(&'a self) -> &'a [<$Name Metadata>] { + fn metadata(&'a self) -> &'a [<$Name Metadata>] { self.typed.metadata() } diff --git a/vortex-datetime-parts/src/compress.rs b/vortex-datetime-parts/src/compress.rs index dadc9cd3d..8ddac6812 100644 --- a/vortex-datetime-parts/src/compress.rs +++ b/vortex-datetime-parts/src/compress.rs @@ -1,6 +1,6 @@ use vortex::array::datetime::{LocalDateTimeArray, TimeUnit}; use vortex::array::primitive::PrimitiveArray; -use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression}; +use vortex::compress::{CompressConfig, Compressor, EncodingCompression}; use vortex::compute::cast::cast; use vortex::{Array, ArrayTrait, IntoArray, OwnedArray}; use vortex_dtype::PType; @@ -24,7 +24,7 @@ impl EncodingCompression for DateTimePartsEncoding { &self, array: &Array, like: Option<&Array>, - ctx: CompressCtx, + ctx: Compressor, ) -> VortexResult { compress_localdatetime( LocalDateTimeArray::try_from(array)?, @@ -37,7 +37,7 @@ impl EncodingCompression for DateTimePartsEncoding { fn compress_localdatetime( array: LocalDateTimeArray, like: Option, - ctx: CompressCtx, + ctx: Compressor, ) -> VortexResult { let timestamps = cast(&array.timestamps(), PType::I64.into())?.flatten_primitive()?; diff --git a/vortex-dict/src/compress.rs b/vortex-dict/src/compress.rs index 5a44e957b..5a53c298b 100644 --- a/vortex-dict/src/compress.rs +++ b/vortex-dict/src/compress.rs @@ -7,7 +7,7 @@ use num_traits::AsPrimitive; use vortex::accessor::ArrayAccessor; use vortex::array::primitive::{Primitive, PrimitiveArray}; use vortex::array::varbin::{VarBin, VarBinArray}; -use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression}; +use vortex::compress::{CompressConfig, Compressor, EncodingCompression}; use vortex::stats::ArrayStatistics; use vortex::validity::Validity; use vortex::{Array, ArrayDType, ArrayDef, IntoArray, OwnedArray, ToArray}; @@ -46,7 +46,7 
@@ impl EncodingCompression for DictEncoding { &self, array: &Array, like: Option<&Array>, - ctx: CompressCtx, + ctx: Compressor, ) -> VortexResult { let dict_like = like.map(|like_arr| DictArray::try_from(like_arr).unwrap()); let dict_like_ref = dict_like.as_ref(); @@ -254,6 +254,7 @@ mod test { use vortex::array::primitive::PrimitiveArray; use vortex::array::varbin::VarBinArray; use vortex::compute::scalar_at::scalar_at; + use vortex::ToArray; use vortex_scalar::PrimitiveScalar; use crate::compress::{dict_encode_typed_primitive, dict_encode_varbin}; @@ -284,15 +285,15 @@ mod test { &[1, 1, 0, 2, 2, 0, 2, 0] ); assert_eq!( - scalar_at(values.array(), 0).unwrap(), + scalar_at(&values.to_array(), 0).unwrap(), PrimitiveScalar::nullable::(None).into() ); assert_eq!( - scalar_at(values.array(), 1).unwrap(), + scalar_at(&values.to_array(), 1).unwrap(), PrimitiveScalar::nullable(Some(1)).into() ); assert_eq!( - scalar_at(values.array(), 2).unwrap(), + scalar_at(&values.to_array(), 2).unwrap(), PrimitiveScalar::nullable(Some(3)).into() ); } diff --git a/vortex-fastlanes/benches/bitpacking_take.rs b/vortex-fastlanes/benches/bitpacking_take.rs index 609ba835f..0466287c6 100644 --- a/vortex-fastlanes/benches/bitpacking_take.rs +++ b/vortex-fastlanes/benches/bitpacking_take.rs @@ -6,7 +6,7 @@ use rand::distributions::Uniform; use rand::{thread_rng, Rng}; use vortex::array::primitive::PrimitiveArray; use vortex::array::sparse::SparseArray; -use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression}; +use vortex::compress::{CompressConfig, Compressor, EncodingCompression}; use vortex::compute::take::take; use vortex::encoding::EncodingRef; use vortex_fastlanes::{BitPackedArray, BitPackedEncoding}; @@ -19,7 +19,7 @@ fn values(len: usize, bits: usize) -> Vec { fn bench_take(c: &mut Criterion) { let cfg = CompressConfig::new().with_enabled([&BitPackedEncoding as EncodingRef]); - let ctx = CompressCtx::new(Arc::new(cfg)); + let ctx = Compressor::new(Arc::new(cfg)); 
let values = values(1_000_000, 8); let uncompressed = PrimitiveArray::from(values.clone()); @@ -57,7 +57,7 @@ fn bench_take(c: &mut Criterion) { fn bench_patched_take(c: &mut Criterion) { let cfg = CompressConfig::new().with_enabled([&BitPackedEncoding as EncodingRef]); - let ctx = CompressCtx::new(Arc::new(cfg)); + let ctx = Compressor::new(Arc::new(cfg)); let big_base2 = 1048576; let num_exceptions = 10000; diff --git a/vortex-fastlanes/src/bitpacking/compress.rs b/vortex-fastlanes/src/bitpacking/compress.rs index 81753c875..ea1010a0d 100644 --- a/vortex-fastlanes/src/bitpacking/compress.rs +++ b/vortex-fastlanes/src/bitpacking/compress.rs @@ -2,7 +2,7 @@ use arrayref::array_ref; use fastlanez::TryBitPack; use vortex::array::primitive::PrimitiveArray; use vortex::array::sparse::{Sparse, SparseArray}; -use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression}; +use vortex::compress::{CompressConfig, Compressor, EncodingCompression}; use vortex::compute::cast::cast; use vortex::stats::ArrayStatistics; use vortex::validity::Validity; @@ -12,7 +12,7 @@ use vortex_dtype::{match_each_integer_ptype, NativePType, PType}; use vortex_error::{vortex_bail, vortex_err, VortexResult}; use vortex_scalar::Scalar; -use crate::{match_integers_by_width, BitPackedArray, BitPackedEncoding}; +use crate::{match_integers_by_width, BitPackedArray, BitPackedEncoding, OwnedBitPackedArray}; impl EncodingCompression for BitPackedEncoding { fn cost(&self) -> u8 { @@ -48,7 +48,7 @@ impl EncodingCompression for BitPackedEncoding { &self, array: &Array, like: Option<&Array>, - ctx: CompressCtx, + ctx: Compressor, ) -> VortexResult { let parray = array.as_primitive(); let bit_width_freq = parray.statistics().compute_bit_width_freq()?; @@ -88,7 +88,7 @@ impl EncodingCompression for BitPackedEncoding { pub(crate) fn bitpack_encode( array: PrimitiveArray<'_>, bit_width: usize, -) -> VortexResult { +) -> VortexResult { let bit_width_freq = array.statistics().compute_bit_width_freq()?; 
let num_exceptions = count_exceptions(bit_width, &bit_width_freq); @@ -356,13 +356,15 @@ fn count_exceptions(bit_width: usize, bit_width_freq: &[usize]) -> usize { #[cfg(test)] mod test { - use std::sync::Arc; - - use vortex::encoding::{ArrayEncoding, EncodingRef}; - use vortex::ToArray; + use vortex::encoding::ArrayEncoding; + use vortex::{Context, ToArray}; use super::*; + fn ctx() -> Context { + Context::default().with_encoding(&BitPackedEncoding) + } + #[test] fn test_best_bit_width() { // 10 1-bit values, 20 2-bit, etc. @@ -373,10 +375,7 @@ mod test { #[test] fn test_compress() { - let cfg = CompressConfig::new().with_enabled([&BitPackedEncoding as EncodingRef]); - let ctx = CompressCtx::new(Arc::new(cfg)); - - let compressed = ctx + let compressed = Compressor::new(&ctx(), &Default::default()) .compress( PrimitiveArray::from(Vec::from_iter((0..10_000).map(|i| (i % 63) as u8))).array(), None, @@ -395,11 +394,13 @@ mod test { } fn compression_roundtrip(n: usize) { - let cfg = CompressConfig::new().with_enabled([&BitPackedEncoding as EncodingRef]); - let ctx = CompressCtx::new(Arc::new(cfg)); - let values = PrimitiveArray::from(Vec::from_iter((0..n).map(|i| (i % 2047) as u16))); - let compressed = ctx.compress(values.array(), None).unwrap(); + let compressed = Compressor::new( + &crate::bitpacking::compress::test::ctx(), + &Default::default(), + ) + .compress(values.array(), None) + .unwrap(); let compressed = BitPackedArray::try_from(compressed).unwrap(); let decompressed = compressed.to_array().flatten_primitive().unwrap(); assert_eq!(decompressed.typed_data::(), values.typed_data::()); diff --git a/vortex-fastlanes/src/bitpacking/compute/mod.rs b/vortex-fastlanes/src/bitpacking/compute/mod.rs index 1806b5561..52b8d4089 100644 --- a/vortex-fastlanes/src/bitpacking/compute/mod.rs +++ b/vortex-fastlanes/src/bitpacking/compute/mod.rs @@ -175,29 +175,29 @@ fn do_patch_for_take_primitive( #[cfg(test)] mod test { - use std::sync::Arc; - use itertools::Itertools; use 
rand::distributions::Uniform; use rand::{thread_rng, Rng}; use vortex::array::primitive::{Primitive, PrimitiveArray}; use vortex::array::sparse::SparseArray; - use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression}; + use vortex::compress::Compressor; use vortex::compute::scalar_at::scalar_at; use vortex::compute::take::take; - use vortex::encoding::EncodingRef; - use vortex::{ArrayDef, IntoArray}; + use vortex::{ArrayDef, Context, IntoArray}; use crate::{BitPackedArray, BitPackedEncoding}; + fn ctx() -> Context { + Context::default().with_encoding(&BitPackedEncoding) + } + #[test] fn take_indices() { - let cfg = CompressConfig::new().with_enabled([&BitPackedEncoding as EncodingRef]); - let ctx = CompressCtx::new(Arc::new(cfg)); - let indices = PrimitiveArray::from(vec![0, 125, 2047, 2049, 2151, 2790]); let unpacked = PrimitiveArray::from((0..4096).map(|i| (i % 63) as u8).collect::>()); - let bitpacked = ctx.compress(unpacked.array(), None).unwrap(); + let bitpacked = Compressor::new(&ctx(), &Default::default()) + .compress(unpacked.array(), None) + .unwrap(); let result = take(&bitpacked, indices.array()).unwrap(); assert_eq!(result.encoding().id(), Primitive::ID); let primitive_result = result.flatten_primitive().unwrap(); @@ -207,16 +207,10 @@ mod test { #[test] fn take_random_indices() { - let cfg = CompressConfig::new().with_enabled([&BitPackedEncoding as EncodingRef]); - let ctx = CompressCtx::new(Arc::new(cfg)); - let num_patches: usize = 128; let values = (0..u16::MAX as u32 + num_patches as u32).collect::>(); let uncompressed = PrimitiveArray::from(values.clone()); - let packed = BitPackedEncoding {} - .compress(uncompressed.array(), None, ctx) - .unwrap(); - let packed = BitPackedArray::try_from(packed).unwrap(); + let packed = BitPackedArray::encode(uncompressed.array(), 16).unwrap(); assert!(packed.patches().is_some()); let patches = SparseArray::try_from(packed.patches().unwrap()).unwrap(); @@ -254,15 +248,9 @@ mod test { #[test] fn 
test_scalar_at() { - let cfg = CompressConfig::new().with_enabled([&BitPackedEncoding as EncodingRef]); - let ctx = CompressCtx::new(Arc::new(cfg)); - let values = (0u32..257).collect_vec(); let uncompressed = PrimitiveArray::from(values.clone()).into_array(); - let packed = BitPackedEncoding - .compress(&uncompressed, None, ctx) - .unwrap(); - let packed = BitPackedArray::try_from(packed).unwrap(); + let packed = BitPackedArray::encode(&uncompressed, 8).unwrap(); assert!(packed.patches().is_some()); let patches = SparseArray::try_from(packed.patches().unwrap()).unwrap(); diff --git a/vortex-fastlanes/src/bitpacking/mod.rs b/vortex-fastlanes/src/bitpacking/mod.rs index 2deeddc05..070981bba 100644 --- a/vortex-fastlanes/src/bitpacking/mod.rs +++ b/vortex-fastlanes/src/bitpacking/mod.rs @@ -119,7 +119,7 @@ impl BitPackedArray<'_> { )) } - pub fn encode(array: Array<'_>, bit_width: usize) -> VortexResult { + pub fn encode(array: &Array<'_>, bit_width: usize) -> VortexResult { if let Ok(parray) = PrimitiveArray::try_from(array) { Ok(bitpack_encode(parray, bit_width)?) } else { @@ -192,33 +192,22 @@ macro_rules! 
match_integers_by_width { #[cfg(test)] mod test { - use std::sync::Arc; - use vortex::array::primitive::PrimitiveArray; - use vortex::compress::{CompressConfig, CompressCtx}; use vortex::compute::scalar_at::scalar_at; use vortex::compute::slice::slice; - use vortex::encoding::EncodingRef; use vortex::IntoArray; - use crate::{BitPackedArray, BitPackedEncoding}; + use crate::BitPackedArray; #[test] fn slice_within_block() { - let cfg = CompressConfig::new().with_enabled([&BitPackedEncoding as EncodingRef]); - let ctx = CompressCtx::new(Arc::new(cfg)); - - let compressed = slice( - &ctx.compress( - PrimitiveArray::from((0..10_000).map(|i| (i % 63) as u8).collect::>()) - .array(), - None, - ) - .unwrap(), - 768, - 9999, + let packed = BitPackedArray::encode( + &PrimitiveArray::from((0..10_000).map(|i| (i % 63) as u8).collect::>()).array(), + 16, ) .unwrap(); + + let compressed = slice(packed.array(), 768, 9999).unwrap(); assert_eq!( scalar_at(&compressed, 0).unwrap(), ((768 % 63) as u8).into() @@ -231,20 +220,13 @@ mod test { #[test] fn slice_block_boundary() { - let cfg = CompressConfig::new().with_enabled([&BitPackedEncoding as EncodingRef]); - let ctx = CompressCtx::new(Arc::new(cfg)); - - let compressed = slice( - &ctx.compress( - PrimitiveArray::from((0..10_000).map(|i| (i % 63) as u8).collect::>()) - .array(), - None, - ) - .unwrap(), - 7168, - 9216, + let packed = BitPackedArray::encode( + &PrimitiveArray::from((0..10_000).map(|i| (i % 63) as u8).collect::>()).array(), + 16, ) .unwrap(); + + let compressed = slice(packed.array(), 7168, 9216).unwrap(); assert_eq!( scalar_at(&compressed, 0).unwrap(), ((7168 % 63) as u8).into() @@ -259,7 +241,7 @@ mod test { fn test_encode() { let values = vec![Some(1), None, Some(1), None, Some(1), None, Some(u64::MAX)]; let uncompressed = PrimitiveArray::from_nullable_vec(values); - let packed = BitPackedArray::encode(uncompressed.into_array(), 1).unwrap(); + let packed = BitPackedArray::encode(uncompressed.array(), 1).unwrap(); 
let expected = &[1, 0, 1, 0, 1, 0, u64::MAX]; let results = packed .into_array() @@ -274,9 +256,9 @@ mod test { fn test_encode_too_wide() { let values = vec![Some(1u8), None, Some(1), None, Some(1), None]; let uncompressed = PrimitiveArray::from_nullable_vec(values); - let _packed = BitPackedArray::encode(uncompressed.clone().into_array(), 8) + let _packed = BitPackedArray::encode(uncompressed.array(), 8) .expect_err("Cannot pack value into the same width"); - let _packed = BitPackedArray::encode(uncompressed.into_array(), 9) + let _packed = BitPackedArray::encode(uncompressed.array(), 9) .expect_err("Cannot pack value into larger width"); } } diff --git a/vortex-fastlanes/src/delta/compress.rs b/vortex-fastlanes/src/delta/compress.rs index 1cd6e3270..337652460 100644 --- a/vortex-fastlanes/src/delta/compress.rs +++ b/vortex-fastlanes/src/delta/compress.rs @@ -4,7 +4,7 @@ use arrayref::array_ref; use fastlanez::{transpose, untranspose_into, Delta}; use num_traits::{WrappingAdd, WrappingSub}; use vortex::array::primitive::PrimitiveArray; -use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression}; +use vortex::compress::{CompressConfig, Compressor, EncodingCompression}; use vortex::compute::fill::fill_forward; use vortex::validity::Validity; use vortex::{Array, IntoArray, OwnedArray}; @@ -35,7 +35,7 @@ impl EncodingCompression for DeltaEncoding { &self, array: &Array, like: Option<&Array>, - ctx: CompressCtx, + ctx: Compressor, ) -> VortexResult { let parray = PrimitiveArray::try_from(array)?; let like_delta = like.map(|l| DeltaArray::try_from(l).unwrap()); @@ -191,15 +191,13 @@ where #[cfg(test)] mod test { - use std::sync::Arc; - - use vortex::encoding::{ArrayEncoding, EncodingRef}; + use vortex::encoding::ArrayEncoding; + use vortex::Context; use super::*; - fn compress_ctx() -> CompressCtx { - let cfg = CompressConfig::new().with_enabled([&DeltaEncoding as EncodingRef]); - CompressCtx::new(Arc::new(cfg)) + fn ctx() -> Context { + 
Context::default().with_encoding(&DeltaEncoding) } #[test] @@ -215,9 +213,12 @@ mod test { } fn do_roundtrip_test(input: Vec) { - let ctx = compress_ctx(); - let compressed = DeltaEncoding {} - .compress(PrimitiveArray::from(input.clone()).array(), None, ctx) + let compressed = DeltaEncoding + .compress( + PrimitiveArray::from(input.clone()).array(), + None, + Compressor::new(&ctx(), &Default::default()), + ) .unwrap(); assert_eq!(compressed.encoding().id(), DeltaEncoding.id()); diff --git a/vortex-fastlanes/src/for/compress.rs b/vortex-fastlanes/src/for/compress.rs index 8d9e5e01f..5569088af 100644 --- a/vortex-fastlanes/src/for/compress.rs +++ b/vortex-fastlanes/src/for/compress.rs @@ -2,7 +2,7 @@ use itertools::Itertools; use num_traits::{PrimInt, WrappingAdd, WrappingSub}; use vortex::array::constant::ConstantArray; use vortex::array::primitive::PrimitiveArray; -use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression}; +use vortex::compress::{CompressConfig, Compressor, EncodingCompression}; use vortex::stats::{ArrayStatistics, Stat}; use vortex::{Array, ArrayDType, ArrayTrait, IntoArray, OwnedArray}; use vortex_dtype::{match_each_integer_ptype, NativePType, PType}; @@ -42,7 +42,7 @@ impl EncodingCompression for FoREncoding { &self, array: &Array, like: Option<&Array>, - ctx: CompressCtx, + ctx: Compressor, ) -> VortexResult { let parray = PrimitiveArray::try_from(array)?; let shift = trailing_zeros(array); @@ -141,29 +141,27 @@ fn trailing_zeros(array: &Array) -> u8 { #[cfg(test)] mod test { - use std::sync::Arc; use vortex::compute::scalar_at::ScalarAtFn; use vortex::encoding::{ArrayEncoding, EncodingRef}; + use vortex::Context; use super::*; use crate::BitPackedEncoding; - fn compress_ctx() -> CompressCtx { - let cfg = CompressConfig::new() - // We need some BitPacking else we will need choose FoR. 
- .with_enabled([&FoREncoding as EncodingRef, &BitPackedEncoding]); - CompressCtx::new(Arc::new(cfg)) + fn ctx() -> Context { + // We need some BitPacking else we will need choose FoR. + Context::default().with_encodings([&FoREncoding as EncodingRef, &BitPackedEncoding]) } #[test] fn test_compress() { - let ctx = compress_ctx(); - // Create a range offset by a million let array = PrimitiveArray::from((0u32..10_000).map(|v| v + 1_000_000).collect_vec()); - let compressed = ctx.compress(array.array(), None).unwrap(); + let compressed = Compressor::new(&ctx(), &Default::default()) + .compress(array.array(), None) + .unwrap(); assert_eq!(compressed.encoding().id(), FoREncoding.id()); assert_eq!( u32::try_from(FoRArray::try_from(compressed).unwrap().reference()).unwrap(), @@ -173,11 +171,11 @@ mod test { #[test] fn test_decompress() { - let ctx = compress_ctx(); - // Create a range offset by a million let array = PrimitiveArray::from((0u32..10_000).map(|v| v + 1_000_000).collect_vec()); - let compressed = ctx.compress(array.array(), None).unwrap(); + let compressed = Compressor::new(&ctx(), &Default::default()) + .compress(array.array(), None) + .unwrap(); assert_eq!(compressed.encoding().id(), FoREncoding.id()); let decompressed = compressed.flatten_primitive().unwrap(); @@ -186,11 +184,15 @@ mod test { #[test] fn test_overflow() { - let ctx = compress_ctx(); - // Create a range offset by a million let array = PrimitiveArray::from((i8::MIN..i8::MAX).collect_vec()); - let compressed = FoREncoding {}.compress(array.array(), None, ctx).unwrap(); + let compressed = FoREncoding {} + .compress( + array.array(), + None, + Compressor::new(&ctx(), &Default::default()), + ) + .unwrap(); let compressed = FoRArray::try_from(compressed).unwrap(); assert_eq!(i8::MIN, compressed.reference().try_into().unwrap()); diff --git a/vortex-fastlanes/src/for/compute.rs b/vortex-fastlanes/src/for/compute.rs index 205dc1c1a..6fb012279 100644 --- a/vortex-fastlanes/src/for/compute.rs +++ 
b/vortex-fastlanes/src/for/compute.rs @@ -68,8 +68,9 @@ impl SliceFn for FoRArray<'_> { #[cfg(test)] mod test { use vortex::array::primitive::PrimitiveArray; - use vortex::compress::{CompressCtx, EncodingCompression}; + use vortex::compress::{Compressor, EncodingCompression}; use vortex::compute::scalar_at::scalar_at; + use vortex::Context; use crate::FoREncoding; @@ -79,7 +80,7 @@ mod test { .compress( PrimitiveArray::from(vec![11, 15, 19]).array(), None, - CompressCtx::default(), + Compressor::new(&Context::default(), &Default::default()), ) .unwrap(); assert_eq!(scalar_at(&forarr, 0).unwrap(), 11.into()); diff --git a/vortex-ipc/benches/ipc_take.rs b/vortex-ipc/benches/ipc_take.rs index a5e145b45..7809cc79c 100644 --- a/vortex-ipc/benches/ipc_take.rs +++ b/vortex-ipc/benches/ipc_take.rs @@ -9,9 +9,9 @@ use arrow_schema::{DataType, Field, Schema}; use criterion::{black_box, criterion_group, criterion_main, Criterion}; use itertools::Itertools; use vortex::array::primitive::PrimitiveArray; -use vortex::compress::CompressCtx; +use vortex::compress::Compressor; use vortex::compute::take::take; -use vortex::{IntoArray, ViewContext}; +use vortex::{Context, IntoArray, ViewContext}; use vortex_ipc::iter::FallibleLendingIterator; use vortex_ipc::reader::StreamReader; use vortex_ipc::writer::StreamWriter; @@ -52,14 +52,14 @@ fn ipc_take(c: &mut Criterion) { group.bench_function("vortex", |b| { let indices = PrimitiveArray::from(vec![10, 11, 12, 13, 100_000, 2_999_999]).into_array(); let uncompressed = PrimitiveArray::from((0i32..3_000_000).rev().collect_vec()).into_array(); - let ctx = CompressCtx::default(); + let ctx = Compressor::default(); let compressed = ctx.compress(&uncompressed, None).unwrap(); // Try running take over an ArrayView. 
let mut buffer = vec![]; { let mut cursor = Cursor::new(&mut buffer); - let mut writer = StreamWriter::try_new(&mut cursor, ViewContext::default()).unwrap(); + let mut writer = StreamWriter::try_new(&mut cursor, &Context::default()).unwrap(); writer.write_array(&compressed).unwrap(); } b.iter(|| { diff --git a/vortex-ipc/src/messages.rs b/vortex-ipc/src/messages.rs index 1e1e224c2..4c0209d25 100644 --- a/vortex-ipc/src/messages.rs +++ b/vortex-ipc/src/messages.rs @@ -100,7 +100,7 @@ impl<'a> TryFrom> for ViewContext { .ok_or_else(|| vortex_err!("Stream uses unknown encoding {}", encoding_id))?, ); } - Ok(Self::new(encodings.into())) + Ok(Self::new(encodings)) } } diff --git a/vortex-ipc/src/reader.rs b/vortex-ipc/src/reader.rs index 2da577dc5..a3aa9c981 100644 --- a/vortex-ipc/src/reader.rs +++ b/vortex-ipc/src/reader.rs @@ -52,7 +52,7 @@ impl StreamReader { let view_ctx: ViewContext = SerdeContextDeserializer { fb: messages.next(&mut read)?.header_as_context().unwrap(), - ctx: &ctx, + ctx, } .try_into()?; @@ -520,7 +520,7 @@ mod tests { fn test_write_read_bitpacked() { // NB: the order is reversed here to ensure we aren't grabbing indexes instead of values let uncompressed = PrimitiveArray::from((0i64..3_000).rev().collect_vec()); - let packed = BitPackedArray::encode(uncompressed.into_array(), 5).unwrap(); + let packed = BitPackedArray::encode(uncompressed.array(), 5).unwrap(); let expected = &[2989i64, 2988, 2987, 2986]; test_base_case(&packed.into_array(), expected, PrimitiveEncoding.id()); diff --git a/vortex-ree/src/compress.rs b/vortex-ree/src/compress.rs index 4b54855e2..116d07a46 100644 --- a/vortex-ree/src/compress.rs +++ b/vortex-ree/src/compress.rs @@ -3,7 +3,7 @@ use std::cmp::min; use itertools::Itertools; use num_traits::{AsPrimitive, FromPrimitive}; use vortex::array::primitive::{Primitive, PrimitiveArray}; -use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression}; +use vortex::compress::{CompressConfig, Compressor, 
EncodingCompression}; use vortex::stats::{ArrayStatistics, Stat}; use vortex::validity::Validity; use vortex::{Array, ArrayDType, ArrayDef, ArrayTrait, IntoArray, OwnedArray}; @@ -39,18 +39,19 @@ impl EncodingCompression for REEEncoding { &self, array: &Array, like: Option<&Array>, - ctx: CompressCtx, + ctx: Compressor, ) -> VortexResult { let ree_like = like.map(|like_arr| REEArray::try_from(like_arr).unwrap()); let ree_like_ref = ree_like.as_ref(); let primitive_array = array.as_primitive(); let (ends, values) = ree_encode(&primitive_array); - let compressed_ends = ctx - .auxiliary("ends") - .compress(ends.array(), ree_like_ref.map(|ree| ree.ends()).as_ref())?; + let compressed_ends = ctx.auxiliary("ends").compress( + &ends.into_array(), + ree_like_ref.map(|ree| ree.ends()).as_ref(), + )?; let compressed_values = ctx.named("values").excluding(&REEEncoding).compress( - values.array(), + &values.into_array(), ree_like_ref.map(|ree| ree.values()).as_ref(), )?; diff --git a/vortex-ree/src/compute.rs b/vortex-ree/src/compute.rs index 4d2739eef..9344a8c03 100644 --- a/vortex-ree/src/compute.rs +++ b/vortex-ree/src/compute.rs @@ -45,7 +45,7 @@ impl TakeFn for REEArray<'_> { }); take( &self.values(), - PrimitiveArray::from(physical_indices).array(), + &PrimitiveArray::from(physical_indices).into_array(), ) } } diff --git a/vortex-roaring/src/boolean/compress.rs b/vortex-roaring/src/boolean/compress.rs index 78f3be302..ade17ea6d 100644 --- a/vortex-roaring/src/boolean/compress.rs +++ b/vortex-roaring/src/boolean/compress.rs @@ -1,6 +1,6 @@ use croaring::Bitmap; use vortex::array::bool::BoolArray; -use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression}; +use vortex::compress::{CompressConfig, Compressor, EncodingCompression}; use vortex::{Array, ArrayDType, ArrayDef, ArrayTrait, IntoArray, OwnedArray}; use vortex_dtype::DType; use vortex_dtype::Nullability::NonNullable; @@ -36,7 +36,7 @@ impl EncodingCompression for RoaringBoolEncoding { &self, array: 
&Array, _like: Option<&Array>, - _ctx: CompressCtx, + _ctx: Compressor, ) -> VortexResult { roaring_encode(array.clone().flatten_bool()?).map(move |a| a.into_array()) } diff --git a/vortex-roaring/src/integer/compress.rs b/vortex-roaring/src/integer/compress.rs index a67cc1277..e35bb33a1 100644 --- a/vortex-roaring/src/integer/compress.rs +++ b/vortex-roaring/src/integer/compress.rs @@ -2,7 +2,7 @@ use croaring::Bitmap; use log::debug; use num_traits::NumCast; use vortex::array::primitive::PrimitiveArray; -use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression}; +use vortex::compress::{CompressConfig, Compressor, EncodingCompression}; use vortex::stats::ArrayStatistics; use vortex::{Array, ArrayDType, ArrayDef, IntoArray, OwnedArray, ToStatic}; use vortex_dtype::{NativePType, PType}; @@ -50,7 +50,7 @@ impl EncodingCompression for RoaringIntEncoding { &self, array: &Array, _like: Option<&Array>, - _ctx: CompressCtx, + _ctx: Compressor, ) -> VortexResult { let parray = array.clone().flatten_primitive()?; Ok(roaring_encode(parray).into_array().to_static()) diff --git a/vortex-zigzag/src/compress.rs b/vortex-zigzag/src/compress.rs index 090effd1b..f08a64bb8 100644 --- a/vortex-zigzag/src/compress.rs +++ b/vortex-zigzag/src/compress.rs @@ -1,5 +1,5 @@ use vortex::array::primitive::PrimitiveArray; -use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression}; +use vortex::compress::{CompressConfig, Compressor, EncodingCompression}; use vortex::stats::{ArrayStatistics, Stat}; use vortex::validity::Validity; use vortex::{Array, IntoArray, OwnedArray}; @@ -37,7 +37,7 @@ impl EncodingCompression for ZigZagEncoding { &self, array: &Array, like: Option<&Array>, - ctx: CompressCtx, + ctx: Compressor, ) -> VortexResult { let zigzag_like = like.map(|like_arr| ZigZagArray::try_from(like_arr).unwrap()); let encoded = zigzag_encode(&array.as_primitive())?; @@ -105,20 +105,17 @@ where #[cfg(test)] mod test { - use std::sync::Arc; - use 
vortex::encoding::{ArrayEncoding, EncodingRef}; + use vortex::Context; use vortex_fastlanes::BitPackedEncoding; use super::*; #[test] fn test_compress() { - let cfg = CompressConfig::new() - .with_enabled([&ZigZagEncoding as EncodingRef, &BitPackedEncoding]); - let ctx = CompressCtx::new(Arc::new(cfg)); - - let compressed = ctx + let ctx = + Context::default().with_encodings([&ZigZagEncoding as EncodingRef, &BitPackedEncoding]); + let compressed = Compressor::new(&ctx, &Default::default()) .compress( PrimitiveArray::from(Vec::from_iter((-10_000..10_000).map(|i| i as i64))).array(), None, From 6cef097ad359b7b0ef81b8ab698babe7f2a277a7 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 3 May 2024 10:51:42 +0100 Subject: [PATCH 04/10] EncodingCtx --- vortex-array/src/compress.rs | 2 +- vortex-fastlanes/benches/bitpacking_take.rs | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/vortex-array/src/compress.rs b/vortex-array/src/compress.rs index 7ab1c65ec..6ae5f26c0 100644 --- a/vortex-array/src/compress.rs +++ b/vortex-array/src/compress.rs @@ -121,7 +121,7 @@ impl<'a> Compressor<'a> { #[inline] pub fn options(&self) -> &CompressConfig { - &self.options + self.options } pub fn excluding(&self, encoding: EncodingRef) -> Self { diff --git a/vortex-fastlanes/benches/bitpacking_take.rs b/vortex-fastlanes/benches/bitpacking_take.rs index 0466287c6..a84df7d84 100644 --- a/vortex-fastlanes/benches/bitpacking_take.rs +++ b/vortex-fastlanes/benches/bitpacking_take.rs @@ -9,6 +9,7 @@ use vortex::array::sparse::SparseArray; use vortex::compress::{CompressConfig, Compressor, EncodingCompression}; use vortex::compute::take::take; use vortex::encoding::EncodingRef; +use vortex::Context; use vortex_fastlanes::{BitPackedArray, BitPackedEncoding}; fn values(len: usize, bits: usize) -> Vec { @@ -18,13 +19,13 @@ fn values(len: usize, bits: usize) -> Vec { } fn bench_take(c: &mut Criterion) { - let cfg = CompressConfig::new().with_enabled([&BitPackedEncoding as 
EncodingRef]); - let ctx = Compressor::new(Arc::new(cfg)); + let ctx = Context::default().with_encoding(&BitPackedEncoding); + let compressor = Compressor::new(&ctx, &Default::default()); let values = values(1_000_000, 8); let uncompressed = PrimitiveArray::from(values.clone()); let packed = BitPackedEncoding {} - .compress(uncompressed.array(), None, ctx) + .compress(uncompressed.array(), None, compressor) .unwrap(); let stratified_indices: PrimitiveArray = (0..10).map(|i| i * 10_000).collect::>().into(); From e2b6b62782307e59f892a54f861d005852e98e16 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 3 May 2024 10:54:33 +0100 Subject: [PATCH 05/10] EncodingCtx --- vortex-fastlanes/benches/bitpacking_take.rs | 21 ++++++++++++--------- vortex-ipc/benches/ipc_array_reader_take.rs | 8 +++++--- vortex-ipc/benches/ipc_take.rs | 14 +++++++++----- 3 files changed, 26 insertions(+), 17 deletions(-) diff --git a/vortex-fastlanes/benches/bitpacking_take.rs b/vortex-fastlanes/benches/bitpacking_take.rs index a84df7d84..2168dd174 100644 --- a/vortex-fastlanes/benches/bitpacking_take.rs +++ b/vortex-fastlanes/benches/bitpacking_take.rs @@ -1,14 +1,11 @@ -use std::sync::Arc; - use criterion::{black_box, criterion_group, criterion_main, Criterion}; use itertools::Itertools; use rand::distributions::Uniform; use rand::{thread_rng, Rng}; use vortex::array::primitive::PrimitiveArray; use vortex::array::sparse::SparseArray; -use vortex::compress::{CompressConfig, Compressor, EncodingCompression}; +use vortex::compress::{Compressor, EncodingCompression}; use vortex::compute::take::take; -use vortex::encoding::EncodingRef; use vortex::Context; use vortex_fastlanes::{BitPackedArray, BitPackedEncoding}; @@ -20,12 +17,15 @@ fn values(len: usize, bits: usize) -> Vec { fn bench_take(c: &mut Criterion) { let ctx = Context::default().with_encoding(&BitPackedEncoding); - let compressor = Compressor::new(&ctx, &Default::default()); let values = values(1_000_000, 8); let uncompressed = 
PrimitiveArray::from(values.clone()); let packed = BitPackedEncoding {} - .compress(uncompressed.array(), None, compressor) + .compress( + uncompressed.array(), + None, + Compressor::new(&ctx, &Default::default()), + ) .unwrap(); let stratified_indices: PrimitiveArray = (0..10).map(|i| i * 10_000).collect::>().into(); @@ -57,8 +57,7 @@ fn bench_take(c: &mut Criterion) { } fn bench_patched_take(c: &mut Criterion) { - let cfg = CompressConfig::new().with_enabled([&BitPackedEncoding as EncodingRef]); - let ctx = Compressor::new(Arc::new(cfg)); + let ctx = Context::default().with_encoding(&BitPackedEncoding); let big_base2 = 1048576; let num_exceptions = 10000; @@ -66,7 +65,11 @@ fn bench_patched_take(c: &mut Criterion) { let uncompressed = PrimitiveArray::from(values.clone()); let packed = BitPackedEncoding {} - .compress(uncompressed.array(), None, ctx) + .compress( + uncompressed.array(), + None, + Compressor::new(&ctx, &Default::default()), + ) .unwrap(); let packed = BitPackedArray::try_from(packed).unwrap(); assert!(packed.patches().is_some()); diff --git a/vortex-ipc/benches/ipc_array_reader_take.rs b/vortex-ipc/benches/ipc_array_reader_take.rs index 14d1d6ed6..962c16596 100644 --- a/vortex-ipc/benches/ipc_array_reader_take.rs +++ b/vortex-ipc/benches/ipc_array_reader_take.rs @@ -4,7 +4,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; use fallible_iterator::FallibleIterator; use itertools::Itertools; use vortex::array::primitive::PrimitiveArray; -use vortex::{IntoArray, ViewContext}; +use vortex::{Context, IntoArray}; use vortex_dtype::{DType, Nullability, PType}; use vortex_ipc::iter::FallibleLendingIterator; use vortex_ipc::reader::StreamReader; @@ -14,6 +14,8 @@ use vortex_ipc::writer::StreamWriter; // take from the first 20 batches and last batch // compare with arrow fn ipc_array_reader_take(c: &mut Criterion) { + let ctx = Context::default(); + let indices = (0..20) .map(|i| i * 100_000 + 1) .chain([98 * 100_000 + 1]) @@ -24,7 
+26,7 @@ fn ipc_array_reader_take(c: &mut Criterion) { let mut buffer = vec![]; { let mut cursor = Cursor::new(&mut buffer); - let mut writer = StreamWriter::try_new(&mut cursor, ViewContext::default()).unwrap(); + let mut writer = StreamWriter::try_new(&mut cursor, &ctx).unwrap(); writer .write_schema(&DType::Primitive(PType::I32, Nullability::Nullable)) .unwrap(); @@ -37,7 +39,7 @@ fn ipc_array_reader_take(c: &mut Criterion) { b.iter(|| { let mut cursor = Cursor::new(&buffer); - let mut reader = StreamReader::try_new(&mut cursor).unwrap(); + let mut reader = StreamReader::try_new(&mut cursor, &ctx).unwrap(); let array_reader = reader.next().unwrap().unwrap(); let mut iterator = array_reader.take(&indices).unwrap(); while let Some(arr) = iterator.next().unwrap() { diff --git a/vortex-ipc/benches/ipc_take.rs b/vortex-ipc/benches/ipc_take.rs index 7809cc79c..218a32521 100644 --- a/vortex-ipc/benches/ipc_take.rs +++ b/vortex-ipc/benches/ipc_take.rs @@ -1,3 +1,5 @@ +extern crate core; + use std::io::Cursor; use std::sync::Arc; @@ -11,7 +13,7 @@ use itertools::Itertools; use vortex::array::primitive::PrimitiveArray; use vortex::compress::Compressor; use vortex::compute::take::take; -use vortex::{Context, IntoArray, ViewContext}; +use vortex::{Context, IntoArray}; use vortex_ipc::iter::FallibleLendingIterator; use vortex_ipc::reader::StreamReader; use vortex_ipc::writer::StreamWriter; @@ -52,19 +54,21 @@ fn ipc_take(c: &mut Criterion) { group.bench_function("vortex", |b| { let indices = PrimitiveArray::from(vec![10, 11, 12, 13, 100_000, 2_999_999]).into_array(); let uncompressed = PrimitiveArray::from((0i32..3_000_000).rev().collect_vec()).into_array(); - let ctx = Compressor::default(); - let compressed = ctx.compress(&uncompressed, None).unwrap(); + let ctx = Context::default(); + let compressed = Compressor::new(&ctx, &Default::default()) + .compress(&uncompressed, None) + .unwrap(); // Try running take over an ArrayView. 
let mut buffer = vec![]; { let mut cursor = Cursor::new(&mut buffer); - let mut writer = StreamWriter::try_new(&mut cursor, &Context::default()).unwrap(); + let mut writer = StreamWriter::try_new(&mut cursor, &ctx).unwrap(); writer.write_array(&compressed).unwrap(); } b.iter(|| { let mut cursor = Cursor::new(&buffer); - let mut reader = StreamReader::try_new(&mut cursor).unwrap(); + let mut reader = StreamReader::try_new(&mut cursor, &ctx).unwrap(); let mut array_reader = reader.next().unwrap().unwrap(); let array_view = array_reader.next().unwrap().unwrap(); black_box(take(&array_view, &indices)) From b4a82fac8d62dd81c2778bd5b5fe8e017008ee91 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 3 May 2024 10:54:50 +0100 Subject: [PATCH 06/10] EncodingCtx --- vortex-array/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index afda03561..8b561d827 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -16,7 +16,7 @@ name = "vortex" path = "src/lib.rs" [lints] -# workspace = true +workspace = true [dependencies] arrow-array = { workspace = true } From 85012ef40c2633e78edd60b74aa130533f9088ee Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 3 May 2024 10:59:25 +0100 Subject: [PATCH 07/10] EncodingCtx --- bench-vortex/Cargo.toml | 1 - bench-vortex/src/data_downloads.rs | 6 ++-- bench-vortex/src/lib.rs | 45 +++++++++++++++--------------- bench-vortex/src/reader.rs | 10 +++---- 4 files changed, 31 insertions(+), 31 deletions(-) diff --git a/bench-vortex/Cargo.toml b/bench-vortex/Cargo.toml index 9a9577110..b8063b322 100644 --- a/bench-vortex/Cargo.toml +++ b/bench-vortex/Cargo.toml @@ -48,7 +48,6 @@ vortex-roaring = { path = "../vortex-roaring" } [dev-dependencies] criterion = { workspace = true } - [[bench]] name = "compress_benchmark" harness = false diff --git a/bench-vortex/src/data_downloads.rs b/bench-vortex/src/data_downloads.rs index 6c9019bed..83f0f47a2 100644 
--- a/bench-vortex/src/data_downloads.rs +++ b/bench-vortex/src/data_downloads.rs @@ -11,13 +11,13 @@ use log::info; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use tokio::runtime::Runtime; use vortex::arrow::FromArrowType; -use vortex::{Context, IntoArray, ToArrayData}; +use vortex::{IntoArray, ToArrayData}; use vortex_dtype::DType; use vortex_error::{VortexError, VortexResult}; use vortex_ipc::writer::StreamWriter; -use crate::idempotent; use crate::reader::BATCH_SIZE; +use crate::{idempotent, CTX}; pub fn download_data(fname: PathBuf, data_url: &str) -> PathBuf { idempotent(&fname, |path| { @@ -60,7 +60,7 @@ pub fn data_vortex_uncompressed(fname_out: &str, downloaded_data: PathBuf) -> Pa let reader = builder.with_batch_size(BATCH_SIZE).build().unwrap(); let mut write = File::create(path).unwrap(); - let mut writer = StreamWriter::try_new(&mut write, &Context::default()).unwrap(); + let mut writer = StreamWriter::try_new(&mut write, &CTX).unwrap(); let dtype = DType::from_arrow(reader.schema()); writer.write_schema(&dtype).unwrap(); diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index d0b031753..d27163559 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -7,6 +7,7 @@ use std::path::{Path, PathBuf}; use arrow_array::RecordBatchReader; use humansize::DECIMAL; use itertools::Itertools; +use lazy_static::lazy_static; use log::{info, LevelFilter}; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use parquet::arrow::ProjectionMask; @@ -35,6 +36,22 @@ pub mod reader; pub mod taxi_data; pub mod vortex_utils; +lazy_static! { + pub static ref CTX: Context = Context::default().with_encodings([ + &ALPEncoding as EncodingRef, + &DictEncoding, + &BitPackedEncoding, + &FoREncoding, + &DateTimePartsEncoding, + // &DeltaEncoding, Blows up the search space too much. 
+ &REEEncoding, + &RoaringBoolEncoding, + // &RoaringIntEncoding, + // Doesn't offer anything more than FoR really + // &ZigZagEncoding, + ]); +} + /// Creates a file if it doesn't already exist. /// NB: Does NOT modify the given path to ensure that it resides in the data directory. pub fn idempotent( @@ -103,22 +120,6 @@ pub fn setup_logger(level: LevelFilter) { .unwrap(); } -pub fn ctx() -> Context { - Context::default().with_encodings([ - &ALPEncoding as EncodingRef, - &DictEncoding, - &BitPackedEncoding, - &FoREncoding, - &DateTimePartsEncoding, - // &DeltaEncoding, Blows up the search space too much. - &REEEncoding, - &RoaringBoolEncoding, - // &RoaringIntEncoding, - // Doesn't offer anything more than FoR really - // &ZigZagEncoding, - ]) -} - pub fn compress_taxi_data() -> OwnedArray { let file = File::open(taxi_data_parquet()).unwrap(); let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); @@ -144,7 +145,7 @@ pub fn compress_taxi_data() -> OwnedArray { .map(|batch| batch.to_array_data().into_array()) .map(|array| { uncompressed_size += array.nbytes(); - Compressor::new(&ctx(), &Default::default()) + Compressor::new(&CTX, &Default::default()) .compress(&array, None) .unwrap() }) @@ -218,12 +219,12 @@ mod test { use vortex::arrow::FromArrowArray; use vortex::compress::Compressor; use vortex::compute::as_arrow::as_arrow; - use vortex::{ArrayData, Context, IntoArray}; + use vortex::{ArrayData, IntoArray}; use vortex_ipc::reader::StreamReader; use vortex_ipc::writer::StreamWriter; use crate::taxi_data::taxi_data_parquet; - use crate::{compress_taxi_data, ctx, setup_logger}; + use crate::{compress_taxi_data, setup_logger, CTX}; #[ignore] #[test] @@ -246,12 +247,12 @@ mod test { let mut buf = Vec::::new(); { - let mut writer = StreamWriter::try_new(&mut buf, &Context::default()).unwrap(); + let mut writer = StreamWriter::try_new(&mut buf, &CTX).unwrap(); writer.write_array(&vortex_array).unwrap(); } let mut read = buf.as_slice(); - let mut reader 
= StreamReader::try_new(&mut read, &Context::default()).unwrap(); + let mut reader = StreamReader::try_new(&mut read, &CTX).unwrap(); reader.read_array().unwrap(); } } @@ -286,7 +287,7 @@ mod test { let arrow_array: ArrowArrayRef = Arc::new(struct_arrow); let vortex_array = ArrayData::from_arrow(arrow_array.clone(), false).into_array(); - let compressed = Compressor::new(&ctx(), &Default::default()) + let compressed = Compressor::new(&CTX, &Default::default()) .compress(&vortex_array, None) .unwrap(); let compressed_as_arrow = as_arrow(&compressed).unwrap(); diff --git a/bench-vortex/src/reader.rs b/bench-vortex/src/reader.rs index 730bda210..9339f81ce 100644 --- a/bench-vortex/src/reader.rs +++ b/bench-vortex/src/reader.rs @@ -21,21 +21,21 @@ use vortex::array::chunked::ChunkedArray; use vortex::arrow::FromArrowType; use vortex::compress::Compressor; use vortex::compute::take::take; -use vortex::{Context, IntoArray, OwnedArray, ToArrayData, ToStatic}; +use vortex::{IntoArray, OwnedArray, ToArrayData, ToStatic}; use vortex_dtype::DType; use vortex_error::VortexResult; use vortex_ipc::iter::FallibleLendingIterator; use vortex_ipc::reader::StreamReader; use vortex_ipc::writer::StreamWriter; -use crate::ctx; +use crate::CTX; pub const BATCH_SIZE: usize = 65_536; pub fn open_vortex(path: &Path) -> VortexResult { let mut file = File::open(path)?; - let mut reader = StreamReader::try_new(&mut file, &Context::default())?; + let mut reader = StreamReader::try_new(&mut file, &CTX)?; let mut reader = reader.next()?.unwrap(); let dtype = reader.dtype().clone(); let mut chunks = vec![]; @@ -51,7 +51,7 @@ pub fn rewrite_parquet_as_vortex( ) -> VortexResult<()> { let chunked = compress_parquet_to_vortex(parquet_path.as_path())?; - let mut writer = StreamWriter::try_new(write, &Context::default()).unwrap(); + let mut writer = StreamWriter::try_new(write, &CTX).unwrap(); writer.write_array(&chunked.into_array()).unwrap(); Ok(()) } @@ -69,7 +69,7 @@ pub fn 
compress_parquet_to_vortex(parquet_path: &Path) -> VortexResult Date: Fri, 3 May 2024 11:03:45 +0100 Subject: [PATCH 08/10] EncodingCtx --- bench-vortex/src/lib.rs | 8 ++------ bench-vortex/src/reader.rs | 4 +--- vortex-array/src/compress.rs | 10 +++++++--- vortex-fastlanes/benches/bitpacking_take.rs | 12 ++---------- vortex-fastlanes/src/bitpacking/compress.rs | 11 ++++------- vortex-fastlanes/src/bitpacking/compute/mod.rs | 2 +- vortex-fastlanes/src/delta/compress.rs | 2 +- vortex-fastlanes/src/for/compress.rs | 10 +++------- vortex-fastlanes/src/for/compute.rs | 2 +- vortex-ipc/benches/ipc_take.rs | 4 +--- vortex-zigzag/src/compress.rs | 2 +- 11 files changed, 24 insertions(+), 43 deletions(-) diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index d27163559..a63e9a406 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -145,9 +145,7 @@ pub fn compress_taxi_data() -> OwnedArray { .map(|batch| batch.to_array_data().into_array()) .map(|array| { uncompressed_size += array.nbytes(); - Compressor::new(&CTX, &Default::default()) - .compress(&array, None) - .unwrap() + Compressor::new(&CTX).compress(&array, None).unwrap() }) .collect_vec(); @@ -287,9 +285,7 @@ mod test { let arrow_array: ArrowArrayRef = Arc::new(struct_arrow); let vortex_array = ArrayData::from_arrow(arrow_array.clone(), false).into_array(); - let compressed = Compressor::new(&CTX, &Default::default()) - .compress(&vortex_array, None) - .unwrap(); + let compressed = Compressor::new(&CTX).compress(&vortex_array, None).unwrap(); let compressed_as_arrow = as_arrow(&compressed).unwrap(); assert_eq!(compressed_as_arrow.deref(), arrow_array.deref()); } diff --git a/bench-vortex/src/reader.rs b/bench-vortex/src/reader.rs index 9339f81ce..f21f5ccc0 100644 --- a/bench-vortex/src/reader.rs +++ b/bench-vortex/src/reader.rs @@ -69,9 +69,7 @@ pub fn compress_parquet_to_vortex(parquet_path: &Path) -> VortexResult { ctx: &'a Context, - options: &'a CompressConfig, + options: 
CompressConfig, path: Vec, depth: u8, @@ -88,7 +88,11 @@ impl Display for Compressor<'_> { } impl<'a> Compressor<'a> { - pub fn new(ctx: &'a Context, options: &'a CompressConfig) -> Self { + pub fn new(ctx: &'a Context) -> Self { + Self::new_with_options(ctx, Default::default()) + } + + pub fn new_with_options(ctx: &'a Context, options: CompressConfig) -> Self { Self { ctx, options, @@ -121,7 +125,7 @@ impl<'a> Compressor<'a> { #[inline] pub fn options(&self) -> &CompressConfig { - self.options + &self.options } pub fn excluding(&self, encoding: EncodingRef) -> Self { diff --git a/vortex-fastlanes/benches/bitpacking_take.rs b/vortex-fastlanes/benches/bitpacking_take.rs index 2168dd174..0ea455baa 100644 --- a/vortex-fastlanes/benches/bitpacking_take.rs +++ b/vortex-fastlanes/benches/bitpacking_take.rs @@ -21,11 +21,7 @@ fn bench_take(c: &mut Criterion) { let values = values(1_000_000, 8); let uncompressed = PrimitiveArray::from(values.clone()); let packed = BitPackedEncoding {} - .compress( - uncompressed.array(), - None, - Compressor::new(&ctx, &Default::default()), - ) + .compress(uncompressed.array(), None, Compressor::new(&ctx)) .unwrap(); let stratified_indices: PrimitiveArray = (0..10).map(|i| i * 10_000).collect::>().into(); @@ -65,11 +61,7 @@ fn bench_patched_take(c: &mut Criterion) { let uncompressed = PrimitiveArray::from(values.clone()); let packed = BitPackedEncoding {} - .compress( - uncompressed.array(), - None, - Compressor::new(&ctx, &Default::default()), - ) + .compress(uncompressed.array(), None, Compressor::new(&ctx)) .unwrap(); let packed = BitPackedArray::try_from(packed).unwrap(); assert!(packed.patches().is_some()); diff --git a/vortex-fastlanes/src/bitpacking/compress.rs b/vortex-fastlanes/src/bitpacking/compress.rs index ea1010a0d..bfa6c1ef7 100644 --- a/vortex-fastlanes/src/bitpacking/compress.rs +++ b/vortex-fastlanes/src/bitpacking/compress.rs @@ -375,7 +375,7 @@ mod test { #[test] fn test_compress() { - let compressed = 
Compressor::new(&ctx(), &Default::default()) + let compressed = Compressor::new(&ctx()) .compress( PrimitiveArray::from(Vec::from_iter((0..10_000).map(|i| (i % 63) as u8))).array(), None, @@ -395,12 +395,9 @@ mod test { fn compression_roundtrip(n: usize) { let values = PrimitiveArray::from(Vec::from_iter((0..n).map(|i| (i % 2047) as u16))); - let compressed = Compressor::new( - &crate::bitpacking::compress::test::ctx(), - &Default::default(), - ) - .compress(values.array(), None) - .unwrap(); + let compressed = Compressor::new(&ctx()) + .compress(values.array(), None) + .unwrap(); let compressed = BitPackedArray::try_from(compressed).unwrap(); let decompressed = compressed.to_array().flatten_primitive().unwrap(); assert_eq!(decompressed.typed_data::(), values.typed_data::()); diff --git a/vortex-fastlanes/src/bitpacking/compute/mod.rs b/vortex-fastlanes/src/bitpacking/compute/mod.rs index 52b8d4089..dbb396ead 100644 --- a/vortex-fastlanes/src/bitpacking/compute/mod.rs +++ b/vortex-fastlanes/src/bitpacking/compute/mod.rs @@ -195,7 +195,7 @@ mod test { fn take_indices() { let indices = PrimitiveArray::from(vec![0, 125, 2047, 2049, 2151, 2790]); let unpacked = PrimitiveArray::from((0..4096).map(|i| (i % 63) as u8).collect::>()); - let bitpacked = Compressor::new(&ctx(), &Default::default()) + let bitpacked = Compressor::new(&ctx()) .compress(unpacked.array(), None) .unwrap(); let result = take(&bitpacked, indices.array()).unwrap(); diff --git a/vortex-fastlanes/src/delta/compress.rs b/vortex-fastlanes/src/delta/compress.rs index 337652460..b3d8f623e 100644 --- a/vortex-fastlanes/src/delta/compress.rs +++ b/vortex-fastlanes/src/delta/compress.rs @@ -217,7 +217,7 @@ mod test { .compress( PrimitiveArray::from(input.clone()).array(), None, - Compressor::new(&ctx(), &Default::default()), + Compressor::new(&ctx()), ) .unwrap(); diff --git a/vortex-fastlanes/src/for/compress.rs b/vortex-fastlanes/src/for/compress.rs index 5569088af..3faf308be 100644 --- 
a/vortex-fastlanes/src/for/compress.rs +++ b/vortex-fastlanes/src/for/compress.rs @@ -159,7 +159,7 @@ mod test { // Create a range offset by a million let array = PrimitiveArray::from((0u32..10_000).map(|v| v + 1_000_000).collect_vec()); - let compressed = Compressor::new(&ctx(), &Default::default()) + let compressed = Compressor::new(&ctx()) .compress(array.array(), None) .unwrap(); assert_eq!(compressed.encoding().id(), FoREncoding.id()); @@ -173,7 +173,7 @@ mod test { fn test_decompress() { // Create a range offset by a million let array = PrimitiveArray::from((0u32..10_000).map(|v| v + 1_000_000).collect_vec()); - let compressed = Compressor::new(&ctx(), &Default::default()) + let compressed = Compressor::new(&ctx()) .compress(array.array(), None) .unwrap(); assert_eq!(compressed.encoding().id(), FoREncoding.id()); @@ -187,11 +187,7 @@ mod test { // Create a range offset by a million let array = PrimitiveArray::from((i8::MIN..i8::MAX).collect_vec()); let compressed = FoREncoding {} - .compress( - array.array(), - None, - Compressor::new(&ctx(), &Default::default()), - ) + .compress(array.array(), None, Compressor::new(&ctx())) .unwrap(); let compressed = FoRArray::try_from(compressed).unwrap(); assert_eq!(i8::MIN, compressed.reference().try_into().unwrap()); diff --git a/vortex-fastlanes/src/for/compute.rs b/vortex-fastlanes/src/for/compute.rs index 6fb012279..0706a2a89 100644 --- a/vortex-fastlanes/src/for/compute.rs +++ b/vortex-fastlanes/src/for/compute.rs @@ -80,7 +80,7 @@ mod test { .compress( PrimitiveArray::from(vec![11, 15, 19]).array(), None, - Compressor::new(&Context::default(), &Default::default()), + Compressor::new(&Context::default()), ) .unwrap(); assert_eq!(scalar_at(&forarr, 0).unwrap(), 11.into()); diff --git a/vortex-ipc/benches/ipc_take.rs b/vortex-ipc/benches/ipc_take.rs index 218a32521..6af3b089c 100644 --- a/vortex-ipc/benches/ipc_take.rs +++ b/vortex-ipc/benches/ipc_take.rs @@ -55,9 +55,7 @@ fn ipc_take(c: &mut Criterion) { let indices 
= PrimitiveArray::from(vec![10, 11, 12, 13, 100_000, 2_999_999]).into_array(); let uncompressed = PrimitiveArray::from((0i32..3_000_000).rev().collect_vec()).into_array(); let ctx = Context::default(); - let compressed = Compressor::new(&ctx, &Default::default()) - .compress(&uncompressed, None) - .unwrap(); + let compressed = Compressor::new(&ctx).compress(&uncompressed, None).unwrap(); // Try running take over an ArrayView. let mut buffer = vec![]; diff --git a/vortex-zigzag/src/compress.rs b/vortex-zigzag/src/compress.rs index f08a64bb8..bc5b86a4d 100644 --- a/vortex-zigzag/src/compress.rs +++ b/vortex-zigzag/src/compress.rs @@ -115,7 +115,7 @@ mod test { fn test_compress() { let ctx = Context::default().with_encodings([&ZigZagEncoding as EncodingRef, &BitPackedEncoding]); - let compressed = Compressor::new(&ctx, &Default::default()) + let compressed = Compressor::new(&ctx) .compress( PrimitiveArray::from(Vec::from_iter((-10_000..10_000).map(|i| i as i64))).array(), None, From 6c8f469035ad5b2a1ac39fe766c3cd9817fe375d Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 3 May 2024 11:10:14 +0100 Subject: [PATCH 09/10] EncodingCtx --- vortex-fastlanes/src/bitpacking/mod.rs | 4 ++-- vortex-ipc/src/lib.rs | 2 -- vortex-ipc/src/messages.rs | 2 +- vortex-ipc/src/reader.rs | 23 ++++++++++++++++------- 4 files changed, 19 insertions(+), 12 deletions(-) diff --git a/vortex-fastlanes/src/bitpacking/mod.rs b/vortex-fastlanes/src/bitpacking/mod.rs index 070981bba..5d09a13f4 100644 --- a/vortex-fastlanes/src/bitpacking/mod.rs +++ b/vortex-fastlanes/src/bitpacking/mod.rs @@ -203,7 +203,7 @@ mod test { fn slice_within_block() { let packed = BitPackedArray::encode( &PrimitiveArray::from((0..10_000).map(|i| (i % 63) as u8).collect::>()).array(), - 16, + 7, ) .unwrap(); @@ -222,7 +222,7 @@ mod test { fn slice_block_boundary() { let packed = BitPackedArray::encode( &PrimitiveArray::from((0..10_000).map(|i| (i % 63) as u8).collect::>()).array(), - 16, + 7, ) .unwrap(); diff 
--git a/vortex-ipc/src/lib.rs b/vortex-ipc/src/lib.rs index 6d05aa324..dbc462019 100644 --- a/vortex-ipc/src/lib.rs +++ b/vortex-ipc/src/lib.rs @@ -1,5 +1,3 @@ -extern crate core; - use vortex_error::{vortex_err, VortexError}; pub const ALIGNMENT: usize = 64; diff --git a/vortex-ipc/src/messages.rs b/vortex-ipc/src/messages.rs index 4c0209d25..46a744b5c 100644 --- a/vortex-ipc/src/messages.rs +++ b/vortex-ipc/src/messages.rs @@ -165,7 +165,7 @@ impl<'a> WriteFlatBuffer for IPCArray<'a> { let encoding = ctx .encoding_idx(column_data.encoding().id()) - // TODO(ngates): return result from this writer? + // FIXME(ngates): return result from this writer? .unwrap_or_else(|| panic!("Encoding not found: {:?}", column_data.encoding())); let metadata = Some( diff --git a/vortex-ipc/src/reader.rs b/vortex-ipc/src/reader.rs index a3aa9c981..2b15c41e9 100644 --- a/vortex-ipc/src/reader.rs +++ b/vortex-ipc/src/reader.rs @@ -384,12 +384,12 @@ mod tests { use itertools::Itertools; use vortex::array::chunked::{Chunked, ChunkedArray}; use vortex::array::primitive::{Primitive, PrimitiveArray, PrimitiveEncoding}; - use vortex::encoding::{ArrayEncoding, EncodingId}; + use vortex::encoding::{ArrayEncoding, EncodingId, EncodingRef}; use vortex::{Array, ArrayDType, ArrayDef, Context, IntoArray, OwnedArray}; use vortex_alp::{ALPArray, ALPEncoding}; use vortex_dtype::NativePType; use vortex_error::VortexResult; - use vortex_fastlanes::BitPackedArray; + use vortex_fastlanes::{BitPackedArray, BitPackedEncoding}; use crate::iter::FallibleLendingIterator; use crate::reader::StreamReader; @@ -397,7 +397,10 @@ mod tests { #[test] fn test_read_write() { - let ctx = Context::default(); + let ctx = Context::default().with_encodings([ + &ALPEncoding as EncodingRef, + &BitPackedEncoding as EncodingRef, + ]); let array = PrimitiveArray::from(vec![0, 1, 2]).into_array(); let chunked_array = ChunkedArray::try_new(vec![array.clone(), array.clone()], array.dtype().clone()) @@ -687,19 +690,22 @@ mod tests { 
expected: &[T], expected_encoding_id: EncodingId, ) { + let ctx = + Context::default().with_encodings([&ALPEncoding as EncodingRef, &BitPackedEncoding]); + let indices = PrimitiveArray::from(vec![10i32, 11, 12, 13, 100_000, 2_999_999, 2_999_999]) .into_array(); let mut buffer = vec![]; { let mut cursor = Cursor::new(&mut buffer); { - let mut writer = StreamWriter::try_new(&mut cursor, &Context::default()).unwrap(); + let mut writer = StreamWriter::try_new(&mut cursor, &ctx).unwrap(); writer.write_array(data).unwrap(); } } let mut cursor = Cursor::new(&buffer); - let mut reader = StreamReader::try_new(&mut cursor, &Context::default()).unwrap(); + let mut reader = StreamReader::try_new(&mut cursor, &ctx).unwrap(); let array_reader = reader.next().unwrap().unwrap(); let mut take_iter = array_reader.take(&indices).unwrap(); @@ -718,17 +724,20 @@ mod tests { data: &'a Array, indices: &'a Array, ) -> VortexResult { + let ctx = + Context::default().with_encodings([&ALPEncoding as EncodingRef, &BitPackedEncoding]); + let mut buffer = vec![]; { let mut cursor = Cursor::new(&mut buffer); { - let mut writer = StreamWriter::try_new(&mut cursor, &Context::default()).unwrap(); + let mut writer = StreamWriter::try_new(&mut cursor, &ctx).unwrap(); writer.write_array(data).unwrap(); } } let mut cursor = Cursor::new(&buffer); - let mut reader = StreamReader::try_new(&mut cursor, &Context::default()).unwrap(); + let mut reader = StreamReader::try_new(&mut cursor, &ctx).unwrap(); let array_reader = reader.next().unwrap().unwrap(); let mut result_iter = array_reader.take(indices)?; let result = result_iter.next().unwrap(); From 25156e9669d13337e2879c50e25bf0b9e2ac19c2 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 3 May 2024 11:54:27 +0100 Subject: [PATCH 10/10] EncodingCtx --- vortex-fastlanes/src/bitpacking/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vortex-fastlanes/src/bitpacking/mod.rs b/vortex-fastlanes/src/bitpacking/mod.rs index 
5d09a13f4..2b521870b 100644 --- a/vortex-fastlanes/src/bitpacking/mod.rs +++ b/vortex-fastlanes/src/bitpacking/mod.rs @@ -202,7 +202,7 @@ mod test { #[test] fn slice_within_block() { let packed = BitPackedArray::encode( - &PrimitiveArray::from((0..10_000).map(|i| (i % 63) as u8).collect::>()).array(), + PrimitiveArray::from((0..10_000).map(|i| (i % 63) as u8).collect::>()).array(), 7, ) .unwrap(); @@ -221,7 +221,7 @@ mod test { #[test] fn slice_block_boundary() { let packed = BitPackedArray::encode( - &PrimitiveArray::from((0..10_000).map(|i| (i % 63) as u8).collect::>()).array(), + PrimitiveArray::from((0..10_000).map(|i| (i % 63) as u8).collect::>()).array(), 7, ) .unwrap();