Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix chunked array stat merging #303

Merged
merged 5 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion vortex-array/src/array/chunked/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ impl ArrayStatisticsCompute for ChunkedArray<'_> {
acc.merge(&x);
acc
})
.unwrap_or_else(StatsSet::new))
.unwrap_or_default())
}
}
263 changes: 206 additions & 57 deletions vortex-array/src/stats/statsset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,26 +62,25 @@ impl StatsSet {
}

fn merge_min(&mut self, other: &Self) {
self.merge_ordered(other, |other, own| other < own);
self.merge_ordered(Stat::Min, other, |other, own| other < own);
}

fn merge_max(&mut self, other: &Self) {
self.merge_ordered(other, |other, own| other > own);
self.merge_ordered(Stat::Max, other, |other, own| other > own);
}

fn merge_ordered<F: Fn(&Scalar, &Scalar) -> bool>(&mut self, other: &Self, cmp: F) {
match self.values.entry(Stat::Max) {
Entry::Occupied(mut e) => {
if let Some(omin) = other.get(Stat::Max) {
if cmp(omin, e.get()) {
e.insert(omin.clone());
}
}
}
Entry::Vacant(e) => {
if let Some(min) = other.get(Stat::Max) {
e.insert(min.clone());
/// Merges stats if both are present, if either stat is not present, drops the stat from the
/// result set. For example, if we know the minimums of two arrays, the minimum of their union
/// is the minimum-of-minimums, but if we only know the minimum of one of the two arrays, we
/// do not know the minimum of their union.
fn merge_ordered<F: Fn(&Scalar, &Scalar) -> bool>(&mut self, stat: Stat, other: &Self, cmp: F) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe worth a docstring, eg

/// Merges stats if both are present, if either stat is not present, drops the stat from the result set. For example, if we know the minimums of two arrays, the minimum of their union is the minimum-of-minimums, but if we only know the minimum of one of the two arrays, we do not know the minimum of their union.

if let Entry::Occupied(mut e) = self.values.entry(stat) {
if let Some(ov) = other.get(stat) {
if cmp(ov, e.get()) {
e.insert(ov.clone());
}
} else {
e.remove();
}
}
}
Expand Down Expand Up @@ -113,11 +112,19 @@ impl StatsSet {
) {
if let Some(is_sorted) = self.get_as(stat) {
if let Some(other_is_sorted) = other.get_as(stat) {
if is_sorted && other_is_sorted && cmp(self.get(Stat::Max), other.get(Stat::Min)) {
if !(self.get(Stat::Max).is_some() && other.get(Stat::Min).is_some()) {
self.values.remove(&stat);
} else if is_sorted
&& other_is_sorted
&& cmp(self.get(Stat::Max), other.get(Stat::Min))
{
return;
} else {
self.values.insert(stat, false.into());
}
} else {
self.values.remove(&stat);
}
self.values.insert(stat, false.into());
}
}

Expand All @@ -130,17 +137,12 @@ impl StatsSet {
}

fn merge_scalar_stat(&mut self, other: &Self, stat: Stat) {
match self.values.entry(stat) {
Entry::Occupied(mut e) => {
if let Some(other_value) = other.get_as::<usize>(stat) {
let self_value: usize = e.get().try_into().unwrap();
e.insert((self_value + other_value).into());
}
}
Entry::Vacant(e) => {
if let Some(min) = other.get(stat) {
e.insert(min.clone());
}
if let Entry::Occupied(mut e) = self.values.entry(stat) {
if let Some(other_value) = other.get_as::<usize>(stat) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it matters because negative values will panic on the coercion to usize, but this will mutate the PType of the scalars to be merged -- ie if we merge an u32 with an u32, the result of this will be a u64.

Also maybe worth renaming this method to merge_count_stat

let self_value: usize = e.get().try_into().unwrap();
e.insert((self_value + other_value).into());
} else {
e.remove();
}
}
}
Expand All @@ -154,42 +156,32 @@ impl StatsSet {
}

fn merge_freq_stat(&mut self, other: &Self, stat: Stat) {
match self.values.entry(stat) {
Entry::Occupied(mut e) => {
if let Some(other_value) = other.get_as::<Vec<u64>>(stat) {
// TODO(robert): Avoid the copy here. We could e.get_mut() but need to figure out casting
let self_value: Vec<u64> = e.get().try_into().unwrap();
e.insert(
self_value
.iter()
.zip_eq(other_value.iter())
.map(|(s, o)| *s + *o)
.collect::<Vec<_>>()
.into(),
);
}
}
Entry::Vacant(e) => {
if let Some(other_value) = other.get(stat) {
e.insert(other_value.clone());
}
if let Entry::Occupied(mut e) = self.values.entry(stat) {
if let Some(other_value) = other.get_as::<Vec<u64>>(stat) {
// TODO(robert): Avoid the copy here. We could e.get_mut() but need to figure out casting
let self_value: Vec<u64> = e.get().try_into().unwrap();
e.insert(
self_value
.iter()
.zip_eq(other_value.iter())
.map(|(s, o)| *s + *o)
.collect::<Vec<_>>()
.into(),
);
} else {
e.remove();
}
}
}

/// Merged run count is an upper bound where we assume run is interrupted at the boundary
fn merge_run_count(&mut self, other: &Self) {
match self.values.entry(Stat::RunCount) {
Entry::Occupied(mut e) => {
if let Some(other_value) = other.get_as::<usize>(Stat::RunCount) {
let self_value: usize = e.get().try_into().unwrap();
e.insert((self_value + other_value + 1).into());
}
}
Entry::Vacant(e) => {
if let Some(min) = other.get(Stat::RunCount) {
e.insert(min.clone());
}
if let Entry::Occupied(mut e) = self.values.entry(Stat::RunCount) {
if let Some(other_value) = other.get_as::<usize>(Stat::RunCount) {
let self_value: usize = e.get().try_into().unwrap();
e.insert((self_value + other_value + 1).into());
} else {
e.remove();
}
}
}
Expand All @@ -210,3 +202,160 @@ impl IntoIterator for StatsSet {
self.values.into_iter()
}
}

#[cfg(test)]
mod test {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably want tests for the other codepaths as well

use itertools::Itertools;

use crate::stats::{Stat, StatsSet};

#[test]
fn merge_into_min() {
let mut first = StatsSet::of(Stat::Min, 42.into());
first.merge(&StatsSet::new());
assert_eq!(first.get(Stat::Min), None);
}

#[test]
fn merge_from_min() {
let mut first = StatsSet::new();
first.merge(&StatsSet::of(Stat::Min, 42.into()));
assert_eq!(first.get(Stat::Min), None);
}

#[test]
fn merge_mins() {
let mut first = StatsSet::of(Stat::Min, 37.into());
first.merge(&StatsSet::of(Stat::Min, 42.into()));
assert_eq!(first.get(Stat::Min).cloned(), Some(37.into()));
}

#[test]
fn merge_into_max() {
let mut first = StatsSet::of(Stat::Max, 42.into());
first.merge(&StatsSet::new());
assert_eq!(first.get(Stat::Max), None);
}

#[test]
fn merge_from_max() {
let mut first = StatsSet::new();
first.merge(&StatsSet::of(Stat::Max, 42.into()));
assert_eq!(first.get(Stat::Max), None);
}

#[test]
fn merge_maxes() {
let mut first = StatsSet::of(Stat::Max, 37.into());
first.merge(&StatsSet::of(Stat::Max, 42.into()));
assert_eq!(first.get(Stat::Max).cloned(), Some(42.into()));
}

#[test]
fn merge_into_scalar() {
let mut first = StatsSet::of(Stat::TrueCount, 42.into());
first.merge(&StatsSet::new());
assert_eq!(first.get(Stat::TrueCount), None);
}

#[test]
fn merge_from_scalar() {
let mut first = StatsSet::new();
first.merge(&StatsSet::of(Stat::TrueCount, 42.into()));
assert_eq!(first.get(Stat::TrueCount), None);
}

#[test]
fn merge_scalars() {
let mut first = StatsSet::of(Stat::TrueCount, 37.into());
first.merge(&StatsSet::of(Stat::TrueCount, 42.into()));
assert_eq!(first.get(Stat::TrueCount).cloned(), Some(79u64.into()));
}

#[test]
fn merge_into_freq() {
let vec = (0..255).collect_vec();
let mut first = StatsSet::of(Stat::BitWidthFreq, vec.clone().into());
first.merge(&StatsSet::new());
assert_eq!(first.get(Stat::BitWidthFreq), None);
}

#[test]
fn merge_from_freq() {
let vec = (0..255).collect_vec();
let mut first = StatsSet::new();
first.merge(&StatsSet::of(Stat::BitWidthFreq, vec.clone().into()));
assert_eq!(first.get(Stat::BitWidthFreq), None);
}

#[test]
fn merge_freqs() {
let vec_in = vec![5u64; 256];
let vec_out = vec![10u64; 256];
let mut first = StatsSet::of(Stat::BitWidthFreq, vec_in.clone().into());
first.merge(&StatsSet::of(Stat::BitWidthFreq, vec_in.clone().into()));
assert_eq!(
first.get(Stat::BitWidthFreq).cloned(),
Some(vec_out.clone().into())
);
}

#[test]
fn merge_into_sortedness() {
let mut first = StatsSet::of(Stat::IsStrictSorted, true.into());
first.merge(&StatsSet::new());
assert_eq!(first.get(Stat::IsStrictSorted), None);
}

#[test]
fn merge_from_sortedness() {
let mut first = StatsSet::new();
first.merge(&StatsSet::of(Stat::IsStrictSorted, true.into()));
assert_eq!(first.get(Stat::IsStrictSorted), None);
}

#[test]
fn merge_sortedness() {
let mut first = StatsSet::of(Stat::IsStrictSorted, true.into());
first.set(Stat::Max, 1.into());
let mut second = StatsSet::of(Stat::IsStrictSorted, true.into());
second.set(Stat::Min, 2.into());
first.merge(&second);
assert_eq!(first.get(Stat::IsStrictSorted).cloned(), Some(true.into()));
}

#[test]
fn merge_sortedness_out_of_order() {
let mut first = StatsSet::of(Stat::IsStrictSorted, true.into());
first.set(Stat::Min, 1.into());
let mut second = StatsSet::of(Stat::IsStrictSorted, true.into());
second.set(Stat::Max, 2.into());
second.merge(&first);
assert_eq!(
second.get(Stat::IsStrictSorted).cloned(),
Some(false.into())
);
}

#[test]
fn merge_sortedness_only_one_sorted() {
let mut first = StatsSet::of(Stat::IsStrictSorted, true.into());
first.set(Stat::Max, 1.into());
let mut second = StatsSet::of(Stat::IsStrictSorted, false.into());
second.set(Stat::Min, 2.into());
first.merge(&second);
assert_eq!(
second.get(Stat::IsStrictSorted).cloned(),
Some(false.into())
);
}

#[test]
fn merge_sortedness_missing_min() {
let mut first = StatsSet::of(Stat::IsStrictSorted, true.into());
first.set(Stat::Max, 1.into());
let second = StatsSet::of(Stat::IsStrictSorted, true.into());
first.merge(&second);
assert_eq!(first.get(Stat::IsStrictSorted).cloned(), None);
}
}