Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix pack_varbin #674

Merged
merged 3 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 48 additions & 36 deletions vortex-array/src/array/chunked/canonical.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use arrow_buffer::{BooleanBufferBuilder, Buffer, MutableBuffer};
use itertools::Itertools;
use vortex_dtype::Nullability::NonNullable;
use vortex_dtype::{DType, PType, StructDType};
use vortex_dtype::{DType, Nullability, PType, StructDType};
use vortex_error::{vortex_bail, vortex_err, ErrString, VortexResult};

use crate::array::chunked::ChunkedArray;
Expand All @@ -11,7 +10,8 @@ use crate::array::primitive::PrimitiveArray;
use crate::array::struct_::StructArray;
use crate::array::varbin::VarBinArray;
use crate::array::BoolArray;
use crate::compute::unary::try_cast;
use crate::compute::slice;
use crate::compute::unary::{scalar_at_unchecked, try_cast};
use crate::validity::Validity;
use crate::variants::StructArrayTrait;
use crate::{
Expand Down Expand Up @@ -210,70 +210,82 @@ fn pack_varbin(chunks: &[Array], validity: Validity, dtype: &DType) -> VortexRes
let len: usize = chunks.iter().map(|c| c.len()).sum();
let mut offsets = Vec::with_capacity(len + 1);
offsets.push(0);
let mut buffer = Vec::new();
let mut data_bytes = Vec::new();

for chunk in chunks {
let chunk = chunk.clone().into_varbin()?;
let offsets_arr = try_cast(
chunk.offsets().into_primitive()?.array(),
&DType::Primitive(PType::I32, NonNullable),
&DType::Primitive(PType::I32, Nullability::NonNullable),
)?
.into_primitive()?;
let offset_adjustment = *offsets.last().expect("offsets has at least one element");

let first_offset_value: usize =
usize::try_from(&scalar_at_unchecked(offsets_arr.array(), 0))?;
let last_offset_value: usize = usize::try_from(&scalar_at_unchecked(
offsets_arr.array(),
offsets_arr.len() - 1,
))?;
let primitive_bytes =
slice(&chunk.bytes(), first_offset_value, last_offset_value)?.into_primitive()?;
data_bytes.extend_from_slice(primitive_bytes.buffer());

let adjustment_from_previous = *offsets.last().expect("offsets has at least one element");
offsets.extend(
offsets_arr
.maybe_null_slice::<i32>()
.iter()
.skip(1)
.map(|off| *off + offset_adjustment),
.map(|off| off + adjustment_from_previous - first_offset_value as i32),
);
buffer.extend_from_slice(chunk.bytes().into_primitive()?.buffer());
}

VarBinArray::try_new(
PrimitiveArray::from(offsets).into_array(),
PrimitiveArray::from(buffer).into_array(),
PrimitiveArray::from(data_bytes).into_array(),
dtype.clone(),
validity,
)
}

#[cfg(test)]
mod tests {
use rstest::{fixture, rstest};
use vortex_dtype::Nullability;
use vortex_dtype::{DType, Nullability};

use super::*;

#[fixture]
fn binary_array() -> Array {
let values = PrimitiveArray::from(
"hello worldhello world this is a long string"
.as_bytes()
.to_vec(),
);
let offsets = PrimitiveArray::from(vec![0, 11, 44]);
use crate::accessor::ArrayAccessor;
use crate::array::builder::VarBinBuilder;
use crate::array::chunked::canonical::pack_varbin;
use crate::array::VarBinArray;
use crate::compute::slice;
use crate::validity::Validity;

VarBinArray::try_new(
offsets.into_array(),
values.into_array(),
DType::Utf8(Nullability::NonNullable),
Validity::NonNullable,
)
.unwrap()
.into_array()
fn varbin_array() -> VarBinArray {
let mut builder = VarBinBuilder::<i32>::with_capacity(4);
builder.push_value("foo");
builder.push_value("bar");
builder.push_value("baz");
builder.push_value("quak");
builder.finish(DType::Utf8(Nullability::NonNullable))
}

#[rstest]
fn test_pack_varbin(binary_array: Array) {
let arrays = vec![binary_array.clone(), binary_array.clone()];
let packed_array = pack_varbin(
&arrays,
#[test]
pub fn pack_sliced_varbin() {
let array1 = slice(varbin_array().array(), 1, 3).unwrap();
let array2 = slice(varbin_array().array(), 2, 4).unwrap();
let packed = pack_varbin(
&[array1, array2],
Validity::NonNullable,
&DType::Utf8(Nullability::NonNullable),
)
.unwrap();

assert_eq!(packed_array.len(), binary_array.len() * arrays.len());
assert_eq!(packed.len(), 4);
let values = packed
.with_iterator(|iter| {
iter.flatten()
.map(|v| unsafe { String::from_utf8_unchecked(v.to_vec()) })
.collect::<Vec<_>>()
})
.unwrap();
assert_eq!(values, &["bar", "baz", "baz", "quak"]);
}
}
7 changes: 4 additions & 3 deletions vortex-array/src/array/varbin/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ impl<O: NativePType> VarBinBuilder<O> {
}

#[inline]
pub fn push_value(&mut self, value: &[u8]) {
pub fn push_value(&mut self, value: impl AsRef<[u8]>) {
let slice = value.as_ref();
self.offsets
.push(O::from(self.data.len() + value.len()).unwrap());
self.data.extend_from_slice(value);
.push(O::from(self.data.len() + slice.len()).unwrap());
self.data.extend_from_slice(slice);
self.validity.append_non_null();
}

Expand Down
Loading