Skip to content

Commit

Permalink
feat: run results and graph execution history
Browse files Browse the repository at this point in the history
  • Loading branch information
maxim-v4s committed Aug 27, 2024
1 parent 4d37f4f commit dffa56d
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 12 deletions.
19 changes: 16 additions & 3 deletions qualibrate_runner/api/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, status
from pydantic import BaseModel
from qualibrate.orchestration.execution_history import ExecutionHistory
from qualibrate.qualibration_graph import QualibrationGraph
from qualibrate.qualibration_node import QualibrationNode

Expand Down Expand Up @@ -87,9 +88,7 @@ def submit_workflow_run(
for name, params in input_parameters.get("nodes", {}).items()
},
}
validate_input_parameters(
cast(Type[BaseModel], graph.full_parameters), input_parameters
)
validate_input_parameters(graph.full_parameters_class, input_parameters)
background_tasks.add_task(run_workflow, graph, input_parameters, state)
return f"Workflow job {graph.name} is submitted"

Expand Down Expand Up @@ -156,6 +155,20 @@ def get_last_run(
return state.last_run


@base_router.get("/last_run/workflow/execution_history")
def get_execution_history(
state: Annotated[State, Depends(get_state)],
) -> Optional[Mapping[str, Any]]:
if not isinstance(state.run_item, QualibrationGraph):
return None
graph: QualibrationGraph = state.run_item
orch = graph._orchestrator
if orch is None:
raise RuntimeError("No graph orchestrator")
history: ExecutionHistory = orch.get_execution_history()
return cast(Mapping[str, Any], history.model_dump(mode="json"))


@base_router.post(
"/record_state_update",
description=(
Expand Down
11 changes: 7 additions & 4 deletions qualibrate_runner/config/models.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
from pathlib import Path
from typing import Any, Callable, Mapping, Optional
from typing import Any, Callable, Mapping, Optional, Union

from pydantic import BaseModel, DirectoryPath, ImportString
from pydantic import BaseModel, ConfigDict, DirectoryPath, ImportString
from pydantic_settings import BaseSettings, SettingsConfigDict
from qualibrate.qualibration_graph import QualibrationGraph
from qualibrate.qualibration_library import QualibrationLibrary
from qualibrate.qualibration_node import QualibrationNode

from qualibrate_runner.core.models.last_run import LastRun, RunStatus


class State(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)

passed_parameters: Mapping[str, Any] = {}
persistent: dict[str, Any] = {}
last_run: Optional[LastRun] = None
node: Optional[str] = None
run_item: Optional[Union[QualibrationNode, QualibrationGraph]] = None

@property
def is_running(self) -> bool:
Expand Down
3 changes: 3 additions & 0 deletions qualibrate_runner/core/models/last_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from typing import Any, Mapping, Optional, Union

from pydantic import BaseModel, Field
from qualibrate.run_summary.graph import GraphRunSummary
from qualibrate.run_summary.node import NodeRunSummary


class RunStatus(Enum):
Expand All @@ -28,5 +30,6 @@ class LastRun(BaseModel):
status: RunStatus
name: str
idx: int
run_result: Optional[Union[NodeRunSummary, GraphRunSummary]] = None
state_updates: Mapping[str, StateUpdate] = Field(default_factory=dict)
error: Optional[RunError] = None
16 changes: 11 additions & 5 deletions qualibrate_runner/core/run_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@


def validate_input_parameters(
node_parameters: Type[BaseModel],
parameters_class: Type[BaseModel],
passed_parameters: Mapping[str, Any],
) -> BaseModel:
try:
return node_parameters.model_validate(passed_parameters)
return parameters_class.model_validate(passed_parameters)
except ValidationError as ex:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=ex.errors()
Expand All @@ -29,6 +29,7 @@ def run_node(
state: State,
) -> None:
state.passed_parameters = passed_input_parameters
state.run_item = node
state.last_run = LastRun(
name=node.name,
status=RunStatus.RUNNING,
Expand All @@ -37,7 +38,7 @@ def run_node(
try:
library = QualibrationLibrary.active_library
node = library.nodes[node.name]
library.run_node(
result = library.run_node(
node.name, node.parameters_class(**passed_input_parameters)
)
except Exception as ex:
Expand All @@ -59,6 +60,7 @@ def run_node(
name=state.last_run.name,
status=RunStatus.FINISHED,
idx=idx,
run_result=result,
state_updates=node.state_updates,
)

Expand All @@ -74,12 +76,15 @@ def run_workflow(
status=RunStatus.RUNNING,
idx=-1,
)
state.run_item = workflow
try:
library = QualibrationLibrary.active_library
workflow = library.graphs[workflow.name]
library.run_graph(
workflow.name, workflow.full_parameters(**passed_input_parameters)
result = library.run_graph(
workflow.name,
workflow.full_parameters_class(**passed_input_parameters),
)
print("Graph completed. Result:", result)
except Exception as ex:
state.last_run = LastRun(
name=state.last_run.name,
Expand All @@ -99,6 +104,7 @@ def run_workflow(
name=state.last_run.name,
status=RunStatus.FINISHED,
idx=idx,
run_result=result,
state_updates=(
workflow.state_updates
if hasattr(workflow, "state_updates")
Expand Down

0 comments on commit dffa56d

Please sign in to comment.