Skip to content

Commit

Permalink
Rewrite datasets cleanup API action
Browse files Browse the repository at this point in the history
Purge associations and datasets in 2 steps:
First synchronously mark all associations as purged in one transaction.
Then purge the internal datasets in a Celery task, or synchronously if Celery tasks are not available.

This ensures that the associations are already marked as purged when the request finishes, keeping the results consistent between subsequent requests.
  • Loading branch information
davelopez committed Feb 7, 2023
1 parent da57f8a commit a04c4d5
Showing 1 changed file with 23 additions and 4 deletions.
27 changes: 23 additions & 4 deletions lib/galaxy/managers/hdas.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
)
from galaxy.schema.tasks import (
MaterializeDatasetInstanceTaskRequest,
PurgeDatasetsTaskRequest,
RequestUser,
)
from galaxy.structured_app import (
Expand Down Expand Up @@ -332,8 +333,9 @@ def _set_permissions(self, trans, hda, role_ids_dict):


class HDAStorageCleanerManager(base.StorageCleanerManager):
def __init__(self, hda_manager: HDAManager):
def __init__(self, hda_manager: HDAManager, dataset_manager: datasets.DatasetManager):
self.hda_manager = hda_manager
self.dataset_manager = dataset_manager
self.sort_map = {
StoredItemOrderBy.NAME_ASC: asc(model.HistoryDatasetAssociation.name),
StoredItemOrderBy.NAME_DSC: desc(model.HistoryDatasetAssociation.name),
Expand Down Expand Up @@ -405,24 +407,41 @@ def cleanup_items(self, user: model.User, item_ids: Set[int]) -> StorageItemsCle
success_item_count = 0
total_free_bytes = 0
errors: List[StorageItemCleanupError] = []
dataset_ids_to_remove: Set[int] = set()

with self.hda_manager.session().begin():
for hda_id in item_ids:
try:
hda = self.hda_manager.get_owned(hda_id, user)
self.hda_manager.purge(hda)
hda: model.HistoryDatasetAssociation = self.hda_manager.get_owned(hda_id, user)
hda.deleted = True
quota_amount = int(hda.quota_amount(user))
hda.purge_usage_from_quota(user)
hda.purged = True
dataset_ids_to_remove.add(hda.dataset.id)
success_item_count += 1
total_free_bytes += int(hda.get_size())
total_free_bytes += quota_amount
except BaseException as e:
errors.append(StorageItemCleanupError(item_id=hda_id, error=str(e)))

self._request_full_delete_all(dataset_ids_to_remove)

return StorageItemsCleanupResult(
total_item_count=len(item_ids),
success_item_count=success_item_count,
total_free_bytes=total_free_bytes,
errors=errors,
)

def _request_full_delete_all(self, dataset_ids_to_remove: Set[int]):
use_tasks = self.dataset_manager.app.config.enable_celery_tasks
request = PurgeDatasetsTaskRequest(dataset_ids=list(dataset_ids_to_remove))
if use_tasks:
from galaxy.celery.tasks import purge_datasets

purge_datasets.delay(request=request)
else:
self.dataset_manager.purge_datasets(request)


class HDASerializer( # datasets._UnflattenedMetadataDatasetAssociationSerializer,
datasets.DatasetAssociationSerializer[HDAManager],
Expand Down

0 comments on commit a04c4d5

Please sign in to comment.