From 9d652f4d2715e014e4db93b6bb5b1a71056f431e Mon Sep 17 00:00:00 2001
From: Robert Kruszewski <github@robertk.io>
Date: Wed, 21 Aug 2024 10:24:45 +0100
Subject: [PATCH] Faster canonicalization (#663)

---
 vortex-array/src/array/chunked/canonical.rs | 103 ++++++++++----------
 1 file changed, 54 insertions(+), 49 deletions(-)

diff --git a/vortex-array/src/array/chunked/canonical.rs b/vortex-array/src/array/chunked/canonical.rs
index abd35a96e1..9cdcea07a0 100644
--- a/vortex-array/src/array/chunked/canonical.rs
+++ b/vortex-array/src/array/chunked/canonical.rs
@@ -1,17 +1,17 @@
-use arrow_buffer::{BooleanBuffer, Buffer, MutableBuffer};
+use arrow_buffer::{BooleanBufferBuilder, Buffer, MutableBuffer};
 use itertools::Itertools;
-use vortex_dtype::{DType, Nullability, PType, StructDType};
+use vortex_dtype::Nullability::NonNullable;
+use vortex_dtype::{DType, PType, StructDType};
 use vortex_error::{vortex_bail, vortex_err, ErrString, VortexResult};
 
-use crate::accessor::ArrayAccessor;
 use crate::array::chunked::ChunkedArray;
 use crate::array::extension::ExtensionArray;
 use crate::array::null::NullArray;
 use crate::array::primitive::PrimitiveArray;
 use crate::array::struct_::StructArray;
-use crate::array::varbin::builder::VarBinBuilder;
 use crate::array::varbin::VarBinArray;
 use crate::array::BoolArray;
+use crate::compute::unary::try_cast;
 use crate::validity::Validity;
 use crate::variants::StructArrayTrait;
 use crate::{
@@ -20,12 +20,21 @@ use crate::{
 
 impl IntoCanonical for ChunkedArray {
     fn into_canonical(self) -> VortexResult<Canonical> {
-        try_canonicalize_chunks(self.chunks().collect(), self.dtype())
+        try_canonicalize_chunks(
+            self.chunks().collect(),
+            if self.dtype().is_nullable() {
+                self.logical_validity().into_validity()
+            } else {
+                Validity::NonNullable
+            },
+            self.dtype(),
+        )
     }
 }
 
 pub(crate) fn try_canonicalize_chunks(
     chunks: Vec<Array>,
+    validity: Validity,
     dtype: &DType,
 ) -> VortexResult<Canonical> {
     if chunks.is_empty() {
@@ -44,7 +53,7 @@ pub(crate) fn try_canonicalize_chunks(
         // Structs can have their internal field pointers swizzled to push the chunking down
         // one level internally without copying or decompressing any data.
         DType::Struct(struct_dtype, _) => {
-            let struct_array = swizzle_struct_chunks(chunks.as_slice(), struct_dtype)?;
+            let struct_array = swizzle_struct_chunks(chunks.as_slice(), validity, struct_dtype)?;
             Ok(Canonical::Struct(struct_array))
         }
 
@@ -100,20 +109,20 @@ pub(crate) fn try_canonicalize_chunks(
             todo!()
         }
 
-        DType::Bool(nullability) => {
-            let bool_array = pack_bools(chunks.as_slice(), *nullability)?;
+        DType::Bool(_) => {
+            let bool_array = pack_bools(chunks.as_slice(), validity)?;
             Ok(Canonical::Bool(bool_array))
         }
-        DType::Primitive(ptype, nullability) => {
-            let prim_array = pack_primitives(chunks.as_slice(), *ptype, *nullability)?;
+        DType::Primitive(ptype, _) => {
+            let prim_array = pack_primitives(chunks.as_slice(), *ptype, validity)?;
             Ok(Canonical::Primitive(prim_array))
         }
-        DType::Utf8(nullability) => {
-            let varbin_array = pack_varbin(chunks.as_slice(), dtype, *nullability)?;
+        DType::Utf8(_) => {
+            let varbin_array = pack_varbin(chunks.as_slice(), validity, dtype)?;
             Ok(Canonical::VarBin(varbin_array))
         }
-        DType::Binary(nullability) => {
-            let varbin_array = pack_varbin(chunks.as_slice(), dtype, *nullability)?;
+        DType::Binary(_) => {
+            let varbin_array = pack_varbin(chunks.as_slice(), validity, dtype)?;
             Ok(Canonical::VarBin(varbin_array))
         }
         DType::Null => {
@@ -131,15 +140,12 @@ pub(crate) fn try_canonicalize_chunks(
 /// been checked to have the same DType already.
 fn swizzle_struct_chunks(
     chunks: &[Array],
+    validity: Validity,
     struct_dtype: &StructDType,
 ) -> VortexResult<StructArray> {
     let chunks: Vec<StructArray> = chunks.iter().map(StructArray::try_from).try_collect()?;
 
     let len = chunks.iter().map(|chunk| chunk.len()).sum();
-    let validity = chunks
-        .iter()
-        .map(|chunk| chunk.logical_validity())
-        .collect::<Validity>();
 
     let mut field_arrays = Vec::new();
 
@@ -163,16 +169,15 @@ fn swizzle_struct_chunks(
 ///
 /// It is expected this function is only called from [try_canonicalize_chunks], and thus all chunks have
 /// been checked to have the same DType already.
-fn pack_bools(chunks: &[Array], nullability: Nullability) -> VortexResult<BoolArray> {
+fn pack_bools(chunks: &[Array], validity: Validity) -> VortexResult<BoolArray> {
     let len = chunks.iter().map(|chunk| chunk.len()).sum();
-    let validity = validity_from_chunks(chunks, nullability);
-    let mut bools = Vec::with_capacity(len);
+    let mut buffer = BooleanBufferBuilder::new(len);
     for chunk in chunks {
         let chunk = chunk.clone().into_bool()?;
-        bools.extend(chunk.boolean_buffer().iter());
+        buffer.append_buffer(&chunk.boolean_buffer());
     }
 
-    BoolArray::try_new(BooleanBuffer::from(bools), validity)
+    BoolArray::try_new(buffer.finish(), validity)
 }
 
 /// Builds a new [PrimitiveArray] by repacking the values from the chunks into a single
@@ -183,10 +188,9 @@ fn pack_bools(chunks: &[Array], nullability: Nullability) -> VortexResult<BoolAr
 fn pack_primitives(
     chunks: &[Array],
     ptype: PType,
-    nullability: Nullability,
+    validity: Validity,
 ) -> VortexResult<PrimitiveArray> {
     let len: usize = chunks.iter().map(|chunk| chunk.len()).sum();
-    let validity = validity_from_chunks(chunks, nullability);
     let mut buffer = MutableBuffer::with_capacity(len * ptype.byte_width());
     for chunk in chunks {
         let chunk = chunk.clone().into_primitive()?;
@@ -205,33 +209,34 @@ fn pack_primitives(
 ///
 /// It is expected this function is only called from [try_canonicalize_chunks], and thus all chunks have
 /// been checked to have the same DType already.
-fn pack_varbin(
-    chunks: &[Array],
-    dtype: &DType,
-    _nullability: Nullability,
-) -> VortexResult<VarBinArray> {
-    let len = chunks.iter().map(|chunk| chunk.len()).sum();
-    let mut builder = VarBinBuilder::<i32>::with_capacity(len);
+fn pack_varbin(chunks: &[Array], validity: Validity, dtype: &DType) -> VortexResult<VarBinArray> {
+    let len: usize = chunks.iter().map(|c| c.len()).sum();
+    let mut offsets = Vec::with_capacity(len + 1);
+    offsets.push(0);
+    let mut buffer = Vec::new();
 
     for chunk in chunks {
         let chunk = chunk.clone().into_varbin()?;
-        chunk.with_iterator(|iter| {
-            for datum in iter {
-                builder.push(datum);
-            }
-        })?;
+        let offsets_arr = try_cast(
+            chunk.offsets().into_primitive()?.array(),
+            &DType::Primitive(PType::I32, NonNullable),
+        )?
+        .into_primitive()?;
+        let offset_adjustment = *offsets.last().expect("offsets has at least one element");
+        offsets.extend(
+            offsets_arr
+                .maybe_null_slice::<i32>()
+                .iter()
+                .skip(1)
+                .map(|off| *off + offset_adjustment),
+        );
+        buffer.extend_from_slice(chunk.bytes().into_primitive()?.buffer());
     }
 
-    Ok(builder.finish(dtype.clone()))
-}
-
-fn validity_from_chunks(chunks: &[Array], nullability: Nullability) -> Validity {
-    if nullability == Nullability::NonNullable {
-        Validity::NonNullable
-    } else {
-        chunks
-            .iter()
-            .map(|chunk| chunk.with_dyn(|a| a.logical_validity()))
-            .collect()
-    }
+    VarBinArray::try_new(
+        PrimitiveArray::from(offsets).into_array(),
+        PrimitiveArray::from(buffer).into_array(),
+        dtype.clone(),
+        validity,
+    )
 }