diff --git a/posthog/hogql_queries/insights/trends/breakdown_values.py b/posthog/hogql_queries/insights/trends/breakdown_values.py new file mode 100644 index 00000000000000..4ed5bfc742803b --- /dev/null +++ b/posthog/hogql_queries/insights/trends/breakdown_values.py @@ -0,0 +1,81 @@ +from typing import List +from posthog.hogql import ast +from posthog.hogql.parser import parse_expr, parse_select +from posthog.hogql.query import execute_hogql_query +from posthog.hogql_queries.utils.query_date_range import QueryDateRange +from posthog.models.team.team import Team + + +class BreakdownValues: + team: Team + event_name: str + breakdown_field: str + query_date_range: QueryDateRange + + def __init__(self, team: Team, event_name: str, breakdown_field: str, query_date_range: QueryDateRange): + self.team = team + self.event_name = event_name + self.breakdown_field = breakdown_field + self.query_date_range = query_date_range + + def get_breakdown_values(self) -> List[str]: + select_field = ast.Alias(alias="value", expr=ast.Field(chain=["properties", self.breakdown_field])) + + query = parse_select( + """ + SELECT groupArray(value) FROM ( + SELECT + {select_field}, + count(*) as count + FROM + events e + WHERE + {events_where} + GROUP BY + value + ORDER BY + count DESC, + value DESC + LIMIT 25 + OFFSET 0 + ) + """, + placeholders={ + "events_where": self._where_filter(), + "team_id": ast.Constant(value=self.team.pk), + "select_field": select_field, + }, + ) + + response = execute_hogql_query( + query_type="TrendsQueryBreakdownValues", + query=query, + team=self.team, + ) + + values = response.results[0][0] + + return values + + def _where_filter(self) -> ast.Expr: + filters: List[ast.Expr] = [] + + filters.append(parse_expr("team_id = {team_id}", placeholders={"team_id": ast.Constant(value=self.team.pk)})) + filters.append(parse_expr("notEmpty(e.person_id)")) + filters.extend( + [ + parse_expr( + "toTimeZone(timestamp, 'UTC') >= {date_from}", + placeholders=self.query_date_range.to_placeholders(), + ), + parse_expr( + "toTimeZone(timestamp, 'UTC') <= {date_to}", + placeholders=self.query_date_range.to_placeholders(), + ), + ] + ) + + if self.event_name is not None: + filters.append(parse_expr("event = {event}", placeholders={"event": ast.Constant(value=self.event_name)})) + + return ast.And(exprs=filters) diff --git a/posthog/hogql_queries/insights/trends/query_builder.py b/posthog/hogql_queries/insights/trends/query_builder.py new file mode 100644 index 00000000000000..9009325035d235 --- /dev/null +++ b/posthog/hogql_queries/insights/trends/query_builder.py @@ -0,0 +1,253 @@ +from typing import List +from posthog.hogql import ast +from posthog.hogql.parser import parse_expr, parse_select +from posthog.hogql.property import property_to_expr +from posthog.hogql_queries.insights.trends.breakdown_values import BreakdownValues +from posthog.hogql_queries.utils.query_date_range import QueryDateRange +from posthog.models.filters.mixins.utils import cached_property +from posthog.models.team.team import Team +from posthog.schema import ActionsNode, EventsNode, TrendsQuery + + +class TrendsQueryBuilder: + query: TrendsQuery + team: Team + query_date_range: QueryDateRange + series: EventsNode | ActionsNode + + def __init__( + self, trends_query: TrendsQuery, team: Team, query_date_range: QueryDateRange, series: EventsNode | ActionsNode + ): + self.query = trends_query + self.team = team + self.query_date_range = query_date_range + self.series = series + + def build_query(self) -> ast.SelectUnionQuery: + date_subqueries = self._get_date_subqueries() + event_query = self._get_events_subquery() + + date_events_union = ast.SelectUnionQuery(select_queries=[*date_subqueries, event_query]) + + inner_select = self._inner_select_query(date_events_union) + full_query = self._outer_select_query(inner_select) + + return full_query + + def _get_date_subqueries(self) -> List[ast.SelectQuery]: + if not self._breakdown_enabled(): + return [ + parse_select( + """ + SELECT + 0 AS total, + dateTrunc({interval}, {date_to}) - {number_interval_period} AS day_start + FROM + numbers( + coalesce(dateDiff({interval}, {date_from}, {date_to}), 0) + ) + """, + placeholders={ + **self.query_date_range.to_placeholders(), + }, + ), + parse_select( + """ + SELECT + 0 AS total, + {date_from} AS day_start + """, + placeholders={ + **self.query_date_range.to_placeholders(), + }, + ), + ] + + return [ + parse_select( + """ + SELECT + 0 AS total, + ticks.day_start as day_start, + breakdown_value + FROM ( + SELECT + dateTrunc({interval}, {date_to}) - {number_interval_period} AS day_start + FROM + numbers( + coalesce(dateDiff({interval}, {date_from}, {date_to}), 0) + ) + UNION ALL + SELECT {date_from} AS day_start + ) as ticks + CROSS JOIN ( + SELECT breakdown_value + FROM ( + SELECT {breakdown_values} + ) + ARRAY JOIN breakdown_value as breakdown_value + ) as sec + ORDER BY breakdown_value, day_start + """, + placeholders={ + **self.query_date_range.to_placeholders(), + "breakdown_values": ast.Alias(alias="breakdown_value", expr=self._get_breakdown_values_ast), + }, + ) + ] + + def _get_events_subquery(self) -> ast.SelectQuery: + query = parse_select( + """ + SELECT + {aggregation_operation} AS total, + dateTrunc({interval}, toTimeZone(toDateTime(timestamp), 'UTC')) AS day_start + FROM events AS e + %s + WHERE {events_filter} + GROUP BY day_start + """ + % (self._sample_value()), + placeholders={ + **self.query_date_range.to_placeholders(), + "events_filter": self._events_filter(), + "aggregation_operation": self._aggregation_operation(), + }, + ) + + if self._breakdown_enabled(): + query.select.append( + ast.Alias(alias="breakdown_value", expr=ast.Field(chain=["properties", self.query.breakdown.breakdown])) + ) + query.group_by.append(ast.Field(chain=["breakdown_value"])) + + return query + + def _outer_select_query(self, inner_query: ast.SelectQuery) -> ast.SelectQuery: + query = parse_select( + """ + SELECT + groupArray(day_start) AS date, + groupArray(count) AS total + FROM {inner_query} + """, + placeholders={"inner_query": inner_query}, + ) + + if self._breakdown_enabled(): + query.select.append(ast.Field(chain=["breakdown_value"])) + query.group_by = [ast.Field(chain=["breakdown_value"])] + query.order_by = [ast.OrderExpr(expr=ast.Field(chain=["breakdown_value"]), order="ASC")] + + return query + + def _inner_select_query(self, inner_query: ast.SelectUnionQuery) -> ast.SelectQuery: + query = parse_select( + """ + SELECT + sum(total) AS count, + day_start + FROM {inner_query} + GROUP BY day_start + ORDER BY day_start ASC + """, + placeholders={"inner_query": inner_query}, + ) + + if self._breakdown_enabled(): + query.select.append(ast.Field(chain=["breakdown_value"])) + query.group_by.append(ast.Field(chain=["breakdown_value"])) + query.order_by.append(ast.OrderExpr(expr=ast.Field(chain=["breakdown_value"]), order="ASC")) + + return query + + def _events_filter(self) -> ast.Expr: + series = self.series + filters: List[ast.Expr] = [] + + # Team ID + filters.append(parse_expr("team_id = {team_id}", placeholders={"team_id": ast.Constant(value=self.team.pk)})) + + # Dates + filters.extend( + [ + parse_expr( + "(toTimeZone(timestamp, 'UTC') >= {date_from})", + placeholders=self.query_date_range.to_placeholders(), + ), + parse_expr( + "(toTimeZone(timestamp, 'UTC') <= {date_to})", + placeholders=self.query_date_range.to_placeholders(), + ), + ] + ) + + # Series + if self._series_event_name() is not None: + filters.append( + parse_expr("event = {event}", placeholders={"event": ast.Constant(value=self._series_event_name())}) + ) + + # Filter Test Accounts + if ( + self.query.filterTestAccounts + and isinstance(self.team.test_account_filters, list) + and len(self.team.test_account_filters) > 0 + ): + for property in self.team.test_account_filters: + filters.append(property_to_expr(property, self.team)) + + # Properties + if self.query.properties is not None and self.query.properties != []: + filters.append(property_to_expr(self.query.properties, self.team)) + + # Series Filters + if series.properties is not None and series.properties != []: + filters.append(property_to_expr(series.properties, self.team)) + + # Breakdown + if self._breakdown_enabled(): + filters.append( + ast.CompareOperation( + left=ast.Field(chain=["properties", self.query.breakdown.breakdown]), + op=ast.CompareOperationOp.In, + right=self._get_breakdown_values_ast, + ) + ) + + if len(filters) == 0: + return ast.Constant(value=True) + elif len(filters) == 1: + return filters[0] + else: + return ast.And(exprs=filters) + + def _aggregation_operation(self) -> ast.Expr: + if self.series.math == "hogql": + return parse_expr(self.series.math_hogql) + + return parse_expr("count(*)") + + # Using string interpolation for SAMPLE due to HogQL limitations with `UNION ALL` and `SAMPLE` AST nodes + def _sample_value(self) -> str: + if self.query.samplingFactor is None: + return "" + + return f"SAMPLE {self.query.samplingFactor}" + + def _series_event_name(self) -> str | None: + if isinstance(self.series, EventsNode): + return self.series.event + return None + + def _breakdown_enabled(self): + return self.query.breakdown is not None and self.query.breakdown.breakdown is not None + + @cached_property + def _get_breakdown_values_ast(self) -> ast.Array: + breakdown = BreakdownValues( + self.team, self._series_event_name(), self.query.breakdown.breakdown, self.query_date_range + ) + breakdown_values = breakdown.get_breakdown_values() + + return ast.Array(exprs=list(map(lambda v: ast.Constant(value=v), breakdown_values))) diff --git a/posthog/hogql_queries/insights/trends/series_with_extras.py b/posthog/hogql_queries/insights/trends/series_with_extras.py new file mode 100644 index 00000000000000..e95035fa907f0d --- /dev/null +++ b/posthog/hogql_queries/insights/trends/series_with_extras.py @@ -0,0 +1,11 @@ +from typing import Optional +from posthog.schema import ActionsNode, EventsNode + + +class SeriesWithExtras: + series: EventsNode | ActionsNode + is_previous_period_series: Optional[bool] + + def __init__(self, series: EventsNode | ActionsNode, is_previous_period_series: Optional[bool]): + self.series = series + self.is_previous_period_series = is_previous_period_series diff --git a/posthog/hogql_queries/insights/trends_query_runner.py b/posthog/hogql_queries/insights/trends/trends_query_runner.py similarity index 53% rename from posthog/hogql_queries/insights/trends_query_runner.py rename to posthog/hogql_queries/insights/trends/trends_query_runner.py index 219c57e617a6ff..c2c9d8d3b92548 100644 --- a/posthog/hogql_queries/insights/trends_query_runner.py +++ b/posthog/hogql_queries/insights/trends/trends_query_runner.py @@ -9,10 +9,10 @@ from posthog.caching.utils import is_stale from posthog.hogql import ast -from posthog.hogql.parser import parse_expr, parse_select -from posthog.hogql.property import property_to_expr from posthog.hogql.query import execute_hogql_query from posthog.hogql.timings import HogQLTimings +from posthog.hogql_queries.insights.trends.query_builder import TrendsQueryBuilder +from posthog.hogql_queries.insights.trends.series_with_extras import SeriesWithExtras from posthog.hogql_queries.query_runner import QueryRunner from posthog.hogql_queries.utils.formula_ast import FormulaAST from posthog.hogql_queries.utils.query_date_range import QueryDateRange @@ -22,15 +22,6 @@ from posthog.schema import ActionsNode, EventsNode, HogQLQueryResponse, TrendsQuery, TrendsQueryResponse -class SeriesWithExtras: - series: EventsNode | ActionsNode - is_previous_period_series: Optional[bool] - - def __init__(self, series: EventsNode | ActionsNode, is_previous_period_series: Optional[bool]): - self.series = series - self.is_previous_period_series = is_previous_period_series - - class TrendsQueryRunner(QueryRunner): query: TrendsQuery query_type = TrendsQuery @@ -40,63 +31,6 @@ def __init__(self, query: TrendsQuery | Dict[str, Any], team: Team, timings: Opt super().__init__(query, team, timings) self.series = self.setup_series() - def to_query(self) -> List[ast.SelectQuery]: - queries = [] - with self.timings.measure("trends_query"): - for series in self.series: - if not series.is_previous_period_series: - date_placeholders = self.query_date_range.to_placeholders() - else: - date_placeholders = self.query_previous_date_range.to_placeholders() - - queries.append( - parse_select( - """ - SELECT - groupArray(day_start) AS date, - groupArray(count) AS total - FROM - ( - SELECT - sum(total) AS count, - day_start - FROM - ( - SELECT - 0 AS total, - dateTrunc({interval}, {date_to}) - {number_interval_period} AS day_start - FROM - numbers( - coalesce(dateDiff({interval}, {date_from}, {date_to}), 0) - ) - UNION ALL - SELECT - 0 AS total, - {date_from} - UNION ALL - SELECT - {aggregation_operation} AS total, - dateTrunc({interval}, toTimeZone(toDateTime(timestamp), 'UTC')) AS date - FROM events AS e - %s - WHERE {events_filter} - GROUP BY date - ) - GROUP BY day_start - ORDER BY day_start ASC - ) - """ - % (self.sample_value()), - placeholders={ - **date_placeholders, - "events_filter": self.events_filter(series), - "aggregation_operation": self.aggregation_operation(series.series), - }, - timings=self.timings, - ) - ) - return queries - def _is_stale(self, cached_result_package): date_to = self.query_date_range.date_to() interval = self.query_date_range.interval_name @@ -119,9 +53,19 @@ def _refresh_frequency(self): return refresh_frequency - def to_persons_query(self) -> str: - # TODO: add support for selecting and filtering by breakdowns - raise NotImplementedError() + def to_query(self) -> List[ast.SelectQuery]: + queries = [] + with self.timings.measure("trends_query"): + for series in self.series: + if not series.is_previous_period_series: + query_date_range = self.query_date_range + else: + query_date_range = self.query_previous_date_range + + query_builder = TrendsQueryBuilder(self.query, self.team, query_date_range, series.series) + queries.append(query_builder.build_query()) + + return queries def calculate(self): queries = self.to_query() @@ -156,10 +100,22 @@ def build_series_response(self, response: HogQLQueryResponse, series: SeriesWith for val in response.results: series_object = { "data": val[1], - "labels": [item.strftime("%-d-%b-%Y") for item in val[0]], # Add back in hour formatting - "days": [item.strftime("%Y-%m-%d") for item in val[0]], # Add back in hour formatting + "labels": [item.strftime("%-d-%b-%Y") for item in val[0]], # TODO: Add back in hour formatting + "days": [item.strftime("%Y-%m-%d") for item in val[0]], # TODO: Add back in hour formatting "count": float(sum(val[1])), "label": "All events" if self.series_event(series.series) is None else self.series_event(series.series), + "action": { # TODO: Populate missing props in `action` + "id": self.series_event(series.series), + "type": "events", + "order": 0, + "name": self.series_event(series.series) or "All events", + "custom_name": None, + "math": series.series.math, + "math_property": None, + "math_hogql": None, + "math_group_type_index": None, + "properties": {}, + }, } # Modifications for when comparing to previous period @@ -173,6 +129,11 @@ def build_series_response(self, response: HogQLQueryResponse, series: SeriesWith series_object["compare_label"] = "previous" if series.is_previous_period_series else "current" series_object["labels"] = labels + # Modifications for when breakdowns are active + if self.query.breakdown is not None and self.query.breakdown.breakdown is not None: + series_object["breakdown_value"] = val[2] + series_object["label"] = "{} - {}".format(series_object["label"], val[2]) + res.append(series_object) return res @@ -188,85 +149,6 @@ def query_previous_date_range(self): date_range=self.query.dateRange, team=self.team, interval=self.query.interval, now=datetime.now() ) - def aggregation_operation(self, series: EventsNode | ActionsNode) -> ast.Expr: - if series.math == "hogql": - return parse_expr(series.math_hogql) - - return parse_expr("count(*)") - - def events_filter(self, series_with_extra: SeriesWithExtras) -> ast.Expr: - series = series_with_extra.series - filters: List[ast.Expr] = [] - - # Team ID - filters.append(parse_expr("team_id = {team_id}", placeholders={"team_id": ast.Constant(value=self.team.pk)})) - - if not series_with_extra.is_previous_period_series: - # Dates (current period) - filters.extend( - [ - parse_expr( - "(toTimeZone(timestamp, 'UTC') >= {date_from})", - placeholders=self.query_date_range.to_placeholders(), - ), - parse_expr( - "(toTimeZone(timestamp, 'UTC') <= {date_to})", - placeholders=self.query_date_range.to_placeholders(), - ), - ] - ) - else: - # Date (previous period) - filters.extend( - [ - parse_expr( - "(toTimeZone(timestamp, 'UTC') >= {date_from})", - placeholders=self.query_previous_date_range.to_placeholders(), - ), - parse_expr( - "(toTimeZone(timestamp, 'UTC') <= {date_to})", - placeholders=self.query_previous_date_range.to_placeholders(), - ), - ] - ) - - # Series - if self.series_event(series) is not None: - filters.append( - parse_expr("event = {event}", placeholders={"event": ast.Constant(value=self.series_event(series))}) - ) - - # Filter Test Accounts - if ( - self.query.filterTestAccounts - and isinstance(self.team.test_account_filters, list) - and len(self.team.test_account_filters) > 0 - ): - for property in self.team.test_account_filters: - filters.append(property_to_expr(property, self.team)) - - # Properties - if self.query.properties is not None and self.query.properties != []: - filters.append(property_to_expr(self.query.properties, self.team)) - - # Series Filters - if series.properties is not None and series.properties != []: - filters.append(property_to_expr(series.properties, self.team)) - - if len(filters) == 0: - return ast.Constant(value=True) - elif len(filters) == 1: - return filters[0] - else: - return ast.And(exprs=filters) - - # Using string interpolation for SAMPLE due to HogQL limitations with `UNION ALL` and `SAMPLE` AST nodes - def sample_value(self) -> str: - if self.query.samplingFactor is None: - return "" - - return f"SAMPLE {self.query.samplingFactor}" - def series_event(self, series: EventsNode | ActionsNode) -> str | None: if isinstance(series, EventsNode): return series.event diff --git a/posthog/hogql_queries/query_runner.py b/posthog/hogql_queries/query_runner.py index 3f42fb2f734bde..a8c4c69878ebe0 100644 --- a/posthog/hogql_queries/query_runner.py +++ b/posthog/hogql_queries/query_runner.py @@ -87,7 +87,7 @@ def get_query_runner( return LifecycleQueryRunner(query=cast(LifecycleQuery | Dict[str, Any], query), team=team, timings=timings) if kind == "TrendsQuery": - from .insights.trends_query_runner import TrendsQueryRunner + from .insights.trends.trends_query_runner import TrendsQueryRunner return TrendsQueryRunner(query=cast(TrendsQuery | Dict[str, Any], query), team=team, timings=timings) if kind == "PersonsQuery":