Add task timeout support. #1317

Open: wants to merge 5 commits into base: main
Changes from 4 commits
3 changes: 3 additions & 0 deletions cosmos/airflow/graph.py
@@ -1,5 +1,6 @@
from __future__ import annotations

from datetime import timedelta
from typing import Any, Callable, Union

from airflow.models import BaseOperator
@@ -166,6 +167,8 @@ def create_task_metadata(
            task_id = f"{node.name}_run"
            if use_task_group is True:
                task_id = "run"
            if "cosmos_task_timeout" in node.config.keys():
                args["execution_timeout"] = timedelta(seconds=int(node.config["cosmos_task_timeout"]))
        elif node.resource_type == DbtResourceType.SOURCE:
            if (source_rendering_behavior == SourceRenderingBehavior.NONE) or (
                source_rendering_behavior == SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS
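The added lines pass the config value through `int()` before building the `timedelta`, so the accepted inputs follow Python's `int()` rules. The sketch below isolates that behaviour in a stand-in function; `apply_task_timeout` is a hypothetical name used only for illustration and is not part of Cosmos.

```python
from __future__ import annotations

from datetime import timedelta
from typing import Any


def apply_task_timeout(args: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical stand-in for the two lines added in this hunk."""
    if "cosmos_task_timeout" in config:
        # int() accepts ints and numeric strings, truncates floats,
        # and raises ValueError for strings such as "10m".
        args["execution_timeout"] = timedelta(seconds=int(config["cosmos_task_timeout"]))
    return args


print(apply_task_timeout({}, {"cosmos_task_timeout": 600}))    # integer seconds
print(apply_task_timeout({}, {"cosmos_task_timeout": "600"}))  # numeric string is coerced
print(apply_task_timeout({}, {"cosmos_task_timeout": 1.9}))    # float is truncated to 1 second
print(apply_task_timeout({}, {}))                              # no key, args left unchanged
try:
    apply_task_timeout({}, {"cosmos_task_timeout": "10m"})
except ValueError as exc:
    print(f"rejected: {exc}")
```

A value that `int()` cannot parse raises `ValueError` at the point where the timeout is read, so a typo in the dbt config would likely surface when the task metadata is built rather than when the Task runs.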
1 change: 1 addition & 0 deletions docs/configuration/index.rst
@@ -27,3 +27,4 @@ Cosmos offers a number of configuration options to customize its behavior. For m
    Compiled SQL <compiled-sql>
    Logging <logging>
    Caching <caching>
    Task Timeout <task-timeout>
21 changes: 21 additions & 0 deletions docs/configuration/task-timeout.rst
@@ -0,0 +1,21 @@
.. _task-timeout:

Task Timeout
================

In Airflow, the ``execution_timeout`` parameter sets the maximum runtime allowed for a Task.
With Cosmos, you can set an ``execution_timeout`` for each dbt model that is converted to a Task.
If the model runs longer than this limit, the Task fails with a timeout error.

When you add ``cosmos_task_timeout`` to a dbt model's config, Cosmos sets the Task's ``execution_timeout`` to that value, interpreted as a number of seconds.

Example:

.. code-block:: yaml

    version: 2

    models:
      - name: my_model
        config:
          cosmos_task_timeout: 600 # Specify in seconds.
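Because the value is given in seconds, longer limits can be derived from a `timedelta` instead of being computed by hand. A minimal sketch using only the standard library; the helper name `to_timeout_seconds` is hypothetical and not part of Cosmos or dbt.

```python
from datetime import timedelta


def to_timeout_seconds(**duration: float) -> int:
    """Hypothetical helper: convert a duration to whole seconds for cosmos_task_timeout."""
    return int(timedelta(**duration).total_seconds())


print(to_timeout_seconds(minutes=10))           # 600
print(to_timeout_seconds(hours=1, minutes=30))  # 5400
```

The resulting integer is what would go into the model's `cosmos_task_timeout` config entry.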
18 changes: 17 additions & 1 deletion tests/airflow/test_graph.py
@@ -1,5 +1,5 @@
import os
from datetime import datetime
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock, patch

@@ -568,6 +568,22 @@ def test_create_task_metadata_snapshot(caplog):
    assert metadata.arguments == {"models": "my_snapshot"}


def test_create_task_metadata_timeout():
    sample_node = DbtNode(
        unique_id=f"{DbtResourceType.MODEL.value}.my_folder.my_model",
        resource_type=DbtResourceType.MODEL,
        depends_on=[],
        file_path="",
        tags=[],
        config={"cosmos_task_timeout": 1},
    )
    metadata = create_task_metadata(
        sample_node, execution_mode=ExecutionMode.LOCAL, args={}, dbt_dag_task_group_identifier=""
    )
    assert "execution_timeout" in metadata.arguments
    assert metadata.arguments["execution_timeout"] == timedelta(seconds=1)


@pytest.mark.parametrize(
    "node_type,node_unique_id,test_indirect_selection,additional_arguments",
    [