-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
perf: Speed up filtering persons #25604
Changes from 59 commits
d97020e
5d4800a
f18f976
21165a2
2bf8231
2c39341
54f96a2
02d6717
51761c8
d7f4b21
ae4c773
fb7ea54
275f425
b8c8aed
29b2f47
de2423b
1547c9b
20ed939
ab77dc1
65a42d6
46d1b2e
3d431ab
1a7652f
de66639
7ec2625
580543a
cc31913
43fad40
a5d2eeb
48a4186
cc7f3d7
0b6c96f
e2a7d61
2b97ef6
794a974
28f6ec8
f5e0af4
ab7b1d3
9211e50
01816b5
8cee817
2c97f0b
648ba8e
6bf6c82
a471925
f2a2573
b525972
47e891b
fab24df
d57b6c4
9f2d558
eb8426a
66d4f2b
3d0dbc1
57d4f02
bdf8f10
9bb74d6
be3a78a
f699b33
92e1b7f
eb398df
676153a
c44b3ef
2599db1
1f305bb
f89bf3a
96ec39c
811c94e
3a7d182
49bc8e9
160dc36
5b24503
4c03ae8
0da6fd1
8cc2ab3
3f7ed19
49bc2cd
e54795f
aeed74e
f8927f0
44e140e
a905711
9e51d80
1690a36
e5b4546
8599fd4
9ee4ffb
fe96ba1
4d01e25
e7e4f29
6e061ff
ddf47d4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -442,14 +442,17 @@ | |
HAVING ifNull(equals(argMax(person_distinct_id_overrides.is_deleted, person_distinct_id_overrides.version), 0), 0) SETTINGS optimize_aggregation_in_order=1) AS events__override ON equals(events.distinct_id, events__override.distinct_id) | ||
LEFT JOIN | ||
(SELECT person.id AS id, | ||
replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(person.properties, 'email'), ''), 'null'), '^"|"$', '') AS properties___email | ||
argMax(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(person.properties, 'email'), ''), 'null'), '^"|"$', ''), person.version) AS properties___email | ||
FROM person | ||
WHERE and(equals(person.team_id, 2), ifNull(in(tuple(person.id, person.version), | ||
(SELECT person.id AS id, max(person.version) AS version | ||
FROM person | ||
WHERE equals(person.team_id, 2) | ||
GROUP BY person.id | ||
HAVING and(ifNull(equals(argMax(person.is_deleted, person.version), 0), 0), ifNull(less(argMax(toTimeZone(person.created_at, 'UTC'), person.version), plus(now64(6, 'UTC'), toIntervalDay(1))), 0)))), 0)) SETTINGS optimize_aggregation_in_order=1) AS events__person ON equals(if(not(empty(events__override.distinct_id)), events__override.person_id, events.person_id), events__person.id) | ||
WHERE and(equals(person.team_id, 2), in(person.id, | ||
(SELECT person.id AS id | ||
FROM person | ||
WHERE and(equals(person.team_id, 2), notIn(person.id, | ||
(SELECT person.id AS id | ||
FROM person | ||
WHERE and(equals(person.team_id, 2), equals(person.is_deleted, 1)))), ifNull(equals(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(person.properties, 'email'), ''), 'null'), '^"|"$', ''), '[email protected]'), 0)))), ifNull(equals(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(person.properties, 'email'), ''), 'null'), '^"|"$', ''), '[email protected]'), 0)) | ||
GROUP BY person.id | ||
HAVING and(ifNull(equals(argMax(person.is_deleted, person.version), 0), 0), ifNull(less(argMax(toTimeZone(person.created_at, 'UTC'), person.version), plus(now64(6, 'UTC'), toIntervalDay(1))), 0)) SETTINGS optimize_aggregation_in_order=1) AS events__person ON equals(if(not(empty(events__override.distinct_id)), events__override.person_id, events.person_id), events__person.id) | ||
WHERE and(equals(events.team_id, 2), ifNull(equals(events__person.properties___email, '[email protected]'), 0), less(toTimeZone(events.timestamp, 'UTC'), toDateTime64('2020-01-10 12:14:05.000000', 6, 'UTC')), greater(toTimeZone(events.timestamp, 'UTC'), toDateTime64('2020-01-09 12:14:00.000000', 6, 'UTC'))) | ||
ORDER BY events.event ASC | ||
LIMIT 101 | ||
|
@@ -480,14 +483,17 @@ | |
HAVING ifNull(equals(argMax(person_distinct_id_overrides.is_deleted, person_distinct_id_overrides.version), 0), 0) SETTINGS optimize_aggregation_in_order=1) AS events__override ON equals(events.distinct_id, events__override.distinct_id) | ||
LEFT JOIN | ||
(SELECT person.id AS id, | ||
nullIf(nullIf(person.pmat_email, ''), 'null') AS properties___email | ||
argMax(nullIf(nullIf(person.pmat_email, ''), 'null'), person.version) AS properties___email | ||
FROM person | ||
WHERE and(equals(person.team_id, 2), ifNull(in(tuple(person.id, person.version), | ||
(SELECT person.id AS id, max(person.version) AS version | ||
FROM person | ||
WHERE equals(person.team_id, 2) | ||
GROUP BY person.id | ||
HAVING and(ifNull(equals(argMax(person.is_deleted, person.version), 0), 0), ifNull(less(argMax(toTimeZone(person.created_at, 'UTC'), person.version), plus(now64(6, 'UTC'), toIntervalDay(1))), 0)))), 0)) SETTINGS optimize_aggregation_in_order=1) AS events__person ON equals(if(not(empty(events__override.distinct_id)), events__override.person_id, events.person_id), events__person.id) | ||
WHERE and(equals(person.team_id, 2), in(person.id, | ||
(SELECT person.id AS id | ||
FROM person | ||
WHERE and(equals(person.team_id, 2), notIn(person.id, | ||
(SELECT person.id AS id | ||
FROM person | ||
WHERE and(equals(person.team_id, 2), equals(person.is_deleted, 1)))), ifNull(equals(nullIf(nullIf(person.pmat_email, ''), 'null'), '[email protected]'), 0)))), ifNull(equals(nullIf(nullIf(person.pmat_email, ''), 'null'), '[email protected]'), 0)) | ||
GROUP BY person.id | ||
HAVING and(ifNull(equals(argMax(person.is_deleted, person.version), 0), 0), ifNull(less(argMax(toTimeZone(person.created_at, 'UTC'), person.version), plus(now64(6, 'UTC'), toIntervalDay(1))), 0)) SETTINGS optimize_aggregation_in_order=1) AS events__person ON equals(if(not(empty(events__override.distinct_id)), events__override.person_id, events.person_id), events__person.id) | ||
WHERE and(equals(events.team_id, 2), ifNull(equals(events__person.properties___email, '[email protected]'), 0), less(toTimeZone(events.timestamp, 'UTC'), toDateTime64('2020-01-10 12:14:05.000000', 6, 'UTC')), greater(toTimeZone(events.timestamp, 'UTC'), toDateTime64('2020-01-09 12:14:00.000000', 6, 'UTC'))) | ||
ORDER BY events.event ASC | ||
LIMIT 101 | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,6 @@ | |
from posthog.hogql.base import Expr | ||
from posthog.hogql.constants import HogQLQuerySettings | ||
from posthog.hogql.context import HogQLContext | ||
from posthog.hogql.database.argmax import argmax_select | ||
from posthog.hogql.database.models import ( | ||
BooleanDatabaseField, | ||
DateTimeDatabaseField, | ||
|
@@ -24,7 +23,6 @@ | |
from posthog.hogql.errors import ResolutionError | ||
from posthog.hogql.visitor import clone_expr | ||
from posthog.models.organization import Organization | ||
from posthog.schema import PersonsArgMaxVersion | ||
|
||
PERSONS_FIELDS: dict[str, FieldOrTable] = { | ||
"id": StringDatabaseField(name="id"), | ||
|
@@ -46,72 +44,73 @@ def select_from_persons_table( | |
node: SelectQuery, | ||
*, | ||
filter: Optional[Expr] = None, | ||
is_join: Optional[bool] = None, | ||
): | ||
version = context.modifiers.personsArgMaxVersion | ||
if version == PersonsArgMaxVersion.AUTO: | ||
version = PersonsArgMaxVersion.V1 | ||
# If selecting properties, use the faster v2 query. Otherwise, v1 is faster. | ||
for field_chain in join_or_table.fields_accessed.values(): | ||
if field_chain[0] == "properties": | ||
version = PersonsArgMaxVersion.V2 | ||
break | ||
|
||
if version == PersonsArgMaxVersion.V2: | ||
from posthog.hogql import ast | ||
from posthog.hogql.parser import parse_select | ||
|
||
select = cast( | ||
from posthog.hogql import ast | ||
from posthog.hogql.parser import parse_select | ||
|
||
select = cast( | ||
ast.SelectQuery, | ||
parse_select( | ||
""" | ||
SELECT id | ||
FROM raw_persons | ||
GROUP BY id | ||
HAVING equals(argMax(raw_persons.is_deleted, raw_persons.version), 0) | ||
AND argMax(raw_persons.created_at, raw_persons.version) < now() + interval 1 day | ||
""" | ||
), | ||
) | ||
select.settings = HogQLQuerySettings(optimize_aggregation_in_order=True) | ||
|
||
# This bit optimizes the query by first selecting all IDs for all persons (regardless of whether it's the latest version), and only then aggregating the results | ||
# We only do this if there are where clauses, _and_ WhereClauseExtractor can extract them | ||
if node.where: | ||
inner_select = cast( | ||
ast.SelectQuery, | ||
parse_select( | ||
""" | ||
SELECT id FROM raw_persons WHERE (id, version) IN ( | ||
SELECT id, max(version) as version | ||
FROM raw_persons | ||
GROUP BY id | ||
HAVING equals(argMax(raw_persons.is_deleted, raw_persons.version), 0) | ||
AND argMax(raw_persons.created_at, raw_persons.version) < now() + interval 1 day | ||
) | ||
SELECT id | ||
FROM raw_persons | ||
WHERE | ||
-- Much faster to pre-select out any deleted persons than doing it in aggregation | ||
-- This is correct because there are no instances where we'd un-delete a person (ie there are no cases where one version has is_deleted=1 and a later version has is_deleted = 0) | ||
id NOT IN (select id from raw_persons where is_deleted = 1) | ||
timgl marked this conversation as resolved.
Show resolved
Hide resolved
|
||
""" | ||
), | ||
) | ||
select.settings = HogQLQuerySettings(optimize_aggregation_in_order=True) | ||
if filter is not None: | ||
cast(ast.SelectQuery, cast(ast.CompareOperation, select.where).right).where = filter | ||
|
||
for field_name, field_chain in join_or_table.fields_accessed.items(): | ||
# We need to always select the 'id' field for the join constraint. The field name here is likely to | ||
# be "persons__id" if anything, but just in case, let's avoid duplicates. | ||
if field_name != "id": | ||
select.select.append( | ||
ast.Alias( | ||
alias=field_name, | ||
expr=ast.Field(chain=field_chain), | ||
) | ||
) | ||
else: | ||
select = argmax_select( | ||
table_name="raw_persons", | ||
select_fields=join_or_table.fields_accessed, | ||
group_fields=["id"], | ||
argmax_field="version", | ||
deleted_field="is_deleted", | ||
timestamp_field_to_clamp="created_at", | ||
) | ||
select.settings = HogQLQuerySettings(optimize_aggregation_in_order=True) | ||
if filter is not None: | ||
if select.where: | ||
select.where = And(exprs=[select.where, filter]) | ||
else: | ||
select.where = filter | ||
|
||
if context.modifiers.optimizeJoinedFilters: | ||
extractor = WhereClauseExtractor(context) | ||
extractor = WhereClauseExtractor(context, is_join=is_join) | ||
extractor.add_local_tables(join_or_table) | ||
where = extractor.get_inner_where(node) | ||
if where and select.where: | ||
select.where = And(exprs=[select.where, where]) | ||
elif where: | ||
select.where = where | ||
|
||
if where and inner_select.where: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Shouldn't There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It might be empty if we didn't manage to extract any clauses, in which case we don't want to use this optimization (as you'd just be doing a subquery with all persons). |
||
inner_select.where = ast.And(exprs=[inner_select.where, where]) | ||
select.where = ast.And( | ||
exprs=[ | ||
ast.CompareOperation( | ||
left=ast.Field(chain=["id"]), right=inner_select, op=ast.CompareOperationOp.In | ||
), | ||
where, # Technically, adding the where clause here is duplicative, because the outer node filters this out _again_. However, if you're trying to debug the results stay consistent throughout the query (otherwise old versions might pop up again in this subquery) | ||
] | ||
) | ||
if filter is not None: | ||
if select.where: | ||
cast(ast.SelectQuery, cast(ast.CompareOperation, select.where).right).where = ast.And( | ||
exprs=[select.where, filter] | ||
) | ||
Comment on lines
+98
to
+100
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure I understand what this |
||
else: | ||
select.where = filter | ||
|
||
for field_name, field_chain in join_or_table.fields_accessed.items(): | ||
# We need to always select the 'id' field for the join constraint. The field name here is likely to | ||
# be "persons__id" if anything, but just in case, let's avoid duplicates. | ||
if field_name != "id": | ||
select.select.append( | ||
ast.Alias( | ||
alias=field_name, | ||
expr=ast.Call(name="argMax", args=[ast.Field(chain=field_chain), ast.Field(chain=["version"])]), | ||
) | ||
) | ||
|
||
return select | ||
|
||
|
@@ -125,7 +124,7 @@ def join_with_persons_table( | |
|
||
if not join_to_add.fields_accessed: | ||
raise ResolutionError("No fields requested from persons table") | ||
join_expr = ast.JoinExpr(table=select_from_persons_table(join_to_add, context, node)) | ||
join_expr = ast.JoinExpr(table=select_from_persons_table(join_to_add, context, node, is_join=True)) | ||
|
||
organization: Organization = context.team.organization if context.team else None | ||
# TODO: @raquelmsmith: Remove flag check and use left join for all once deletes are caught up | ||
|
@@ -224,9 +223,16 @@ def create_new_table_with_filter(self, join: JoinExpr) -> Self: | |
return self | ||
|
||
def lazy_select(self, table_to_add: LazyTableToAdd, context, node): | ||
# assume that if the select_from is not persons table we're doing a join | ||
try: | ||
is_join = not isinstance(node.select_from.type.table, PersonsTable) | ||
except AttributeError: | ||
is_join = False | ||
if self.filter is not None: | ||
return select_from_persons_table(table_to_add, context, node, filter=clone_expr(self.filter, True)) | ||
return select_from_persons_table(table_to_add, context, node) | ||
return select_from_persons_table( | ||
table_to_add, context, node, filter=clone_expr(self.filter, True), is_join=is_join | ||
) | ||
return select_from_persons_table(table_to_add, context, node, is_join=is_join) | ||
|
||
def to_printed_clickhouse(self, context): | ||
return "person" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bit of a nitpick, but wouldn't this be marginally faster as a `set`/`frozenset` for its use in `where_clause_extractor`? (I didn't actually benchmark this myself.)