From ced650937455b0f961f22cae29c6f8c6a3d0e6d0 Mon Sep 17 00:00:00 2001
From: Robert Kruszewski
Date: Thu, 8 Aug 2024 17:36:38 +0100
Subject: [PATCH 1/4] fix

---
 vortex-array/src/compress.rs                          |  5 +++++
 vortex-sampling-compressor/src/compressors/alp.rs     |  9 ++++++++-
 .../src/compressors/bitpacked.rs                      |  8 ++++++++
 .../src/compressors/constant.rs                       |  9 ++++++++-
 .../src/compressors/date_time_parts.rs                | 11 ++++++++++-
 vortex-sampling-compressor/src/compressors/delta.rs   |  9 ++++++++-
 vortex-sampling-compressor/src/compressors/dict.rs    |  9 ++++++++-
 vortex-sampling-compressor/src/compressors/for.rs     |  9 ++++++++-
 vortex-sampling-compressor/src/compressors/mod.rs     |  4 ++++
 .../src/compressors/roaring_bool.rs                   |  9 ++++++++-
 .../src/compressors/roaring_int.rs                    |  9 ++++++++-
 vortex-sampling-compressor/src/compressors/runend.rs  |  9 ++++++++-
 vortex-sampling-compressor/src/compressors/sparse.rs  |  9 ++++++++-
 vortex-sampling-compressor/src/compressors/zigzag.rs  |  9 ++++++++-
 14 files changed, 107 insertions(+), 11 deletions(-)

diff --git a/vortex-array/src/compress.rs b/vortex-array/src/compress.rs
index cef8fd273c..51c695caed 100644
--- a/vortex-array/src/compress.rs
+++ b/vortex-array/src/compress.rs
@@ -1,9 +1,14 @@
+use std::collections::HashSet;
+
 use vortex_error::VortexResult;
 
+use crate::encoding::EncodingRef;
 use crate::Array;
 
 pub trait CompressionStrategy {
     fn compress(&self, array: &Array) -> VortexResult<Array>;
+
+    fn used_encodings(&self) -> HashSet<EncodingRef>;
 }
 
 /// Check that compression did not alter the length of the validity array.
diff --git a/vortex-sampling-compressor/src/compressors/alp.rs b/vortex-sampling-compressor/src/compressors/alp.rs
index ead9077980..a0c1ef4f71 100644
--- a/vortex-sampling-compressor/src/compressors/alp.rs
+++ b/vortex-sampling-compressor/src/compressors/alp.rs
@@ -1,6 +1,9 @@
+use std::collections::HashSet;
+
 use vortex::array::PrimitiveArray;
+use vortex::encoding::EncodingRef;
 use vortex::{Array, ArrayDef, IntoArray};
-use vortex_alp::{alp_encode_components, match_each_alp_float_ptype, ALPArray, ALP};
+use vortex_alp::{alp_encode_components, match_each_alp_float_ptype, ALPArray, ALPEncoding, ALP};
 use vortex_dtype::PType;
 use vortex_error::VortexResult;
@@ -70,4 +73,8 @@ impl EncodingCompressor for ALPCompressor {
             )),
         ))
     }
+
+    fn used_encodings(&self) -> HashSet<EncodingRef> {
+        HashSet::from([&ALPEncoding as EncodingRef])
+    }
 }
diff --git a/vortex-sampling-compressor/src/compressors/bitpacked.rs b/vortex-sampling-compressor/src/compressors/bitpacked.rs
index 25bd37ed0c..c45c44d5f4 100644
--- a/vortex-sampling-compressor/src/compressors/bitpacked.rs
+++ b/vortex-sampling-compressor/src/compressors/bitpacked.rs
@@ -1,9 +1,13 @@
+use std::collections::HashSet;
+
 use vortex::array::PrimitiveArray;
+use vortex::encoding::EncodingRef;
 use vortex::stats::ArrayStatistics;
 use vortex::{Array, ArrayDef, IntoArray};
 use vortex_error::{vortex_err, VortexResult};
 use vortex_fastlanes::{
     bitpack, bitpack_patches, count_exceptions, find_best_bit_width, BitPacked, BitPackedArray,
+    BitPackedEncoding,
 };
 
 use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
@@ -83,4 +87,8 @@ impl EncodingCompressor for BitPackedCompressor {
             )),
         ))
     }
+
+    fn used_encodings(&self) -> HashSet<EncodingRef> {
+        HashSet::from([&BitPackedEncoding as EncodingRef])
+    }
 }
diff --git a/vortex-sampling-compressor/src/compressors/constant.rs b/vortex-sampling-compressor/src/compressors/constant.rs
index 7dd2290ccb..454165306e 100644
--- a/vortex-sampling-compressor/src/compressors/constant.rs
+++ b/vortex-sampling-compressor/src/compressors/constant.rs
@@ -1,5 +1,8 @@
-use vortex::array::{Constant, ConstantArray};
+use std::collections::HashSet;
+
+use vortex::array::{Constant, ConstantArray, ConstantEncoding};
 use vortex::compute::unary::scalar_at;
+use vortex::encoding::EncodingRef;
 use vortex::stats::ArrayStatistics;
 use vortex::{Array, ArrayDef, IntoArray};
 use vortex_error::VortexResult;
@@ -31,4 +34,8 @@ impl EncodingCompressor for ConstantCompressor {
             Some(CompressionTree::flat(self)),
         ))
     }
+
+    fn used_encodings(&self) -> HashSet<EncodingRef> {
+        HashSet::from([&ConstantEncoding as EncodingRef])
+    }
 }
diff --git a/vortex-sampling-compressor/src/compressors/date_time_parts.rs b/vortex-sampling-compressor/src/compressors/date_time_parts.rs
index 6cf2da991a..6fc755a8ab 100644
--- a/vortex-sampling-compressor/src/compressors/date_time_parts.rs
+++ b/vortex-sampling-compressor/src/compressors/date_time_parts.rs
@@ -1,7 +1,12 @@
+use std::collections::HashSet;
+
 use vortex::array::temporal::TemporalMetadata;
 use vortex::array::TemporalArray;
+use vortex::encoding::EncodingRef;
 use vortex::{Array, ArrayDType, ArrayDef, IntoArray};
-use vortex_datetime_parts::{compress_temporal, DateTimeParts, DateTimePartsArray};
+use vortex_datetime_parts::{
+    compress_temporal, DateTimeParts, DateTimePartsArray, DateTimePartsEncoding,
+};
 use vortex_error::VortexResult;
 
 use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
@@ -58,4 +63,8 @@ impl EncodingCompressor for DateTimePartsCompressor {
             )),
         ))
     }
+
+    fn used_encodings(&self) -> HashSet<EncodingRef> {
+        HashSet::from([&DateTimePartsEncoding as EncodingRef])
+    }
 }
diff --git a/vortex-sampling-compressor/src/compressors/delta.rs b/vortex-sampling-compressor/src/compressors/delta.rs
index 4e1f2eb6ec..cac2182490 100644
--- a/vortex-sampling-compressor/src/compressors/delta.rs
+++ b/vortex-sampling-compressor/src/compressors/delta.rs
@@ -1,7 +1,10 @@
+use std::collections::HashSet;
+
 use vortex::array::PrimitiveArray;
+use vortex::encoding::EncodingRef;
 use vortex::{Array, ArrayDef, IntoArray};
 use vortex_error::VortexResult;
-use vortex_fastlanes::{delta_compress, Delta, DeltaArray};
+use vortex_fastlanes::{delta_compress, Delta, DeltaArray, DeltaEncoding};
 
 use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
 use crate::SamplingCompressor;
@@ -51,4 +54,8 @@ impl EncodingCompressor for DeltaCompressor {
             Some(CompressionTree::new(self, vec![bases.path, deltas.path])),
         ))
     }
+
+    fn used_encodings(&self) -> HashSet<EncodingRef> {
+        HashSet::from([&DeltaEncoding as EncodingRef])
+    }
 }
diff --git a/vortex-sampling-compressor/src/compressors/dict.rs b/vortex-sampling-compressor/src/compressors/dict.rs
index cc936ef0a9..ec6d320e8e 100644
--- a/vortex-sampling-compressor/src/compressors/dict.rs
+++ b/vortex-sampling-compressor/src/compressors/dict.rs
@@ -1,7 +1,10 @@
+use std::collections::HashSet;
+
 use vortex::array::{Primitive, PrimitiveArray, VarBin, VarBinArray};
+use vortex::encoding::EncodingRef;
 use vortex::stats::ArrayStatistics;
 use vortex::{Array, ArrayDef, IntoArray};
-use vortex_dict::{dict_encode_primitive, dict_encode_varbin, Dict, DictArray};
+use vortex_dict::{dict_encode_primitive, dict_encode_varbin, Dict, DictArray, DictEncoding};
 use vortex_error::VortexResult;
 
 use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
@@ -69,4 +72,8 @@ impl EncodingCompressor for DictCompressor {
             Some(CompressionTree::new(self, vec![codes.path, values.path])),
         ))
     }
+
+    fn used_encodings(&self) -> HashSet<EncodingRef> {
+        HashSet::from([&DictEncoding as EncodingRef])
+    }
 }
diff --git a/vortex-sampling-compressor/src/compressors/for.rs b/vortex-sampling-compressor/src/compressors/for.rs
index 4b14086cc5..fff5353c54 100644
--- a/vortex-sampling-compressor/src/compressors/for.rs
+++ b/vortex-sampling-compressor/src/compressors/for.rs
@@ -1,10 +1,13 @@
+use std::collections::HashSet;
+
 use vortex::array::PrimitiveArray;
+use vortex::encoding::EncodingRef;
 use vortex::stats::{trailing_zeros, ArrayStatistics};
 use vortex::validity::ArrayValidity;
 use vortex::{Array, ArrayDef, IntoArray};
 use vortex_dtype::match_each_integer_ptype;
 use vortex_error::VortexResult;
-use vortex_fastlanes::{for_compress, FoR, FoRArray};
+use vortex_fastlanes::{for_compress, FoR, FoRArray, FoREncoding};
 
 use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
 use crate::SamplingCompressor;
@@ -60,4 +63,8 @@ impl EncodingCompressor for FoRCompressor {
             Some(CompressionTree::new(self, vec![compressed_child.path])),
         ))
     }
+
+    fn used_encodings(&self) -> HashSet<EncodingRef> {
+        HashSet::from([&FoREncoding as EncodingRef])
+    }
 }
diff --git a/vortex-sampling-compressor/src/compressors/mod.rs b/vortex-sampling-compressor/src/compressors/mod.rs
index c0623c0b02..5f7d1b5640 100644
--- a/vortex-sampling-compressor/src/compressors/mod.rs
+++ b/vortex-sampling-compressor/src/compressors/mod.rs
@@ -1,6 +1,8 @@
+use std::collections::HashSet;
 use std::fmt::{Debug, Display, Formatter};
 use std::hash::{Hash, Hasher};
 
+use vortex::encoding::EncodingRef;
 use vortex::Array;
 use vortex_error::VortexResult;
 
@@ -34,6 +36,8 @@ pub trait EncodingCompressor: Sync + Send + Debug {
         like: Option<CompressionTree<'a>>,
         ctx: SamplingCompressor<'a>,
     ) -> VortexResult<Option<CompressedArray<'a>>>;
+
+    fn used_encodings(&self) -> HashSet<EncodingRef>;
 }
 
 pub type CompressorRef<'a> = &'a dyn EncodingCompressor;
diff --git a/vortex-sampling-compressor/src/compressors/roaring_bool.rs b/vortex-sampling-compressor/src/compressors/roaring_bool.rs
index 3e4040fc57..2cf7055f72 100644
--- a/vortex-sampling-compressor/src/compressors/roaring_bool.rs
+++ b/vortex-sampling-compressor/src/compressors/roaring_bool.rs
@@ -1,8 +1,11 @@
+use std::collections::HashSet;
+
+use vortex::encoding::EncodingRef;
 use vortex::{Array, ArrayDType, ArrayDef, IntoArray, IntoArrayVariant};
 use vortex_dtype::DType;
 use vortex_dtype::Nullability::NonNullable;
 use vortex_error::VortexResult;
-use vortex_roaring::{roaring_bool_encode, RoaringBool};
+use vortex_roaring::{roaring_bool_encode, RoaringBool, RoaringBoolEncoding};
 
 use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
 use crate::SamplingCompressor;
@@ -44,4 +47,8 @@ impl EncodingCompressor for RoaringBoolCompressor {
             Some(CompressionTree::flat(self)),
         ))
     }
+
+    fn used_encodings(&self) -> HashSet<EncodingRef> {
+        HashSet::from([&RoaringBoolEncoding as EncodingRef])
+    }
 }
diff --git a/vortex-sampling-compressor/src/compressors/roaring_int.rs b/vortex-sampling-compressor/src/compressors/roaring_int.rs
index f88e3e95cc..56cea12fa2 100644
--- a/vortex-sampling-compressor/src/compressors/roaring_int.rs
+++ b/vortex-sampling-compressor/src/compressors/roaring_int.rs
@@ -1,7 +1,10 @@
+use std::collections::HashSet;
+
+use vortex::encoding::EncodingRef;
 use vortex::stats::ArrayStatistics;
 use vortex::{Array, ArrayDType, ArrayDef, IntoArray, IntoArrayVariant};
 use vortex_error::VortexResult;
-use vortex_roaring::{roaring_int_encode, RoaringInt};
+use vortex_roaring::{roaring_int_encode, RoaringInt, RoaringIntEncoding};
 
 use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
 use crate::SamplingCompressor;
@@ -52,4 +55,8 @@ impl EncodingCompressor for RoaringIntCompressor {
             Some(CompressionTree::flat(self)),
         ))
     }
+
+    fn used_encodings(&self) -> HashSet<EncodingRef> {
+        HashSet::from([&RoaringIntEncoding as EncodingRef])
+    }
 }
diff --git a/vortex-sampling-compressor/src/compressors/runend.rs b/vortex-sampling-compressor/src/compressors/runend.rs
index 6a30c8bf1c..4d97f3d368 100644
--- a/vortex-sampling-compressor/src/compressors/runend.rs
+++ b/vortex-sampling-compressor/src/compressors/runend.rs
@@ -1,9 +1,12 @@
+use std::collections::HashSet;
+
 use vortex::array::Primitive;
+use vortex::encoding::EncodingRef;
 use vortex::stats::ArrayStatistics;
 use vortex::{Array, ArrayDef, IntoArray};
 use vortex_error::VortexResult;
 use vortex_runend::compress::runend_encode;
-use vortex_runend::{RunEnd, RunEndArray};
+use vortex_runend::{RunEnd, RunEndArray, RunEndEncoding};
 
 use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
 use crate::SamplingCompressor;
@@ -71,4 +74,8 @@ impl EncodingCompressor for RunEndCompressor {
             )),
         ))
     }
+
+    fn used_encodings(&self) -> HashSet<EncodingRef> {
+        HashSet::from([&RunEndEncoding as EncodingRef])
+    }
 }
diff --git a/vortex-sampling-compressor/src/compressors/sparse.rs b/vortex-sampling-compressor/src/compressors/sparse.rs
index 1e562a588a..c951b8b85c 100644
--- a/vortex-sampling-compressor/src/compressors/sparse.rs
+++ b/vortex-sampling-compressor/src/compressors/sparse.rs
@@ -1,4 +1,7 @@
-use vortex::array::{Sparse, SparseArray};
+use std::collections::HashSet;
+
+use vortex::array::{Sparse, SparseArray, SparseEncoding};
+use vortex::encoding::EncodingRef;
 use vortex::{Array, ArrayDef, IntoArray};
 use vortex_error::VortexResult;
 
@@ -47,4 +50,8 @@ impl EncodingCompressor for SparseCompressor {
             Some(CompressionTree::new(self, vec![indices.path, values.path])),
         ))
     }
+
+    fn used_encodings(&self) -> HashSet<EncodingRef> {
+        HashSet::from([&SparseEncoding as EncodingRef])
+    }
 }
diff --git a/vortex-sampling-compressor/src/compressors/zigzag.rs b/vortex-sampling-compressor/src/compressors/zigzag.rs
index f2b92a3883..81c1a17a70 100644
--- a/vortex-sampling-compressor/src/compressors/zigzag.rs
+++ b/vortex-sampling-compressor/src/compressors/zigzag.rs
@@ -1,4 +1,7 @@
-use vortex::array::PrimitiveArray;
+use std::collections::HashSet;
+
+use vortex::array::{PrimitiveArray, SparseEncoding};
+use vortex::encoding::EncodingRef;
 use vortex::stats::{ArrayStatistics, Stat};
 use vortex::{Array, ArrayDef, IntoArray};
 use vortex_error::VortexResult;
@@ -51,4 +54,8 @@ impl EncodingCompressor for ZigZagCompressor {
             Some(CompressionTree::new(self, vec![compressed.path])),
         ))
     }
+
+    fn used_encodings(&self) -> HashSet<EncodingRef> {
+        HashSet::from([&SparseEncoding as EncodingRef])
+    }
 }
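Taken together, patch 1 threads a `used_encodings()` hook through `CompressionStrategy` and every `EncodingCompressor`, so a caller can discover up front which encodings a compressor may emit. A minimal sketch of the intended call pattern follows; it mirrors the wiring that patch 2 below adds to the benchmarks, and assumes only the `Context::with_encodings` builder used there:

    use vortex::compress::CompressionStrategy;
    use vortex::Context;
    use vortex_sampling_compressor::SamplingCompressor;

    fn decoding_context() -> Context {
        // Fold every encoding the compressor tree can produce into a Context,
        // so that arrays written by this compressor can be decoded again.
        let compressor = SamplingCompressor::default();
        Context::default().with_encodings(compressor.used_encodings())
    }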
From 227c532842ac20e25d4f23d609bcdfcef24144c9 Mon Sep 17 00:00:00 2001
From: Robert Kruszewski
Date: Thu, 8 Aug 2024 23:35:13 +0100
Subject: [PATCH 2/4] Fix vortex compressed benchmarks

---
 bench-vortex/src/tpch/mod.rs                  | 157 ++++++++++--------
 encodings/alp/src/array.rs                    |  15 +-
 encodings/zigzag/src/zigzag.rs                |   8 +-
 vortex-datafusion/examples/table_provider.rs  |   3 +-
 vortex-datafusion/src/persistent/config.rs    |   7 +-
 vortex-datafusion/src/persistent/execution.rs |   5 +
 vortex-datafusion/src/persistent/opener.rs    |   9 +-
 vortex-datafusion/src/persistent/provider.rs  |  10 +-
 .../src/compressors/zigzag.rs                 |   6 +-
 vortex-sampling-compressor/src/lib.rs         |   8 +
 10 files changed, 143 insertions(+), 85 deletions(-)

diff --git a/bench-vortex/src/tpch/mod.rs b/bench-vortex/src/tpch/mod.rs
index 85d1b97da3..0f9ce3ec5c 100644
--- a/bench-vortex/src/tpch/mod.rs
+++ b/bench-vortex/src/tpch/mod.rs
@@ -1,5 +1,6 @@
 use std::collections::HashMap;
 use std::fs;
+use std::fs::create_dir_all;
 use std::path::Path;
 use std::sync::Arc;
@@ -12,8 +13,9 @@ use datafusion::prelude::{CsvReadOptions, ParquetReadOptions, SessionContext};
 use tokio::fs::OpenOptions;
 use vortex::array::{ChunkedArray, StructArray};
 use vortex::arrow::FromArrowArray;
+use vortex::compress::CompressionStrategy;
 use vortex::variants::StructArrayTrait;
-use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant};
+use vortex::{Array, ArrayDType, Context, IntoArray, IntoArrayVariant};
 use vortex_datafusion::memory::VortexMemTableOptions;
 use vortex_datafusion::persistent::config::{VortexFile, VortexTableOptions};
 use vortex_datafusion::SessionContextExt;
@@ -194,88 +196,100 @@ async fn register_vortex_file(
     schema: &Schema,
     enable_compression: bool,
 ) -> anyhow::Result<()> {
-    let path = if enable_compression {
-        file.with_extension("").with_extension("vtxcmp")
+    let vortex_dir = file.parent().unwrap().join(if enable_compression {
+        "vortex_compressed"
     } else {
-        file.with_extension("").with_extension("vtxucmp")
-    };
-    let vtx_file = idempotent_async(&path, |vtx_file| async move {
-        let record_batches = session
-            .read_csv(
-                file.to_str().unwrap(),
-                CsvReadOptions::default()
-                    .delimiter(b'|')
-                    .has_header(false)
-                    .file_extension("tbl")
-                    .schema(schema),
-            )
-            .await?
-            .collect()
-            .await?;
+        "vortex_uncompressed"
+    });
+    create_dir_all(&vortex_dir)?;
+    let vtx_file = idempotent_async(
+        &vortex_dir
+            .join(file.file_name().unwrap())
+            .with_extension("vxf"),
+        |vtx_file| async move {
+            let record_batches = session
+                .read_csv(
+                    file.to_str().unwrap(),
+                    CsvReadOptions::default()
+                        .delimiter(b'|')
+                        .has_header(false)
+                        .file_extension("tbl")
+                        .schema(schema),
+                )
+                .await?
+                .collect()
+                .await?;
 
-        // Create a ChunkedArray from the set of chunks.
-        let sts = record_batches
-            .iter()
-            .cloned()
-            .map(Array::from)
-            .map(|a| a.into_struct().unwrap())
-            .collect::<Vec<_>>();
+            // Create a ChunkedArray from the set of chunks.
+            let sts = record_batches
+                .iter()
+                .cloned()
+                .map(Array::from)
+                .map(|a| a.into_struct().unwrap())
+                .collect::<Vec<_>>();
 
-        let mut arrays_map: HashMap<Arc<str>, Vec<Array>> = HashMap::default();
-        let mut types_map: HashMap<Arc<str>, DType> = HashMap::default();
+            let mut arrays_map: HashMap<Arc<str>, Vec<Array>> = HashMap::default();
+            let mut types_map: HashMap<Arc<str>, DType> = HashMap::default();
 
-        for st in sts.into_iter() {
-            let struct_dtype = st.dtype().as_struct().unwrap();
-            let names = struct_dtype.names().iter();
-            let types = struct_dtype.dtypes().iter();
+            for st in sts.into_iter() {
+                let struct_dtype = st.dtype().as_struct().unwrap();
+                let names = struct_dtype.names().iter();
+                let types = struct_dtype.dtypes().iter();
 
-            for (field_name, field_type) in names.zip(types) {
-                let val = arrays_map.entry(field_name.clone()).or_default();
-                val.push(st.field_by_name(field_name).unwrap());
+                for (field_name, field_type) in names.zip(types) {
+                    let val = arrays_map.entry(field_name.clone()).or_default();
+                    val.push(st.field_by_name(field_name).unwrap());
 
-                types_map.insert(field_name.clone(), field_type.clone());
+                    types_map.insert(field_name.clone(), field_type.clone());
+                }
             }
-        }
 
-        let fields = schema
-            .fields()
-            .iter()
-            .map(|field| {
-                let name: Arc<str> = field.name().as_str().into();
-                let dtype = types_map.get(&name).unwrap().clone();
-                let chunks = arrays_map.remove(&name).unwrap();
-
-                (
-                    name.clone(),
-                    ChunkedArray::try_new(chunks, dtype).unwrap().into_array(),
-                )
-            })
-            .collect::<Vec<_>>();
-
-        let data = StructArray::from_fields(&fields).into_array();
-
-        let data = if enable_compression {
-            let compressor = SamplingCompressor::default();
-            compressor.compress(&data, None)?.into_array()
-        } else {
-            data
-        };
-
-        let f = OpenOptions::new()
-            .write(true)
-            .truncate(true)
-            .create(true)
-            .open(&vtx_file)
-            .await?;
+            let fields = schema
+                .fields()
+                .iter()
+                .map(|field| {
+                    let name: Arc<str> = field.name().as_str().into();
+                    let dtype = types_map.get(&name).unwrap().clone();
+                    let chunks = arrays_map.remove(&name).unwrap();
+
+                    (
+                        name.clone(),
+                        ChunkedArray::try_new(chunks, dtype).unwrap().into_array(),
+                    )
+                })
+                .collect::<Vec<_>>();
+
+            let data = StructArray::from_fields(&fields).into_array();
+
+            let data = if enable_compression {
+                let compressor = SamplingCompressor::default();
+                compressor.compress(&data, None)?.into_array()
+            } else {
+                data
+            };
+
+            let f = OpenOptions::new()
+                .write(true)
+                .truncate(true)
+                .create(true)
+                .open(&vtx_file)
+                .await?;
 
-        let mut writer = LayoutWriter::new(f);
-        writer = writer.write_array_columns(data).await?;
-        writer.finalize().await?;
+            let mut writer = LayoutWriter::new(f);
+            writer = writer.write_array_columns(data).await?;
+            writer.finalize().await?;
 
-        anyhow::Ok(())
-    })
+            anyhow::Ok(())
+        },
+    )
     .await?;
 
+    let ctx = if enable_compression {
+        Arc::new(Context::default().with_encodings(SamplingCompressor::default().used_encodings()))
+    } else {
+        Arc::new(Context::default())
+    };
+
     let f = OpenOptions::new()
         .read(true)
         .write(true)
@@ -294,6 +308,7 @@ async fn register_vortex_file(
                 vtx_file.to_str().unwrap().to_string(),
                 file_size,
             )],
+            ctx,
         ),
     )?;
diff --git a/encodings/alp/src/array.rs b/encodings/alp/src/array.rs
index cfc648ba51..61dbcc5c85 100644
--- a/encodings/alp/src/array.rs
+++ b/encodings/alp/src/array.rs
@@ -71,9 +71,18 @@ impl ALPArray {
     }
 
     pub fn encoded(&self) -> Array {
-        self.array()
-            .child(0, &self.metadata().encoded_dtype, self.len())
-            .expect("Missing encoded array")
+        let child_opt = self
+            .array()
+            .child(0, &self.metadata().encoded_dtype, self.len());
+        if child_opt.is_none() {
+            match self.array() {
+                Array::Data(_) => {}
+                Array::View(v) => {
+                    dbg!(v.flatbuffer());
+                }
+            }
+        }
+        child_opt.expect("Missing encoded array")
     }
 
     #[inline]
diff --git a/encodings/zigzag/src/zigzag.rs b/encodings/zigzag/src/zigzag.rs
index eb067611d0..0026c4a9f9 100644
--- a/encodings/zigzag/src/zigzag.rs
+++ b/encodings/zigzag/src/zigzag.rs
@@ -5,12 +5,14 @@ use vortex::validity::{ArrayValidity, LogicalValidity};
 use vortex::variants::{ArrayVariants, PrimitiveArrayTrait};
 use vortex::visitor::{AcceptArrayVisitor, ArrayVisitor};
 use vortex::{
-    impl_encoding, Array, ArrayDType, ArrayDef, ArrayTrait, Canonical, IntoArray, IntoCanonical,
+    impl_encoding, Array, ArrayDType, ArrayDef, ArrayTrait, Canonical, IntoArray, IntoArrayVariant,
+    IntoCanonical,
 };
 use vortex_dtype::{DType, PType};
 use vortex_error::{vortex_bail, vortex_err, VortexResult};
 
 use crate::compress::zigzag_encode;
+use crate::zigzag_decode;
 
 impl_encoding!("vortex.zigzag", 21u16, ZigZag);
 
@@ -83,6 +85,8 @@ impl ArrayStatisticsCompute for ZigZagArray {}
 
 impl IntoCanonical for ZigZagArray {
     fn into_canonical(self) -> VortexResult<Canonical> {
-        todo!("ZigZagArray::flatten")
+        Ok(Canonical::Primitive(zigzag_decode(
+            &self.encoded().into_primitive()?,
+        )))
     }
 }
diff --git a/vortex-datafusion/examples/table_provider.rs b/vortex-datafusion/examples/table_provider.rs
index 5d689bb2a9..693a40f5c1 100644
--- a/vortex-datafusion/examples/table_provider.rs
+++ b/vortex-datafusion/examples/table_provider.rs
@@ -11,7 +11,7 @@ use tokio::fs::OpenOptions;
 use url::Url;
 use vortex::array::{ChunkedArray, PrimitiveArray, StructArray, VarBinArray};
 use vortex::validity::Validity;
-use vortex::IntoArray;
+use vortex::{Context, IntoArray};
 use vortex_datafusion::persistent::config::{VortexFile, VortexTableOptions};
 use vortex_datafusion::persistent::provider::VortexFileTableProvider;
 use vortex_serde::layouts::writer::LayoutWriter;
@@ -66,6 +66,7 @@ async fn main() -> anyhow::Result<()> {
             Field::new("numbers", DataType::UInt32, false),
         ])),
         vec![VortexFile::new(p, file_size)],
+        Arc::new(Context::default()),
     );
 
     let provider = Arc::new(VortexFileTableProvider::try_new(url, config)?);
diff --git a/vortex-datafusion/src/persistent/config.rs b/vortex-datafusion/src/persistent/config.rs
index fccd646aa7..b870532591 100644
--- a/vortex-datafusion/src/persistent/config.rs
+++ b/vortex-datafusion/src/persistent/config.rs
@@ -1,8 +1,11 @@
+use std::sync::Arc;
+
 use arrow_schema::SchemaRef;
 use chrono::TimeZone as _;
 use datafusion::datasource::listing::PartitionedFile;
 use object_store::path::Path;
 use object_store::ObjectMeta;
+use vortex::Context;
 
 #[derive(Clone)]
 pub struct VortexFile {
@@ -33,13 +36,15 @@ impl VortexFile {
 pub struct VortexTableOptions {
     pub(crate) data_files: Vec<VortexFile>,
     pub(crate) schema: Option<SchemaRef>,
+    pub(crate) ctx: Arc<Context>,
 }
 
 impl VortexTableOptions {
-    pub fn new(schema: SchemaRef, data_files: Vec<VortexFile>) -> Self {
+    pub fn new(schema: SchemaRef, data_files: Vec<VortexFile>, ctx: Arc<Context>) -> Self {
         Self {
             data_files,
             schema: Some(schema),
+            ctx,
         }
    }
 }
diff --git a/vortex-datafusion/src/persistent/execution.rs b/vortex-datafusion/src/persistent/execution.rs
index 3c54f0e729..792cb68459 100644
--- a/vortex-datafusion/src/persistent/execution.rs
+++ b/vortex-datafusion/src/persistent/execution.rs
@@ -9,6 +9,7 @@ use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion_physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties,
 };
+use vortex::Context;
 
 use crate::persistent::opener::VortexFileOpener;
 
@@ -18,6 +19,7 @@ pub struct VortexExec {
     metrics: ExecutionPlanMetricsSet,
     predicate: Option<Arc<dyn PhysicalExpr>>,
     plan_properties: PlanProperties,
+    ctx: Arc<Context>,
 }
 
 impl VortexExec {
@@ -26,6 +28,7 @@ impl VortexExec {
         metrics: ExecutionPlanMetricsSet,
         projection: Option<&Vec<usize>>,
         predicate: Option<Arc<dyn PhysicalExpr>>,
+        ctx: Arc<Context>,
     ) -> DFResult<Self> {
         let projected_schema = project_schema(&file_scan_config.file_schema, projection)?;
         let plan_properties = PlanProperties::new(
@@ -39,6 +42,7 @@ impl VortexExec {
             metrics,
             predicate,
             plan_properties,
+            ctx,
         })
     }
     pub(crate) fn into_arc(self) -> Arc<dyn ExecutionPlan> {
@@ -88,6 +92,7 @@ impl ExecutionPlan for VortexExec {
             .runtime_env()
             .object_store(&self.file_scan_config.object_store_url)?;
         let opener = VortexFileOpener {
+            ctx: self.ctx.clone(),
            object_store,
             projection: self.file_scan_config.projection.clone(),
             batch_size: None,
diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs
index fbdba24bb2..d3e275c42c 100644
--- a/vortex-datafusion/src/persistent/opener.rs
+++ b/vortex-datafusion/src/persistent/opener.rs
@@ -6,12 +6,14 @@ use datafusion_common::Result as DFResult;
 use datafusion_physical_expr::PhysicalExpr;
 use futures::{FutureExt as _, TryStreamExt};
 use object_store::ObjectStore;
+use vortex::Context;
 use vortex_serde::io::ObjectStoreReadAt;
 use vortex_serde::layouts::reader::builder::VortexLayoutReaderBuilder;
-use vortex_serde::layouts::reader::context::LayoutDeserializer;
+use vortex_serde::layouts::reader::context::{LayoutContext, LayoutDeserializer};
 use vortex_serde::layouts::reader::projections::Projection;
 
 pub struct VortexFileOpener {
+    pub ctx: Arc<Context>,
     pub object_store: Arc<dyn ObjectStore>,
     pub batch_size: Option<usize>,
     pub projection: Option<Vec<usize>>,
@@ -23,7 +25,10 @@ impl FileOpener for VortexFileOpener {
         let read_at =
             ObjectStoreReadAt::new(self.object_store.clone(), file_meta.location().clone());
 
-        let mut builder = VortexLayoutReaderBuilder::new(read_at, LayoutDeserializer::default());
+        let mut builder = VortexLayoutReaderBuilder::new(
+            read_at,
+            LayoutDeserializer::new(self.ctx.clone(), Arc::new(LayoutContext::default())),
+        );
 
         if let Some(batch_size) = self.batch_size {
             builder = builder.with_batch_size(batch_size);
diff --git a/vortex-datafusion/src/persistent/provider.rs b/vortex-datafusion/src/persistent/provider.rs
index 3cb9cfdc81..d80aabd573 100644
--- a/vortex-datafusion/src/persistent/provider.rs
+++ b/vortex-datafusion/src/persistent/provider.rs
@@ -79,8 +79,14 @@ impl TableProvider for VortexFileTableProvider {
             )
             .with_projection(projection.cloned());
 
-        let exec =
-            VortexExec::try_new(file_scan_config, metrics, projection, predicate)?.into_arc();
+        let exec = VortexExec::try_new(
+            file_scan_config,
+            metrics,
+            projection,
+            predicate,
+            self.config.ctx.clone(),
+        )?
+        .into_arc();
 
         Ok(exec)
     }
diff --git a/vortex-sampling-compressor/src/compressors/zigzag.rs b/vortex-sampling-compressor/src/compressors/zigzag.rs
index 81c1a17a70..b2ff107f89 100644
--- a/vortex-sampling-compressor/src/compressors/zigzag.rs
+++ b/vortex-sampling-compressor/src/compressors/zigzag.rs
@@ -1,11 +1,11 @@
 use std::collections::HashSet;
 
-use vortex::array::{PrimitiveArray, SparseEncoding};
+use vortex::array::PrimitiveArray;
 use vortex::encoding::EncodingRef;
 use vortex::stats::{ArrayStatistics, Stat};
 use vortex::{Array, ArrayDef, IntoArray};
 use vortex_error::VortexResult;
-use vortex_zigzag::{zigzag_encode, ZigZag, ZigZagArray};
+use vortex_zigzag::{zigzag_encode, ZigZag, ZigZagArray, ZigZagEncoding};
 
 use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
 use crate::SamplingCompressor;
@@ -56,6 +56,6 @@ impl EncodingCompressor for ZigZagCompressor {
     }
 
     fn used_encodings(&self) -> HashSet<EncodingRef> {
-        HashSet::from([&SparseEncoding as EncodingRef])
+        HashSet::from([&ZigZagEncoding as EncodingRef])
     }
 }
diff --git a/vortex-sampling-compressor/src/lib.rs b/vortex-sampling-compressor/src/lib.rs
index ef6f56f82c..32e7f49fa8 100644
--- a/vortex-sampling-compressor/src/lib.rs
+++ b/vortex-sampling-compressor/src/lib.rs
@@ -5,6 +5,7 @@ use log::{debug, info, warn};
 use vortex::array::{Chunked, ChunkedArray, Constant, Struct, StructArray};
 use vortex::compress::{check_dtype_unchanged, check_validity_unchanged, CompressionStrategy};
 use vortex::compute::slice;
+use vortex::encoding::EncodingRef;
 use vortex::validity::Validity;
 use vortex::variants::StructArrayTrait;
 use vortex::{Array, ArrayDType, ArrayDef, IntoArray, IntoCanonical};
@@ -70,6 +71,13 @@ impl CompressionStrategy for SamplingCompressor<'_> {
     fn compress(&self, array: &Array) -> VortexResult<Array> {
         Self::compress(self, array, None).map(|c| c.into_array())
     }
+
+    fn used_encodings(&self) -> HashSet<EncodingRef> {
+        self.compressors
+            .iter()
+            .flat_map(|c| c.used_encodings())
+            .collect()
+    }
 }
 
 impl Default for SamplingCompressor<'_> {
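Patch 2's other functional change is the `IntoCanonical` implementation for `ZigZagArray`, which replaces a `todo!` with a real decode via `zigzag_decode`. A hedged sketch of the round trip this enables, assuming `zigzag_encode` accepts a `&PrimitiveArray` and returns a `ZigZagArray`, as its use in the compressor above suggests:

    use vortex::array::PrimitiveArray;
    use vortex::validity::Validity;
    use vortex::{IntoArray, IntoArrayVariant};
    use vortex_error::VortexResult;
    use vortex_zigzag::zigzag_encode;

    fn roundtrip() -> VortexResult<()> {
        // ZigZag maps signed values onto unsigned ones (0, -1, 1, -2, ... -> 0, 1, 2, 3, ...),
        // so canonicalizing the encoded array must reproduce the original values.
        let original = PrimitiveArray::from_vec(vec![-2i32, -1, 0, 1, 2], Validity::NonNullable);
        let decoded = zigzag_encode(&original)?.into_array().into_primitive()?;
        assert_eq!(decoded.maybe_null_slice::<i32>(), &[-2, -1, 0, 1, 2]);
        Ok(())
    }

Before this change, any read that forced a zigzag-compressed column into canonical form panicked on the `todo!`.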
From 67d37a6ec46d38cc519700d40e01ee88e01f0a50 Mon Sep 17 00:00:00 2001
From: Robert Kruszewski
Date: Fri, 9 Aug 2024 14:40:01 +0100
Subject: [PATCH 3/4] less

---
 encodings/alp/src/array.rs | 15 +++------------
 1 file changed, 3 insertions(+), 12 deletions(-)

diff --git a/encodings/alp/src/array.rs b/encodings/alp/src/array.rs
index 61dbcc5c85..cfc648ba51 100644
--- a/encodings/alp/src/array.rs
+++ b/encodings/alp/src/array.rs
@@ -71,18 +71,9 @@ impl ALPArray {
     }
 
     pub fn encoded(&self) -> Array {
-        let child_opt = self
-            .array()
-            .child(0, &self.metadata().encoded_dtype, self.len());
-        if child_opt.is_none() {
-            match self.array() {
-                Array::Data(_) => {}
-                Array::View(v) => {
-                    dbg!(v.flatbuffer());
-                }
-            }
-        }
-        child_opt.expect("Missing encoded array")
+        self.array()
+            .child(0, &self.metadata().encoded_dtype, self.len())
+            .expect("Missing encoded array")
     }
 
     #[inline]

From 583c0a4372d602f706076eb43ea4446d9333e2a6 Mon Sep 17 00:00:00 2001
From: Robert Kruszewski
Date: Fri, 9 Aug 2024 14:46:41 +0100
Subject: [PATCH 4/4] smaller diff

---
 bench-vortex/src/tpch/mod.rs | 140 +++++++++++++++++------------------
 1 file changed, 69 insertions(+), 71 deletions(-)

diff --git a/bench-vortex/src/tpch/mod.rs b/bench-vortex/src/tpch/mod.rs
index 0f9ce3ec5c..60c9f9522b 100644
--- a/bench-vortex/src/tpch/mod.rs
+++ b/bench-vortex/src/tpch/mod.rs
@@ -202,86 +202,84 @@ async fn register_vortex_file(
         "vortex_uncompressed"
     });
     create_dir_all(&vortex_dir)?;
-    let vtx_file = idempotent_async(
-        &vortex_dir
-            .join(file.file_name().unwrap())
-            .with_extension("vxf"),
-        |vtx_file| async move {
-            let record_batches = session
-                .read_csv(
-                    file.to_str().unwrap(),
-                    CsvReadOptions::default()
-                        .delimiter(b'|')
-                        .has_header(false)
-                        .file_extension("tbl")
-                        .schema(schema),
-                )
-                .await?
-                .collect()
-                .await?;
+    let output_file = &vortex_dir
+        .join(file.file_name().unwrap())
+        .with_extension("vxf");
+    let vtx_file = idempotent_async(output_file, |vtx_file| async move {
+        let record_batches = session
+            .read_csv(
+                file.to_str().unwrap(),
+                CsvReadOptions::default()
+                    .delimiter(b'|')
+                    .has_header(false)
+                    .file_extension("tbl")
+                    .schema(schema),
+            )
+            .await?
+            .collect()
+            .await?;
 
-            // Create a ChunkedArray from the set of chunks.
-            let sts = record_batches
-                .iter()
-                .cloned()
-                .map(Array::from)
-                .map(|a| a.into_struct().unwrap())
-                .collect::<Vec<_>>();
+        // Create a ChunkedArray from the set of chunks.
+        let sts = record_batches
+            .iter()
+            .cloned()
+            .map(Array::from)
+            .map(|a| a.into_struct().unwrap())
+            .collect::<Vec<_>>();
 
-            let mut arrays_map: HashMap<Arc<str>, Vec<Array>> = HashMap::default();
-            let mut types_map: HashMap<Arc<str>, DType> = HashMap::default();
+        let mut arrays_map: HashMap<Arc<str>, Vec<Array>> = HashMap::default();
+        let mut types_map: HashMap<Arc<str>, DType> = HashMap::default();
 
-            for st in sts.into_iter() {
-                let struct_dtype = st.dtype().as_struct().unwrap();
-                let names = struct_dtype.names().iter();
-                let types = struct_dtype.dtypes().iter();
+        for st in sts.into_iter() {
+            let struct_dtype = st.dtype().as_struct().unwrap();
+            let names = struct_dtype.names().iter();
+            let types = struct_dtype.dtypes().iter();
 
-                for (field_name, field_type) in names.zip(types) {
-                    let val = arrays_map.entry(field_name.clone()).or_default();
-                    val.push(st.field_by_name(field_name).unwrap());
+            for (field_name, field_type) in names.zip(types) {
+                let val = arrays_map.entry(field_name.clone()).or_default();
+                val.push(st.field_by_name(field_name).unwrap());
 
-                    types_map.insert(field_name.clone(), field_type.clone());
-                }
+                types_map.insert(field_name.clone(), field_type.clone());
             }
+        }
+
+        let fields = schema
+            .fields()
+            .iter()
+            .map(|field| {
+                let name: Arc<str> = field.name().as_str().into();
+                let dtype = types_map.get(&name).unwrap().clone();
+                let chunks = arrays_map.remove(&name).unwrap();
+
+                (
+                    name.clone(),
+                    ChunkedArray::try_new(chunks, dtype).unwrap().into_array(),
+                )
+            })
+            .collect::<Vec<_>>();
 
-            let fields = schema
-                .fields()
-                .iter()
-                .map(|field| {
-                    let name: Arc<str> = field.name().as_str().into();
-                    let dtype = types_map.get(&name).unwrap().clone();
-                    let chunks = arrays_map.remove(&name).unwrap();
-
-                    (
-                        name.clone(),
-                        ChunkedArray::try_new(chunks, dtype).unwrap().into_array(),
-                    )
-                })
-                .collect::<Vec<_>>();
-
-            let data = StructArray::from_fields(&fields).into_array();
-
-            let data = if enable_compression {
-                let compressor = SamplingCompressor::default();
-                compressor.compress(&data, None)?.into_array()
-            } else {
-                data
-            };
-
-            let f = OpenOptions::new()
-                .write(true)
-                .truncate(true)
-                .create(true)
-                .open(&vtx_file)
-                .await?;
+        let data = StructArray::from_fields(&fields).into_array();
+
+        let data = if enable_compression {
+            let compressor = SamplingCompressor::default();
+            compressor.compress(&data, None)?.into_array()
+        } else {
+            data
+        };
 
-            let mut writer = LayoutWriter::new(f);
-            writer = writer.write_array_columns(data).await?;
-            writer.finalize().await?;
+        let f = OpenOptions::new()
+            .write(true)
+            .truncate(true)
+            .create(true)
+            .open(&vtx_file)
+            .await?;
 
-            anyhow::Ok(())
-        },
-    )
+        let mut writer = LayoutWriter::new(f);
+        writer = writer.write_array_columns(data).await?;
+        writer.finalize().await?;
+
+        anyhow::Ok(())
+    })
     .await?;
 
     let ctx = if enable_compression {
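With the series applied end to end, the benchmark read path can be handed a `Context` seeded from the compressor, which is what makes the compressed TPC-H files loadable again. A sketch of the reader setup under the same assumptions as the opener in patch 2 (the builder's exact generic signature is assumed; `read_at` is any read handle such as the `ObjectStoreReadAt` used there):

    use std::sync::Arc;

    use vortex::compress::CompressionStrategy;
    use vortex::Context;
    use vortex_sampling_compressor::SamplingCompressor;
    use vortex_serde::io::ObjectStoreReadAt;
    use vortex_serde::layouts::reader::builder::VortexLayoutReaderBuilder;
    use vortex_serde::layouts::reader::context::{LayoutContext, LayoutDeserializer};

    fn reader_for(read_at: ObjectStoreReadAt) -> VortexLayoutReaderBuilder<ObjectStoreReadAt> {
        // Without the compressor-derived encodings, the read fails on the first
        // non-default encoding (ALP, ZigZag, ...) found in a compressed file.
        let ctx = Arc::new(Context::default().with_encodings(SamplingCompressor::default().used_encodings()));
        VortexLayoutReaderBuilder::new(
            read_at,
            LayoutDeserializer::new(ctx, Arc::new(LayoutContext::default())),
        )
    }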