fix(data-warehouse): source deletion fix (#25664)
EDsCODE authored Oct 18, 2024
1 parent d4d5ca5 commit 0727c88
Showing 4 changed files with 18 additions and 17 deletions.
12 changes: 6 additions & 6 deletions posthog/temporal/tests/external_data/test_external_data_job.py
@@ -27,6 +27,7 @@
     ExternalDataJob,
     ExternalDataSource,
     ExternalDataSchema,
+    get_external_data_job,
 )
 
 from posthog.temporal.data_imports.pipelines.schemas import (
@@ -379,9 +380,7 @@ async def setup_job_1():
         schema=customer_schema,
     )
 
-    new_job = await sync_to_async(
-        ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").prefetch_related("schema").get
-    )()
+    new_job = await get_external_data_job(new_job.id)
 
     inputs = ImportDataActivityInputs(
         team_id=team.id,
@@ -403,16 +402,17 @@ async def setup_job_2():
         job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"},
     )
 
+    charge_schema = await _create_schema("Charge", new_source, team)
+
     new_job: ExternalDataJob = await sync_to_async(ExternalDataJob.objects.create)(
         team_id=team.id,
         pipeline_id=new_source.pk,
         status=ExternalDataJob.Status.RUNNING,
         rows_synced=0,
+        schema=charge_schema,
     )
 
-    new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)()
-
-    charge_schema = await _create_schema("Charge", new_source, team)
+    new_job = await get_external_data_job(new_job.id)
 
     inputs = ImportDataActivityInputs(
         team_id=team.id,
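The tests above now go through a get_external_data_job helper that this diff imports but does not define. A minimal sketch of what it presumably looks like, assuming it simply wraps the prefetching query the old inline code performed (the signature and import path are inferred from the call sites, not confirmed by this commit):

# Hypothetical sketch, not part of this commit. Assumes the helper wraps
# the same prefetching query the tests previously built inline.
from asgiref.sync import sync_to_async

from posthog.warehouse.models import ExternalDataJob


async def get_external_data_job(job_id) -> ExternalDataJob:
    # Load the job with its pipeline and schema prefetched, running the
    # ORM query off the event loop exactly as the old inline code did.
    return await sync_to_async(
        ExternalDataJob.objects.filter(id=job_id)
        .prefetch_related("pipeline")
        .prefetch_related("schema")
        .get
    )()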
15 changes: 5 additions & 10 deletions posthog/warehouse/api/external_data_source.py
@@ -690,21 +690,16 @@ def destroy(self, request: Request, *args: Any, **kwargs: Any) -> Response:
         if latest_running_job and latest_running_job.workflow_id and latest_running_job.status == "Running":
             cancel_external_data_workflow(latest_running_job.workflow_id)
 
-        all_jobs = ExternalDataJob.objects.filter(
-            pipeline_id=instance.pk, team_id=instance.team_id, status="Completed"
-        ).all()
-        for job in all_jobs:
-            try:
-                delete_data_import_folder(job.folder_path())
-            except Exception as e:
-                logger.exception(f"Could not clean up data import folder: {job.folder_path()}", exc_info=e)
-                pass
-
         for schema in (
             ExternalDataSchema.objects.exclude(deleted=True)
             .filter(team_id=self.team_id, source_id=instance.id, should_sync=True)
             .all()
         ):
+            try:
+                delete_data_import_folder(schema.folder_path())
+            except Exception as e:
+                logger.exception(f"Could not clean up data import folder: {schema.folder_path()}", exc_info=e)
+                pass
             delete_external_data_schedule(str(schema.id))
 
         delete_external_data_schedule(str(instance.id))
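The fix here: destroy() previously derived cleanup folders from jobs with status "Completed", so schemas whose jobs never completed kept their folders. It now iterates the source's non-deleted, syncing schemas and deletes each schema's folder directly. delete_data_import_folder itself is not shown in this diff; a rough sketch of the idea, assuming the imported data lives under an S3 prefix named by folder_path() and that the bucket name comes from Django settings (both are illustrative assumptions, not PostHog's actual storage code):

import boto3
from django.conf import settings


def delete_data_import_folder(folder_path: str) -> None:
    # Delete every object under the folder's prefix. The bucket resource
    # and the DATA_WAREHOUSE_BUCKET setting are assumed for illustration.
    bucket = boto3.resource("s3").Bucket(settings.DATA_WAREHOUSE_BUCKET)
    bucket.objects.filter(Prefix=f"{folder_path}/").delete()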
5 changes: 4 additions & 1 deletion posthog/warehouse/models/external_data_job.py
@@ -29,7 +29,10 @@ class Status(models.TextChoices):
     __repr__ = sane_repr("id")
 
     def folder_path(self) -> str:
-        return f"team_{self.team_id}_{self.pipeline.source_type}_{str(self.schema_id)}".lower().replace("-", "_")
+        if self.schema:
+            return self.schema.folder_path()
+        else:
+            raise ValueError("Job does not have a schema")
 
     def deprecated_folder_path(self) -> str:
         return f"team_{self.team_id}_{self.pipeline.source_type}_{str(self.pk)}".lower().replace("-", "_")
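Why the ValueError branch is an improvement: the old implementation interpolated str(self.schema_id) directly, so a job with no schema silently produced a folder name ending in "none" instead of failing. A standalone illustration of that old behavior:

# Reproduces the old format string with a missing schema id.
team_id, source_type, schema_id = 1, "Stripe", None
old_path = f"team_{team_id}_{source_type}_{str(schema_id)}".lower().replace("-", "_")
print(old_path)  # -> team_1_stripe_none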
3 changes: 3 additions & 0 deletions posthog/warehouse/models/external_data_schema.py
@@ -60,6 +60,9 @@ class SyncFrequency(models.TextChoices):
 
     __repr__ = sane_repr("name")
 
+    def folder_path(self) -> str:
+        return f"team_{self.team_id}_{self.source.source_type}_{str(self.id)}".lower().replace("-", "_")
+
     @property
     def is_incremental(self):
         return self.sync_type == self.SyncType.INCREMENTAL
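For reference, the new schema-level folder_path() produces prefixes like the following (team id, source type, and schema UUID are made up):

# Same format string as ExternalDataSchema.folder_path(), made-up values.
team_id = 2
source_type = "Stripe"
schema_id = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
print(f"team_{team_id}_{source_type}_{str(schema_id)}".lower().replace("-", "_"))
# -> team_2_stripe_a1b2c3d4_e5f6_7890_abcd_ef1234567890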
