Skip to content

Commit

Permalink
Merge pull request #22 from qua-platform/workflow_status_error
Browse files Browse the repository at this point in the history
Workflow status and error
  • Loading branch information
maxim-v4s authored Sep 25, 2024
2 parents 1ee7acf + 5621d2f commit 7cc8d99
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 44 deletions.
11 changes: 6 additions & 5 deletions qualibrate_runner/api/routes/last_run.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from typing import Annotated, Any, Mapping, Optional, cast

from fastapi import APIRouter, Depends
from qualibrate.orchestration.execution_history import ExecutionHistory
from qualibrate.models.execution_history import ExecutionHistory
from qualibrate.qualibration_graph import QualibrationGraph

from qualibrate_runner.api.dependencies import get_state
from qualibrate_runner.config import State
from qualibrate_runner.core.models.last_run import LastRun
from qualibrate_runner.core.models.last_run import LastRun, RunStatus
from qualibrate_runner.core.models.workflow import WorkflowStatus

last_run_router = APIRouter(prefix="/last_run")
Expand All @@ -26,16 +26,17 @@ def get_workflow_status(
if not isinstance(state.run_item, QualibrationGraph):
return None
graph: QualibrationGraph = state.run_item
last_run = state.last_run
run_duration = float(
state.last_run.run_duration # type: ignore
if state.last_run
else 0.0
last_run.run_duration if last_run else 0.0 # type: ignore
)
return WorkflowStatus(
status=last_run.status if last_run else RunStatus.FINISHED,
active=state.is_running,
nodes_completed=graph.completed_count(),
nodes_total=len(graph._nodes),
run_duration=run_duration,
error=last_run.error if last_run else None,
)


Expand Down
4 changes: 2 additions & 2 deletions qualibrate_runner/core/models/last_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
from typing import Any, Mapping, Optional, Union

from pydantic import BaseModel, Field, computed_field
from qualibrate.run_summary.graph import GraphRunSummary
from qualibrate.run_summary.node import NodeRunSummary
from qualibrate.models.run_summary.graph import GraphRunSummary
from qualibrate.models.run_summary.node import NodeRunSummary


class RunStatus(Enum):
Expand Down
6 changes: 6 additions & 0 deletions qualibrate_runner/core/models/workflow.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
from typing import Optional

from pydantic import BaseModel

from qualibrate_runner.core.models.last_run import RunError, RunStatus


class WorkflowStatus(BaseModel):
active: bool
status: RunStatus
nodes_completed: int
nodes_total: int
run_duration: float
error: Optional[RunError] = None
68 changes: 31 additions & 37 deletions qualibrate_runner/core/run_job.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import traceback
from datetime import datetime
from typing import Any, Mapping, Type, cast
from typing import Any, Mapping, Type

from fastapi import HTTPException, status
from pydantic import BaseModel, ValidationError
from qualibrate.qualibration_graph import QualibrationGraph
from qualibrate.qualibration_library import QualibrationLibrary
from qualibrate.qualibration_node import QualibrationNode
from qualibrate.run_summary.node import NodeRunSummary

from qualibrate_runner.config import State
from qualibrate_runner.core.models.last_run import (
Expand Down Expand Up @@ -42,6 +41,7 @@ def run_node(
state: State,
) -> None:
state.run_item = node
run_status = RunStatus.RUNNING
state.last_run = LastRun(
name=node.name,
status=RunStatus.RUNNING,
Expand All @@ -50,39 +50,36 @@ def run_node(
started_at=datetime.now(),
runnable_type=RunnableType.NODE,
)
idx = -1
run_error = None
try:
result = node.run(passed_parameters=passed_input_parameters)
result = cast(NodeRunSummary, result)
node.run(passed_parameters=passed_input_parameters)
node = QualibrationNode.last_executed_node
except Exception as ex:
state.last_run = LastRun(
name=state.last_run.name,
status=RunStatus.ERROR,
idx=-1,
started_at=state.last_run.started_at,
completed_at=datetime.now(),
runnable_type=state.last_run.runnable_type,
passed_parameters=passed_input_parameters,
error=RunError(
error_class=ex.__class__.__name__,
message=str(ex),
traceback=traceback.format_tb(ex.__traceback__),
),
run_status = RunStatus.ERROR
run_error = RunError(
error_class=ex.__class__.__name__,
message=str(ex),
traceback=traceback.format_tb(ex.__traceback__),
)
run_status = RunStatus.ERROR
raise
else:
idx = node.snapshot_idx if hasattr(node, "snapshot_idx") else -1
idx = idx if idx is not None else -1
run_status = RunStatus.FINISHED
finally:
state.last_run = LastRun(
name=state.last_run.name,
status=RunStatus.FINISHED,
status=run_status,
idx=idx,
run_result=result,
run_result=node.run_summary,
runnable_type=state.last_run.runnable_type,
passed_parameters=passed_input_parameters,
started_at=state.last_run.started_at,
completed_at=datetime.now(),
state_updates=node.state_updates,
error=run_error,
)


Expand All @@ -91,50 +88,46 @@ def run_workflow(
passed_input_parameters: Mapping[str, Any],
state: State,
) -> None:
run_status = RunStatus.RUNNING
state.last_run = LastRun(
name=workflow.name,
status=RunStatus.RUNNING,
status=run_status,
idx=-1,
started_at=datetime.now(),
runnable_type=RunnableType.GRAPH,
passed_parameters=passed_input_parameters,
)
state.run_item = workflow
idx = -1
run_error = None
try:
library = get_active_library_or_error()
workflow = library.graphs[workflow.name]
input_parameters = workflow.full_parameters_class(
**passed_input_parameters
)
result = workflow.run(
workflow.run(
nodes=input_parameters.nodes.model_dump(),
**input_parameters.parameters.model_dump(),
)
print("Graph completed. Result:", result)
except Exception as ex:
state.last_run = LastRun(
name=state.last_run.name,
status=RunStatus.ERROR,
idx=-1,
started_at=state.last_run.started_at,
completed_at=datetime.now(),
runnable_type=state.last_run.runnable_type,
passed_parameters=passed_input_parameters,
error=RunError(
error_class=ex.__class__.__name__,
message=str(ex),
traceback=traceback.format_tb(ex.__traceback__),
),
run_status = RunStatus.ERROR
run_error = RunError(
error_class=ex.__class__.__name__,
message=str(ex),
traceback=traceback.format_tb(ex.__traceback__),
)
raise
else:
idx = workflow.snapshot_idx if hasattr(workflow, "snapshot_idx") else -1
idx = idx if idx is not None else -1
run_status = RunStatus.FINISHED
finally:
state.last_run = LastRun(
name=state.last_run.name,
status=RunStatus.FINISHED,
status=run_status,
idx=idx,
run_result=result,
run_result=workflow.run_summary,
started_at=state.last_run.started_at,
completed_at=datetime.now(),
runnable_type=state.last_run.runnable_type,
Expand All @@ -144,4 +137,5 @@ def run_workflow(
if hasattr(workflow, "state_updates")
else {}
),
error=run_error,
)

0 comments on commit 7cc8d99

Please sign in to comment.