From 2976d3db43cf90d0989eb182d5fe36d7aedc465b Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Thu, 7 Mar 2024 18:26:42 +0000 Subject: [PATCH] Cleanup Dict encoding (#82) --- Cargo.lock | 33 +-- bench-vortex/Cargo.toml | 1 - .../src/array/primitive/compute/cast.rs | 4 +- .../src/array/primitive/compute/scalar_at.rs | 9 +- vortex-array/src/array/varbin/mod.rs | 36 ++-- vortex-array/src/array/varbin/values_iter.rs | 100 ++++++++++ vortex-dict/Cargo.toml | 5 +- vortex-dict/benches/dict_compress.rs | 66 ++++-- vortex-dict/src/compress.rs | 188 +++++------------- 9 files changed, 245 insertions(+), 197 deletions(-) create mode 100644 vortex-array/src/array/varbin/values_iter.rs diff --git a/Cargo.lock b/Cargo.lock index fa090b0586..82c682fcea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -367,7 +367,6 @@ dependencies = [ "itertools 0.12.1", "log", "parquet", - "rand", "reqwest", "simplelog", "vortex-alp", @@ -481,9 +480,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.0.89" +version = "1.0.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0ba8f7aaa012f30d5b2861462f6708eccd49c3c39863fe083a308035f63d723" +checksum = "8cd6604a82acf3039f1144f54b8eb34e91ffba622051189e71b781822d5ee1f5" dependencies = [ "jobserver", "libc", @@ -506,9 +505,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.34" +version = "0.4.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bc015644b92d5890fab7489e49d21f879d5c990186827d42ec511919404f38b" +checksum = "8eaf5903dcbc0a39312feb77df2ff4c76387d591b9fc7b04a238dcf8bb62639a" dependencies = [ "android-tzdata", "iana-time-zone", @@ -556,18 +555,18 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.1" +version = "4.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c918d541ef2913577a0f9566e9ce27cb35b6df072075769e0b26cb5a554520da" +checksum = "b230ab84b0ffdf890d5a10abdbc8b83ae1c4918275daea1ab8801f71536b2651" dependencies = [ "clap_builder", ] [[package]] name = "clap_builder" -version = "4.5.1" +version = "4.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f3e7391dad68afb0c2ede1bf619f579a3dc9c2ec67f089baa397123a2f3d1eb" +checksum = "ae129e2e766ae0ec03484e609954119f123cc1fe650337e155d03b022f24f7b4" dependencies = [ "anstyle", "clap_lex", @@ -1314,9 +1313,9 @@ checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" [[package]] name = "libloading" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2caa5afb8bf9f3a2652760ce7d4f62d21c4d5a423e68466fca30df82f2330164" +checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19" dependencies = [ "cfg-if", "windows-targets 0.52.4", @@ -1330,18 +1329,18 @@ checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" [[package]] name = "linkme" -version = "0.3.23" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a78816ac097580aa7fd9d2e9cc7395dda34367c07267a8657516d4ad5e2e3d3" +checksum = "bb2cfee0de9bd869589fb9a015e155946d1be5ff415cb844c2caccc6cc4b5db9" dependencies = [ "linkme-impl", ] [[package]] name = "linkme-impl" -version = "0.3.23" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee9023a564f8bf7fe3da285a50c3e70de0df3e2bf277ff7c4e76d66008ef93b0" +checksum = "adf157a4dc5a29b7b464aa8fe7edeff30076e07e13646a1c3874f58477dc99f8" dependencies = [ "proc-macro2", "quote", @@ -2849,12 +2848,14 @@ name = "vortex-dict" version = "0.1.0" dependencies = [ "ahash", - "divan", + "criterion", "half", "hashbrown", "linkme", "log", "num-traits", + "rand", + "simplelog", "vortex-array", ] diff --git a/bench-vortex/Cargo.toml b/bench-vortex/Cargo.toml index b8e343228a..8bdc8ed232 100644 --- a/bench-vortex/Cargo.toml +++ b/bench-vortex/Cargo.toml @@ -31,7 +31,6 @@ log = "0.4.20" [dev-dependencies] criterion = { version = "0.5.1", features = ["html_reports"] } simplelog = { version = "0.12.1", features = ["paris"] } -rand = "0.8.5" [[bench]] name = "compress_benchmark" diff --git a/vortex-array/src/array/primitive/compute/cast.rs b/vortex-array/src/array/primitive/compute/cast.rs index 2e5eb32947..2f7931e386 100644 --- a/vortex-array/src/array/primitive/compute/cast.rs +++ b/vortex-array/src/array/primitive/compute/cast.rs @@ -26,8 +26,8 @@ fn cast(array: &PrimitiveArray) -> VortexResult> { .typed_data::<$E>() .iter() // TODO(ngates): allow configurable checked/unchecked casting - .map(|v| { - T::from(*v).ok_or_else(|| { + .map(|&v| { + T::from(v).ok_or_else(|| { VortexError::ComputeError(format!("Failed to cast {} to {:?}", v, T::PTYPE).into()) }) }) diff --git a/vortex-array/src/array/primitive/compute/scalar_at.rs b/vortex-array/src/array/primitive/compute/scalar_at.rs index 968001c535..bf550ef10c 100644 --- a/vortex-array/src/array/primitive/compute/scalar_at.rs +++ b/vortex-array/src/array/primitive/compute/scalar_at.rs @@ -8,14 +8,7 @@ use crate::scalar::{NullableScalar, Scalar, ScalarRef}; impl ScalarAtFn for PrimitiveArray { fn scalar_at(&self, index: usize) -> VortexResult { if self.is_valid(index) { - Ok( - match_each_native_ptype!(self.ptype, |$T| self.buffer.typed_data::<$T>() - .get(index) - .unwrap() - .clone() - .into() - ), - ) + Ok(match_each_native_ptype!(self.ptype, |$T| self.typed_data::<$T>()[index].into())) } else { Ok(NullableScalar::none(self.dtype().clone()).boxed()) } diff --git a/vortex-array/src/array/varbin/mod.rs b/vortex-array/src/array/varbin/mod.rs index 1c6f0ed8e0..d48457719c 100644 --- a/vortex-array/src/array/varbin/mod.rs +++ b/vortex-array/src/array/varbin/mod.rs @@ -6,11 +6,12 @@ use arrow::array::{make_array, Array as ArrowArray, ArrayData, AsArray}; use arrow::buffer::NullBuffer; use arrow::datatypes::UInt8Type; use linkme::distributed_slice; -use num_traits::{AsPrimitive, FromPrimitive, Unsigned}; +use num_traits::{FromPrimitive, Unsigned}; use crate::array::bool::BoolArray; use crate::array::downcast::DowncastArrayBuiltin; use crate::array::primitive::PrimitiveArray; +use crate::array::varbin::values_iter::{VarBinIter, VarBinPrimitiveIter}; use crate::array::{ check_slice_bounds, check_validity_buffer, Array, ArrayRef, ArrowIterator, Encoding, EncodingId, EncodingRef, ENCODINGS, @@ -20,7 +21,6 @@ use crate::compute::scalar_at::scalar_at; use crate::dtype::{DType, IntWidth, Nullability, Signedness}; use crate::error::{VortexError, VortexResult}; use crate::formatter::{ArrayDisplay, ArrayFormatter}; -use crate::match_each_native_ptype; use crate::ptype::NativePType; use crate::serde::{ArraySerde, EncodingSerde}; use crate::stats::{Stats, StatsSet}; @@ -28,6 +28,7 @@ use crate::stats::{Stats, StatsSet}; mod compute; mod serde; mod stats; +mod values_iter; #[derive(Debug, Clone)] pub struct VarBinArray { @@ -189,20 +190,23 @@ impl VarBinArray { } } - pub fn bytes_at(&self, index: usize) -> VortexResult> { - // check_index_bounds(self, index)?; - - let (start, end): (usize, usize) = if let Some(p) = self.offsets.maybe_primitive() { - match_each_native_ptype!(p.ptype(), |$P| { - let buf = p.buffer().typed_data::<$P>(); - (buf[index].as_(), buf[index + 1].as_()) + pub fn iter_primitive(&self) -> VortexResult { + self.bytes() + .maybe_primitive() + .zip(self.offsets().maybe_primitive()) + .ok_or_else(|| { + VortexError::ComputeError("Bytes array was not a primitive array".into()) }) - } else { - ( - scalar_at(self.offsets(), index)?.try_into()?, - scalar_at(self.offsets(), index + 1)?.try_into()?, - ) - }; + .map(|(b, o)| VarBinPrimitiveIter::new(b.typed_data::(), o)) + } + + pub fn iter(&self) -> VarBinIter { + VarBinIter::new(self.bytes(), self.offsets()) + } + + pub fn bytes_at(&self, index: usize) -> VortexResult> { + let start = scalar_at(self.offsets(), index)?.try_into()?; + let end = scalar_at(self.offsets(), index + 1)?.try_into()?; let sliced = self.bytes().slice(start, end)?; let arr_ref = sliced.iter_arrow().combine_chunks(); Ok(arr_ref.as_primitive::().values().to_vec()) @@ -382,11 +386,11 @@ impl<'a> FromIterator> for VarBinArray { #[cfg(test)] mod test { - use crate::array::Array; use arrow::array::{AsArray, GenericStringArray as ArrowStringArray}; use crate::array::primitive::PrimitiveArray; use crate::array::varbin::VarBinArray; + use crate::array::Array; use crate::arrow::CombineChunks; use crate::compute::scalar_at::scalar_at; use crate::dtype::{DType, Nullability}; diff --git a/vortex-array/src/array/varbin/values_iter.rs b/vortex-array/src/array/varbin/values_iter.rs new file mode 100644 index 0000000000..26723f97ee --- /dev/null +++ b/vortex-array/src/array/varbin/values_iter.rs @@ -0,0 +1,100 @@ +use arrow::array::AsArray; +use arrow::datatypes::UInt8Type; + +use crate::array::primitive::PrimitiveArray; +use crate::array::Array; +use crate::arrow::CombineChunks; +use crate::compute::scalar_at::scalar_at; +use crate::match_each_native_ptype; +use num_traits::AsPrimitive; + +#[derive(Debug)] +pub struct VarBinPrimitiveIter<'a> { + bytes: &'a [u8], + offsets: &'a PrimitiveArray, + last_offset: usize, + idx: usize, +} + +impl<'a> VarBinPrimitiveIter<'a> { + pub fn new(bytes: &'a [u8], offsets: &'a PrimitiveArray) -> Self { + assert!(offsets.len() > 1); + let last_offset = Self::offset_at(offsets, 0); + Self { + bytes, + offsets, + last_offset, + idx: 1, + } + } + + pub(self) fn offset_at(array: &'a PrimitiveArray, index: usize) -> usize { + match_each_native_ptype!(array.ptype(), |$P| { + array.typed_data::<$P>()[index].as_() + }) + } +} + +impl<'a> Iterator for VarBinPrimitiveIter<'a> { + type Item = &'a [u8]; + + fn next(&mut self) -> Option { + if self.idx == self.offsets.len() { + return None; + } + + let next_offset = Self::offset_at(self.offsets, self.idx); + let slice_bytes = &self.bytes[self.last_offset..next_offset]; + self.last_offset = next_offset; + self.idx += 1; + Some(slice_bytes) + } +} + +#[derive(Debug)] +pub struct VarBinIter<'a> { + bytes: &'a dyn Array, + offsets: &'a dyn Array, + last_offset: usize, + idx: usize, +} + +impl<'a> VarBinIter<'a> { + pub fn new(bytes: &'a dyn Array, offsets: &'a dyn Array) -> Self { + assert!(offsets.len() > 1); + let last_offset = scalar_at(offsets, 0).unwrap().try_into().unwrap(); + Self { + bytes, + offsets, + last_offset, + idx: 1, + } + } +} + +impl<'a> Iterator for VarBinIter<'a> { + type Item = Vec; + + fn next(&mut self) -> Option { + if self.idx == self.offsets.len() { + return None; + } + + let next_offset: usize = scalar_at(self.offsets, self.idx) + .unwrap() + .try_into() + .unwrap(); + let slice_bytes = self.bytes.slice(self.last_offset, next_offset).unwrap(); + self.last_offset = next_offset; + self.idx += 1; + // TODO(robert): iter as primitive vs arrow + Some( + slice_bytes + .iter_arrow() + .combine_chunks() + .as_primitive::() + .values() + .to_vec(), + ) + } +} diff --git a/vortex-dict/Cargo.toml b/vortex-dict/Cargo.toml index 4c426f9924..9dec75e82e 100644 --- a/vortex-dict/Cargo.toml +++ b/vortex-dict/Cargo.toml @@ -24,7 +24,10 @@ num-traits = "0.2.17" workspace = true [dev-dependencies] -divan = "0.1.14" +criterion = { version = "0.5.1", features = ["html_reports"] } +log = "0.4.20" +rand = "0.8.5" +simplelog = { version = "0.12.1", features = ["paris"] } [[bench]] name = "dict_compress" diff --git a/vortex-dict/benches/dict_compress.rs b/vortex-dict/benches/dict_compress.rs index a2f4ec0e73..88de92fa3f 100644 --- a/vortex-dict/benches/dict_compress.rs +++ b/vortex-dict/benches/dict_compress.rs @@ -1,21 +1,59 @@ +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use rand::distributions::{Alphanumeric, Uniform}; +use rand::prelude::SliceRandom; +use rand::{thread_rng, Rng}; use vortex::array::primitive::PrimitiveArray; use vortex::array::varbin::VarBinArray; -use vortex::array::{Array, ArrayRef}; -use vortex::dtype::DType; -use vortex::dtype::Nullability::NonNullable; -use vortex_dict::dict_encode_varbin; +use vortex::array::Array; -fn main() { - divan::main(); +fn gen_primitive_dict(len: usize, uniqueness: f64) -> PrimitiveArray { + let mut rng = thread_rng(); + let value_range = len as f64 * uniqueness; + let range = Uniform::new(-(value_range / 2.0) as i32, (value_range / 2.0) as i32); + let data: Vec = (0..len).map(|_| rng.sample(range)).collect(); + + PrimitiveArray::from(data) +} + +fn gen_varbin_dict(len: usize, uniqueness: f64) -> VarBinArray { + let mut rng = thread_rng(); + let uniq_cnt = (len as f64 * uniqueness) as usize; + let dict: Vec = (0..uniq_cnt) + .map(|_| { + (&mut rng) + .sample_iter(&Alphanumeric) + .take(16) + .map(char::from) + .collect() + }) + .collect(); + let words: Vec<&str> = (0..len) + .map(|_| dict.choose(&mut rng).unwrap().as_str()) + .collect(); + VarBinArray::from(words) +} + +fn dict_encode_primitive(arr: &PrimitiveArray) -> usize { + let (codes, values) = vortex_dict::dict_encode_primitive(arr); + (codes.nbytes() + values.nbytes()) / arr.nbytes() +} + +fn dict_encode_varbin(arr: &VarBinArray) -> usize { + let (codes, values) = vortex_dict::dict_encode_varbin(arr); + (codes.nbytes() + values.nbytes()) / arr.nbytes() } -#[divan::bench(args = [100_000, 10_000_000])] -fn dict_compress_varbin(n: usize) -> ArrayRef { - // Compress an array of 1-byte strings. - let offsets = PrimitiveArray::from((0..=n).map(|i| i as i64).collect::>()).boxed(); - let bytes = PrimitiveArray::from(vec![1u8; n]).boxed(); - let vb = VarBinArray::new(offsets, bytes, DType::Utf8(NonNullable), None); +fn dict_encode(c: &mut Criterion) { + let primitive_arr = gen_primitive_dict(1_000_000, 0.00005); + let varbin_arr = gen_varbin_dict(1_000_000, 0.00005); - let (_codes, values) = dict_encode_varbin(&vb); - values.boxed() + c.bench_function("dict_encode_primitives", |b| { + b.iter(|| black_box(dict_encode_primitive(&primitive_arr))); + }); + c.bench_function("dict_encode_varbin", |b| { + b.iter(|| black_box(dict_encode_varbin(&varbin_arr))); + }); } + +criterion_group!(benches, dict_encode); +criterion_main!(benches); diff --git a/vortex-dict/src/compress.rs b/vortex-dict/src/compress.rs index abb1ae083a..ab78e9e0b0 100644 --- a/vortex-dict/src/compress.rs +++ b/vortex-dict/src/compress.rs @@ -4,14 +4,12 @@ use ahash::RandomState; use hashbrown::hash_map::{Entry, RawEntryMut}; use hashbrown::HashMap; use log::debug; -use num_traits::{AsPrimitive, FromPrimitive, Unsigned}; +use num_traits::AsPrimitive; -use vortex::array::downcast::DowncastArrayBuiltin; -use vortex::array::primitive::PrimitiveArray; -use vortex::array::varbin::VarBinArray; -use vortex::array::{Array, ArrayKind, ArrayRef, CloneOptionalArray}; +use vortex::array::primitive::{PrimitiveArray, PrimitiveEncoding}; +use vortex::array::varbin::{VarBinArray, VarBinEncoding}; +use vortex::array::{Array, ArrayKind, ArrayRef}; use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression}; -use vortex::compute::scalar_at::scalar_at; use vortex::dtype::DType; use vortex::error::VortexResult; use vortex::match_each_native_ptype; @@ -29,10 +27,9 @@ impl EncodingCompression for DictEncoding { _config: &CompressConfig, ) -> Option<&dyn EncodingCompression> { // TODO(robert): Add support for VarBinView - if !matches!( - ArrayKind::from(array), - ArrayKind::Primitive(_) | ArrayKind::VarBin(_) - ) { + if array.encoding().id() != &PrimitiveEncoding::ID + && array.encoding().id() != &VarBinEncoding::ID + { debug!("Skipping Dict: not primitive or varbin"); return None; }; @@ -103,119 +100,45 @@ impl PartialEq for Value { impl Eq for Value {} -// TODO(robert): Use distinct count instead of len for width estimation pub fn dict_encode_primitive(array: &PrimitiveArray) -> (PrimitiveArray, PrimitiveArray) { match_each_native_ptype!(array.ptype(), |$P| { - if array.len() < u8::MAX as usize { - dict_encode_typed_primitive::(array) - } else if array.len() < u16::MAX as usize { - dict_encode_typed_primitive::(array) - } else if array.len() < u32::MAX as usize { - dict_encode_typed_primitive::(array) - } else { - dict_encode_typed_primitive::(array) - } + dict_encode_typed_primitive::<$P>(array) }) } /// Dictionary encode primitive array with given PType. /// Null values in the original array are encoded in the dictionary. -fn dict_encode_typed_primitive< - K: NativePType + Unsigned + FromPrimitive + AsPrimitive, - T: NativePType, ->( +fn dict_encode_typed_primitive( array: &PrimitiveArray, ) -> (PrimitiveArray, PrimitiveArray) { let mut lookup_dict: HashMap, u64> = HashMap::new(); - let mut codes: Vec = Vec::new(); + let mut codes: Vec = Vec::new(); let mut values: Vec = Vec::new(); - for v in array.buffer().typed_data::() { - let code: K = match lookup_dict.entry(Value(*v)) { - Entry::Occupied(o) => K::from_u64(*o.get()).unwrap(), + for &v in array.buffer().typed_data::() { + let code = match lookup_dict.entry(Value(v)) { + Entry::Occupied(o) => *o.get(), Entry::Vacant(vac) => { - let next_code = ::from_usize(values.len()).unwrap(); + let next_code = values.len() as u64; vac.insert(next_code.as_()); - values.push(*v); + values.push(v); next_code } }; codes.push(code) } - ( - PrimitiveArray::from_nullable(codes, array.validity().clone_optional()), - PrimitiveArray::from(values), - ) -} - -// TODO(robert): Estimation of offsets array width could be better if we had average size and distinct count -macro_rules! dict_encode_offsets_codes { - ($bytes_len:expr, $offsets_len:expr, | $_1:tt $codes:ident, $_2:tt $offsets:ident | $($body:tt)*) => ({ - macro_rules! __with__ {( $_1 $codes:ident, $_2 $offsets:ident ) => ( $($body)* )} - if $bytes_len < u32::MAX as usize { - if $offsets_len < u8::MAX as usize { - __with__! { u32, u8 } - } else if $offsets_len < u16::MAX as usize { - __with__! { u32, u16 } - } else if $offsets_len < u32::MAX as usize { - __with__! { u32, u32 } - } else { - __with__! { u32, u64 } - } - } else { - if $offsets_len < u8::MAX as usize { - __with__! { u64, u8 } - } else if $offsets_len < u16::MAX as usize { - __with__! { u64, u16 } - } else if $offsets_len < u32::MAX as usize { - __with__! { u64, u32 } - } else { - __with__! { u64, u64 } - } - } - }) + (PrimitiveArray::from(codes), PrimitiveArray::from(values)) } /// Dictionary encode varbin array. Specializes for primitive byte arrays to avoid double copying pub fn dict_encode_varbin(array: &VarBinArray) -> (PrimitiveArray, VarBinArray) { - if let Some(bytes) = array.bytes().maybe_primitive() { - let bytes = bytes.buffer().typed_data::(); - return if let Some(offsets) = array.offsets().maybe_primitive() { - match_each_native_ptype!(offsets.ptype(), |$P| { - let offsets = offsets.buffer().typed_data::<$P>(); - - dict_encode_offsets_codes!(bytes.len(), array.offsets().len(), |$O, $C| { - dict_encode_typed_varbin::<$O, $C, _, &[u8]>( - array.dtype().clone(), - |idx| bytes_at_primitive(offsets, bytes, idx), - array.len(), - array.validity() - ) - }) - }) - } else { - dict_encode_offsets_codes!(bytes.len(), array.offsets().len(), |$O, $C| { - dict_encode_typed_varbin::<$O, $C, _, &[u8]>( - array.dtype().clone(), - |idx| bytes_at(array.offsets(), bytes, idx), - array.len(), - array.validity() - ) - }) - }; - } - - dict_encode_offsets_codes!(array.bytes().len(), array.offsets().len(), |$O, $C| { - dict_encode_typed_varbin::<$O, $C, _, Vec>( - array.dtype().clone(), - |idx| array.bytes_at(idx).unwrap(), - array.len(), - array.validity() - ) - }) + array + .iter_primitive() + .map(|prim_iter| dict_encode_typed_varbin(array.dtype().clone(), prim_iter)) + .unwrap_or_else(|_| dict_encode_typed_varbin(array.dtype().clone(), array.iter())) } -fn bytes_at_primitive<'a, T: NativePType + AsPrimitive>( +fn lookup_bytes<'a, T: NativePType + AsPrimitive>( offsets: &'a [T], bytes: &'a [u8], idx: usize, @@ -225,47 +148,34 @@ fn bytes_at_primitive<'a, T: NativePType + AsPrimitive>( &bytes[begin..end] } -fn bytes_at<'a>(offsets: &'a dyn Array, bytes: &'a [u8], idx: usize) -> &'a [u8] { - let start: usize = scalar_at(offsets, idx).unwrap().try_into().unwrap(); - let stop: usize = scalar_at(offsets, idx + 1).unwrap().try_into().unwrap(); - &bytes[start..stop] -} - -fn dict_encode_typed_varbin( - dtype: DType, - value_lookup: V, - len: usize, - validity: Option<&dyn Array>, -) -> (PrimitiveArray, VarBinArray) +fn dict_encode_typed_varbin(dtype: DType, values: I) -> (PrimitiveArray, VarBinArray) where - O: NativePType + Unsigned + FromPrimitive + AsPrimitive, - K: NativePType + Unsigned + FromPrimitive + AsPrimitive, - V: Fn(usize) -> U, + I: Iterator, U: AsRef<[u8]>, { + let (lower, _) = values.size_hint(); let hasher = RandomState::new(); - let mut lookup_dict: HashMap = HashMap::with_hasher(()); - let mut codes: Vec = Vec::with_capacity(len); + let mut lookup_dict: HashMap = HashMap::with_hasher(()); + let mut codes: Vec = Vec::with_capacity(lower); let mut bytes: Vec = Vec::new(); - let mut offsets: Vec = Vec::new(); - offsets.push(O::zero()); + let mut offsets: Vec = Vec::new(); + offsets.push(0); - for i in 0..len { - let byte_val = value_lookup(i); + for byte_val in values { let byte_ref = byte_val.as_ref(); let value_hash = hasher.hash_one(byte_ref); let raw_entry = lookup_dict.raw_entry_mut().from_hash(value_hash, |idx| { - byte_ref == bytes_at_primitive(offsets.as_slice(), bytes.as_slice(), idx.as_()) + byte_ref == lookup_bytes(offsets.as_slice(), bytes.as_slice(), idx.as_()) }); - let code: K = match raw_entry { + let code = match raw_entry { RawEntryMut::Occupied(o) => *o.into_key(), RawEntryMut::Vacant(vac) => { - let next_code = ::from_usize(offsets.len() - 1).unwrap(); + let next_code = offsets.len() as u64 - 1; bytes.extend_from_slice(byte_ref); - offsets.push(::from_usize(bytes.len()).unwrap()); + offsets.push(bytes.len() as u64); vac.insert_with_hasher(value_hash, next_code, (), |idx| { - hasher.hash_one(bytes_at_primitive( + hasher.hash_one(lookup_bytes( offsets.as_slice(), bytes.as_slice(), idx.as_(), @@ -277,7 +187,7 @@ where codes.push(code) } ( - PrimitiveArray::from_nullable(codes, validity.clone_optional()), + PrimitiveArray::from(codes), VarBinArray::new( PrimitiveArray::from(offsets).boxed(), PrimitiveArray::from(bytes).boxed(), @@ -299,8 +209,8 @@ mod test { #[test] fn encode_primitive() { let arr = PrimitiveArray::from(vec![1, 1, 3, 3, 3]); - let (codes, values) = dict_encode_typed_primitive::(&arr); - assert_eq!(codes.buffer().typed_data::(), &[0, 0, 1, 1, 1]); + let (codes, values) = dict_encode_typed_primitive::(&arr); + assert_eq!(codes.buffer().typed_data::(), &[0, 0, 1, 1, 1]); assert_eq!(values.buffer().typed_data::(), &[1, 3]); } @@ -316,11 +226,11 @@ mod test { Some(3), None, ]); - let (codes, values) = dict_encode_typed_primitive::(&arr); - assert_eq!(codes.buffer().typed_data::(), &[0, 0, 1, 2, 2, 1, 2, 1]); - assert!(!codes.is_valid(2)); - assert!(!codes.is_valid(5)); - assert!(!codes.is_valid(7)); + let (codes, values) = dict_encode_typed_primitive::(&arr); + assert_eq!( + codes.buffer().typed_data::(), + &[0, 0, 1, 2, 2, 1, 2, 1] + ); assert_eq!(scalar_at(values.as_ref(), 0), Ok(1.into())); assert_eq!(scalar_at(values.as_ref(), 2), Ok(3.into())); } @@ -329,7 +239,7 @@ mod test { fn encode_varbin() { let arr = VarBinArray::from(vec!["hello", "world", "hello", "again", "world"]); let (codes, values) = dict_encode_varbin(&arr); - assert_eq!(codes.buffer().typed_data::(), &[0, 1, 0, 2, 1]); + assert_eq!(codes.buffer().typed_data::(), &[0, 1, 0, 2, 1]); assert_eq!( String::from_utf8(values.bytes_at(0).unwrap()).unwrap(), "hello" @@ -359,10 +269,10 @@ mod test { .into_iter() .collect(); let (codes, values) = dict_encode_varbin(&arr); - assert_eq!(codes.buffer().typed_data::(), &[0, 1, 2, 0, 1, 3, 2, 1]); - assert!(!codes.is_valid(1)); - assert!(!codes.is_valid(4)); - assert!(!codes.is_valid(7)); + assert_eq!( + codes.buffer().typed_data::(), + &[0, 1, 2, 0, 1, 3, 2, 1] + ); assert_eq!( String::from_utf8(values.bytes_at(0).unwrap()).unwrap(), "hello" @@ -387,9 +297,9 @@ mod test { "ab".as_bytes() ); assert_eq!( - values.offsets().as_primitive().typed_data::(), + values.offsets().as_primitive().typed_data::(), &[0, 1, 2] ); - assert_eq!(codes.typed_data::(), &[0u8, 0, 1, 1, 0, 1, 0, 1]); + assert_eq!(codes.typed_data::(), &[0u64, 0, 1, 1, 0, 1, 0, 1]); } }