Commit 6499bde
fix: tune up tests for async deletes (#24090)
fuziontech authored Jul 31, 2024
1 parent 1707c2e commit 6499bde
Showing 2 changed files with 30 additions and 41 deletions.
1 change: 1 addition & 0 deletions posthog/models/async_deletion/delete.py
@@ -7,6 +7,7 @@
 
 from posthog.models.async_deletion import AsyncDeletion, DeletionType
 
+
 logger = structlog.get_logger(__name__)
 
 
70 changes: 29 additions & 41 deletions posthog/models/async_deletion/delete_events.py
@@ -8,10 +8,13 @@
 from posthog.settings.data_stores import CLICKHOUSE_CLUSTER
 
 
-deletions_counter = Counter("deletions_executed", "Total number of deletions sent to clickhouse", ["deletion_type"])
+logger.setLevel("DEBUG")
+
+deletions_counter = Counter("deletions_executed", "Total number of deletions sent to clickhouse", ["deletion_type"])
 
-MAX_PREDICATE_SIZE = 240_000  # 240KB
+# We purposely set this lower than the 256KB limit in ClickHouse to account for the potential overhead of the argument
+# substitution and settings injection. This is a conservative estimate, but it's better to be safe than hit the limit.
+MAX_QUERY_SIZE = 230_000  # 230KB which is less than 256KB limit in ClickHouse
 
 # Note: Session recording, dead letter queue, logs deletion will be handled by TTL
 TABLES_TO_DELETE_TEAM_DATA_FROM = [
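The new constant budgets the entire rendered statement, not just the WHERE predicate, against ClickHouse's default max_query_size of 256KB. A minimal sketch of that check, with an illustrative cluster name standing in for the PostHog settings:

MAX_QUERY_SIZE = 230_000  # bytes; deliberately below ClickHouse's 262,144-byte default

def fits_query_budget(conditions: list[str], cluster: str = "posthog") -> bool:
    # Render the full statement, then measure encoded bytes: len() on the str
    # would undercount whenever the predicate contains multi-byte characters.
    predicate = " OR ".join(conditions)
    query = f"DELETE FROM sharded_events ON CLUSTER '{cluster}' WHERE {predicate}"
    return len(query.encode("utf-8")) <= MAX_QUERY_SIZE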
@@ -45,40 +48,38 @@ def process(self, deletions: list[AsyncDeletion]):
             },
         )
 
-        conditions, args = self._conditions(deletions)
+        conditions, args = [], {}
+        for i, deletion in enumerate(deletions):
+            condition, arg = self._condition(deletion, str(i))
 
-        # Split the deletions into chunks to avoid hitting the max query size
-        query_predicate = []
-        for condition in conditions:
-            query_predicate.append(condition)
+            conditions.append(condition)
+            args.update(arg)
 
             # Get estimated byte size of the query
-            str_predicate = " OR ".join(query_predicate)
-            query_size = len(str_predicate.encode("utf-8"))
+            str_predicate = " OR ".join(conditions)
+            query = f"DELETE FROM sharded_events ON CLUSTER '{CLICKHOUSE_CLUSTER}' WHERE {str_predicate}"
+            query_size = len(query.encode("utf-8"))
+
+            logger.debug(f"Query size: {query_size}")
+            logger.debug(f"Query: {query}")
+            logger.debug(f"Query deletions: {deletions}")
 
             # If the query size is greater than the max predicate size, execute the query and reset the query predicate
-            if query_size > MAX_PREDICATE_SIZE:
-                next_args, rest_args = split_dict(args, len(query_predicate) - 1)
+            if query_size > MAX_QUERY_SIZE:
+                logger.debug(f"Executing query with args: {args}")
                 sync_execute(
-                    f"""
-                    DELETE FROM posthog.sharded_events
-                    ON CLUSTER '{CLICKHOUSE_CLUSTER}'
-                    WHERE {str_predicate}
-                    """,
-                    next_args,
+                    query,
+                    args,
                     settings={},
                 )
-                # Reset the query predicate and predicate args
-                args = rest_args
-                query_predicate = []
-
-        # This is the default condition if we don't hit the MAX_PREDICATE_SIZE
+                conditions, args = [], {}
+
+        logger.debug(f"Executing query with args: {args}")
+
+        # This is the default condition if we don't hit the MAX_QUERY_SIZE
         sync_execute(
-            f"""
-            DELETE FROM sharded_events
-            ON CLUSTER '{CLICKHOUSE_CLUSTER}'
-            WHERE {str_predicate}
-            """,
+            query,
             args,
             settings={},
         )
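In effect the rewritten loop batches predicates and flushes a DELETE as soon as the rendered statement crosses the 230KB mark, counting on the roughly 26KB of headroom to absorb the condition that tipped it over. A standalone sketch of the pattern, with illustrative names and a render callback standing in for the f-string above:

def delete_in_batches(predicates, execute, render, max_size=230_000):
    # predicates: iterable of (condition_sql, bound_args) pairs.
    conditions, args = [], {}
    for condition, arg in predicates:
        conditions.append(condition)
        args.update(arg)
        query = render(" OR ".join(conditions))
        if len(query.encode("utf-8")) > max_size:
            execute(query, args)       # flush, including the condition that crossed the limit
            conditions, args = [], {}  # start a fresh batch
    if conditions:                     # final partial batch
        execute(render(" OR ".join(conditions)), args)

The sketch guards the final flush so nothing is issued when the last condition already triggered one; the committed loop issues the trailing sync_execute unconditionally.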
@@ -99,12 +100,9 @@ def process(self, deletions: list[AsyncDeletion]):
         )
         conditions, args = self._conditions(team_deletions)
         for table in TABLES_TO_DELETE_TEAM_DATA_FROM:
+            query = f"""DELETE FROM {table} ON CLUSTER '{CLICKHOUSE_CLUSTER}' WHERE {" OR ".join(conditions)}"""
             sync_execute(
-                f"""
-                DELETE FROM {table}
-                ON CLUSTER '{CLICKHOUSE_CLUSTER}'
-                WHERE {" OR ".join(conditions)}
-                """,
+                query,
                 args,
                 settings={},
             )
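Folding the statement onto one line also stops the indentation and newlines of the triple-quoted f-string from being sent as part of the query, bytes that count toward the size budget. A small illustration with made-up values:

table, predicate = "person_distinct_id2", "team_id = %(team_id0)s"
cluster = "posthog"  # stand-in for CLICKHOUSE_CLUSTER

multiline = f"""
    DELETE FROM {table}
    ON CLUSTER '{cluster}'
    WHERE {predicate}
    """
single = f"""DELETE FROM {table} ON CLUSTER '{cluster}' WHERE {predicate}"""

# The indented form carries a newline plus leading spaces on every line:
print(len(multiline.encode("utf-8")), len(single.encode("utf-8")))  # 99 vs 81 here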
@@ -163,13 +161,3 @@ def _condition(self, async_deletion: AsyncDeletion, suffix: str) -> tuple[str, dict]:
f"key{suffix}": async_deletion.key,
},
)


def split_dict(original_dict, n):
items = list(original_dict.items())

# Split the items
first_n = dict(items[:n])
rest = dict(items[n:])

return first_n, rest
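The deleted helper split a dict positionally, in insertion order, which was fragile for this caller: each condition can bind two parameters (team_id{suffix} and key{suffix}), so taking the first len(query_predicate) - 1 items of args can drop parameters the rendered predicate still references. That is my reading of the old code, not something the commit message states; a quick demonstration:

def split_dict(original_dict, n):
    # The removed helper: first n items (insertion order) and the rest.
    items = list(original_dict.items())
    return dict(items[:n]), dict(items[n:])

# Three conditions binding two parameters each -> six entries in args.
args = {"team_id0": 1, "key0": "a", "team_id1": 2, "key1": "b", "team_id2": 3, "key2": "c"}
next_args, _rest = split_dict(args, 3 - 1)  # len(query_predicate) - 1 with three predicates
print(next_args)  # {'team_id0': 1, 'key0': 'a'} -- yet the predicate still references
                  # %(team_id1)s, %(key1)s, %(team_id2)s, %(key2)s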
