Skip to content

Commit

Permalink
Faster canonicalization (#663)
Browse files Browse the repository at this point in the history
  • Loading branch information
robert3005 authored Aug 21, 2024
1 parent 8b9654a commit 9d652f4
Showing 1 changed file with 54 additions and 49 deletions.
103 changes: 54 additions & 49 deletions vortex-array/src/array/chunked/canonical.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use arrow_buffer::{BooleanBuffer, Buffer, MutableBuffer};
use arrow_buffer::{BooleanBufferBuilder, Buffer, MutableBuffer};
use itertools::Itertools;
use vortex_dtype::{DType, Nullability, PType, StructDType};
use vortex_dtype::Nullability::NonNullable;
use vortex_dtype::{DType, PType, StructDType};
use vortex_error::{vortex_bail, vortex_err, ErrString, VortexResult};

use crate::accessor::ArrayAccessor;
use crate::array::chunked::ChunkedArray;
use crate::array::extension::ExtensionArray;
use crate::array::null::NullArray;
use crate::array::primitive::PrimitiveArray;
use crate::array::struct_::StructArray;
use crate::array::varbin::builder::VarBinBuilder;
use crate::array::varbin::VarBinArray;
use crate::array::BoolArray;
use crate::compute::unary::try_cast;
use crate::validity::Validity;
use crate::variants::StructArrayTrait;
use crate::{
Expand All @@ -20,12 +20,21 @@ use crate::{

impl IntoCanonical for ChunkedArray {
    /// Canonicalize by flattening all chunks into one contiguous array.
    ///
    /// The validity is computed once up front — from the array's logical
    /// validity when the dtype is nullable, otherwise `NonNullable` — so the
    /// per-dtype packing helpers don't have to recompute it per chunk.
    fn into_canonical(self) -> VortexResult<Canonical> {
        try_canonicalize_chunks(
            self.chunks().collect(),
            if self.dtype().is_nullable() {
                self.logical_validity().into_validity()
            } else {
                Validity::NonNullable
            },
            self.dtype(),
        )
    }
}

pub(crate) fn try_canonicalize_chunks(
chunks: Vec<Array>,
validity: Validity,
dtype: &DType,
) -> VortexResult<Canonical> {
if chunks.is_empty() {
Expand All @@ -44,7 +53,7 @@ pub(crate) fn try_canonicalize_chunks(
// Structs can have their internal field pointers swizzled to push the chunking down
// one level internally without copying or decompressing any data.
DType::Struct(struct_dtype, _) => {
let struct_array = swizzle_struct_chunks(chunks.as_slice(), struct_dtype)?;
let struct_array = swizzle_struct_chunks(chunks.as_slice(), validity, struct_dtype)?;
Ok(Canonical::Struct(struct_array))
}

Expand Down Expand Up @@ -100,20 +109,20 @@ pub(crate) fn try_canonicalize_chunks(
todo!()
}

DType::Bool(nullability) => {
let bool_array = pack_bools(chunks.as_slice(), *nullability)?;
DType::Bool(_) => {
let bool_array = pack_bools(chunks.as_slice(), validity)?;
Ok(Canonical::Bool(bool_array))
}
DType::Primitive(ptype, nullability) => {
let prim_array = pack_primitives(chunks.as_slice(), *ptype, *nullability)?;
DType::Primitive(ptype, _) => {
let prim_array = pack_primitives(chunks.as_slice(), *ptype, validity)?;
Ok(Canonical::Primitive(prim_array))
}
DType::Utf8(nullability) => {
let varbin_array = pack_varbin(chunks.as_slice(), dtype, *nullability)?;
DType::Utf8(_) => {
let varbin_array = pack_varbin(chunks.as_slice(), validity, dtype)?;
Ok(Canonical::VarBin(varbin_array))
}
DType::Binary(nullability) => {
let varbin_array = pack_varbin(chunks.as_slice(), dtype, *nullability)?;
DType::Binary(_) => {
let varbin_array = pack_varbin(chunks.as_slice(), validity, dtype)?;
Ok(Canonical::VarBin(varbin_array))
}
DType::Null => {
Expand All @@ -131,15 +140,12 @@ pub(crate) fn try_canonicalize_chunks(
/// been checked to have the same DType already.
fn swizzle_struct_chunks(
chunks: &[Array],
validity: Validity,
struct_dtype: &StructDType,
) -> VortexResult<StructArray> {
let chunks: Vec<StructArray> = chunks.iter().map(StructArray::try_from).try_collect()?;

let len = chunks.iter().map(|chunk| chunk.len()).sum();
let validity = chunks
.iter()
.map(|chunk| chunk.logical_validity())
.collect::<Validity>();

let mut field_arrays = Vec::new();

Expand All @@ -163,16 +169,15 @@ fn swizzle_struct_chunks(
///
/// It is expected this function is only called from [try_canonicalize_chunks], and thus all chunks have
/// been checked to have the same DType already.
fn pack_bools(chunks: &[Array], nullability: Nullability) -> VortexResult<BoolArray> {
fn pack_bools(chunks: &[Array], validity: Validity) -> VortexResult<BoolArray> {
let len = chunks.iter().map(|chunk| chunk.len()).sum();
let validity = validity_from_chunks(chunks, nullability);
let mut bools = Vec::with_capacity(len);
let mut buffer = BooleanBufferBuilder::new(len);
for chunk in chunks {
let chunk = chunk.clone().into_bool()?;
bools.extend(chunk.boolean_buffer().iter());
buffer.append_buffer(&chunk.boolean_buffer());
}

BoolArray::try_new(BooleanBuffer::from(bools), validity)
BoolArray::try_new(buffer.finish(), validity)
}

/// Builds a new [PrimitiveArray] by repacking the values from the chunks into a single
Expand All @@ -183,10 +188,9 @@ fn pack_bools(chunks: &[Array], nullability: Nullability) -> VortexResult<BoolAr
fn pack_primitives(
chunks: &[Array],
ptype: PType,
nullability: Nullability,
validity: Validity,
) -> VortexResult<PrimitiveArray> {
let len: usize = chunks.iter().map(|chunk| chunk.len()).sum();
let validity = validity_from_chunks(chunks, nullability);
let mut buffer = MutableBuffer::with_capacity(len * ptype.byte_width());
for chunk in chunks {
let chunk = chunk.clone().into_primitive()?;
Expand All @@ -205,33 +209,34 @@ fn pack_primitives(
///
/// It is expected this function is only called from [try_canonicalize_chunks], and thus all chunks have
/// been checked to have the same DType already.
fn pack_varbin(
chunks: &[Array],
dtype: &DType,
_nullability: Nullability,
) -> VortexResult<VarBinArray> {
let len = chunks.iter().map(|chunk| chunk.len()).sum();
let mut builder = VarBinBuilder::<i32>::with_capacity(len);
fn pack_varbin(chunks: &[Array], validity: Validity, dtype: &DType) -> VortexResult<VarBinArray> {
let len: usize = chunks.iter().map(|c| c.len()).sum();
let mut offsets = Vec::with_capacity(len + 1);
offsets.push(0);
let mut buffer = Vec::new();

for chunk in chunks {
let chunk = chunk.clone().into_varbin()?;
chunk.with_iterator(|iter| {
for datum in iter {
builder.push(datum);
}
})?;
let offsets_arr = try_cast(
chunk.offsets().into_primitive()?.array(),
&DType::Primitive(PType::I32, NonNullable),
)?
.into_primitive()?;
let offset_adjustment = *offsets.last().expect("offsets has at least one element");
offsets.extend(
offsets_arr
.maybe_null_slice::<i32>()
.iter()
.skip(1)
.map(|off| *off + offset_adjustment),
);
buffer.extend_from_slice(chunk.bytes().into_primitive()?.buffer());
}

Ok(builder.finish(dtype.clone()))
}

fn validity_from_chunks(chunks: &[Array], nullability: Nullability) -> Validity {
if nullability == Nullability::NonNullable {
Validity::NonNullable
} else {
chunks
.iter()
.map(|chunk| chunk.with_dyn(|a| a.logical_validity()))
.collect()
}
VarBinArray::try_new(
PrimitiveArray::from(offsets).into_array(),
PrimitiveArray::from(buffer).into_array(),
dtype.clone(),
validity,
)
}

0 comments on commit 9d652f4

Please sign in to comment.