Skip to content

Commit

Permalink
fix: reduce cohort calculation memory usage (#23419)
Browse files Browse the repository at this point in the history
  • Loading branch information
aspicer authored Jul 2, 2024
1 parent aee5a79 commit 39a4d27
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 57 deletions.
18 changes: 9 additions & 9 deletions ee/clickhouse/models/test/__snapshots__/test_cohort.ambr
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
WHERE team_id = 2
AND cohort_id = 2
AND version < 0
AND sign = 1
AND sign = 1 SETTINGS optimize_aggregation_in_order = 1
'''
# ---
# name: TestCohort.test_cohortpeople_with_not_in_cohort_operator
Expand Down Expand Up @@ -66,7 +66,7 @@
WHERE team_id = 2
AND cohort_id = 2
AND version < 0
AND sign = 1
AND sign = 1 SETTINGS optimize_aggregation_in_order = 1
'''
# ---
# name: TestCohort.test_cohortpeople_with_not_in_cohort_operator.1
Expand Down Expand Up @@ -115,7 +115,7 @@
HAVING max(is_deleted) = 0
AND (((((NOT has(['something1'], replaceRegexpAll(JSONExtractRaw(argMax(person.properties, version), '$some_prop'), '^"|"$', ''))))))) SETTINGS optimize_aggregation_in_order = 1)) person ON person.person_id = behavior_query.person_id
WHERE 1 = 1
AND ((((performed_event_condition_X_level_level_0_level_0_level_0_0)))) ) as person
AND ((((performed_event_condition_X_level_level_0_level_0_level_0_0)))) SETTINGS optimize_aggregation_in_order=1 ) as person
UNION ALL
SELECT person_id,
cohort_id,
Expand All @@ -126,7 +126,7 @@
WHERE team_id = 2
AND cohort_id = 2
AND version < 0
AND sign = 1
AND sign = 1 SETTINGS optimize_aggregation_in_order = 1
'''
# ---
# name: TestCohort.test_cohortpeople_with_not_in_cohort_operator_and_no_precalculation
Expand Down Expand Up @@ -181,7 +181,7 @@
HAVING max(is_deleted) = 0
AND (((((NOT has(['something1'], replaceRegexpAll(JSONExtractRaw(argMax(person.properties, version), '$some_prop'), '^"|"$', ''))))))) SETTINGS optimize_aggregation_in_order = 1)) person ON person.person_id = behavior_query.person_id
WHERE 1 = 1
AND ((((performed_event_condition_X_level_level_0_level_0_level_0_0)))) ) ))
AND ((((performed_event_condition_X_level_level_0_level_0_level_0_0)))) SETTINGS optimize_aggregation_in_order=1 ) ))
'''
# ---
# name: TestCohort.test_cohortpeople_with_not_in_cohort_operator_for_behavioural_cohorts
Expand Down Expand Up @@ -211,7 +211,7 @@
AND event IN ['signup']
GROUP BY person_id) behavior_query
WHERE 1 = 1
AND (((first_time_condition_X_level_level_0_level_0_0))) ) as person
AND (((first_time_condition_X_level_level_0_level_0_0))) SETTINGS optimize_aggregation_in_order=1 ) as person
UNION ALL
SELECT person_id,
cohort_id,
Expand All @@ -222,7 +222,7 @@
WHERE team_id = 2
AND cohort_id = 2
AND version < 0
AND sign = 1
AND sign = 1 SETTINGS optimize_aggregation_in_order = 1
'''
# ---
# name: TestCohort.test_cohortpeople_with_not_in_cohort_operator_for_behavioural_cohorts.1
Expand Down Expand Up @@ -257,7 +257,7 @@
GROUP BY person_id) behavior_query
WHERE 1 = 1
AND ((((performed_event_condition_X_level_level_0_level_0_level_0_0))
AND ((((NOT first_time_condition_X_level_level_0_level_1_level_0_level_0_level_0_0)))))) ) as person
AND ((((NOT first_time_condition_X_level_level_0_level_1_level_0_level_0_level_0_0)))))) SETTINGS optimize_aggregation_in_order=1 ) as person
UNION ALL
SELECT person_id,
cohort_id,
Expand All @@ -268,7 +268,7 @@
WHERE team_id = 2
AND cohort_id = 2
AND version < 0
AND sign = 1
AND sign = 1 SETTINGS optimize_aggregation_in_order = 1
'''
# ---
# name: TestCohort.test_static_cohort_precalculated
Expand Down
3 changes: 3 additions & 0 deletions ee/clickhouse/queries/enterprise_cohort_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,14 @@ def get_query(self) -> tuple[str, dict[str, Any]]:
# Since we can FULL OUTER JOIN, we may end up with pairs of uuids where one side is blank. Always try to choose the non-blank ID.
q, fields = self._build_sources(subq)

# optimize_aggregation_in_order slows this query down but massively decreases its memory usage;
# that trade-off is fine for offline cohort calculation.
final_query = f"""
SELECT {fields} AS id FROM
{q}
WHERE 1 = 1
{conditions}
SETTINGS optimize_aggregation_in_order=1
"""

return final_query, self.params
Expand Down
57 changes: 15 additions & 42 deletions ee/clickhouse/queries/test/__snapshots__/test_cohort_query.ambr
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
WHERE 1 = 1
AND ((((performed_event_condition_None_level_level_0_level_0_level_0_0)
OR (performed_event_condition_None_level_level_0_level_0_level_1_0))
AND ((first_time_condition_None_level_level_0_level_1_level_0_0))))
AND ((first_time_condition_None_level_level_0_level_1_level_0_0)))) SETTINGS optimize_aggregation_in_order=1
'''
# ---
# name: TestCohortQuery.test_cohort_filter_with_another_cohort_with_event_sequence
Expand Down Expand Up @@ -117,7 +117,7 @@
AND (((has(['test'], replaceRegexpAll(JSONExtractRaw(argMax(person.properties, version), 'name'), '^"|"$', ''))))) SETTINGS optimize_aggregation_in_order = 1)) person ON person.person_id = funnel_query.person_id
WHERE 1 = 1
AND ((((steps_0))
AND (steps_1)))
AND (steps_1))) SETTINGS optimize_aggregation_in_order=1
'''
# ---
# name: TestCohortQuery.test_cohort_filter_with_extra
Expand Down Expand Up @@ -159,7 +159,7 @@
HAVING max(is_deleted) = 0
AND ((((has(['test'], replaceRegexpAll(JSONExtractRaw(argMax(person.properties, version), 'name'), '^"|"$', '')))))) SETTINGS optimize_aggregation_in_order = 1)) person ON person.person_id = behavior_query.person_id
WHERE 1 = 1
AND (((performed_event_condition_None_level_level_0_level_0_0)))
AND (((performed_event_condition_None_level_level_0_level_0_0))) SETTINGS optimize_aggregation_in_order=1
'''
# ---
# name: TestCohortQuery.test_cohort_filter_with_extra.1
Expand Down Expand Up @@ -197,7 +197,7 @@
HAVING max(is_deleted) = 0 SETTINGS optimize_aggregation_in_order = 1)) person ON person.person_id = behavior_query.person_id
WHERE 1 = 1
AND ((((has(['test'], replaceRegexpAll(JSONExtractRaw(person_props, 'name'), '^"|"$', ''))))
OR ((performed_event_condition_None_level_level_0_level_1_level_0_0))))
OR ((performed_event_condition_None_level_level_0_level_1_level_0_0)))) SETTINGS optimize_aggregation_in_order=1
'''
# ---
# name: TestCohortQuery.test_performed_event_sequence
Expand Down Expand Up @@ -240,7 +240,7 @@
AND timestamp >= now() - INTERVAL 7 day ))
GROUP BY person_id) funnel_query
WHERE 1 = 1
AND (((steps_0)))
AND (((steps_0))) SETTINGS optimize_aggregation_in_order=1
'''
# ---
# name: TestCohortQuery.test_performed_event_sequence_and_clause_with_additional_event
Expand Down Expand Up @@ -288,7 +288,7 @@
GROUP BY person_id) funnel_query
WHERE 1 = 1
AND (((steps_0)
OR (performed_event_multiple_condition_None_level_level_0_level_1_0)))
OR (performed_event_multiple_condition_None_level_level_0_level_1_0))) SETTINGS optimize_aggregation_in_order=1
'''
# ---
# name: TestCohortQuery.test_performed_event_sequence_with_person_properties
Expand Down Expand Up @@ -351,34 +351,7 @@
AND ((has(['[email protected]'], replaceRegexpAll(JSONExtractRaw(argMax(person.properties, version), 'email'), '^"|"$', '')))) SETTINGS optimize_aggregation_in_order = 1)) person ON person.person_id = funnel_query.person_id
WHERE 1 = 1
AND (((steps_0)
AND (performed_event_multiple_condition_None_level_level_0_level_1_0)))
'''
# ---
# name: TestCohortQuery.test_performed_event_with_event_filters
'''

SELECT behavior_query.person_id AS id
FROM
(SELECT pdi.person_id AS person_id,
countIf(timestamp > now() - INTERVAL 1 week
AND timestamp < now()
AND event = '$pageview'
AND (has(['something'], replaceRegexpAll(JSONExtractRaw(properties, '$filter_prop'), '^"|"$', '')))) > 0 AS performed_event_condition_None_level_level_0_level_0_0
FROM events e
INNER JOIN
(SELECT distinct_id,
argMax(person_id, version) as person_id
FROM person_distinct_id2
WHERE team_id = 2
GROUP BY distinct_id
HAVING argMax(is_deleted, version) = 0) AS pdi ON e.distinct_id = pdi.distinct_id
WHERE team_id = 2
AND event IN ['$pageview']
AND timestamp <= now()
AND timestamp >= now() - INTERVAL 1 week
GROUP BY person_id) behavior_query
WHERE 1 = 1
AND (((performed_event_condition_None_level_level_0_level_0_0)))
AND (performed_event_multiple_condition_None_level_level_0_level_1_0))) SETTINGS optimize_aggregation_in_order=1
'''
# ---
# name: TestCohortQuery.test_performed_event_with_event_filters_and_explicit_date
Expand All @@ -405,7 +378,7 @@
AND timestamp >= now() - INTERVAL 4 day
GROUP BY person_id) behavior_query
WHERE 1 = 1
AND (((performed_event_condition_None_level_level_0_level_0_0)))
AND (((performed_event_condition_None_level_level_0_level_0_0))) SETTINGS optimize_aggregation_in_order=1
'''
# ---
# name: TestCohortQuery.test_person
Expand Down Expand Up @@ -443,7 +416,7 @@
HAVING max(is_deleted) = 0 SETTINGS optimize_aggregation_in_order = 1)) person ON person.person_id = behavior_query.person_id
WHERE 1 = 1
AND (((performed_event_condition_None_level_level_0_level_0_0)
OR (has(['[email protected]'], replaceRegexpAll(JSONExtractRaw(person_props, '$sample_field'), '^"|"$', '')))))
OR (has(['[email protected]'], replaceRegexpAll(JSONExtractRaw(person_props, '$sample_field'), '^"|"$', ''))))) SETTINGS optimize_aggregation_in_order=1
'''
# ---
# name: TestCohortQuery.test_person_materialized
Expand Down Expand Up @@ -481,7 +454,7 @@
HAVING max(is_deleted) = 0 SETTINGS optimize_aggregation_in_order = 1)) person ON person.person_id = behavior_query.person_id
WHERE 1 = 1
AND (((performed_event_condition_None_level_level_0_level_0_0)
OR (has(['[email protected]'], "pmat_$sample_field"))))
OR (has(['[email protected]'], "pmat_$sample_field")))) SETTINGS optimize_aggregation_in_order=1
'''
# ---
# name: TestCohortQuery.test_person_properties_with_pushdowns
Expand Down Expand Up @@ -533,7 +506,7 @@
AND ((((performed_event_condition_None_level_level_0_level_0_level_0_0)
OR (performed_event_condition_None_level_level_0_level_0_level_1_0)
OR (has(['special'], replaceRegexpAll(JSONExtractRaw(person_props, 'name'), '^"|"$', ''))))
AND ((first_time_condition_None_level_level_0_level_1_level_0_0))))
AND ((first_time_condition_None_level_level_0_level_1_level_0_0)))) SETTINGS optimize_aggregation_in_order=1
'''
# ---
# name: TestCohortQuery.test_person_props_only
Expand Down Expand Up @@ -623,7 +596,7 @@
(SELECT person_id as id
FROM person_static_cohort
WHERE cohort_id = 2
AND team_id = 2))))
AND team_id = 2)))) SETTINGS optimize_aggregation_in_order=1
'''
# ---
# name: TestCohortQuery.test_static_cohort_filter_with_extra
Expand Down Expand Up @@ -673,7 +646,7 @@
FROM person_static_cohort
WHERE cohort_id = 2
AND team_id = 2))
AND (performed_event_condition_None_level_level_0_level_1_0)))
AND (performed_event_condition_None_level_level_0_level_1_0))) SETTINGS optimize_aggregation_in_order=1
'''
# ---
# name: TestCohortQuery.test_static_cohort_filter_with_extra.2
Expand Down Expand Up @@ -714,7 +687,7 @@
FROM person_static_cohort
WHERE cohort_id = 2
AND team_id = 2)))
OR ((performed_event_condition_None_level_level_0_level_1_level_0_0))))
OR ((performed_event_condition_None_level_level_0_level_1_level_0_0)))) SETTINGS optimize_aggregation_in_order=1
'''
# ---
# name: TestCohortQuery.test_unwrapping_static_cohort_filter_hidden_in_layers_of_cohorts
Expand Down Expand Up @@ -769,6 +742,6 @@
FROM person_static_cohort
WHERE cohort_id = 2
AND team_id = 2)))
OR (performed_event_condition_None_level_level_0_level_1_0)))
OR (performed_event_condition_None_level_level_0_level_1_0))) SETTINGS optimize_aggregation_in_order=1
'''
# ---
Original file line number Diff line number Diff line change
Expand Up @@ -1921,7 +1921,7 @@
WHERE team_id = 2
AND cohort_id = 2
AND version < 1
AND sign = 1
AND sign = 1 SETTINGS optimize_aggregation_in_order = 1
'''
# ---
# name: TestExperimentAuxiliaryEndpoints.test_create_exposure_cohort_for_experiment_with_custom_action_filters_exposure.1
Expand Down Expand Up @@ -1981,7 +1981,7 @@
AND timestamp >= now() - INTERVAL 6 day
GROUP BY person_id) behavior_query
WHERE 1 = 1
AND (((performed_event_condition_X_level_level_0_level_0_0))) ) as person
AND (((performed_event_condition_X_level_level_0_level_0_0))) SETTINGS optimize_aggregation_in_order=1 ) as person
UNION ALL
SELECT person_id,
cohort_id,
Expand All @@ -1992,6 +1992,6 @@
WHERE team_id = 2
AND cohort_id = 2
AND version < 1
AND sign = 1
AND sign = 1 SETTINGS optimize_aggregation_in_order = 1
'''
# ---
6 changes: 3 additions & 3 deletions posthog/api/test/__snapshots__/test_cohort.ambr
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
HAVING max(is_deleted) = 0 SETTINGS optimize_aggregation_in_order = 1)) person ON person.person_id = behavior_query.person_id
WHERE 1 = 1
AND ((((has(['something'], replaceRegexpAll(JSONExtractRaw(person_props, '$some_prop'), '^"|"$', ''))))
OR ((performed_event_condition_X_level_level_0_level_1_level_0_0)))) ) as person
OR ((performed_event_condition_X_level_level_0_level_1_level_0_0)))) SETTINGS optimize_aggregation_in_order=1 ) as person
UNION ALL
SELECT person_id,
cohort_id,
Expand All @@ -62,7 +62,7 @@
WHERE team_id = 2
AND cohort_id = 2
AND version < 1
AND sign = 1
AND sign = 1 SETTINGS optimize_aggregation_in_order = 1
'''
# ---
# name: TestCohort.test_async_deletion_of_cohort.10
Expand Down Expand Up @@ -144,7 +144,7 @@
WHERE team_id = 2
AND cohort_id = 2
AND version < 2
AND sign = 1
AND sign = 1 SETTINGS optimize_aggregation_in_order = 1
'''
# ---
# name: TestCohort.test_async_deletion_of_cohort.6
Expand Down
2 changes: 2 additions & 0 deletions posthog/models/cohort/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"""

# Continually ensure that all previous version rows are deleted and insert persons that match the criteria
# optimize_aggregation_in_order = 1 is necessary to avoid running out of memory (OOM) for our biggest clients
RECALCULATE_COHORT_BY_ID = """
INSERT INTO cohortpeople
SELECT id, %(cohort_id)s as cohort_id, %(team_id)s as team_id, 1 AS sign, %(new_version)s AS version
Expand All @@ -45,6 +46,7 @@
SELECT person_id, cohort_id, team_id, -1, version
FROM cohortpeople
WHERE team_id = %(team_id)s AND cohort_id = %(cohort_id)s AND version < %(new_version)s AND sign = 1
SETTINGS optimize_aggregation_in_order = 1
"""

# NOTE: Group by version id to ensure that signs are summed between corresponding rows.
Expand Down

0 comments on commit 39a4d27

Please sign in to comment.