Skip to content

Commit

Permalink
Refactor the create operation and move several helper methods to the …
Browse files Browse the repository at this point in the history
…service layer
  • Loading branch information
heisner-tillman committed May 14, 2024
1 parent 10c8d6d commit 93f7bee
Show file tree
Hide file tree
Showing 2 changed files with 334 additions and 289 deletions.
304 changes: 15 additions & 289 deletions lib/galaxy/webapps/galaxy/api/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
API operations for Workflows
"""

import json
import logging
import os
from io import BytesIO
from typing import (
Any,
Expand All @@ -21,7 +19,6 @@
Response,
status,
)
from markupsafe import escape
from pydantic import (
UUID1,
UUID4,
Expand All @@ -33,20 +30,14 @@
exceptions,
util,
)
from galaxy.files.uris import (
stream_url_to_str,
validate_uri_access,
)
from galaxy.managers.context import (
ProvidesHistoryContext,
ProvidesUserContext,
)
from galaxy.managers.workflows import (
RefactorRequest,
RefactorResponse,
WorkflowCreateOptions,
)
from galaxy.model.base import transaction
from galaxy.model.item_attrs import UsesAnnotations
from galaxy.schema.fields import DecodedDatabaseIdField
from galaxy.schema.invocation import (
Expand Down Expand Up @@ -78,6 +69,7 @@
SetWorkflowMenuPayload,
SetWorkflowMenuSummary,
StoredWorkflowDetailed,
WorkflowCreatePayload,
WorkflowDictEditorSummary,
WorkflowDictExportSummary,
WorkflowDictFormat2Summary,
Expand All @@ -87,14 +79,12 @@
WorkflowUpdatePayload,
)
from galaxy.structured_app import StructuredApp
from galaxy.tool_shed.galaxy_install.install_manager import InstallRepositoryManager
from galaxy.tools import recommendations
from galaxy.tools.parameters import populate_state
from galaxy.tools.parameters.workflow_utils import workflow_building_modes
from galaxy.web import expose_api
from galaxy.webapps.base.controller import (
SharableMixin,
url_for,
UsesStoredWorkflowMixin,
)
from galaxy.webapps.base.webapp import GalaxyWebTransaction
Expand All @@ -121,7 +111,6 @@
WorkflowIndexPayload,
WorkflowsService,
)
from galaxy.workflow.extract import extract_workflow
from galaxy.workflow.modules import module_factory

log = logging.getLogger(__name__)
Expand All @@ -146,142 +135,6 @@ def __init__(self, app: StructuredApp):
self.workflow_contents_manager = app.workflow_contents_manager
self.tool_recommendations = recommendations.ToolRecommendations()

@expose_api
def create(self, trans: GalaxyWebTransaction, payload=None, **kwd):
"""
POST /api/workflows
Create workflows in various ways.
:param from_history_id: Id of history to extract a workflow from.
:type from_history_id: str
:param job_ids: If from_history_id is set - optional list of jobs to include when extracting a workflow from history
:type job_ids: str
:param dataset_ids: If from_history_id is set - optional list of HDA "hid"s corresponding to workflow inputs when extracting a workflow from history
:type dataset_ids: str
:param dataset_collection_ids: If from_history_id is set - optional list of HDCA "hid"s corresponding to workflow inputs when extracting a workflow from history
:type dataset_collection_ids: str
:param workflow_name: If from_history_id is set - name of the workflow to create when extracting a workflow from history
:type workflow_name: str
"""
ways_to_create = {
"archive_file",
"archive_source",
"from_history_id",
"from_path",
"shared_workflow_id",
"workflow",
}

if trans.user_is_bootstrap_admin:
raise exceptions.RealUserRequiredException("Only real users can create or run workflows.")

if payload is None or len(ways_to_create.intersection(payload)) == 0:
message = f"One parameter among - {', '.join(ways_to_create)} - must be specified"
raise exceptions.RequestParameterMissingException(message)

if len(ways_to_create.intersection(payload)) > 1:
message = f"Only one parameter among - {', '.join(ways_to_create)} - must be specified"
raise exceptions.RequestParameterInvalidException(message)

if "archive_source" in payload or "archive_file" in payload:
archive_source = payload.get("archive_source")
archive_file = payload.get("archive_file")
archive_data = None
if archive_source:
validate_uri_access(archive_source, trans.user_is_admin, trans.app.config.fetch_url_allowlist_ips)
if archive_source.startswith("file://"):
workflow_src = {"src": "from_path", "path": archive_source[len("file://") :]}
payload["workflow"] = workflow_src
return self.__api_import_new_workflow(trans, payload, **kwd)
elif archive_source == "trs_tool":
server = None
trs_tool_id = None
trs_version_id = None
import_source = None
if "trs_url" in payload:
parts = self.app.trs_proxy.match_url(payload["trs_url"])
if parts:
server = self.app.trs_proxy.server_from_url(parts["trs_base_url"])
trs_tool_id = parts["tool_id"]
trs_version_id = parts["version_id"]
payload["trs_tool_id"] = trs_tool_id
payload["trs_version_id"] = trs_version_id
else:
raise exceptions.MessageException("Invalid TRS URL.")
else:
trs_server = payload.get("trs_server")
server = self.app.trs_proxy.get_server(trs_server)
trs_tool_id = payload.get("trs_tool_id")
trs_version_id = payload.get("trs_version_id")

archive_data = server.get_version_descriptor(trs_tool_id, trs_version_id)
else:
try:
archive_data = stream_url_to_str(
archive_source, trans.app.file_sources, prefix="gx_workflow_download"
)
import_source = "URL"
except Exception:
raise exceptions.MessageException(f"Failed to open URL '{escape(archive_source)}'.")
elif hasattr(archive_file, "file"):
uploaded_file = archive_file.file
uploaded_file_name = uploaded_file.name
if os.path.getsize(os.path.abspath(uploaded_file_name)) > 0:
archive_data = util.unicodify(uploaded_file.read())
import_source = "uploaded file"
else:
raise exceptions.MessageException("You attempted to upload an empty file.")
else:
raise exceptions.MessageException("Please provide a URL or file.")
return self.__api_import_from_archive(trans, archive_data, import_source, payload=payload)

if "from_history_id" in payload:
from_history_id = payload.get("from_history_id")
from_history_id = self.decode_id(from_history_id)
history = self.history_manager.get_accessible(from_history_id, trans.user, current_history=trans.history)

job_ids = [self.decode_id(_) for _ in payload.get("job_ids", [])]
dataset_ids = payload.get("dataset_ids", [])
dataset_collection_ids = payload.get("dataset_collection_ids", [])
workflow_name = payload["workflow_name"]
stored_workflow = extract_workflow(
trans=trans,
user=trans.user,
history=history,
job_ids=job_ids,
dataset_ids=dataset_ids,
dataset_collection_ids=dataset_collection_ids,
workflow_name=workflow_name,
)
item = stored_workflow.to_dict(value_mapper={"id": trans.security.encode_id})
item["url"] = url_for("workflow", id=item["id"])
return item

if "from_path" in payload:
from_path = payload.get("from_path")
object_id = payload.get("object_id")
workflow_src = {"src": "from_path", "path": from_path}
if object_id is not None:
workflow_src["object_id"] = object_id
payload["workflow"] = workflow_src
return self.__api_import_new_workflow(trans, payload, **kwd)

if "shared_workflow_id" in payload:
workflow_id = payload["shared_workflow_id"]
return self.__api_import_shared_workflow(trans, workflow_id, payload)

if "workflow" in payload:
return self.__api_import_new_workflow(trans, payload, **kwd)

# This was already raised above, but just in case...
raise exceptions.RequestParameterMissingException("No method for workflow creation supplied.")

@expose_api
def import_new_workflow_deprecated(self, trans: GalaxyWebTransaction, payload, **kwd):
"""
Expand All @@ -294,7 +147,7 @@ def import_new_workflow_deprecated(self, trans: GalaxyWebTransaction, payload, *
Deprecated in favor to POST /api/workflows with encoded 'workflow' in
payload the same way.
"""
return self.__api_import_new_workflow(trans, payload, **kwd)
return self.service._api_import_new_workflow(trans, payload, **kwd)

@expose_api
def build_module(self, trans: GalaxyWebTransaction, payload=None):
Expand Down Expand Up @@ -350,63 +203,6 @@ def get_tool_predictions(self, trans: ProvidesUserContext, payload, **kwd):
#
# -- Helper methods --
#
def __api_import_from_archive(self, trans: GalaxyWebTransaction, archive_data, source=None, payload=None):
payload = payload or {}
try:
data = json.loads(archive_data)
except Exception:
if "GalaxyWorkflow" in archive_data:
data = {"yaml_content": archive_data}
else:
raise exceptions.MessageException("The data content does not appear to be a valid workflow.")
if not data:
raise exceptions.MessageException("The data content is missing.")
raw_workflow_description = self.__normalize_workflow(trans, data)
workflow_create_options = WorkflowCreateOptions(**payload)
workflow, missing_tool_tups = self._workflow_from_dict(
trans, raw_workflow_description, workflow_create_options, source=source
)
workflow_id = workflow.id
workflow = workflow.latest_workflow

response = {
"message": f"Workflow '{escape(workflow.name)}' imported successfully.",
"status": "success",
"id": trans.security.encode_id(workflow_id),
}
if workflow.has_errors:
response["message"] = "Imported, but some steps in this workflow have validation errors."
response["status"] = "error"
elif len(workflow.steps) == 0:
response["message"] = "Imported, but this workflow has no steps."
response["status"] = "error"
elif workflow.has_cycles:
response["message"] = "Imported, but this workflow contains cycles."
response["status"] = "error"
return response

def __api_import_new_workflow(self, trans: GalaxyWebTransaction, payload, **kwd):
data = payload["workflow"]
raw_workflow_description = self.__normalize_workflow(trans, data)
workflow_create_options = WorkflowCreateOptions(**payload)
workflow, missing_tool_tups = self._workflow_from_dict(
trans,
raw_workflow_description,
workflow_create_options,
)
# galaxy workflow newly created id
workflow_id = workflow.id
# api encoded, id
encoded_id = trans.security.encode_id(workflow_id)
item = workflow.to_dict(value_mapper={"id": trans.security.encode_id})
item["annotations"] = [x.annotation for x in workflow.annotations]
item["url"] = url_for("workflow", id=encoded_id)
item["owner"] = workflow.user.username
item["number_of_steps"] = len(workflow.latest_workflow.steps)
return item

def __normalize_workflow(self, trans: GalaxyWebTransaction, as_dict):
return self.workflow_contents_manager.normalize_workflow_format(trans, as_dict)

@expose_api
def import_shared_workflow_deprecated(self, trans: GalaxyWebTransaction, payload, **kwd):
Expand All @@ -423,89 +219,7 @@ def import_shared_workflow_deprecated(self, trans: GalaxyWebTransaction, payload
workflow_id = payload.get("workflow_id", None)
if workflow_id is None:
raise exceptions.ObjectAttributeMissingException("Missing required parameter 'workflow_id'.")
self.__api_import_shared_workflow(trans, workflow_id, payload)

def __api_import_shared_workflow(self, trans: GalaxyWebTransaction, workflow_id, payload, **kwd):
try:
stored_workflow = self.get_stored_workflow(trans, workflow_id, check_ownership=False)
except Exception:
raise exceptions.ObjectNotFound("Malformed workflow id specified.")
if stored_workflow.importable is False:
raise exceptions.ItemAccessibilityException(
"The owner of this workflow has disabled imports via this link."
)
elif stored_workflow.deleted:
raise exceptions.ItemDeletionException("You can't import this workflow because it has been deleted.")
imported_workflow = self._import_shared_workflow(trans, stored_workflow)
item = imported_workflow.to_dict(value_mapper={"id": trans.security.encode_id})
encoded_id = trans.security.encode_id(imported_workflow.id)
item["url"] = url_for("workflow", id=encoded_id)
return item

def _workflow_from_dict(self, trans, data, workflow_create_options, source=None):
"""Creates a workflow from a dict.
Created workflow is stored in the database and returned.
"""
publish = workflow_create_options.publish
importable = workflow_create_options.is_importable
if publish and not importable:
raise exceptions.RequestParameterInvalidException("Published workflow must be importable.")

workflow_contents_manager = self.app.workflow_contents_manager
raw_workflow_description = workflow_contents_manager.ensure_raw_description(data)
created_workflow = workflow_contents_manager.build_workflow_from_raw_description(
trans,
raw_workflow_description,
workflow_create_options,
source=source,
)
if importable:
self._make_item_accessible(trans.sa_session, created_workflow.stored_workflow)
with transaction(trans.sa_session):
trans.sa_session.commit()

self._import_tools_if_needed(trans, workflow_create_options, raw_workflow_description)
return created_workflow.stored_workflow, created_workflow.missing_tools

def _import_tools_if_needed(self, trans, workflow_create_options, raw_workflow_description):
if not workflow_create_options.import_tools:
return

if not trans.user_is_admin:
raise exceptions.AdminRequiredException()

data = raw_workflow_description.as_dict

tools = {}
for key in data["steps"]:
item = data["steps"][key]
if item is not None:
if "tool_shed_repository" in item:
tool_shed_repository = item["tool_shed_repository"]
if (
"owner" in tool_shed_repository
and "changeset_revision" in tool_shed_repository
and "name" in tool_shed_repository
and "tool_shed" in tool_shed_repository
):
toolstr = (
tool_shed_repository["owner"]
+ tool_shed_repository["changeset_revision"]
+ tool_shed_repository["name"]
+ tool_shed_repository["tool_shed"]
)
tools[toolstr] = tool_shed_repository

irm = InstallRepositoryManager(self.app)
install_options = workflow_create_options.install_options
for k in tools:
item = tools[k]
tool_shed_url = f"https://{item['tool_shed']}/"
name = item["name"]
owner = item["owner"]
changeset_revision = item["changeset_revision"]
irm.install(tool_shed_url, name, owner, changeset_revision, install_options)
self.service._api_import_shared_workflow(trans, workflow_id, payload)

def __get_stored_workflow(self, trans, workflow_id, **kwd):
instance = util.string_as_bool(kwd.get("instance", "false"))
Expand Down Expand Up @@ -906,6 +620,18 @@ def delete_workflow(
self.service.delete(trans, workflow_id)
return Response(status_code=status.HTTP_204_NO_CONTENT)

@router.post(
"/api/workflows",
summary="Create workflows in various ways",
name="create_workflow",
)
def create(
self,
payload: WorkflowCreatePayload,
trans: ProvidesUserContext = DependsOnTrans,
):
return self.service.create(trans, payload)

@router.post(
"/api/workflows/{workflow_id}/undelete",
summary="Remove the deleted flag from a workflow.",
Expand Down
Loading

0 comments on commit 93f7bee

Please sign in to comment.