Skip to content

Commit

Permalink
Buffer alignment
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn committed Dec 18, 2024
2 parents 124441a + d6a2475 commit e77ff30
Show file tree
Hide file tree
Showing 31 changed files with 389 additions and 127 deletions.
9 changes: 9 additions & 0 deletions .github/coverage.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
ignore-non-existing: true

excl-line: "unreachable!"
ignore:
- "bench-vortex/*"
- "fuzz/*"
- "home/*"
- "/*"
- "../*"
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,15 @@ jobs:
RUSTDOCFLAGS: '-Zprofile'
- uses: rraval/actions-rs-grcov@e96292badb0d33512d16654efb0ee3032a9a3cff
id: grcov
with:
config: ".github/coverage.yml"
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Coveralls
uses: coverallsapp/github-action@v2
with:
file: "${{ steps.grcov.outputs.report }}"


license-check-and-audit-check:
name: License Check and Audit Check
Expand Down
3 changes: 1 addition & 2 deletions bench-vortex/src/taxi_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ use std::io::Write;
use std::path::PathBuf;

use futures::executor::block_on;
use vortex::buffer::io_buf::IoBuf;
use vortex::error::VortexError;
use vortex::io::VortexWrite;
use vortex::io::{IoBuf, VortexWrite};

use crate::data_downloads::{data_vortex_uncompressed, download_data};
use crate::reader::rewrite_parquet_as_vortex;
Expand Down
66 changes: 35 additions & 31 deletions encodings/fsst/src/compute/compare.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use fsst::Symbol;
use vortex_array::array::ConstantArray;
use vortex_array::compute::{compare, CompareFn, Operator};
use vortex_array::{ArrayDType, ArrayData, ArrayLen, IntoArrayVariant, ToArrayData};
use vortex_array::{ArrayDType, ArrayData, ArrayLen, IntoArrayData, IntoArrayVariant};
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_error::VortexResult;
use vortex_dtype::{DType, Nullability};
use vortex_error::{VortexExpect, VortexResult};
use vortex_scalar::Scalar;

use crate::{FSSTArray, FSSTEncoding};

Expand All @@ -16,10 +17,16 @@ impl CompareFn<FSSTArray> for FSSTEncoding {
operator: Operator,
) -> VortexResult<Option<ArrayData>> {
match (rhs.as_constant(), operator) {
// TODO(ngates): implement short-circuit comparisons for other operators.
(Some(constant_array), Operator::Eq | Operator::NotEq) => compare_fsst_constant(
(Some(constant), _) if constant.is_null() => {
// All comparisons to null must return null
Ok(Some(
ConstantArray::new(Scalar::null(DType::Bool(Nullability::Nullable)), lhs.len())
.into_array(),
))
}
(Some(constant), Operator::Eq | Operator::NotEq) => compare_fsst_constant(
lhs,
&ConstantArray::new(constant_array, lhs.len()),
&ConstantArray::new(constant, lhs.len()),
operator == Operator::Eq,
)
.map(Some),
Expand Down Expand Up @@ -49,34 +56,31 @@ fn compare_fsst_constant(
let compressor = compressor.build();

let encoded_scalar = match left.dtype() {
DType::Utf8(_) => right
.scalar()
.as_utf8()
.value()
.map(|scalar| Buffer::from(compressor.compress(scalar.as_bytes()))),
DType::Binary(_) => right
.scalar()
.as_binary()
.value()
.map(|scalar| Buffer::from(compressor.compress(scalar.as_slice()))),
DType::Utf8(_) => {
let value = right
.scalar()
.as_utf8()
.value()
.vortex_expect("Expected non-null scalar");
Buffer::from(compressor.compress(value.as_bytes()))
}
DType::Binary(_) => {
let value = right
.scalar()
.as_binary()
.value()
.vortex_expect("Expected non-null scalar");
Buffer::from(compressor.compress(value.as_slice()))
}
_ => unreachable!("FSSTArray can only have string or binary data type"),
};

match encoded_scalar {
None => {
// Eq and NotEq on null values yield nulls, per the Arrow behavior.
Ok(right.to_array())
}
Some(encoded_scalar) => {
let rhs = ConstantArray::new(encoded_scalar, left.len());

compare(
left.codes(),
rhs,
if equal { Operator::Eq } else { Operator::NotEq },
)
}
}
let rhs = ConstantArray::new(encoded_scalar, left.len());
compare(
left.codes(),
rhs,
if equal { Operator::Eq } else { Operator::NotEq },
)
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion pyvortex/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ pub fn scalar_helper(dtype: DType, value: &Bound<'_, PyAny>) -> PyResult<Scalar>
.iter()
.map(|element| scalar_helper(element_type.as_ref().clone(), element))
.collect::<PyResult<Vec<_>>>()?;
Ok(Scalar::list(element_type, values))
Ok(Scalar::list(element_type, values, Nullability::Nullable))
}
DType::Extension(..) => todo!(),
}
Expand Down
10 changes: 7 additions & 3 deletions vortex-array/src/array/chunked/compute/compare.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use vortex_dtype::{DType, Nullability};
use vortex_dtype::DType;
use vortex_error::VortexResult;

use crate::array::{ChunkedArray, ChunkedEncoding};
use crate::compute::{compare, slice, CompareFn, Operator};
use crate::{ArrayData, IntoArrayData};
use crate::{ArrayDType, ArrayData, IntoArrayData};

impl CompareFn<ChunkedArray> for ChunkedEncoding {
fn compare(
Expand All @@ -24,7 +24,11 @@ impl CompareFn<ChunkedArray> for ChunkedEncoding {
}

Ok(Some(
ChunkedArray::try_new(compare_chunks, DType::Bool(Nullability::Nullable))?.into_array(),
ChunkedArray::try_new(
compare_chunks,
DType::Bool((lhs.dtype().is_nullable() || rhs.dtype().is_nullable()).into()),
)?
.into_array(),
))
}
}
6 changes: 5 additions & 1 deletion vortex-array/src/array/list/compute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ impl ScalarAtFn<ListArray> for ListEncoding {
let elem = array.elements_at(index)?;
let scalars: Vec<Scalar> = (0..elem.len()).map(|i| scalar_at(&elem, i)).try_collect()?;

Ok(Scalar::list(Arc::new(elem.dtype().clone()), scalars))
Ok(Scalar::list(
Arc::new(elem.dtype().clone()),
scalars,
array.dtype().nullability(),
))
}
}

Expand Down
20 changes: 16 additions & 4 deletions vortex-array/src/array/list/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl ValidityVTable<ListArray> for ListEncoding {
mod test {
use std::sync::Arc;

use vortex_dtype::PType;
use vortex_dtype::{Nullability, PType};
use vortex_scalar::Scalar;

use crate::array::list::ListArray;
Expand Down Expand Up @@ -228,15 +228,27 @@ mod test {
ListArray::try_new(elements.into_array(), offsets.into_array(), validity).unwrap();

assert_eq!(
Scalar::list(Arc::new(PType::I32.into()), vec![1.into(), 2.into()]),
Scalar::list(
Arc::new(PType::I32.into()),
vec![1.into(), 2.into()],
Nullability::Nullable
),
scalar_at(&list, 0).unwrap()
);
assert_eq!(
Scalar::list(Arc::new(PType::I32.into()), vec![3.into(), 4.into()]),
Scalar::list(
Arc::new(PType::I32.into()),
vec![3.into(), 4.into()],
Nullability::Nullable
),
scalar_at(&list, 1).unwrap()
);
assert_eq!(
Scalar::list(Arc::new(PType::I32.into()), vec![5.into()]),
Scalar::list(
Arc::new(PType::I32.into()),
vec![5.into()],
Nullability::Nullable
),
scalar_at(&list, 2).unwrap()
);
}
Expand Down
6 changes: 4 additions & 2 deletions vortex-array/src/array/varbin/compute/compare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use vortex_error::{vortex_bail, VortexResult};
use crate::array::{VarBinArray, VarBinEncoding};
use crate::arrow::{Datum, FromArrowArray};
use crate::compute::{CompareFn, Operator};
use crate::{ArrayData, IntoArrayData};
use crate::{ArrayDType, ArrayData, IntoArrayData};

// This implementation exists so we can have custom translation of RHS to arrow that's not the same as IntoCanonical
impl CompareFn<VarBinArray> for VarBinEncoding {
Expand All @@ -17,6 +17,8 @@ impl CompareFn<VarBinArray> for VarBinEncoding {
operator: Operator,
) -> VortexResult<Option<ArrayData>> {
if let Some(rhs_const) = rhs.as_constant() {
let nullable = lhs.dtype().is_nullable() || rhs_const.dtype().is_nullable();

let lhs = Datum::try_from(lhs.clone().into_array())?;

// TODO(robert): Handle LargeString/Binary arrays
Expand Down Expand Up @@ -46,7 +48,7 @@ impl CompareFn<VarBinArray> for VarBinEncoding {
Operator::Lte => cmp::lt_eq(&lhs, arrow_rhs)?,
};

Ok(Some(ArrayData::from_arrow(&array, true)))
Ok(Some(ArrayData::from_arrow(&array, nullable)))
} else {
Ok(None)
}
Expand Down
16 changes: 13 additions & 3 deletions vortex-array/src/builders/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,27 @@ mod tests {

builder
.append_value(
Scalar::list(dtype.clone(), vec![1i32.into(), 2i32.into(), 3i32.into()]).as_list(),
Scalar::list(
dtype.clone(),
vec![1i32.into(), 2i32.into(), 3i32.into()],
Nullability::NonNullable,
)
.as_list(),
)
.unwrap();

builder
.append_value(Scalar::empty(dtype.clone()).as_list())
.append_value(Scalar::list_empty(dtype.clone(), Nullability::NonNullable).as_list())
.unwrap();

builder
.append_value(
Scalar::list(dtype, vec![4i32.into(), 5i32.into(), 6i32.into()]).as_list(),
Scalar::list(
dtype,
vec![4i32.into(), 5i32.into(), 6i32.into()],
Nullability::NonNullable,
)
.as_list(),
)
.unwrap();

Expand Down
32 changes: 31 additions & 1 deletion vortex-array/src/compute/binary_numeric.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use arrow_array::ArrayRef;
use vortex_dtype::DType;
use vortex_dtype::{DType, PType};
use vortex_error::{vortex_bail, VortexError, VortexResult};
use vortex_scalar::{BinaryNumericOperator, Scalar};

Expand Down Expand Up @@ -117,13 +117,43 @@ pub fn binary_numeric(
// Check if LHS supports the operation directly.
if let Some(fun) = lhs.encoding().binary_numeric_fn() {
if let Some(result) = fun.binary_numeric(lhs, rhs, op)? {
debug_assert_eq!(
result.len(),
lhs.len(),
"Numeric operation length mismatch {}",
lhs.encoding().id()
);
debug_assert_eq!(
result.dtype(),
&DType::Primitive(
PType::try_from(lhs.dtype())?,
(lhs.dtype().is_nullable() || rhs.dtype().is_nullable()).into()
),
"Numeric operation dtype mismatch {}",
lhs.encoding().id()
);
return Ok(result);
}
}

// Check if RHS supports the operation directly.
if let Some(fun) = rhs.encoding().binary_numeric_fn() {
if let Some(result) = fun.binary_numeric(rhs, lhs, op)? {
debug_assert_eq!(
result.len(),
lhs.len(),
"Numeric operation length mismatch {}",
rhs.encoding().id()
);
debug_assert_eq!(
result.dtype(),
&DType::Primitive(
PType::try_from(lhs.dtype())?,
(lhs.dtype().is_nullable() || rhs.dtype().is_nullable()).into()
),
"Numeric operation dtype mismatch {}",
rhs.encoding().id()
);
return Ok(result);
}
}
Expand Down
31 changes: 29 additions & 2 deletions vortex-array/src/compute/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;

use arrow_array::cast::AsArray;
use arrow_array::ArrayRef;
use vortex_dtype::DType;
use vortex_error::{vortex_bail, vortex_err, VortexError, VortexResult};

use crate::arrow::FromArrowArray;
Expand Down Expand Up @@ -106,16 +107,42 @@ pub fn binary_boolean(
.encoding()
.binary_boolean_fn()
.and_then(|f| f.binary_boolean(lhs, rhs, op).transpose())
.transpose()?
{
return result;
debug_assert_eq!(
result.len(),
lhs.len(),
"Boolean operation length mismatch {}",
lhs.encoding().id()
);
debug_assert_eq!(
result.dtype(),
&DType::Bool((lhs.dtype().is_nullable() || rhs.dtype().is_nullable()).into()),
"Boolean operation dtype mismatch {}",
lhs.encoding().id()
);
return Ok(result);
}

if let Some(result) = rhs
.encoding()
.binary_boolean_fn()
.and_then(|f| f.binary_boolean(rhs, lhs, op).transpose())
.transpose()?
{
return result;
debug_assert_eq!(
result.len(),
lhs.len(),
"Boolean operation length mismatch {}",
rhs.encoding().id()
);
debug_assert_eq!(
result.dtype(),
&DType::Bool((lhs.dtype().is_nullable() || rhs.dtype().is_nullable()).into()),
"Boolean operation dtype mismatch {}",
rhs.encoding().id()
);
return Ok(result);
}

log::debug!(
Expand Down
Loading

0 comments on commit e77ff30

Please sign in to comment.