Add and/or compute functions #481

Merged
merged 21 commits into from
Jul 22, 2024
Changes from 17 commits
45 changes: 45 additions & 0 deletions Cargo.lock


11 changes: 5 additions & 6 deletions Cargo.toml
@@ -20,14 +20,12 @@ resolver = "2"
version = "0.1.0"
homepage = "https://github.com/fulcrum-so/vortex"
repository = "https://github.com/fulcrum-so/vortex"
authors = ["Robert Kruszewski <[email protected]>, Nicholas Gates <[email protected]>, Will Manning <[email protected]>"]
authors = [
"Robert Kruszewski <[email protected]>, Nicholas Gates <[email protected]>, Will Manning <[email protected]>",
]
license = "Apache-2.0"
keywords = ["vortex"]
include = [
"benches/*.rs",
"src/**/*.rs",
"Cargo.toml",
]
include = ["benches/*.rs", "src/**/*.rs", "Cargo.toml"]
edition = "2021"
rust-version = "1.76"

@@ -36,6 +34,7 @@ ahash = "0.8.11"
allocator-api2 = "0.2.16"
arrayref = "0.3.7"
arrow = { version = "52.0.0", features = ["pyarrow"] }
arrow-arith = "52.0.0"
arrow-array = "52.0.0"
arrow-buffer = "52.0.0"
arrow-cast = "52.0.0"
8 changes: 7 additions & 1 deletion vortex-array/Cargo.toml
@@ -25,6 +25,8 @@ arrow-cast = { workspace = true }
arrow-select = { workspace = true }
arrow-schema = { workspace = true }
arrow-ord = { workspace = true }
arrow-arith = { workspace = true }
bytes = { workspace = true }
enum-iterator = { workspace = true }
flatbuffers = { workspace = true }
flexbuffers = { workspace = true }
@@ -43,7 +45,10 @@ vortex-dtype = { path = "../vortex-dtype", features = ["flatbuffers", "serde"] }
vortex-error = { path = "../vortex-error", features = ["flexbuffers"] }
vortex-expr = { path = "../vortex-expr" }
vortex-flatbuffers = { path = "../vortex-flatbuffers" }
vortex-scalar = { path = "../vortex-scalar", features = ["flatbuffers", "serde"] }
vortex-scalar = { path = "../vortex-scalar", features = [
"flatbuffers",
"serde",
] }
serde = { workspace = true, features = ["derive"] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
@@ -55,6 +60,7 @@ build-vortex = { path = "../build-vortex" }

[dev-dependencies]
criterion = { workspace = true }
rstest = "0.21"
tokio = { workspace = true }

[[bench]]
36 changes: 36 additions & 0 deletions vortex-array/src/array/bool/compute/boolean.rs
@@ -0,0 +1,36 @@
use arrow_arith::boolean;
use arrow_array::cast::AsArray;
use vortex_error::VortexResult;

use crate::array::bool::BoolArray;
use crate::arrow::FromArrowArray;
use crate::compute::{AndFn, OrFn};
use crate::{Array, ArrayData, IntoArray, IntoCanonical};

impl OrFn for BoolArray {
fn or(&self, array: &Array) -> VortexResult<Array> {
let lhs = self.clone().into_canonical()?.into_arrow();
let lhs = lhs.as_boolean();

let rhs = array.clone().into_canonical()?.into_arrow();
let rhs = rhs.as_boolean();
Comment on lines +15 to +16
Member

Should this maybe dispatch on the right-hand side instead of converting to Arrow?

Contributor Author

Definitely possible, but I think we might be missing a larger abstraction here to solve the whole left/right issue.


let array = boolean::or(lhs, rhs)?;

Ok(ArrayData::from_arrow(&array, true).into_array())
}
}

impl AndFn for BoolArray {
fn and(&self, array: &Array) -> VortexResult<Array> {
let lhs = self.clone().into_canonical()?.into_arrow();
let lhs = lhs.as_boolean();

let rhs = array.clone().into_canonical()?.into_arrow();
let rhs = rhs.as_boolean();

let array = boolean::and(lhs, rhs)?;

Ok(ArrayData::from_arrow(&array, true).into_array())
}
}
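
A note for readers of this file: both impls delegate to `arrow_arith::boolean::or` and `boolean::and`, which, to my knowledge, propagate nulls (a null on either side yields a null) rather than applying Kleene logic; Arrow offers `or_kleene` and `and_kleene` for the latter. The following is a minimal std-only sketch of that difference, using `Option<bool>` as a stand-in for a nullable boolean. The function names are illustrative and are not part of Vortex or Arrow.

```rust
/// Null-propagating OR: any null operand makes the result null.
fn or_propagate(l: Option<bool>, r: Option<bool>) -> Option<bool> {
    l.zip(r).map(|(a, b)| a | b)
}

/// Kleene OR: `true` wins even when the other side is null.
fn or_kleene(l: Option<bool>, r: Option<bool>) -> Option<bool> {
    match (l, r) {
        (Some(true), _) | (_, Some(true)) => Some(true),
        (Some(false), Some(false)) => Some(false),
        _ => None,
    }
}

/// Apply a binary op element-wise; returns an error on length mismatch.
fn zip_with(
    lhs: &[Option<bool>],
    rhs: &[Option<bool>],
    op: impl Fn(Option<bool>, Option<bool>) -> Option<bool>,
) -> Result<Vec<Option<bool>>, String> {
    if lhs.len() != rhs.len() {
        return Err("length mismatch".to_string());
    }
    Ok(lhs.iter().zip(rhs).map(|(&l, &r)| op(l, r)).collect())
}
```

The two functions only disagree when one operand is null and the other is `true`, which is worth keeping in mind if callers expect SQL-style three-valued logic.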
9 changes: 9 additions & 0 deletions vortex-array/src/array/bool/compute/mod.rs
@@ -2,6 +2,7 @@ use crate::array::bool::BoolArray;
use crate::compute::unary::{FillForwardFn, ScalarAtFn};
use crate::compute::{ArrayCompute, CompareFn, SliceFn, TakeFn};

mod boolean;
mod compare;
mod fill;
mod flatten;
@@ -29,4 +30,12 @@ impl ArrayCompute for BoolArray {
fn take(&self) -> Option<&dyn TakeFn> {
Some(self)
}

fn and(&self) -> Option<&dyn crate::compute::AndFn> {
Some(self)
}

fn or(&self) -> Option<&dyn crate::compute::OrFn> {
Some(self)
}
}
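
The hunk above opts `BoolArray` into `and`/`or` by returning `Some(self)` from the compute trait. As a rough illustration of how such optional capabilities could also answer the review question about dispatching on the right-hand side, here is a std-only sketch. Every name in it (`Encoding`, `or_fn`, `Plain`, `Constant`) is hypothetical, it ignores nulls and length checks, and it is not how Vortex's `or` function is actually written.

```rust
/// An operation an encoding may optionally implement.
trait OrFn {
    fn or(&self, rhs: &dyn Encoding) -> Vec<bool>;
}

/// Hypothetical stand-in for an array encoding.
trait Encoding {
    /// Decode to a plain vector (the "canonical" form in this sketch).
    fn to_canonical(&self) -> Vec<bool>;

    /// Encodings opt in by overriding this to return `Some(self)`.
    fn or_fn(&self) -> Option<&dyn OrFn> {
        None
    }
}

struct Plain(Vec<bool>);

struct Constant {
    value: bool,
    len: usize,
}

impl Encoding for Plain {
    fn to_canonical(&self) -> Vec<bool> {
        self.0.clone()
    }
}

impl Encoding for Constant {
    fn to_canonical(&self) -> Vec<bool> {
        vec![self.value; self.len]
    }

    fn or_fn(&self) -> Option<&dyn OrFn> {
        Some(self)
    }
}

impl OrFn for Constant {
    fn or(&self, rhs: &dyn Encoding) -> Vec<bool> {
        // `true | x` is always true; `false | x` is just x.
        if self.value {
            vec![true; self.len]
        } else {
            rhs.to_canonical()
        }
    }
}

/// Try the lhs, then the rhs (OR is commutative), then the canonical fallback.
fn or(lhs: &dyn Encoding, rhs: &dyn Encoding) -> Vec<bool> {
    if let Some(f) = lhs.or_fn() {
        return f.or(rhs);
    }
    if let Some(f) = rhs.or_fn() {
        return f.or(lhs);
    }
    lhs.to_canonical()
        .iter()
        .zip(rhs.to_canonical())
        .map(|(a, b)| *a | b)
        .collect()
}
```

Swapping the operands is only safe because the operation is commutative; a non-commutative op would need a separate "reverse" capability.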
106 changes: 100 additions & 6 deletions vortex-array/src/array/constant/compute.rs
@@ -1,14 +1,17 @@
use std::cmp::Ordering;

use vortex_error::VortexResult;
use vortex_dtype::Nullability;
use vortex_error::{vortex_bail, VortexResult};
use vortex_scalar::Scalar;

use crate::array::constant::ConstantArray;
use crate::compute::unary::ScalarAtFn;
use crate::compute::unary::{scalar_at, ScalarAtFn};
use crate::compute::{
ArrayCompute, SearchResult, SearchSortedFn, SearchSortedSide, SliceFn, TakeFn,
and, or, AndFn, ArrayCompute, OrFn, SearchResult, SearchSortedFn, SearchSortedSide, SliceFn,
TakeFn,
};
use crate::{Array, IntoArray};
use crate::stats::ArrayStatistics;
use crate::{Array, ArrayDType, AsArray, IntoArray};

impl ArrayCompute for ConstantArray {
fn scalar_at(&self) -> Option<&dyn ScalarAtFn> {
@@ -26,6 +29,14 @@ impl ArrayCompute for ConstantArray {
fn take(&self) -> Option<&dyn TakeFn> {
Some(self)
}

fn and(&self) -> Option<&dyn AndFn> {
Some(self)
}

fn or(&self) -> Option<&dyn OrFn> {
Some(self)
}
}

impl ScalarAtFn for ConstantArray {
@@ -59,11 +70,59 @@ impl SearchSortedFn for ConstantArray {
}
}

impl AndFn for ConstantArray {
fn and(&self, array: &Array) -> VortexResult<Array> {
constant_array_bool_impl(self, array, |(l, r)| l & r, and)
}
}

impl OrFn for ConstantArray {
fn or(&self, array: &Array) -> VortexResult<Array> {
constant_array_bool_impl(self, array, |(l, r)| l | r, or)
}
}

fn constant_array_bool_impl(
constant_array: &ConstantArray,
other: &Array,
bool_op: impl Fn((bool, bool)) -> bool,
fallback_fn: impl Fn(&Array, &Array) -> VortexResult<Array>,
) -> VortexResult<Array> {
if constant_array.len() != other.len() {
vortex_bail!("Boolean operations aren't supported on arrays of different lengths")
}

if !constant_array.dtype().is_boolean() || !other.dtype().is_boolean() {
vortex_bail!("Boolean operations are only supported on boolean arrays")
}

// If the right side is constant
if let Some(true) = other.statistics().compute_is_constant() {
let lhs = constant_array.scalar().value().as_bool()?;
let rhs = scalar_at(other, 0)?.value().as_bool()?;

let scalar = match lhs.zip(rhs).map(bool_op) {
Some(b) => Scalar::bool(b, Nullability::Nullable),
None => Scalar::null(constant_array.dtype().as_nullable()),
};

Ok(ConstantArray::new(scalar, constant_array.len()).into_array())
} else {
// Otherwise, try the rhs's specialized implementation, if one exists
fallback_fn(other, constant_array.as_array_ref())
}
}
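
For reference, the constant-vs-constant branch of `constant_array_bool_impl` boils down to folding two nullable scalars with `Option::zip`, so the result costs O(1) regardless of array length. A minimal std-only sketch of that idea follows; the `Constant` struct and `constant_op` function are hypothetical simplifications, not the Vortex types.

```rust
/// Hypothetical stand-in for a constant boolean array: one nullable value, repeated `len` times.
#[derive(Debug, PartialEq, Clone, Copy)]
struct Constant {
    value: Option<bool>,
    len: usize,
}

/// Combine two constants without touching individual elements.
/// A null on either side yields a null constant, mirroring `lhs.zip(rhs).map(bool_op)`.
fn constant_op(
    lhs: Constant,
    rhs: Constant,
    op: impl Fn((bool, bool)) -> bool,
) -> Result<Constant, String> {
    if lhs.len != rhs.len {
        return Err("arrays have different lengths".to_string());
    }
    Ok(Constant {
        value: lhs.value.zip(rhs.value).map(op),
        len: lhs.len,
    })
}
```

Passing `|(l, r)| l & r` or `|(l, r)| l | r` as `op` gives the two operations added in this PR.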

#[cfg(test)]
mod test {
use rstest::rstest;

use crate::Array;
use crate::array::bool::BoolArray;
use crate::array::constant::ConstantArray;
use crate::compute::{search_sorted, SearchResult, SearchSortedSide};
use crate::IntoArray;
use crate::compute::unary::scalar_at;
use crate::compute::{and, or, search_sorted, SearchResult, SearchSortedSide};
use crate::{IntoArray, IntoArrayVariant};

#[test]
pub fn search() {
@@ -90,4 +149,39 @@ mod test {
SearchResult::Found(5000)
);
}

#[rstest]
#[case(ConstantArray::new(true, 4).into_array(), BoolArray::from_iter([Some(true), Some(false), Some(true), Some(false)].into_iter()).into_array())]
#[case(BoolArray::from_iter([Some(true), Some(false), Some(true), Some(false)].into_iter()).into_array(), ConstantArray::new(true, 4).into_array())]
fn test_or(#[case] lhs: Array, #[case] rhs: Array) {
let r = or(&lhs, &rhs).unwrap().into_bool().unwrap().into_array();

let v0 = scalar_at(&r, 0).unwrap().value().as_bool().unwrap();
let v1 = scalar_at(&r, 1).unwrap().value().as_bool().unwrap();
let v2 = scalar_at(&r, 2).unwrap().value().as_bool().unwrap();
let v3 = scalar_at(&r, 3).unwrap().value().as_bool().unwrap();

assert!(v0.unwrap());
assert!(v1.unwrap());
assert!(v2.unwrap());
assert!(v3.unwrap());
}

#[rstest]
#[case(ConstantArray::new(true, 4).into_array(), BoolArray::from_iter([Some(true), Some(false), Some(true), Some(false)].into_iter()).into_array())]
#[case(BoolArray::from_iter([Some(true), Some(false), Some(true), Some(false)].into_iter()).into_array(),
ConstantArray::new(true, 4).into_array())]
fn test_and(#[case] lhs: Array, #[case] rhs: Array) {
let r = and(&lhs, &rhs).unwrap().into_bool().unwrap().into_array();

let v0 = scalar_at(&r, 0).unwrap().value().as_bool().unwrap();
let v1 = scalar_at(&r, 1).unwrap().value().as_bool().unwrap();
let v2 = scalar_at(&r, 2).unwrap().value().as_bool().unwrap();
let v3 = scalar_at(&r, 3).unwrap().value().as_bool().unwrap();

assert!(v0.unwrap());
assert!(!v1.unwrap());
assert!(v2.unwrap());
assert!(!v3.unwrap());
}
}
13 changes: 6 additions & 7 deletions vortex-array/src/array/varbin/builder.rs
@@ -1,6 +1,5 @@
use std::mem;

use arrow_buffer::NullBufferBuilder;
use bytes::BytesMut;
use vortex_dtype::{DType, NativePType};

use crate::array::primitive::PrimitiveArray;
@@ -10,7 +9,7 @@ use crate::IntoArray;

pub struct VarBinBuilder<O: NativePType> {
offsets: Vec<O>,
data: Vec<u8>,
data: BytesMut,
validity: NullBufferBuilder,
}

@@ -20,7 +19,7 @@ impl<O: NativePType> VarBinBuilder<O> {
offsets.push(O::zero());
Self {
offsets,
data: Vec::new(),
data: BytesMut::new(),
validity: NullBufferBuilder::new(len),
}
}
@@ -47,9 +46,9 @@ impl<O: NativePType> VarBinBuilder<O> {
self.validity.append_null();
}

pub fn finish(&mut self, dtype: DType) -> VarBinArray {
let offsets = PrimitiveArray::from(mem::take(&mut self.offsets));
let data = PrimitiveArray::from(mem::take(&mut self.data));
pub fn finish(mut self, dtype: DType) -> VarBinArray {
let offsets = PrimitiveArray::from(self.offsets);
let data = PrimitiveArray::from(Vec::from(self.data.freeze()));

Contributor

This isn't zero-copy, is it? We could add a function to PrimitiveArray that constructs directly from a buffer (for PType == u8?).

Contributor

Alternatively, @robert3005, did we discuss VarBin data being a buffer instead of a child array?

Member

I think we thought about it, but then we wanted to have something like ZstdEncoding.

Contributor

I’m starting to think general-purpose compression can be configured on buffers at write time, though, using the layouts mechanism.

Contributor Author

Not zero-copy, but still pretty cheap IMO. Constructing things from Bytes is hard because there's no guarantee the instance is exclusive.

Contributor

But PrimitiveArray wraps a vortex-buffer, which itself wraps Bytes, so this copy exists purely because the right API isn't exposed or isn't used.

Contributor Author
AdamGS, Jul 22, 2024

True, fixing. Added a from_bytes function.

let nulls = self.validity.finish();
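
To illustrate the signature change in this hunk (`finish(&mut self, ...)` becoming `finish(mut self, ...)`): once the builder is consumed, its buffers can simply be moved out, with no need for `mem::take`. Below is a simplified std-only sketch, assuming plain `Vec`s in place of `BytesMut`, `NullBufferBuilder`, and the Vortex array types; the struct and its methods are a hypothetical reduction of the real builder.

```rust
/// Hypothetical, simplified variable-length binary builder.
struct VarBinBuilder {
    offsets: Vec<usize>,
    data: Vec<u8>,
    validity: Vec<bool>,
}

impl VarBinBuilder {
    fn new() -> Self {
        // The offsets buffer always starts with a leading zero.
        Self {
            offsets: vec![0],
            data: Vec::new(),
            validity: Vec::new(),
        }
    }

    /// Append a non-null value and record where it ends.
    fn push(&mut self, value: &[u8]) {
        self.data.extend_from_slice(value);
        self.offsets.push(self.data.len());
        self.validity.push(true);
    }

    /// Append a null: the offset repeats, so the entry has zero length.
    fn push_null(&mut self) {
        self.offsets.push(self.data.len());
        self.validity.push(false);
    }

    /// Consumes the builder, so the buffers are moved out rather than copied.
    fn finish(self) -> (Vec<usize>, Vec<u8>, Vec<bool>) {
        (self.offsets, self.data, self.validity)
    }
}
```

Taking `self` by value also makes accidental reuse of a finished builder a compile-time error, at the cost of not being able to recycle the builder's allocations.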

6 changes: 2 additions & 4 deletions vortex-array/src/canonical.rs
@@ -222,10 +222,8 @@ fn varbin_to_arrow(varbin_array: VarBinArray) -> ArrayRef {
PType::I32 | PType::I64 => offsets,
// Unless it's u64, everything else can be converted into an i32.
// FIXME(ngates): do not copy offsets again
PType::U64 => try_cast(&offsets.to_array(), PType::I64.into())
.expect("cast to i64")
.into_primitive()
.expect("flatten_primitive"),
PType::U64 => offsets.reinterpret_cast(PType::I64),
PType::U32 => offsets.reinterpret_cast(PType::I32),
_ => try_cast(&offsets.to_array(), PType::I32.into())
.expect("cast to i32")
.into_primitive()
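
A side note on the `reinterpret_cast` change above: assuming it reuses the stored bits without converting values (as the name suggests), a `u64` offset maps to the same `i64` only while it is at most `i64::MAX`. The std-only sketch below contrasts a bit-level reinterpretation with a checked conversion. Both helper names are made up for illustration, and the sketch allocates a new `Vec` for simplicity, which the real method presumably avoids.

```rust
use std::convert::TryFrom;

/// Reinterpret each u64's bits as an i64 (no range check, no value conversion).
fn reinterpret_u64_as_i64(offsets: &[u64]) -> Vec<i64> {
    offsets
        .iter()
        .map(|&o| i64::from_ne_bytes(o.to_ne_bytes()))
        .collect()
}

/// Checked alternative: returns `None` if any offset exceeds `i64::MAX`.
fn checked_u64_to_i64(offsets: &[u64]) -> Option<Vec<i64>> {
    offsets.iter().map(|&o| i64::try_from(o).ok()).collect()
}
```

For offsets into an in-memory buffer the out-of-range case should not arise in practice, which is presumably why skipping the check is acceptable here.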