diff --git a/proxy/service.py b/proxy/service.py index f50c377..61e67e0 100644 --- a/proxy/service.py +++ b/proxy/service.py @@ -668,9 +668,13 @@ def get_deployment(deployment_id: str) -> dict: return res -def get_final_state_for_flow_run(flow_run_id: str): - """fetch final state of subtasks""" - all_ids_to_look_at = traverse_flow_run_graph(flow_run_id, []) +def update_flow_run_final_state(flow_run: dict) -> dict: + """ + fetch tasks of the flow_run & checks for the custom flow_run state + update the flow_run and return it + addes status and state_name to the flow_run + """ + all_ids_to_look_at = traverse_flow_run_graph(flow_run["id"], []) query = { "flow_runs": { "operator": "and_", @@ -678,20 +682,15 @@ def get_final_state_for_flow_run(flow_run_id: str): }, } result = prefect_post("task_runs/filter/", query) - priority = ["Completed", "DBT_TEST_FAILED", "Failed", "Running", "Unknown"] - as_numeric = list( - map( - lambda x: ( - priority.index(x["state"]["name"]) - if x["state"]["name"] in priority - else 4 - ), - result, - ) - ) - if len(as_numeric) == 0: - return "RUNNING" - return priority[max(as_numeric)].upper() + if "DBT_TEST_FAILED" in [x["state"]["name"] for x in result]: + final_state_name = "DBT_TEST_FAILED" + else: + final_state_name = flow_run["state"]["name"] + + flow_run["status"] = flow_run["state"]["type"] + flow_run["state_name"] = final_state_name + + return flow_run def get_flow_runs_by_deployment_id( @@ -734,28 +733,17 @@ def get_flow_runs_by_deployment_id( ) from error for flow_run in result: # get the tasks if any and check their state names - all_ids_to_look_at = traverse_flow_run_graph(flow_run["id"], []) - query = { - "flow_runs": { - "operator": "and_", - "id": {"any_": all_ids_to_look_at}, - }, - } - result = prefect_post("task_runs/filter/", query) - if "DBT_TEST_FAILED" in [x["state"]["name"] for x in result]: - final_state_name = "DBT_TEST_FAILED" - else: - final_state_name = flow_run["state"]["name"] + updated_flow_run = update_flow_run_final_state(flow_run) flow_runs.append( { - "id": flow_run["id"], - "name": flow_run["name"], - "tags": flow_run["tags"], - "startTime": flow_run["start_time"], - "expectedStartTime": flow_run["expected_start_time"], - "totalRunTime": flow_run["total_run_time"], - "status": flow_run["state"]["type"], - "state_name": final_state_name, + "id": updated_flow_run["id"], + "name": updated_flow_run["name"], + "tags": updated_flow_run["tags"], + "startTime": updated_flow_run["start_time"], + "expectedStartTime": updated_flow_run["expected_start_time"], + "totalRunTime": updated_flow_run["total_run_time"], + "status": updated_flow_run["status"], + "state_name": updated_flow_run["state_name"], } ) @@ -984,13 +972,11 @@ def get_flow_run(flow_run_id: str) -> dict: """Get a flow run by its id""" try: flow_run = prefect_get(f"flow_runs/{flow_run_id}") - final_state = get_final_state_for_flow_run(flow_run_id) - if final_state != "UNKNOWN": - flow_run["state_name"] = final_state + updated_flow_run = update_flow_run_final_state(flow_run) + return updated_flow_run except Exception as err: logger.exception(err) raise PrefectException("failed to fetch a flow-run") from err - return flow_run def set_deployment_schedule(deployment_id: str, status: str) -> None: diff --git a/tests/test_service.py b/tests/test_service.py index f5e735a..37f1320 100644 --- a/tests/test_service.py +++ b/tests/test_service.py @@ -1185,10 +1185,14 @@ def test_set_deployment_schedule_prefect_post(): @patch("proxy.service.prefect_get") -@patch("proxy.service.get_final_state_for_flow_run") -def test_get_flow_run_success(mock_get_final_state: Mock, mock_get: Mock): +@patch("proxy.service.update_flow_run_final_state") +def test_get_flow_run_success(mock_update_flow_run_final_state: Mock, mock_get: Mock): mock_get.return_value = {"id": "12345", "state": {"type": "COMPLETED"}} - mock_get_final_state.return_value = "COMPLETED" + mock_update_flow_run_final_state.return_value = { + "id": "12345", + "state": {"type": "COMPLETED"}, + "state_name": "COMPLETED", + } response = get_flow_run("flow-run-id") mock_get.assert_called_once_with("flow_runs/flow-run-id") assert response == {