Skip to content

Commit

Permalink
chore(data-warehouse): delete folder when source is deleted (#19059)
Browse files Browse the repository at this point in the history
delete folder when source is deleted
  • Loading branch information
EDsCODE authored Dec 4, 2023
1 parent 4561ac4 commit a7b4684
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 0 deletions.
6 changes: 6 additions & 0 deletions posthog/warehouse/api/external_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
trigger_external_data_workflow,
delete_external_data_schedule,
cancel_external_data_workflow,
delete_data_import_folder,
)
from posthog.warehouse.models import ExternalDataSource
from posthog.warehouse.models import ExternalDataJob
Expand Down Expand Up @@ -128,6 +129,11 @@ def destroy(self, request: Request, *args: Any, **kwargs: Any) -> Response:
)
if latest_job and latest_job.workflow_id and latest_job.status == "Running":
cancel_external_data_workflow(latest_job.workflow_id)
try:
delete_data_import_folder(latest_job.folder_path)
except Exception as e:
logger.exception(f"Could not clean up data import folder: {latest_job.folder_path}", exc_info=e)
pass

delete_external_data_schedule(instance)
return super().destroy(request, *args, **kwargs)
Expand Down
12 changes: 12 additions & 0 deletions posthog/warehouse/data_load/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
from temporalio.client import Client as TemporalClient
from asgiref.sync import async_to_sync

from django.conf import settings
import s3fs


def sync_external_data_job_workflow(external_data_source: ExternalDataSource, create: bool = False) -> str:
temporal = sync_connect()
Expand Down Expand Up @@ -94,3 +97,12 @@ def cancel_external_data_workflow(workflow_id: str):
async def cancel_workflow(temporal: TemporalClient, workflow_id: str):
    """Ask Temporal to cancel the workflow identified by *workflow_id*."""
    workflow_handle = temporal.get_workflow_handle(workflow_id)
    await workflow_handle.cancel()


def delete_data_import_folder(folder_path: str):
    """Recursively delete the S3 folder backing a data import.

    Connects to S3 with the Airbyte bucket credentials from Django settings
    and removes everything under ``{settings.BUCKET_URL}/{folder_path}``.
    """
    bucket = settings.BUCKET_URL
    filesystem = s3fs.S3FileSystem(
        key=settings.AIRBYTE_BUCKET_KEY,
        secret=settings.AIRBYTE_BUCKET_SECRET,
    )
    filesystem.delete(f"{bucket}/{folder_path}", recursive=True)

0 comments on commit a7b4684

Please sign in to comment.