From 4e0e19fe3ecfc12213e02f68451b28ba2c70ed58 Mon Sep 17 00:00:00 2001
From: James Greenhill
Date: Wed, 7 Aug 2024 15:55:42 -0700
Subject: [PATCH 1/2] feat: add dry run to delete process for testing in prod

---
Review notes (placed after the "---" marker, so they are not part of the
commit message):

 * Layout: this series was recovered from a copy in which newlines and
   indentation had been collapsed. Hunk line counts match the "@@" headers,
   but the indentation of context lines and the position of some blank
   context lines are a best-effort reconstruction. Regenerate the series
   with `git format-patch` from the real tree before running `git am`.
 * delete.py: `logger.warn` replaced by `logger.warning` (warn is a
   deprecated alias).
 * delete.py: `_query` and `_dry_sync_execute` passed a dict as the only
   positional logging argument to a message without placeholders. With
   %-style formatting the dict is consumed as the format mapping and never
   rendered, so the dry run did not actually show the statement. The
   messages now use explicit %s placeholders.
 * delete_cohorts.py: the delete went through `self._sync_execute`, which
   nothing in this patch assigns; `__init__` assigns `self.sync_execute`.
   The call now uses `self.sync_execute`, so it no longer raises
   AttributeError and it honours dry_run.
 * To confirm (not visible in these hunks): tasks.py only passes dry_run to
   AsyncEventDeletion. If AsyncCohortDeletion is instantiated further down
   in the same task it presumably needs the flag too.

 posthog/models/async_deletion/delete.py        | 18 +++++++++++++++++-
 .../models/async_deletion/delete_cohorts.py    |  5 ++---
 posthog/models/async_deletion/delete_events.py |  9 ++++-----
 posthog/tasks/tasks.py                         |  4 ++--
 4 files changed, 25 insertions(+), 11 deletions(-)

diff --git a/posthog/models/async_deletion/delete.py b/posthog/models/async_deletion/delete.py
index c17816b14bbe5..ea14c70b7ebc5 100644
--- a/posthog/models/async_deletion/delete.py
+++ b/posthog/models/async_deletion/delete.py
@@ -5,6 +5,7 @@
 from django.utils import timezone
 from prometheus_client import Counter
 
+from posthog.client import sync_execute
 from posthog.models.async_deletion import AsyncDeletion, DeletionType
 
 
@@ -19,8 +20,14 @@ class AsyncDeletionProcess(ABC):
     CLICKHOUSE_VERIFY_CHUNK_SIZE = 300
     DELETION_TYPES: list[DeletionType] = []
 
-    def __init__(self) -> None:
+    def __init__(self, dry_run: bool = False) -> None:
         super().__init__()
+        self.dry_run = dry_run
+        if self.dry_run:
+            logger.warning("Dry run enabled, no deletions will be performed")
+            self.sync_execute = self._dry_sync_execute
+        else:
+            self.sync_execute = self._query
 
     def run(self):
         queued_deletions = list(
@@ -84,3 +91,12 @@ def _conditions(self, async_deletions: list[AsyncDeletion]) -> tuple[list[str],
     @abstractmethod
     def _condition(self, async_deletion: AsyncDeletion, suffix: str) -> tuple[str, dict]:
         raise NotImplementedError()
+
+    @staticmethod
+    def _query(query, args, settings):
+        logger.info("Executing query: query=%s args=%s settings=%s", query, args, settings)
+        return sync_execute(query, args, settings)
+
+    @staticmethod
+    def _dry_sync_execute(query, args, settings):
+        logger.info("Dry run: query=%s args=%s settings=%s", query, args, settings)
diff --git a/posthog/models/async_deletion/delete_cohorts.py b/posthog/models/async_deletion/delete_cohorts.py
index cbc6de0b75fd0..0092036208b9e 100644
--- a/posthog/models/async_deletion/delete_cohorts.py
+++ b/posthog/models/async_deletion/delete_cohorts.py
@@ -1,6 +1,5 @@
 from typing import Any
 
-from posthog.client import sync_execute
 from posthog.models.async_deletion import AsyncDeletion, DeletionType
 from posthog.models.async_deletion.delete import AsyncDeletionProcess, logger
 
@@ -23,7 +22,7 @@ def process(self, deletions: list[AsyncDeletion]):
 
         conditions, args = self._conditions(deletions)
 
-        sync_execute(
+        self.sync_execute(
             f"""
             DELETE FROM cohortpeople
             WHERE {" OR ".join(conditions)}
@@ -43,7 +42,7 @@ def _verify_by_group(self, deletion_type: int, async_deletions: list[AsyncDeleti
 
     def _verify_by_column(self, distinct_columns: str, async_deletions: list[AsyncDeletion]) -> set[tuple[Any, ...]]:
         conditions, args = self._conditions(async_deletions)
-        clickhouse_result = sync_execute(
+        clickhouse_result = self._query(
             f"""
             SELECT DISTINCT {distinct_columns}
             FROM cohortpeople
diff --git a/posthog/models/async_deletion/delete_events.py b/posthog/models/async_deletion/delete_events.py
index 36ca66e03d697..1939562944b51 100644
--- a/posthog/models/async_deletion/delete_events.py
+++ b/posthog/models/async_deletion/delete_events.py
@@ -2,7 +2,6 @@
 
 from prometheus_client import Counter
 
-from posthog.client import sync_execute
 from posthog.models.async_deletion import AsyncDeletion, DeletionType
 from posthog.models.async_deletion.delete import AsyncDeletionProcess, logger
 from posthog.settings.data_stores import CLICKHOUSE_CLUSTER
@@ -69,7 +68,7 @@ def process(self, deletions: list[AsyncDeletion]):
             # If the query size is greater than the max predicate size, execute the query and reset the query predicate
             if query_size > MAX_QUERY_SIZE:
                 logger.debug(f"Executing query with args: {args}")
-                sync_execute(
+                self.sync_execute(
                     query,
                     args,
                     settings={},
@@ -80,7 +79,7 @@ def process(self, deletions: list[AsyncDeletion]):
         logger.debug(f"Executing query with args: {args}")
 
         # This is the default condition if we don't hit the MAX_QUERY_SIZE
-        sync_execute(
+        self.sync_execute(
             query,
             args,
             settings={},
@@ -103,7 +102,7 @@ def process(self, deletions: list[AsyncDeletion]):
         conditions, args = self._conditions(team_deletions)
         for table in TABLES_TO_DELETE_TEAM_DATA_FROM:
             query = f"""DELETE FROM {table} ON CLUSTER '{CLICKHOUSE_CLUSTER}' WHERE {" OR ".join(conditions)}"""
-            sync_execute(
+            self.sync_execute(
                 query,
                 args,
                 settings={},
@@ -122,7 +121,7 @@ def _verify_by_group(self, deletion_type: int, async_deletions: list[AsyncDeleti
 
     def _verify_by_column(self, distinct_columns: str, async_deletions: list[AsyncDeletion]) -> set[tuple[Any, ...]]:
         conditions, args = self._conditions(async_deletions)
-        clickhouse_result = sync_execute(
+        clickhouse_result = self._query(
             f"""
             SELECT DISTINCT {distinct_columns}
             FROM events
diff --git a/posthog/tasks/tasks.py b/posthog/tasks/tasks.py
index 5c4b085072287..57570c14c5ae7 100644
--- a/posthog/tasks/tasks.py
+++ b/posthog/tasks/tasks.py
@@ -481,12 +481,12 @@ def clickhouse_mutation_count() -> None:
 
 
 @shared_task(ignore_result=True)
-def clickhouse_clear_removed_data() -> None:
+def clickhouse_clear_removed_data(dry_run=False) -> None:
     from posthog.models.async_deletion.delete_cohorts import AsyncCohortDeletion
     from posthog.models.async_deletion.delete_events import AsyncEventDeletion
     from posthog.pagerduty.pd import create_incident
 
-    runner = AsyncEventDeletion()
+    runner = AsyncEventDeletion(dry_run)
 
     try:
         runner.mark_deletions_done()

From ccded49a7b4ec4b4ad43fa9162d9f061a92c5a7f Mon Sep 17 00:00:00 2001
From: James Greenhill
Date: Wed, 7 Aug 2024 15:59:35 -0700
Subject: [PATCH 2/2] add dry run flag to django management command

---
Review notes (placed after the "---" marker, so they are not part of the
commit message):

 * `--dry-run` was declared with `type=bool`. argparse calls bool() on the
   raw string, so `--dry-run False` evaluated to True. It is now a plain
   switch (`action="store_true"`), which defaults to False when absent.
 * `handle()` forwarded every parsed option with `run(**kwargs)`. Django
   also passes its own options (verbosity, settings, traceback, ...), while
   `run(dry_run)` accepts a single parameter, so the call raised TypeError.
   Only the dry_run value is forwarded now.

 .../management/commands/start_delete_mutation.py    | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/posthog/management/commands/start_delete_mutation.py b/posthog/management/commands/start_delete_mutation.py
index 23d810173ed9c..87a13a233e5a8 100644
--- a/posthog/management/commands/start_delete_mutation.py
+++ b/posthog/management/commands/start_delete_mutation.py
@@ -15,11 +15,16 @@ class Command(BaseCommand):
         "Useful when you need data deleted asap (cannot wait for the scheduled job)"
     )
 
-    def handle(self, *args, **options):
-        run()
+    def add_arguments(self, parser):
+        parser.add_argument(
+            "--dry-run", action="store_true", help="Don't run the delete, just print out the delete statement"
+        )
 
+    def handle(self, *args, **kwargs):
+        run(kwargs["dry_run"])
 
-def run():
+
+def run(dry_run):
     logger.info("Starting deletion of data for teams")
-    clickhouse_clear_removed_data()
+    clickhouse_clear_removed_data(dry_run)
     logger.info("Finished deletion of data for teams")