Skip to content

Commit

Permalink
imprrove and refactor models for workflow invocation API
Browse files Browse the repository at this point in the history
- refactor parts of invocation schema into dedicated module
- define model for workflow invocation step
- improve sedcription of some invocation API params
- improve documentation
  • Loading branch information
martenson committed Sep 30, 2023
1 parent 5dacf2e commit be9d510
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 50 deletions.
19 changes: 6 additions & 13 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@
from galaxy.model.orm.now import now
from galaxy.model.orm.util import add_object_to_object_session
from galaxy.objectstore import ObjectStore
from galaxy.schema.invocation import (
InvocationState,
InvocationStepState,
)
from galaxy.schema.schema import (
DatasetCollectionPopulatedState,
DatasetState,
Expand Down Expand Up @@ -8047,13 +8051,7 @@ class WorkflowInvocation(Base, UsesCreateAndUpdateTime, Dictifiable, Serializabl
"state",
]

class states(str, Enum):
NEW = "new" # Brand new workflow invocation... maybe this should be same as READY
READY = "ready" # Workflow ready for another iteration of scheduling.
SCHEDULED = "scheduled" # Workflow has been scheduled.
CANCELLED = "cancelled"
FAILED = "failed"

states = InvocationState
non_terminal_states = [states.NEW, states.READY]

def create_subworkflow_invocation_for_step(self, step):
Expand Down Expand Up @@ -8621,12 +8619,7 @@ class WorkflowInvocationStep(Base, Dictifiable, Serializable):
"action",
]

class states(str, Enum):
NEW = "new" # Brand new workflow invocation step
READY = "ready" # Workflow invocation step ready for another iteration of scheduling.
SCHEDULED = "scheduled" # Workflow invocation step has been scheduled.
# CANCELLED = 'cancelled', TODO: implement and expose
# FAILED = 'failed', TODO: implement and expose
states = InvocationStepState

@property
def is_new(self):
Expand Down
130 changes: 129 additions & 1 deletion lib/galaxy/schema/invocation.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from datetime import datetime
from enum import Enum
from typing import (
Any,
Dict,
Generic,
List,
Optional,
TypeVar,
Union,
Expand All @@ -18,7 +21,21 @@
Literal,
)

from galaxy.schema.fields import EncodedDatabaseIdField
from galaxy.schema import schema
from galaxy.schema.fields import (
DecodedDatabaseIdField,
EncodedDatabaseIdField,
literal_to_value,
)
from galaxy.schema.schema import Model

INVOCATION_STEP_OUTPUT_SRC = Literal["hda"]
INVOCATION_STEP_COLLECTION_OUTPUT_SRC = Literal["hdca"]

InvocationStepAction: bool = Field(
title="Action",
description="Whether to take action on the invocation step.",
)


class WarningReason(str, Enum):
Expand Down Expand Up @@ -214,3 +231,114 @@ class InvocationMessageResponseModel(BaseModel):

class Config:
orm_mode = True


class InvocationState(str, Enum):
NEW = "new" # Brand new workflow invocation... maybe this should be same as READY
READY = "ready" # Workflow ready for another iteration of scheduling.
SCHEDULED = "scheduled" # Workflow has been scheduled.
CANCELLED = "cancelled"
FAILED = "failed"


class InvocationStepState(str, Enum):
NEW = "new" # Brand new workflow invocation step
READY = "ready" # Workflow invocation step ready for another iteration of scheduling.
SCHEDULED = "scheduled" # Workflow invocation step has been scheduled.
# CANCELLED = 'cancelled', TODO: implement and expose
# FAILED = 'failed', TODO: implement and expose


class InvocationStepOutput(Model):
src: str = Field(
literal_to_value(INVOCATION_STEP_OUTPUT_SRC),
title="src",
description="The source model of the output.",
const=True,
mark_required_in_schema=True,
)
id: DecodedDatabaseIdField = Field(
...,
title="Dataset ID",
description="Dataset ID of the workflow step output.",
)
uuid: Optional[schema.UUID4] = Field(
None,
title="UUID",
description="Universal unique identifier of the workflow step output dataset.",
)


class InvocationStepCollectionOutput(Model):
src: str = Field(
literal_to_value(INVOCATION_STEP_COLLECTION_OUTPUT_SRC),
title="src",
description="The source model of the output.",
const=True,
mark_required_in_schema=True,
)
id: DecodedDatabaseIdField = Field(
...,
title="Dataset Collection ID",
description="Dataset Collection ID of the workflow step output.",
)


class InvocationStep(Model):
"""Information about Workflow Invocation Step"""

model_class: schema.INVOCATION_STEP_MODEL_CLASS = schema.ModelClassField(schema.INVOCATION_STEP_MODEL_CLASS)
id: DecodedDatabaseIdField = schema.EntityIdField
update_time: Optional[datetime] = schema.UpdateTimeField
job_id: Optional[DecodedDatabaseIdField] = Field(
title="Job ID",
description="The encoded ID of the job associated with this workflow invocation step.",
)
workflow_step_id: DecodedDatabaseIdField = Field(
...,
title="Workflow step ID",
description="The encoded ID of the workflow step associated with this workflow invocation step.",
)
subworkflow_invocation_id: DecodedDatabaseIdField = Field(
title="Subworkflow invocation ID",
description="The encoded ID of the subworkflow invocation.",
)
state: InvocationStepState = Field(
...,
title="State of the invocation step",
description="Describes where in the scheduling process the workflow invocation step is.",
)
action: bool = InvocationStepAction
order_index: int = Field(
...,
title="Order index",
description="The index of the workflow step in the workflow.",
)
workflow_step_label: str = Field(
title="Step label",
description="The label of the workflow step",
)
workflow_step_uuid: Optional[schema.UUID4] = Field(
None,
title="UUID",
description="Universal unique identifier of the workflow step.",
)
outputs: Dict[str, InvocationStepOutput] = Field(
{},
title="Outputs",
description="The outputs of the workflow invocation step.",
)
output_collections: Dict[str, InvocationStepCollectionOutput] = Field(
{},
title="Output collections",
description="The dataset collection outputs of the workflow invocation step.",
)
jobs: List[schema.JobBaseModel] = Field(
[],
title="Jobs",
description="Jobs associated with the workflow invocation step.",
)


class InvocationUpdatePayload(Model):
action: bool = InvocationStepAction
8 changes: 1 addition & 7 deletions lib/galaxy/schema/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
JOB_MODEL_CLASS = Literal["Job"]
STORED_WORKFLOW_MODEL_CLASS = Literal["StoredWorkflow"]
PAGE_MODEL_CLASS = Literal["Page"]
INVOCATION_STEP_MODEL_CLASS = Literal["WorkflowInvocationStep"]

OptionalNumberT = Optional[Union[int, float]]

Expand Down Expand Up @@ -1374,13 +1375,6 @@ class JobIndexQueryPayload(Model):
offset: int = 0


class InvocationUpdatePayload(Model):
action: bool = Field(
title="Action",
description="Whether to take action.",
)


class InvocationSortByEnum(str, Enum):
create_time = "create_time"
update_time = "update_time"
Expand Down
77 changes: 59 additions & 18 deletions lib/galaxy/webapps/galaxy/api/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from markupsafe import escape
from pydantic import Extra
from starlette.responses import StreamingResponse
from typing_extensions import Annotated

from galaxy import (
exceptions,
Expand All @@ -45,11 +46,14 @@
from galaxy.model.item_attrs import UsesAnnotations
from galaxy.model.store import BcoExportOptions
from galaxy.schema.fields import DecodedDatabaseIdField
from galaxy.schema.invocation import InvocationMessageResponseModel
from galaxy.schema.invocation import (
InvocationMessageResponseModel,
InvocationStep,
InvocationUpdatePayload,
)
from galaxy.schema.schema import (
AsyncFile,
AsyncTaskResultSummary,
InvocationUpdatePayload,
SetSlugPayload,
ShareWithPayload,
ShareWithStatus,
Expand Down Expand Up @@ -1208,6 +1212,29 @@ def get_workflow_menu(
)


StepDetailQueryParam = Annotated[
bool,
Query(
title="Include step details",
description=(
"Include details for individual invocation steps and populate a steps attribute in the resulting dictionary."
),
),
]
LegacyJobStateQueryParam = Annotated[
bool,
Query(
title="Replace with job state",
description=(
"""Populate the invocation step state with the job state instead of the invocation step state.
This will also produce one step per job in mapping jobs to mimic the older behavior with respect to collections.
Partially scheduled steps may provide incomplete information and the listed steps outputs
are not the mapped over step outputs but the individual job outputs."""
),
),
]


@router.cbv
class FastAPIInvocations:
invocations_service: InvocationsService = depends(InvocationsService)
Expand Down Expand Up @@ -1250,8 +1277,8 @@ def show_invocation(
self,
trans: ProvidesUserContext = DependsOnTrans,
invocation_id: DecodedDatabaseIdField = InvocationIDPathParam,
step_details: Optional[bool] = False,
legacy_job_state: Optional[bool] = False,
step_details: StepDetailQueryParam = False,
legacy_job_state: LegacyJobStateQueryParam = False,
):
serialization_params = InvocationSerializationParams(
step_details=step_details, legacy_job_state=legacy_job_state
Expand All @@ -1273,19 +1300,19 @@ def show_workflow_invocation(
trans: ProvidesUserContext = DependsOnTrans,
workflow_id: DecodedDatabaseIdField = StoredWorkflowIDPathParam,
invocation_id: DecodedDatabaseIdField = InvocationIDPathParam,
step_details: Optional[bool] = False,
legacy_job_state: Optional[bool] = False,
step_details: StepDetailQueryParam = False,
legacy_job_state: LegacyJobStateQueryParam = False,
):
"""A wrapper for multiple API endpoints providing the same logic."""
"""An alias for `GET /api/invocations/{invocation_id}`. `workflow_id` is ignored."""
return self.show_invocation(trans, invocation_id, step_details, legacy_job_state)

@router.delete("/api/invocations/{invocation_id}", summary="Cancel the specified workflow invocation.")
def cancel_invocation(
self,
trans: ProvidesUserContext = DependsOnTrans,
invocation_id: DecodedDatabaseIdField = InvocationIDPathParam,
step_details: Optional[bool] = False,
legacy_job_state: Optional[bool] = False,
step_details: StepDetailQueryParam = False,
legacy_job_state: LegacyJobStateQueryParam = False,
):
serialization_params = InvocationSerializationParams(
step_details=step_details, legacy_job_state=legacy_job_state
Expand All @@ -1306,10 +1333,11 @@ def cancel_workflow_invocation(
trans: ProvidesUserContext = DependsOnTrans,
workflow_id: DecodedDatabaseIdField = StoredWorkflowIDPathParam,
invocation_id: DecodedDatabaseIdField = InvocationIDPathParam,
step_details: Optional[bool] = False,
legacy_job_state: Optional[bool] = False,
step_details: StepDetailQueryParam = False,
legacy_job_state: LegacyJobStateQueryParam = False,
):
"""A wrapper for multiple API endpoints providing the same logic."""
"""An alias for `DELETE /api/invocations/{invocation_id}`. `workflow_id` is ignored."""

return self.cancel_invocation(trans, invocation_id, step_details, legacy_job_state)

@router.get(
Expand Down Expand Up @@ -1338,7 +1366,7 @@ def show_workflow_invocation_report(
workflow_id: DecodedDatabaseIdField = StoredWorkflowIDPathParam,
invocation_id: DecodedDatabaseIdField = InvocationIDPathParam,
):
"""A wrapper for multiple API endpoints providing the same logic."""
"""An alias for `GET /api/invocations/{invocation_id}/report`. `workflow_id` is ignored."""
return self.show_invocation_report(trans, invocation_id)

@router.get(
Expand Down Expand Up @@ -1378,9 +1406,20 @@ def show_workflow_invocation_report_pdf(
workflow_id: DecodedDatabaseIdField = StoredWorkflowIDPathParam,
invocation_id: DecodedDatabaseIdField = InvocationIDPathParam,
):
"""A wrapper for multiple API endpoints providing the same logic."""
"""An alias for `GET /api/invocations/{invocation_id}/report.pdf`. `workflow_id` is ignored."""
return self.show_invocation_report_pdf(trans, invocation_id)

@router.get(
"/api/invocations/steps/{step_id}",
summary="Show details of workflow invocation step.",
)
def step(
self,
trans: ProvidesUserContext = DependsOnTrans,
step_id: DecodedDatabaseIdField = WorkflowInvocationStepIDPathParam,
) -> InvocationStep:
return self.invocations_service.show_invocation_step(trans, step_id)

@router.get(
"/api/invocations/{invocation_id}/steps/{step_id}",
summary="Show details of workflow invocation step.",
Expand All @@ -1391,7 +1430,8 @@ def invocation_step(
invocation_id: DecodedDatabaseIdField = InvocationIDPathParam,
step_id: DecodedDatabaseIdField = WorkflowInvocationStepIDPathParam,
):
return self.invocations_service.show_invocation_step(trans, step_id)
"""An alias for `GET /api/invocations/steps/{step_id}`. `invocation_id` is ignored."""
return self.step(trans, step_id)

@router.get(
"/api/workflows/{workflow_id}/invocations/{invocation_id}/steps/{step_id}",
Expand All @@ -1409,6 +1449,7 @@ def workflow_invocation_step(
invocation_id: DecodedDatabaseIdField = InvocationIDPathParam,
step_id: DecodedDatabaseIdField = WorkflowInvocationStepIDPathParam,
):
"""An alias for `GET /api/invocations/{invocation_id}/steps/{step_id}`. `workflow_id` and `invocation_id` are ignored."""
return self.invocation_step(trans, step_id=step_id)

@router.put(
Expand Down Expand Up @@ -1441,7 +1482,7 @@ def update_workflow_invocation_step(
step_id: DecodedDatabaseIdField = WorkflowInvocationStepIDPathParam,
payload: InvocationUpdatePayload = Body(...),
):
"""A wrapper for multiple API endpoints providing the same logic."""
"""An alias for `PUT /api/invocations/{invocation_id}/steps/{step_id}`. `workflow_id` is ignored."""
return self.update_invocation_step(trans, step_id=step_id, payload=payload)

@router.get(
Expand Down Expand Up @@ -1476,7 +1517,7 @@ def workflow_invocation_step_jobs_summary(
workflow_id: DecodedDatabaseIdField = StoredWorkflowIDPathParam,
invocation_id: DecodedDatabaseIdField = InvocationIDPathParam,
):
"""A wrapper for multiple API endpoints providing the same logic."""
"""An alias for `GET /api/invocations/{invocation_id}/step_jobs_summary`. `workflow_id` is ignored."""
return self.invocation_step_jobs_summary(trans, invocation_id)

@router.get(
Expand Down Expand Up @@ -1511,7 +1552,7 @@ def workflow_invocation_jobs_summary(
workflow_id: DecodedDatabaseIdField = StoredWorkflowIDPathParam,
invocation_id: DecodedDatabaseIdField = InvocationIDPathParam,
):
"""A wrapper for multiple API endpoints providing the same logic."""
"""An alias for `GET /api/invocations/{invocation_id}/jobs_summary`. `workflow_id` is ignored."""
return self.invocation_jobs_summary(trans, invocation_id)

# TODO: remove this endpoint after 23.1 release
Expand Down
Loading

0 comments on commit be9d510

Please sign in to comment.