Skip to content

Commit

Permalink
Array2 compute (#224)
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn authored Apr 12, 2024
1 parent e7bd331 commit b8bd7e6
Show file tree
Hide file tree
Showing 65 changed files with 2,579 additions and 882 deletions.
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,12 @@ bindgen = "0.69.4"
criterion = { version = "0.5.1", features = ["html_reports"] }
croaring = "1.0.1"
divan = "0.1.14"
enum_dispatch = "0.3.13"
enum-iterator = "2.0.0"
flatbuffers = "23.5.26"
flexbuffers = "2.0.0"
flatc = "0.2.2"
fs_extra = "1.3.0"
half = { version = "^2", features = ["std", "num-traits"] }
hashbrown = "0.14.3"
humansize = "2.1.3"
Expand All @@ -78,8 +81,6 @@ bzip2 = "0.4.4"
csv = "1.3.0"
arrow-csv = "51.0.0"
lazy_static = "1.4.0"
enum-iterator = "2.0.0"
fs_extra = "1.3.0"

[workspace.lints.rust]
warnings = "deny"
Expand Down
7 changes: 4 additions & 3 deletions bench-vortex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ workspace = true
[dependencies]
arrow-array = { workspace = true }
arrow-select = { workspace = true }
enum-iterator = { workspace = true }
fs_extra = { workspace = true }
vortex-alp = { path = "../vortex-alp" }
vortex-array = { path = "../vortex-array" }
vortex-array2 = { path = "../vortex-array2" }
vortex-datetime = { path = "../vortex-datetime" }
vortex-dict = { path = "../vortex-dict" }
vortex-error = { path = "../vortex-error", features = ["parquet"] }
Expand All @@ -32,6 +35,7 @@ itertools = { workspace = true }
lance = { version = "0.10.5", features = [] }
lance-arrow-array = { package = "arrow-array", version = "50.0" }
lance-parquet = { package = "parquet", version = "50.0", features = [] }
lazy_static = { workspace = true }
log = { workspace = true }
parquet = { workspace = true, features = [] }
reqwest = { workspace = true }
Expand All @@ -41,9 +45,6 @@ bzip2 = { workspace = true }
csv = { workspace = true }
arrow-csv = { workspace = true }
arrow = { workspace = true }
lazy_static = { workspace = true }
enum-iterator = { workspace = true }
fs_extra = { workspace = true }
humansize = { workspace = true }

[dev-dependencies]
Expand Down
7 changes: 1 addition & 6 deletions vortex-array/flatbuffers/array.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,11 @@ enum Version: uint8 {
V0 = 0,
}

// TODO(ngates): figure out if flatbuffers supports optional elements in a vector.
table ArrayChild {
child: Array;
}

table Array {
version: Version = V0;
encoding: uint16;
metadata: [ubyte];
children: [ArrayChild];
children: [Array];
nbuffers: uint16;
}

Expand Down
14 changes: 3 additions & 11 deletions vortex-array/src/serde/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,7 @@ impl<'a> ArrayView<'a> {
.children()?
.iter()
.take(idx)
.map(|child| {
child
.child()
.map(|c| Self::cumulative_nbuffers(c))
.unwrap_or_default()
})
.map(|child| Self::cumulative_nbuffers(child))
.sum();
let buffer_count = Self::cumulative_nbuffers(child);

Expand All @@ -124,7 +119,7 @@ impl<'a> ArrayView<'a> {
fn array_child(&self, idx: usize) -> Option<fb::Array<'a>> {
let children = self.array.children()?;
if idx < children.len() {
children.get(idx).child()
Some(children.get(idx))
} else {
None
}
Expand All @@ -139,10 +134,7 @@ impl<'a> ArrayView<'a> {
fn cumulative_nbuffers(array: fb::Array) -> usize {
let mut nbuffers = array.nbuffers() as usize;
for child in array.children().unwrap_or_default() {
nbuffers += child
.child()
.map(|c| Self::cumulative_nbuffers(c))
.unwrap_or_default();
nbuffers += Self::cumulative_nbuffers(child)
}
nbuffers
}
Expand Down
5 changes: 5 additions & 0 deletions vortex-array2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,17 @@ edition = { workspace = true }
rust-version = { workspace = true }

[dependencies]
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-schema = { workspace = true }
flatbuffers = { workspace = true }
flexbuffers = { workspace = true }
half = { workspace = true }
humansize = { workspace = true }
itertools = { workspace = true }
linkme = { workspace = true }
log = { workspace = true }
num-traits = { workspace = true }
paste = { workspace = true }
serde = { workspace = true, features = ["derive"] }
vortex-array = { path = "../vortex-array", features = ["serde"] }
Expand Down
26 changes: 0 additions & 26 deletions vortex-array2/src/array/bool/compute.rs

This file was deleted.

17 changes: 17 additions & 0 deletions vortex-array2/src/array/bool/compute/as_arrow.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use std::sync::Arc;

use arrow_array::{ArrayRef as ArrowArrayRef, BooleanArray as ArrowBoolArray};
use vortex_error::VortexResult;

use crate::array::bool::BoolArray;
use crate::compute::as_arrow::AsArrowArray;
use crate::validity::ArrayValidity;

impl AsArrowArray for BoolArray<'_> {
fn as_arrow(&self) -> VortexResult<ArrowArrayRef> {
Ok(Arc::new(ArrowBoolArray::new(
self.boolean_buffer().clone(),
self.logical_validity().to_null_buffer()?,
)))
}
}
27 changes: 27 additions & 0 deletions vortex-array2/src/array/bool/compute/as_contiguous.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use arrow_buffer::BooleanBuffer;
use vortex_error::VortexResult;

use crate::array::bool::BoolArray;
use crate::compute::as_contiguous::AsContiguousFn;
use crate::validity::Validity;
use crate::{Array, ArrayTrait, IntoArray};

impl AsContiguousFn for BoolArray<'_> {
fn as_contiguous(&self, arrays: &[Array]) -> VortexResult<Array<'static>> {
let validity = if self.dtype().is_nullable() {
Validity::from_iter(arrays.iter().map(|a| a.with_dyn(|a| a.logical_validity())))
} else {
Validity::NonNullable
};

let mut bools = Vec::with_capacity(arrays.iter().map(|a| a.len()).sum());
for buffer in arrays
.iter()
.map(|a| BoolArray::try_from(a.clone()).unwrap().boolean_buffer())
{
bools.extend(buffer.iter())
}

Ok(BoolArray::try_new(BooleanBuffer::from(bools), validity)?.into_array())
}
}
49 changes: 49 additions & 0 deletions vortex-array2/src/array/bool/compute/fill.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use vortex_error::VortexResult;
use vortex_schema::Nullability;

use crate::array::bool::BoolArray;
use crate::compute::fill::FillForwardFn;
use crate::validity::ArrayValidity;
use crate::{Array, ArrayTrait, IntoArray, ToArrayData};

impl FillForwardFn for BoolArray<'_> {
fn fill_forward(&self) -> VortexResult<Array<'static>> {
if self.dtype().nullability() == Nullability::NonNullable {
return Ok(self.to_array_data().into_array());
}

let validity = self.logical_validity().to_null_buffer()?.unwrap();
let bools = self.boolean_buffer();
let mut last_value = false;
let filled = bools
.iter()
.zip(validity.inner().iter())
.map(|(v, valid)| {
if valid {
last_value = v;
}
last_value
})
.collect::<Vec<_>>();
Ok(BoolArray::from(filled).into_array())
}
}

#[cfg(test)]
mod test {
use crate::array::bool::BoolArray;
use crate::validity::Validity;
use crate::{compute, IntoArray};

#[test]
fn fill_forward() {
let barr =
BoolArray::from_iter(vec![None, Some(false), None, Some(true), None]).into_array();
let filled_bool = BoolArray::try_from(compute::fill::fill_forward(&barr).unwrap()).unwrap();
assert_eq!(
filled_bool.boolean_buffer().iter().collect::<Vec<bool>>(),
vec![false, false, false, true, true]
);
assert_eq!(filled_bool.validity(), Validity::NonNullable);
}
}
1 change: 1 addition & 0 deletions vortex-array2/src/array/bool/compute/flatten.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

36 changes: 36 additions & 0 deletions vortex-array2/src/array/bool/compute/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use crate::array::bool::BoolArray;
use crate::compute::as_arrow::AsArrowArray;
use crate::compute::as_contiguous::AsContiguousFn;
use crate::compute::fill::FillForwardFn;
use crate::compute::scalar_at::ScalarAtFn;
use crate::compute::take::TakeFn;
use crate::compute::ArrayCompute;

mod as_arrow;
mod as_contiguous;
mod fill;
mod flatten;
mod scalar_at;
mod take;

impl ArrayCompute for BoolArray<'_> {
fn as_arrow(&self) -> Option<&dyn AsArrowArray> {
Some(self)
}

fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> {
Some(self)
}

fn fill_forward(&self) -> Option<&dyn FillForwardFn> {
Some(self)
}

fn scalar_at(&self) -> Option<&dyn ScalarAtFn> {
Some(self)
}

fn take(&self) -> Option<&dyn TakeFn> {
Some(self)
}
}
19 changes: 19 additions & 0 deletions vortex-array2/src/array/bool/compute/scalar_at.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use vortex::scalar::{BoolScalar, Scalar};
use vortex_error::VortexResult;

use crate::array::bool::BoolArray;
use crate::compute::scalar_at::ScalarAtFn;
use crate::validity::ArrayValidity;
use crate::ArrayTrait;

impl ScalarAtFn for BoolArray<'_> {
fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
Ok(BoolScalar::try_new(
self.is_valid(index)
.then(|| self.boolean_buffer().value(index)),
self.dtype().nullability(),
)
.unwrap()
.into())
}
}
Loading

0 comments on commit b8bd7e6

Please sign in to comment.