Skip to content

Commit

Permalink
Add status property to WorkflowTaskImpl
Browse files Browse the repository at this point in the history
  • Loading branch information
dpavlov-frinx committed Oct 3, 2024
1 parent 693168f commit 63e45c3
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions frinx/common/workflow/task.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import typing
from enum import Enum
from typing import Any
from typing import Optional
from typing import TypeAlias
from typing import Union
from typing import cast

from pydantic import BaseModel
from pydantic import ConfigDict
Expand All @@ -14,6 +15,7 @@

from frinx.common.conductor_enums import DoWhileEvaluatorType
from frinx.common.conductor_enums import SwitchEvaluatorType
from frinx.common.conductor_enums import TaskResultStatus
from frinx.common.conductor_enums import WorkflowStatus
from frinx.common.util import snake_to_camel_case
from frinx.common.worker.worker import WorkerImpl
Expand Down Expand Up @@ -93,6 +95,10 @@ class WorkflowTaskImpl(BaseModel):
populate_by_name=True
)

@property
def status(self) -> TaskResultStatus:
return cast(TaskResultStatus, f'${{{self.task_reference_name}.status}}')

def output_ref(self, path: str | None = None) -> str:
if not path or path is None:
return f'${{{self.task_reference_name}.output}}'
Expand All @@ -118,7 +124,7 @@ class DynamicForkTaskInputParameters(BaseModel):


class DynamicForkTaskFromDefInputParameters(BaseModel):
dynamic_tasks: typing.Union[str, object]
dynamic_tasks: Union[str, object]
dynamic_tasks_input: str

model_config = ConfigDict(
Expand Down Expand Up @@ -181,7 +187,7 @@ class DynamicForkTask(WorkflowTaskImpl):
type: TaskType = TaskType.FORK_JOIN_DYNAMIC
dynamic_fork_tasks_param: str = Field(default='dynamicTasks')
dynamic_fork_tasks_input_param_name: str = Field(default='dynamicTasksInput')
input_parameters: typing.Union[
input_parameters: Union[
DynamicForkArraysTaskInputParameters,
DynamicForkTaskInputParameters,
DynamicForkArraysTaskFromDefInputParameters, DynamicForkTaskFromDefInputParameters
Expand Down Expand Up @@ -336,7 +342,7 @@ def check_input_values(cls, values: dict[str, Any]) -> dict[str, Any]:


class StartWorkflowTaskInputParameters(BaseModel):
start_workflow: typing.Union[StartWorkflowTaskPlainInputParameters, StartWorkflowTaskFromDefInputParameters]
start_workflow: Union[StartWorkflowTaskPlainInputParameters, StartWorkflowTaskFromDefInputParameters]

model_config = ConfigDict(
validate_assignment=True,
Expand Down

0 comments on commit 63e45c3

Please sign in to comment.