From 6499bded5585bf1423fcf6673b2f560fe4f941af Mon Sep 17 00:00:00 2001
From: James Greenhill
Date: Wed, 31 Jul 2024 16:18:57 -0700
Subject: [PATCH] fix: tune up tests for async deletes (#24090)

---
 posthog/models/async_deletion/delete.py        |  1 +
 .../models/async_deletion/delete_events.py     | 70 ++++++++-----------
 2 files changed, 30 insertions(+), 41 deletions(-)

diff --git a/posthog/models/async_deletion/delete.py b/posthog/models/async_deletion/delete.py
index 005e93c583c19..527b0c34cec44 100644
--- a/posthog/models/async_deletion/delete.py
+++ b/posthog/models/async_deletion/delete.py
@@ -7,6 +7,7 @@
 
 from posthog.models.async_deletion import AsyncDeletion, DeletionType
 
+
 logger = structlog.get_logger(__name__)
 
 
diff --git a/posthog/models/async_deletion/delete_events.py b/posthog/models/async_deletion/delete_events.py
index fa1c59679df33..f5a4e39ce148a 100644
--- a/posthog/models/async_deletion/delete_events.py
+++ b/posthog/models/async_deletion/delete_events.py
@@ -8,10 +8,13 @@
 from posthog.settings.data_stores import CLICKHOUSE_CLUSTER
 
-deletions_counter = Counter("deletions_executed", "Total number of deletions sent to clickhouse", ["deletion_type"])
+logger.setLevel("DEBUG")
 
+deletions_counter = Counter("deletions_executed", "Total number of deletions sent to clickhouse", ["deletion_type"])
 
 
-MAX_PREDICATE_SIZE = 240_000  # 240KB
+# We purposely set this lower than the 256KB limit in ClickHouse to account for the potential overhead of the argument
+# substitution and settings injection. This is a conservative estimate, but it's better to be safe than hit the limit.
+MAX_QUERY_SIZE = 230_000  # 230KB which is less than 256KB limit in ClickHouse
 
 # Note: Session recording, dead letter queue, logs deletion will be handled by TTL
 TABLES_TO_DELETE_TEAM_DATA_FROM = [
@@ -45,40 +48,38 @@ def process(self, deletions: list[AsyncDeletion]):
             },
         )
 
-        conditions, args = self._conditions(deletions)
+        conditions, args = [], {}
+        for i, deletion in enumerate(deletions):
+            condition, arg = self._condition(deletion, str(i))
 
-        # Split the deletions into chunks to avoid hitting the max query size
-        query_predicate = []
-        for condition in conditions:
-            query_predicate.append(condition)
+            conditions.append(condition)
+            args.update(arg)
 
             # Get estimated byte size of the query
-            str_predicate = " OR ".join(query_predicate)
-            query_size = len(str_predicate.encode("utf-8"))
+            str_predicate = " OR ".join(conditions)
+            query = f"DELETE FROM sharded_events ON CLUSTER '{CLICKHOUSE_CLUSTER}' WHERE {str_predicate}"
+            query_size = len(query.encode("utf-8"))
+
+            logger.debug(f"Query size: {query_size}")
+            logger.debug(f"Query: {query}")
+            logger.debug(f"Query deletions: {deletions}")
 
             # If the query size is greater than the max predicate size, execute the query and reset the query predicate
-            if query_size > MAX_PREDICATE_SIZE:
-                next_args, rest_args = split_dict(args, len(query_predicate) - 1)
+            if query_size > MAX_QUERY_SIZE:
+                logger.debug(f"Executing query with args: {args}")
                 sync_execute(
-                    f"""
-                    DELETE FROM posthog.sharded_events
-                    ON CLUSTER '{CLICKHOUSE_CLUSTER}'
-                    WHERE {str_predicate}
-                    """,
-                    next_args,
+                    query,
+                    args,
                     settings={},
                 )
 
-                # Reset the query predicate and predicate args
-                args = rest_args
-                query_predicate = []
-        # This is the default condition if we don't hit the MAX_PREDICATE_SIZE
+                conditions, args = [], {}
+
+        logger.debug(f"Executing query with args: {args}")
+
+        # This is the default condition if we don't hit the MAX_QUERY_SIZE
         sync_execute(
-            f"""
-            DELETE FROM sharded_events
-            ON CLUSTER '{CLICKHOUSE_CLUSTER}'
-            WHERE {str_predicate}
-            """,
+            query,
             args,
             settings={},
         )
@@ -99,12 +100,9 @@ def process(self, deletions: list[AsyncDeletion]):
         )
         conditions, args = self._conditions(team_deletions)
         for table in TABLES_TO_DELETE_TEAM_DATA_FROM:
+            query = f"""DELETE FROM {table} ON CLUSTER '{CLICKHOUSE_CLUSTER}' WHERE {" OR ".join(conditions)}"""
             sync_execute(
-                f"""
-                DELETE FROM {table}
-                ON CLUSTER '{CLICKHOUSE_CLUSTER}'
-                WHERE {" OR ".join(conditions)}
-                """,
+                query,
                 args,
                 settings={},
             )
@@ -163,13 +161,3 @@ def _condition(self, async_deletion: AsyncDeletion, suffix: str) -> tuple[str, d
                 f"key{suffix}": async_deletion.key,
             },
         )
-
-
-def split_dict(original_dict, n):
-    items = list(original_dict.items())
-
-    # Split the items
-    first_n = dict(items[:n])
-    rest = dict(items[n:])
-
-    return first_n, rest
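
Below is a minimal standalone sketch of the batching strategy this patch adopts for the sharded_events mutation: accumulate OR-joined predicates, render the full DELETE statement, and flush it whenever the rendered query would exceed a byte budget kept below ClickHouse's 256KB default max_query_size. It is illustrative only; the execute callback, build_condition helper, CLUSTER constant, and the team_id-only predicate shape are stand-ins, not PostHog code.

# Illustrative sketch (not PostHog code): chunked DELETE mutations kept under a
# byte budget, mirroring the accumulate-and-flush loop introduced in the patch.

MAX_QUERY_SIZE = 230_000  # bytes; deliberately below ClickHouse's 256KB default
CLUSTER = "example_cluster"  # stand-in for CLICKHOUSE_CLUSTER


def build_condition(deletion: dict, suffix: str) -> tuple[str, dict]:
    # Stand-in for the per-deletion _condition helper: one parametrised
    # predicate plus the arguments it binds.
    return f"team_id = %(team_id{suffix})s", {f"team_id{suffix}": deletion["team_id"]}


def delete_in_chunks(deletions: list[dict], execute) -> None:
    conditions: list[str] = []
    args: dict = {}
    query = ""

    for i, deletion in enumerate(deletions):
        condition, arg = build_condition(deletion, str(i))
        conditions.append(condition)
        args.update(arg)

        predicate = " OR ".join(conditions)
        query = f"DELETE FROM sharded_events ON CLUSTER '{CLUSTER}' WHERE {predicate}"

        # Flush early once the rendered statement exceeds the byte budget,
        # then start accumulating a fresh batch.
        if len(query.encode("utf-8")) > MAX_QUERY_SIZE:
            execute(query, args)
            conditions, args = [], {}

    # Flush whatever remains (the common case when the budget is never hit).
    if conditions:
        execute(query, args)


if __name__ == "__main__":
    fake_deletions = [{"team_id": n} for n in range(3)]
    delete_in_chunks(fake_deletions, lambda q, a: print(len(q.encode("utf-8")), a))

The final flush is guarded here so a batch is not re-sent when the last iteration already triggered an in-loop flush; the patch itself issues its trailing sync_execute unconditionally with whatever query and args remain.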