Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cohorts): optimized select from cohort_people #21564

Merged
merged 14 commits into from
Apr 17, 2024
38 changes: 24 additions & 14 deletions posthog/hogql/database/schema/cohort_people.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,40 @@
}


def select_from_cohort_people_table(requested_fields: Dict[str, List[str | int]]):
def select_from_cohort_people_table(requested_fields: Dict[str, List[str | int]], team_id: int):
from posthog.hogql import ast
from posthog.models import Cohort

cohort_tuples = list(
Cohort.objects.filter(is_static=False, team_id=team_id)
.exclude(version__isnull=True)
.values_list("id", "version")
)

table_name = "raw_cohort_people"

# must always include the person and cohort ids regardless of what other fields are requested
requested_fields = {
"person_id": ["person_id"],
"cohort_id": ["cohort_id"],
**requested_fields,
}
if "person_id" not in requested_fields:
requested_fields = {**requested_fields, "person_id": ["person_id"]}
if "cohort_id" not in requested_fields:
requested_fields = {**requested_fields, "cohort_id": ["cohort_id"]}

fields: List[ast.Expr] = [
ast.Alias(alias=name, expr=ast.Field(chain=[table_name] + chain)) for name, chain in requested_fields.items()
]

return ast.SelectQuery(
select=fields,
distinct=True,
select_from=ast.JoinExpr(table=ast.Field(chain=[table_name])),
group_by=[ast.Field(chain=[name]) for name, chain in requested_fields.items()],
having=ast.CompareOperation(
op=ast.CompareOperationOp.Gt,
left=ast.Call(name="sum", args=[ast.Field(chain=[table_name, "sign"])]),
right=ast.Constant(value=0),
),
where=ast.CompareOperation(
op=ast.CompareOperationOp.In,
left=ast.Tuple(
exprs=[ast.Field(chain=[table_name, "cohort_id"]), ast.Field(chain=[table_name, "version"])]
),
right=ast.Constant(value=cohort_tuples),
)
if len(cohort_tuples) > 0
else ast.Constant(value=False),
)


Expand All @@ -67,7 +77,7 @@ class CohortPeople(LazyTable):
fields: Dict[str, FieldOrTable] = COHORT_PEOPLE_FIELDS

def lazy_select(self, requested_fields: Dict[str, List[str | int]], context, node):
return select_from_cohort_people_table(requested_fields)
return select_from_cohort_people_table(requested_fields, context.team_id)

def to_printed_clickhouse(self, context):
return "cohortpeople"
Expand Down
45 changes: 45 additions & 0 deletions posthog/hogql/database/schema/test/test_cohort_people.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from posthog.hogql.parser import parse_select
from posthog.hogql.query import execute_hogql_query
from posthog.models import Person, Cohort
from posthog.test.base import (
APIBaseTest,
ClickhouseTestMixin,
)


class TestCohortPeopleTable(ClickhouseTestMixin, APIBaseTest):
def test_select_star(self):
Person.objects.create(
team_id=self.team.pk,
distinct_ids=["1"],
properties={"$some_prop": "something", "$another_prop": "something1"},
)
Person.objects.create(
team_id=self.team.pk,
distinct_ids=["2"],
properties={"$some_prop": "something", "$another_prop": "something2"},
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will like this test more if it also had a third person where $some_prop = "not something"

cohort1 = Cohort.objects.create(
team=self.team,
groups=[
{
"properties": [
{"key": "$some_prop", "value": "something", "type": "person"},
]
}
],
name="cohort1",
)
cohort1.calculate_people_ch(pending_version=0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very optional, to make it even more rock solid, you can

Suggested change
cohort1.calculate_people_ch(pending_version=0)
cohort1.calculate_people_ch(pending_version=0)
cohort1.calculate_people_ch(pending_version=2)
cohort1.calculate_people_ch(pending_version=4)

to simulate a few recalculations


response = execute_hogql_query(
parse_select(
"select *, person.properties.$another_prop from cohort_people order by person.properties.$another_prop"
),
self.team,
)
assert response.columns == ["person_id", "cohort_id", "$another_prop"]
assert response.results is not None
assert len(response.results) == 2
assert response.results[0][2] == "something1"
assert response.results[1][2] == "something2"
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
FROM events LEFT JOIN (
SELECT person_static_cohort.person_id AS cohort_person_id, 1 AS matched, person_static_cohort.cohort_id AS cohort_id
FROM person_static_cohort
WHERE and(equals(person_static_cohort.team_id, 420), in(person_static_cohort.cohort_id, [11]))) AS __in_cohort ON equals(__in_cohort.cohort_person_id, events.person_id)
WHERE and(equals(person_static_cohort.team_id, 420), in(person_static_cohort.cohort_id, [12]))) AS __in_cohort ON equals(__in_cohort.cohort_person_id, events.person_id)
WHERE and(equals(events.team_id, 420), 1, ifNull(equals(__in_cohort.matched, 1), 0))
LIMIT 100
SETTINGS readonly=2, max_execution_time=60, allow_experimental_object_type=1
Expand All @@ -42,7 +42,7 @@
FROM events LEFT JOIN (
SELECT person_id AS cohort_person_id, 1 AS matched, cohort_id
FROM static_cohort_people
WHERE in(cohort_id, [11])) AS __in_cohort ON equals(__in_cohort.cohort_person_id, person_id)
WHERE in(cohort_id, [12])) AS __in_cohort ON equals(__in_cohort.cohort_person_id, person_id)
WHERE and(1, equals(__in_cohort.matched, 1))
LIMIT 100
'''
Expand All @@ -55,7 +55,7 @@
FROM events LEFT JOIN (
SELECT person_static_cohort.person_id AS cohort_person_id, 1 AS matched, person_static_cohort.cohort_id AS cohort_id
FROM person_static_cohort
WHERE and(equals(person_static_cohort.team_id, 420), in(person_static_cohort.cohort_id, [12]))) AS __in_cohort ON equals(__in_cohort.cohort_person_id, events.person_id)
WHERE and(equals(person_static_cohort.team_id, 420), in(person_static_cohort.cohort_id, [13]))) AS __in_cohort ON equals(__in_cohort.cohort_person_id, events.person_id)
WHERE and(equals(events.team_id, 420), 1, ifNull(equals(__in_cohort.matched, 1), 0))
LIMIT 100
SETTINGS readonly=2, max_execution_time=60, allow_experimental_object_type=1
Expand All @@ -66,7 +66,7 @@
FROM events LEFT JOIN (
SELECT person_id AS cohort_person_id, 1 AS matched, cohort_id
FROM static_cohort_people
WHERE in(cohort_id, [12])) AS __in_cohort ON equals(__in_cohort.cohort_person_id, person_id)
WHERE in(cohort_id, [13])) AS __in_cohort ON equals(__in_cohort.cohort_person_id, person_id)
WHERE and(1, equals(__in_cohort.matched, 1))
LIMIT 100
'''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@

SELECT cohort_people__new_person.id AS id
FROM (
SELECT cohortpeople.person_id AS person_id, cohortpeople.cohort_id AS cohort_id, cohortpeople.person_id AS cohort_people___person_id
SELECT DISTINCT cohortpeople.person_id AS cohort_people___person_id, cohortpeople.person_id AS person_id, cohortpeople.cohort_id AS cohort_id
FROM cohortpeople
WHERE equals(cohortpeople.team_id, 420)
GROUP BY person_id, cohort_id, cohort_people___person_id
HAVING ifNull(greater(sum(cohortpeople.sign), 0), 0)) AS cohort_people LEFT JOIN (
WHERE and(equals(cohortpeople.team_id, 420), false)) AS cohort_people LEFT JOIN (
SELECT persons.id AS id, id AS cohort_people__new_person___id
FROM (
SELECT person.id AS id
Expand Down Expand Up @@ -42,11 +40,9 @@

SELECT cohort_people__new_person.id AS id
FROM (
SELECT cohortpeople.person_id AS person_id, cohortpeople.cohort_id AS cohort_id, cohortpeople.person_id AS cohort_people___person_id
SELECT DISTINCT cohortpeople.person_id AS cohort_people___person_id, cohortpeople.person_id AS person_id, cohortpeople.cohort_id AS cohort_id
FROM cohortpeople
WHERE equals(cohortpeople.team_id, 420)
GROUP BY person_id, cohort_id, cohort_people___person_id
HAVING ifNull(greater(sum(cohortpeople.sign), 0), 0)) AS cohort_people LEFT JOIN (
WHERE and(equals(cohortpeople.team_id, 420), false)) AS cohort_people LEFT JOIN (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

false - I'm assuming because wherever this test is hasn't setup the cohort version?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Precisely. We have never stored any data in the cohort, so no need to query anything. Added an extra test to check it as well.

SELECT persons.id AS id, persons.properties___email AS cohort_people__new_person___properties___email
FROM (
SELECT argMax(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(person.properties, %(hogql_val_0)s), ''), 'null'), '^"|"$', ''), person.version) AS properties___email, person.id AS id
Expand Down
Loading