diff --git a/frinx/common/workflow/task.py b/frinx/common/workflow/task.py index b0effd2..dca9699 100644 --- a/frinx/common/workflow/task.py +++ b/frinx/common/workflow/task.py @@ -1,8 +1,9 @@ -import typing from enum import Enum from typing import Any from typing import Optional from typing import TypeAlias +from typing import Union +from typing import cast from pydantic import BaseModel from pydantic import ConfigDict @@ -14,6 +15,7 @@ from frinx.common.conductor_enums import DoWhileEvaluatorType from frinx.common.conductor_enums import SwitchEvaluatorType +from frinx.common.conductor_enums import TaskResultStatus from frinx.common.conductor_enums import WorkflowStatus from frinx.common.util import snake_to_camel_case from frinx.common.worker.worker import WorkerImpl @@ -93,6 +95,10 @@ class WorkflowTaskImpl(BaseModel): populate_by_name=True ) + @property + def status(self) -> TaskResultStatus: + return cast(TaskResultStatus, f'${{{self.task_reference_name}.status}}') + def output_ref(self, path: str | None = None) -> str: if not path or path is None: return f'${{{self.task_reference_name}.output}}' @@ -118,7 +124,7 @@ class DynamicForkTaskInputParameters(BaseModel): class DynamicForkTaskFromDefInputParameters(BaseModel): - dynamic_tasks: typing.Union[str, object] + dynamic_tasks: Union[str, object] dynamic_tasks_input: str model_config = ConfigDict( @@ -181,7 +187,7 @@ class DynamicForkTask(WorkflowTaskImpl): type: TaskType = TaskType.FORK_JOIN_DYNAMIC dynamic_fork_tasks_param: str = Field(default='dynamicTasks') dynamic_fork_tasks_input_param_name: str = Field(default='dynamicTasksInput') - input_parameters: typing.Union[ + input_parameters: Union[ DynamicForkArraysTaskInputParameters, DynamicForkTaskInputParameters, DynamicForkArraysTaskFromDefInputParameters, DynamicForkTaskFromDefInputParameters @@ -336,7 +342,7 @@ def check_input_values(cls, values: dict[str, Any]) -> dict[str, Any]: class StartWorkflowTaskInputParameters(BaseModel): - start_workflow: typing.Union[StartWorkflowTaskPlainInputParameters, StartWorkflowTaskFromDefInputParameters] + start_workflow: Union[StartWorkflowTaskPlainInputParameters, StartWorkflowTaskFromDefInputParameters] model_config = ConfigDict( validate_assignment=True,