Skip to content

Commit

Permalink
Fix pack_varbin (#674)
Browse files Browse the repository at this point in the history
Sliced VarBin arrays don't rewrite offsets and bytes so we need to
perform the adjustment when concatenating them
  • Loading branch information
robert3005 authored Aug 21, 2024
1 parent 502eea0 commit 8679d85
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 39 deletions.
84 changes: 48 additions & 36 deletions vortex-array/src/array/chunked/canonical.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use arrow_buffer::{BooleanBufferBuilder, Buffer, MutableBuffer};
use itertools::Itertools;
use vortex_dtype::Nullability::NonNullable;
use vortex_dtype::{DType, PType, StructDType};
use vortex_dtype::{DType, Nullability, PType, StructDType};
use vortex_error::{vortex_bail, vortex_err, ErrString, VortexResult};

use crate::array::chunked::ChunkedArray;
Expand All @@ -11,7 +10,8 @@ use crate::array::primitive::PrimitiveArray;
use crate::array::struct_::StructArray;
use crate::array::varbin::VarBinArray;
use crate::array::BoolArray;
use crate::compute::unary::try_cast;
use crate::compute::slice;
use crate::compute::unary::{scalar_at_unchecked, try_cast};
use crate::validity::Validity;
use crate::variants::StructArrayTrait;
use crate::{
Expand Down Expand Up @@ -210,70 +210,82 @@ fn pack_varbin(chunks: &[Array], validity: Validity, dtype: &DType) -> VortexRes
let len: usize = chunks.iter().map(|c| c.len()).sum();
let mut offsets = Vec::with_capacity(len + 1);
offsets.push(0);
let mut buffer = Vec::new();
let mut data_bytes = Vec::new();

for chunk in chunks {
let chunk = chunk.clone().into_varbin()?;
let offsets_arr = try_cast(
chunk.offsets().into_primitive()?.array(),
&DType::Primitive(PType::I32, NonNullable),
&DType::Primitive(PType::I32, Nullability::NonNullable),
)?
.into_primitive()?;
let offset_adjustment = *offsets.last().expect("offsets has at least one element");

let first_offset_value: usize =
usize::try_from(&scalar_at_unchecked(offsets_arr.array(), 0))?;
let last_offset_value: usize = usize::try_from(&scalar_at_unchecked(
offsets_arr.array(),
offsets_arr.len() - 1,
))?;
let primitive_bytes =
slice(&chunk.bytes(), first_offset_value, last_offset_value)?.into_primitive()?;
data_bytes.extend_from_slice(primitive_bytes.buffer());

let adjustment_from_previous = *offsets.last().expect("offsets has at least one element");
offsets.extend(
offsets_arr
.maybe_null_slice::<i32>()
.iter()
.skip(1)
.map(|off| *off + offset_adjustment),
.map(|off| off + adjustment_from_previous - first_offset_value as i32),
);
buffer.extend_from_slice(chunk.bytes().into_primitive()?.buffer());
}

VarBinArray::try_new(
PrimitiveArray::from(offsets).into_array(),
PrimitiveArray::from(buffer).into_array(),
PrimitiveArray::from(data_bytes).into_array(),
dtype.clone(),
validity,
)
}

#[cfg(test)]
mod tests {
use rstest::{fixture, rstest};
use vortex_dtype::Nullability;
use vortex_dtype::{DType, Nullability};

use super::*;

#[fixture]
fn binary_array() -> Array {
let values = PrimitiveArray::from(
"hello worldhello world this is a long string"
.as_bytes()
.to_vec(),
);
let offsets = PrimitiveArray::from(vec![0, 11, 44]);
use crate::accessor::ArrayAccessor;
use crate::array::builder::VarBinBuilder;
use crate::array::chunked::canonical::pack_varbin;
use crate::array::VarBinArray;
use crate::compute::slice;
use crate::validity::Validity;

VarBinArray::try_new(
offsets.into_array(),
values.into_array(),
DType::Utf8(Nullability::NonNullable),
Validity::NonNullable,
)
.unwrap()
.into_array()
fn varbin_array() -> VarBinArray {
let mut builder = VarBinBuilder::<i32>::with_capacity(4);
builder.push_value("foo");
builder.push_value("bar");
builder.push_value("baz");
builder.push_value("quak");
builder.finish(DType::Utf8(Nullability::NonNullable))
}

#[rstest]
fn test_pack_varbin(binary_array: Array) {
let arrays = vec![binary_array.clone(), binary_array.clone()];
let packed_array = pack_varbin(
&arrays,
#[test]
pub fn pack_sliced_varbin() {
let array1 = slice(varbin_array().array(), 1, 3).unwrap();
let array2 = slice(varbin_array().array(), 2, 4).unwrap();
let packed = pack_varbin(
&[array1, array2],
Validity::NonNullable,
&DType::Utf8(Nullability::NonNullable),
)
.unwrap();

assert_eq!(packed_array.len(), binary_array.len() * arrays.len());
assert_eq!(packed.len(), 4);
let values = packed
.with_iterator(|iter| {
iter.flatten()
.map(|v| unsafe { String::from_utf8_unchecked(v.to_vec()) })
.collect::<Vec<_>>()
})
.unwrap();
assert_eq!(values, &["bar", "baz", "baz", "quak"]);
}
}
7 changes: 4 additions & 3 deletions vortex-array/src/array/varbin/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ impl<O: NativePType> VarBinBuilder<O> {
}

#[inline]
pub fn push_value(&mut self, value: &[u8]) {
pub fn push_value(&mut self, value: impl AsRef<[u8]>) {
let slice = value.as_ref();
self.offsets
.push(O::from(self.data.len() + value.len()).unwrap());
self.data.extend_from_slice(value);
.push(O::from(self.data.len() + slice.len()).unwrap());
self.data.extend_from_slice(slice);
self.validity.append_non_null();
}

Expand Down

0 comments on commit 8679d85

Please sign in to comment.