Skip to content

Commit

Permalink
feat: Added the first iteration of trend insghts breakdowns (#17891)
Browse files Browse the repository at this point in the history
* Added the first iteration of trend insghts breakdowns

* Removed team id from all events filters

* Added support for histogram breakdowns

* PR fixes

* Support group breakdowns

* Abstract property chain into a cached prop
  • Loading branch information
Gilbert09 authored Oct 13, 2023
1 parent 16a71f6 commit 1ecf289
Show file tree
Hide file tree
Showing 8 changed files with 586 additions and 154 deletions.
2 changes: 1 addition & 1 deletion posthog/hogql/functions/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ class HogQLFunctionMeta:
"medianBFloat16If": HogQLFunctionMeta("medianBFloat16If", 2, 2, aggregate=True),
"quantile": HogQLFunctionMeta("quantile", 1, 1, min_params=1, max_params=1, aggregate=True),
"quantileIf": HogQLFunctionMeta("quantileIf", 2, 2, min_params=1, max_params=1, aggregate=True),
"quantiles": HogQLFunctionMeta("quantiles", 1, 1, min_params=1, max_params=1, aggregate=True),
"quantiles": HogQLFunctionMeta("quantiles", 1, None, aggregate=True),
"quantilesIf": HogQLFunctionMeta("quantilesIf", 2, 2, min_params=1, max_params=1, aggregate=True),
# "quantileExact": HogQLFunctionMeta("quantileExact", 1, 1, aggregate=True),
# "quantileExactIf": HogQLFunctionMeta("quantileExactIf", 2, 2, aggregate=True),
Expand Down
135 changes: 135 additions & 0 deletions posthog/hogql_queries/insights/trends/breakdown.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
from typing import Dict, List, Tuple
from posthog.hogql import ast
from posthog.hogql_queries.insights.trends.breakdown_values import BreakdownValues
from posthog.hogql_queries.insights.trends.utils import get_properties_chain, series_event_name
from posthog.hogql_queries.utils.query_date_range import QueryDateRange
from posthog.models.filters.mixins.utils import cached_property
from posthog.models.team.team import Team
from posthog.schema import ActionsNode, EventsNode, TrendsQuery


class Breakdown:
query: TrendsQuery
team: Team
series: EventsNode | ActionsNode
query_date_range: QueryDateRange

def __init__(
self, team: Team, query: TrendsQuery, series: EventsNode | ActionsNode, query_date_range: QueryDateRange
):
self.team = team
self.query = query
self.series = series
self.query_date_range = query_date_range

@cached_property
def enabled(self) -> bool:
return self.query.breakdown is not None and self.query.breakdown.breakdown is not None

@cached_property
def is_histogram_breakdown(self) -> bool:
return self.enabled and self.query.breakdown.breakdown_histogram_bin_count is not None

def placeholders(self) -> Dict[str, ast.Expr]:
values = self._breakdown_buckets_ast if self.is_histogram_breakdown else self._breakdown_values_ast

return {"cross_join_breakdown_values": ast.Alias(alias="breakdown_value", expr=values)}

def column_expr(self) -> ast.Expr:
if self.is_histogram_breakdown:
return ast.Alias(alias="breakdown_value", expr=self._get_breakdown_histogram_multi_if())

return ast.Alias(
alias="breakdown_value",
expr=ast.Field(chain=self._properties_chain),
)

def events_where_filter(self) -> ast.Expr:
return ast.CompareOperation(
left=ast.Field(chain=self._properties_chain),
op=ast.CompareOperationOp.In,
right=self._breakdown_values_ast,
)

@cached_property
def _breakdown_buckets_ast(self) -> ast.Array:
buckets = self._get_breakdown_histogram_buckets()
values = [f"[{t[0]},{t[1]}]" for t in buckets]
values.append('["",""]')

return ast.Array(exprs=list(map(lambda v: ast.Constant(value=v), values)))

@cached_property
def _breakdown_values_ast(self) -> ast.Array:
return ast.Array(exprs=[ast.Constant(value=v) for v in self._get_breakdown_values])

@cached_property
def _get_breakdown_values(self) -> ast.Array:
breakdown = BreakdownValues(
team=self.team,
event_name=series_event_name(self.series),
breakdown_field=self.query.breakdown.breakdown,
breakdown_type=self.query.breakdown.breakdown_type,
query_date_range=self.query_date_range,
histogram_bin_count=self.query.breakdown.breakdown_histogram_bin_count,
group_type_index=self.query.breakdown.breakdown_group_type_index,
)
return breakdown.get_breakdown_values()

def _get_breakdown_histogram_buckets(self) -> List[Tuple[float, float]]:
buckets = []
values = self._get_breakdown_values

if len(values) == 1:
values = [values[0], values[0]]

for i in range(len(values) - 1):
last_value = i == len(values) - 2

# Since we always `floor(x, 2)` the value, we add 0.01 to the last bucket
# to ensure it's always slightly greater than the maximum value
lower_bound = values[i]
upper_bound = values[i + 1] + 0.01 if last_value else values[i + 1]
buckets.append((lower_bound, upper_bound))

return buckets

def _get_breakdown_histogram_multi_if(self) -> ast.Expr:
multi_if_exprs: List[ast.Expr] = []

buckets = self._get_breakdown_histogram_buckets()

for lower_bound, upper_bound in buckets:

multi_if_exprs.extend(
[
ast.And(
exprs=[
ast.CompareOperation(
left=ast.Field(chain=self._properties_chain),
op=ast.CompareOperationOp.GtEq,
right=ast.Constant(value=lower_bound),
),
ast.CompareOperation(
left=ast.Field(chain=self._properties_chain),
op=ast.CompareOperationOp.Lt,
right=ast.Constant(value=upper_bound),
),
]
),
ast.Constant(value=f"[{lower_bound},{upper_bound}]"),
]
)

# `else` block of the multi-if
multi_if_exprs.append(ast.Constant(value='["",""]'))

return ast.Call(name="multiIf", args=multi_if_exprs)

@cached_property
def _properties_chain(self):
return get_properties_chain(
breakdown_type=self.query.breakdown.breakdown_type,
breakdown_field=self.query.breakdown.breakdown,
group_type_index=self.query.breakdown.breakdown_group_type_index,
)
120 changes: 120 additions & 0 deletions posthog/hogql_queries/insights/trends/breakdown_values.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
from typing import List, Optional
from posthog.hogql import ast
from posthog.hogql.parser import parse_expr, parse_select
from posthog.hogql.query import execute_hogql_query
from posthog.hogql_queries.insights.trends.utils import get_properties_chain
from posthog.hogql_queries.utils.query_date_range import QueryDateRange
from posthog.models.team.team import Team


class BreakdownValues:
team: Team
event_name: str
breakdown_field: str
breakdown_type: str
query_date_range: QueryDateRange
histogram_bin_count: Optional[int]
group_type_index: Optional[int]

def __init__(
self,
team: Team,
event_name: str,
breakdown_field: str,
query_date_range: QueryDateRange,
breakdown_type: str,
histogram_bin_count: Optional[float] = None,
group_type_index: Optional[float] = None,
):
self.team = team
self.event_name = event_name
self.breakdown_field = breakdown_field
self.query_date_range = query_date_range
self.breakdown_type = breakdown_type
self.histogram_bin_count = int(histogram_bin_count) if histogram_bin_count is not None else None
self.group_type_index = int(group_type_index) if group_type_index is not None else None

def get_breakdown_values(self) -> List[str]:
select_field = ast.Alias(
alias="value",
expr=ast.Field(
chain=get_properties_chain(
breakdown_type=self.breakdown_type,
breakdown_field=self.breakdown_field,
group_type_index=self.group_type_index,
)
),
)

query = parse_select(
"""
SELECT groupArray(value) FROM (
SELECT
{select_field},
count(*) as count
FROM
events e
WHERE
{events_where}
GROUP BY
value
ORDER BY
count DESC,
value DESC
)
""",
placeholders={
"events_where": self._where_filter(),
"select_field": select_field,
},
)

if self.histogram_bin_count is not None:
expr = self._to_bucketing_expression()
query.select = [expr]

response = execute_hogql_query(
query_type="TrendsQueryBreakdownValues",
query=query,
team=self.team,
)

values = response.results[0][0]
return values

def _where_filter(self) -> ast.Expr:
filters: List[ast.Expr] = []

filters.append(parse_expr("notEmpty(e.person_id)"))
filters.extend(
[
parse_expr(
"timestamp >= {date_from}",
placeholders=self.query_date_range.to_placeholders(),
),
parse_expr(
"timestamp <= {date_to}",
placeholders=self.query_date_range.to_placeholders(),
),
]
)

if self.event_name is not None:
filters.append(parse_expr("event = {event}", placeholders={"event": ast.Constant(value=self.event_name)}))

return ast.And(exprs=filters)

def _to_bucketing_expression(self) -> ast.Expr:
assert isinstance(self.histogram_bin_count, int)

if self.histogram_bin_count <= 1:
qunatile_expression = "quantiles(0,1)(value)"
else:
quantiles = []
bin_size = 1.0 / self.histogram_bin_count
for i in range(self.histogram_bin_count + 1):
quantiles.append(i * bin_size)

qunatile_expression = f"quantiles({','.join([f'{quantile:.2f}' for quantile in quantiles])})(value)"

return parse_expr(f"arrayCompact(arrayMap(x -> floor(x, 2), {qunatile_expression}))")
Loading

0 comments on commit 1ecf289

Please sign in to comment.