From ce9cce8cd45d549015ecc5fa408ba6aeb1ed346b Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 21 Aug 2023 20:09:14 +0000 Subject: [PATCH 01/11] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/dbt/graph.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index bee0e43de..70f999d85 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -144,28 +144,28 @@ def load_via_dbt_ls(self) -> None: with self.profile_config.ensure_profile() as profile_values: (profile_path, env_vars) = profile_values - local_flags=[ - "--project-dir", - str(self.project.dir), - "--profiles-dir", - str(profile_path.parent), - "--profile", - self.profile_config.profile_name, - "--target", - self.profile_config.target_name, - ] + local_flags = [ + "--project-dir", + str(self.project.dir), + "--profiles-dir", + str(profile_path.parent), + "--profile", + self.profile_config.profile_name, + "--target", + self.profile_config.target_name, + ] env = os.environ.copy() env.update(env_vars) - with tempfile.TemporaryDirectory() as tmpdir: + with tempfile.TemporaryDirectory() as tmpdir: logger.info("Environment variable keys: %s", env.keys()) log_dir = Path(env.get(DBT_LOG_PATH_ENVVAR) or tmpdir) target_dir = Path(env.get(DBT_TARGET_PATH_ENVVAR) or tmpdir) env[DBT_LOG_PATH_ENVVAR] = str(log_dir) env[DBT_TARGET_PATH_ENVVAR] = str(target_dir) - deps_command=[self.dbt_cmd, "deps"] + deps_command = [self.dbt_cmd, "deps"] deps_command.extend(local_flags) logger.info("Running command: `%s`", " ".join(deps_command)) From 342a6bdcfd8233dbf31894272797f1deb0e35f22 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 21 Aug 2023 20:21:00 +0000 Subject: [PATCH 02/11] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dev/dags/dbt/jaffle_shop/packages.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/dags/dbt/jaffle_shop/packages.yml b/dev/dags/dbt/jaffle_shop/packages.yml index d5e05d96c..3af907c8a 100644 --- a/dev/dags/dbt/jaffle_shop/packages.yml +++ b/dev/dags/dbt/jaffle_shop/packages.yml @@ -1,3 +1,3 @@ packages: - package: dbt-labs/dbt_utils - version: "1.1.1" \ No newline at end of file + version: "1.1.1" From df27dd2ed75bc9dac9f6d0f1c87ec6d0bff2c81d Mon Sep 17 00:00:00 2001 From: DanMawdsleyBA Date: Mon, 21 Aug 2023 22:01:34 +0100 Subject: [PATCH 03/11] fix precommit issue --- dev/dags/dbt/jaffle_shop/packages.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/dags/dbt/jaffle_shop/packages.yml b/dev/dags/dbt/jaffle_shop/packages.yml index d5e05d96c..3af907c8a 100644 --- a/dev/dags/dbt/jaffle_shop/packages.yml +++ b/dev/dags/dbt/jaffle_shop/packages.yml @@ -1,3 +1,3 @@ packages: - package: dbt-labs/dbt_utils - version: "1.1.1" \ No newline at end of file + version: "1.1.1" From 91308167233931436d1c79b70c84cef912a118ca Mon Sep 17 00:00:00 2001 From: DanMawdsleyBA Date: Tue, 22 Aug 2023 13:54:23 +0100 Subject: [PATCH 04/11] Adding dbt_deps option in render config --- cosmos/config.py | 2 ++ cosmos/converter.py | 2 ++ cosmos/dbt/graph.py | 42 +++++++++++++++++++++++------------------- 3 files changed, 27 insertions(+), 19 deletions(-) diff --git a/cosmos/config.py b/cosmos/config.py index e2d69c9a6..b6d287604 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -30,6 +30,7 @@ class RenderConfig: :param load_method: The parsing method for loading the dbt model. Defaults to AUTOMATIC :param select: A list of dbt select arguments (e.g. 'config.materialized:incremental') :param exclude: A list of dbt exclude arguments (e.g. 'tag:nightly') + :param dbt_deps: Configure to run dbt deps when using dbt ls for dag parsing """ emit_datasets: bool = True @@ -37,6 +38,7 @@ class RenderConfig: load_method: LoadMode = LoadMode.AUTOMATIC select: list[str] = field(default_factory=list) exclude: list[str] = field(default_factory=list) + dbt_deps: bool = True @dataclass diff --git a/cosmos/converter.py b/cosmos/converter.py index 84dcd9fed..0c73eb47f 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -114,6 +114,7 @@ def __init__( test_behavior = render_config.test_behavior select = render_config.select exclude = render_config.exclude + dbt_deps = render_config.dbt_deps execution_mode = execution_config.execution_mode load_mode = render_config.load_method manifest_path = project_config.parsed_manifest_path @@ -145,6 +146,7 @@ def __init__( select=select, dbt_cmd=dbt_executable_path, profile_config=profile_config, + dbt_deps=dbt_deps, ) dbt_graph.load(method=load_mode, execution_mode=execution_mode) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 70f999d85..7bd5f0d68 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -74,15 +74,18 @@ class DbtGraph: def __init__( self, project: DbtProject, + dbt_deps: bool, exclude: list[str] | None = None, select: list[str] | None = None, dbt_cmd: str = get_system_dbt(), profile_config: ProfileConfig | None = None, + ): self.project = project self.exclude = exclude or [] self.select = select or [] self.profile_config = profile_config + self.dbt_deps=dbt_deps # specific to loading using ls self.dbt_cmd = dbt_cmd @@ -165,25 +168,26 @@ def load_via_dbt_ls(self) -> None: env[DBT_LOG_PATH_ENVVAR] = str(log_dir) env[DBT_TARGET_PATH_ENVVAR] = str(target_dir) - deps_command = [self.dbt_cmd, "deps"] - deps_command.extend(local_flags) - logger.info("Running command: `%s`", " ".join(deps_command)) - - process = Popen( - deps_command, - stdout=PIPE, - stderr=PIPE, - cwd=tmpdir, - universal_newlines=True, - env=env, - ) - stdout, stderr = process.communicate() - - logger.debug("dbt deps output: %s", stdout) - - if stderr or "Runtime Error" in stdout: - details = stderr or stdout - raise CosmosLoadDbtException(f"Unable to run dbt deps command due to the error:\n{details}") + if self.dbt_deps: + deps_command = [self.dbt_cmd, "deps"] + deps_command.extend(local_flags) + logger.info("Running command: `%s`", " ".join(deps_command)) + + process = Popen( + deps_command, + stdout=PIPE, + stderr=PIPE, + cwd=tmpdir, + universal_newlines=True, + env=env, + ) + stdout, stderr = process.communicate() + + logger.debug("dbt deps output: %s", stdout) + + if stderr or "Runtime Error" in stdout: + details = stderr or stdout + raise CosmosLoadDbtException(f"Unable to run dbt deps command due to the error:\n{details}") ls_command = [self.dbt_cmd, "ls", "--output", "json"] From 22acc3a98cc39d430350ba68c38ffa8b26af2a0e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 22 Aug 2023 12:54:54 +0000 Subject: [PATCH 05/11] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/dbt/graph.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 7bd5f0d68..52b0d6e17 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -79,13 +79,12 @@ def __init__( select: list[str] | None = None, dbt_cmd: str = get_system_dbt(), profile_config: ProfileConfig | None = None, - ): self.project = project self.exclude = exclude or [] self.select = select or [] self.profile_config = profile_config - self.dbt_deps=dbt_deps + self.dbt_deps = dbt_deps # specific to loading using ls self.dbt_cmd = dbt_cmd From f490cd5bc7199e3f9fa23f6eb8078ea2fd21de93 Mon Sep 17 00:00:00 2001 From: DanMawdsleyBA Date: Tue, 22 Aug 2023 14:04:21 +0100 Subject: [PATCH 06/11] adding default value for dbt deps --- cosmos/dbt/graph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 52b0d6e17..129d6dfb3 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -74,11 +74,11 @@ class DbtGraph: def __init__( self, project: DbtProject, - dbt_deps: bool, exclude: list[str] | None = None, select: list[str] | None = None, dbt_cmd: str = get_system_dbt(), profile_config: ProfileConfig | None = None, + dbt_deps: bool | None = True, ): self.project = project self.exclude = exclude or [] From d8bb6dd91eb70accec9945ba6641ddb30f67fe93 Mon Sep 17 00:00:00 2001 From: DanMawdsleyBA Date: Tue, 22 Aug 2023 14:51:52 +0100 Subject: [PATCH 07/11] add test for dbt deps --- cosmos/dbt/graph.py | 7 +++++-- tests/dbt/test_graph.py | 22 ++++++++++++++++++++++ 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 129d6dfb3..a811f2598 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -219,9 +219,12 @@ def load_via_dbt_ls(self) -> None: for line in logfile: logger.debug(line.strip()) - if stderr or "Runtime Error" in stdout: + if stderr or "Runtime Error" or "Compilation Error" in stdout: details = stderr or stdout - raise CosmosLoadDbtException(f"Unable to run dbt ls command due to the error:\n{details}") + if 'Run "dbt deps" to install package dependencies' in stdout: + raise CosmosLoadDbtException(f"Unable to run dbt ls command due to dbt_packages not installed.") + else: + raise CosmosLoadDbtException(f"Unable to run dbt ls command due to the error:\n{details}") nodes = {} for line in stdout.split("\n"): diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 497e189cd..1db832cf5 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -251,6 +251,28 @@ def test_load_via_dbt_ls_with_invalid_dbt_path(): assert err_info.value.args[0] == expected +def test_load_via_dbt_ls_without_dbt_deps(): + dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR) + dbt_graph = DbtGraph( + dbt_deps=False, + project=dbt_project, + profile_config=ProfileConfig( + profile_name="default", + target_name="default", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="airflow_db", + profile_args={"schema": "public"}, + ), + ), + ) + + with pytest.raises(CosmosLoadDbtException) as err_info: + dbt_graph.load_via_dbt_ls() + + expected = "Unable to run dbt ls command due to dbt_packages not installed." + assert err_info.value.args[0] == expected + + @pytest.mark.integration @patch("cosmos.dbt.graph.Popen.communicate", return_value=("Some Runtime Error", "")) def test_load_via_dbt_ls_with_runtime_error_in_stdout(mock_popen_communicate): From 68f477bceaf32476574490a48c2f056076ddb196 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 22 Aug 2023 13:54:21 +0000 Subject: [PATCH 08/11] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/dbt/graph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index a811f2598..1ad3f9a75 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -222,7 +222,7 @@ def load_via_dbt_ls(self) -> None: if stderr or "Runtime Error" or "Compilation Error" in stdout: details = stderr or stdout if 'Run "dbt deps" to install package dependencies' in stdout: - raise CosmosLoadDbtException(f"Unable to run dbt ls command due to dbt_packages not installed.") + raise CosmosLoadDbtException("Unable to run dbt ls command due to dbt_packages not installed.") else: raise CosmosLoadDbtException(f"Unable to run dbt ls command due to the error:\n{details}") From 13f5408d8d656ac1bf535ddb0a4cbf94e4f2f56d Mon Sep 17 00:00:00 2001 From: DanMawdsleyBA Date: Tue, 22 Aug 2023 15:06:39 +0100 Subject: [PATCH 09/11] fix test --- tests/dbt/test_graph.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 1db832cf5..de4dacbd5 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -250,9 +250,10 @@ def test_load_via_dbt_ls_with_invalid_dbt_path(): expected = "Unable to find the dbt executable: /inexistent/dbt" assert err_info.value.args[0] == expected - -def test_load_via_dbt_ls_without_dbt_deps(): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR) +@pytest.mark.integration +@pytest.mark.parametrize("pipeline_name", ("jaffle_shop", "jaffle_shop_python")) +def test_load_via_dbt_ls_without_dbt_deps(pipeline_name): + dbt_project = DbtProject(name=pipeline_name, root_dir=DBT_PROJECTS_ROOT_DIR) dbt_graph = DbtGraph( dbt_deps=False, project=dbt_project, From 4f1d83792680a07e04efa81bc8cfed11e7b926a5 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 22 Aug 2023 14:07:13 +0000 Subject: [PATCH 10/11] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/dbt/test_graph.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index de4dacbd5..8bf0f301d 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -250,6 +250,7 @@ def test_load_via_dbt_ls_with_invalid_dbt_path(): expected = "Unable to find the dbt executable: /inexistent/dbt" assert err_info.value.args[0] == expected + @pytest.mark.integration @pytest.mark.parametrize("pipeline_name", ("jaffle_shop", "jaffle_shop_python")) def test_load_via_dbt_ls_without_dbt_deps(pipeline_name): From 4103c99edb58cc4820a4546d20442e1c535327ea Mon Sep 17 00:00:00 2001 From: DanMawdsleyBA Date: Thu, 24 Aug 2023 10:37:35 +0100 Subject: [PATCH 11/11] minor tweaks --- cosmos/dbt/graph.py | 2 +- tests/dbt/test_graph.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 1ad3f9a75..cd4dd34cb 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -219,7 +219,7 @@ def load_via_dbt_ls(self) -> None: for line in logfile: logger.debug(line.strip()) - if stderr or "Runtime Error" or "Compilation Error" in stdout: + if stderr or "Error" in stdout: details = stderr or stdout if 'Run "dbt deps" to install package dependencies' in stdout: raise CosmosLoadDbtException("Unable to run dbt ls command due to dbt_packages not installed.") diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 8bf0f301d..583fe1ccb 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -252,7 +252,7 @@ def test_load_via_dbt_ls_with_invalid_dbt_path(): @pytest.mark.integration -@pytest.mark.parametrize("pipeline_name", ("jaffle_shop", "jaffle_shop_python")) +@pytest.mark.parametrize("pipeline_name", ("jaffle_shop")) def test_load_via_dbt_ls_without_dbt_deps(pipeline_name): dbt_project = DbtProject(name=pipeline_name, root_dir=DBT_PROJECTS_ROOT_DIR) dbt_graph = DbtGraph(