From 2c155c44bd4deb5a26b764336865f7d10ce36d72 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Fri, 19 Jul 2024 11:46:41 +0200 Subject: [PATCH] Drop unused datasets controller methods --- .../webapps/galaxy/controllers/dataset.py | 271 ------------------ templates/webapps/galaxy/dataset/display.mako | 4 - 2 files changed, 275 deletions(-) diff --git a/lib/galaxy/webapps/galaxy/controllers/dataset.py b/lib/galaxy/webapps/galaxy/controllers/dataset.py index d3e770fae928..a797210f0396 100644 --- a/lib/galaxy/webapps/galaxy/controllers/dataset.py +++ b/lib/galaxy/webapps/galaxy/controllers/dataset.py @@ -454,28 +454,6 @@ def _get_dataset_for_edit(self, trans, dataset_id): trans.app.security_agent.set_dataset_permission(data.dataset, permissions) return data, None - @web.expose - def imp(self, trans, dataset_id=None, **kwd): - """Import another user's dataset via a shared URL; dataset is added to user's current history.""" - # Set referer message. - referer = trans.request.referer - if referer and not referer.startswith(f"{trans.request.application_url}{url_for('/login')}"): - referer_message = f"return to the previous page" - else: - referer_message = f"go to Galaxy's start page" - # Error checking. - if not dataset_id: - return trans.show_error_message( - f"You must specify a dataset to import. You can {referer_message}.", use_panels=True - ) - # Do import. - cur_history = trans.get_history(create=True) - status, message = self._copy_datasets(trans, [dataset_id], [cur_history], imported=True) - message = ( - f"Dataset imported.
You can start using the dataset or {referer_message}." - ) - return trans.show_message(message, type=status, use_panels=True) - @web.expose def display_by_username_and_slug(self, trans, username, slug, filename=None, preview=True, **kwargs): """Display dataset by username and slug; because datasets do not yet have slugs, the slug is the dataset's id.""" @@ -512,34 +490,6 @@ def display_by_username_and_slug(self, trans, username, slug, filename=None, pre first_chunk=first_chunk, ) - @web.expose - def annotate_async(self, trans, id, new_annotation=None, **kwargs): - # TODO:?? why is this an access check only? - decoded_id = self.decode_id(id) - dataset = self.hda_manager.get_accessible(decoded_id, trans.user) - dataset = self.hda_manager.error_if_uploading(dataset) - if not dataset: - web.httpexceptions.HTTPNotFound() - if dataset and new_annotation: - # Sanitize annotation before adding it. - new_annotation = sanitize_html(new_annotation) - self.add_item_annotation(trans.sa_session, trans.get_user(), dataset, new_annotation) - with transaction(trans.sa_session): - trans.sa_session.commit() - return new_annotation - - @web.expose - def get_annotation_async(self, trans, id): - decoded_id = self.decode_id(id) - dataset = self.hda_manager.get_accessible(decoded_id, trans.user) - dataset = self.hda_manager.error_if_uploading(dataset) - if not dataset: - web.httpexceptions.HTTPNotFound() - annotation = self.get_item_annotation_str(trans.sa_session, trans.user, dataset) - if annotation and isinstance(annotation, str): - annotation = annotation.encode("ascii", "replace") # paste needs ascii here - return annotation - @web.expose def display_at(self, trans, dataset_id, filename=None, **kwd): """Sets up a dataset permissions so it is viewable at an external site""" @@ -723,177 +673,6 @@ def display_application( "You do not have permission to view this dataset at an external display application." ) - def _delete(self, trans, dataset_id): - message = None - status = "done" - id = None - try: - id = self.decode_id(dataset_id) - hda = self.hda_manager.get_owned(id, trans.user, current_history=trans.history) - self.hda_manager.error_unless_mutable(hda.history) - hda.mark_deleted() - hda.clear_associated_files() - trans.log_event(f"Dataset id {str(id)} marked as deleted") - self.hda_manager.stop_creating_job(hda, flush=True) - except Exception: - msg = f"HDA deletion failed (encoded: {dataset_id}, decoded: {id})" - log.exception(msg) - trans.log_event(msg) - message = "Dataset deletion failed" - status = "error" - return (message, status) - - def _undelete(self, trans, dataset_id): - message = None - status = "done" - id = None - try: - id = self.decode_id(dataset_id) - item = self.hda_manager.get_owned(id, trans.user, current_history=trans.history) - self.hda_manager.error_unless_mutable(item.history) - self.hda_manager.undelete(item) - trans.log_event(f"Dataset id {str(id)} has been undeleted") - except Exception: - msg = f"HDA undeletion failed (encoded: {dataset_id}, decoded: {id})" - log.exception(msg) - trans.log_event(msg) - message = "Dataset undeletion failed" - status = "error" - return (message, status) - - def _unhide(self, trans, dataset_id): - try: - id = self.decode_id(dataset_id) - item = self.hda_manager.get_owned(id, trans.user, current_history=trans.history) - self.hda_manager.error_unless_mutable(item.history) - item.mark_unhidden() - with transaction(trans.sa_session): - trans.sa_session.commit() - trans.log_event(f"Dataset id {str(id)} has been unhidden") - return True - except Exception: - return False - - def _purge(self, trans, dataset_id): - message = None - status = "done" - try: - id = self.decode_id(dataset_id) - user = trans.get_user() - hda = trans.sa_session.query(self.app.model.HistoryDatasetAssociation).get(id) - # Invalid HDA - assert hda, "Invalid history dataset ID" - - # If the user is anonymous, make sure the HDA is owned by the current session. - if not user: - current_history_id = trans.galaxy_session.current_history_id - assert hda.history.id == current_history_id, "Data does not belong to current user" - # If the user is known, make sure the HDA is owned by the current user. - else: - assert hda.history.user == user, "Data does not belong to current user" - - # Ensure HDA is deleted - hda.deleted = True - # HDA is purgeable - # Decrease disk usage first - hda.purge_usage_from_quota(user, hda.dataset.quota_source_info) - # Mark purged - hda.purged = True - trans.sa_session.add(hda) - trans.log_event(f"HDA id {hda.id} has been purged") - with transaction(trans.sa_session): - trans.sa_session.commit() - # Don't delete anything if there are active HDAs or any LDDAs, even if - # the LDDAs are deleted. Let the cleanup scripts get it in the latter - # case. - if hda.dataset.user_can_purge: - try: - hda.dataset.full_delete() - trans.log_event(f"Dataset id {hda.dataset.id} has been purged upon the purge of HDA id {hda.id}") - trans.sa_session.add(hda.dataset) - except Exception: - log.exception(f"Unable to purge dataset ({hda.dataset.id}) on purge of HDA ({hda.id}):") - with transaction(trans.sa_session): - trans.sa_session.commit() - except Exception: - msg = f"HDA purge failed (encoded: {dataset_id}, decoded: {id})" - log.exception(msg) - trans.log_event(msg) - message = "Dataset removal from disk failed" - status = "error" - return (message, status) - - @web.expose - def delete(self, trans, dataset_id, filename, show_deleted_on_refresh=False): - message, status = self._delete(trans, dataset_id) - return trans.response.send_redirect( - web.url_for( - controller="root", - action="history", - show_deleted=show_deleted_on_refresh, - message=message, - status=status, - ) - ) - - @web.expose - def delete_async(self, trans, dataset_id, filename): - message, status = self._delete(trans, dataset_id) - if status == "done": - return "OK" - else: - raise Exception(message) - - @web.expose - def undelete(self, trans, dataset_id, filename): - message, status = self._undelete(trans, dataset_id) - return trans.response.send_redirect( - web.url_for(controller="root", action="history", show_deleted=True, message=message, status=status) - ) - - @web.expose - def undelete_async(self, trans, dataset_id, filename): - message, status = self._undelete(trans, dataset_id) - if status == "done": - return "OK" - else: - raise Exception(message) - - @web.expose - def unhide(self, trans, dataset_id, filename): - if self._unhide(trans, dataset_id): - return trans.response.send_redirect(web.url_for(controller="root", action="history", show_hidden=True)) - raise Exception("Error unhiding") - - @web.expose - def purge(self, trans, dataset_id, filename, show_deleted_on_refresh=False): - if trans.app.config.allow_user_dataset_purge: - message, status = self._purge(trans, dataset_id) - else: - message = "Removal of datasets by users is not allowed in this Galaxy instance. Please contact your Galaxy administrator." - status = "error" - return trans.response.send_redirect( - web.url_for( - controller="root", - action="history", - show_deleted=show_deleted_on_refresh, - message=message, - status=status, - ) - ) - - @web.expose - def purge_async(self, trans, dataset_id, filename): - if trans.app.config.allow_user_dataset_purge: - message, status = self._purge(trans, dataset_id) - else: - message = "Removal of datasets by users is not allowed in this Galaxy instance. Please contact your Galaxy administrator." - status = "error" - if status == "done": - return "OK" - else: - raise Exception(message) - @web.expose def copy_datasets( self, @@ -1039,53 +818,3 @@ def copy_datasets( error_msg=error_msg, refresh_frames=refresh_frames, ) - - def _copy_datasets(self, trans, dataset_ids, target_histories, imported=False): - """Helper method for copying datasets.""" - user = trans.get_user() - done_msg = error_msg = "" - - invalid_datasets = 0 - if not dataset_ids or not target_histories: - error_msg = "You must provide both source datasets and target histories." - else: - # User must own target histories to copy datasets to them. - for history in target_histories: - if user != history.user: - error_msg = ( - error_msg - + "You do not have permission to add datasets to %i requested histories. " - % (len(target_histories)) - ) - for dataset_id in dataset_ids: - decoded_id = self.decode_id(dataset_id) - data = self.hda_manager.get_accessible(decoded_id, trans.user) - data = self.hda_manager.error_if_uploading(data) - - if data is None: - error_msg = f"{error_msg}You tried to copy a dataset that does not exist or that you do not have access to. " - invalid_datasets += 1 - else: - for hist in target_histories: - dataset_copy = data.copy() - if imported: - dataset_copy.name = f"imported: {dataset_copy.name}" - hist.add_dataset(dataset_copy) - with transaction(trans.sa_session): - trans.sa_session.commit() - num_datasets_copied = len(dataset_ids) - invalid_datasets - done_msg = "%i dataset%s copied to %i histor%s." % ( - num_datasets_copied, - iff(num_datasets_copied == 1, "", "s"), - len(target_histories), - iff(len(target_histories) == 1, "y", "ies"), - ) - trans.sa_session.refresh(history) - - if error_msg != "": - status = ERROR - message = error_msg - else: - status = SUCCESS - message = done_msg - return status, message diff --git a/templates/webapps/galaxy/dataset/display.mako b/templates/webapps/galaxy/dataset/display.mako index d389836b40f7..d77734acfba9 100644 --- a/templates/webapps/galaxy/dataset/display.mako +++ b/templates/webapps/galaxy/dataset/display.mako @@ -82,10 +82,6 @@ %if data.deleted:
You are viewing a deleted dataset. - %if data.history and data.history.user == trans.get_user(): -
- Undelete - %endif
%endif