fix: exclusion logic in udfs #25890

Merged 59 commits from aspicer/trends_exclusions into master on Oct 31, 2024
Commits
a144c72
failing test
aspicer Oct 29, 2024
2207a33
exclusion test
aspicer Oct 29, 2024
56c12b1
hm
aspicer Oct 29, 2024
9835346
excluded trends
aspicer Oct 29, 2024
b4ce9f4
correct behavior
aspicer Oct 29, 2024
23ab058
tests
aspicer Oct 30, 2024
f1b514c
Merge branch 'aspicer/trends_exclusions' of github.com:PostHog/postho…
aspicer Oct 30, 2024
63db00b
testing
aspicer Oct 30, 2024
d3c7ef0
working normal
aspicer Oct 30, 2024
670d8b0
test improvement
aspicer Oct 30, 2024
c5fa934
remove returns
aspicer Oct 30, 2024
45ed32b
Merge branch 'aspicer/trends_exclusions' of github.com:PostHog/postho…
aspicer Oct 30, 2024
5d5bb74
test funnel
aspicer Oct 30, 2024
a4f2e0b
trends fix
aspicer Oct 30, 2024
3b93fd9
passing
aspicer Oct 30, 2024
9f0317d
v3 user scripts
aspicer Oct 30, 2024
e2a5b17
Merge branch 'aspicer/trends_exclusions' of github.com:PostHog/postho…
aspicer Oct 30, 2024
aaaa793
Update query snapshots
github-actions[bot] Oct 30, 2024
35e9c5d
Update query snapshots
github-actions[bot] Oct 30, 2024
3c5fa37
Update query snapshots
github-actions[bot] Oct 30, 2024
24a3012
Update query snapshots
github-actions[bot] Oct 30, 2024
6c1f31e
terminate
aspicer Oct 30, 2024
bea6b57
Merge branch 'aspicer/trends_exclusions' of github.com:PostHog/postho…
aspicer Oct 30, 2024
dbdbdeb
last test
aspicer Oct 30, 2024
cd0a1d3
Update query snapshots
github-actions[bot] Oct 30, 2024
42f231f
Update query snapshots
github-actions[bot] Oct 30, 2024
8bcb758
Update query snapshots
github-actions[bot] Oct 30, 2024
77b0f51
fix trends
aspicer Oct 30, 2024
20c78a0
Merge branch 'aspicer/trends_exclusions' of github.com:PostHog/postho…
aspicer Oct 30, 2024
162fe51
Update query snapshots
github-actions[bot] Oct 30, 2024
2ac30e6
update
aspicer Oct 30, 2024
1f4ba12
Merge branch 'aspicer/trends_exclusions' of github.com:PostHog/postho…
aspicer Oct 30, 2024
bdcd84f
test updates
aspicer Oct 30, 2024
f26e665
Merge branch 'aspicer/trends_exclusions' of github.com:PostHog/postho…
aspicer Oct 30, 2024
4b9b446
matchy match
aspicer Oct 30, 2024
5d1c39c
Merge branch 'aspicer/trends_exclusions' of github.com:PostHog/postho…
aspicer Oct 30, 2024
3f07954
test updates
aspicer Oct 30, 2024
999bb83
udf versioning
aspicer Oct 30, 2024
09f2013
Merge branch 'aspicer/trends_exclusions' of github.com:PostHog/postho…
aspicer Oct 30, 2024
9a854ff
last test for change
aspicer Oct 30, 2024
786ec6f
Merge branch 'aspicer/trends_exclusions' of github.com:PostHog/postho…
aspicer Oct 30, 2024
250ed21
remove v1
aspicer Oct 30, 2024
4673c99
merge
aspicer Oct 30, 2024
0ef4fc1
Update query snapshots
github-actions[bot] Oct 30, 2024
8f975ab
Update query snapshots
github-actions[bot] Oct 30, 2024
569671d
merge
aspicer Oct 31, 2024
abddaf2
Update UI snapshots for `chromium` (1)
github-actions[bot] Oct 31, 2024
5efb971
Update query snapshots
github-actions[bot] Oct 31, 2024
ed8fb7e
Update query snapshots
github-actions[bot] Oct 31, 2024
3e5176c
Update UI snapshots for `chromium` (1)
github-actions[bot] Oct 31, 2024
18fd02f
Merge branch 'master' into aspicer/trends_exclusions
aspicer Oct 31, 2024
3bb6d3a
Update UI snapshots for `chromium` (1)
github-actions[bot] Oct 31, 2024
2bc9501
Update UI snapshots for `chromium` (1)
github-actions[bot] Oct 31, 2024
fd2afa0
Update UI snapshots for `chromium` (2)
github-actions[bot] Oct 31, 2024
b301e05
Update UI snapshots for `chromium` (2)
github-actions[bot] Oct 31, 2024
7568188
Merge branch 'master' into aspicer/trends_exclusions
aspicer Oct 31, 2024
fe39184
wtf
aspicer Oct 31, 2024
b8886d6
Update UI snapshots for `chromium` (2)
github-actions[bot] Oct 31, 2024
d7a1d33
Update UI snapshots for `chromium` (2)
github-actions[bot] Oct 31, 2024
funnel-udf/src/steps.rs (92 changes: 54 additions & 38 deletions)

@@ -8,6 +8,7 @@ use crate::PropVal;
 #[derive(Clone, Deserialize)]
 struct EnteredTimestamp {
     timestamp: f64,
+    excluded: bool,
     timings: Vec<f64>,
     uuids: Vec<Uuid>,
 }
@@ -48,6 +49,7 @@ const MAX_REPLAY_EVENTS: usize = 10;
 
 const DEFAULT_ENTERED_TIMESTAMP: EnteredTimestamp = EnteredTimestamp {
     timestamp: 0.0,
+    excluded: false,
     timings: vec![],
     uuids: vec![],
 };
@@ -101,32 +103,29 @@ impl AggregateFunnelRow {
             let events_with_same_timestamp: Vec<_> = events_with_same_timestamp.collect();
             vars.entered_timestamp[0] = EnteredTimestamp {
                 timestamp,
+                excluded: false,
                 timings: vec![],
                 uuids: vec![],
             };
 
             if events_with_same_timestamp.len() == 1 {
-                if !self.process_event(
+                self.process_event(
                     args,
                     &mut vars,
                     &events_with_same_timestamp[0],
                     prop_val,
                     false
-                ) {
-                    return;
-                }
+                );
             } else if events_with_same_timestamp.iter().map(|x| &x.steps).all_equal() {
                 // Deal with the most common case where they are all the same event (order doesn't matter)
                 for event in events_with_same_timestamp {
-                    if !self.process_event(
+                    self.process_event(
                         args,
                         &mut vars,
                         event,
                         prop_val,
                         false
-                    ) {
-                        return;
-                    }
+                    );
                 }
             } else {
                 // Handle permutations for different events with the same timestamp
@@ -144,15 +143,13 @@
 
                 // Run exclusions, if they exist, then run matching events.
                 for event in sorted_events {
-                    if !self.process_event(
+                    self.process_event(
                         args,
                         &mut vars,
                         &event,
                         &prop_val,
                         true
-                    ) {
-                        return;
-                    }
+                    );
                 }
             }
 
@@ -166,6 +163,11 @@
         let final_index = vars.max_step.0;
         let final_value = &vars.max_step.1;
 
+        if final_value.excluded {
+            self.results.push(Result(-1, prop_val.clone(), vec![], vec![]));
+            return;
+        }
+
         for i in 0..final_index {
             //if event_uuids[i].len() >= MAX_REPLAY_EVENTS && !event_uuids[i].contains(&final_value.uuids[i]) {
             // Always put the actual event uuids first, we use it to extract timestamps
@@ -188,7 +190,7 @@
         event: &Event,
         prop_val: &PropVal,
         processing_multiple_events: bool
-    ) -> bool {
+    ) {
         for step in event.steps.iter().rev() {
             let mut exclusion = false;
             let step = (if *step < 0 {
@@ -199,35 +201,51 @@
             }) as usize;
 
             let in_match_window = (event.timestamp - vars.entered_timestamp[step - 1].timestamp) <= args.conversion_window_limit as f64;
+            let previous_step_excluded = vars.entered_timestamp[step-1].excluded;
             let already_reached_this_step = vars.entered_timestamp[step].timestamp == vars.entered_timestamp[step - 1].timestamp
                 && vars.entered_timestamp[step].timestamp != 0.0;
 
             if in_match_window && !already_reached_this_step {
                 if exclusion {
-                    self.results.push(Result(-1, prop_val.clone(), vec![], vec![]));
-                    return false;
-                }
-                let is_unmatched_step_attribution = self.breakdown_step.map(|breakdown_step| step == breakdown_step - 1).unwrap_or(false) && *prop_val != event.breakdown;
-                let already_used_event = processing_multiple_events && vars.entered_timestamp[step-1].uuids.contains(&event.uuid);
-                if !is_unmatched_step_attribution && !already_used_event {
-                    vars.entered_timestamp[step] = EnteredTimestamp {
-                        timestamp: vars.entered_timestamp[step - 1].timestamp,
-                        timings: {
-                            let mut timings = vars.entered_timestamp[step - 1].timings.clone();
-                            timings.push(event.timestamp);
-                            timings
-                        },
-                        uuids: {
-                            let mut uuids = vars.entered_timestamp[step - 1].uuids.clone();
-                            uuids.push(event.uuid);
-                            uuids
-                        },
-                    };
-                    if vars.event_uuids[step - 1].len() < MAX_REPLAY_EVENTS - 1 {
-                        vars.event_uuids[step - 1].push(event.uuid);
-                    }
-                    if step > vars.max_step.0 {
-                        vars.max_step = (step, vars.entered_timestamp[step].clone());
-                    }
-                }
+                    if !previous_step_excluded {
+                        vars.entered_timestamp[step - 1].excluded = true;
+                        if vars.max_step.0 == step - 1 {
+                            let max_timestamp_in_match_window = (event.timestamp - vars.max_step.1.timestamp) <= args.conversion_window_limit as f64;
+                            if max_timestamp_in_match_window {
+                                vars.max_step.1.excluded = true;
+                            }
+                        }
+                    }
+                } else {
+                    let is_unmatched_step_attribution = self.breakdown_step.map(|breakdown_step| step == breakdown_step - 1).unwrap_or(false) && *prop_val != event.breakdown;
+                    let already_used_event = processing_multiple_events && vars.entered_timestamp[step - 1].uuids.contains(&event.uuid);
+                    if !is_unmatched_step_attribution && !already_used_event {
+                        let new_entered_timestamp = |vars: &Vars| -> EnteredTimestamp {
+                            EnteredTimestamp {
+                                timestamp: vars.entered_timestamp[step - 1].timestamp,
+                                excluded: previous_step_excluded,
+                                timings: {
+                                    let mut timings = vars.entered_timestamp[step - 1].timings.clone();
+                                    timings.push(event.timestamp);
+                                    timings
+                                },
+                                uuids: {
+                                    let mut uuids = vars.entered_timestamp[step - 1].uuids.clone();
+                                    uuids.push(event.uuid);
+                                    uuids
+                                },
+                            }
+                        };
+                        if !previous_step_excluded {
+                            vars.entered_timestamp[step] = new_entered_timestamp(vars);
+                            if vars.event_uuids[step - 1].len() < MAX_REPLAY_EVENTS - 1 {
+                                vars.event_uuids[step - 1].push(event.uuid);
+                            }
+                        }
+
+                        if step > vars.max_step.0 || (step == vars.max_step.0 && vars.max_step.1.excluded) {
+                            vars.max_step = (step, new_entered_timestamp(vars));
+                        }
+                    }
+                }
             }
@@ -242,7 +260,5 @@ impl AggregateFunnelRow {
                 }
             }
         }
-
-        true
     }
 }
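
Reading the steps.rs diff as a whole: an exclusion no longer short-circuits the row. Previously, process_event returned a bool and any exclusion inside the conversion window pushed a -1 result and aborted processing for the person. Now process_event returns nothing; an exclusion only marks the previous step's entry (and the current max step, if that is what it landed on) as excluded, the flag travels forward through new_entered_timestamp, and the final check on final_value.excluded decides whether the row comes back as -1. The sketch below is a simplified, self-contained model of that rule for illustration only; Entry, max_step, the signed-step event encoding, and the window handling are assumptions, not the PR's actual code.

#[derive(Clone, Default)]
struct Entry {
    timestamp: f64,
    excluded: bool,
}

// Events are (timestamp, step): a positive step is a funnel step (1-based),
// a negative step -k is an exclusion attached to step k.
fn max_step(events: &[(f64, i64)], num_steps: usize, window: f64) -> i64 {
    let mut entered = vec![Entry::default(); num_steps + 1];
    let mut best: (usize, Entry) = (0, Entry::default());
    for &(ts, raw) in events {
        let exclusion = raw < 0;
        let step = raw.unsigned_abs() as usize;
        if step == 1 && !exclusion {
            // A fresh first-step event starts a clean entry and may replace a
            // tainted max step at the same depth.
            entered[1] = Entry { timestamp: ts, excluded: false };
            if best.0 < 1 || (best.0 == 1 && best.1.excluded) {
                best = (1, entered[1].clone());
            }
            continue;
        }
        let prev = entered[step - 1].clone();
        if prev.timestamp == 0.0 || ts - prev.timestamp > window {
            continue; // nothing to match against inside the conversion window
        }
        if exclusion {
            // Taint the previous step (and the current max, if that is what it
            // hit) instead of returning -1 for the whole person right away.
            if !prev.excluded {
                entered[step - 1].excluded = true;
                if best.0 == step - 1 {
                    best.1.excluded = true;
                }
            }
        } else {
            // The taint is carried forward: a tainted predecessor may still
            // advance the max step, but (as in the patch) it does not
            // overwrite the per-step entry.
            let candidate = Entry { timestamp: prev.timestamp, excluded: prev.excluded };
            if !prev.excluded {
                entered[step] = candidate.clone();
            }
            if step > best.0 || (step == best.0 && best.1.excluded) {
                best = (step, candidate);
            }
        }
    }
    if best.1.excluded { -1 } else { best.0 as i64 }
}

fn main() {
    let day = 86_400.0;
    assert_eq!(max_step(&[(0.0, 1), (10.0, 2)], 2, day), 2); // clean A -> B
    assert_eq!(max_step(&[(0.0, 1), (5.0, -2), (10.0, 2)], 2, day), -1); // A, X, B: excluded
    assert_eq!(max_step(&[(0.0, 1), (5.0, -2)], 2, day), -1); // A, X: max step tainted
    // The behavior the fix buys: a later, clean conversion in a fresh window
    // still counts, where an eager abort would have reported -1.
    assert_eq!(max_step(&[(0.0, 1), (5.0, -2), (2.0 * day, 1), (2.0 * day + 9.0, 2)], 2, day), 2);
    println!("ok");
}

The last assertion is the interesting case: under the old early return, the exclusion at t=5 wrote -1 for the person and stopped, so the clean conversion two days later was never considered.
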
funnel-udf/src/trends.rs (126 changes: 88 additions & 38 deletions)

@@ -16,7 +16,8 @@ where
 
 #[derive(Clone, Deserialize)]
 struct EnteredTimestamp {
-    timestamp: f64
+    timestamp: f64,
+    excluded: bool,
 }
 
 #[derive(Clone, Deserialize)]
@@ -40,12 +41,28 @@ struct Args {
     value: Vec<Event>,
 }
 
+// The Exclusion enum is used to label the max step
+// A full exclusion is when there has been an event, a matching exclusion, and a matching event
+// A partial exclusion is when there has been an event, and a matching exclusion
+#[derive(PartialEq)]
+enum Exclusion {
+    Not,
+    Partial,
+    Full,
+}
+
 #[derive(Serialize)]
 struct ResultStruct(u64, i8, PropVal, Uuid);
 
+struct MaxStep {
+    step: usize,
+    timestamp: f64,
+    excluded: Exclusion,
+    event_uuid: Uuid,
+}
+
 struct IntervalData {
-    max_step: usize,
-    max_step_event_uuid: Uuid,
+    max_step: MaxStep,
     entered_timestamp: Vec<EnteredTimestamp>,
 }
 
@@ -60,6 +77,7 @@ struct AggregateFunnelRow {
 
 const DEFAULT_ENTERED_TIMESTAMP: EnteredTimestamp = EnteredTimestamp {
     timestamp: 0.0,
+    excluded: false
 };
 
 pub fn process_line(line: &str) -> Value {
@@ -107,22 +125,23 @@
         for (_timestamp, events_with_same_timestamp) in &filtered_events {
             let events_with_same_timestamp: Vec<_> = events_with_same_timestamp.collect();
             for event in events_with_same_timestamp {
-                if !self.process_event(
+                self.process_event(
                     args,
                     &mut vars,
                     &event,
                     prop_val,
-                ) {
-                    return
-                }
+                );
             }
         }
 
-
-        // At this point, everything left in entered_timestamps is a failure, if it has made it to from_step
-        for interval_data in vars.interval_start_to_entered_timestamps.values() {
-            if !self.results.contains_key(&(interval_data.entered_timestamp[0].timestamp as u64)) && interval_data.max_step >= args.from_step + 1 {
-                self.results.insert(interval_data.entered_timestamp[0].timestamp as u64, ResultStruct(interval_data.entered_timestamp[0].timestamp as u64, -1, prop_val.clone(), interval_data.max_step_event_uuid));
+        // At this point, everything left in entered_timestamps is an entry, but not an exit, if it has made it to from_step
+        // When there is an exclusion, we drop all partial matches and only return full matches
+        let fully_excluded = vars.interval_start_to_entered_timestamps.values().find(|interval_data| interval_data.max_step.excluded == Exclusion::Full);
+        if fully_excluded.is_none() {
+            for (interval_start, interval_data) in vars.interval_start_to_entered_timestamps.into_iter() {
+                if !self.results.contains_key(&interval_start) && interval_data.max_step.step >= args.from_step + 1 && interval_data.max_step.excluded != Exclusion::Partial {
+                    self.results.insert(interval_start, ResultStruct(interval_start, -1, prop_val.clone(), interval_data.max_step.event_uuid));
+                }
             }
         }
     }
@@ -134,7 +153,7 @@
         vars: &mut Vars,
         event: &Event,
         prop_val: &PropVal,
-    ) -> bool {
+    ) {
         for step in event.steps.iter().rev() {
             let mut exclusion = false;
             let step = (if *step < 0 {
@@ -145,42 +164,74 @@
             }) as usize;
 
             if step == 1 {
-                if !vars.interval_start_to_entered_timestamps.contains_key(&event.interval_start) && !self.results.contains_key(&event.interval_start) {
-                    let mut entered_timestamp = vec![DEFAULT_ENTERED_TIMESTAMP.clone(); args.num_steps + 1];
-                    entered_timestamp[0] = EnteredTimestamp { timestamp: event.interval_start as f64 };
-                    entered_timestamp[1] = EnteredTimestamp { timestamp: event.timestamp };
-                    vars.interval_start_to_entered_timestamps.insert(event.interval_start, IntervalData { max_step: 1, max_step_event_uuid: event.uuid, entered_timestamp: entered_timestamp });
+                if !self.results.contains_key(&event.interval_start) {
+                    let entered_timestamp_one = EnteredTimestamp { timestamp: event.timestamp, excluded: false };
+                    let interval = vars.interval_start_to_entered_timestamps.get_mut(&event.interval_start);
+                    if interval.is_none() || interval.as_ref().map( | interval | interval.max_step.step == 1 && interval.max_step.excluded != Exclusion::Not).unwrap() {
+                        let mut entered_timestamp = vec![DEFAULT_ENTERED_TIMESTAMP.clone(); args.num_steps + 1];
+                        entered_timestamp[1] = entered_timestamp_one;
+                        let interval_data = IntervalData {
+                            max_step: MaxStep {
+                                step: 1,
+                                timestamp: event.timestamp,
+                                excluded: Exclusion::Not,
+                                event_uuid: event.uuid,
+                            },
+                            entered_timestamp: entered_timestamp
+                        };
+                        vars.interval_start_to_entered_timestamps.insert(event.interval_start, interval_data);
+                    } else {
+                        interval.unwrap().entered_timestamp[1] = entered_timestamp_one;
+                    }
                 }
             } else {
-                for interval_data in vars.interval_start_to_entered_timestamps.values_mut() {
+                vars.interval_start_to_entered_timestamps.retain(|&interval_start, interval_data| {
                     let in_match_window = (event.timestamp - interval_data.entered_timestamp[step - 1].timestamp) <= args.conversion_window_limit as f64;
+                    let previous_step_excluded = interval_data.entered_timestamp[step-1].excluded;
                     let already_reached_this_step = interval_data.entered_timestamp[step].timestamp == interval_data.entered_timestamp[step - 1].timestamp;
                     if in_match_window && !already_reached_this_step {
                         if exclusion {
-                            return false;
-                        }
-                        let is_unmatched_step_attribution = self.breakdown_step.map(|breakdown_step| step == breakdown_step - 1).unwrap_or(false) && *prop_val != event.breakdown;
-                        if !is_unmatched_step_attribution {
-                            interval_data.entered_timestamp[step] = EnteredTimestamp {
-                                timestamp: interval_data.entered_timestamp[step - 1].timestamp
-                            };
-                            // check if we have hit the goal. if we have, remove it from the list and add it to the successful_timestamps
-                            if interval_data.entered_timestamp[args.num_steps].timestamp != 0.0 {
-                                self.results.insert(
-                                    interval_data.entered_timestamp[0].timestamp as u64,
-                                    ResultStruct(interval_data.entered_timestamp[0].timestamp as u64, 1, prop_val.clone(), event.uuid)
-                                );
-                            } else if step > interval_data.max_step {
-                                interval_data.max_step = step;
-                                interval_data.max_step_event_uuid = event.uuid;
-                            }
-                        }
-                    }
-                }
+                            if !previous_step_excluded {
+                                interval_data.entered_timestamp[step - 1].excluded = true;
+                                if interval_data.max_step.step == step - 1 {
+                                    let max_timestamp_in_match_window = (event.timestamp - interval_data.max_step.timestamp) <= args.conversion_window_limit as f64;
+                                    if max_timestamp_in_match_window {
+                                        interval_data.max_step.excluded = Exclusion::Partial;
+                                    }
+                                }
+                            }
+                        } else {
+                            let is_unmatched_step_attribution = self.breakdown_step.map(|breakdown_step| step == breakdown_step - 1).unwrap_or(false) && *prop_val != event.breakdown;
+                            if !is_unmatched_step_attribution {
+                                if !previous_step_excluded {
+                                    interval_data.entered_timestamp[step] = EnteredTimestamp {
+                                        timestamp: interval_data.entered_timestamp[step - 1].timestamp,
+                                        excluded: false,
+                                    };
+                                }
+                                // check if we have hit the goal. if we have, remove it from the list and add it to the successful_timestamps
+                                if interval_data.entered_timestamp[args.num_steps].timestamp != 0.0 {
+                                    self.results.insert(
+                                        interval_start,
+                                        ResultStruct(interval_start, 1, prop_val.clone(), event.uuid)
+                                    );
+                                    return false;
+                                } else if step > interval_data.max_step.step || (step == interval_data.max_step.step && interval_data.max_step.excluded == Exclusion::Partial) {
+                                    interval_data.max_step = MaxStep {
+                                        step: step,
+                                        event_uuid: event.uuid,
+                                        timestamp: event.timestamp,
+                                        excluded: if previous_step_excluded { Exclusion::Full } else { Exclusion::Not },
+                                    };
+                                }
+                            }
+                        }
+                    }
+                    true
+                })
             }
         }
         // If a strict funnel, clear all of the steps that we didn't match to
-        // If we are processing multiple events, skip this step, because ordering makes it complicated
         if args.funnel_order_type == "strict" {
             for interval_data in vars.interval_start_to_entered_timestamps.values_mut() {
                 for i in 1..interval_data.entered_timestamp.len() {
@@ -190,6 +241,5 @@
                 }
             }
         }
-        true
     }
 }
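
The trends.rs side applies the same idea per interval, with the Exclusion label standing in for the boolean: Partial (an entry followed by a matching exclusion) quietly drops that interval from the failure rows, while Full (entry, matching exclusion, then a matching step event) suppresses failure reporting for the person entirely. Below is a hedged sketch of just that reporting rule; the flattened Interval shape and the function name are assumptions for illustration, and successful conversions (already written into results) are left out.

#[derive(Clone, Copy, PartialEq)]
enum Exclusion {
    Not,
    Partial,
    Full,
}

struct Interval {
    start: u64,
    max_step: usize,
    excluded: Exclusion,
}

// from_step is 0-based, as in the UDF's Args; a -1 row marks a funnel dropoff.
fn failure_results(intervals: &[Interval], from_step: usize) -> Vec<(u64, i8)> {
    // One fully excluded interval suppresses every failure row for the person.
    if intervals.iter().any(|i| i.excluded == Exclusion::Full) {
        return vec![];
    }
    intervals
        .iter()
        .filter(|i| i.max_step >= from_step + 1 && i.excluded != Exclusion::Partial)
        .map(|i| (i.start, -1i8))
        .collect()
}

fn main() {
    let intervals = vec![
        Interval { start: 100, max_step: 1, excluded: Exclusion::Not },     // plain dropoff: reported
        Interval { start: 200, max_step: 1, excluded: Exclusion::Partial }, // entry + exclusion: dropped
    ];
    assert_eq!(failure_results(&intervals, 0), vec![(100, -1)]);

    // One Full exclusion anywhere silences failure reporting altogether.
    let mut with_full = intervals;
    with_full.push(Interval { start: 300, max_step: 2, excluded: Exclusion::Full });
    assert!(failure_results(&with_full, 0).is_empty());
    println!("ok");
}

This mirrors the fully_excluded check and the max_step.step >= args.from_step + 1 and != Exclusion::Partial filter at the end of the diff above.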