Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement take for StreamArrayReader #266

Merged
merged 5 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ arrow-array = "51.0.0"
arrow-buffer = "51.0.0"
arrow-csv = "51.0.0"
arrow-data = "51.0.0"
arrow-ipc = "51.0.0"
arrow-schema = "51.0.0"
arrow-select = "51.0.0"
bindgen = "0.69.4"
Expand Down
2 changes: 1 addition & 1 deletion bench-vortex/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub const BATCH_SIZE: usize = 65_536;
pub fn open_vortex(path: &Path) -> VortexResult<OwnedArray> {
let mut file = File::open(path)?;

let mut reader = StreamReader::try_new(&mut file).unwrap();
let mut reader = StreamReader::try_new(&mut file)?;
let mut reader = reader.next()?.unwrap();
let dtype = reader.dtype().clone();
let mut chunks = vec![];
Expand Down
14 changes: 14 additions & 0 deletions vortex-array/src/array/primitive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,24 @@ impl PrimitiveArray<'_> {
}

/// Reinterpret this array's underlying buffer as a `ScalarBuffer<T>`.
///
/// # Panics
/// Panics if `T::PTYPE` does not match this array's `ptype()`.
pub fn scalar_buffer<T: NativePType>(&self) -> ScalarBuffer<T> {
    // Guard against reinterpreting the buffer bytes as the wrong primitive type.
    let actual = self.ptype();
    assert_eq!(
        T::PTYPE,
        actual,
        "Attempted to get scalar buffer of type {} from array of type {}",
        T::PTYPE,
        actual,
    );
    ScalarBuffer::new(self.buffer().clone().into(), 0, self.len())
}

/// Borrow this array's underlying buffer as a typed slice `&[T]`.
///
/// # Panics
/// Panics if `T::PTYPE` does not match this array's `ptype()`.
pub fn typed_data<T: NativePType>(&self) -> &[T] {
    // Refuse to hand out a slice of a mismatched element type.
    let actual = self.ptype();
    assert_eq!(
        T::PTYPE,
        actual,
        "Attempted to get typed_data of type {} from array of type {}",
        T::PTYPE,
        actual,
    );
    self.buffer().typed_data::<T>()
}

Expand Down
44 changes: 40 additions & 4 deletions vortex-fastlanes/src/bitpacking/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,8 @@ impl EncodingCompression for BitPackedEncoding {
return Ok(array.to_static());
}

let packed = bitpack(&parray, bit_width)?;

let validity = ctx.compress_validity(parray.validity())?;

let packed = bitpack(&parray, bit_width)?;
let patches = if num_exceptions > 0 {
Some(ctx.auxiliary("patches").compress(
&bitpack_patches(&parray, bit_width, num_exceptions),
Expand All @@ -97,7 +95,42 @@ impl EncodingCompression for BitPackedEncoding {
}
}

fn bitpack(parray: &PrimitiveArray, bit_width: usize) -> VortexResult<OwnedArray> {
/// Bit-pack `array` down to `bit_width` bits per value, recording any values
/// that do not fit as out-of-band patches.
///
/// # Errors
/// Fails if `bit_width` is >= the native bit width of the array's ptype
/// (packing could not save any space), or if the bit-width frequency
/// statistic cannot be computed.
pub(crate) fn bitpack_encode(
    array: PrimitiveArray<'_>,
    bit_width: usize,
) -> VortexResult<BitPackedArray> {
    // Bail out before paying for the bit-width frequency statistics: a pack
    // width >= the native width can never succeed, so the stats are wasted work.
    if bit_width >= array.ptype().bit_width() {
        // Nothing we can do
        vortex_bail!(
            "Cannot pack -- specified bit width is greater than or equal to the type's bit width"
        )
    }

    let bit_width_freq = array
        .statistics()
        .compute_as::<ListScalarVec<usize>>(Stat::BitWidthFreq)
        .ok_or_else(|| vortex_err!("Could not compute bit width frequencies"))?
        .0;
    let num_exceptions = count_exceptions(bit_width, &bit_width_freq);

    let packed = bitpack(&array, bit_width)?;
    // Values needing more than `bit_width` bits are stored separately as patches.
    let patches =
        (num_exceptions > 0).then(|| bitpack_patches(&array, bit_width, num_exceptions));

    BitPackedArray::try_new(
        packed,
        array.validity(),
        patches,
        bit_width,
        array.dtype().clone(),
        array.len(),
    )
}

pub(crate) fn bitpack(parray: &PrimitiveArray, bit_width: usize) -> VortexResult<OwnedArray> {
// We know the min is > 0, so it's safe to re-interpret signed integers as unsigned.
// TODO(ngates): we should implement this using a vortex cast to centralize this hack.
let bytes = match_integers_by_width!(parray.ptype(), |$P| {
Expand Down Expand Up @@ -329,6 +362,9 @@ fn bytes_per_exception(ptype: PType) -> usize {
}

/// Count the values that need more than `bit_width` bits — i.e. the number of
/// patch "exceptions" packing at this width would produce.
///
/// `bit_width_freq[w]` is presumably the number of values whose minimal bit
/// width is `w` (it comes from `Stat::BitWidthFreq`). Returns 0 when the
/// frequency table does not extend past `bit_width`.
fn count_exceptions(bit_width: usize, bit_width_freq: &[usize]) -> usize {
    // `slice::get` with a range replaces the manual length guard: an
    // out-of-bounds start yields None instead of panicking, which maps to
    // zero exceptions.
    bit_width_freq
        .get(bit_width + 1..)
        .map_or(0, |tail| tail.iter().sum())
}

Expand Down
37 changes: 36 additions & 1 deletion vortex-fastlanes/src/bitpacking/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use ::serde::{Deserialize, Serialize};
pub use compress::*;
use vortex::array::primitive::PrimitiveArray;
use vortex::stats::ArrayStatisticsCompute;
use vortex::validity::{ArrayValidity, LogicalValidity, Validity, ValidityMetadata};
use vortex::visitor::{AcceptArrayVisitor, ArrayVisitor};
Expand Down Expand Up @@ -121,6 +122,14 @@ impl BitPackedArray<'_> {
&Validity::DTYPE,
))
}

/// Bit-pack `array` to `bit_width` bits per value.
///
/// # Errors
/// Fails if `array` is not a primitive array, or if packing at `bit_width`
/// is not possible (see `bitpack_encode`).
pub fn encode(array: Array<'_>, bit_width: usize) -> VortexResult<BitPackedArray> {
    // Bit-packing is only defined over primitive arrays.
    if let Ok(parray) = PrimitiveArray::try_from(array) {
        // Propagate the result directly; `Ok(expr?)` was a redundant re-wrap.
        bitpack_encode(parray, bit_width)
    } else {
        vortex_bail!("Bitpacking can only encode primitive arrays");
    }
}
}

impl ArrayFlatten for BitPackedArray<'_> {
Expand Down Expand Up @@ -194,8 +203,9 @@ mod test {
use vortex::compute::scalar_at::scalar_at;
use vortex::compute::slice::slice;
use vortex::encoding::EncodingRef;
use vortex::IntoArray;

use crate::BitPackedEncoding;
use crate::{BitPackedArray, BitPackedEncoding};

#[test]
fn slice_within_block() {
Expand Down Expand Up @@ -248,4 +258,29 @@ mod test {
((9215 % 63) as u8).into()
);
}

#[test]
fn test_encode() {
    // A single u64::MAX cannot fit in 1 bit, forcing a patch exception.
    let uncompressed = PrimitiveArray::from_nullable_vec(vec![
        Some(1),
        None,
        Some(1),
        None,
        Some(1),
        None,
        Some(u64::MAX),
    ]);
    let packed = BitPackedArray::encode(uncompressed.into_array(), 1).unwrap();
    let decoded = packed
        .into_array()
        .flatten_primitive()
        .unwrap()
        .typed_data::<u64>()
        .to_vec();
    // Nulls come back as 0; the oversized value round-trips via patches.
    assert_eq!(decoded, &[1, 0, 1, 0, 1, 0, u64::MAX]);
}

#[test]
fn test_encode_too_wide() {
    let uncompressed =
        PrimitiveArray::from_nullable_vec(vec![Some(1u8), None, Some(1), None, Some(1), None]);
    // Packing u8 values into 8 or more bits cannot save space, so both fail.
    let _ = BitPackedArray::encode(uncompressed.clone().into_array(), 8)
        .expect_err("Cannot pack value into the same width");
    let _ = BitPackedArray::encode(uncompressed.into_array(), 9)
        .expect_err("Cannot pack value into larger width");
}
}
11 changes: 11 additions & 0 deletions vortex-ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,21 @@ walkdir = { workspace = true }
criterion = { workspace = true }
rand = { workspace = true }
simplelog = { workspace = true }
vortex-fastlanes = { path = "../vortex-fastlanes" }
gatesn marked this conversation as resolved.
Show resolved Hide resolved
vortex-alp = { path = "../vortex-alp" }
arrow = { workspace = true }
arrow-array = { workspace = true }
arrow-ipc = { workspace = true, features = ["lz4"] }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }

[lints]
workspace = true

[[bench]]
name = "ipc_take"
harness = false

[[bench]]
name = "ipc_array_reader_take"
harness = false
51 changes: 51 additions & 0 deletions vortex-ipc/benches/ipc_array_reader_take.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use std::io::Cursor;

use criterion::{black_box, criterion_group, criterion_main, Criterion};
use itertools::Itertools;
use vortex::array::primitive::PrimitiveArray;
use vortex::{IntoArray, SerdeContext};
use vortex_ipc::iter::FallibleLendingIterator;
use vortex_ipc::reader::StreamReader;
use vortex_ipc::writer::StreamWriter;
use vortex_schema::{DType, Nullability, Signedness};

// 100 record batches, 100k rows each
// take from the first 20 batches and last batch
// compare with arrow
// Benchmark `take` over a multi-batch IPC stream: write 100 batches of 100k
// rows each, then repeatedly re-open the stream and take a sparse index set.
fn ipc_array_reader_take(c: &mut Criterion) {
    // One index inside each of the first 20 batches, plus one in batch 98.
    let indices = (0..20)
        .map(|i| i * 100_000 + 1)
        .chain([98 * 100_000 + 1])
        .collect_vec();
    let mut group = c.benchmark_group("ipc_array_reader_take");

    group.bench_function("vortex", |b| {
        let mut buffer = vec![];
        {
            // Scope the writer so the mutable borrow of `buffer` ends before
            // the benchmark loop reads it back.
            let mut cursor = Cursor::new(&mut buffer);
            let mut writer = StreamWriter::try_new(&mut cursor, SerdeContext::default()).unwrap();
            writer
                .write_schema(&DType::Int(
                    32.into(),
                    Signedness::Signed,
                    Nullability::Nullable,
                ))
                .unwrap();
            (0..100i32).for_each(|i| {
                let data = PrimitiveArray::from(vec![i; 100_000]).into_array();
                writer.write_batch(&data).unwrap();
            });
        }
        let indices = indices.clone().into_array();

        b.iter(|| {
            // Re-open the stream each iteration so the measurement covers
            // stream setup + take end-to-end.
            let mut cursor = Cursor::new(&buffer);
            let mut reader = StreamReader::try_new(&mut cursor).unwrap();
            let array_reader = reader.next().unwrap().unwrap();
            black_box(array_reader.take(&indices))
        });
    });
}

// Criterion entry point (this bench sets `harness = false` in Cargo.toml).
criterion_group!(benches, ipc_array_reader_take);
criterion_main!(benches);
65 changes: 51 additions & 14 deletions vortex-ipc/benches/ipc_take.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,67 @@
use std::io::Cursor;
use std::sync::Arc;

use arrow::ipc::reader::StreamReader as ArrowStreamReader;
use arrow_array::{Array, Int32Array, RecordBatch};
use arrow_ipc::writer::{IpcWriteOptions, StreamWriter as ArrowStreamWriter};
use arrow_ipc::{CompressionType, MetadataVersion};
use arrow_schema::{DataType, Field, Schema};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use itertools::Itertools;
use vortex::array::primitive::PrimitiveArray;
use vortex::compress::CompressCtx;
use vortex::compute::take::take;
use vortex::{IntoArray, SerdeContext};
use vortex_ipc::iter::FallibleLendingIterator;
use vortex_ipc::reader::StreamReader;
use vortex_ipc::writer::StreamWriter;

fn ipc_take(c: &mut Criterion) {
let indices = PrimitiveArray::from(vec![10, 11, 12, 13, 100_000, 2_999_999]).into_array();
let data = PrimitiveArray::from(vec![5; 3_000_000]).into_array();
//
// c.bench_function("take_data", |b| {
// b.iter(|| black_box(take(&data, &indices).unwrap()));
// });
let mut group = c.benchmark_group("ipc_take");
let indices = Int32Array::from(vec![10, 11, 12, 13, 100_000, 2_999_999]);
group.bench_function("arrow", |b| {
let mut buffer = vec![];
{
let field = Field::new("uid", DataType::Int32, true);
let schema = Schema::new(vec![field]);
let options = IpcWriteOptions::try_new(32, false, MetadataVersion::V5)
.unwrap()
.try_with_compression(Some(CompressionType::LZ4_FRAME))
.unwrap();
let mut writer =
ArrowStreamWriter::try_new_with_options(&mut buffer, &schema, options).unwrap();
let array = Int32Array::from((0i32..3_000_000).rev().collect_vec());

// Try running take over an ArrayView.
let mut buffer = vec![];
{
let mut cursor = Cursor::new(&mut buffer);
let mut writer = StreamWriter::try_new(&mut cursor, SerdeContext::default()).unwrap();
writer.write_array(&data).unwrap();
}
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
writer.write(&batch).unwrap();
}

c.bench_function("take_view", |b| {
b.iter(|| {
let mut cursor = Cursor::new(&buffer);
let mut reader = ArrowStreamReader::try_new(&mut cursor, None).unwrap();
let batch = reader.next().unwrap().unwrap();
let array_from_batch = batch.column(0);
let array = array_from_batch
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
black_box(arrow_select::take::take(array, &indices, None).unwrap());
});
});

group.bench_function("vortex", |b| {
let indices = PrimitiveArray::from(vec![10, 11, 12, 13, 100_000, 2_999_999]).into_array();
let uncompressed = PrimitiveArray::from((0i32..3_000_000).rev().collect_vec()).into_array();
let ctx = CompressCtx::default();
let compressed = ctx.compress(&uncompressed, None).unwrap();

// Try running take over an ArrayView.
let mut buffer = vec![];
{
let mut cursor = Cursor::new(&mut buffer);
let mut writer = StreamWriter::try_new(&mut cursor, SerdeContext::default()).unwrap();
writer.write_array(&compressed).unwrap();
}
b.iter(|| {
let mut cursor = Cursor::new(&buffer);
let mut reader = StreamReader::try_new(&mut cursor).unwrap();
Expand Down
Loading