feat: enable trend actors in udfs (and enable matched recordings for trends) (#25641)

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
aspicer and github-actions[bot] authored Oct 17, 2024
1 parent a9b5a60 commit e468f3b
Showing 35 changed files with 1,105 additions and 1,450 deletions.
14 changes: 7 additions & 7 deletions docker/clickhouse/user_defined_function.xml
@@ -138,7 +138,7 @@
<function>
<type>executable_pool</type>
<name>aggregate_funnel_trends</name>
- <return_type>Array(Tuple(UInt64, Int8, Nullable(String)))</return_type>
+ <return_type>Array(Tuple(UInt64, Int8, Nullable(String), UUID))</return_type>
<return_name>result</return_name>
<argument>
<type>UInt8</type>
@@ -169,7 +169,7 @@
<name>prop_vals</name>
</argument>
<argument>
- <type>Array(Tuple(Nullable(Float64), UInt64, Nullable(String), Array(Int8)))</type>
+ <type>Array(Tuple(Nullable(Float64), UInt64, UUID, Nullable(String), Array(Int8)))</type>
<name>value</name>
</argument>
<format>JSONEachRow</format>
@@ -181,7 +181,7 @@
<type>executable_pool</type>
<name>aggregate_funnel_array_trends</name>
<!-- Return type for trends is a start interval time, a success flag (1 or -1), and a breakdown value -->
- <return_type>Array(Tuple(UInt64, Int8, Array(String)))</return_type>
+ <return_type>Array(Tuple(UInt64, Int8, Array(String), UUID))</return_type>
<return_name>result</return_name>
<argument>
<type>UInt8</type>
@@ -208,7 +208,7 @@
<name>prop_vals</name>
</argument>
<argument>
- <type>Array(Tuple(Nullable(Float64), UInt64, Array(String), Array(Int8)))</type>
+ <type>Array(Tuple(Nullable(Float64), UInt64, UUID, Array(String), Array(Int8)))</type>
<name>value</name>
</argument>
<format>JSONEachRow</format>
@@ -220,7 +220,7 @@
<type>executable_pool</type>
<name>aggregate_funnel_cohort_trends</name>
<!-- Return type for trends is a start interval time, a success flag (1 or -1), and a breakdown value -->
- <return_type>Array(Tuple(UInt64, Int8, UInt64))</return_type>
+ <return_type>Array(Tuple(UInt64, Int8, UInt64, UUID))</return_type>
<return_name>result</return_name>
<argument>
<type>UInt8</type>
@@ -247,7 +247,7 @@
<name>prop_vals</name>
</argument>
<argument>
- <type>Array(Tuple(Nullable(Float64), UInt64, UInt64, Array(Int8)))</type>
+ <type>Array(Tuple(Nullable(Float64), UInt64, UUID, UInt64, Array(Int8)))</type>
<name>value</name>
</argument>
<format>JSONEachRow</format>
@@ -285,7 +285,7 @@
<name>prop_vals</name>
</argument>
<argument>
- <type>Array(Tuple(Nullable(Float64), UInt64, Array(String), Array(Int8)))</type>
+ <type>Array(Tuple(Nullable(Float64), UInt64, UUID, Array(String), Array(Int8)))</type>
<name>value</name>
</argument>
<format>JSONEachRow</format>
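All four UDF declarations gain a UUID: the input tuples now carry the uuid of each event, and the return tuples echo back the uuid of the event that determined the result. The XML only fixes the types; the actual exchange is JSONEachRow over the executable's stdin/stdout. A hedged Python sketch of one such exchange with aggregate_funnel_trends — argument names are inferred from the <argument> list above, and the values are invented:

```python
import json

# Hypothetical single-row exchange with the aggregate_funnel_trends
# executable: ClickHouse writes one JSON object per row to stdin and reads a
# {"result": ...} object back from stdout. Field names are inferred from the
# XML above, not confirmed by this diff.
input_row = {
    "num_steps": 2,
    "conversion_window_limit": 1209600,  # seconds (e.g. 14 days)
    "breakdown_attribution_type": "first_touch",
    "funnel_order_type": "ordered",
    "prop_vals": [""],
    # value tuples: (timestamp, interval_start, uuid, breakdown, steps)
    # -- the UUID in position 3 is what this commit threads through
    "value": [
        [1729123260.0, 1729123200, "11111111-1111-1111-1111-111111111111", "", [1]],
        [1729123320.0, 1729123200, "22222222-2222-2222-2222-222222222222", "", [2]],
    ],
}
print(json.dumps(input_row))

# result tuples: (interval_start, success_flag, breakdown, uuid) -- the uuid
# identifies the event that completed (or got furthest into) the funnel
output_row = {"result": [[1729123200, 1, "", "22222222-2222-2222-2222-222222222222"]]}
```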
1 change: 1 addition & 0 deletions frontend/src/scenes/funnels/FunnelLineGraph.tsx
@@ -87,6 +87,7 @@ export function FunnelLineGraph({
kind: NodeKind.FunnelsActorsQuery,
source: querySource,
funnelTrendsDropOff: false,
+ includeRecordings: true,
funnelTrendsEntrancePeriodStart: dayjs(day).format('YYYY-MM-DD HH:mm:ss'),
}
openPersonsModal({
60 changes: 31 additions & 29 deletions funnel-udf/src/trends.rs
@@ -3,6 +3,7 @@ use std::str::FromStr;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
+ use uuid::Uuid;
use crate::PropVal;

fn deserialize_number_from_string<'de, D>(deserializer: D) -> Result<u64, D::Error>
@@ -15,15 +16,15 @@ where

#[derive(Clone, Deserialize)]
struct EnteredTimestamp {
- timestamp: f64,
- timings: Vec<f64>,
+ timestamp: f64
}

#[derive(Clone, Deserialize)]
struct Event {
timestamp: f64,
#[serde(deserialize_with = "deserialize_number_from_string")]
interval_start: u64,
+ uuid: Uuid,
breakdown: PropVal,
steps: Vec<i8>,
}
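Two details of the new Event shape: `interval_start` keeps its string-tolerant deserializer because ClickHouse's JSON formats typically quote 64-bit integers, and the added `uuid` field is what each result row will point back to. A minimal Python illustration of the tolerant parse (illustrative, not part of the commit):

```python
import json

# Illustration: a UInt64 may arrive from JSON as a string, so accept both
# forms -- the Rust side does this with deserialize_number_from_string.
def parse_u64(value) -> int:
    return int(value)  # handles 1729123200 and "1729123200" alike

event = json.loads(
    '{"timestamp": 1729123260.5, "interval_start": "1729123200",'
    ' "uuid": "11111111-1111-1111-1111-111111111111", "breakdown": "", "steps": [1]}'
)
assert parse_u64(event["interval_start"]) == 1729123200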
@@ -40,10 +41,16 @@ struct Args {
}

#[derive(Serialize)]
- struct ResultStruct(u64, i8, PropVal);
+ struct ResultStruct(u64, i8, PropVal, Uuid);

+ struct IntervalData {
+ max_step: usize,
+ max_step_event_uuid: Uuid,
+ entered_timestamp: Vec<EnteredTimestamp>,
+ }

struct Vars {
- interval_start_to_entered_timestamps: HashMap<u64, Vec<EnteredTimestamp>>,
+ interval_start_to_entered_timestamps: HashMap<u64, IntervalData>,
}
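This is the structural core of the change: each entrance interval now carries bookkeeping beyond its step timestamps, replacing the old `timings` mechanism. A Python mirror of the new state, for illustration only:

```python
from dataclasses import dataclass

# Illustration-only mirror of the Rust state above: per entrance interval we
# track the deepest step reached and the uuid of the event that reached it,
# so a drop-off can later be tied to a concrete event (and its recording).
@dataclass
class IntervalData:
    max_step: int                   # deepest funnel step reached so far
    max_step_event_uuid: str        # uuid of the event that set max_step
    entered_timestamp: list[float]  # per-step entry timestamps, slot 0 = interval start

interval_start_to_entered_timestamps: dict[int, IntervalData] = {}
```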

struct AggregateFunnelRow {
@@ -53,7 +60,6 @@ struct AggregateFunnelRow {

const DEFAULT_ENTERED_TIMESTAMP: EnteredTimestamp = EnteredTimestamp {
timestamp: 0.0,
- timings: vec![],
};

pub fn process_line(line: &str) -> Value {
@@ -114,9 +120,9 @@ impl AggregateFunnelRow {


// At this point, everything left in entered_timestamps is a failure, if it has made it to from_step
- for entered_timestamp in vars.interval_start_to_entered_timestamps.values() {
- if !self.results.contains_key(&(entered_timestamp[0].timestamp as u64)) && entered_timestamp[0].timings.len() > 0 {
- self.results.insert(entered_timestamp[0].timestamp as u64, ResultStruct(entered_timestamp[0].timestamp as u64, -1, prop_val.clone() ));
+ for interval_data in vars.interval_start_to_entered_timestamps.values() {
+ if !self.results.contains_key(&(interval_data.entered_timestamp[0].timestamp as u64)) && interval_data.max_step >= args.from_step + 1 {
+ self.results.insert(interval_data.entered_timestamp[0].timestamp as u64, ResultStruct(interval_data.entered_timestamp[0].timestamp as u64, -1, prop_val.clone(), interval_data.max_step_event_uuid));
}
}
}
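The failure criterion changes here: instead of "the `timings` list is non-empty", an interval is a drop-off once its `max_step` shows the user got past `from_step` without ever completing. A hedged Python rendition, reusing the IntervalData mirror above (stub values stand in for the real UDF arguments):

```python
# Sketch of the drop-off pass: any tracked interval that passed from_step but
# never produced a success row is emitted as a failure (-1), carrying the
# uuid of its deepest matched event so a recording can be fetched for it.
results: dict[int, tuple] = {}
from_step, prop_val = 0, ""  # stand-ins for args.from_step / the breakdown value

for interval in interval_start_to_entered_timestamps.values():
    start = int(interval.entered_timestamp[0])
    if start not in results and interval.max_step >= from_step + 1:
        results[start] = (start, -1, prop_val, interval.max_step_event_uuid)
```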
@@ -141,36 +147,32 @@
if step == 1 {
if !vars.interval_start_to_entered_timestamps.contains_key(&event.interval_start) && !self.results.contains_key(&event.interval_start) {
let mut entered_timestamp = vec![DEFAULT_ENTERED_TIMESTAMP.clone(); args.num_steps + 1];
- entered_timestamp[0] = EnteredTimestamp { timestamp: event.interval_start as f64, timings: if args.from_step == 0 {vec![1.0]} else {vec![]} };
- entered_timestamp[1] = EnteredTimestamp { timestamp: event.timestamp, timings: vec![event.timestamp] };
- vars.interval_start_to_entered_timestamps.insert(event.interval_start, entered_timestamp);
+ entered_timestamp[0] = EnteredTimestamp { timestamp: event.interval_start as f64 };
+ entered_timestamp[1] = EnteredTimestamp { timestamp: event.timestamp };
+ vars.interval_start_to_entered_timestamps.insert(event.interval_start, IntervalData { max_step: 1, max_step_event_uuid: event.uuid, entered_timestamp: entered_timestamp });
}
} else {
- for entered_timestamp in vars.interval_start_to_entered_timestamps.values_mut() {
- let in_match_window = (event.timestamp - entered_timestamp[step - 1].timestamp) <= args.conversion_window_limit as f64;
- let already_reached_this_step = entered_timestamp[step].timestamp == entered_timestamp[step - 1].timestamp;
+ for interval_data in vars.interval_start_to_entered_timestamps.values_mut() {
+ let in_match_window = (event.timestamp - interval_data.entered_timestamp[step - 1].timestamp) <= args.conversion_window_limit as f64;
+ let already_reached_this_step = interval_data.entered_timestamp[step].timestamp == interval_data.entered_timestamp[step - 1].timestamp;
if in_match_window && !already_reached_this_step {
if exclusion {
return false;
}
let is_unmatched_step_attribution = self.breakdown_step.map(|breakdown_step| step == breakdown_step - 1).unwrap_or(false) && *prop_val != event.breakdown;
if !is_unmatched_step_attribution {
- entered_timestamp[step] = EnteredTimestamp {
- timestamp: entered_timestamp[step - 1].timestamp,
- timings: {
- let mut timings = entered_timestamp[step - 1].timings.clone();
- timings.push(event.timestamp);
- timings
- },
+ interval_data.entered_timestamp[step] = EnteredTimestamp {
+ timestamp: interval_data.entered_timestamp[step - 1].timestamp
};
// check if we have hit the goal. if we have, remove it from the list and add it to the successful_timestamps
- if entered_timestamp[args.num_steps].timestamp != 0.0 {
+ if interval_data.entered_timestamp[args.num_steps].timestamp != 0.0 {
self.results.insert(
- entered_timestamp[0].timestamp as u64,
- ResultStruct(entered_timestamp[0].timestamp as u64, 1, prop_val.clone())
+ interval_data.entered_timestamp[0].timestamp as u64,
+ ResultStruct(interval_data.entered_timestamp[0].timestamp as u64, 1, prop_val.clone(), event.uuid)
);
- } else if step == args.from_step + 1 {
- entered_timestamp[0].timings.push(1.0)
+ } else if step > interval_data.max_step {
+ interval_data.max_step = step;
+ interval_data.max_step_event_uuid = event.uuid;
}
}
}
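The advance rule: an event credits step N only when it lands inside the conversion window measured from step N-1 and step N has not already been credited; the equality test works because crediting step N copies step N-1's timestamp into slot N. Condensed into a hedged Python function (parameter names are stand-ins for the Rust locals):

```python
# Condensed sketch of the per-event update above. Completing the final step
# emits a success (+1) keyed by the entrance interval; any lesser forward
# progress just records the new deepest step and the uuid that achieved it.
def advance(interval, step, event_ts, event_uuid, num_steps,
            conversion_window_limit, prop_val, results):
    entered = interval.entered_timestamp
    in_match_window = (event_ts - entered[step - 1]) <= conversion_window_limit
    # Crediting step N copies slot N-1's timestamp into slot N, so equal
    # slots mean "this step was already reached from that entry".
    already_reached = entered[step] == entered[step - 1]
    if in_match_window and not already_reached:
        entered[step] = entered[step - 1]
        if entered[num_steps] != 0.0:       # final step reached: emit success
            start = int(entered[0])
            results[start] = (start, 1, prop_val, event_uuid)
        elif step > interval.max_step:      # otherwise record new deepest step
            interval.max_step = step
            interval.max_step_event_uuid = event_uuid
```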
@@ -180,10 +182,10 @@
// If a strict funnel, clear all of the steps that we didn't match to
// If we are processing multiple events, skip this step, because ordering makes it complicated
if args.funnel_order_type == "strict" {
- for entered_timestamp in vars.interval_start_to_entered_timestamps.values_mut() {
- for i in 1..entered_timestamp.len() {
+ for interval_data in vars.interval_start_to_entered_timestamps.values_mut() {
+ for i in 1..interval_data.entered_timestamp.len() {
if !event.steps.contains(&(i as i8)) {
- entered_timestamp[i] = DEFAULT_ENTERED_TIMESTAMP;
+ interval_data.entered_timestamp[i] = DEFAULT_ENTERED_TIMESTAMP;
}
}
}
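For strict-order funnels, every step the current event does not itself match gets its progress reset across all tracked intervals, roughly:

```python
# Hedged sketch of the strict-order reset above; event_steps stands in for
# the Rust event.steps. Any unmatched step index loses its progress, which
# is what makes the funnel "strict".
def apply_strict_order(intervals: dict, event_steps: list[int]) -> None:
    for interval in intervals.values():
        for i in range(1, len(interval.entered_timestamp)):
            if i not in event_steps:
                interval.entered_timestamp[i] = 0.0  # back to DEFAULT_ENTERED_TIMESTAMP
```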
7 changes: 4 additions & 3 deletions posthog/hogql_queries/insights/funnels/__init__.py
@@ -5,7 +5,8 @@
from .funnel_unordered import FunnelUnordered
from .funnel_time_to_convert import FunnelTimeToConvert
from .funnel_trends import FunnelTrends
+ from .funnel_trends_udf import FunnelTrendsUDF
from .funnel_persons import FunnelActors
- from .funnel_strict_persons import FunnelStrictActors
- from .funnel_unordered_persons import FunnelUnorderedActors
- from .funnel_trends_persons import FunnelTrendsActors
+ from .funnel_strict_actors import FunnelStrictActors
+ from .funnel_unordered_actors import FunnelUnorderedActors
+ from .funnel_trends_actors import FunnelTrendsActors
@@ -7,8 +7,8 @@
from posthog.hogql_queries.insights.funnels import FunnelUDF
from posthog.hogql_queries.insights.funnels.funnel_event_query import FunnelEventQuery
from posthog.hogql_queries.insights.funnels.funnel_persons import FunnelActors
- from posthog.hogql_queries.insights.funnels.funnel_strict_persons import FunnelStrictActors
- from posthog.hogql_queries.insights.funnels.funnel_unordered_persons import FunnelUnorderedActors
+ from posthog.hogql_queries.insights.funnels.funnel_strict_actors import FunnelStrictActors
+ from posthog.hogql_queries.insights.funnels.funnel_unordered_actors import FunnelUnorderedActors
from posthog.models.action.action import Action
from posthog.models.element.element import chain_to_elements
from posthog.models.event.util import ElementSerializer
101 changes: 95 additions & 6 deletions posthog/hogql_queries/insights/funnels/funnel_trends_udf.py
@@ -1,19 +1,27 @@
- from typing import cast
+ from typing import cast, Optional

+ from rest_framework.exceptions import ValidationError

from posthog.hogql import ast
from posthog.hogql.constants import HogQLQuerySettings
- from posthog.hogql.parser import parse_select
+ from posthog.hogql.parser import parse_select, parse_expr
from posthog.hogql_queries.insights.funnels import FunnelTrends
from posthog.hogql_queries.insights.funnels.funnel_udf import udf_event_array_filter
from posthog.hogql_queries.insights.utils.utils import get_start_of_interval_hogql_str
from posthog.schema import BreakdownType, BreakdownAttributionType
- from posthog.utils import DATERANGE_MAP
+ from posthog.utils import DATERANGE_MAP, relative_date_parse

TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"
HUMAN_READABLE_TIMESTAMP_FORMAT = "%-d-%b-%Y"


class FunnelTrendsUDF(FunnelTrends):
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ # In base, these fields only get added if you're running an actors query
+ if "uuid" not in self._extra_event_fields:
+ self._extra_event_fields.append("uuid")

def get_step_counts_query(self):
max_steps = self.context.max_steps
return self._get_step_counts_query(
@@ -30,7 +38,18 @@ def conversion_window_limit(self) -> int:
self.context.funnelWindowInterval * DATERANGE_MAP[self.context.funnelWindowIntervalUnit].total_seconds()
)

- def get_query(self) -> ast.SelectQuery:
+ def matched_event_select(self):
+ if self._include_matched_events():
+ return """
+ groupArray(tuple(timestamp, uuid, $session_id, $window_id)) as user_events,
+ mapFromArrays(arrayMap(x -> x.2, user_events), user_events) as user_events_map,
+ [user_events_map[af_tuple.4]] as matching_events,
+ """
+ return ""

+ # This is the function that calls the UDF
+ # This is used by both the query itself and the actors query
+ def _inner_aggregation_query(self):
# If they're asking for a "to_step" just truncate the funnel
funnelsFilter = self.context.funnelsFilter
max_steps = self.context.max_steps if funnelsFilter.funnelToStep is None else funnelsFilter.funnelToStep + 1
@@ -79,7 +98,12 @@ def get_query(self) -> ast.SelectQuery:
parse_select(
f"""
SELECT
- arraySort(t -> t.1, groupArray(tuple(toFloat(timestamp), _toUInt64(toDateTime({get_start_of_interval_hogql_str(self.context.interval.value, team=self.context.team, source='timestamp')})), {prop_selector}, arrayFilter((x) -> x != 0, [{steps}{exclusions}])))) as events_array,
+ arraySort(t -> t.1, groupArray(tuple(
+ toFloat(timestamp),
+ _toUInt64(toDateTime({get_start_of_interval_hogql_str(self.context.interval.value, team=self.context.team, source='timestamp')})),
+ uuid,
+ {prop_selector},
+ arrayFilter((x) -> x != 0, [{steps}{exclusions}])))) as events_array,
arrayJoin({fn}(
{from_step},
{max_steps},
@@ -91,7 +115,9 @@
)) as af_tuple,
toTimeZone(toDateTime(_toUInt64(af_tuple.1)), '{self.context.team.timezone}') as entrance_period_start,
af_tuple.2 as success_bool,
- af_tuple.3 as breakdown
+ af_tuple.3 as breakdown,
+ {self.matched_event_select()}
+ aggregation_target as aggregation_target
FROM {{inner_event_query}}
GROUP BY aggregation_target{breakdown_prop}
""",
@@ -100,6 +126,10 @@
)
# This is necessary so clickhouse doesn't truncate timezone information when passing datetimes to and from python
inner_select.settings = HogQLQuerySettings(date_time_output_format="iso", date_time_input_format="best_effort")
+ return inner_select
+
+ def get_query(self) -> ast.SelectQuery:
+ inner_select = self._inner_aggregation_query()

conversion_rate_expr = (
"if(reached_from_step_count > 0, round(reached_to_step_count / reached_from_step_count * 100, 2), 0)"
@@ -163,3 +193,62 @@ def get_query(self) -> ast.SelectQuery:
{"fill_query": fill_query, "inner_select": inner_select},
)
return cast(ast.SelectQuery, s)

+ def _matching_events(self):
+ if (
+ hasattr(self.context, "actorsQuery")
+ and self.context.actorsQuery is not None
+ and self.context.actorsQuery.includeRecordings
+ ):
+ return [ast.Alias(alias="matching_events", expr=ast.Field(chain=["matching_events"]))]
+ return []
+
+ def actor_query(
+ self,
+ extra_fields: Optional[list[str]] = None,
+ ) -> ast.SelectQuery:
+ team, actorsQuery = self.context.team, self.context.actorsQuery
+
+ if actorsQuery is None:
+ raise ValidationError("No actors query present.")
+
+ # At this time, we do not support self.dropOff (we don't use it anywhere in the frontend)
+ if actorsQuery.funnelTrendsDropOff is None:
+ raise ValidationError(f"Actors parameter `funnelTrendsDropOff` must be provided for funnel trends persons!")
+
+ if actorsQuery.funnelTrendsEntrancePeriodStart is None:
+ raise ValidationError(
+ f"Actors parameter `funnelTrendsEntrancePeriodStart` must be provided for funnel trends persons!"
+ )
+
+ entrancePeriodStart = relative_date_parse(actorsQuery.funnelTrendsEntrancePeriodStart, team.timezone_info)
+ if entrancePeriodStart is None:
+ raise ValidationError(
+ f"Actors parameter `funnelTrendsEntrancePeriodStart` must be a valid relative date string!"
+ )
+
+ select: list[ast.Expr] = [
+ ast.Alias(alias="actor_id", expr=ast.Field(chain=["aggregation_target"])),
+ *self._matching_events(),
+ *([ast.Field(chain=[field]) for field in extra_fields or []]),
+ ]
+ select_from = ast.JoinExpr(table=self._inner_aggregation_query())
+
+ where = ast.And(
+ exprs=[
+ parse_expr("success_bool != 1") if actorsQuery.funnelTrendsDropOff else parse_expr("success_bool = 1"),
+ ast.CompareOperation(
+ op=ast.CompareOperationOp.Eq,
+ left=parse_expr("entrance_period_start"),
+ right=ast.Constant(value=entrancePeriodStart),
+ ),
+ ]
+ )
+ order_by = [ast.OrderExpr(expr=ast.Field(chain=["aggregation_target"]))]
+
+ return ast.SelectQuery(
+ select=select,
+ select_from=select_from,
+ order_by=order_by,
+ where=where,
+ )
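Taken together with the FunnelLineGraph change above, the new actor_query is driven by three parameters. A hedged sketch of the query object the frontend sends — the field names are the ones this diff reads; the construction itself is illustrative, not the exact PostHog API:

```python
# Illustrative only: the actors query that exercises actor_query() above.
funnels_query = {"kind": "FunnelsQuery"}  # placeholder for the real insight source
actors_query = {
    "kind": "FunnelsActorsQuery",
    "source": funnels_query,
    "funnelTrendsDropOff": False,                    # True => dropped-off actors instead
    "funnelTrendsEntrancePeriodStart": "2024-10-17 00:00:00",
    "includeRecordings": True,                       # populates matching_events
}
```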