Skip to content

Commit

Permalink
perf: Speed up limited persons queries
Browse files Browse the repository at this point in the history
  • Loading branch information
timgl committed Oct 25, 2024
1 parent 156b875 commit 87762fc
Show file tree
Hide file tree
Showing 10 changed files with 197 additions and 22 deletions.
8 changes: 7 additions & 1 deletion posthog/hogql/ast.py
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,12 @@ class WindowFunction(Expr):
over_identifier: Optional[str] = None


# AST node for the `LIMIT ... BY ...` clause of a SelectQuery (stored in `SelectQuery.limit_by`).
@dataclass(kw_only=True)
class LimitByExpr(Expr):
# Integer printed directly after LIMIT, i.e. `LIMIT {offset_value} BY ...`.
# NOTE(review): despite the name, the printer emits this as the only number in the clause,
# so it appears to act as the per-group row count (e.g. `LIMIT 1 BY id`) - confirm.
offset_value: int
# Expressions printed after BY, joined with ", ".
exprs: list[Expr]


@dataclass(kw_only=True)
class SelectQuery(Expr):
# :TRICKY: When adding new fields, make sure they're handled in visitor.py and resolver.py
Expand All @@ -813,7 +819,7 @@ class SelectQuery(Expr):
group_by: Optional[list[Expr]] = None
order_by: Optional[list[OrderExpr]] = None
limit: Optional[Expr] = None
limit_by: Optional[list[Expr]] = None
limit_by: Optional[LimitByExpr] = None
limit_with_ties: Optional[bool] = None
offset: Optional[Expr] = None
settings: Optional[HogQLQuerySettings] = None
Expand Down
56 changes: 45 additions & 11 deletions posthog/hogql/database/schema/persons.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,54 @@ def select_from_persons_table(
and_conditions.append(filter)

# For now, only apply this optimization when the persons table is queried directly (not through a join and not inside a subquery), to avoid knock-on effects on insight queries

if (
node.select_from
and node.select_from.type
and hasattr(node.select_from.type, "table")
and node.select_from.type.table
and isinstance(node.select_from.type.table, PersonsTable)
):
inner_select = cast(
ast.SelectQuery,
parse_select(
"""
SELECT id FROM raw_persons as where_optimization
"""
),
)
inner_select_where: list[ast.Expr] = []

extractor = WhereClauseExtractor(context)
extractor.add_local_tables(join_or_table)
where = extractor.get_inner_where(node)
if where:
extracted_where = extractor.get_inner_where(node)
if extracted_where:
inner_select_where.append(extracted_where)

if node.limit:
inner_select.limit_by = ast.LimitByExpr(offset_value=1, exprs=[ast.Field(chain=["id"])])
inner_select.limit = clone_expr(node.limit, clear_locations=True, clear_types=True)
inner_select.order_by = (
[clone_expr(expr, clear_locations=True, clear_types=True) for expr in node.order_by]
if node.order_by
else None
)

if node.offset:
inner_select.offset = clone_expr(node.offset, clear_locations=True, clear_types=True)
# Note: we actually change the outer node's behaviour here. This only works because the if statement above checks that the query is a 'simple' query (i.e. no joins and not inside a subquery). It would break if we tried this with another table, e.g. events
node.offset = None

# This is safe to do, as there is never a case where one version is deleted but a later version isn't
inner_select_where.append(
ast.CompareOperation(
left=ast.Field(chain=["where_optimization", "id"]),
right=parse_select("SELECT id FROM raw_persons as limit_delete_optimization WHERE is_deleted = 1"),
op=ast.CompareOperationOp.NotIn,
)
)

if len(inner_select_where) > 0:
select = argmax_select(
table_name="raw_persons",
select_fields=join_or_table.fields_accessed,
Expand All @@ -83,15 +120,12 @@ def select_from_persons_table(
deleted_field="is_deleted",
timestamp_field_to_clamp="created_at",
)
inner_select = cast(
ast.SelectQuery,
parse_select(
"""
SELECT id FROM raw_persons as where_optimization
"""
),
)
inner_select.where = where

if len(inner_select_where) == 1:
inner_select.where = inner_select_where[0]
else:
inner_select.where = ast.And(exprs=inner_select_where)

select.where = ast.CompareOperation(
left=ast.Field(chain=["id"]), right=inner_select, op=ast.CompareOperationOp.In
)
Expand Down
96 changes: 96 additions & 0 deletions posthog/hogql/database/schema/test/__snapshots__/test_persons.ambr
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,102 @@
max_bytes_before_external_group_by=0
'''
# ---
# name: TestPersonOptimization.test_limit_and_order_by
'''
SELECT persons.id AS id,
persons.`properties___$some_prop` AS `$some_prop`,
persons.created_at AS created_at
FROM
(SELECT argMax(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(person.properties, '$some_prop'), ''), 'null'), '^"|"$', ''), person.version) AS `properties___$some_prop`,
argMax(toTimeZone(person.created_at, 'UTC'), person.version) AS created_at,
person.id AS id
FROM person
WHERE and(equals(person.team_id, 2), in(id,
(SELECT where_optimization.id AS id
FROM person AS where_optimization
WHERE and(equals(where_optimization.team_id, 2), notIn(where_optimization.id,
(SELECT limit_delete_optimization.id AS id
FROM person AS limit_delete_optimization
WHERE and(equals(limit_delete_optimization.team_id, 2), equals(limit_delete_optimization.is_deleted, 1)))))
ORDER BY toTimeZone(where_optimization.created_at, 'UTC') ASC
LIMIT 1 BY where_optimization.id
LIMIT 3)))
GROUP BY person.id
HAVING and(ifNull(equals(argMax(person.is_deleted, person.version), 0), 0), ifNull(less(argMax(toTimeZone(person.created_at, 'UTC'), person.version), plus(now64(6, 'UTC'), toIntervalDay(1))), 0))) AS persons
ORDER BY persons.created_at ASC
LIMIT 3 SETTINGS readonly=2,
max_execution_time=60,
allow_experimental_object_type=1,
format_csv_allow_double_quotes=0,
max_ast_elements=4000000,
max_expanded_ast_elements=4000000,
max_bytes_before_external_group_by=0
'''
# ---
# name: TestPersonOptimization.test_limit_and_order_by.1
'''
SELECT persons.id AS id,
persons.`properties___$some_prop` AS `$some_prop`
FROM
(SELECT argMax(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(person.properties, '$some_prop'), ''), 'null'), '^"|"$', ''), person.version) AS `properties___$some_prop`,
argMax(toTimeZone(person.created_at, 'UTC'), person.version) AS created_at,
person.id AS id
FROM person
WHERE and(equals(person.team_id, 2), in(id,
(SELECT where_optimization.id AS id
FROM person AS where_optimization
WHERE and(equals(where_optimization.team_id, 2), notIn(where_optimization.id,
(SELECT limit_delete_optimization.id AS id
FROM person AS limit_delete_optimization
WHERE and(equals(limit_delete_optimization.team_id, 2), equals(limit_delete_optimization.is_deleted, 1)))))
ORDER BY toTimeZone(where_optimization.created_at, 'UTC') ASC
LIMIT 1 BY where_optimization.id
LIMIT 2
OFFSET 1)))
GROUP BY person.id
HAVING and(ifNull(equals(argMax(person.is_deleted, person.version), 0), 0), ifNull(less(argMax(toTimeZone(person.created_at, 'UTC'), person.version), plus(now64(6, 'UTC'), toIntervalDay(1))), 0))) AS persons
ORDER BY persons.created_at ASC
LIMIT 2 SETTINGS readonly=2,
max_execution_time=60,
allow_experimental_object_type=1,
format_csv_allow_double_quotes=0,
max_ast_elements=4000000,
max_expanded_ast_elements=4000000,
max_bytes_before_external_group_by=0
'''
# ---
# name: TestPersonOptimization.test_limit_and_order_by.2
'''
SELECT persons.id AS id,
persons.`properties___$some_prop` AS `$some_prop`
FROM events
LEFT OUTER JOIN
(SELECT argMax(person_distinct_id_overrides.person_id, person_distinct_id_overrides.version) AS person_id,
person_distinct_id_overrides.distinct_id AS distinct_id
FROM person_distinct_id_overrides
WHERE equals(person_distinct_id_overrides.team_id, 2)
GROUP BY person_distinct_id_overrides.distinct_id
HAVING ifNull(equals(argMax(person_distinct_id_overrides.is_deleted, person_distinct_id_overrides.version), 0), 0) SETTINGS optimize_aggregation_in_order=1) AS events__override ON equals(events.distinct_id, events__override.distinct_id)
LEFT JOIN
(SELECT person.id AS id,
replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(person.properties, '$some_prop'), ''), 'null'), '^"|"$', '') AS `properties___$some_prop`
FROM person
WHERE and(equals(person.team_id, 2), ifNull(in(tuple(person.id, person.version),
(SELECT person.id AS id, max(person.version) AS version
FROM person
WHERE equals(person.team_id, 2)
GROUP BY person.id
HAVING and(ifNull(equals(argMax(person.is_deleted, person.version), 0), 0), ifNull(less(argMax(toTimeZone(person.created_at, 'UTC'), person.version), plus(now64(6, 'UTC'), toIntervalDay(1))), 0)))), 0)) SETTINGS optimize_aggregation_in_order=1) AS persons ON equals(if(not(empty(events__override.distinct_id)), events__override.person_id, events.person_id), persons.id)
WHERE and(equals(events.team_id, 2), ifNull(notEquals(persons.`properties___$some_prop`, 'something'), 1))
LIMIT 1 SETTINGS readonly=2,
max_execution_time=60,
allow_experimental_object_type=1,
format_csv_allow_double_quotes=0,
max_ast_elements=4000000,
max_expanded_ast_elements=4000000,
max_bytes_before_external_group_by=0
'''
# ---
# name: TestPersonOptimization.test_simple_filter
'''
SELECT persons.id AS id,
Expand Down
31 changes: 31 additions & 0 deletions posthog/hogql/database/schema/test/test_persons.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,34 @@ def test_person_modal_not_optimized_yet(self):
response = execute_hogql_query(query_runner.to_query(), self.team, modifiers=self.modifiers)
assert response.clickhouse
self.assertNotIn("where_optimization", response.clickhouse)

@snapshot_clickhouse_queries
def test_limit_and_order_by(self):
    """Check the LIMIT / ORDER BY / OFFSET optimization for persons queries.

    Three scenarios are covered, each producing one ClickHouse snapshot:
    1. A direct persons query with ORDER BY + LIMIT uses the optimization.
    2. A direct persons query with LIMIT + OFFSET uses it too and returns
       the expected page.
    3. A persons table reached through a join from events is not optimized.
    """
    # 1) Direct query: the inner `where_optimization` subquery must be present.
    response = execute_hogql_query(
        parse_select("select id, properties.$some_prop, created_at from persons ORDER BY created_at limit 3"),
        self.team,
    )
    assert response.clickhouse
    self.assertIn("where_optimization", response.clickhouse)
    assert [x[0] for x in response.results] == [
        self.first_person.uuid,
        self.second_person.uuid,
        self.third_person.uuid,
    ]

    # 2) `limit 2, 1` is printed as `LIMIT 2 OFFSET 1` inside the optimization subquery;
    # the first person is skipped and the next two are returned.
    response = execute_hogql_query(
        parse_select("select id, properties.$some_prop from persons ORDER BY created_at limit 2, 1"),
        self.team,
    )
    assert response.clickhouse
    self.assertIn("where_optimization", response.clickhouse)
    assert [x[0] for x in response.results] == [self.second_person.uuid, self.third_person.uuid]

    # 3) Joined query: the optimization only targets direct persons queries,
    # so the generated SQL must not contain the optimization subquery.
    _create_event(event="$pageview", distinct_id="1", team=self.team)
    _create_event(event="$pageview", distinct_id="2", team=self.team)
    _create_event(event="$pageview", distinct_id="3", team=self.team)
    response = execute_hogql_query(
        parse_select(
            "select id, persons.properties.$some_prop from events left join persons ON (events.person_id=persons.id) where persons.properties.$some_prop != 'something' limit 1"
        ),
        self.team,
    )
    assert response.clickhouse
    self.assertNotIn("where_optimization", response.clickhouse)
    assert len(response.results) == 1
7 changes: 5 additions & 2 deletions posthog/hogql/printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,14 +413,17 @@ def visit_select_query(self, node: ast.SelectQuery):
else:
limit = ast.Constant(value=MAX_SELECT_RETURNED_ROWS)

if node.limit_by is not None:
clauses.append(
f"LIMIT {node.limit_by.offset_value} BY {', '.join([self.visit(expr) for expr in node.limit_by.exprs])}"
)

if limit is not None:
clauses.append(f"LIMIT {self.visit(limit)}")
if node.limit_with_ties:
clauses.append("WITH TIES")
if node.offset is not None:
clauses.append(f"OFFSET {self.visit(node.offset)}")
if node.limit_by is not None:
clauses.append(f"BY {', '.join([self.visit(expr) for expr in node.limit_by])}")

if node.settings is not None and self.dialect == "clickhouse":
settings = self._print_settings(node.settings)
Expand Down
3 changes: 1 addition & 2 deletions posthog/hogql/resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,7 @@ def visit_select_query(self, node: ast.SelectQuery):
new_node.group_by = [self.visit(expr) for expr in node.group_by]
if node.order_by:
new_node.order_by = [self.visit(expr) for expr in node.order_by]
if node.limit_by:
new_node.limit_by = [self.visit(expr) for expr in node.limit_by]
new_node.limit_by = self.visit(node.limit_by)
new_node.limit = self.visit(node.limit)
new_node.limit_with_ties = node.limit_with_ties
new_node.offset = self.visit(node.offset)
Expand Down
2 changes: 1 addition & 1 deletion posthog/hogql/test/_test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -1324,7 +1324,7 @@ def test_select_limit_offset(self):
select_from=ast.JoinExpr(table=ast.Field(chain=["events"])),
limit=ast.Constant(value=1),
offset=ast.Constant(value=3),
limit_by=[ast.Constant(value=1), ast.Field(chain=["event"])],
limit_by=ast.LimitByExpr(offset_value=1, exprs=[ast.Field(chain=["event"])]),
),
)

Expand Down
2 changes: 1 addition & 1 deletion posthog/hogql/test/test_visitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def test_everything_visitor(self):
group_by=[ast.Constant(value=True)],
order_by=[ast.OrderExpr(expr=ast.Constant(value=True), order="DESC")],
limit=ast.Constant(value=1),
limit_by=[ast.Constant(value=True)],
limit_by=ast.LimitByExpr(offset_value=1, exprs=[ast.Constant(value=True)]),
limit_with_ties=True,
offset=ast.Or(exprs=[ast.Constant(value=1)]),
distinct=True,
Expand Down
12 changes: 9 additions & 3 deletions posthog/hogql/visitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,7 @@ def visit_select_query(self, node: ast.SelectQuery):
self.visit(expr)
for expr in node.order_by or []:
self.visit(expr)
for expr in node.limit_by or []:
self.visit(expr)
self.visit(node.limit_by)
self.visit(node.limit)
self.visit(node.offset)
for expr in (node.window_exprs or {}).values():
Expand Down Expand Up @@ -287,6 +286,10 @@ def visit_program(self, node: ast.Program):
for expr in node.declarations:
self.visit(expr)

def visit_limit_by_expr(self, node: ast.LimitByExpr):
    """Visit each expression listed after BY in a LIMIT BY clause.

    Only `node.exprs` is traversed; `offset_value` is a plain int and is not visited.
    """
    for limit_by_expr in node.exprs:
        self.visit(limit_by_expr)

def visit_statement(self, node: ast.Statement):
    """Abstract hook for statement nodes; always raises NotImplementedError."""
    message = "Abstract 'visit_statement' not implemented"
    raise NotImplementedError(message)

Expand Down Expand Up @@ -588,7 +591,7 @@ def visit_select_query(self, node: ast.SelectQuery):
having=self.visit(node.having),
group_by=[self.visit(expr) for expr in node.group_by] if node.group_by else None,
order_by=[self.visit(expr) for expr in node.order_by] if node.order_by else None,
limit_by=[self.visit(expr) for expr in node.limit_by] if node.limit_by else None,
limit_by=self.visit(node.limit_by),
limit=self.visit(node.limit),
limit_with_ties=node.limit_with_ties,
offset=self.visit(node.offset),
Expand Down Expand Up @@ -641,6 +644,9 @@ def visit_window_frame_expr(self, node: ast.WindowFrameExpr):
frame_value=node.frame_value,
)

def visit_limit_by_expr(self, node: ast.LimitByExpr) -> ast.LimitByExpr:
    """Return a new LimitByExpr with the same `offset_value` and visited copies of `exprs`.

    NOTE(review): only `offset_value` and `exprs` are carried over; any other
    attributes of the node (e.g. location or type info, if present) are not copied - confirm
    this is intended.
    """
    offset_value = node.offset_value
    visited_exprs = [self.visit(limit_by_expr) for limit_by_expr in node.exprs]
    return ast.LimitByExpr(offset_value=offset_value, exprs=visited_exprs)

def visit_join_constraint(self, node: ast.JoinConstraint) -> ast.JoinConstraint:
    """Return a new JoinConstraint with a visited `expr` and the same `constraint_type`."""
    visited_expr = self.visit(node.expr)
    return ast.JoinConstraint(expr=visited_expr, constraint_type=node.constraint_type)

Expand Down
2 changes: 1 addition & 1 deletion posthog/models/person/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def bulk_create_persons(persons_list: list[dict]):
distinct_id_inserts.append(f"('{distinct_id}', '{person.uuid}', {person.team_id}, 0, 0, now(), 0, 0)")
person_mapping[distinct_id] = person

created_at = now().strftime("%Y-%m-%d %H:%M:%S.%f")
created_at = persons_list[index].get("created_at", now()).strftime("%Y-%m-%d %H:%M:%S.%f")
timestamp = now().strftime("%Y-%m-%d %H:%M:%S")
person_inserts.append(
f"('{person.uuid}', '{created_at}', {person.team_id}, '{json.dumps(person.properties)}', {'1' if person.is_identified else '0'}, '{timestamp}', 0, 0, 0)"
Expand Down

0 comments on commit 87762fc

Please sign in to comment.