Skip to content

Commit

Permalink
Fix chunked array stat merging (#303)
Browse files Browse the repository at this point in the history
Stats merging conflated min and max statistics and would wrongly merge max
values. If statistics were not computed for an array then merging would result
in incorrect results only spanning one part of the array
  • Loading branch information
robert3005 authored May 8, 2024
1 parent 59d4db0 commit f1242b5
Show file tree
Hide file tree
Showing 2 changed files with 207 additions and 58 deletions.
2 changes: 1 addition & 1 deletion vortex-array/src/array/chunked/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ impl ArrayStatisticsCompute for ChunkedArray<'_> {
acc.merge(&x);
acc
})
.unwrap_or_else(StatsSet::new))
.unwrap_or_default())
}
}
263 changes: 206 additions & 57 deletions vortex-array/src/stats/statsset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,26 +62,25 @@ impl StatsSet {
}

fn merge_min(&mut self, other: &Self) {
self.merge_ordered(other, |other, own| other < own);
self.merge_ordered(Stat::Min, other, |other, own| other < own);
}

fn merge_max(&mut self, other: &Self) {
self.merge_ordered(other, |other, own| other > own);
self.merge_ordered(Stat::Max, other, |other, own| other > own);
}

fn merge_ordered<F: Fn(&Scalar, &Scalar) -> bool>(&mut self, other: &Self, cmp: F) {
match self.values.entry(Stat::Max) {
Entry::Occupied(mut e) => {
if let Some(omin) = other.get(Stat::Max) {
if cmp(omin, e.get()) {
e.insert(omin.clone());
}
}
}
Entry::Vacant(e) => {
if let Some(min) = other.get(Stat::Max) {
e.insert(min.clone());
/// Merges stats if both are present, if either stat is not present, drops the stat from the
/// result set. For example, if we know the minimums of two arrays, the minimum of their union
/// is the minimum-of-minimums, but if we only know the minimum of one of the two arrays, we
/// do not know the minimum of their union.
fn merge_ordered<F: Fn(&Scalar, &Scalar) -> bool>(&mut self, stat: Stat, other: &Self, cmp: F) {
if let Entry::Occupied(mut e) = self.values.entry(stat) {
if let Some(ov) = other.get(stat) {
if cmp(ov, e.get()) {
e.insert(ov.clone());
}
} else {
e.remove();
}
}
}
Expand Down Expand Up @@ -113,11 +112,19 @@ impl StatsSet {
) {
if let Some(is_sorted) = self.get_as(stat) {
if let Some(other_is_sorted) = other.get_as(stat) {
if is_sorted && other_is_sorted && cmp(self.get(Stat::Max), other.get(Stat::Min)) {
if !(self.get(Stat::Max).is_some() && other.get(Stat::Min).is_some()) {
self.values.remove(&stat);
} else if is_sorted
&& other_is_sorted
&& cmp(self.get(Stat::Max), other.get(Stat::Min))
{
return;
} else {
self.values.insert(stat, false.into());
}
} else {
self.values.remove(&stat);
}
self.values.insert(stat, false.into());
}
}

Expand All @@ -130,17 +137,12 @@ impl StatsSet {
}

fn merge_scalar_stat(&mut self, other: &Self, stat: Stat) {
match self.values.entry(stat) {
Entry::Occupied(mut e) => {
if let Some(other_value) = other.get_as::<usize>(stat) {
let self_value: usize = e.get().try_into().unwrap();
e.insert((self_value + other_value).into());
}
}
Entry::Vacant(e) => {
if let Some(min) = other.get(stat) {
e.insert(min.clone());
}
if let Entry::Occupied(mut e) = self.values.entry(stat) {
if let Some(other_value) = other.get_as::<usize>(stat) {
let self_value: usize = e.get().try_into().unwrap();
e.insert((self_value + other_value).into());
} else {
e.remove();
}
}
}
Expand All @@ -154,42 +156,32 @@ impl StatsSet {
}

fn merge_freq_stat(&mut self, other: &Self, stat: Stat) {
match self.values.entry(stat) {
Entry::Occupied(mut e) => {
if let Some(other_value) = other.get_as::<Vec<u64>>(stat) {
// TODO(robert): Avoid the copy here. We could e.get_mut() but need to figure out casting
let self_value: Vec<u64> = e.get().try_into().unwrap();
e.insert(
self_value
.iter()
.zip_eq(other_value.iter())
.map(|(s, o)| *s + *o)
.collect::<Vec<_>>()
.into(),
);
}
}
Entry::Vacant(e) => {
if let Some(other_value) = other.get(stat) {
e.insert(other_value.clone());
}
if let Entry::Occupied(mut e) = self.values.entry(stat) {
if let Some(other_value) = other.get_as::<Vec<u64>>(stat) {
// TODO(robert): Avoid the copy here. We could e.get_mut() but need to figure out casting
let self_value: Vec<u64> = e.get().try_into().unwrap();
e.insert(
self_value
.iter()
.zip_eq(other_value.iter())
.map(|(s, o)| *s + *o)
.collect::<Vec<_>>()
.into(),
);
} else {
e.remove();
}
}
}

/// Merged run count is an upper bound where we assume run is interrupted at the boundary
fn merge_run_count(&mut self, other: &Self) {
match self.values.entry(Stat::RunCount) {
Entry::Occupied(mut e) => {
if let Some(other_value) = other.get_as::<usize>(Stat::RunCount) {
let self_value: usize = e.get().try_into().unwrap();
e.insert((self_value + other_value + 1).into());
}
}
Entry::Vacant(e) => {
if let Some(min) = other.get(Stat::RunCount) {
e.insert(min.clone());
}
if let Entry::Occupied(mut e) = self.values.entry(Stat::RunCount) {
if let Some(other_value) = other.get_as::<usize>(Stat::RunCount) {
let self_value: usize = e.get().try_into().unwrap();
e.insert((self_value + other_value + 1).into());
} else {
e.remove();
}
}
}
Expand All @@ -210,3 +202,160 @@ impl IntoIterator for StatsSet {
self.values.into_iter()
}
}

#[cfg(test)]
mod test {
use itertools::Itertools;

use crate::stats::{Stat, StatsSet};

#[test]
fn merge_into_min() {
let mut first = StatsSet::of(Stat::Min, 42.into());
first.merge(&StatsSet::new());
assert_eq!(first.get(Stat::Min), None);
}

#[test]
fn merge_from_min() {
let mut first = StatsSet::new();
first.merge(&StatsSet::of(Stat::Min, 42.into()));
assert_eq!(first.get(Stat::Min), None);
}

#[test]
fn merge_mins() {
let mut first = StatsSet::of(Stat::Min, 37.into());
first.merge(&StatsSet::of(Stat::Min, 42.into()));
assert_eq!(first.get(Stat::Min).cloned(), Some(37.into()));
}

#[test]
fn merge_into_max() {
let mut first = StatsSet::of(Stat::Max, 42.into());
first.merge(&StatsSet::new());
assert_eq!(first.get(Stat::Max), None);
}

#[test]
fn merge_from_max() {
let mut first = StatsSet::new();
first.merge(&StatsSet::of(Stat::Max, 42.into()));
assert_eq!(first.get(Stat::Max), None);
}

#[test]
fn merge_maxes() {
let mut first = StatsSet::of(Stat::Max, 37.into());
first.merge(&StatsSet::of(Stat::Max, 42.into()));
assert_eq!(first.get(Stat::Max).cloned(), Some(42.into()));
}

#[test]
fn merge_into_scalar() {
let mut first = StatsSet::of(Stat::TrueCount, 42.into());
first.merge(&StatsSet::new());
assert_eq!(first.get(Stat::TrueCount), None);
}

#[test]
fn merge_from_scalar() {
let mut first = StatsSet::new();
first.merge(&StatsSet::of(Stat::TrueCount, 42.into()));
assert_eq!(first.get(Stat::TrueCount), None);
}

#[test]
fn merge_scalars() {
let mut first = StatsSet::of(Stat::TrueCount, 37.into());
first.merge(&StatsSet::of(Stat::TrueCount, 42.into()));
assert_eq!(first.get(Stat::TrueCount).cloned(), Some(79u64.into()));
}

#[test]
fn merge_into_freq() {
let vec = (0..255).collect_vec();
let mut first = StatsSet::of(Stat::BitWidthFreq, vec.clone().into());
first.merge(&StatsSet::new());
assert_eq!(first.get(Stat::BitWidthFreq), None);
}

#[test]
fn merge_from_freq() {
let vec = (0..255).collect_vec();
let mut first = StatsSet::new();
first.merge(&StatsSet::of(Stat::BitWidthFreq, vec.clone().into()));
assert_eq!(first.get(Stat::BitWidthFreq), None);
}

#[test]
fn merge_freqs() {
let vec_in = vec![5u64; 256];
let vec_out = vec![10u64; 256];
let mut first = StatsSet::of(Stat::BitWidthFreq, vec_in.clone().into());
first.merge(&StatsSet::of(Stat::BitWidthFreq, vec_in.clone().into()));
assert_eq!(
first.get(Stat::BitWidthFreq).cloned(),
Some(vec_out.clone().into())
);
}

#[test]
fn merge_into_sortedness() {
let mut first = StatsSet::of(Stat::IsStrictSorted, true.into());
first.merge(&StatsSet::new());
assert_eq!(first.get(Stat::IsStrictSorted), None);
}

#[test]
fn merge_from_sortedness() {
let mut first = StatsSet::new();
first.merge(&StatsSet::of(Stat::IsStrictSorted, true.into()));
assert_eq!(first.get(Stat::IsStrictSorted), None);
}

#[test]
fn merge_sortedness() {
let mut first = StatsSet::of(Stat::IsStrictSorted, true.into());
first.set(Stat::Max, 1.into());
let mut second = StatsSet::of(Stat::IsStrictSorted, true.into());
second.set(Stat::Min, 2.into());
first.merge(&second);
assert_eq!(first.get(Stat::IsStrictSorted).cloned(), Some(true.into()));
}

#[test]
fn merge_sortedness_out_of_order() {
let mut first = StatsSet::of(Stat::IsStrictSorted, true.into());
first.set(Stat::Min, 1.into());
let mut second = StatsSet::of(Stat::IsStrictSorted, true.into());
second.set(Stat::Max, 2.into());
second.merge(&first);
assert_eq!(
second.get(Stat::IsStrictSorted).cloned(),
Some(false.into())
);
}

#[test]
fn merge_sortedness_only_one_sorted() {
let mut first = StatsSet::of(Stat::IsStrictSorted, true.into());
first.set(Stat::Max, 1.into());
let mut second = StatsSet::of(Stat::IsStrictSorted, false.into());
second.set(Stat::Min, 2.into());
first.merge(&second);
assert_eq!(
second.get(Stat::IsStrictSorted).cloned(),
Some(false.into())
);
}

#[test]
fn merge_sortedness_missing_min() {
let mut first = StatsSet::of(Stat::IsStrictSorted, true.into());
first.set(Stat::Max, 1.into());
let second = StatsSet::of(Stat::IsStrictSorted, true.into());
first.merge(&second);
assert_eq!(first.get(Stat::IsStrictSorted).cloned(), None);
}
}

0 comments on commit f1242b5

Please sign in to comment.