
Commit

Merge pull request #72 from DevDataPlatform/71-new-state-for-dbt-test-failures

71 new state for dbt test failures
Ishankoradia authored Aug 27, 2023
2 parents 385433a + 4994a43 commit 9fba87c
Showing 3 changed files with 111 additions and 30 deletions.
38 changes: 36 additions & 2 deletions proxy/flows.py
@@ -6,7 +6,7 @@
import os
from prefect import flow, task
from prefect.blocks.system import Secret
from prefect.states import State
from prefect.states import State, StateType
from prefect_airbyte.flows import run_connection_sync
from prefect_airbyte import AirbyteConnection
from prefect_dbt.cli.commands import DbtCoreOperation, ShellOperation
@@ -138,6 +138,7 @@ def dbtjob(dbt_op_name: str):
errors are propagated to the flow except those from "dbt test"
"""
dbt_op: DbtCoreOperation = DbtCoreOperation.load(dbt_op_name)
logger.info("running dbtjob with DBT_TEST_FAILED update")

if os.path.exists(dbt_op.profiles_dir / "profiles.yml"):
os.unlink(dbt_op.profiles_dir / "profiles.yml")
@@ -146,7 +147,11 @@ def dbtjob(dbt_op_name: str):
return dbt_op.run()
except Exception: # skipcq PYL-W0703
if dbt_op_name.endswith("-test"):
return State(type="COMPLETED", message=f"WARNING: {dbt_op_name} failed")
return State(
type=StateType.COMPLETED,
name="DBT_TEST_FAILED",
message=f"WARNING: {dbt_op_name} failed",
)

raise

@@ -178,3 +183,32 @@ def deployment_schedule_flow_v2(airbyte_blocks: list, dbt_blocks: list):
except Exception as error: # skipcq PYL-W0703
logger.exception(error)
raise


@flow
def deployment_schedule_flow_v3(airbyte_blocks: list, dbt_blocks: list):
# pylint: disable=broad-exception-caught
"""modification so dbt test failures are not propagated as flow failures"""
# sort the airbyte blocks by seq
airbyte_blocks.sort(key=lambda blk: blk["seq"])

# sort the dbt blocks by seq
dbt_blocks.sort(key=lambda blk: blk["seq"])

try:
# run airbyte blocks, fail if sync fails
for block in airbyte_blocks:
airbyte_connection = AirbyteConnection.load(block["blockName"])
run_connection_sync(airbyte_connection)

# run dbt blocks, fail on block failure unless the failing block is a dbt-test
for block in dbt_blocks:
if block["blockType"] == SHELLOPERATION:
gitpulljob(block["blockName"])

elif block["blockType"] == DBTCORE:
dbtjob(block["blockName"])

except Exception as error: # skipcq PYL-W0703
logger.exception(error)
raise
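The core of this change: when a dbt block whose name ends in "-test" raises, dbtjob swallows the exception and returns a Prefect State of type COMPLETED but with the custom name DBT_TEST_FAILED, so the scheduled flow keeps going while the test failure remains queryable. A minimal standalone sketch of the same pattern, not taken from this repository (the task, flow, and error below are hypothetical stand-ins):

from prefect import flow, task
from prefect.states import State, StateType

@task
def dbt_test_task():
    # Stand-in for DbtCoreOperation.load(...).run(), assumed to raise when a dbt test fails.
    try:
        raise RuntimeError("simulated dbt test failure")
    except Exception:
        # Returning a State from the task sets that task run's final state:
        # COMPLETED so the parent flow does not fail, with a name the API can filter on.
        return State(
            type=StateType.COMPLETED,
            name="DBT_TEST_FAILED",
            message="WARNING: dbt tests failed",
        )

@flow
def example_flow():
    dbt_test_task()

if __name__ == "__main__":
    example_flow()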
22 changes: 18 additions & 4 deletions proxy/service.py
@@ -29,7 +29,7 @@
PrefectSecretBlockCreate,
)
from proxy.flows import (
deployment_schedule_flow_v2,
deployment_schedule_flow_v3,
)

load_dotenv()
@@ -51,7 +51,7 @@ def prefect_post(endpoint: str, payload: dict) -> dict:

root = os.getenv("PREFECT_API_URL")
res = requests.post(f"{root}/{endpoint}", timeout=30, json=payload)
logger.info(res.text)

try:
res.raise_for_status()
except Exception as error:
@@ -69,7 +69,7 @@ def prefect_patch(endpoint: str, payload: dict) -> dict:

root = os.getenv("PREFECT_API_URL")
res = requests.patch(f"{root}/{endpoint}", timeout=30, json=payload)
logger.info(res.text)

try:
res.raise_for_status()
except Exception as error:
@@ -547,7 +547,7 @@ async def post_deployment(payload: DeploymentCreate) -> dict:
logger.info(payload)

deployment = await Deployment.build_from_flow(
flow=deployment_schedule_flow_v2.with_options(name=payload.flow_name),
flow=deployment_schedule_flow_v3.with_options(name=payload.flow_name),
name=payload.deployment_name,
work_queue_name="ddp",
tags=[payload.org_slug],
@@ -626,6 +626,19 @@ def get_flow_runs_by_deployment_id(deployment_id: str, limit: int) -> list:
f"failed to fetch flow_runs for deployment {deployment_id}"
) from error
for flow_run in result:
# get the tasks if any and check their state names
all_ids_to_look_at = traverse_flow_run_graph(flow_run["id"], [])
query = {
"flow_runs": {
"operator": "and_",
"id": {"any_": all_ids_to_look_at},
},
}
result = prefect_post("task_runs/filter/", query)
if "DBT_TEST_FAILED" in [x["state"]["name"] for x in result]:
final_state_name = "DBT_TEST_FAILED"
else:
final_state_name = flow_run["state"]["name"]
flow_runs.append(
{
"id": flow_run["id"],
@@ -635,6 +648,7 @@ def get_flow_runs_by_deployment_id(deployment_id: str, limit: int) -> list:
"expectedStartTime": flow_run["expected_start_time"],
"totalRunTime": flow_run["total_run_time"],
"status": flow_run["state"]["type"],
"state_name": final_state_name,
}
)

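Downstream effect of the service change: each entry returned by get_flow_runs_by_deployment_id now includes a state_name field, and that name is overridden to DBT_TEST_FAILED whenever any task run in the flow run's graph finished in that state. A hedged usage sketch, not code from this commit (the deployment id is a placeholder; it assumes PREFECT_API_URL is set and the proxy package is importable):

from proxy.service import get_flow_runs_by_deployment_id

# Placeholder deployment id; a real UUID from your Prefect server goes here.
runs = get_flow_runs_by_deployment_id("00000000-0000-0000-0000-000000000000", 10)
for run in runs:
    if run["state_name"] == "DBT_TEST_FAILED":
        print(f"{run['name']}: flow completed, but dbt tests failed")
    else:
        print(f"{run['name']}: {run['state_name']} ({run['status']})")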
81 changes: 57 additions & 24 deletions tests/test_service.py
@@ -1130,10 +1130,10 @@ async def test_post_deployment_bad_payload():
@pytest.mark.asyncio
@patch("proxy.service.Deployment.build_from_flow", new_callable=AsyncMock)
@patch(
"proxy.service.deployment_schedule_flow_v2",
"proxy.service.deployment_schedule_flow_v3",
new_callable=Mock,
)
async def test_post_deployment(deployment_schedule_flow_v2, mock_build):
async def test_post_deployment(deployment_schedule_flow_v3, mock_build):
payload = DeploymentCreate(
flow_name="flow-name",
deployment_name="deployment-name",
@@ -1148,7 +1148,7 @@ async def test_post_deployment(deployment_schedule_flow_v2, mock_build):
deployment.name = "deployment-name"

mock_build.return_value = deployment
deployment_schedule_flow_v2.with_options = Mock(return_value="dsf")
deployment_schedule_flow_v3.with_options = Mock(return_value="dsf")

response = await post_deployment(payload)
assert response["id"] == "deployment-id"
@@ -1225,28 +1225,61 @@ def test_get_flow_runs_by_deployment_id_prefect_post():

def test_get_flow_runs_by_deployment_id_result():
with patch("proxy.service.prefect_post") as prefect_post_mock:
flow_run = {
"id": "flow_run_id",
"name": "flow_run_name",
"tags": ["tag1", "tag2"],
"start_time": "2022-01-01T00:00:00Z",
"expected_start_time": "2022-01-01T00:00:00Z",
"total_run_time": 60,
"state": {"type": "COMPLETED"},
}
prefect_post_mock.return_value = [flow_run]
result = get_flow_runs_by_deployment_id("deployment_id", 10)
assert result == [
{
"id": flow_run["id"],
"name": flow_run["name"],
"tags": flow_run["tags"],
"startTime": flow_run["start_time"],
"expectedStartTime": flow_run["expected_start_time"],
"totalRunTime": flow_run["total_run_time"],
"status": flow_run["state"]["type"],
with patch("proxy.service.prefect_get") as prefect_get_mock:
prefect_get_mock.return_value = []
flow_run = {
"id": "flow_run_id",
"name": "flow_run_name",
"tags": ["tag1", "tag2"],
"start_time": "2022-01-01T00:00:00Z",
"expected_start_time": "2022-01-01T00:00:00Z",
"total_run_time": 60,
"state": {"type": "COMPLETED", "name": "Completed"},
}
]
prefect_post_mock.side_effect = [[flow_run], []]
result = get_flow_runs_by_deployment_id("deployment_id", 10)
assert result == [
{
"id": flow_run["id"],
"name": flow_run["name"],
"tags": flow_run["tags"],
"startTime": flow_run["start_time"],
"expectedStartTime": flow_run["expected_start_time"],
"totalRunTime": flow_run["total_run_time"],
"status": flow_run["state"]["type"],
"state_name": "Completed",
}
]


def test_get_flow_runs_by_deployment_id_result_state_from_task():
with patch("proxy.service.prefect_post") as prefect_post_mock:
with patch("proxy.service.prefect_get") as prefect_get_mock:
prefect_get_mock.return_value = []
flow_run = {
"id": "flow_run_id",
"name": "flow_run_name",
"tags": ["tag1", "tag2"],
"start_time": "2022-01-01T00:00:00Z",
"expected_start_time": "2022-01-01T00:00:00Z",
"total_run_time": 60,
"state": {"type": "COMPLETED", "name": "Completed"},
}
task_run = {"state": {"name": "DBT_TEST_FAILED"}}
prefect_post_mock.side_effect = [[flow_run], [task_run]]
result = get_flow_runs_by_deployment_id("deployment_id", 10)
assert result == [
{
"id": flow_run["id"],
"name": flow_run["name"],
"tags": flow_run["tags"],
"startTime": flow_run["start_time"],
"expectedStartTime": flow_run["expected_start_time"],
"totalRunTime": flow_run["total_run_time"],
"status": "COMPLETED",
"state_name": "DBT_TEST_FAILED",
}
]


def test_get_flow_runs_by_deployment_id_exception():
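Both new tests lean on the call order inside get_flow_runs_by_deployment_id: prefect_get is patched to return an empty list so the graph traversal adds nothing, and prefect_post is given a side_effect list whose first item answers the flow-runs query and whose second item answers the task_runs/filter/ query. A tiny, self-contained illustration of that Mock behaviour with made-up payloads (not part of the test suite):

from unittest.mock import Mock

# Successive calls to the mock return successive items from side_effect.
prefect_post = Mock(side_effect=[
    [{"id": "flow_run_id", "state": {"type": "COMPLETED", "name": "Completed"}}],  # first call: flow runs
    [{"state": {"name": "DBT_TEST_FAILED"}}],                                      # second call: task runs
])

flow_runs = prefect_post("first-endpoint")        # illustrative endpoint name
task_runs = prefect_post("task_runs/filter/")     # the endpoint seen in the diff
assert any(t["state"]["name"] == "DBT_TEST_FAILED" for t in task_runs)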
