Skip to content

Commit

Permalink
Implement take for StreamArrayReader (#266)
Browse files Browse the repository at this point in the history
  • Loading branch information
jdcasale authored Apr 29, 2024
1 parent 09b70f4 commit d641f99
Show file tree
Hide file tree
Showing 10 changed files with 535 additions and 31 deletions.
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ arrow-array = "51.0.0"
arrow-buffer = "51.0.0"
arrow-csv = "51.0.0"
arrow-data = "51.0.0"
arrow-ipc = "51.0.0"
arrow-schema = "51.0.0"
arrow-select = "51.0.0"
bindgen = "0.69.4"
Expand Down
2 changes: 1 addition & 1 deletion bench-vortex/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub const BATCH_SIZE: usize = 65_536;
pub fn open_vortex(path: &Path) -> VortexResult<OwnedArray> {
let mut file = File::open(path)?;

let mut reader = StreamReader::try_new(&mut file).unwrap();
let mut reader = StreamReader::try_new(&mut file)?;
let mut reader = reader.next()?.unwrap();
let dtype = reader.dtype().clone();
let mut chunks = vec![];
Expand Down
14 changes: 14 additions & 0 deletions vortex-array/src/array/primitive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,24 @@ impl PrimitiveArray<'_> {
}

/// Returns the array's backing buffer as an arrow `ScalarBuffer<T>`.
///
/// Panics if `T`'s ptype does not match this array's ptype; callers must
/// request the same native type the array was built with.
pub fn scalar_buffer<T: NativePType>(&self) -> ScalarBuffer<T> {
assert_eq!(
    T::PTYPE,
    self.ptype(),
    "Attempted to get scalar buffer of type {} from array of type {}",
    T::PTYPE,
    self.ptype(),
);
// Buffers are cheaply cloneable; convert the clone into the arrow buffer type.
let buffer = self.buffer().clone();
ScalarBuffer::new(buffer.into(), 0, self.len())
}

/// Views the array's backing buffer as a typed slice `&[T]` without copying.
///
/// Panics if `T`'s ptype does not match this array's ptype.
pub fn typed_data<T: NativePType>(&self) -> &[T] {
assert_eq!(
    T::PTYPE,
    self.ptype(),
    "Attempted to get typed_data of type {} from array of type {}",
    T::PTYPE,
    self.ptype(),
);
// Reinterpret the raw bytes as elements of T; the assert above guarantees
// the element type matches.
let raw = self.buffer();
raw.typed_data::<T>()
}

Expand Down
44 changes: 40 additions & 4 deletions vortex-fastlanes/src/bitpacking/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,8 @@ impl EncodingCompression for BitPackedEncoding {
return Ok(array.to_static());
}

let packed = bitpack(&parray, bit_width)?;

let validity = ctx.compress_validity(parray.validity())?;

let packed = bitpack(&parray, bit_width)?;
let patches = if num_exceptions > 0 {
Some(ctx.auxiliary("patches").compress(
&bitpack_patches(&parray, bit_width, num_exceptions),
Expand All @@ -97,7 +95,42 @@ impl EncodingCompression for BitPackedEncoding {
}
}

fn bitpack(parray: &PrimitiveArray, bit_width: usize) -> VortexResult<OwnedArray> {
/// Bitpacks a primitive array down to `bit_width` bits per element, storing
/// any values wider than `bit_width` as patches.
///
/// # Errors
/// Fails if `bit_width` is not strictly narrower than the element type's
/// native bit width, or if bit-width frequency statistics cannot be computed.
pub(crate) fn bitpack_encode(
    array: PrimitiveArray<'_>,
    bit_width: usize,
) -> VortexResult<BitPackedArray> {
    // Validate before computing statistics: the width check is cheap, while
    // computing BitWidthFreq scans the data. Bailing first avoids wasted work
    // and guarantees the caller sees the width error rather than a stats error.
    if bit_width >= array.ptype().bit_width() {
        // Nothing we can do -- packing at (or above) the native width saves nothing.
        vortex_bail!(
            "Cannot pack -- specified bit width is greater than or equal to the type's bit width"
        )
    }

    let bit_width_freq = array
        .statistics()
        .compute_as::<ListScalarVec<usize>>(Stat::BitWidthFreq)
        .ok_or_else(|| vortex_err!("Could not compute bit width frequencies"))?
        .0;
    let num_exceptions = count_exceptions(bit_width, &bit_width_freq);

    let packed = bitpack(&array, bit_width)?;
    // Values that need more than `bit_width` bits are stored separately as patches.
    let patches =
        (num_exceptions > 0).then(|| bitpack_patches(&array, bit_width, num_exceptions));

    BitPackedArray::try_new(
        packed,
        array.validity(),
        patches,
        bit_width,
        array.dtype().clone(),
        array.len(),
    )
}

pub(crate) fn bitpack(parray: &PrimitiveArray, bit_width: usize) -> VortexResult<OwnedArray> {
// We know the min is > 0, so it's safe to re-interpret signed integers as unsigned.
// TODO(ngates): we should implement this using a vortex cast to centralize this hack.
let bytes = match_integers_by_width!(parray.ptype(), |$P| {
Expand Down Expand Up @@ -329,6 +362,9 @@ fn bytes_per_exception(ptype: PType) -> usize {
}

/// Counts values that need more than `bit_width` bits, using the array's
/// bit-width frequency histogram (`bit_width_freq[w]` = number of values of
/// width `w`).
fn count_exceptions(bit_width: usize, bit_width_freq: &[usize]) -> usize {
    // If the histogram doesn't extend past `bit_width`, nothing is wider than
    // it, so there are no exceptions. `get` returns None for an out-of-range
    // start, mirroring the original explicit length guard.
    bit_width_freq
        .get(bit_width + 1..)
        .map_or(0, |wider| wider.iter().sum())
}

Expand Down
37 changes: 36 additions & 1 deletion vortex-fastlanes/src/bitpacking/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use ::serde::{Deserialize, Serialize};
pub use compress::*;
use vortex::array::primitive::PrimitiveArray;
use vortex::stats::ArrayStatisticsCompute;
use vortex::validity::{ArrayValidity, LogicalValidity, Validity, ValidityMetadata};
use vortex::visitor::{AcceptArrayVisitor, ArrayVisitor};
Expand Down Expand Up @@ -121,6 +122,14 @@ impl BitPackedArray<'_> {
&Validity::DTYPE,
))
}

/// Bitpacks `array` down to `bit_width` bits per element.
///
/// # Errors
/// Fails if `array` is not a primitive array, or if `bit_width` is not
/// strictly narrower than the element type's native bit width.
pub fn encode(array: Array<'_>, bit_width: usize) -> VortexResult<BitPackedArray> {
    if let Ok(parray) = PrimitiveArray::try_from(array) {
        // Return the result directly instead of the redundant `Ok(expr?)`
        // re-wrap (clippy::needless_question_mark).
        bitpack_encode(parray, bit_width)
    } else {
        vortex_bail!("Bitpacking can only encode primitive arrays");
    }
}
}

impl ArrayFlatten for BitPackedArray<'_> {
Expand Down Expand Up @@ -194,8 +203,9 @@ mod test {
use vortex::compute::scalar_at::scalar_at;
use vortex::compute::slice::slice;
use vortex::encoding::EncodingRef;
use vortex::IntoArray;

use crate::BitPackedEncoding;
use crate::{BitPackedArray, BitPackedEncoding};

#[test]
fn slice_within_block() {
Expand Down Expand Up @@ -248,4 +258,29 @@ mod test {
((9215 % 63) as u8).into()
);
}

#[test]
fn test_encode() {
let values = vec![Some(1), None, Some(1), None, Some(1), None, Some(u64::MAX)];
let uncompressed = PrimitiveArray::from_nullable_vec(values);
let packed = BitPackedArray::encode(uncompressed.into_array(), 1).unwrap();
let expected = &[1, 0, 1, 0, 1, 0, u64::MAX];
let results = packed
.into_array()
.flatten_primitive()
.unwrap()
.typed_data::<u64>()
.to_vec();
assert_eq!(results, expected);
}

#[test]
fn test_encode_too_wide() {
    let uncompressed =
        PrimitiveArray::from_nullable_vec(vec![Some(1u8), None, Some(1), None, Some(1), None]);
    // Packing a u8 into 8 bits saves nothing and must be rejected...
    let _packed = BitPackedArray::encode(uncompressed.clone().into_array(), 8)
        .expect_err("Cannot pack value into the same width");
    // ...and a target wider than the native width is equally invalid.
    let _packed = BitPackedArray::encode(uncompressed.into_array(), 9)
        .expect_err("Cannot pack value into larger width");
}
}
11 changes: 11 additions & 0 deletions vortex-ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,21 @@ walkdir = { workspace = true }
criterion = { workspace = true }
rand = { workspace = true }
simplelog = { workspace = true }
vortex-fastlanes = { path = "../vortex-fastlanes" }
vortex-alp = { path = "../vortex-alp" }
arrow = { workspace = true }
arrow-array = { workspace = true }
arrow-ipc = { workspace = true, features = ["lz4"] }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }

[lints]
workspace = true

[[bench]]
name = "ipc_take"
harness = false

[[bench]]
name = "ipc_array_reader_take"
harness = false
51 changes: 51 additions & 0 deletions vortex-ipc/benches/ipc_array_reader_take.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use std::io::Cursor;

use criterion::{black_box, criterion_group, criterion_main, Criterion};
use itertools::Itertools;
use vortex::array::primitive::PrimitiveArray;
use vortex::{IntoArray, SerdeContext};
use vortex_ipc::iter::FallibleLendingIterator;
use vortex_ipc::reader::StreamReader;
use vortex_ipc::writer::StreamWriter;
use vortex_schema::{DType, Nullability, Signedness};

// 100 record batches, 100k rows each
// take from the first 20 batches and last batch
// compare with arrow
// 100 record batches of 100k rows each: take one row from each of the first
// 20 batches plus one near the end, and measure StreamArrayReader::take over
// an in-memory IPC stream.
fn ipc_array_reader_take(c: &mut Criterion) {
    let row_indices = (0..20)
        .map(|batch| batch * 100_000 + 1)
        .chain([98 * 100_000 + 1])
        .collect_vec();
    let mut group = c.benchmark_group("ipc_array_reader_take");

    group.bench_function("vortex", |b| {
        // Serialize the whole stream once, up front, so only reading is timed.
        let mut buffer = vec![];
        {
            let mut cursor = Cursor::new(&mut buffer);
            let mut writer = StreamWriter::try_new(&mut cursor, SerdeContext::default()).unwrap();
            writer
                .write_schema(&DType::Int(
                    32.into(),
                    Signedness::Signed,
                    Nullability::Nullable,
                ))
                .unwrap();
            for batch in 0..100i32 {
                let data = PrimitiveArray::from(vec![batch; 100_000]).into_array();
                writer.write_batch(&data).unwrap();
            }
        }
        let indices = row_indices.clone().into_array();

        b.iter(|| {
            let mut cursor = Cursor::new(&buffer);
            let mut reader = StreamReader::try_new(&mut cursor).unwrap();
            let array_reader = reader.next().unwrap().unwrap();
            black_box(array_reader.take(&indices))
        });
    });
}

// Criterion supplies the benchmark entry point (Cargo.toml sets harness = false).
criterion_group!(benches, ipc_array_reader_take);
criterion_main!(benches);
65 changes: 51 additions & 14 deletions vortex-ipc/benches/ipc_take.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,67 @@
use std::io::Cursor;
use std::sync::Arc;

use arrow::ipc::reader::StreamReader as ArrowStreamReader;
use arrow_array::{Array, Int32Array, RecordBatch};
use arrow_ipc::writer::{IpcWriteOptions, StreamWriter as ArrowStreamWriter};
use arrow_ipc::{CompressionType, MetadataVersion};
use arrow_schema::{DataType, Field, Schema};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use itertools::Itertools;
use vortex::array::primitive::PrimitiveArray;
use vortex::compress::CompressCtx;
use vortex::compute::take::take;
use vortex::{IntoArray, SerdeContext};
use vortex_ipc::iter::FallibleLendingIterator;
use vortex_ipc::reader::StreamReader;
use vortex_ipc::writer::StreamWriter;

fn ipc_take(c: &mut Criterion) {
let indices = PrimitiveArray::from(vec![10, 11, 12, 13, 100_000, 2_999_999]).into_array();
let data = PrimitiveArray::from(vec![5; 3_000_000]).into_array();
//
// c.bench_function("take_data", |b| {
// b.iter(|| black_box(take(&data, &indices).unwrap()));
// });
let mut group = c.benchmark_group("ipc_take");
let indices = Int32Array::from(vec![10, 11, 12, 13, 100_000, 2_999_999]);
group.bench_function("arrow", |b| {
let mut buffer = vec![];
{
let field = Field::new("uid", DataType::Int32, true);
let schema = Schema::new(vec![field]);
let options = IpcWriteOptions::try_new(32, false, MetadataVersion::V5)
.unwrap()
.try_with_compression(Some(CompressionType::LZ4_FRAME))
.unwrap();
let mut writer =
ArrowStreamWriter::try_new_with_options(&mut buffer, &schema, options).unwrap();
let array = Int32Array::from((0i32..3_000_000).rev().collect_vec());

// Try running take over an ArrayView.
let mut buffer = vec![];
{
let mut cursor = Cursor::new(&mut buffer);
let mut writer = StreamWriter::try_new(&mut cursor, SerdeContext::default()).unwrap();
writer.write_array(&data).unwrap();
}
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
writer.write(&batch).unwrap();
}

c.bench_function("take_view", |b| {
b.iter(|| {
let mut cursor = Cursor::new(&buffer);
let mut reader = ArrowStreamReader::try_new(&mut cursor, None).unwrap();
let batch = reader.next().unwrap().unwrap();
let array_from_batch = batch.column(0);
let array = array_from_batch
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
black_box(arrow_select::take::take(array, &indices, None).unwrap());
});
});

group.bench_function("vortex", |b| {
let indices = PrimitiveArray::from(vec![10, 11, 12, 13, 100_000, 2_999_999]).into_array();
let uncompressed = PrimitiveArray::from((0i32..3_000_000).rev().collect_vec()).into_array();
let ctx = CompressCtx::default();
let compressed = ctx.compress(&uncompressed, None).unwrap();

// Try running take over an ArrayView.
let mut buffer = vec![];
{
let mut cursor = Cursor::new(&mut buffer);
let mut writer = StreamWriter::try_new(&mut cursor, SerdeContext::default()).unwrap();
writer.write_array(&compressed).unwrap();
}
b.iter(|| {
let mut cursor = Cursor::new(&buffer);
let mut reader = StreamReader::try_new(&mut cursor).unwrap();
Expand Down
Loading

0 comments on commit d641f99

Please sign in to comment.