Skip to content

Commit

Permalink
Add method for update operation from Workflows API
Browse files Browse the repository at this point in the history
  • Loading branch information
heisner-tillman committed Apr 12, 2024
1 parent 1820ec3 commit 077822e
Showing 1 changed file with 103 additions and 0 deletions.
103 changes: 103 additions & 0 deletions lib/galaxy/webapps/galaxy/services/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@
from galaxy.managers.histories import HistoryManager
from galaxy.managers.notification import NotificationManager
from galaxy.managers.workflows import (
MissingToolsException,
RefactorResponse,
WorkflowContentsManager,
WorkflowSerializer,
WorkflowsManager,
WorkflowUpdateOptions,
)
from galaxy.model import StoredWorkflow
from galaxy.model.base import transaction
from galaxy.model.item_attrs import UsesAnnotations
from galaxy.schema.invocation import WorkflowInvocationResponse
from galaxy.schema.schema import (
InvocationsStateCounts,
Expand All @@ -47,7 +50,9 @@
WorkflowDictFormat2WrappedYamlSummary,
WorkflowDictPreviewSummary,
WorkflowDictRunSummary,
WorkflowUpdatePayload,
)
from galaxy.util.sanitize_html import sanitize_html
from galaxy.util.tool_shed.tool_shed_registry import Registry
from galaxy.webapps.galaxy.services.base import ServiceBase
from galaxy.webapps.galaxy.services.sharable import ShareableService
Expand All @@ -70,13 +75,15 @@ def __init__(
tool_shed_registry: Registry,
notification_manager: NotificationManager,
history_manager: HistoryManager,
uses_annotations: UsesAnnotations,
):
self._workflows_manager = workflows_manager
self._workflow_contents_manager = workflow_contents_manager
self._serializer = serializer
self.shareable_service = ShareableService(workflows_manager, serializer, notification_manager)
self._tool_shed_registry = tool_shed_registry
self._history_manager = history_manager
self._uses_annotations = uses_annotations

def download_workflow(self, trans, workflow_id, history_id, style, format, version, instance):
stored_workflow = self._workflows_manager.get_stored_accessible_workflow(
Expand Down Expand Up @@ -350,3 +357,99 @@ def __get_full_shed_url(self, url):
if url in shed_url:
return shed_url
return None

def update_workflow(
self,
trans: ProvidesHistoryContext,
payload: WorkflowUpdatePayload,
instance: Optional[bool],
workflow_id: int,
):
stored_workflow = self._workflows_manager.get_stored_workflow(trans, workflow_id, by_stored_id=not instance)
workflow_dict = payload.model_dump(exclude_unset=True)
if workflow_dict:
require_flush = False
raw_workflow_description = self._workflow_contents_manager.normalize_workflow_format(trans, workflow_dict)
workflow_dict = raw_workflow_description.as_dict
new_workflow_name = workflow_dict.get("name")
old_workflow = stored_workflow.latest_workflow
name_updated = new_workflow_name and new_workflow_name != stored_workflow.name
steps_updated = "steps" in workflow_dict
if name_updated and not steps_updated:
sanitized_name = sanitize_html(new_workflow_name or old_workflow.name)
if not sanitized_name:
raise exceptions.MessageException("Workflow must have a valid name.")
workflow = old_workflow.copy(user=trans.user)
workflow.stored_workflow = stored_workflow
workflow.name = sanitized_name
stored_workflow.name = sanitized_name
stored_workflow.latest_workflow = workflow
trans.sa_session.add(workflow, stored_workflow)
require_flush = True

if "hidden" in workflow_dict and stored_workflow.hidden != workflow_dict["hidden"]:
stored_workflow.hidden = workflow_dict["hidden"]
require_flush = True

if "published" in workflow_dict and stored_workflow.published != workflow_dict["published"]:
stored_workflow.published = workflow_dict["published"]
require_flush = True

if "importable" in workflow_dict and stored_workflow.importable != workflow_dict["importable"]:
stored_workflow.importable = workflow_dict["importable"]
require_flush = True

if "annotation" in workflow_dict and not steps_updated:
newAnnotation = sanitize_html(workflow_dict["annotation"])
self._uses_annotations.add_item_annotation(trans.sa_session, trans.user, stored_workflow, newAnnotation)
require_flush = True

if "menu_entry" in workflow_dict or "show_in_tool_panel" in workflow_dict:
show_in_panel = workflow_dict.get("menu_entry") or workflow_dict.get("show_in_tool_panel")
stored_workflow_menu_entries = trans.user.stored_workflow_menu_entries
if show_in_panel:
workflow_ids = [wf.stored_workflow_id for wf in stored_workflow_menu_entries]
if workflow_id not in workflow_ids:
menu_entry = model.StoredWorkflowMenuEntry()
menu_entry.stored_workflow = stored_workflow
stored_workflow_menu_entries.append(menu_entry)
trans.sa_session.add(menu_entry)
require_flush = True
else:
# remove if in list
entries = {x.stored_workflow_id: x for x in stored_workflow_menu_entries}
if workflow_id in entries:
stored_workflow_menu_entries.remove(entries[workflow_id])
require_flush = True
# set tags
if "tags" in workflow_dict:
trans.tag_handler.set_tags_from_list(
user=trans.user,
item=stored_workflow,
new_tags_list=workflow_dict["tags"],
)

if require_flush:
with transaction(trans.sa_session):
trans.sa_session.commit()

if "steps" in workflow_dict or "comments" in workflow_dict:
try:
workflow_update_options = WorkflowUpdateOptions(**payload.model_dump(exclude_unset=True))
workflow, errors = self._workflow_contents_manager.update_workflow_from_raw_description(
trans,
stored_workflow,
raw_workflow_description,
workflow_update_options,
)
except MissingToolsException:
raise exceptions.MessageException(
"This workflow contains missing tools. It cannot be saved until they have been removed from the workflow or installed."
)

else:
message = "Updating workflow requires dictionary containing 'workflow' attribute with new JSON description."
raise exceptions.RequestParameterInvalidException(message)
return StoredWorkflowDetailed(
**self._workflow_contents_manager.workflow_to_dict(trans, stored_workflow, style="instance")
)

0 comments on commit 077822e

Please sign in to comment.