Skip to content

Commit

Permalink
fix (#123)
Browse files Browse the repository at this point in the history
Co-authored-by: Maxim Mityutko <[email protected]>
  • Loading branch information
maxim-mityutko and Maxim Mityutko authored May 24, 2024
1 parent 44c702c commit 9acb216
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 13 deletions.
20 changes: 13 additions & 7 deletions brickflow_plugins/databricks/workflow_dependency_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,13 +256,19 @@ def execute(self):
for task in run.tasks:
if task.task_key == self.dependency_task_name:
task_state = task.state.result_state
self.log.info(
f"Found the run_id '{run.run_id}' and '{self.dependency_task_name}' "
f"task with state: {task_state.value}"
)
if task_state.value == "SUCCESS":
self.log.info(f"Found a successful run: {run.run_id}")
return
if task_state:
self.log.info(
f"Found the run_id '{run.run_id}' and '{self.dependency_task_name}' "
f"task with state: {task_state.value}"
)
if task_state.value == "SUCCESS":
self.log.info(f"Found a successful run: {run.run_id}")
return
else:
self.log.info(
f"Found the run_id '{run.run_id}' and '{self.dependency_task_name}' "
f"but the task has not started yet..."
)

self.log.info("Didn't find a successful task run yet...")

Expand Down
38 changes: 32 additions & 6 deletions tests/databricks_plugins/test_workflow_task_dependency_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ class TestWorkflowTaskDependencySensor:
"result_state": "FAILED",
},
},
{
"run_id": 300,
"task_key": "baz",
"state": {},
},
],
}
]
Expand Down Expand Up @@ -95,9 +100,30 @@ def test_sensor_failure(self, caplog, api):

with pytest.raises(WorkflowDependencySensorTimeOutException):
sensor.execute()
assert "The job has timed out..." in caplog.text
assert "Didn't find a successful task run yet..." in caplog.text
assert (
"Found the run_id '1' and 'bar' task with state: FAILED"
in caplog.text
)

assert (
"Found the run_id '1' and 'bar' task with state: FAILED"
in caplog.messages
)
assert "Didn't find a successful task run yet..." in caplog.messages

def test_sensor_no_state(self, caplog, api):
with api:
sensor = WorkflowTaskDependencySensor(
databricks_host=self.workspace_url,
databricks_token="token",
dependency_job_name="job",
dependency_task_name="baz",
delta=timedelta(seconds=1),
timeout_seconds=1,
poke_interval_seconds=1,
)

with pytest.raises(WorkflowDependencySensorTimeOutException):
sensor.execute()

assert (
"Found the run_id '1' and 'baz' but the task has not started yet..."
in caplog.messages
)
assert "Didn't find a successful task run yet..." in caplog.messages

0 comments on commit 9acb216

Please sign in to comment.