Chunked Take (#141)
Co-authored-by: Robert Kruszewski <[email protected]>
gatesn and robert3005 authored Mar 26, 2024
1 parent a95cbe9 commit c56faf5
Showing 54 changed files with 724 additions and 316 deletions.
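
The change that gives the commit its name is take over ChunkedArray: flat row indices are resolved to the chunk that contains them and to a chunk-local offset, and the take is then delegated to each chunk (the chunked kernel itself presumably lives in files not rendered below). The following standalone sketch only illustrates that index-splitting idea; the function name and signature are invented for illustration and are not the Vortex implementation.

// Hypothetical helper, for illustration only: map sorted global indices onto
// (chunk, local index) groups given the length of each chunk.
fn split_indices(chunk_lens: &[usize], indices: &[u64]) -> Vec<(usize, Vec<u64>)> {
    // Running start offsets: offsets[i] is the global index of chunk i's first row.
    let mut offsets = vec![0u64];
    for len in chunk_lens {
        let last = *offsets.last().unwrap();
        offsets.push(last + *len as u64);
    }

    let mut groups: Vec<(usize, Vec<u64>)> = Vec::new();
    for &idx in indices {
        // Last chunk whose start offset is <= idx.
        let chunk = offsets.partition_point(|&start| start <= idx) - 1;
        let local = idx - offsets[chunk];
        match groups.last_mut() {
            Some((c, locals)) if *c == chunk => locals.push(local),
            _ => groups.push((chunk, vec![local])),
        }
    }
    groups
}

With chunk lengths [4, 4] and indices [1, 5, 6], this would yield [(0, [1]), (1, [1, 2])].
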
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -13,7 +13,7 @@ permissions:
jobs:
build:
name: 'build'
runs-on: ubuntu-latest-medium
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
2 changes: 1 addition & 1 deletion bench-vortex/benches/random_access.rs
@@ -1,7 +1,7 @@
use bench_vortex::taxi_data::{take_taxi_data, write_taxi_data};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use itertools::Itertools;

use bench_vortex::serde::{take_taxi_data, write_taxi_data};
use vortex::array::ENCODINGS;

fn random_access(c: &mut Criterion) {
10 changes: 10 additions & 0 deletions bench-vortex/src/bin/serde.rs
@@ -0,0 +1,10 @@
use bench_vortex::setup_logger;
use bench_vortex::taxi_data::{take_taxi_data, write_taxi_data};
use log::LevelFilter;

pub fn main() {
setup_logger(LevelFilter::Debug);
let taxi_spiral = write_taxi_data();
let rows = take_taxi_data(&taxi_spiral, &[10, 11, 12, 13]); //, 100_000, 3_000_000]);
println!("TAKE TAXI DATA: {:?}", rows);
}
7 changes: 2 additions & 5 deletions bench-vortex/src/lib.rs
@@ -18,13 +18,11 @@ use vortex::compress::{CompressConfig, CompressCtx};
use vortex::formatter::display_tree;
use vortex_alp::ALPEncoding;
use vortex_datetime::DateTimeEncoding;
use vortex_dict::DictEncoding;
use vortex_fastlanes::{BitPackedEncoding, DeltaEncoding, FoREncoding};
use vortex_ree::REEEncoding;
use vortex_roaring::RoaringBoolEncoding;
use vortex_schema::DType;

pub mod serde;
pub mod taxi_data;

pub fn idempotent(name: &str, f: impl FnOnce(&mut File)) -> PathBuf {
@@ -39,8 +37,7 @@ pub fn idempotent(name: &str, f: impl FnOnce(&mut File)) -> PathBuf {
path.to_path_buf()
}

#[allow(dead_code)]
fn setup_logger(level: LevelFilter) {
pub fn setup_logger(level: LevelFilter) {
TermLogger::init(
level,
Config::default(),
@@ -54,7 +51,7 @@ pub fn enumerate_arrays() -> Vec<EncodingRef> {
println!("FOUND {:?}", ENCODINGS.iter().map(|e| e.id()).collect_vec());
vec![
&ALPEncoding,
&DictEncoding,
//&DictEncoding,
&BitPackedEncoding,
&FoREncoding,
&DateTimeEncoding,
87 changes: 0 additions & 87 deletions bench-vortex/src/serde.rs

This file was deleted.

65 changes: 63 additions & 2 deletions bench-vortex/src/taxi_data.rs
@@ -1,6 +1,20 @@
use std::path::PathBuf;
use arrow_array::RecordBatchReader;
use itertools::Itertools;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ProjectionMask;
use std::fs::File;
use std::path::{Path, PathBuf};
use vortex::array::chunked::ChunkedArray;
use vortex::array::primitive::PrimitiveArray;
use vortex::array::{ArrayRef, IntoArray};
use vortex::arrow::FromArrowType;
use vortex::compute::take::take;
use vortex::formatter::display_tree;
use vortex::ptype::PType;
use vortex::serde::{ReadCtx, WriteCtx};
use vortex_schema::DType;

use crate::idempotent;
use crate::{compress_ctx, idempotent};

pub fn download_taxi_data() -> PathBuf {
idempotent("yellow-tripdata-2023-11.parquet", |file| {
@@ -12,3 +26,50 @@ pub fn download_taxi_data() -> PathBuf {
.unwrap();
})
}

pub fn write_taxi_data() -> PathBuf {
idempotent("taxi.spiral", |write| {
let taxi_pq = File::open(download_taxi_data()).unwrap();
let builder = ParquetRecordBatchReaderBuilder::try_new(taxi_pq).unwrap();
let _mask = ProjectionMask::roots(builder.parquet_schema(), (0..14).collect_vec());

// FIXME(ngates): the compressor should handle batch size.
let reader = builder
// .with_limit(100)
// .with_projection(_mask)
.with_batch_size(65_536)
.build()
.unwrap();

let dtype = DType::from_arrow(reader.schema());
println!("SCHEMA {:?}\nDTYPE: {:?}", reader.schema(), dtype);
let ctx = compress_ctx();

let chunks = reader
.map(|batch_result| batch_result.unwrap())
.map(|record_batch| {
println!("RBSCHEMA: {:?}", record_batch.schema());
let vortex_array = record_batch.into_array();
let compressed = ctx.compress(&vortex_array, None).unwrap();
println!("COMPRESSED {}", display_tree(&compressed));
compressed
})
.collect_vec();
let chunked = ChunkedArray::new(chunks, dtype.clone());

let mut write_ctx = WriteCtx::new(write);
write_ctx.dtype(&dtype).unwrap();
write_ctx.write(&chunked).unwrap();
})
}

pub fn take_taxi_data(path: &Path, indices: &[u64]) -> ArrayRef {
let chunked = {
let mut file = File::open(path).unwrap();
let dummy_dtype: DType = PType::U8.into();
let mut read_ctx = ReadCtx::new(&dummy_dtype, &mut file);
let dtype = read_ctx.dtype().unwrap();
read_ctx.with_schema(&dtype).read().unwrap()
};
take(&chunked, &PrimitiveArray::from(indices.to_vec())).unwrap()
}
7 changes: 7 additions & 0 deletions pyvortex/src/array.rs
@@ -24,6 +24,7 @@ use crate::dtype::PyDType;
use crate::error::PyVortexError;
use crate::vortex_arrow;
use std::sync::Arc;
use vortex::compute::take::take;

#[pyclass(name = "Array", module = "vortex", sequence, subclass)]
pub struct PyArray {
@@ -196,6 +197,12 @@ impl PyArray {
fn dtype(self_: PyRef<Self>) -> PyResult<Py<PyDType>> {
PyDType::wrap(self_.py(), self_.inner.dtype().clone())
}

fn take(&self, indices: PyRef<'_, PyArray>) -> PyResult<Py<PyArray>> {
take(&self.inner, indices.unwrap())
.map_err(PyVortexError::map_err)
.and_then(|arr| PyArray::wrap(indices.py(), arr))
}
}

#[pymethods]
9 changes: 9 additions & 0 deletions pyvortex/test/test_array.py
@@ -16,6 +16,15 @@ def test_varbin_array_round_trip():
assert arr.to_pyarrow().combine_chunks() == a


def test_varbin_array_take():
a = vortex.encode(pa.array(["a", "b", "c", "d"]))
# TODO(ngates): ensure we correctly round-trip to a string and not large_string
assert a.take(vortex.encode(pa.array([0, 2]))).to_pyarrow().combine_chunks() == pa.array(
["a", "c"],
type=pa.large_utf8(),
)


def test_empty_array():
a = pa.array([], type=pa.uint8())
primitive = vortex.encode(a)
16 changes: 9 additions & 7 deletions vortex-alp/src/compress.rs
@@ -60,7 +60,7 @@ impl EncodingCompression for ALPEncoding {

let (exponents, encoded, patches) = match_each_alp_float_ptype!(
parray.ptype(), |$T| {
encode_to_array(parray.typed_data::<$T>(), like_alp.map(|l| l.exponents()))
encode_to_array::<$T>(parray, like_alp.map(|l| l.exponents()))
})?;

let compressed_encoded = ctx
@@ -81,18 +81,20 @@ impl EncodingCompression for ALPEncoding {
}

fn encode_to_array<T>(
values: &[T],
values: &PrimitiveArray,
exponents: Option<&Exponents>,
) -> (Exponents, ArrayRef, Option<ArrayRef>)
where
T: ALPFloat + NativePType,
T::ALPInt: NativePType,
{
let (exponents, values, exc_pos, exc) = T::encode(values, exponents);
let len = values.len();
let (exponents, encoded, exc_pos, exc) = T::encode(values.typed_data::<T>(), exponents);
let len = encoded.len();
(
exponents,
PrimitiveArray::from(values).into_array(),
PrimitiveArray::from(encoded)
.into_nullable(values.validity().is_some().into())
.into_array(),
(!exc.is_empty()).then(|| {
SparseArray::new(
PrimitiveArray::from(exc_pos).into_array(),
@@ -106,8 +108,8 @@

pub(crate) fn alp_encode(parray: &PrimitiveArray) -> VortexResult<ALPArray> {
let (exponents, encoded, patches) = match parray.ptype() {
PType::F32 => encode_to_array(parray.typed_data::<f32>(), None),
PType::F64 => encode_to_array(parray.typed_data::<f64>(), None),
PType::F32 => encode_to_array::<f32>(parray, None),
PType::F64 => encode_to_array::<f64>(parray, None),
_ => return Err("ALP can only encode f32 and f64".into()),
};
Ok(ALPArray::new(encoded, exponents, patches))
12 changes: 2 additions & 10 deletions vortex-alp/src/serde.rs
@@ -9,23 +9,15 @@ use crate::ALPEncoding;

impl ArraySerde for ALPArray {
fn write(&self, ctx: &mut WriteCtx) -> VortexResult<()> {
ctx.write_option_tag(self.patches().is_some())?;
if let Some(p) = self.patches() {
ctx.write(p)?;
}
ctx.write_optional_array(self.patches())?;
ctx.write_fixed_slice([self.exponents().e, self.exponents().f])?;
ctx.write(self.encoded())
}
}

impl EncodingSerde for ALPEncoding {
fn read(&self, ctx: &mut ReadCtx) -> VortexResult<ArrayRef> {
let patches_tag = ctx.read_nbytes::<1>()?[0];
let patches = if patches_tag == 0x01 {
Some(ctx.read()?)
} else {
None
};
let patches = ctx.read_optional_array()?;
let exponents = ctx.read_nbytes::<2>()?;
let encoded_dtype = match ctx.schema() {
DType::Float(width, nullability) => match width {
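
The serde change above swaps the hand-rolled presence handling (write a 0x00/0x01 tag byte, then conditionally write the patches array) for write_optional_array/read_optional_array helpers on the contexts. A rough standalone sketch of that kind of tag convention follows — with invented names, a byte-slice payload, and an added length prefix purely for illustration, not the actual Vortex WriteCtx/ReadCtx API.

use std::io::{self, Read, Write};

// Hypothetical stand-ins for an optional-value convention: a one-byte presence
// tag (0x00 = absent, 0x01 = present) followed by the payload when present.
fn write_optional<W: Write>(w: &mut W, payload: Option<&[u8]>) -> io::Result<()> {
    match payload {
        Some(bytes) => {
            w.write_all(&[0x01])?;
            w.write_all(&(bytes.len() as u64).to_le_bytes())?;
            w.write_all(bytes)
        }
        None => w.write_all(&[0x00]),
    }
}

fn read_optional<R: Read>(r: &mut R) -> io::Result<Option<Vec<u8>>> {
    let mut tag = [0u8; 1];
    r.read_exact(&mut tag)?;
    if tag[0] == 0x00 {
        return Ok(None);
    }
    let mut len = [0u8; 8];
    r.read_exact(&mut len)?;
    let mut bytes = vec![0u8; u64::from_le_bytes(len) as usize];
    r.read_exact(&mut bytes)?;
    Ok(Some(bytes))
}
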
9 changes: 4 additions & 5 deletions vortex-array/src/array/bool/compute.rs
@@ -34,12 +34,11 @@ impl ArrayCompute for BoolArray {
}

impl AsContiguousFn for BoolArray {
fn as_contiguous(&self, arrays: Vec<ArrayRef>) -> VortexResult<ArrayRef> {
// TODO(ngates): implement a HasValidity trait to avoid this duplicate code.
fn as_contiguous(&self, arrays: &[ArrayRef]) -> VortexResult<ArrayRef> {
let validity: Option<Validity> = if self.dtype().is_nullable() {
Some(Validity::from_iter(arrays.iter().map(|a| a.as_bool()).map(
|a| a.validity().unwrap_or_else(|| Validity::valid(a.len())),
)))
Some(Validity::from_iter(arrays.iter().map(|a| {
a.validity().unwrap_or_else(|| Validity::Valid(a.len()))
})))
} else {
None
};
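
The as_contiguous rewrite above also makes the validity rule explicit: when concatenating chunks, a chunk that carries no validity is treated as entirely valid. A minimal sketch of that rule, using plain Vec<bool> in place of the Vortex Validity type (names are invented for illustration):

// Concatenate per-chunk validity, treating a missing validity as all-valid.
fn concat_validity(chunks: &[(usize, Option<Vec<bool>>)]) -> Vec<bool> {
    chunks
        .iter()
        .flat_map(|(len, validity)| match validity {
            Some(v) => v.clone(),
            None => vec![true; *len],
        })
        .collect()
}

For chunks [(2, None), (2, Some(vec![true, false]))] this yields [true, true, true, false].
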
11 changes: 3 additions & 8 deletions vortex-array/src/array/bool/mod.rs
@@ -47,7 +47,7 @@ impl BoolArray {
pub fn null(n: usize) -> Self {
BoolArray::new(
BooleanBuffer::from(vec![false; n]),
Some(Validity::invalid(n)),
Some(Validity::Invalid(n)),
)
}

@@ -170,13 +170,8 @@ impl FromIterator<Option<bool>> for BoolArray {
let mut validity: Vec<bool> = Vec::with_capacity(lower);
let values: Vec<bool> = iter
.map(|i| {
if let Some(v) = i {
validity.push(true);
v
} else {
validity.push(false);
false
}
validity.push(i.is_some());
i.unwrap_or_default()
})
.collect::<Vec<_>>();

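
The FromIterator cleanup above replaces the if let/else branch with is_some() plus unwrap_or_default(). The same pattern in isolation, as a hypothetical generic helper rather than the Vortex impl:

// Split an iterator of Option<T> into (values, validity), defaulting missing values.
fn unzip_options<T: Default>(iter: impl IntoIterator<Item = Option<T>>) -> (Vec<T>, Vec<bool>) {
    let mut validity = Vec::new();
    let values = iter
        .into_iter()
        .map(|item| {
            validity.push(item.is_some());
            item.unwrap_or_default()
        })
        .collect();
    (values, validity)
}

Here unzip_options(vec![Some(1), None, Some(3)]) returns ([1, 0, 3], [true, false, true]).
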
(Remaining changed files not shown.)
