From 63de132c972396ee5c3602aed3f64afb78c0a8c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Wed, 6 Mar 2024 21:29:04 +0100 Subject: [PATCH] fix: Attempt to speed up squash (#20740) * fix: Use team_id filters whenever possible * fix: Disable persistency of join tables * fix: Bump timeouts and max attempts for slow activities --- .../batch_exports/squash_person_overrides.py | 34 +++++++++++++------ .../test_squash_person_overrides_workflow.py | 13 +++++++ 2 files changed, 36 insertions(+), 11 deletions(-) diff --git a/posthog/temporal/batch_exports/squash_person_overrides.py b/posthog/temporal/batch_exports/squash_person_overrides.py index f0b57f24bad55..1f1de02a99938 100644 --- a/posthog/temporal/batch_exports/squash_person_overrides.py +++ b/posthog/temporal/batch_exports/squash_person_overrides.py @@ -32,6 +32,8 @@ max(version) AS latest_version FROM {database}.person_distinct_id_overrides + WHERE + ((length(%(team_ids)s) = 0) OR (team_id IN %(team_ids)s)) GROUP BY team_id, distinct_id SETTINGS @@ -105,6 +107,7 @@ {database}.sharded_events WHERE (joinGet('{database}.person_distinct_id_overrides_join', 'person_id', team_id, distinct_id) != defaultValueOfTypeName('UUID')) + AND ((length(%(team_ids)s) = 0) OR (team_id IN %(team_ids)s)) GROUP BY team_id, distinct_id SETTINGS @@ -269,6 +272,9 @@ async def optimize_person_distinct_id_overrides(dry_run: bool) -> None: activity.logger.info("Optimized person_distinct_id_overrides") +QueryParameters = dict[str, typing.Any] + + @dataclass class TableActivityInputs: """Inputs for activities that work with tables. @@ -280,6 +286,7 @@ class TableActivityInputs: """ name: str + query_parameters: QueryParameters exists: bool = True dry_run: bool = True @@ -308,7 +315,7 @@ async def create_table(inputs: TableActivityInputs) -> None: async with heartbeat_every(): async with get_client() as clickhouse_client: - await clickhouse_client.execute_query(create_table_query) + await clickhouse_client.execute_query(create_table_query, query_parameters=inputs.query_parameters) activity.logger.info("Created JOIN table person_distinct_id_overrides_join_table") @@ -434,10 +441,13 @@ async def wait_for_table(inputs: TableActivityInputs) -> None: @contextlib.asynccontextmanager -async def manage_table(table_name: str, dry_run: bool) -> collections.abc.AsyncGenerator[None, None]: +async def manage_table( + table_name: str, dry_run: bool, query_parameters: QueryParameters +) -> collections.abc.AsyncGenerator[None, None]: """A context manager to create ans subsequently drop a table.""" table_activity_inputs = TableActivityInputs( name=table_name, + query_parameters=query_parameters, dry_run=dry_run, exists=True, ) @@ -452,8 +462,8 @@ async def manage_table(table_name: str, dry_run: bool) -> collections.abc.AsyncG await workflow.execute_activity( wait_for_table, table_activity_inputs, - start_to_close_timeout=timedelta(hours=4), - retry_policy=RetryPolicy(maximum_attempts=6, initial_interval=timedelta(seconds=20)), + start_to_close_timeout=timedelta(hours=6), + retry_policy=RetryPolicy(maximum_attempts=20, initial_interval=timedelta(seconds=20)), heartbeat_timeout=timedelta(minutes=2), ) @@ -479,9 +489,6 @@ async def manage_table(table_name: str, dry_run: bool) -> collections.abc.AsyncG ) -QueryParameters = dict[str, typing.Any] - - @dataclass class MutationActivityInputs: """Inputs for activities that work with mutations. @@ -629,8 +636,8 @@ async def submit_and_wait_for_mutation( await workflow.execute_activity( wait_for_mutation, mutation_activity_inputs, - start_to_close_timeout=timedelta(hours=4), - retry_policy=RetryPolicy(maximum_attempts=6, initial_interval=timedelta(seconds=20)), + start_to_close_timeout=timedelta(hours=6), + retry_policy=RetryPolicy(maximum_attempts=20, initial_interval=timedelta(seconds=20)), heartbeat_timeout=timedelta(minutes=2), ) @@ -742,7 +749,10 @@ async def run(self, inputs: SquashPersonOverridesInputs): heartbeat_timeout=timedelta(minutes=1), ) - async with manage_table("person_distinct_id_overrides_join", inputs.dry_run): + table_query_parameters = { + "team_ids": list(inputs.team_ids), + } + async with manage_table("person_distinct_id_overrides_join", inputs.dry_run, table_query_parameters): for partition_id in inputs.iter_partition_ids(): mutation_parameters: QueryParameters = { "partition_id": partition_id, @@ -755,7 +765,9 @@ async def run(self, inputs: SquashPersonOverridesInputs): ) workflow.logger.info("Squash finished for all requested partitions, now deleting person overrides") - async with manage_table("person_distinct_id_overrides_join_to_delete", inputs.dry_run): + async with manage_table( + "person_distinct_id_overrides_join_to_delete", inputs.dry_run, table_query_parameters + ): delete_mutation_parameters: QueryParameters = { "partition_ids": list(inputs.iter_partition_ids()), "grace_period": inputs.delete_grace_period_seconds, diff --git a/posthog/temporal/tests/persons_on_events_squash/test_squash_person_overrides_workflow.py b/posthog/temporal/tests/persons_on_events_squash/test_squash_person_overrides_workflow.py index 8654ba164f891..16a00cb609f6b 100644 --- a/posthog/temporal/tests/persons_on_events_squash/test_squash_person_overrides_workflow.py +++ b/posthog/temporal/tests/persons_on_events_squash/test_squash_person_overrides_workflow.py @@ -136,6 +136,7 @@ async def test_create_person_distinct_id_overrides_join_table( inputs = TableActivityInputs( name="person_distinct_id_overrides_join", + query_parameters={"team_ids": []}, dry_run=False, ) await activity_environment.run(create_table, inputs) @@ -232,6 +233,7 @@ async def test_create_person_distinct_id_overrides_join_with_older_overrides_pre """ inputs = TableActivityInputs( name="person_distinct_id_overrides_join", + query_parameters={"team_ids": []}, dry_run=False, ) @@ -270,6 +272,7 @@ async def test_create_person_distinct_id_overrides_join_with_newer_overrides_aft """Test `person_distinct_id_overrides_join` contains a static set of mappings.""" inputs = TableActivityInputs( name="person_distinct_id_overrides_join", + query_parameters={"team_ids": []}, dry_run=False, ) @@ -323,6 +326,7 @@ async def test_create_wait_and_drop_table(activity_environment, person_overrides """Test if a table is created, waited on, and dropped in a normal workflow.""" inputs = TableActivityInputs( name="person_distinct_id_overrides_join", + query_parameters={"team_ids": []}, dry_run=False, ) @@ -447,6 +451,7 @@ async def overrides_join_table(optimized_person_overrides, activity_environment) """ inputs = TableActivityInputs( name="person_distinct_id_overrides_join", + query_parameters={"team_ids": []}, dry_run=False, ) @@ -579,6 +584,7 @@ async def test_update_events_with_person_overrides_mutation_with_older_overrides await activity_environment.run(optimize_person_distinct_id_overrides, False) inputs = TableActivityInputs( name="person_distinct_id_overrides_join", + query_parameters={"team_ids": []}, dry_run=False, ) @@ -616,6 +622,7 @@ async def test_update_events_with_person_overrides_mutation_with_newer_overrides await activity_environment.run(optimize_person_distinct_id_overrides, False) inputs = TableActivityInputs( name="person_distinct_id_overrides_join", + query_parameters={"team_ids": []}, dry_run=False, ) @@ -709,6 +716,7 @@ async def test_delete_person_overrides_mutation( join_table_inputs = TableActivityInputs( name="person_distinct_id_overrides_join", + query_parameters={"team_ids": []}, dry_run=False, ) @@ -717,6 +725,7 @@ async def test_delete_person_overrides_mutation( delete_table_inputs = TableActivityInputs( name="person_distinct_id_overrides_join_to_delete", + query_parameters={"team_ids": []}, dry_run=False, ) @@ -790,6 +799,7 @@ async def test_delete_person_overrides_mutation_within_grace_period( join_table_inputs = TableActivityInputs( name="person_distinct_id_overrides_join", + query_parameters={"team_ids": []}, dry_run=False, ) @@ -798,6 +808,7 @@ async def test_delete_person_overrides_mutation_within_grace_period( delete_table_inputs = TableActivityInputs( name="person_distinct_id_overrides_join_to_delete", + query_parameters={"team_ids": []}, dry_run=False, ) @@ -856,6 +867,7 @@ async def test_delete_squashed_person_overrides_from_clickhouse_dry_run( join_table_inputs = TableActivityInputs( name="person_distinct_id_overrides_join", + query_parameters={"team_ids": []}, dry_run=False, ) @@ -864,6 +876,7 @@ async def test_delete_squashed_person_overrides_from_clickhouse_dry_run( delete_table_inputs = TableActivityInputs( name="person_distinct_id_overrides_join_to_delete", + query_parameters={"team_ids": []}, dry_run=False, )