diff --git a/qualibrate_runner/api/routes/last_run.py b/qualibrate_runner/api/routes/last_run.py index f842d8a..ec6d4c7 100644 --- a/qualibrate_runner/api/routes/last_run.py +++ b/qualibrate_runner/api/routes/last_run.py @@ -1,12 +1,12 @@ from typing import Annotated, Any, Mapping, Optional, cast from fastapi import APIRouter, Depends -from qualibrate.orchestration.execution_history import ExecutionHistory +from qualibrate.models.execution_history import ExecutionHistory from qualibrate.qualibration_graph import QualibrationGraph from qualibrate_runner.api.dependencies import get_state from qualibrate_runner.config import State -from qualibrate_runner.core.models.last_run import LastRun +from qualibrate_runner.core.models.last_run import LastRun, RunStatus from qualibrate_runner.core.models.workflow import WorkflowStatus last_run_router = APIRouter(prefix="/last_run") @@ -26,16 +26,17 @@ def get_workflow_status( if not isinstance(state.run_item, QualibrationGraph): return None graph: QualibrationGraph = state.run_item + last_run = state.last_run run_duration = float( - state.last_run.run_duration # type: ignore - if state.last_run - else 0.0 + last_run.run_duration if last_run else 0.0 # type: ignore ) return WorkflowStatus( + status=last_run.status if last_run else RunStatus.FINISHED, active=state.is_running, nodes_completed=graph.completed_count(), nodes_total=len(graph._nodes), run_duration=run_duration, + error=last_run.error if last_run else None, ) diff --git a/qualibrate_runner/core/models/last_run.py b/qualibrate_runner/core/models/last_run.py index f1b7bfb..1ab56ff 100644 --- a/qualibrate_runner/core/models/last_run.py +++ b/qualibrate_runner/core/models/last_run.py @@ -3,8 +3,8 @@ from typing import Any, Mapping, Optional, Union from pydantic import BaseModel, Field, computed_field -from qualibrate.run_summary.graph import GraphRunSummary -from qualibrate.run_summary.node import NodeRunSummary +from qualibrate.models.run_summary.graph import GraphRunSummary +from qualibrate.models.run_summary.node import NodeRunSummary class RunStatus(Enum): diff --git a/qualibrate_runner/core/models/workflow.py b/qualibrate_runner/core/models/workflow.py index e866219..c812ac4 100644 --- a/qualibrate_runner/core/models/workflow.py +++ b/qualibrate_runner/core/models/workflow.py @@ -1,8 +1,14 @@ +from typing import Optional + from pydantic import BaseModel +from qualibrate_runner.core.models.last_run import RunError, RunStatus + class WorkflowStatus(BaseModel): active: bool + status: RunStatus nodes_completed: int nodes_total: int run_duration: float + error: Optional[RunError] = None diff --git a/qualibrate_runner/core/run_job.py b/qualibrate_runner/core/run_job.py index 46e8542..77d8659 100644 --- a/qualibrate_runner/core/run_job.py +++ b/qualibrate_runner/core/run_job.py @@ -41,6 +41,7 @@ def run_node( state: State, ) -> None: state.run_item = node + run_status = RunStatus.RUNNING state.last_run = LastRun( name=node.name, status=RunStatus.RUNNING, @@ -49,41 +50,39 @@ def run_node( started_at=datetime.now(), runnable_type=RunnableType.NODE, ) + idx = -1 + run_error = None try: library = get_active_library_or_error() node = library.nodes[node.name] - result = library.run_node( + library.run_node( node.name, node.parameters_class(**passed_input_parameters) ) except Exception as ex: - state.last_run = LastRun( - name=state.last_run.name, - status=RunStatus.ERROR, - idx=-1, - started_at=state.last_run.started_at, - completed_at=datetime.now(), - runnable_type=state.last_run.runnable_type, - passed_parameters=passed_input_parameters, - error=RunError( - error_class=ex.__class__.__name__, - message=str(ex), - traceback=traceback.format_tb(ex.__traceback__), - ), + run_status = RunStatus.ERROR + run_error = RunError( + error_class=ex.__class__.__name__, + message=str(ex), + traceback=traceback.format_tb(ex.__traceback__), ) + run_status = RunStatus.ERROR raise else: idx = node.snapshot_idx if hasattr(node, "snapshot_idx") else -1 idx = idx if idx is not None else -1 + run_status = RunStatus.FINISHED + finally: state.last_run = LastRun( name=state.last_run.name, - status=RunStatus.FINISHED, + status=run_status, idx=idx, - run_result=result, + run_result=node.run_summary, runnable_type=state.last_run.runnable_type, passed_parameters=passed_input_parameters, started_at=state.last_run.started_at, completed_at=datetime.now(), state_updates=node.state_updates, + error=run_error, ) @@ -92,47 +91,43 @@ def run_workflow( passed_input_parameters: Mapping[str, Any], state: State, ) -> None: + run_status = RunStatus.RUNNING state.last_run = LastRun( name=workflow.name, - status=RunStatus.RUNNING, + status=run_status, idx=-1, started_at=datetime.now(), runnable_type=RunnableType.GRAPH, passed_parameters=passed_input_parameters, ) state.run_item = workflow + idx = -1 + run_error = None try: library = get_active_library_or_error() workflow = library.graphs[workflow.name] - result = library.run_graph( + library.run_graph( workflow.name, workflow.full_parameters_class(**passed_input_parameters), ) - print("Graph completed. Result:", result) except Exception as ex: - state.last_run = LastRun( - name=state.last_run.name, - status=RunStatus.ERROR, - idx=-1, - started_at=state.last_run.started_at, - completed_at=datetime.now(), - runnable_type=state.last_run.runnable_type, - passed_parameters=passed_input_parameters, - error=RunError( - error_class=ex.__class__.__name__, - message=str(ex), - traceback=traceback.format_tb(ex.__traceback__), - ), + run_status = RunStatus.ERROR + run_error = RunError( + error_class=ex.__class__.__name__, + message=str(ex), + traceback=traceback.format_tb(ex.__traceback__), ) raise else: idx = workflow.snapshot_idx if hasattr(workflow, "snapshot_idx") else -1 idx = idx if idx is not None else -1 + run_status = RunStatus.FINISHED + finally: state.last_run = LastRun( name=state.last_run.name, - status=RunStatus.FINISHED, + status=run_status, idx=idx, - run_result=result, + run_result=workflow.run_summary, started_at=state.last_run.started_at, completed_at=datetime.now(), runnable_type=state.last_run.runnable_type, @@ -142,4 +137,5 @@ def run_workflow( if hasattr(workflow, "state_updates") else {} ), + error=run_error, )