Skip to content

Commit

Permalink
fix: Attempt to speed up squash (#20740)
Browse files Browse the repository at this point in the history
* fix: Use team_id filters whenever possible

* fix: Disable persistency of join tables

* fix: Bump timeouts and max attempts for slow activities
  • Loading branch information
tomasfarias authored Mar 6, 2024
1 parent 9aa308f commit 63de132
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 11 deletions.
34 changes: 23 additions & 11 deletions posthog/temporal/batch_exports/squash_person_overrides.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
max(version) AS latest_version
FROM
{database}.person_distinct_id_overrides
WHERE
((length(%(team_ids)s) = 0) OR (team_id IN %(team_ids)s))
GROUP BY
team_id, distinct_id
SETTINGS
Expand Down Expand Up @@ -105,6 +107,7 @@
{database}.sharded_events
WHERE
(joinGet('{database}.person_distinct_id_overrides_join', 'person_id', team_id, distinct_id) != defaultValueOfTypeName('UUID'))
AND ((length(%(team_ids)s) = 0) OR (team_id IN %(team_ids)s))
GROUP BY
team_id, distinct_id
SETTINGS
Expand Down Expand Up @@ -269,6 +272,9 @@ async def optimize_person_distinct_id_overrides(dry_run: bool) -> None:
activity.logger.info("Optimized person_distinct_id_overrides")


QueryParameters = dict[str, typing.Any]


@dataclass
class TableActivityInputs:
"""Inputs for activities that work with tables.
Expand All @@ -280,6 +286,7 @@ class TableActivityInputs:
"""

name: str
query_parameters: QueryParameters
exists: bool = True
dry_run: bool = True

Expand Down Expand Up @@ -308,7 +315,7 @@ async def create_table(inputs: TableActivityInputs) -> None:

async with heartbeat_every():
async with get_client() as clickhouse_client:
await clickhouse_client.execute_query(create_table_query)
await clickhouse_client.execute_query(create_table_query, query_parameters=inputs.query_parameters)

activity.logger.info("Created JOIN table person_distinct_id_overrides_join_table")

Expand Down Expand Up @@ -434,10 +441,13 @@ async def wait_for_table(inputs: TableActivityInputs) -> None:


@contextlib.asynccontextmanager
async def manage_table(table_name: str, dry_run: bool) -> collections.abc.AsyncGenerator[None, None]:
async def manage_table(
table_name: str, dry_run: bool, query_parameters: QueryParameters
) -> collections.abc.AsyncGenerator[None, None]:
"""A context manager to create ans subsequently drop a table."""
table_activity_inputs = TableActivityInputs(
name=table_name,
query_parameters=query_parameters,
dry_run=dry_run,
exists=True,
)
Expand All @@ -452,8 +462,8 @@ async def manage_table(table_name: str, dry_run: bool) -> collections.abc.AsyncG
await workflow.execute_activity(
wait_for_table,
table_activity_inputs,
start_to_close_timeout=timedelta(hours=4),
retry_policy=RetryPolicy(maximum_attempts=6, initial_interval=timedelta(seconds=20)),
start_to_close_timeout=timedelta(hours=6),
retry_policy=RetryPolicy(maximum_attempts=20, initial_interval=timedelta(seconds=20)),
heartbeat_timeout=timedelta(minutes=2),
)

Expand All @@ -479,9 +489,6 @@ async def manage_table(table_name: str, dry_run: bool) -> collections.abc.AsyncG
)


QueryParameters = dict[str, typing.Any]


@dataclass
class MutationActivityInputs:
"""Inputs for activities that work with mutations.
Expand Down Expand Up @@ -629,8 +636,8 @@ async def submit_and_wait_for_mutation(
await workflow.execute_activity(
wait_for_mutation,
mutation_activity_inputs,
start_to_close_timeout=timedelta(hours=4),
retry_policy=RetryPolicy(maximum_attempts=6, initial_interval=timedelta(seconds=20)),
start_to_close_timeout=timedelta(hours=6),
retry_policy=RetryPolicy(maximum_attempts=20, initial_interval=timedelta(seconds=20)),
heartbeat_timeout=timedelta(minutes=2),
)

Expand Down Expand Up @@ -742,7 +749,10 @@ async def run(self, inputs: SquashPersonOverridesInputs):
heartbeat_timeout=timedelta(minutes=1),
)

async with manage_table("person_distinct_id_overrides_join", inputs.dry_run):
table_query_parameters = {
"team_ids": list(inputs.team_ids),
}
async with manage_table("person_distinct_id_overrides_join", inputs.dry_run, table_query_parameters):
for partition_id in inputs.iter_partition_ids():
mutation_parameters: QueryParameters = {
"partition_id": partition_id,
Expand All @@ -755,7 +765,9 @@ async def run(self, inputs: SquashPersonOverridesInputs):
)
workflow.logger.info("Squash finished for all requested partitions, now deleting person overrides")

async with manage_table("person_distinct_id_overrides_join_to_delete", inputs.dry_run):
async with manage_table(
"person_distinct_id_overrides_join_to_delete", inputs.dry_run, table_query_parameters
):
delete_mutation_parameters: QueryParameters = {
"partition_ids": list(inputs.iter_partition_ids()),
"grace_period": inputs.delete_grace_period_seconds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ async def test_create_person_distinct_id_overrides_join_table(

inputs = TableActivityInputs(
name="person_distinct_id_overrides_join",
query_parameters={"team_ids": []},
dry_run=False,
)
await activity_environment.run(create_table, inputs)
Expand Down Expand Up @@ -232,6 +233,7 @@ async def test_create_person_distinct_id_overrides_join_with_older_overrides_pre
"""
inputs = TableActivityInputs(
name="person_distinct_id_overrides_join",
query_parameters={"team_ids": []},
dry_run=False,
)

Expand Down Expand Up @@ -270,6 +272,7 @@ async def test_create_person_distinct_id_overrides_join_with_newer_overrides_aft
"""Test `person_distinct_id_overrides_join` contains a static set of mappings."""
inputs = TableActivityInputs(
name="person_distinct_id_overrides_join",
query_parameters={"team_ids": []},
dry_run=False,
)

Expand Down Expand Up @@ -323,6 +326,7 @@ async def test_create_wait_and_drop_table(activity_environment, person_overrides
"""Test if a table is created, waited on, and dropped in a normal workflow."""
inputs = TableActivityInputs(
name="person_distinct_id_overrides_join",
query_parameters={"team_ids": []},
dry_run=False,
)

Expand Down Expand Up @@ -447,6 +451,7 @@ async def overrides_join_table(optimized_person_overrides, activity_environment)
"""
inputs = TableActivityInputs(
name="person_distinct_id_overrides_join",
query_parameters={"team_ids": []},
dry_run=False,
)

Expand Down Expand Up @@ -579,6 +584,7 @@ async def test_update_events_with_person_overrides_mutation_with_older_overrides
await activity_environment.run(optimize_person_distinct_id_overrides, False)
inputs = TableActivityInputs(
name="person_distinct_id_overrides_join",
query_parameters={"team_ids": []},
dry_run=False,
)

Expand Down Expand Up @@ -616,6 +622,7 @@ async def test_update_events_with_person_overrides_mutation_with_newer_overrides
await activity_environment.run(optimize_person_distinct_id_overrides, False)
inputs = TableActivityInputs(
name="person_distinct_id_overrides_join",
query_parameters={"team_ids": []},
dry_run=False,
)

Expand Down Expand Up @@ -709,6 +716,7 @@ async def test_delete_person_overrides_mutation(

join_table_inputs = TableActivityInputs(
name="person_distinct_id_overrides_join",
query_parameters={"team_ids": []},
dry_run=False,
)

Expand All @@ -717,6 +725,7 @@ async def test_delete_person_overrides_mutation(

delete_table_inputs = TableActivityInputs(
name="person_distinct_id_overrides_join_to_delete",
query_parameters={"team_ids": []},
dry_run=False,
)

Expand Down Expand Up @@ -790,6 +799,7 @@ async def test_delete_person_overrides_mutation_within_grace_period(

join_table_inputs = TableActivityInputs(
name="person_distinct_id_overrides_join",
query_parameters={"team_ids": []},
dry_run=False,
)

Expand All @@ -798,6 +808,7 @@ async def test_delete_person_overrides_mutation_within_grace_period(

delete_table_inputs = TableActivityInputs(
name="person_distinct_id_overrides_join_to_delete",
query_parameters={"team_ids": []},
dry_run=False,
)

Expand Down Expand Up @@ -856,6 +867,7 @@ async def test_delete_squashed_person_overrides_from_clickhouse_dry_run(

join_table_inputs = TableActivityInputs(
name="person_distinct_id_overrides_join",
query_parameters={"team_ids": []},
dry_run=False,
)

Expand All @@ -864,6 +876,7 @@ async def test_delete_squashed_person_overrides_from_clickhouse_dry_run(

delete_table_inputs = TableActivityInputs(
name="person_distinct_id_overrides_join_to_delete",
query_parameters={"team_ids": []},
dry_run=False,
)

Expand Down

0 comments on commit 63de132

Please sign in to comment.