Skip to content

Commit

Permalink
Allow workflows to consume deferred {src: url} dicts.
Browse files Browse the repository at this point in the history
  • Loading branch information
jmchilton committed Sep 23, 2024
1 parent fa0f3c9 commit 6214878
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 18 deletions.
9 changes: 8 additions & 1 deletion client/src/api/schema/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11856,7 +11856,14 @@ export interface components {
* InvocationState
* @enum {string}
*/
InvocationState: "new" | "ready" | "scheduled" | "cancelled" | "cancelling" | "failed";
InvocationState:
| "new"
| "requires_materialization"
| "ready"
| "scheduled"
| "cancelled"
| "cancelling"
| "failed";
/**
* InvocationStep
* @description Information about workflow invocation step
Expand Down
15 changes: 15 additions & 0 deletions lib/galaxy/config/schemas/config_schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3738,6 +3738,21 @@ mapping:
Optional configuration file similar to `job_config_file` to specify
which Galaxy processes should schedule workflows.
workflow_scheduling_separate_materialization_iteration:
type: bool
default: false
required: false
desc: |
Workflows launched with URI/URL inputs that are not marked as 'deferred'
are "materialized" (or undeferred) by the workflow scheduler. This might be
a lengthy process. Setting this to 'True' will place the invocation back in
the queue after materialization before scheduling the workflow so it is less
likely to starve other workflow scheduling. Ideally, Galaxy would allow more
fine-grained control of handlers but until then, this provides a way to tip the
balance between "doing more work" and "being more fair". The default here is
pretty arbitrary - it has been set to False to optimize Galaxy for automated,
single user applications where "fairness" is mostly irrelevant.
cache_user_job_count:
type: bool
default: false
Expand Down
8 changes: 7 additions & 1 deletion lib/galaxy/jobs/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,13 @@ def setup_query(self):
if self.grab_model is model.Job:
grab_condition = self.grab_model.state == self.grab_model.states.NEW
elif self.grab_model is model.WorkflowInvocation:
grab_condition = self.grab_model.state.in_((self.grab_model.states.NEW, self.grab_model.states.CANCELLING))
grab_condition = self.grab_model.state.in_(
(
self.grab_model.states.NEW,
self.grab_model.states.REQUIRES_MATERIALIZATION,
self.grab_model.states.CANCELLING,
)
)
else:
raise NotImplementedError(f"Grabbing {self.grab_model.__name__} not implemented")
subq = (
Expand Down
28 changes: 27 additions & 1 deletion lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import string
from collections import defaultdict
from collections.abc import Callable
from dataclasses import dataclass
from datetime import (
datetime,
timedelta,
Expand Down Expand Up @@ -8549,6 +8550,12 @@ class StoredWorkflowMenuEntry(Base, RepresentById):
)


@dataclass
class InputWithRequest:
    """Pair a workflow input with the request dictionary it was created from.

    The workflow run-request code wraps an input in this class when it needs
    to carry the originating request alongside the input itself. The code that
    attaches inputs to an invocation unwraps it and copies ``request`` onto
    the resulting request-to-input association.
    """

    # The input object itself; for ``src: url`` inputs this is the HDA
    # produced by dereferencing the URL request.
    input: Any
    # The originating request as a plain dict (built from a JSON-mode model
    # dump for URL inputs). Presumably always JSON-serializable, since it is
    # stored in a JSON column - confirm for other callers.
    request: Dict[str, Any]


class WorkflowInvocation(Base, UsesCreateAndUpdateTime, Dictifiable, Serializable):
__tablename__ = "workflow_invocation"

Expand Down Expand Up @@ -8780,6 +8787,7 @@ def poll_active_workflow_ids(engine, scheduler=None, handler=None):
and_conditions = [
or_(
WorkflowInvocation.state == WorkflowInvocation.states.NEW,
WorkflowInvocation.state == WorkflowInvocation.states.REQUIRES_MATERIALIZATION,
WorkflowInvocation.state == WorkflowInvocation.states.READY,
WorkflowInvocation.state == WorkflowInvocation.states.CANCELLING,
),
Expand Down Expand Up @@ -8871,6 +8879,14 @@ def input_associations(self):
inputs.append(input_dataset_collection_assoc)
return inputs

def inputs_requiring_materialization(self):
    """Return the input datasets that must be materialized before scheduling.

    An input dataset association qualifies when it carries a non-empty
    ``request`` whose ``deferred`` entry is missing or falsy. Associations
    without a request are never selected. The datasets are returned in the
    same order as ``self.input_datasets``.
    """

    def _needs_materialization(assoc):
        # Read ``request`` once; a missing or empty request means there is
        # nothing to undefer for this input.
        req = assoc.request
        return bool(req) and not req.get("deferred", False)

    return [assoc.dataset for assoc in self.input_datasets if _needs_materialization(assoc)]

def _serialize(self, id_encoder, serialization_options):
invocation_attrs = dict_for(self)
invocation_attrs["state"] = self.state
Expand Down Expand Up @@ -9033,20 +9049,28 @@ def attach_step(request_to_content):
else:
request_to_content.workflow_step = step

request: Optional[Dict[str, Any]] = None
if isinstance(content, InputWithRequest):
request = content.request
content = content.input

history_content_type = getattr(content, "history_content_type", None)
if history_content_type == "dataset":
request_to_content = WorkflowRequestToInputDatasetAssociation()
request_to_content.dataset = content
request_to_content.request = request
attach_step(request_to_content)
self.input_datasets.append(request_to_content)
elif history_content_type == "dataset_collection":
request_to_content = WorkflowRequestToInputDatasetCollectionAssociation()
request_to_content.dataset_collection = content
request_to_content.request = request
attach_step(request_to_content)
self.input_dataset_collections.append(request_to_content)
else:
request_to_content = WorkflowRequestInputStepParameter()
request_to_content.parameter_value = content
request_to_content.request = request
attach_step(request_to_content)
self.input_step_parameters.append(request_to_content)

Expand Down Expand Up @@ -9470,6 +9494,7 @@ class WorkflowRequestToInputDatasetAssociation(Base, Dictifiable, Serializable):
workflow_invocation_id: Mapped[Optional[int]] = mapped_column(ForeignKey("workflow_invocation.id"), index=True)
workflow_step_id: Mapped[Optional[int]] = mapped_column(ForeignKey("workflow_step.id"))
dataset_id: Mapped[Optional[int]] = mapped_column(ForeignKey("history_dataset_association.id"), index=True)
request: Mapped[Optional[Dict]] = mapped_column(JSONType)

workflow_step: Mapped[Optional["WorkflowStep"]] = relationship()
dataset: Mapped[Optional["HistoryDatasetAssociation"]] = relationship()
Expand Down Expand Up @@ -9505,6 +9530,7 @@ class WorkflowRequestToInputDatasetCollectionAssociation(Base, Dictifiable, Seri
workflow_invocation: Mapped[Optional["WorkflowInvocation"]] = relationship(
back_populates="input_dataset_collections"
)
request: Mapped[Optional[Dict]] = mapped_column(JSONType)

history_content_type = "dataset_collection"
dict_collection_visible_keys = ["id", "workflow_invocation_id", "workflow_step_id", "dataset_collection_id", "name"]
Expand All @@ -9528,6 +9554,7 @@ class WorkflowRequestInputStepParameter(Base, Dictifiable, Serializable):
workflow_invocation_id: Mapped[Optional[int]] = mapped_column(ForeignKey("workflow_invocation.id"), index=True)
workflow_step_id: Mapped[Optional[int]] = mapped_column(ForeignKey("workflow_step.id"))
parameter_value: Mapped[Optional[bytes]] = mapped_column(MutableJSONType)
request: Mapped[Optional[Dict]] = mapped_column(JSONType)

workflow_step: Mapped[Optional["WorkflowStep"]] = relationship()
workflow_invocation: Mapped[Optional["WorkflowInvocation"]] = relationship(back_populates="input_step_parameters")
Expand All @@ -9551,7 +9578,6 @@ class WorkflowInvocationOutputDatasetAssociation(Base, Dictifiable, Serializable
workflow_step_id: Mapped[Optional[int]] = mapped_column(ForeignKey("workflow_step.id"), index=True)
dataset_id: Mapped[Optional[int]] = mapped_column(ForeignKey("history_dataset_association.id"), index=True)
workflow_output_id: Mapped[Optional[int]] = mapped_column(ForeignKey("workflow_output.id"), index=True)

workflow_invocation: Mapped[Optional["WorkflowInvocation"]] = relationship(back_populates="output_datasets")
workflow_step: Mapped[Optional["WorkflowStep"]] = relationship()
dataset: Mapped[Optional["HistoryDatasetAssociation"]] = relationship()
Expand Down
1 change: 1 addition & 0 deletions lib/galaxy/schema/invocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ class InvocationMessageResponseModel(RootModel):

class InvocationState(str, Enum):
NEW = "new" # Brand new workflow invocation... maybe this should be same as READY
REQUIRES_MATERIALIZATION = "requires_materialization" # an otherwise NEW or READY workflow that requires inputs to be materialized (undeferred)
READY = "ready" # Workflow ready for another iteration of scheduling.
SCHEDULED = "scheduled" # Workflow has been scheduled.
CANCELLED = "cancelled"
Expand Down
7 changes: 6 additions & 1 deletion lib/galaxy/workflow/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,12 @@ def queue_invoke(
)
workflow_invocation = workflow_run_config_to_request(trans, workflow_run_config, workflow)
workflow_invocation.workflow = workflow
return trans.app.workflow_scheduling_manager.queue(workflow_invocation, request_params, flush=flush)
initial_state = model.WorkflowInvocation.states.NEW
if workflow_run_config.requires_materialization:
initial_state = model.WorkflowInvocation.states.REQUIRES_MATERIALIZATION
return trans.app.workflow_scheduling_manager.queue(
workflow_invocation, request_params, flush=flush, initial_state=initial_state
)


class WorkflowInvoker:
Expand Down
39 changes: 33 additions & 6 deletions lib/galaxy/workflow/run_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
)

from galaxy import exceptions
from galaxy.managers.hdas import dereference_input
from galaxy.model import (
EffectiveOutput,
History,
HistoryDatasetAssociation,
HistoryDatasetCollectionAssociation,
InputWithRequest,
LibraryDataset,
LibraryDatasetDatasetAssociation,
WorkflowInvocation,
Expand All @@ -25,6 +27,7 @@
ensure_object_added_to_session,
transaction,
)
from galaxy.tool_util.parameters import DataRequestUri
from galaxy.tools.parameters.meta import expand_workflow_inputs
from galaxy.workflow.resources import get_resource_mapper_function

Expand Down Expand Up @@ -57,6 +60,9 @@ class WorkflowRunConfig:
:param inputs: Map from step ids to dict's containing HDA for these steps.
:type inputs: dict
:param requires_materialization: True if an input requires materialization before
the workflow is scheduled.
:param inputs_by: How inputs maps to inputs (datasets/collections) to workflows
steps - by unencoded database id ('step_id'), index in workflow
'step_index' (independent of database), or by input name for
Expand All @@ -78,6 +84,7 @@ def __init__(
copy_inputs_to_history: bool = False,
use_cached_job: bool = False,
resource_params: Optional[Dict[int, Any]] = None,
requires_materialization: bool = False,
preferred_object_store_id: Optional[str] = None,
preferred_outputs_object_store_id: Optional[str] = None,
preferred_intermediate_object_store_id: Optional[str] = None,
Expand All @@ -91,6 +98,7 @@ def __init__(
self.resource_params = resource_params or {}
self.allow_tool_state_corrections = allow_tool_state_corrections
self.use_cached_job = use_cached_job
self.requires_materialization = requires_materialization
self.preferred_object_store_id = preferred_object_store_id
self.preferred_outputs_object_store_id = preferred_outputs_object_store_id
self.preferred_intermediate_object_store_id = preferred_intermediate_object_store_id
Expand Down Expand Up @@ -310,7 +318,7 @@ def build_workflow_run_configs(
legacy = payload.get("legacy", False)
already_normalized = payload.get("parameters_normalized", False)
raw_parameters = payload.get("parameters", {})

requires_materialization: bool = False
run_configs = []
unexpanded_param_map = _normalize_step_parameters(
workflow.steps, raw_parameters, legacy=legacy, already_normalized=already_normalized
Expand Down Expand Up @@ -368,23 +376,30 @@ def build_workflow_run_configs(
raise exceptions.RequestParameterInvalidException(
f"Not input source type defined for input '{input_dict}'."
)
if "id" not in input_dict:
raise exceptions.RequestParameterInvalidException(f"Not input id defined for input '{input_dict}'.")
input_source = input_dict["src"]
if "id" not in input_dict and input_source != "url":
raise exceptions.RequestParameterInvalidException(f"No input id defined for input '{input_dict}'.")
elif input_source == "url" and not input_dict.get("url"):
raise exceptions.RequestParameterInvalidException(
f"Supplied 'url' is empty or absent for input '{input_dict}'."
)
if "content" in input_dict:
raise exceptions.RequestParameterInvalidException(
f"Input cannot specify explicit 'content' attribute {input_dict}'."
)
input_source = input_dict["src"]
input_id = input_dict["id"]
input_id = input_dict.get("id")
try:
added_to_history = False
if input_source == "ldda":
assert input_id
ldda = trans.sa_session.get(LibraryDatasetDatasetAssociation, trans.security.decode_id(input_id))
assert ldda
assert trans.user_is_admin or trans.app.security_agent.can_access_dataset(
trans.get_current_user_roles(), ldda.dataset
)
content = ldda.to_history_dataset_association(history, add_to_history=add_to_history)
elif input_source == "ld":
assert input_id
library_dataset = trans.sa_session.get(LibraryDataset, trans.security.decode_id(input_id))
assert library_dataset
ldda = library_dataset.library_dataset_dataset_association
Expand All @@ -394,18 +409,29 @@ def build_workflow_run_configs(
)
content = ldda.to_history_dataset_association(history, add_to_history=add_to_history)
elif input_source == "hda":
assert input_id
# Get dataset handle, add to dict and history if necessary
content = trans.sa_session.get(HistoryDatasetAssociation, trans.security.decode_id(input_id))
assert trans.user_is_admin or trans.app.security_agent.can_access_dataset(
trans.get_current_user_roles(), content.dataset
)
elif input_source == "hdca":
content = app.dataset_collection_manager.get_dataset_collection_instance(trans, "history", input_id)
elif input_source == "url":
data_request = DataRequestUri.model_validate(input_dict)
hda: HistoryDatasetAssociation = dereference_input(trans, data_request, history)
added_to_history = True
content = InputWithRequest(
input=hda,
request=data_request.model_dump(mode="json"),
)
if not data_request.deferred:
requires_materialization = True
else:
raise exceptions.RequestParameterInvalidException(
f"Unknown workflow input source '{input_source}' specified."
)
if add_to_history and content.history != history:
if not added_to_history and add_to_history and content.history != history:
if isinstance(content, HistoryDatasetCollectionAssociation):
content = content.copy(element_destination=history, flush=False)
else:
Expand Down Expand Up @@ -474,6 +500,7 @@ def build_workflow_run_configs(
allow_tool_state_corrections=allow_tool_state_corrections,
use_cached_job=use_cached_job,
resource_params=resource_params,
requires_materialization=requires_materialization,
preferred_object_store_id=preferred_object_store_id,
preferred_outputs_object_store_id=preferred_outputs_object_store_id,
preferred_intermediate_object_store_id=preferred_intermediate_object_store_id,
Expand Down
29 changes: 26 additions & 3 deletions lib/galaxy/workflow/scheduling_manager.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import os
from functools import partial
from typing import Optional

import galaxy.workflow.schedulers
from galaxy import model
from galaxy.exceptions import HandlerAssignmentError
from galaxy.jobs.handler import InvocationGrabber
from galaxy.model.base import transaction
from galaxy.schema.invocation import InvocationState
from galaxy.schema.tasks import (
MaterializeDatasetInstanceTaskRequest,
RequestUser,
)
from galaxy.util import plugin_config
from galaxy.util.custom_logging import get_logger
from galaxy.util.monitors import Monitors
Expand Down Expand Up @@ -154,8 +160,9 @@ def shutdown(self):
if exception:
raise exception

def queue(self, workflow_invocation, request_params, flush=True):
workflow_invocation.set_state(model.WorkflowInvocation.states.NEW)
def queue(self, workflow_invocation, request_params, flush=True, initial_state: Optional[InvocationState] = None):
initial_state = initial_state or model.WorkflowInvocation.states.NEW
workflow_invocation.set_state(initial_state)
workflow_invocation.scheduler = request_params.get("scheduler", None) or self.default_scheduler_id
sa_session = self.app.model.context
sa_session.add(workflow_invocation)
Expand Down Expand Up @@ -329,7 +336,23 @@ def __schedule(self, workflow_scheduler_id, workflow_scheduler):
def __attempt_schedule(self, invocation_id, workflow_scheduler):
with self.app.model.context() as session:
workflow_invocation = session.get(model.WorkflowInvocation, invocation_id)

if workflow_invocation.state == workflow_invocation.states.REQUIRES_MATERIALIZATION:
hdas_to_materialize = workflow_invocation.inputs_requiring_materialization()
for hda in hdas_to_materialize:
user = RequestUser(user_id=workflow_invocation.history.user_id)
task_request = MaterializeDatasetInstanceTaskRequest(
user=user,
history_id=workflow_invocation.history.id,
source="hda",
content=hda.id,
)
self.app.hda_manager.materialize(task_request, in_place=True)
# place back into ready and let it proceed normally on next iteration?
workflow_invocation.set_state(model.WorkflowInvocation.states.READY)
session.add(workflow_invocation)
session.commit()
if self.app.config.workflow_scheduling_separate_materialization_iteration:
return None
try:
if workflow_invocation.state == workflow_invocation.states.CANCELLING:
workflow_invocation.cancel_invocation_steps()
Expand Down
Loading

0 comments on commit 6214878

Please sign in to comment.