Skip to content

Commit

Permalink
Allow workflows to consume deferred {src: url} dicts.
Browse files Browse the repository at this point in the history
  • Loading branch information
jmchilton committed Sep 10, 2024
1 parent 3dad4c7 commit 6b7ec05
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 16 deletions.
15 changes: 15 additions & 0 deletions lib/galaxy/managers/hdas.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,15 @@
taggable,
users,
)
from galaxy.managers.context import ProvidesHistoryContext
from galaxy.model import (
Job,
JobStateHistory,
JobToOutputDatasetAssociation,
)
from galaxy.model.base import transaction
from galaxy.model.deferred import materializer_factory
from galaxy.model.dereference import dereference_to_model
from galaxy.schema.schema import DatasetSourceType
from galaxy.schema.storage_cleaner import (
CleanableItemsSummary,
Expand All @@ -68,6 +70,7 @@
MinimalManagerApp,
StructuredApp,
)
from galaxy.tool_util.parameters import DataRequestUri
from galaxy.util.compression_utils import get_fileobj

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -343,6 +346,18 @@ def _set_permissions(self, trans, hda, role_ids_dict):
raise exceptions.RequestParameterInvalidException(error)


def dereference_input(
    trans: ProvidesHistoryContext, data_request: DataRequestUri, history: Optional[model.History] = None
) -> model.HistoryDatasetAssociation:
    """Materialize a deferred HDA for a {src: url} data request.

    The dataset is created in ``history`` when supplied, otherwise in the
    transaction's current history. Default history permissions are applied
    to the new dataset and the session is committed before returning.
    """
    destination_history = trans.history if history is None else history
    new_hda = dereference_to_model(trans.sa_session, trans.user, destination_history, data_request)
    security_agent = trans.app.security_agent
    # Mirror the history's default permissions onto the freshly created dataset;
    # flushing is deferred to the explicit commit below.
    default_permissions = security_agent.history_get_default_permissions(destination_history)
    security_agent.set_all_dataset_permissions(new_hda.dataset, default_permissions, new=True, flush=False)
    with transaction(trans.sa_session):
        trans.sa_session.commit()
    return new_hda


class HDAStorageCleanerManager(base.StorageCleanerManager):
def __init__(self, hda_manager: HDAManager, dataset_manager: datasets.DatasetManager):
self.hda_manager = hda_manager
Expand Down
12 changes: 5 additions & 7 deletions lib/galaxy/managers/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@
ProvidesUserContext,
)
from galaxy.managers.datasets import DatasetManager
from galaxy.managers.hdas import HDAManager
from galaxy.managers.hdas import (
dereference_input,
HDAManager,
)
from galaxy.managers.histories import HistoryManager
from galaxy.managers.lddas import LDDAManager
from galaxy.managers.users import UserManager
Expand All @@ -67,7 +70,6 @@
YIELD_PER_ROWS,
)
from galaxy.model.base import transaction
from galaxy.model.dereference import dereference_to_model
from galaxy.model.index_filter_util import (
raw_text_column_filter,
text_column_filter,
Expand Down Expand Up @@ -1219,12 +1221,8 @@ def dereference(

def dereference_callback(data_request: DataRequestUri) -> DataRequestInternalHda:
# a deferred dataset corresponding to request
hda = dereference_to_model(trans.sa_session, trans.user, trans.history, data_request)
hda = dereference_input(trans, data_request)
new_hdas.append(DereferencedDatasetPair(hda, data_request))
permissions = trans.app.security_agent.history_get_default_permissions(trans.history)
trans.app.security_agent.set_all_dataset_permissions(hda.dataset, permissions, new=True, flush=False)
with transaction(trans.sa_session):
trans.sa_session.commit()
return DataRequestInternalHda(id=hda.id)

tool_state = RequestInternalToolState(tool_request.request)
Expand Down
21 changes: 17 additions & 4 deletions lib/galaxy/workflow/run_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
)

from galaxy import exceptions
from galaxy.managers.hdas import dereference_input
from galaxy.model import (
EffectiveOutput,
History,
Expand All @@ -25,6 +26,7 @@
ensure_object_added_to_session,
transaction,
)
from galaxy.tool_util.parameters import DataRequestUri
from galaxy.tools.parameters.meta import expand_workflow_inputs
from galaxy.workflow.resources import get_resource_mapper_function

Expand Down Expand Up @@ -368,23 +370,29 @@ def build_workflow_run_configs(
raise exceptions.RequestParameterInvalidException(
f"Not input source type defined for input '{input_dict}'."
)
if "id" not in input_dict:
raise exceptions.RequestParameterInvalidException(f"Not input id defined for input '{input_dict}'.")
input_source = input_dict["src"]
if "id" not in input_dict and input_source != "url":
raise exceptions.RequestParameterInvalidException(f"No input id defined for input '{input_dict}'.")
elif input_source == "url" and not input_dict.get("url"):
raise exceptions.RequestParameterInvalidException(
f"Supplied 'url' is empty or absent for input '{input_dict}'."
)
if "content" in input_dict:
raise exceptions.RequestParameterInvalidException(
f"Input cannot specify explicit 'content' attribute {input_dict}'."
)
input_source = input_dict["src"]
input_id = input_dict["id"]
input_id = input_dict.get("id")
try:
if input_source == "ldda":
assert input_id
ldda = trans.sa_session.get(LibraryDatasetDatasetAssociation, trans.security.decode_id(input_id))
assert ldda
assert trans.user_is_admin or trans.app.security_agent.can_access_dataset(
trans.get_current_user_roles(), ldda.dataset
)
content = ldda.to_history_dataset_association(history, add_to_history=add_to_history)
elif input_source == "ld":
assert input_id
library_dataset = trans.sa_session.get(LibraryDataset, trans.security.decode_id(input_id))
assert library_dataset
ldda = library_dataset.library_dataset_dataset_association
Expand All @@ -394,13 +402,18 @@ def build_workflow_run_configs(
)
content = ldda.to_history_dataset_association(history, add_to_history=add_to_history)
elif input_source == "hda":
assert input_id
# Get dataset handle, add to dict and history if necessary
content = trans.sa_session.get(HistoryDatasetAssociation, trans.security.decode_id(input_id))
assert trans.user_is_admin or trans.app.security_agent.can_access_dataset(
trans.get_current_user_roles(), content.dataset
)
elif input_source == "hdca":
content = app.dataset_collection_manager.get_dataset_collection_instance(trans, "history", input_id)
elif input_source == "url":
data_request = DataRequestUri.model_validate(input_dict)
hda: HistoryDatasetAssociation = dereference_input(trans, data_request, history)
content = hda
else:
raise exceptions.RequestParameterInvalidException(
f"Unknown workflow input source '{input_source}' specified."
Expand Down
13 changes: 11 additions & 2 deletions lib/galaxy_test/api/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -1496,11 +1496,20 @@ def test_run_workflow_by_name(self):
def test_run_workflow(self):
self.__run_cat_workflow(inputs_by="step_id")

def __run_cat_workflow(self, inputs_by):
@skip_without_tool("cat1")
def test_run_workflow_by_url(self):
    """Run the cat workflow with {src: url} inputs and verify deferral.

    Inputs supplied as deferred URL dicts should materialize as datasets
    left in the "deferred" state rather than being fetched eagerly.
    """
    with self.dataset_populator.test_history() as history_id:
        self.__run_cat_workflow(inputs_by="url", history_id=history_id)
        details = self.dataset_populator.get_history_dataset_details(history_id, hid=1)
        assert details["state"] == "deferred"

def __run_cat_workflow(self, inputs_by, history_id: Optional[str] = None):
workflow = self.workflow_populator.load_workflow(name="test_for_run")
workflow["steps"]["0"]["uuid"] = str(uuid4())
workflow["steps"]["1"]["uuid"] = str(uuid4())
workflow_request, _, workflow_id = self._setup_workflow_run(workflow, inputs_by=inputs_by)
workflow_request, _, workflow_id = self._setup_workflow_run(
workflow, inputs_by=inputs_by, history_id=history_id
)
invocation_id = self.workflow_populator.invoke_workflow_and_wait(workflow_id, request=workflow_request).json()[
"id"
]
Expand Down
19 changes: 16 additions & 3 deletions lib/galaxy_test/base/populators.py
Original file line number Diff line number Diff line change
Expand Up @@ -2182,12 +2182,16 @@ def setup_workflow_run(
workflow_id = self.create_workflow(workflow)
if not history_id:
history_id = self.dataset_populator.new_history()
hda1 = self.dataset_populator.new_dataset(history_id, content="1 2 3", wait=True)
hda2 = self.dataset_populator.new_dataset(history_id, content="4 5 6", wait=True)
hda1: Optional[Dict[str, Any]] = None
hda2: Optional[Dict[str, Any]] = None
label_map: Optional[Dict[str, Any]] = None
if inputs_by != "url":
hda1 = self.dataset_populator.new_dataset(history_id, content="1 2 3", wait=True)
hda2 = self.dataset_populator.new_dataset(history_id, content="4 5 6", wait=True)
label_map = {"WorkflowInput1": ds_entry(hda1), "WorkflowInput2": ds_entry(hda2)}
workflow_request = dict(
history=f"hist_id={history_id}",
)
label_map = {"WorkflowInput1": ds_entry(hda1), "WorkflowInput2": ds_entry(hda2)}
if inputs_by == "step_id":
ds_map = self.build_ds_map(workflow_id, label_map)
workflow_request["ds_map"] = ds_map
Expand All @@ -2207,6 +2211,15 @@ def setup_workflow_run(
workflow_request["inputs"] = json.dumps(uuid_map)
if inputs_by == "step_uuid":
workflow_request["inputs_by"] = "step_uuid"
elif inputs_by == "url":
input_b64_1 = base64.b64encode("1 2 3".encode("utf-8")).decode("utf-8")
input_b64_2 = base64.b64encode("4 5 6".encode("utf-8")).decode("utf-8")
inputs = {
"WorkflowInput1": {"src": "url", "url": f"base64://{input_b64_1}", "ext": "txt"},
"WorkflowInput2": {"src": "url", "url": f"base64://{input_b64_2}", "ext": "txt"},
}
workflow_request["inputs"] = json.dumps(inputs)
workflow_request["inputs_by"] = "name"

return workflow_request, history_id, workflow_id

Expand Down

0 comments on commit 6b7ec05

Please sign in to comment.