Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: improve performance of filter_period_intersect #436

Merged
merged 3 commits into from
Nov 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aw-query/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ mod qfunctions {
let events: Vec<Event> = (&args[0]).try_into()?;
let filter_events: Vec<Event> = (&args[1]).try_into()?;

let mut filtered_events = aw_transform::filter_period_intersect(&events, &filter_events);
let mut filtered_events = aw_transform::filter_period_intersect(events, filter_events);
let mut filtered_tagged_events = Vec::new();
for event in filtered_events.drain(..) {
filtered_tagged_events.push(DataType::Event(event));
Expand Down
2 changes: 1 addition & 1 deletion aw-transform/benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ fn bench_filter_period_intersect(c: &mut Criterion) {
c.bench_function("1000 events", |b| {
b.iter(|| {
let events1 = create_events(1000);
filter_period_intersect(&events1, &events2);
filter_period_intersect(events1, events2.clone());
})
});
}
Expand Down
105 changes: 83 additions & 22 deletions aw-transform/src/filter_period.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use aw_models::Event;
use chrono::{DateTime, Utc};
use chrono::Duration;

use crate::sort_by_timestamp;

/// Removes events not intersecting with the provided filter_events
///
Expand All @@ -15,33 +17,55 @@ use chrono::{DateTime, Utc};
/// filter_events: [ ] [ ]
/// output: [a ] [a ][b ]
/// ```
pub fn filter_period_intersect(events: &[Event], filter_events: &[Event]) -> Vec<Event> {
pub fn filter_period_intersect(events: Vec<Event>, filter_events: Vec<Event>) -> Vec<Event> {
if events.len() == 0 || filter_events.len() == 0 {
return Vec::new();
}

let mut filtered_events = Vec::new();
let events = sort_by_timestamp(events);
let filter_events = sort_by_timestamp(filter_events);

// Start with pre-calculating endtimes of events
let mut events_with_endtimes: Vec<(&Event, DateTime<Utc>)> = Vec::new();
for event in events {
events_with_endtimes.push((event, event.calculate_endtime()));
}
let mut events_iter = events.into_iter();
let mut filter_events_iter = filter_events.into_iter();
let mut cur_event = events_iter.next().unwrap();
let mut cur_filter_event = filter_events_iter.next().unwrap();

// Do actual filtering
for filter in filter_events {
let filter_endtime = filter.calculate_endtime();
for (event, event_endtime) in &events_with_endtimes {
if event.timestamp > filter_endtime {
continue;
loop {
let event_endtime = cur_event.calculate_endtime();
let filter_endtime = cur_filter_event.calculate_endtime();
if cur_event.duration == Duration::seconds(0) || event_endtime <= cur_filter_event.timestamp
{
match events_iter.next() {
Some(e) => {
cur_event = e;
continue;
}
None => return filtered_events,
}
if *event_endtime < filter.timestamp {
continue;
}
if cur_event.timestamp >= cur_filter_event.calculate_endtime() {
match filter_events_iter.next() {
Some(e) => {
cur_filter_event = e;
continue;
}
None => return filtered_events,
}
let mut e = (*event).clone();
e.timestamp = std::cmp::max(e.timestamp, filter.timestamp);
let endtime = std::cmp::min(*event_endtime, filter_endtime);
e.duration = endtime - e.timestamp;
filtered_events.push(e);
}

let mut e = cur_event.clone();
e.timestamp = std::cmp::max(e.timestamp, cur_filter_event.timestamp);
let endtime = std::cmp::min(event_endtime, filter_endtime);
e.duration = endtime - e.timestamp;

// trim current event
let old_timestamp = cur_event.timestamp;
cur_event.timestamp = e.timestamp + e.duration;
cur_event.duration = old_timestamp + cur_event.duration - cur_event.timestamp;

filtered_events.push(e);
}
filtered_events
}

#[cfg(test)]
Expand Down Expand Up @@ -81,7 +105,8 @@ mod tests {
data: json_map! {"test": json!(1)},
};

let filtered_events = filter_period_intersect(&vec![e1, e2, e3, e4, e5], &[filter_event]);
let filtered_events =
filter_period_intersect(vec![e1, e2, e3, e4, e5], vec![filter_event.clone()]);
assert_eq!(filtered_events.len(), 3);
assert_eq!(filtered_events[0].duration, Duration::milliseconds(500));
assert_eq!(filtered_events[1].duration, Duration::milliseconds(1000));
Expand All @@ -93,5 +118,41 @@ mod tests {
assert_eq!(filtered_events[1].timestamp, dt);
let dt: DateTime<Utc> = DateTime::from_str("2000-01-01T00:00:04.000Z").unwrap();
assert_eq!(filtered_events[2].timestamp, dt);

let timestamp_01s = DateTime::from_str("2000-01-01T00:00:01Z").unwrap();
let e = Event {
id: None,
timestamp: timestamp_01s,
duration: Duration::seconds(1),
data: json_map! {"test": json!(1)},
};
let mut f2 = filter_event.clone();
f2.timestamp = DateTime::from_str("2000-01-01T00:00:00Z").unwrap();
f2.duration = Duration::milliseconds(1500);
let res = filter_period_intersect(vec![e.clone()], vec![f2]);
assert_eq!(res[0].timestamp, timestamp_01s);
assert_eq!(res[0].duration, Duration::milliseconds(500));

let timestamp_01_5s = DateTime::from_str("2000-01-01T00:00:01.5Z").unwrap();
let mut f3 = filter_event.clone();
f3.timestamp = timestamp_01_5s;
f3.duration = Duration::milliseconds(1000);
let res = filter_period_intersect(vec![e.clone()], vec![f3]);
assert_eq!(res[0].timestamp, timestamp_01_5s);
assert_eq!(res[0].duration, Duration::milliseconds(500));

let mut f4 = filter_event.clone();
f4.timestamp = DateTime::from_str("2000-01-01T00:00:01.5Z").unwrap();
f4.duration = Duration::milliseconds(100);
let res = filter_period_intersect(vec![e.clone()], vec![f4]);
assert_eq!(res[0].timestamp, timestamp_01_5s);
assert_eq!(res[0].duration, Duration::milliseconds(100));

let mut f5 = filter_event.clone();
f5.timestamp = DateTime::from_str("2000-01-01T00:00:00Z").unwrap();
f5.duration = Duration::seconds(10);
let res = filter_period_intersect(vec![e.clone()], vec![f5]);
assert_eq!(res[0].timestamp, timestamp_01s);
assert_eq!(res[0].duration, Duration::milliseconds(1000));
}
}
Loading