Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add dry run to delete process for testing in prod #24251

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions posthog/management/commands/start_delete_mutation.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@ class Command(BaseCommand):
"Useful when you need data deleted asap (cannot wait for the scheduled job)"
)

def handle(self, *args, **options):
run()
def add_arguments(self, parser):
parser.add_argument(
"--dry-run", default=False, type=bool, help="Don't run the delete, just print out the delete statement"
)

def handle(self, *args, **kwargs):
run(**kwargs)

def run():

def run(dry_run):
logger.info("Starting deletion of data for teams")
clickhouse_clear_removed_data()
clickhouse_clear_removed_data(dry_run)
logger.info("Finished deletion of data for teams")
18 changes: 17 additions & 1 deletion posthog/models/async_deletion/delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from django.utils import timezone
from prometheus_client import Counter

from posthog.client import sync_execute
from posthog.models.async_deletion import AsyncDeletion, DeletionType


Expand All @@ -19,8 +20,14 @@ class AsyncDeletionProcess(ABC):
CLICKHOUSE_VERIFY_CHUNK_SIZE = 300
DELETION_TYPES: list[DeletionType] = []

def __init__(self) -> None:
def __init__(self, dry_run: bool = False) -> None:
super().__init__()
self.dry_run = dry_run
if self.dry_run:
logger.warn("Dry run enabled, no deletions will be performed")
self.sync_execute = self._dry_sync_execute
else:
self.sync_execute = self._query

def run(self):
queued_deletions = list(
Expand Down Expand Up @@ -84,3 +91,12 @@ def _conditions(self, async_deletions: list[AsyncDeletion]) -> tuple[list[str],
@abstractmethod
def _condition(self, async_deletion: AsyncDeletion, suffix: str) -> tuple[str, dict]:
raise NotImplementedError()

@staticmethod
def _query(query, args, settings):
logger.info("Executing query", {"query": query, "args": args, "settings": settings})
return sync_execute(query, args, settings)

@staticmethod
def _dry_sync_execute(query, args, settings):
logger.info("Dry run", {"query": query, "args": args, "settings": settings})
5 changes: 2 additions & 3 deletions posthog/models/async_deletion/delete_cohorts.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from typing import Any

from posthog.client import sync_execute
from posthog.models.async_deletion import AsyncDeletion, DeletionType
from posthog.models.async_deletion.delete import AsyncDeletionProcess, logger

Expand All @@ -23,7 +22,7 @@

conditions, args = self._conditions(deletions)

sync_execute(
self._sync_execute(

Check failure on line 25 in posthog/models/async_deletion/delete_cohorts.py

View workflow job for this annotation

GitHub Actions / Python code quality checks

"AsyncCohortDeletion" has no attribute "_sync_execute"
f"""
DELETE FROM cohortpeople
WHERE {" OR ".join(conditions)}
Expand All @@ -43,7 +42,7 @@

def _verify_by_column(self, distinct_columns: str, async_deletions: list[AsyncDeletion]) -> set[tuple[Any, ...]]:
conditions, args = self._conditions(async_deletions)
clickhouse_result = sync_execute(
clickhouse_result = self._query(
f"""
SELECT DISTINCT {distinct_columns}
FROM cohortpeople
Expand Down
9 changes: 4 additions & 5 deletions posthog/models/async_deletion/delete_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from prometheus_client import Counter

from posthog.client import sync_execute
from posthog.models.async_deletion import AsyncDeletion, DeletionType
from posthog.models.async_deletion.delete import AsyncDeletionProcess, logger
from posthog.settings.data_stores import CLICKHOUSE_CLUSTER
Expand Down Expand Up @@ -69,7 +68,7 @@ def process(self, deletions: list[AsyncDeletion]):
# If the query size is greater than the max predicate size, execute the query and reset the query predicate
if query_size > MAX_QUERY_SIZE:
logger.debug(f"Executing query with args: {args}")
sync_execute(
self.sync_execute(
query,
args,
settings={},
Expand All @@ -80,7 +79,7 @@ def process(self, deletions: list[AsyncDeletion]):
logger.debug(f"Executing query with args: {args}")

# This is the default condition if we don't hit the MAX_QUERY_SIZE
sync_execute(
self.sync_execute(
query,
args,
settings={},
Expand All @@ -103,7 +102,7 @@ def process(self, deletions: list[AsyncDeletion]):
conditions, args = self._conditions(team_deletions)
for table in TABLES_TO_DELETE_TEAM_DATA_FROM:
query = f"""DELETE FROM {table} ON CLUSTER '{CLICKHOUSE_CLUSTER}' WHERE {" OR ".join(conditions)}"""
sync_execute(
self.sync_execute(
query,
args,
settings={},
Expand All @@ -122,7 +121,7 @@ def _verify_by_group(self, deletion_type: int, async_deletions: list[AsyncDeleti

def _verify_by_column(self, distinct_columns: str, async_deletions: list[AsyncDeletion]) -> set[tuple[Any, ...]]:
conditions, args = self._conditions(async_deletions)
clickhouse_result = sync_execute(
clickhouse_result = self._query(
f"""
SELECT DISTINCT {distinct_columns}
FROM events
Expand Down
4 changes: 2 additions & 2 deletions posthog/tasks/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,12 +481,12 @@


@shared_task(ignore_result=True)
def clickhouse_clear_removed_data() -> None:
def clickhouse_clear_removed_data(dry_run=False) -> None:

Check failure on line 484 in posthog/tasks/tasks.py

View workflow job for this annotation

GitHub Actions / Python code quality checks

Function is missing a type annotation for one or more arguments
from posthog.models.async_deletion.delete_cohorts import AsyncCohortDeletion
from posthog.models.async_deletion.delete_events import AsyncEventDeletion
from posthog.pagerduty.pd import create_incident

runner = AsyncEventDeletion()
runner = AsyncEventDeletion(dry_run)

try:
runner.mark_deletions_done()
Expand Down
Loading