Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to set timeout health check in workflows #58

Merged
merged 4 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions brickflow/codegen/databricks_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ def proj_to_bundle(self) -> DatabricksAssetBundles:
tasks=tasks,
git_source=git_conf,
tags=workflow.tags,
health=workflow.health,
job_clusters=[JobsJobClusters(**c) for c in workflow_clusters],
schedule=self.workflow_obj_to_schedule(workflow),
max_concurrent_runs=workflow.max_concurrent_runs,
Expand Down
7 changes: 6 additions & 1 deletion brickflow/engine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@
BrickflowProjectDeploymentSettings,
get_brickflow_version,
)
from brickflow.bundles.model import JobsTasksNotebookTask, JobsTasksNotificationSettings
from brickflow.bundles.model import (
JobsTasksNotebookTask,
JobsTasksNotificationSettings,
JobsTasksHealthRules,
)
from brickflow.cli.projects import DEFAULT_BRICKFLOW_VERSION_MODE
from brickflow.context import (
BrickflowBuiltInTaskVariables,
Expand Down Expand Up @@ -479,6 +483,7 @@ class Task:
task_settings: Optional[TaskSettings] = None
custom_execute_callback: Optional[Callable] = None
ensure_brickflow_plugins: bool = False
health: Optional[List[JobsTasksHealthRules]] = None

def __post_init__(self) -> None:
self.is_valid_task_signature()
Expand Down
3 changes: 3 additions & 0 deletions brickflow/engine/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
JobsWebhookNotifications,
JobsNotificationSettings,
JobsTrigger,
JobsHealthRules,
)
from brickflow.context import BrickflowInternalVariables
from brickflow.engine import ROOT_NODE
Expand Down Expand Up @@ -116,6 +117,8 @@ class Workflow:
schedule_pause_status: str = "UNPAUSED"
default_cluster: Optional[Cluster] = None
clusters: List[Cluster] = field(default_factory=lambda: [])

health: Optional[List[JobsHealthRules]] = None
default_task_settings: TaskSettings = TaskSettings()
email_notifications: Optional[WorkflowEmailNotifications] = None
webhook_notifications: Optional[WorkflowWebhookNotifications] = None
Expand Down
15 changes: 12 additions & 3 deletions docs/workflows.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ wf = Workflow( # (1)!
email_notifications=EmailNotifications(
on_start=["[email protected]"],
on_success=["[email protected]"],
on_failure=["[email protected]"]
on_failure=["[email protected]"],
on_duration_warning_threshold_exceeded=["[email protected]"]
),
timeout_seconds=timedelta(hours=2).seconds
),
Expand All @@ -44,6 +45,11 @@ wf = Workflow( # (1)!
"catalog": "development",
"database": "your_database"
},
health = { # (16)!
"metric": "RUN_DURATION_SECONDS",
"op": "GREATER_THAN",
"value": 7200
}
)


Expand All @@ -67,6 +73,7 @@ def task_function(*, test="var"):
13. Define the common task parameters that can be used in all the tasks
14. Define a workflow task and associate it to the workflow
15. Define the schedule pause status. It is defaulted to "UNPAUSED"
16. Define health check condition that triggers duration warning threshold exceeded notifications

### Clusters

Expand Down Expand Up @@ -207,7 +214,8 @@ default_task_settings=TaskSettings(
email_notifications=EmailNotifications(
on_start=["[email protected]"],
on_success=["[email protected]"],
on_failure=["[email protected]"]
on_failure=["[email protected]"],
on_duration_warning_threshold_exceeded=["[email protected]"]
),
timeout_seconds=timedelta(hours=2).seconds,
max_retries=2,
Expand Down Expand Up @@ -249,4 +257,5 @@ common_task_parameters={
"catalog": "development",
"database": "your_database"
}
```
```

5 changes: 5 additions & 0 deletions tests/codegen/expected_bundles/dev_bundle_monorepo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ environments:
email_notifications: null
notification_settings: null
webhook_notifications: null
health:
rules:
- metric: "RUN_DURATION_SECONDS"
op: "GREATER_THAN"
value: 7200.0
git_source:
git_commit: a
git_provider: github
Expand Down
5 changes: 5 additions & 0 deletions tests/codegen/expected_bundles/dev_bundle_polyrepo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ environments:
email_notifications: null
notification_settings: null
webhook_notifications: null
health:
rules:
- metric: "RUN_DURATION_SECONDS"
op: "GREATER_THAN"
value: 7200.0
git_source:
git_commit: a
git_provider: github
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ environments:
jobs:
some_wf:
email_notifications: null
health: null
git_source:
git_commit: a
git_provider: github
Expand Down Expand Up @@ -64,6 +65,11 @@ environments:
email_notifications: null
notification_settings: null
webhook_notifications: null
health:
rules:
- metric: "RUN_DURATION_SECONDS"
op: "GREATER_THAN"
value: 7200.0
git_source:
git_commit: a
git_provider: github
Expand Down
5 changes: 5 additions & 0 deletions tests/codegen/expected_bundles/local_bundle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ environments:
email_notifications: null
notification_settings: null
webhook_notifications: null
health:
rules:
- metric: "RUN_DURATION_SECONDS"
op: "GREATER_THAN"
value: 7200.0
git_source: null
job_clusters: []
max_concurrent_runs: 1.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ environments:
email_notifications: null
notification_settings: null
webhook_notifications: null
health:
rules:
- metric: "RUN_DURATION_SECONDS"
op: "GREATER_THAN"
value: 7200.0
git_source: null
job_clusters: []
max_concurrent_runs: 1.0
Expand Down
5 changes: 5 additions & 0 deletions tests/codegen/sample_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
run_as_user="[email protected]",
tags={"test": "test2"},
common_task_parameters={"all_tasks1": "test", "all_tasks3": "123"}, # type: ignore
health={
"rules": [
{"metric": "RUN_DURATION_SECONDS", "op": "GREATER_THAN", "value": 7200.0}
]
},
)


Expand Down
5 changes: 5 additions & 0 deletions tests/engine/sample_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
),
tags={"test": "test2"},
common_task_parameters={"all_tasks1": "test", "all_tasks3": "123"}, # type: ignore
health={
"rules": [
{"metric": "RUN_DURATION_SECONDS", "op": "GREATER_THAN", "value": 7200}
]
},
)


Expand Down
7 changes: 7 additions & 0 deletions tests/engine/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,13 @@ def test_tags(self):
def test_default_task_settings(self):
assert wf.default_task_settings is not None

def test_health_settings(self):
assert wf.health == {
"rules": [
{"metric": "RUN_DURATION_SECONDS", "op": "GREATER_THAN", "value": 7200}
]
}

def test_user(self):
principal = "[email protected]"
u = User(principal)
Expand Down