Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(trends): fallback to event data for distinct_ids of personless events #27131

Merged
merged 11 commits into from
Jan 2, 2025
4 changes: 3 additions & 1 deletion posthog/hogql_queries/actor_strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from posthog.hogql_queries.insights.paginators import HogQLHasMorePaginator
from posthog.hogql_queries.utils.recordings_helper import RecordingsHelper
from posthog.models import Team, Group
from posthog.schema import ActorsQuery
from posthog.schema import ActorsQuery, InsightActorsQuery, TrendsQuery

import orjson as json

Expand Down Expand Up @@ -89,6 +89,8 @@ def get_actors(self, actor_ids, order_by: str = "") -> dict[str, dict]:
return person_uuid_to_person

def input_columns(self) -> list[str]:
    """Return the column names this strategy selects in the actors query.

    Trend insight actor queries additionally carry the raw event
    distinct_ids, so personless events can still be surfaced even when
    no person record exists for the actor.
    """
    columns = ["person", "id", "person.$delete"]
    source = self.query.source
    if isinstance(source, InsightActorsQuery) and isinstance(source.source, TrendsQuery):
        columns.append("event_distinct_ids")
    return columns

def filter_conditions(self) -> list[ast.Expr]:
Expand Down
40 changes: 37 additions & 3 deletions posthog/hogql_queries/actors_query_runner.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import itertools
from typing import Optional
from typing import Any, Optional
from collections.abc import Sequence, Iterator

from posthog.hogql import ast
Expand Down Expand Up @@ -66,14 +66,21 @@ def _enrich_with_actors(
actors_lookup,
recordings_column_index: Optional[int],
recordings_lookup: Optional[dict[str, list[dict]]],
events_distinct_id_lookup: Optional[dict[str, list[str]]],
) -> list:
enriched = []

for result in results:
new_row = list(result)
actor_id = str(result[actor_column_index])
actor = actors_lookup.get(actor_id)
new_row[actor_column_index] = actor if actor else {"id": actor_id}
if actor:
new_row[actor_column_index] = actor
else:
actor_data: dict[str, Any] = {"id": actor_id}
if events_distinct_id_lookup is not None:
actor_data["distinct_ids"] = events_distinct_id_lookup.get(actor_id)
new_row[actor_column_index] = actor_data
if recordings_column_index is not None and recordings_lookup is not None:
new_row[recordings_column_index] = (
self._get_recordings(result[recordings_column_index], recordings_lookup) or []
Expand Down Expand Up @@ -119,12 +126,24 @@ def calculate(self) -> ActorsQueryResponse:
actor_column_index = input_columns.index(column_name)
actor_ids = (row[actor_column_index] for row in self.paginator.results)
actors_lookup = self.strategy.get_actors(actor_ids)
person_uuid_to_event_distinct_ids = None

if "event_distinct_ids" in self.strategy.input_columns():
event_distinct_ids_index = self.strategy.input_columns().index("event_distinct_ids")
person_uuid_to_event_distinct_ids = {
str(row[actor_column_index]): row[event_distinct_ids_index] for row in self.paginator.results
}

recordings_column_index, recordings_lookup = self.prepare_recordings(column_name, input_columns)

missing_actors_count = len(self.paginator.results) - len(actors_lookup)
results = self._enrich_with_actors(
results, actor_column_index, actors_lookup, recordings_column_index, recordings_lookup
results,
actor_column_index,
actors_lookup,
recordings_column_index,
recordings_lookup,
person_uuid_to_event_distinct_ids,
)

return ActorsQueryResponse(
Expand Down Expand Up @@ -164,6 +183,18 @@ def source_id_column(self, source_query: ast.SelectQuery | ast.SelectSetQuery) -
return [str(part) for part in column.chain]
raise ValueError("Source query must have an id column")

def source_distinct_id_column(self, source_query: ast.SelectQuery | ast.SelectSetQuery) -> str | None:
    """Return the alias of the source query's event distinct_ids column, if any.

    Mirrors ``source_id_column``: inspects the (first) SELECT list of the
    source query and looks for the aliased ``event_distinct_ids`` column that
    trends actor queries emit. Returns ``None`` when the column is absent.
    """
    if isinstance(source_query, ast.SelectQuery):
        select = source_query.select
    else:
        # For a set query (UNION etc.) all branches share the same column
        # layout, so inspecting the first SELECT is sufficient.
        select = next(extract_select_queries(source_query)).select

    for column in select:
        # BUG FIX: the original used `column.alias in ("event_distinct_ids")`.
        # `("event_distinct_ids")` is a plain string, not a 1-tuple, so `in`
        # performed a substring test — any alias that happened to be a
        # substring (e.g. "event" or "distinct_ids") would match. Use exact
        # equality instead.
        if isinstance(column, ast.Alias) and column.alias == "event_distinct_ids":
            return column.alias

    return None

def source_table_join(self) -> ast.JoinExpr:
assert self.source_query_runner is not None # For type checking
source_query = self.source_query_runner.to_actors_query()
Expand Down Expand Up @@ -264,6 +295,7 @@ def to_query(self) -> ast.SelectQuery:
assert self.source_query_runner is not None # For type checking
source_query = self.source_query_runner.to_actors_query()
source_id_chain = self.source_id_column(source_query)
source_distinct_id_column = self.source_distinct_id_column(source_query)
source_alias = "source"

# If we aren't joining with the origin, give the source the origin_id
Expand All @@ -277,6 +309,8 @@ def to_query(self) -> ast.SelectQuery:
table=source_query,
alias=source_alias,
)
if source_distinct_id_column is not None:
select_query.select.append(ast.Field(chain=[source_distinct_id_column]))

try:
print_ast(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,16 +308,20 @@
# ---
# name: TestInsightActorsQueryRunner.test_insight_persons_trends_query_with_argmaxV1
'''
SELECT name AS name
SELECT name AS name,
event_distinct_ids AS event_distinct_ids
FROM
(SELECT persons.properties___name AS name
(SELECT persons.properties___name AS name,
source.event_distinct_ids AS event_distinct_ids
FROM
(SELECT actor_id AS actor_id,
count() AS event_count
count() AS event_count,
groupUniqArray(distinct_id) AS event_distinct_ids
FROM
(SELECT if(not(empty(e__override.distinct_id)), e__override.person_id, e.person_id) AS actor_id,
toTimeZone(e.timestamp, 'US/Pacific') AS timestamp,
e.uuid AS uuid
e.uuid AS uuid,
e.distinct_id AS distinct_id
FROM events AS e
LEFT OUTER JOIN
(SELECT argMax(person_distinct_id_overrides.person_id, person_distinct_id_overrides.version) AS person_id,
Expand All @@ -342,9 +346,9 @@
WHERE and(equals(person.team_id, 99999), in(id,
(SELECT source.actor_id AS actor_id
FROM
(SELECT actor_id AS actor_id, count() AS event_count
(SELECT actor_id AS actor_id, count() AS event_count, groupUniqArray(distinct_id) AS event_distinct_ids
FROM
(SELECT if(not(empty(e__override.distinct_id)), e__override.person_id, e.person_id) AS actor_id, toTimeZone(e.timestamp, 'US/Pacific') AS timestamp, e.uuid AS uuid
(SELECT if(not(empty(e__override.distinct_id)), e__override.person_id, e.person_id) AS actor_id, toTimeZone(e.timestamp, 'US/Pacific') AS timestamp, e.uuid AS uuid, e.distinct_id AS distinct_id
FROM events AS e
LEFT OUTER JOIN
(SELECT argMax(person_distinct_id_overrides.person_id, person_distinct_id_overrides.version) AS person_id, person_distinct_id_overrides.distinct_id AS distinct_id
Expand Down Expand Up @@ -375,16 +379,20 @@
# ---
# name: TestInsightActorsQueryRunner.test_insight_persons_trends_query_with_argmaxV2
'''
SELECT name AS name
SELECT name AS name,
event_distinct_ids AS event_distinct_ids
FROM
(SELECT persons.properties___name AS name
(SELECT persons.properties___name AS name,
source.event_distinct_ids AS event_distinct_ids
FROM
(SELECT actor_id AS actor_id,
count() AS event_count
count() AS event_count,
groupUniqArray(distinct_id) AS event_distinct_ids
FROM
(SELECT if(not(empty(e__override.distinct_id)), e__override.person_id, e.person_id) AS actor_id,
toTimeZone(e.timestamp, 'US/Pacific') AS timestamp,
e.uuid AS uuid
e.uuid AS uuid,
e.distinct_id AS distinct_id
FROM events AS e
LEFT OUTER JOIN
(SELECT argMax(person_distinct_id_overrides.person_id, person_distinct_id_overrides.version) AS person_id,
Expand Down Expand Up @@ -415,9 +423,9 @@
WHERE and(equals(person.team_id, 99999), in(person.id,
(SELECT source.actor_id AS actor_id
FROM
(SELECT actor_id AS actor_id, count() AS event_count
(SELECT actor_id AS actor_id, count() AS event_count, groupUniqArray(distinct_id) AS event_distinct_ids
FROM
(SELECT if(not(empty(e__override.distinct_id)), e__override.person_id, e.person_id) AS actor_id, toTimeZone(e.timestamp, 'US/Pacific') AS timestamp, e.uuid AS uuid
(SELECT if(not(empty(e__override.distinct_id)), e__override.person_id, e.person_id) AS actor_id, toTimeZone(e.timestamp, 'US/Pacific') AS timestamp, e.uuid AS uuid, e.distinct_id AS distinct_id
FROM events AS e
LEFT OUTER JOIN
(SELECT argMax(person_distinct_id_overrides.person_id, person_distinct_id_overrides.version) AS person_id, person_distinct_id_overrides.distinct_id AS distinct_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ def test_insight_persons_trends_query_with_argmaxV1(self):
modifiers={"personsArgMaxVersion": PersonsArgMaxVersion.V1},
)

self.assertEqual([("p2",)], response.results)
self.assertEqual([("p2", ["p2"])], response.results)
assert "in(id," in queries[0]
self.assertEqual(2, queries[0].count("toTimeZone(e.timestamp, 'US/Pacific') AS timestamp"))

Expand Down Expand Up @@ -266,7 +266,7 @@ def test_insight_persons_trends_query_with_argmaxV2(self):
modifiers={"personsArgMaxVersion": PersonsArgMaxVersion.V2},
)

self.assertEqual([("p2",)], response.results)
self.assertEqual([("p2", ["p2"])], response.results)
assert "in(person.id" in queries[0]
self.assertEqual(2, queries[0].count("toTimeZone(e.timestamp, 'US/Pacific') AS timestamp"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,17 +240,20 @@
SELECT persons.id AS id,
persons.created_at AS created_at,
source.event_count AS event_count,
source.matching_events AS matching_events
source.matching_events AS matching_events,
source.event_distinct_ids AS event_distinct_ids
FROM
(SELECT actor_id AS actor_id,
count() AS event_count,
groupUniqArray(distinct_id) AS event_distinct_ids,
groupUniqArray(100)(tuple(timestamp, uuid, `$session_id`, `$window_id`)) AS matching_events
FROM
(SELECT e.person_id AS actor_id,
toTimeZone(e.timestamp, 'UTC') AS timestamp,
e.uuid AS uuid,
e.`$session_id` AS `$session_id`,
e.`$window_id` AS `$window_id`
e.`$window_id` AS `$window_id`,
e.distinct_id AS distinct_id
FROM events AS e
LEFT JOIN
(SELECT argMax(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(groups.group_properties, 'industry'), ''), 'null'), '^"|"$', ''), toTimeZone(groups._timestamp, 'UTC')) AS properties___industry,
Expand All @@ -269,9 +272,9 @@
WHERE and(equals(person.team_id, 99999), in(id,
(SELECT source.actor_id AS actor_id
FROM
(SELECT actor_id AS actor_id, count() AS event_count, groupUniqArray(100)(tuple(timestamp, uuid, `$session_id`, `$window_id`)) AS matching_events
(SELECT actor_id AS actor_id, count() AS event_count, groupUniqArray(distinct_id) AS event_distinct_ids, groupUniqArray(100)(tuple(timestamp, uuid, `$session_id`, `$window_id`)) AS matching_events
FROM
(SELECT e.person_id AS actor_id, toTimeZone(e.timestamp, 'UTC') AS timestamp, e.uuid AS uuid, e.`$session_id` AS `$session_id`, e.`$window_id` AS `$window_id`
(SELECT e.person_id AS actor_id, toTimeZone(e.timestamp, 'UTC') AS timestamp, e.uuid AS uuid, e.`$session_id` AS `$session_id`, e.`$window_id` AS `$window_id`, e.distinct_id AS distinct_id
FROM events AS e
LEFT JOIN
(SELECT argMax(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(groups.group_properties, 'industry'), ''), 'null'), '^"|"$', ''), toTimeZone(groups._timestamp, 'UTC')) AS properties___industry, groups.group_type_index AS index, groups.group_key AS key
Expand Down Expand Up @@ -1548,17 +1551,20 @@
SELECT persons.id AS id,
persons.created_at AS created_at,
source.event_count AS event_count,
source.matching_events AS matching_events
source.matching_events AS matching_events,
source.event_distinct_ids AS event_distinct_ids
FROM
(SELECT actor_id AS actor_id,
count() AS event_count,
groupUniqArray(distinct_id) AS event_distinct_ids,
groupUniqArray(100)(tuple(timestamp, uuid, `$session_id`, `$window_id`)) AS matching_events
FROM
(SELECT e.person_id AS actor_id,
toTimeZone(e.timestamp, 'UTC') AS timestamp,
e.uuid AS uuid,
e.`$session_id` AS `$session_id`,
e.`$window_id` AS `$window_id`
e.`$window_id` AS `$window_id`,
e.distinct_id AS distinct_id
FROM events AS e
LEFT JOIN
(SELECT argMax(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(groups.group_properties, 'name'), ''), 'null'), '^"|"$', ''), toTimeZone(groups._timestamp, 'UTC')) AS properties___name,
Expand All @@ -1585,9 +1591,9 @@
WHERE and(equals(person.team_id, 99999), in(id,
(SELECT source.actor_id AS actor_id
FROM
(SELECT actor_id AS actor_id, count() AS event_count, groupUniqArray(100)(tuple(timestamp, uuid, `$session_id`, `$window_id`)) AS matching_events
(SELECT actor_id AS actor_id, count() AS event_count, groupUniqArray(distinct_id) AS event_distinct_ids, groupUniqArray(100)(tuple(timestamp, uuid, `$session_id`, `$window_id`)) AS matching_events
FROM
(SELECT e.person_id AS actor_id, toTimeZone(e.timestamp, 'UTC') AS timestamp, e.uuid AS uuid, e.`$session_id` AS `$session_id`, e.`$window_id` AS `$window_id`
(SELECT e.person_id AS actor_id, toTimeZone(e.timestamp, 'UTC') AS timestamp, e.uuid AS uuid, e.`$session_id` AS `$session_id`, e.`$window_id` AS `$window_id`, e.distinct_id AS distinct_id
FROM events AS e
LEFT JOIN
(SELECT argMax(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(groups.group_properties, 'name'), ''), 'null'), '^"|"$', ''), toTimeZone(groups._timestamp, 'UTC')) AS properties___name, groups.group_type_index AS index, groups.group_key AS key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,19 @@ def build_actors_query(self) -> ast.SelectQuery | ast.SelectSetQuery:
select=[
ast.Field(chain=["actor_id"]),
ast.Alias(alias="event_count", expr=self._get_actor_value_expr()),
*self._get_event_distinct_ids_expr(),
*self._get_matching_recordings_expr(),
],
select_from=ast.JoinExpr(table=self._get_events_query()),
group_by=[ast.Field(chain=["actor_id"])],
)

def _get_events_query(self) -> ast.SelectQuery:
actor_col = ast.Alias(alias="actor_id", expr=self._actor_id_expr())
actor_distinct_id_expr = self._actor_distinct_id_expr()
actor_distinct_id_col = (
ast.Alias(alias="distinct_id", expr=actor_distinct_id_expr) if actor_distinct_id_expr else None
)
columns: list[ast.Expr] = [
ast.Alias(alias="uuid", expr=ast.Field(chain=["e", "uuid"])),
*(
Expand All @@ -191,8 +197,8 @@ def _get_events_query(self) -> ast.SelectQuery:
if self.include_recordings
else []
),
*([actor_distinct_id_col] if actor_distinct_id_col else []),
]
actor_col = ast.Alias(alias="actor_id", expr=self._actor_id_expr())

if self.trends_aggregation_operations.is_first_time_ever_math():
date_from, date_to = self._date_where_expr()
Expand Down Expand Up @@ -236,20 +242,38 @@ def _get_matching_recordings_expr(self) -> list[ast.Expr]:
placeholders={
"timestamp": ast.Field(
chain=[
"timestamp"
if not self.trends_aggregation_operations.is_first_time_ever_math()
else "min_timestamp"
(
"timestamp"
if not self.trends_aggregation_operations.is_first_time_ever_math()
else "min_timestamp"
)
]
)
},
)
]

def _get_event_distinct_ids_expr(self) -> list[ast.Expr]:
    """Build the aggregated ``event_distinct_ids`` select column.

    Returns a single ``groupUniqArray(distinct_id)`` alias, or an empty
    list when aggregating by group (group math has no per-person
    distinct ids to collect).
    """
    is_group_math = self.entity.math == "unique_group" and self.entity.math_group_type_index is not None
    if is_group_math:
        return []

    distinct_ids_call = ast.Call(name="groupUniqArray", args=[ast.Field(chain=["distinct_id"])])
    return [ast.Alias(alias="event_distinct_ids", expr=distinct_ids_call)]

def _actor_id_expr(self) -> ast.Expr:
    """Expression identifying the actor for each event.

    For group math this is the event's ``$group_<index>`` key; otherwise
    it is the event's person id.
    """
    group_index = self.entity.math_group_type_index
    if self.entity.math == "unique_group" and group_index is not None:
        return ast.Field(chain=["e", f"$group_{int(group_index)}"])
    return ast.Field(chain=["e", "person_id"])

def _actor_distinct_id_expr(self) -> ast.Expr | None:
    """Expression for the event's distinct_id column.

    Returns ``None`` for group math, where there is no per-person
    distinct_id to expose.
    """
    is_group_math = self.entity.math == "unique_group" and self.entity.math_group_type_index is not None
    return None if is_group_math else ast.Field(chain=["e", "distinct_id"])

def _events_where_expr(
self,
with_breakdown_expr: bool = True,
Expand Down
Loading