Skip to content

Commit

Permalink
Merge pull request #136 from DalgoT4D/hotfix/flow_run_status
Browse files Browse the repository at this point in the history
update the flow run to add customm keys based on the dbt tests failed
  • Loading branch information
fatchat authored Jun 11, 2024
2 parents 0e9a2e1 + 853ca17 commit 5599656
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 44 deletions.
68 changes: 27 additions & 41 deletions proxy/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -668,30 +668,29 @@ def get_deployment(deployment_id: str) -> dict:
return res


def get_final_state_for_flow_run(flow_run_id: str):
"""fetch final state of subtasks"""
all_ids_to_look_at = traverse_flow_run_graph(flow_run_id, [])
def update_flow_run_final_state(flow_run: dict) -> dict:
"""
fetch tasks of the flow_run & checks for the custom flow_run state
update the flow_run and return it
addes status and state_name to the flow_run
"""
all_ids_to_look_at = traverse_flow_run_graph(flow_run["id"], [])
query = {
"flow_runs": {
"operator": "and_",
"id": {"any_": all_ids_to_look_at},
},
}
result = prefect_post("task_runs/filter/", query)
priority = ["Completed", "DBT_TEST_FAILED", "Failed", "Running", "Unknown"]
as_numeric = list(
map(
lambda x: (
priority.index(x["state"]["name"])
if x["state"]["name"] in priority
else 4
),
result,
)
)
if len(as_numeric) == 0:
return "RUNNING"
return priority[max(as_numeric)].upper()
if "DBT_TEST_FAILED" in [x["state"]["name"] for x in result]:
final_state_name = "DBT_TEST_FAILED"
else:
final_state_name = flow_run["state"]["name"]

flow_run["status"] = flow_run["state"]["type"]
flow_run["state_name"] = final_state_name

return flow_run


def get_flow_runs_by_deployment_id(
Expand Down Expand Up @@ -734,28 +733,17 @@ def get_flow_runs_by_deployment_id(
) from error
for flow_run in result:
# get the tasks if any and check their state names
all_ids_to_look_at = traverse_flow_run_graph(flow_run["id"], [])
query = {
"flow_runs": {
"operator": "and_",
"id": {"any_": all_ids_to_look_at},
},
}
result = prefect_post("task_runs/filter/", query)
if "DBT_TEST_FAILED" in [x["state"]["name"] for x in result]:
final_state_name = "DBT_TEST_FAILED"
else:
final_state_name = flow_run["state"]["name"]
updated_flow_run = update_flow_run_final_state(flow_run)
flow_runs.append(
{
"id": flow_run["id"],
"name": flow_run["name"],
"tags": flow_run["tags"],
"startTime": flow_run["start_time"],
"expectedStartTime": flow_run["expected_start_time"],
"totalRunTime": flow_run["total_run_time"],
"status": flow_run["state"]["type"],
"state_name": final_state_name,
"id": updated_flow_run["id"],
"name": updated_flow_run["name"],
"tags": updated_flow_run["tags"],
"startTime": updated_flow_run["start_time"],
"expectedStartTime": updated_flow_run["expected_start_time"],
"totalRunTime": updated_flow_run["total_run_time"],
"status": updated_flow_run["status"],
"state_name": updated_flow_run["state_name"],
}
)

Expand Down Expand Up @@ -984,13 +972,11 @@ def get_flow_run(flow_run_id: str) -> dict:
"""Get a flow run by its id"""
try:
flow_run = prefect_get(f"flow_runs/{flow_run_id}")
final_state = get_final_state_for_flow_run(flow_run_id)
if final_state != "UNKNOWN":
flow_run["state_name"] = final_state
updated_flow_run = update_flow_run_final_state(flow_run)
return updated_flow_run
except Exception as err:
logger.exception(err)
raise PrefectException("failed to fetch a flow-run") from err
return flow_run


def set_deployment_schedule(deployment_id: str, status: str) -> None:
Expand Down
10 changes: 7 additions & 3 deletions tests/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -1185,10 +1185,14 @@ def test_set_deployment_schedule_prefect_post():


@patch("proxy.service.prefect_get")
@patch("proxy.service.get_final_state_for_flow_run")
def test_get_flow_run_success(mock_get_final_state: Mock, mock_get: Mock):
@patch("proxy.service.update_flow_run_final_state")
def test_get_flow_run_success(mock_update_flow_run_final_state: Mock, mock_get: Mock):
mock_get.return_value = {"id": "12345", "state": {"type": "COMPLETED"}}
mock_get_final_state.return_value = "COMPLETED"
mock_update_flow_run_final_state.return_value = {
"id": "12345",
"state": {"type": "COMPLETED"},
"state_name": "COMPLETED",
}
response = get_flow_run("flow-run-id")
mock_get.assert_called_once_with("flow_runs/flow-run-id")
assert response == {
Expand Down

0 comments on commit 5599656

Please sign in to comment.