Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add status property to WorkflowTaskImpl #72

Merged
merged 1 commit into from
Oct 3, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions frinx/common/workflow/task.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import typing
from enum import Enum
from typing import Any
from typing import Optional
from typing import TypeAlias
from typing import Union
from typing import cast

from pydantic import BaseModel
from pydantic import ConfigDict
Expand All @@ -14,6 +15,7 @@

from frinx.common.conductor_enums import DoWhileEvaluatorType
from frinx.common.conductor_enums import SwitchEvaluatorType
from frinx.common.conductor_enums import TaskResultStatus
from frinx.common.conductor_enums import WorkflowStatus
from frinx.common.util import snake_to_camel_case
from frinx.common.worker.worker import WorkerImpl
Expand Down Expand Up @@ -93,6 +95,10 @@ class WorkflowTaskImpl(BaseModel):
populate_by_name=True
)

@property
def status(self) -> TaskResultStatus:
return cast(TaskResultStatus, f'${{{self.task_reference_name}.status}}')

def output_ref(self, path: str | None = None) -> str:
if not path or path is None:
return f'${{{self.task_reference_name}.output}}'
Expand All @@ -118,7 +124,7 @@ class DynamicForkTaskInputParameters(BaseModel):


class DynamicForkTaskFromDefInputParameters(BaseModel):
dynamic_tasks: typing.Union[str, object]
dynamic_tasks: Union[str, object]
dynamic_tasks_input: str

model_config = ConfigDict(
Expand Down Expand Up @@ -181,7 +187,7 @@ class DynamicForkTask(WorkflowTaskImpl):
type: TaskType = TaskType.FORK_JOIN_DYNAMIC
dynamic_fork_tasks_param: str = Field(default='dynamicTasks')
dynamic_fork_tasks_input_param_name: str = Field(default='dynamicTasksInput')
input_parameters: typing.Union[
input_parameters: Union[
DynamicForkArraysTaskInputParameters,
DynamicForkTaskInputParameters,
DynamicForkArraysTaskFromDefInputParameters, DynamicForkTaskFromDefInputParameters
Expand Down Expand Up @@ -336,7 +342,7 @@ def check_input_values(cls, values: dict[str, Any]) -> dict[str, Any]:


class StartWorkflowTaskInputParameters(BaseModel):
start_workflow: typing.Union[StartWorkflowTaskPlainInputParameters, StartWorkflowTaskFromDefInputParameters]
start_workflow: Union[StartWorkflowTaskPlainInputParameters, StartWorkflowTaskFromDefInputParameters]

model_config = ConfigDict(
validate_assignment=True,
Expand Down
Loading