From 5e54e080a057327e48fb976065b1fe14ed211bb3 Mon Sep 17 00:00:00 2001 From: t0momi219 Date: Tue, 12 Nov 2024 16:20:14 +0900 Subject: [PATCH 1/5] Add task sla and timeout support. --- cosmos/airflow/graph.py | 17 +++++++++++++++++ cosmos/config.py | 2 ++ 2 files changed, 19 insertions(+) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index f507b03ac..9242026ca 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -2,6 +2,8 @@ from typing import Any, Callable, Union +from datetime import timedelta + from airflow.models import BaseOperator from airflow.models.dag import DAG from airflow.utils.task_group import TaskGroup @@ -135,6 +137,8 @@ def create_task_metadata( dbt_dag_task_group_identifier: str, use_task_group: bool = False, source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE, + model_timeout: bool = False, + model_sla: bool = False, ) -> TaskMetadata | None: """ Create the metadata that will be used to instantiate the Airflow Task used to run the Dbt node. @@ -166,6 +170,11 @@ def create_task_metadata( task_id = f"{node.name}_run" if use_task_group is True: task_id = "run" + if model_timeout and "model_timeout" in node.config.keys(): + logger.error(f'model_timeout: {node.config["model_timeout"]} in values') + args["execution_timeout"] = timedelta(seconds=int(node.config["model_timeout"])) + if model_sla and "model_sla" in node.config.keys(): + args["sla"] = timedelta(seconds=int(node.config["model_sla"])) elif node.resource_type == DbtResourceType.SOURCE: if (source_rendering_behavior == SourceRenderingBehavior.NONE) or ( source_rendering_behavior == SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS @@ -217,6 +226,8 @@ def generate_task_or_group( source_rendering_behavior: SourceRenderingBehavior, test_indirect_selection: TestIndirectSelection, on_warning_callback: Callable[..., Any] | None, + model_timeout: bool, + model_sla: bool, **kwargs: Any, ) -> BaseOperator | TaskGroup | None: task_or_group: BaseOperator | TaskGroup | None = None @@ -234,6 +245,8 @@ def generate_task_or_group( dbt_dag_task_group_identifier=_get_dbt_dag_task_group_identifier(dag, task_group), use_task_group=use_task_group, source_rendering_behavior=source_rendering_behavior, + model_timeout=model_timeout, + model_sla=model_sla ) # In most cases, we'll map one DBT node to one Airflow task @@ -335,6 +348,8 @@ def build_airflow_graph( node_converters = render_config.node_converters or {} test_behavior = render_config.test_behavior source_rendering_behavior = render_config.source_rendering_behavior + model_timeout = render_config.model_timeout + model_sla = render_config.model_sla tasks_map = {} task_or_group: TaskGroup | BaseOperator @@ -356,6 +371,8 @@ def build_airflow_graph( source_rendering_behavior=source_rendering_behavior, test_indirect_selection=test_indirect_selection, on_warning_callback=on_warning_callback, + model_timeout=model_timeout, + model_sla=model_sla, node=node, ) if task_or_group is not None: diff --git a/cosmos/config.py b/cosmos/config.py index 516a6787b..0275189c4 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -80,6 +80,8 @@ class RenderConfig: enable_mock_profile: bool = True source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE airflow_vars_to_purge_dbt_ls_cache: list[str] = field(default_factory=list) + model_timeout: bool = False + model_sla: bool = False def __post_init__(self, dbt_project_path: str | Path | None) -> None: if self.env_vars: From 054e29118f1075f32d6d42b2040034db6c43b045 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 12 Nov 2024 07:36:16 +0000 Subject: [PATCH 2/5] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20for?= =?UTF-8?q?mat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/airflow/graph.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 9242026ca..0f135bfa5 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -1,8 +1,7 @@ from __future__ import annotations -from typing import Any, Callable, Union - from datetime import timedelta +from typing import Any, Callable, Union from airflow.models import BaseOperator from airflow.models.dag import DAG @@ -246,7 +245,7 @@ def generate_task_or_group( use_task_group=use_task_group, source_rendering_behavior=source_rendering_behavior, model_timeout=model_timeout, - model_sla=model_sla + model_sla=model_sla, ) # In most cases, we'll map one DBT node to one Airflow task From 2e8e157c175666062e0d65b428f7e082713853af Mon Sep 17 00:00:00 2001 From: t0momi219 Date: Mon, 18 Nov 2024 23:28:06 +0900 Subject: [PATCH 3/5] add test and documents --- cosmos/airflow/graph.py | 17 ++--------------- cosmos/config.py | 2 -- docs/configuration/index.rst | 1 + docs/configuration/task-timeout.rst | 21 +++++++++++++++++++++ tests/airflow/test_graph.py | 20 +++++++++++++++++++- 5 files changed, 43 insertions(+), 18 deletions(-) create mode 100644 docs/configuration/task-timeout.rst diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 0f135bfa5..a18f148a3 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -136,8 +136,6 @@ def create_task_metadata( dbt_dag_task_group_identifier: str, use_task_group: bool = False, source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE, - model_timeout: bool = False, - model_sla: bool = False, ) -> TaskMetadata | None: """ Create the metadata that will be used to instantiate the Airflow Task used to run the Dbt node. @@ -169,11 +167,8 @@ def create_task_metadata( task_id = f"{node.name}_run" if use_task_group is True: task_id = "run" - if model_timeout and "model_timeout" in node.config.keys(): - logger.error(f'model_timeout: {node.config["model_timeout"]} in values') - args["execution_timeout"] = timedelta(seconds=int(node.config["model_timeout"])) - if model_sla and "model_sla" in node.config.keys(): - args["sla"] = timedelta(seconds=int(node.config["model_sla"])) + if "cosmos_task_timeout" in node.config.keys(): + args["execution_timeout"] = timedelta(seconds=int(node.config["cosmos_task_timeout"])) elif node.resource_type == DbtResourceType.SOURCE: if (source_rendering_behavior == SourceRenderingBehavior.NONE) or ( source_rendering_behavior == SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS @@ -225,8 +220,6 @@ def generate_task_or_group( source_rendering_behavior: SourceRenderingBehavior, test_indirect_selection: TestIndirectSelection, on_warning_callback: Callable[..., Any] | None, - model_timeout: bool, - model_sla: bool, **kwargs: Any, ) -> BaseOperator | TaskGroup | None: task_or_group: BaseOperator | TaskGroup | None = None @@ -244,8 +237,6 @@ def generate_task_or_group( dbt_dag_task_group_identifier=_get_dbt_dag_task_group_identifier(dag, task_group), use_task_group=use_task_group, source_rendering_behavior=source_rendering_behavior, - model_timeout=model_timeout, - model_sla=model_sla, ) # In most cases, we'll map one DBT node to one Airflow task @@ -347,8 +338,6 @@ def build_airflow_graph( node_converters = render_config.node_converters or {} test_behavior = render_config.test_behavior source_rendering_behavior = render_config.source_rendering_behavior - model_timeout = render_config.model_timeout - model_sla = render_config.model_sla tasks_map = {} task_or_group: TaskGroup | BaseOperator @@ -370,8 +359,6 @@ def build_airflow_graph( source_rendering_behavior=source_rendering_behavior, test_indirect_selection=test_indirect_selection, on_warning_callback=on_warning_callback, - model_timeout=model_timeout, - model_sla=model_sla, node=node, ) if task_or_group is not None: diff --git a/cosmos/config.py b/cosmos/config.py index 0275189c4..516a6787b 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -80,8 +80,6 @@ class RenderConfig: enable_mock_profile: bool = True source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE airflow_vars_to_purge_dbt_ls_cache: list[str] = field(default_factory=list) - model_timeout: bool = False - model_sla: bool = False def __post_init__(self, dbt_project_path: str | Path | None) -> None: if self.env_vars: diff --git a/docs/configuration/index.rst b/docs/configuration/index.rst index f6e60f61b..39895548e 100644 --- a/docs/configuration/index.rst +++ b/docs/configuration/index.rst @@ -27,3 +27,4 @@ Cosmos offers a number of configuration options to customize its behavior. For m Compiled SQL Logging Caching + Task Timeout diff --git a/docs/configuration/task-timeout.rst b/docs/configuration/task-timeout.rst new file mode 100644 index 000000000..9db986c5c --- /dev/null +++ b/docs/configuration/task-timeout.rst @@ -0,0 +1,21 @@ +.. _task-timeout: + +Task Timeout +================ + +In Airflow, the "execution_timeout" parameter allows you to set the maximum runtime for a Task. +With Cosmos, you can specify an execution_timeout for each dbt model converted to a Task. +This lets users set a threshold for the maximum runtime of a model, triggering a timeout error if the execution exceeds this limit. + +By adding cosmos_task_timeout to the config of a dbt model, Cosmos will automatically apply the specified timeout to the Task based on this value. + +Example: + +.. code-block:: yaml + + version: 2 + + models: + - name: my_model + config: + - cosmos_task_timeout: 600 # Specify in seconds. diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 1bd8cab35..309139296 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -1,5 +1,5 @@ import os -from datetime import datetime +from datetime import datetime, timedelta from pathlib import Path from unittest.mock import MagicMock, patch @@ -568,6 +568,24 @@ def test_create_task_metadata_snapshot(caplog): assert metadata.arguments == {"models": "my_snapshot"} +def test_create_task_metadata_timeout(): + sample_node = DbtNode( + unique_id=f"{DbtResourceType.MODEL.value}.my_folder.my_model", + resource_type=DbtResourceType.MODEL, + depends_on=[], + file_path="", + tags=[], + config={ + "cosmos_task_timeout": 1 + }, + ) + metadata = create_task_metadata( + sample_node, execution_mode=ExecutionMode.LOCAL, args={}, dbt_dag_task_group_identifier="" + ) + assert "execution_timeout" in metadata.arguments + assert metadata.arguments["execution_timeout"] == timedelta(seconds=1) + + @pytest.mark.parametrize( "node_type,node_unique_id,test_indirect_selection,additional_arguments", [ From 4bd843a997761e1ec1de0a644633f22ad5a91964 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 18 Nov 2024 14:28:23 +0000 Subject: [PATCH 4/5] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20for?= =?UTF-8?q?mat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/airflow/test_graph.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 309139296..d2fd11690 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -575,9 +575,7 @@ def test_create_task_metadata_timeout(): depends_on=[], file_path="", tags=[], - config={ - "cosmos_task_timeout": 1 - }, + config={"cosmos_task_timeout": 1}, ) metadata = create_task_metadata( sample_node, execution_mode=ExecutionMode.LOCAL, args={}, dbt_dag_task_group_identifier="" From 46479f7cadde795849b6ac1de02a862d4efa2a25 Mon Sep 17 00:00:00 2001 From: t0momi219 Date: Fri, 22 Nov 2024 20:08:13 +0900 Subject: [PATCH 5/5] fix document --- docs/configuration/task-timeout.rst | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/docs/configuration/task-timeout.rst b/docs/configuration/task-timeout.rst index 9db986c5c..16dd844ca 100644 --- a/docs/configuration/task-timeout.rst +++ b/docs/configuration/task-timeout.rst @@ -3,11 +3,8 @@ Task Timeout ================ -In Airflow, the "execution_timeout" parameter allows you to set the maximum runtime for a Task. -With Cosmos, you can specify an execution_timeout for each dbt model converted to a Task. -This lets users set a threshold for the maximum runtime of a model, triggering a timeout error if the execution exceeds this limit. - -By adding cosmos_task_timeout to the config of a dbt model, Cosmos will automatically apply the specified timeout to the Task based on this value. +In Airflow, the ``execution_timeout`` parameter allows you to set the maximum runtime for a Task. +In Cosmos, you can apply an ``execution_timeout`` to each dbt model task by specifying a ``cosmos_task_timeout`` in the model’s configuration, which sets a runtime threshold to trigger a timeout error if exceeded. Example: