From 3565723833b5f4d55b9236786b6285b6def953fc Mon Sep 17 00:00:00 2001
From: Pankaj Koti
Date: Mon, 30 Sep 2024 23:22:17 +0530
Subject: [PATCH] Address review comments from @tatiana

---
 cosmos/airflow/graph.py                  | 8 ++++----
 cosmos/constants.py                      | 2 ++
 cosmos/operators/local.py                | 3 ++-
 cosmos/settings.py                       | 1 -
 docs/getting_started/execution-modes.rst | 9 ++++++---
 tests/airflow/test_graph.py              | 6 +++---
 6 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py
index e054e8a76..eaf732bfa 100644
--- a/cosmos/airflow/graph.py
+++ b/cosmos/airflow/graph.py
@@ -8,6 +8,7 @@

 from cosmos.config import RenderConfig
 from cosmos.constants import (
+    DBT_COMPILE_TASK_ID,
     DEFAULT_DBT_RESOURCES,
     TESTABLE_DBT_RESOURCES,
     DbtResourceType,
@@ -20,7 +21,6 @@
 from cosmos.core.graph.entities import Task as TaskMetadata
 from cosmos.dbt.graph import DbtNode
 from cosmos.log import get_logger
-from cosmos.settings import dbt_compile_task_id

 logger = get_logger(__name__)

@@ -264,17 +264,17 @@ def _add_dbt_compile_task(
         return

     compile_task_metadata = TaskMetadata(
-        id=dbt_compile_task_id,
+        id=DBT_COMPILE_TASK_ID,
         operator_class="cosmos.operators.airflow_async.DbtCompileAirflowAsyncOperator",
         arguments=task_args,
         extra_context={},
     )
     compile_airflow_task = create_airflow_task(compile_task_metadata, dag, task_group=None)
-    tasks_map[dbt_compile_task_id] = compile_airflow_task
+    tasks_map[DBT_COMPILE_TASK_ID] = compile_airflow_task

     for node_id, node in nodes.items():
         if not node.depends_on and node_id in tasks_map:
-            tasks_map[dbt_compile_task_id] >> tasks_map[node_id]
+            tasks_map[DBT_COMPILE_TASK_ID] >> tasks_map[node_id]


 def build_airflow_graph(
diff --git a/cosmos/constants.py b/cosmos/constants.py
index 64fd42701..f42cfc4fc 100644
--- a/cosmos/constants.py
+++ b/cosmos/constants.py
@@ -148,3 +148,5 @@ def _missing_value_(cls, value): # type: ignore
 # It expects that you have already created those resources through the appropriate commands.
 # https://docs.getdbt.com/reference/commands/test
 TESTABLE_DBT_RESOURCES = {DbtResourceType.MODEL, DbtResourceType.SOURCE, DbtResourceType.SNAPSHOT, DbtResourceType.SEED}
+
+DBT_COMPILE_TASK_ID = "dbt_compile"
diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py
index 1083d5703..e3392df97 100644
--- a/cosmos/operators/local.py
+++ b/cosmos/operators/local.py
@@ -9,6 +9,7 @@
 from functools import cached_property
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, Callable, Literal, Sequence
+from urllib.parse import urlparse

 import jinja2
 from airflow import DAG
@@ -268,7 +269,7 @@ def _configure_remote_target_path() -> tuple[Path, str] | tuple[None, None]:
     remote_conn_id = remote_target_path_conn_id

     if not remote_conn_id:
-        target_path_schema = target_path_str.split("://")[0]
+        target_path_schema = urlparse(target_path_str).scheme
         remote_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(target_path_schema, None)  # type: ignore[assignment]
     if remote_conn_id is None:
         return None, None
diff --git a/cosmos/settings.py b/cosmos/settings.py
index 57e8aeaaa..07a3f1b1a 100644
--- a/cosmos/settings.py
+++ b/cosmos/settings.py
@@ -34,7 +34,6 @@

 remote_cache_dir = conf.get("cosmos", "remote_cache_dir", fallback=None)
 remote_cache_dir_conn_id = conf.get("cosmos", "remote_cache_dir_conn_id", fallback=None)
-dbt_compile_task_id = conf.get("cosmos", "dbt_compile_task_id", fallback="dbt_compile")
 remote_target_path = conf.get("cosmos", "remote_target_path", fallback=None)
 remote_target_path_conn_id = conf.get("cosmos", "remote_target_path_conn_id", fallback=None)

diff --git a/docs/getting_started/execution-modes.rst b/docs/getting_started/execution-modes.rst
index 8e47dd928..ec150992d 100644
--- a/docs/getting_started/execution-modes.rst
+++ b/docs/getting_started/execution-modes.rst
@@ -12,7 +12,7 @@ Cosmos can run ``dbt`` commands using five different approaches, called ``execut
 5. **aws_eks**: Run ``dbt`` commands from AWS EKS Pods managed by Cosmos (requires a pre-existing Docker image)
 6. **azure_container_instance**: Run ``dbt`` commands from Azure Container Instances managed by Cosmos (requires a pre-existing Docker image)
 7. **gcp_cloud_run_job**: Run ``dbt`` commands from GCP Cloud Run Job instances managed by Cosmos (requires a pre-existing Docker image)
-8. **airflow_async**: (Introduced since Cosmos 1.7.0) Run the dbt resources from your dbt project asynchronously, by submitting the corresponding compiled SQLs to Apache Airflow's `Deferrable operators `__
+8. **airflow_async**: (Experimental; introduced in Cosmos 1.7.0) Run the dbt resources from your dbt project asynchronously, by submitting the corresponding compiled SQLs to Apache Airflow's `Deferrable operators `__

 The choice of the ``execution mode`` can vary based on each user's needs and concerns. For more details, check each execution mode described below.

@@ -244,7 +244,7 @@ Each task will create a new Cloud Run Job execution, giving full isolation. The
     )

 Airflow Async (experimental)
--------------
+----------------------------

 .. versionadded:: 1.7.0

@@ -260,7 +260,9 @@ the ``dbt compile`` command on your dbt project which then outputs compiled SQLs

 As part of the same task run, these compiled SQLs are then stored remotely to a remote path set using the :ref:`remote_target_path` configuration.
 The remote path is then used by the subsequent tasks in the DAG to fetch (from the remote path) and run the compiled SQLs asynchronously using e.g. the ``DbtRunAirflowAsyncOperator``.
-You may observe that the compile task takes a bit longer to run due to the latency of storing the compiled SQLs remotely,
+You may observe that the compile task takes a bit longer to run due to the latency of storing the compiled SQLs
+remotely (e.g. for the classic ``jaffle_shop`` dbt project, compilation produces about 31 files measuring about 124KB in total, yet on a local
+machine it took approximately 25 seconds for the task to compile & upload the compiled SQLs to the remote path),
 however, it is still a win as it is one-time overhead and the subsequent tasks run asynchronously utilising the Airflow's deferrable operators
 and supplying to them those compiled SQLs.

@@ -268,6 +270,7 @@ Note that currently, the ``airflow_async`` execution mode has the following limi

 1. Only supports the ``dbt resource type`` models to be run asynchronously using Airflow deferrable operators. All other resources are executed synchronously using dbt commands as they are in the ``local`` execution mode.
 2. Only supports BigQuery as the target database. If a profile target other than BigQuery is specified, Cosmos will error out saying that the target database is not supported with this execution mode.
+3. Only works for ``full_refresh`` models. There is pending work to support other modes.

 Example DAG:

diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py
index d864b73e4..6fc7cdc0a 100644
--- a/tests/airflow/test_graph.py
+++ b/tests/airflow/test_graph.py
@@ -21,6 +21,7 @@
 )
 from cosmos.config import ProfileConfig, RenderConfig
 from cosmos.constants import (
+    DBT_COMPILE_TASK_ID,
     DbtResourceType,
     ExecutionMode,
     SourceRenderingBehavior,
@@ -30,7 +31,6 @@
 from cosmos.converter import airflow_kwargs
 from cosmos.dbt.graph import DbtNode
 from cosmos.profiles import PostgresUserPasswordProfileMapping
-from cosmos.settings import dbt_compile_task_id

 SAMPLE_PROJ_PATH = Path("/home/user/path/dbt-proj/")
 SOURCE_RENDERING_BEHAVIOR = SourceRenderingBehavior(os.getenv("SOURCE_RENDERING_BEHAVIOR", "none"))
@@ -258,8 +258,8 @@ def test_build_airflow_graph_with_dbt_compile_task():
     )

     task_ids = [task.task_id for task in dag.tasks]
-    assert dbt_compile_task_id in task_ids
-    assert dbt_compile_task_id in dag.tasks[0].upstream_task_ids
+    assert DBT_COMPILE_TASK_ID in task_ids
+    assert DBT_COMPILE_TASK_ID in dag.tasks[0].upstream_task_ids


 def test_calculate_operator_class():
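
For reference, the behaviour difference behind the ``urlparse`` change in ``_configure_remote_target_path`` above, as a minimal, self-contained sketch (the example paths are illustrative only and are not taken from this patch):

from urllib.parse import urlparse

# For URL-style remote paths both approaches return the same scheme.
print(urlparse("s3://my-bucket/target").scheme)   # -> "s3"
print("s3://my-bucket/target".split("://")[0])    # -> "s3"

# For a plain local path there is no "://", so split() falls through to the
# whole string, whereas urlparse() yields an empty scheme that simply finds
# no match in FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.
print(urlparse("/tmp/dbt/target").scheme)         # -> ""
print("/tmp/dbt/target".split("://")[0])          # -> "/tmp/dbt/target"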
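
For context on the ``airflow_async`` documentation changes above, a rough sketch of a DAG using this execution mode is shown below. It assumes the ``[cosmos] remote_target_path`` / ``remote_target_path_conn_id`` settings read in ``cosmos/settings.py`` are configured and that a BigQuery profile is used; the project path, profile values, connection IDs and DAG id are placeholders, not part of this patch:

from datetime import datetime

from cosmos import DbtDag, ExecutionConfig, ProfileConfig, ProjectConfig
from cosmos.constants import ExecutionMode

# Assumes something like this in airflow.cfg (or the matching env vars):
# [cosmos]
# remote_target_path = gs://my-bucket/target
# remote_target_path_conn_id = my_gcp_conn

profile_config = ProfileConfig(
    profile_name="my_bigquery_profile",  # placeholder profile pointing at BigQuery
    target_name="dev",
    profiles_yml_filepath="/usr/local/airflow/dbt/jaffle_shop/profiles.yml",
)

jaffle_shop_async = DbtDag(
    project_config=ProjectConfig("/usr/local/airflow/dbt/jaffle_shop"),
    profile_config=profile_config,
    execution_config=ExecutionConfig(execution_mode=ExecutionMode.AIRFLOW_ASYNC),
    # Limitation 3 above: only ``full_refresh`` runs are currently supported.
    operator_args={"full_refresh": True},
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    dag_id="jaffle_shop_airflow_async",
)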