Skip to content

Commit

Permalink
Add and/or compute functions (#481)
Browse files Browse the repository at this point in the history
  • Loading branch information
AdamGS authored Jul 22, 2024
1 parent 1f31308 commit 062c817
Show file tree
Hide file tree
Showing 14 changed files with 367 additions and 43 deletions.
45 changes: 45 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@ resolver = "2"
version = "0.1.0"
homepage = "https://github.com/fulcrum-so/vortex"
repository = "https://github.com/fulcrum-so/vortex"
authors = ["Robert Kruszewski <[email protected]>, Nicholas Gates <[email protected]>, Will Manning <[email protected]>"]
authors = [
"Robert Kruszewski <[email protected]>, Nicholas Gates <[email protected]>, Will Manning <[email protected]>",
]
license = "Apache-2.0"
keywords = ["vortex"]
include = [
"benches/*.rs",
"src/**/*.rs",
"Cargo.toml",
]
include = ["benches/*.rs", "src/**/*.rs", "Cargo.toml"]
edition = "2021"
rust-version = "1.76"

Expand All @@ -36,6 +34,7 @@ ahash = "0.8.11"
allocator-api2 = "0.2.16"
arrayref = "0.3.7"
arrow = { version = "52.0.0", features = ["pyarrow"] }
arrow-arith = "52.0.0"
arrow-array = "52.0.0"
arrow-buffer = "52.0.0"
arrow-cast = "52.0.0"
Expand Down Expand Up @@ -96,6 +95,7 @@ pyo3-log = "0.11.0"
rand = "0.8.5"
rayon = "1.10.0"
reqwest = { version = "0.12.0", features = ["blocking"] }
rstest = "0.21"
seq-macro = "0.3.5"
serde = "1.0.197"
serde_json = "1.0.116"
Expand Down
8 changes: 7 additions & 1 deletion vortex-array/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ arrow-cast = { workspace = true }
arrow-select = { workspace = true }
arrow-schema = { workspace = true }
arrow-ord = { workspace = true }
arrow-arith = { workspace = true }
bytes = { workspace = true }
enum-iterator = { workspace = true }
flatbuffers = { workspace = true }
flexbuffers = { workspace = true }
Expand All @@ -43,7 +45,10 @@ vortex-dtype = { path = "../vortex-dtype", features = ["flatbuffers", "serde"] }
vortex-error = { path = "../vortex-error", features = ["flexbuffers"] }
vortex-expr = { path = "../vortex-expr" }
vortex-flatbuffers = { path = "../vortex-flatbuffers" }
vortex-scalar = { path = "../vortex-scalar", features = ["flatbuffers", "serde"] }
vortex-scalar = { path = "../vortex-scalar", features = [
"flatbuffers",
"serde",
] }
serde = { workspace = true, features = ["derive"] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
Expand All @@ -55,6 +60,7 @@ build-vortex = { path = "../build-vortex" }

[dev-dependencies]
criterion = { workspace = true }
rstest = { workspace = true }
tokio = { workspace = true }

[[bench]]
Expand Down
36 changes: 36 additions & 0 deletions vortex-array/src/array/bool/compute/boolean.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use arrow_arith::boolean;
use arrow_array::cast::AsArray as _;
use vortex_error::VortexResult;

use crate::array::bool::BoolArray;
use crate::arrow::FromArrowArray;
use crate::compute::{AndFn, OrFn};
use crate::{Array, ArrayData, IntoArray, IntoCanonical};

impl OrFn for BoolArray {
fn or(&self, array: &Array) -> VortexResult<Array> {
let lhs = self.clone().into_canonical()?.into_arrow();
let lhs = lhs.as_boolean();

let rhs = array.clone().into_canonical()?.into_arrow();
let rhs = rhs.as_boolean();

let array = boolean::or(lhs, rhs)?;

Ok(ArrayData::from_arrow(&array, true).into_array())
}
}

impl AndFn for BoolArray {
fn and(&self, array: &Array) -> VortexResult<Array> {
let lhs = self.clone().into_canonical()?.into_arrow();
let lhs = lhs.as_boolean();

let rhs = array.clone().into_canonical()?.into_arrow();
let rhs = rhs.as_boolean();

let array = boolean::and(lhs, rhs)?;

Ok(ArrayData::from_arrow(&array, true).into_array())
}
}
9 changes: 9 additions & 0 deletions vortex-array/src/array/bool/compute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::array::bool::BoolArray;
use crate::compute::unary::{FillForwardFn, ScalarAtFn};
use crate::compute::{ArrayCompute, CompareFn, SliceFn, TakeFn};

mod boolean;
mod compare;
mod fill;
mod flatten;
Expand Down Expand Up @@ -29,4 +30,12 @@ impl ArrayCompute for BoolArray {
/// Exposes this array's own `TakeFn` implementation.
fn take(&self) -> Option<&dyn TakeFn> {
    let kernel: &dyn TakeFn = self;
    Some(kernel)
}

/// Exposes this array's own `AndFn` implementation.
fn and(&self) -> Option<&dyn crate::compute::AndFn> {
    let kernel: &dyn crate::compute::AndFn = self;
    Some(kernel)
}

/// Exposes this array's own `OrFn` implementation.
fn or(&self) -> Option<&dyn crate::compute::OrFn> {
    let kernel: &dyn crate::compute::OrFn = self;
    Some(kernel)
}
}
109 changes: 103 additions & 6 deletions vortex-array/src/array/constant/compute.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use std::cmp::Ordering;

use vortex_error::VortexResult;
use vortex_dtype::Nullability;
use vortex_error::{vortex_bail, VortexResult};
use vortex_scalar::Scalar;

use crate::array::constant::ConstantArray;
use crate::compute::unary::ScalarAtFn;
use crate::compute::unary::{scalar_at, ScalarAtFn};
use crate::compute::{
ArrayCompute, SearchResult, SearchSortedFn, SearchSortedSide, SliceFn, TakeFn,
AndFn, ArrayCompute, OrFn, SearchResult, SearchSortedFn, SearchSortedSide, SliceFn, TakeFn,
};
use crate::{Array, IntoArray};
use crate::stats::{ArrayStatistics, Stat};
use crate::{Array, ArrayDType, AsArray, IntoArray};

impl ArrayCompute for ConstantArray {
fn scalar_at(&self) -> Option<&dyn ScalarAtFn> {
Expand All @@ -26,6 +28,14 @@ impl ArrayCompute for ConstantArray {
/// Exposes this array's own `TakeFn` implementation.
fn take(&self) -> Option<&dyn TakeFn> {
    let kernel: &dyn TakeFn = self;
    Some(kernel)
}

/// Exposes this array's own `AndFn` implementation.
fn and(&self) -> Option<&dyn AndFn> {
    let kernel: &dyn AndFn = self;
    Some(kernel)
}

/// Exposes this array's own `OrFn` implementation.
fn or(&self) -> Option<&dyn OrFn> {
    let kernel: &dyn OrFn = self;
    Some(kernel)
}
}

impl ScalarAtFn for ConstantArray {
Expand Down Expand Up @@ -59,11 +69,63 @@ impl SearchSortedFn for ConstantArray {
}
}

impl AndFn for ConstantArray {
    /// Logical AND of this constant array with `array`.
    ///
    /// Delegates to `constant_array_bool_impl`, supplying `&` as the scalar operation and,
    /// as the fallback, a dispatch to the other array's own `and` kernel (if it has one)
    /// with this array passed as its argument.
    fn and(&self, array: &Array) -> VortexResult<Array> {
        constant_array_bool_impl(
            self,
            array,
            |(lhs, rhs)| lhs & rhs,
            |rhs_array, constant| {
                rhs_array.with_dyn(|rhs_dyn| rhs_dyn.and().map(|kernel| kernel.and(constant)))
            },
        )
    }
}

impl OrFn for ConstantArray {
    /// Logical OR of this constant array with `array`.
    ///
    /// Delegates to `constant_array_bool_impl`, supplying `|` as the scalar operation and,
    /// as the fallback, a dispatch to the other array's own `or` kernel (if it has one)
    /// with this array passed as its argument.
    fn or(&self, array: &Array) -> VortexResult<Array> {
        constant_array_bool_impl(
            self,
            array,
            |(lhs, rhs)| lhs | rhs,
            |rhs_array, constant| {
                rhs_array.with_dyn(|rhs_dyn| rhs_dyn.or().map(|kernel| kernel.or(constant)))
            },
        )
    }
}

fn constant_array_bool_impl(
constant_array: &ConstantArray,
other: &Array,
bool_op: impl Fn((bool, bool)) -> bool,
fallback_fn: impl Fn(&Array, &Array) -> Option<VortexResult<Array>>,
) -> VortexResult<Array> {
// If the right side is constant
if let Some(true) = other.statistics().get_as::<bool>(Stat::IsConstant) {
let lhs = constant_array.scalar().value().as_bool()?;
let rhs = scalar_at(other, 0)?.value().as_bool()?;

let scalar = match lhs.zip(rhs).map(bool_op) {
Some(b) => Scalar::bool(b, Nullability::Nullable),
None => Scalar::null(constant_array.dtype().as_nullable()),
};

Ok(ConstantArray::new(scalar, constant_array.len()).into_array())
} else {
// try and use a the rhs specialized implementation if it exists
match fallback_fn(other, constant_array.as_array_ref()) {
Some(r) => r,
None => vortex_bail!("Operation is not supported"),
}
}
}

#[cfg(test)]
mod test {
use rstest::rstest;

use crate::array::bool::BoolArray;
use crate::array::constant::ConstantArray;
use crate::compute::{search_sorted, SearchResult, SearchSortedSide};
use crate::IntoArray;
use crate::compute::unary::scalar_at;
use crate::compute::{and, or, search_sorted, SearchResult, SearchSortedSide};
use crate::{Array, IntoArray, IntoArrayVariant};

#[test]
pub fn search() {
Expand All @@ -90,4 +152,39 @@ mod test {
SearchResult::Found(5000)
);
}

/// OR-ing an all-`true` constant with a mixed boolean array yields `true` at every
/// position, whichever side the constant is on.
#[rstest]
#[case(
    ConstantArray::new(true, 4).into_array(),
    BoolArray::from_iter([Some(true), Some(false), Some(true), Some(false)].into_iter()).into_array()
)]
#[case(
    BoolArray::from_iter([Some(true), Some(false), Some(true), Some(false)].into_iter()).into_array(),
    ConstantArray::new(true, 4).into_array()
)]
fn test_or(#[case] lhs: Array, #[case] rhs: Array) {
    let result = or(&lhs, &rhs).unwrap().into_bool().unwrap().into_array();

    for idx in 0..4 {
        let value = scalar_at(&result, idx).unwrap().value().as_bool().unwrap();
        assert!(value.unwrap());
    }
}

/// AND-ing an all-`true` constant with a mixed boolean array reproduces the mixed array,
/// whichever side the constant is on.
#[rstest]
#[case(
    ConstantArray::new(true, 4).into_array(),
    BoolArray::from_iter([Some(true), Some(false), Some(true), Some(false)].into_iter()).into_array()
)]
#[case(
    BoolArray::from_iter([Some(true), Some(false), Some(true), Some(false)].into_iter()).into_array(),
    ConstantArray::new(true, 4).into_array()
)]
fn test_and(#[case] lhs: Array, #[case] rhs: Array) {
    let result = and(&lhs, &rhs).unwrap().into_bool().unwrap().into_array();

    let expected = [true, false, true, false];
    for (idx, want) in expected.into_iter().enumerate() {
        let got = scalar_at(&result, idx).unwrap().value().as_bool().unwrap();
        assert_eq!(got.unwrap(), want);
    }
}
}
8 changes: 8 additions & 0 deletions vortex-array/src/array/primitive/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use arrow_buffer::{ArrowNativeType, Buffer as ArrowBuffer, MutableBuffer};
use bytes::Bytes;
use itertools::Itertools;
use num_traits::AsPrimitive;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -68,6 +69,13 @@ impl PrimitiveArray {
Self::from_vec(elems, validity)
}

/// Creates a new array of type U8
pub fn from_bytes(bytes: Bytes, validity: Validity) -> Self {
let buffer = Buffer::Bytes(bytes);

PrimitiveArray::new(buffer, PType::U8, validity)
}

pub fn validity(&self) -> Validity {
self.metadata()
.validity
Expand Down
Loading

0 comments on commit 062c817

Please sign in to comment.