Skip to content

Commit

Permalink
Added the persons query to trends query
Browse files (browse the repository at this point in the history)
  • Loading branch information
Gilbert09 committed Oct 30, 2023
1 parent 7a37a71 commit 3bf712d
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 46 deletions.
6 changes: 5 additions & 1 deletion posthog/hogql/printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,11 @@ def visit_field_type(self, type: ast.FieldType):
return field_sql
field_sql = f"{self.visit(type.table_type)}.{field_sql}"

elif isinstance(type.table_type, ast.SelectQueryType) or isinstance(type.table_type, ast.SelectQueryAliasType):
elif (
isinstance(type.table_type, ast.SelectQueryType)
or isinstance(type.table_type, ast.SelectQueryAliasType)
or isinstance(type.table_type, ast.SelectUnionQueryType)
):
field_sql = self._print_identifier(type.name)
if isinstance(type.table_type, ast.SelectQueryAliasType):
field_sql = f"{self.visit(type.table_type)}.{field_sql}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from posthog.hogql.query import execute_hogql_query
from posthog.hogql.timings import HogQLTimings
from posthog.hogql_queries.insights.lifecycle_query_runner import LifecycleQueryRunner
from posthog.hogql_queries.insights.trends.trends_query_runner import TrendsQueryRunner
from posthog.hogql_queries.query_runner import QueryRunner, get_query_runner
from posthog.models import Team
from posthog.models.filters.mixins.utils import cached_property
Expand Down Expand Up @@ -32,16 +33,19 @@ def __init__(
def source_runner(self) -> QueryRunner:
return get_query_runner(self.query.source, self.team, self.timings, self.in_export_context)

def to_query(self) -> ast.SelectQuery:
def to_query(self) -> ast.SelectQuery | ast.SelectUnionQuery:
if isinstance(self.source_runner, LifecycleQueryRunner):
lifecycle_runner = cast(LifecycleQueryRunner, self.source_runner)
day = self.query.day
status = self.query.status
return lifecycle_runner.to_persons_query(day=day, status=status)
elif isinstance(self.source_runner, TrendsQueryRunner):
trends_runner = cast(TrendsQueryRunner, self.source_runner)
return trends_runner.to_persons_query()

raise ValueError(f"Cannot convert source query of type {self.query.source.kind} to persons query")

def to_persons_query(self) -> ast.SelectQuery:
def to_persons_query(self) -> ast.SelectQuery | ast.SelectUnionQuery:
return self.to_query()

def calculate(self) -> HogQLQueryResponse:
Expand Down
12 changes: 6 additions & 6 deletions posthog/hogql_queries/insights/trends/aggregation_operations.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Optional
from typing import List, Optional, cast
from posthog.hogql import ast
from posthog.hogql.parser import parse_expr, parse_select
from posthog.hogql_queries.utils.query_date_range import QueryDateRange
Expand All @@ -13,15 +13,15 @@ class QueryAlternator:
_group_bys: List[ast.Expr]
_select_from: ast.JoinExpr | None

def __init__(self, query: ast.SelectQuery | ast.SelectUnionQuery):
def __init__(self, query: ast.SelectQuery):
assert isinstance(query, ast.SelectQuery)

self._query = query
self._selects = []
self._group_bys = []
self._select_from = None

def build(self) -> ast.SelectQuery | ast.SelectUnionQuery:
def build(self) -> ast.SelectQuery:
if len(self._selects) > 0:
self._query.select.extend(self._selects)

Expand Down Expand Up @@ -280,9 +280,9 @@ def _events_query(
)

def get_query_orchestrator(self, events_where_clause: ast.Expr, sample_value: ast.RatioExpr):
events_query = self._events_query(events_where_clause, sample_value)
inner_select = self._inner_select_query(events_query)
parent_select = self._parent_select_query(inner_select)
events_query = cast(ast.SelectQuery, self._events_query(events_where_clause, sample_value))
inner_select = cast(ast.SelectQuery, self._inner_select_query(events_query))
parent_select = cast(ast.SelectQuery, self._parent_select_query(inner_select))

class QueryOrchestrator:
events_query_builder: QueryAlternator
Expand Down
98 changes: 63 additions & 35 deletions posthog/hogql_queries/insights/trends/query_builder.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List
from typing import List, Optional, cast
from posthog.hogql import ast
from posthog.hogql.parser import parse_expr, parse_select
from posthog.hogql.property import property_to_expr
Expand Down Expand Up @@ -35,9 +35,9 @@ def __init__(
self.series = series
self.timings = timings

def build_query(self) -> ast.SelectUnionQuery:
def build_query(self) -> ast.SelectQuery | ast.SelectUnionQuery:
date_subqueries = self._get_date_subqueries()
event_query = self._get_events_subquery()
event_query = self._get_events_subquery(False)

date_events_union = ast.SelectUnionQuery(select_queries=[*date_subqueries, event_query])

Expand All @@ -46,11 +46,21 @@ def build_query(self) -> ast.SelectUnionQuery:

return full_query

def build_persons_query(self) -> ast.SelectQuery:
    """Return the events query for this series reduced to a list of person ids.

    The base events subquery is requested with ``no_modifications=True`` so
    that it comes back without breakdown or aggregation-orchestration changes.
    Its projection is then replaced with a single ``person_id`` column
    (read from the ``e`` table alias) and its GROUP BY is removed.
    """
    persons_query = self._get_events_subquery(True)

    # Only the person id is needed; the trend's total/day_start columns and
    # their grouping are dropped.
    person_id_column = ast.Alias(
        alias="person_id",
        expr=ast.Field(chain=["e", "person_id"]),
    )
    persons_query.group_by = None
    persons_query.select = [person_id_column]

    return persons_query

def _get_date_subqueries(self) -> List[ast.SelectQuery]:
if not self._breakdown.enabled:
return [
parse_select(
"""
cast(
ast.SelectQuery,
parse_select(
"""
SELECT
0 AS total,
dateTrunc({interval}, {date_to}) - {number_interval_period} AS day_start
Expand All @@ -59,25 +69,31 @@ def _get_date_subqueries(self) -> List[ast.SelectQuery]:
coalesce(dateDiff({interval}, {date_from}, {date_to}), 0)
)
""",
placeholders={
**self.query_date_range.to_placeholders(),
},
placeholders={
**self.query_date_range.to_placeholders(),
},
),
),
parse_select(
"""
cast(
ast.SelectQuery,
parse_select(
"""
SELECT
0 AS total,
{date_from} AS day_start
""",
placeholders={
**self.query_date_range.to_placeholders(),
},
placeholders={
**self.query_date_range.to_placeholders(),
},
),
),
]

return [
parse_select(
"""
cast(
ast.SelectQuery,
parse_select(
"""
SELECT
0 AS total,
ticks.day_start as day_start,
Expand All @@ -101,16 +117,19 @@ def _get_date_subqueries(self) -> List[ast.SelectQuery]:
) as sec
ORDER BY breakdown_value, day_start
""",
placeholders={
**self.query_date_range.to_placeholders(),
**self._breakdown.placeholders(),
},
placeholders={
**self.query_date_range.to_placeholders(),
**self._breakdown.placeholders(),
},
),
)
]

def _get_events_subquery(self) -> ast.SelectQuery:
default_query = parse_select(
"""
def _get_events_subquery(self, no_modifications: Optional[bool]) -> ast.SelectQuery:
default_query = cast(
ast.SelectQuery,
parse_select(
"""
SELECT
{aggregation_operation} AS total,
dateTrunc({interval}, timestamp) AS day_start
Expand All @@ -119,16 +138,19 @@ def _get_events_subquery(self) -> ast.SelectQuery:
WHERE {events_filter}
GROUP BY day_start
""",
placeholders={
**self.query_date_range.to_placeholders(),
"events_filter": self._events_filter(),
"aggregation_operation": self._aggregation_operation.select_aggregation(),
"sample": self._sample_value(),
},
placeholders={
**self.query_date_range.to_placeholders(),
"events_filter": self._events_filter(),
"aggregation_operation": self._aggregation_operation.select_aggregation(),
"sample": self._sample_value(),
},
),
)

# No breakdowns and no complex series aggregation
if not self._breakdown.enabled and not self._aggregation_operation.requires_query_orchestration():
if (
not self._breakdown.enabled and not self._aggregation_operation.requires_query_orchestration()
) or no_modifications is True:
return default_query
# Both breakdowns and complex series aggregation
elif self._breakdown.enabled and self._aggregation_operation.requires_query_orchestration():
Expand Down Expand Up @@ -161,14 +183,17 @@ def _get_events_subquery(self) -> ast.SelectQuery:
return default_query

def _outer_select_query(self, inner_query: ast.SelectQuery) -> ast.SelectQuery:
query = parse_select(
"""
query = cast(
ast.SelectQuery,
parse_select(
"""
SELECT
groupArray(day_start) AS date,
groupArray(count) AS total
FROM {inner_query}
""",
placeholders={"inner_query": inner_query},
placeholders={"inner_query": inner_query},
),
)

if self._breakdown.enabled:
Expand All @@ -179,16 +204,19 @@ def _outer_select_query(self, inner_query: ast.SelectQuery) -> ast.SelectQuery:
return query

def _inner_select_query(self, inner_query: ast.SelectUnionQuery) -> ast.SelectQuery:
query = parse_select(
"""
query = cast(
ast.SelectQuery,
parse_select(
"""
SELECT
sum(total) AS count,
day_start
FROM {inner_query}
GROUP BY day_start
ORDER BY day_start ASC
""",
placeholders={"inner_query": inner_query},
placeholders={"inner_query": inner_query},
),
)

if self._breakdown.enabled:
Expand Down
22 changes: 21 additions & 1 deletion posthog/hogql_queries/insights/trends/trends_query_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def __init__(
query: TrendsQuery | Dict[str, Any],
team: Team,
timings: Optional[HogQLTimings] = None,
in_export_context: Optional[int] = None,
in_export_context: Optional[bool] = None,
):
super().__init__(query, team, timings, in_export_context)
self.series = self.setup_series()
Expand Down Expand Up @@ -93,6 +93,26 @@ def to_query(self) -> List[ast.SelectQuery]:

return queries

def to_persons_query(self) -> ast.SelectQuery | ast.SelectUnionQuery:
    """Build a union of persons queries, one per series of this trends query.

    Each series gets its own ``TrendsQueryBuilder``. Series flagged as
    previous-period use ``query_previous_date_range``; all others use
    ``query_date_range``. A series' ``overriden_query`` takes precedence
    over the runner's own query when it is set.

    Returns:
        An ``ast.SelectUnionQuery`` wrapping every per-series persons query.
    """
    person_selects = []

    # Query construction is recorded under the "trends_persons_query" timing.
    with self.timings.measure("trends_persons_query"):
        for series in self.series:
            date_range = (
                self.query_previous_date_range
                if series.is_previous_period_series
                else self.query_date_range
            )

            builder = TrendsQueryBuilder(
                trends_query=series.overriden_query or self.query,
                team=self.team,
                query_date_range=date_range,
                series=series.series,
                timings=self.timings,
            )
            person_selects.append(builder.build_persons_query())

    return ast.SelectUnionQuery(select_queries=person_selects)

def calculate(self):
queries = self.to_query()

Expand Down
2 changes: 1 addition & 1 deletion posthog/hogql_queries/query_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ def run(self, refresh_requested: Optional[bool] = None) -> CachedQueryResponse:
def to_query(self) -> ast.SelectQuery:
raise NotImplementedError()

def to_persons_query(self) -> ast.SelectQuery:
def to_persons_query(self) -> ast.SelectQuery | ast.SelectUnionQuery:
# TODO: add support for selecting and filtering by breakdowns
raise NotImplementedError()

Expand Down

0 comments on commit 3bf712d

Please sign in to comment.