Skip to content

Commit

Permalink
Merge branch 'develop' into jc/ipc-stats
Browse files Browse the repository at this point in the history
  • Loading branch information
jdcasale committed May 8, 2024
2 parents ab9b872 + 6fb0e8a commit fe1d105
Show file tree
Hide file tree
Showing 83 changed files with 2,050 additions and 1,536 deletions.
225 changes: 123 additions & 102 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[workspace]
members = [
"bench-vortex",
"build-vortex",
"fastlanez",
"fastlanez-sys",
"pyvortex",
Expand Down Expand Up @@ -74,12 +75,17 @@ num-traits = "0.2.18"
num_enum = "0.7.2"
parquet = "51.0.0"
paste = "1.0.14"
prost = "0.12.4"
prost-build = "0.12.4"
prost-types = "0.12.4"
pyo3 = { version = "0.20.2", features = ["extension-module", "abi3-py311"] }
pyo3-log = "0.9.0"
rand = "0.8.5"
reqwest = { version = "0.12.0", features = ["blocking"] }
seq-macro = "0.3.5"
serde = "1.0.197"
serde_json = "1.0.116"
serde_test = "1.0.176"
simplelog = { version = "0.12.2", features = ["paris"] }
thiserror = "1.0.58"
tokio = "1.37.0"
Expand Down
19 changes: 19 additions & 0 deletions build-vortex/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "build-vortex"
version.workspace = true
homepage.workspace = true
repository.workspace = true
authors.workspace = true
license.workspace = true
keywords.workspace = true
include.workspace = true
edition.workspace = true
rust-version.workspace = true

[dependencies]
flatc = { workspace = true }
prost-build = { workspace = true }
walkdir = { workspace = true }

[lints]
workspace = true
12 changes: 12 additions & 0 deletions build-vortex/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Build Vortex

A crate containing configuration logic for Vortex build.rs files.

## Usage

## Features

Depending on the enabled features, this script supports:

* FlatBuffers
* Protocol Buffers
96 changes: 96 additions & 0 deletions build-vortex/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use std::env;
use std::ffi::OsStr;
use std::fs::create_dir_all;
use std::path::{Path, PathBuf};
use std::process::Command;

use flatc::flatc;
use walkdir::WalkDir;

fn manifest_dir() -> PathBuf {
PathBuf::from(env::var("CARGO_MANIFEST_DIR").unwrap())
.canonicalize()
.expect("Failed to canonicalize CARGO_MANIFEST_DIR")
}

fn out_dir() -> PathBuf {
PathBuf::from(env::var("OUT_DIR").unwrap())
.canonicalize()
.expect("Failed to canonicalize OUT_DIR")
}

pub fn build() {
// FlatBuffers
if env::var("CARGO_FEATURE_FLATBUFFERS").ok().is_some() {
build_flatbuffers();
}

// Proto (prost)
if env::var("CARGO_FEATURE_PROTO").ok().is_some() {
build_proto();
}
}

pub fn build_proto() {
let proto_dir = manifest_dir().join("proto");
let proto_files = walk_files(&proto_dir, "proto");
let proto_out = out_dir().join("proto");

create_dir_all(&proto_out).expect("Failed to create proto output directory");

prost_build::Config::new()
.out_dir(&proto_out)
.compile_protos(&proto_files, &[&proto_dir, &proto_dir.join("../../")])
.expect("Failed to compile protos");
}

pub fn build_flatbuffers() {
let flatbuffers_dir = manifest_dir().join("flatbuffers");
let fbs_files = walk_files(&flatbuffers_dir, "fbs");
check_call(
Command::new(flatc())
.arg("--rust")
.arg("--filename-suffix")
.arg("")
.arg("-I")
.arg(flatbuffers_dir.join("../../"))
.arg("--include-prefix")
.arg("flatbuffers::deps")
.arg("-o")
.arg(out_dir().join("flatbuffers"))
.args(fbs_files),
)
}

/// Recursively walk for files with the given extension, adding them to rerun-if-changed.
fn walk_files(dir: &Path, ext: &str) -> Vec<PathBuf> {
WalkDir::new(dir)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.path().extension() == Some(OsStr::new(ext)))
.map(|e| {
rerun_if_changed(e.path());
e.path().to_path_buf()
})
.collect()
}

fn rerun_if_changed(path: &Path) {
println!(
"cargo:rerun-if-changed={}",
path.canonicalize()
.unwrap_or_else(|_| panic!("failed to canonicalize {}", path.to_str().unwrap()))
.to_str()
.unwrap()
);
}

fn check_call(command: &mut Command) {
let name = command.get_program().to_str().unwrap().to_string();
let Ok(status) = command.status() else {
panic!("Failed to launch {}", &name)
};
if !status.success() {
panic!("{} failed with status {}", &name, status.code().unwrap());
}
}
2 changes: 1 addition & 1 deletion vortex-alp/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ where
PrimitiveArray::from(exc_pos).into_array(),
PrimitiveArray::from_vec(exc, Validity::AllValid).into_array(),
len,
Scalar::null(&values.dtype().as_nullable()),
Scalar::null(values.dtype().as_nullable()),
)
.into_array()
}),
Expand Down
2 changes: 1 addition & 1 deletion vortex-alp/src/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl ScalarAtFn for ALPArray<'_> {
use crate::ALPFloat;
let encoded_val = scalar_at(&self.encoded(), index)?;
match_each_alp_float_ptype!(self.dtype().try_into().unwrap(), |$T| {
let encoded_val: <$T as ALPFloat>::ALPInt = encoded_val.try_into().unwrap();
let encoded_val: <$T as ALPFloat>::ALPInt = encoded_val.as_ref().try_into().unwrap();
Scalar::from(<$T as ALPFloat>::decode_single(
encoded_val,
self.exponents(),
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ num_enum = { workspace = true }
paste = { workspace = true }
rand = { workspace = true }
vortex-buffer = { path = "../vortex-buffer" }
vortex-dtype = { path = "../vortex-dtype", features = ["serde"] }
vortex-dtype = { path = "../vortex-dtype", features = ["flatbuffers", "serde"] }
vortex-error = { path = "../vortex-error", features = ["flexbuffers"] }
vortex-flatbuffers = { path = "../vortex-flatbuffers" }
vortex-scalar = { path = "../vortex-scalar", features = ["serde"] }
vortex-scalar = { path = "../vortex-scalar", features = ["flatbuffers", "serde"] }
serde = { workspace = true, features = ["derive"] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
Expand Down
14 changes: 6 additions & 8 deletions vortex-array/src/array/bool/compute/scalar_at.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use vortex_error::VortexResult;
use vortex_scalar::{BoolScalar, Scalar};
use vortex_scalar::Scalar;

use crate::array::bool::BoolArray;
use crate::compute::scalar_at::ScalarAtFn;
Expand All @@ -8,12 +8,10 @@ use crate::ArrayDType;

impl ScalarAtFn for BoolArray<'_> {
fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
Ok(BoolScalar::try_new(
self.is_valid(index)
.then(|| self.boolean_buffer().value(index)),
self.dtype().nullability(),
)
.unwrap()
.into())
if self.is_valid(index) {
Ok(self.boolean_buffer().value(index).into())
} else {
return Ok(Scalar::null(self.dtype().clone()));
}
}
}
2 changes: 1 addition & 1 deletion vortex-array/src/array/bool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ mod tests {
#[test]
fn bool_array() {
let arr = BoolArray::from(vec![true, false, true]).into_array();
let scalar: bool = scalar_at(&arr, 0).unwrap().try_into().unwrap();
let scalar = bool::try_from(&scalar_at(&arr, 0).unwrap()).unwrap();
assert!(scalar);
}
}
6 changes: 3 additions & 3 deletions vortex-array/src/array/chunked/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ impl ChunkedArray<'_> {
.unwrap()
.to_index();
let mut chunk_start =
usize::try_from(scalar_at(&self.chunk_ends(), index_chunk).unwrap()).unwrap();
usize::try_from(&scalar_at(&self.chunk_ends(), index_chunk).unwrap()).unwrap();

if chunk_start != index {
index_chunk -= 1;
chunk_start =
usize::try_from(scalar_at(&self.chunk_ends(), index_chunk).unwrap()).unwrap();
usize::try_from(&scalar_at(&self.chunk_ends(), index_chunk).unwrap()).unwrap();
}

let index_in_chunk = index - chunk_start;
Expand Down Expand Up @@ -125,7 +125,7 @@ impl AcceptArrayVisitor for ChunkedArray<'_> {

impl ArrayTrait for ChunkedArray<'_> {
fn len(&self) -> usize {
usize::try_from(scalar_at(&self.chunk_ends(), self.nchunks()).unwrap()).unwrap()
usize::try_from(&scalar_at(&self.chunk_ends(), self.nchunks()).unwrap()).unwrap()
}
}

Expand Down
36 changes: 19 additions & 17 deletions vortex-array/src/array/constant/flatten.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use vortex_dtype::{match_each_native_ptype, Nullability};
use vortex_error::VortexResult;
use vortex_scalar::Scalar;
use vortex_dtype::{match_each_native_ptype, Nullability, PType};
use vortex_error::{vortex_bail, VortexResult};
use vortex_scalar::BoolScalar;

use crate::array::bool::BoolArray;
use crate::array::constant::ConstantArray;
Expand All @@ -22,20 +22,22 @@ impl ArrayFlatten for ConstantArray<'_> {
},
};

Ok(match self.scalar() {
Scalar::Bool(b) => Flattened::Bool(BoolArray::from_vec(
vec![b.value().copied().unwrap_or_default(); self.len()],
if let Ok(b) = BoolScalar::try_from(self.scalar()) {
return Ok(Flattened::Bool(BoolArray::from_vec(
vec![b.value().unwrap_or_default(); self.len()],
validity,
)),
Scalar::Primitive(p) => {
match_each_native_ptype!(p.ptype(), |$P| {
Flattened::Primitive(PrimitiveArray::from_vec::<$P>(
vec![$P::try_from(self.scalar())?; self.len()],
validity,
))
})
}
_ => panic!("Unsupported scalar type {}", self.dtype()),
})
)));
}

if let Ok(ptype) = PType::try_from(self.scalar().dtype()) {
return match_each_native_ptype!(ptype, |$P| {
Ok(Flattened::Primitive(PrimitiveArray::from_vec::<$P>(
vec![$P::try_from(self.scalar())?; self.len()],
validity,
)))
});
}

vortex_bail!("Unsupported scalar type {}", self.dtype())
}
}
15 changes: 8 additions & 7 deletions vortex-array/src/array/constant/stats.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
use std::collections::HashMap;

use vortex_dtype::DType;
use vortex_error::VortexResult;
use vortex_scalar::Scalar;
use vortex_scalar::BoolScalar;

use crate::array::constant::ConstantArray;
use crate::stats::{ArrayStatisticsCompute, Stat, StatsSet};
use crate::{ArrayDType, ArrayTrait};
use crate::ArrayTrait;

impl ArrayStatisticsCompute for ConstantArray<'_> {
fn compute_statistics(&self, _stat: Stat) -> VortexResult<StatsSet> {
if matches!(self.dtype(), &DType::Bool(_)) {
let Scalar::Bool(b) = self.scalar() else {
unreachable!("Got bool dtype without bool scalar")
if let Ok(b) = BoolScalar::try_from(self.scalar()) {
let true_count = if b.value().unwrap_or(false) {
self.len() as u64
} else {
0
};
return Ok(StatsSet::from(HashMap::from([(
Stat::TrueCount,
(self.len() as u64 * b.value().cloned().map(|v| v as u64).unwrap_or(0)).into(),
true_count.into(),
)])));
}

Expand Down
36 changes: 5 additions & 31 deletions vortex-array/src/array/extension/compute.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
use arrow_array::ArrayRef as ArrowArrayRef;
use vortex_error::{vortex_bail, VortexResult};
use vortex_scalar::{ExtScalar, Scalar};
use vortex_scalar::Scalar;

use crate::array::datetime::LocalDateTimeArray;
use crate::array::extension::ExtensionArray;
use crate::compute::as_arrow::AsArrowArray;
use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn};
use crate::compute::cast::CastFn;
use crate::compute::scalar_at::{scalar_at, ScalarAtFn};
use crate::compute::search_sorted::{
search_sorted, SearchResult, SearchSortedFn, SearchSortedSide,
};
use crate::compute::slice::{slice, SliceFn};
use crate::compute::take::{take, TakeFn};
use crate::compute::ArrayCompute;
use crate::{Array, ArrayDType, IntoArray, OwnedArray, ToStatic};
use crate::{Array, IntoArray, OwnedArray, ToStatic};

impl ArrayCompute for ExtensionArray<'_> {
fn as_arrow(&self) -> Option<&dyn AsArrowArray> {
Expand All @@ -36,10 +33,6 @@ impl ArrayCompute for ExtensionArray<'_> {
Some(self)
}

fn search_sorted(&self) -> Option<&dyn SearchSortedFn> {
Some(self)
}

fn slice(&self) -> Option<&dyn SliceFn> {
Some(self)
}
Expand Down Expand Up @@ -82,29 +75,10 @@ impl AsContiguousFn for ExtensionArray<'_> {

impl ScalarAtFn for ExtensionArray<'_> {
fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
Ok(Scalar::Extension(ExtScalar::try_new(
Ok(Scalar::extension(
self.ext_dtype().clone(),
self.dtype().nullability(),
Some(scalar_at(&self.storage(), index)?),
)?))
}
}

impl SearchSortedFn for ExtensionArray<'_> {
fn search_sorted(&self, value: &Scalar, side: SearchSortedSide) -> VortexResult<SearchResult> {
if value.dtype() != self.dtype() {
vortex_bail!("Value dtype does not match array dtype");
}
let Scalar::Extension(ext) = value else {
unreachable!();
};

let storage_scalar = ext
.value()
.map(|v| (**v).clone())
.unwrap_or_else(|| Scalar::null(self.dtype()));

search_sorted(&self.storage(), storage_scalar, side)
scalar_at(&self.storage(), index)?,
))
}
}

Expand Down
Loading

0 comments on commit fe1d105

Please sign in to comment.