diff --git a/cosmos/config.py b/cosmos/config.py index 610e6e489..87baba864 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -4,7 +4,7 @@ import contextlib import tempfile -from dataclasses import dataclass, field +from dataclasses import InitVar, dataclass, field from pathlib import Path from typing import Any, Iterator, Callable @@ -31,6 +31,9 @@ class RenderConfig: :param select: A list of dbt select arguments (e.g. 'config.materialized:incremental') :param exclude: A list of dbt exclude arguments (e.g. 'tag:nightly') :param dbt_deps: Configure to run dbt deps when using dbt ls for dag parsing + :param node_converters: a dictionary mapping a ``DbtResourceType`` into a callable. Users can control how to render dbt nodes in Airflow. Only supported when using ``load_method=LoadMode.DBT_MANIFEST`` or ``LoadMode.DBT_LS``. + :param dbt_executable_path: The path to the dbt executable for dag generation. Defaults to dbt if available on the path. Mutually Exclusive with ProjectConfig.dbt_project_path + :param dbt_project_path Configures the DBT project location accessible on the airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM`` """ emit_datasets: bool = True @@ -40,6 +43,13 @@ class RenderConfig: exclude: list[str] = field(default_factory=list) dbt_deps: bool = True node_converters: dict[DbtResourceType, Callable[..., Any]] | None = None + dbt_executable_path: str | Path = get_system_dbt() + dbt_project_path: InitVar[str | Path | None] = None + + project_path: Path | None = field(init=False) + + def __post_init__(self, dbt_project_path: str | Path | None) -> None: + self.project_path = Path(dbt_project_path) if dbt_project_path else None class ProjectConfig: @@ -72,11 +82,13 @@ def __init__( manifest_path: str | Path | None = None, project_name: str | None = None, ): + # Since we allow dbt_project_path to be defined in ExecutionConfig and RenderConfig + # dbt_project_path may not always be defined here. + # We do, however, still require that both manifest_path and project_name be defined, or neither be defined. if not dbt_project_path: - if not manifest_path or not project_name: + if manifest_path and not project_name or project_name and not manifest_path: raise CosmosValueError( - "ProjectConfig requires dbt_project_path and/or manifest_path to be defined." - " If only manifest_path is defined, project_name must also be defined." + "If ProjectConfig.dbt_project_path is not defined, ProjectConfig.manifest_path and ProjectConfig.project_name must be defined together, or both left undefined." ) if project_name: self.project_name = project_name @@ -210,10 +222,17 @@ class ExecutionConfig: :param execution_mode: The execution mode for dbt. Defaults to local :param test_indirect_selection: The mode to configure the test behavior when performing indirect selection. - :param dbt_executable_path: The path to the dbt executable. Defaults to dbt if - available on the path. + :param dbt_executable_path: The path to the dbt executable for runtime execution. Defaults to dbt if available on the path. + :param dbt_project_path Configures the DBT project location accessible at runtime for dag execution. This is the project path in a docker container for ExecutionMode.DOCKER or ExecutionMode.KUBERNETES. Mutually Exclusive with ProjectConfig.dbt_project_path """ execution_mode: ExecutionMode = ExecutionMode.LOCAL test_indirect_selection: TestIndirectSelection = TestIndirectSelection.EAGER dbt_executable_path: str | Path = get_system_dbt() + + dbt_project_path: InitVar[str | Path | None] = None + + project_path: Path | None = field(init=False) + + def __post_init__(self, dbt_project_path: str | Path | None) -> None: + self.project_path = Path(dbt_project_path) if dbt_project_path else None diff --git a/cosmos/converter.py b/cosmos/converter.py index b1653ba52..dbc290271 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -16,7 +16,6 @@ from cosmos.exceptions import CosmosValueError from cosmos.log import get_logger - logger = get_logger(__name__) @@ -92,8 +91,8 @@ def __init__( self, project_config: ProjectConfig, profile_config: ProfileConfig, - execution_config: ExecutionConfig = ExecutionConfig(), - render_config: RenderConfig = RenderConfig(), + execution_config: ExecutionConfig | None = None, + render_config: RenderConfig | None = None, dag: DAG | None = None, task_group: TaskGroup | None = None, operator_args: dict[str, Any] | None = None, @@ -103,19 +102,37 @@ def __init__( ) -> None: project_config.validate_project() - emit_datasets = render_config.emit_datasets - test_behavior = render_config.test_behavior - select = render_config.select - exclude = render_config.exclude - dbt_deps = render_config.dbt_deps - execution_mode = execution_config.execution_mode - test_indirect_selection = execution_config.test_indirect_selection - load_mode = render_config.load_method - dbt_executable_path = execution_config.dbt_executable_path - node_converters = render_config.node_converters - - if not project_config.dbt_project_path: - raise CosmosValueError("A Project Path in ProjectConfig is required for generating a Task Operators.") + if not execution_config: + execution_config = ExecutionConfig() + if not render_config: + render_config = RenderConfig() + + # Since we now support both project_config.dbt_project_path, render_config.project_path and execution_config.project_path + # We need to ensure that only one interface is being used. + if project_config.dbt_project_path and (render_config.project_path or execution_config.project_path): + raise CosmosValueError( + "ProjectConfig.dbt_project_path is mutually exclusive with RenderConfig.dbt_project_path and ExecutionConfig.dbt_project_path." + + "If using RenderConfig.dbt_project_path or ExecutionConfig.dbt_project_path, ProjectConfig.dbt_project_path should be None" + ) + + # If we are using the old interface, we should migrate it to the new interface + # This is safe to do now since we have validated which config interface we're using + if project_config.dbt_project_path: + render_config.project_path = project_config.dbt_project_path + execution_config.project_path = project_config.dbt_project_path + + # At this point, execution_config.project_path should always be non-null + if not execution_config.project_path: + raise CosmosValueError( + "ExecutionConfig.dbt_project_path is required for the execution of dbt tasks in all execution modes." + ) + + # We now have a guaranteed execution_config.project_path, but still need to process render_config.project_path + # We require render_config.project_path when we dont have a manifest + if not project_config.manifest_path and not render_config.project_path: + raise CosmosValueError( + "RenderConfig.dbt_project_path is required for rendering an airflow DAG from a DBT Graph if no manifest is provided." + ) profile_args = {} if profile_config.profile_mapping: @@ -136,36 +153,34 @@ def __init__( # We may want to consider defaulting this value in our actual ProjceConfig class? dbt_graph = DbtGraph( project=project_config, - exclude=exclude, - select=select, - dbt_cmd=dbt_executable_path, + render_config=render_config, + execution_config=execution_config, + dbt_cmd=render_config.dbt_executable_path, profile_config=profile_config, operator_args=operator_args, - dbt_deps=dbt_deps, ) - dbt_graph.load(method=load_mode, execution_mode=execution_mode) + dbt_graph.load(method=render_config.load_method, execution_mode=execution_config.execution_mode) task_args = { **operator_args, - # the following args may be only needed for local / venv: - "project_dir": project_config.dbt_project_path, + "project_dir": execution_config.project_path, "profile_config": profile_config, - "emit_datasets": emit_datasets, + "emit_datasets": render_config.emit_datasets, } - if dbt_executable_path: - task_args["dbt_executable_path"] = dbt_executable_path + if execution_config.dbt_executable_path: + task_args["dbt_executable_path"] = execution_config.dbt_executable_path - validate_arguments(select, exclude, profile_args, task_args) + validate_arguments(render_config.select, render_config.exclude, profile_args, task_args) build_airflow_graph( nodes=dbt_graph.filtered_nodes, dag=dag or (task_group and task_group.dag), task_group=task_group, - execution_mode=execution_mode, + execution_mode=execution_config.execution_mode, task_args=task_args, - test_behavior=test_behavior, - test_indirect_selection=test_indirect_selection, + test_behavior=render_config.test_behavior, + test_indirect_selection=execution_config.test_indirect_selection, dbt_project_name=project_config.project_name, on_warning_callback=on_warning_callback, - node_converters=node_converters, + node_converters=render_config.node_converters, ) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 89658811f..0322c8ac4 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -10,7 +10,7 @@ from subprocess import PIPE, Popen from typing import Any -from cosmos.config import ProfileConfig, ProjectConfig +from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig from cosmos.constants import ( DBT_LOG_DIR_NAME, DBT_LOG_FILENAME, @@ -53,12 +53,12 @@ class DbtNode: has_test: bool = False -def create_symlinks(dbt_project_path: Path, tmp_dir: Path) -> None: +def create_symlinks(project_path: Path, tmp_dir: Path) -> None: """Helper function to create symlinks to the dbt project files.""" ignore_paths = (DBT_LOG_DIR_NAME, DBT_TARGET_DIR_NAME, "dbt_packages", "profiles.yml") - for child_name in os.listdir(dbt_project_path): + for child_name in os.listdir(project_path): if child_name not in ignore_paths: - os.symlink(dbt_project_path / child_name, tmp_dir / child_name) + os.symlink(project_path / child_name, tmp_dir / child_name) def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str]) -> str: @@ -78,7 +78,7 @@ def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str]) -> if 'Run "dbt deps" to install package dependencies' in stdout and command[1] == "ls": raise CosmosLoadDbtException( - "Unable to run dbt ls command due to missing dbt_packages. Set render_config.dbt_deps=True." + "Unable to run dbt ls command due to missing dbt_packages. Set RenderConfig.dbt_deps=True." ) if returncode or "Error" in stdout: @@ -88,7 +88,7 @@ def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str]) -> return stdout -def parse_dbt_ls_output(dbt_project_path: Path, ls_stdout: str) -> dict[str, DbtNode]: +def parse_dbt_ls_output(project_path: Path, ls_stdout: str) -> dict[str, DbtNode]: """Parses the output of `dbt ls` into a dictionary of `DbtNode` instances.""" nodes = {} for line in ls_stdout.split("\n"): @@ -102,7 +102,7 @@ def parse_dbt_ls_output(dbt_project_path: Path, ls_stdout: str) -> dict[str, Dbt unique_id=node_dict["unique_id"], resource_type=DbtResourceType(node_dict["resource_type"]), depends_on=node_dict.get("depends_on", {}).get("nodes", []), - file_path=dbt_project_path / node_dict["original_file_path"], + file_path=project_path / node_dict["original_file_path"], tags=node_dict["tags"], config=node_dict["config"], ) @@ -122,9 +122,8 @@ class DbtGraph: dbt_graph = DbtGraph( project=ProjectConfig(dbt_project_path=DBT_PROJECT_PATH), - exclude=["*orders*"], - select=[], - dbt_cmd="/usr/local/bin/dbt", + render_config=RenderConfig(exclude=["*orders*"], select=[]), + dbt_cmd="/usr/local/bin/dbt" ) dbt_graph.load(method=LoadMode.DBT_LS, execution_mode=ExecutionMode.LOCAL) """ @@ -135,22 +134,17 @@ class DbtGraph: def __init__( self, project: ProjectConfig, + render_config: RenderConfig = RenderConfig(), + execution_config: ExecutionConfig = ExecutionConfig(), profile_config: ProfileConfig | None = None, - exclude: list[str] | None = None, - select: list[str] | None = None, dbt_cmd: str = get_system_dbt(), operator_args: dict[str, Any] | None = None, - dbt_deps: bool | None = True, ): self.project = project - self.exclude = exclude or [] - self.select = select or [] + self.render_config = render_config self.profile_config = profile_config + self.execution_config = execution_config self.operator_args = operator_args or {} - self.dbt_deps = dbt_deps - - # specific to loading using ls - self.dbt_deps = dbt_deps self.dbt_cmd = dbt_cmd def load( @@ -189,15 +183,15 @@ def load( else: load_method[method]() - def run_dbt_ls(self, dbt_project_path: Path, tmp_dir: Path, env_vars: dict[str, str]) -> dict[str, DbtNode]: + def run_dbt_ls(self, project_path: Path, tmp_dir: Path, env_vars: dict[str, str]) -> dict[str, DbtNode]: """Runs dbt ls command and returns the parsed nodes.""" ls_command = [self.dbt_cmd, "ls", "--output", "json"] - if self.exclude: - ls_command.extend(["--exclude", *self.exclude]) + if self.render_config.exclude: + ls_command.extend(["--exclude", *self.render_config.exclude]) - if self.select: - ls_command.extend(["--select", *self.select]) + if self.render_config.select: + ls_command.extend(["--select", *self.render_config.select]) ls_command.extend(self.local_flags) @@ -211,7 +205,7 @@ def run_dbt_ls(self, dbt_project_path: Path, tmp_dir: Path, env_vars: dict[str, for line in logfile: logger.debug(line.strip()) - nodes = parse_dbt_ls_output(dbt_project_path, stdout) + nodes = parse_dbt_ls_output(project_path, stdout) return nodes def load_via_dbt_ls(self) -> None: @@ -226,28 +220,24 @@ def load_via_dbt_ls(self) -> None: * self.nodes * self.filtered_nodes """ - logger.info( - "Trying to parse the dbt project `%s` in `%s` using dbt ls...", - self.project.project_name, - self.project.dbt_project_path, - ) - if self.project.dbt_project_path is None: - raise CosmosLoadDbtException("Unable to dbt ls load a project without a project path.") + logger.info(f"Trying to parse the dbt project in `{self.render_config.project_path}` using dbt ls...") + if not self.render_config.project_path or not self.execution_config.project_path: + raise CosmosLoadDbtException( + "Unable to load project via dbt ls without RenderConfig.dbt_project_path and ExecutionConfig.dbt_project_path" + ) - if not self.project.dbt_project_path or not self.profile_config: - raise CosmosLoadDbtException("Unable to load dbt project without project files and a profile config") + if not self.profile_config: + raise CosmosLoadDbtException("Unable to load project via dbt ls without a profile config.") if not shutil.which(self.dbt_cmd): raise CosmosLoadDbtException(f"Unable to find the dbt executable: {self.dbt_cmd}") with tempfile.TemporaryDirectory() as tmpdir: logger.info( - "Content of the dbt project dir <%s>: `%s`", - self.project.dbt_project_path, - os.listdir(self.project.dbt_project_path), + f"Content of the dbt project dir {self.render_config.project_path}: `{os.listdir(self.render_config.project_path)}`" ) tmpdir_path = Path(tmpdir) - create_symlinks(self.project.dbt_project_path, tmpdir_path) + create_symlinks(self.render_config.project_path, tmpdir_path) with self.profile_config.ensure_profile(use_mock_values=True) as profile_values: (profile_path, env_vars) = profile_values @@ -269,13 +259,13 @@ def load_via_dbt_ls(self) -> None: env[DBT_LOG_PATH_ENVVAR] = str(self.log_dir) env[DBT_TARGET_PATH_ENVVAR] = str(self.target_dir) - if self.dbt_deps: + if self.render_config.dbt_deps: deps_command = [self.dbt_cmd, "deps"] deps_command.extend(self.local_flags) stdout = run_command(deps_command, tmpdir_path, env) logger.debug("dbt deps output: %s", stdout) - nodes = self.run_dbt_ls(self.project.dbt_project_path, tmpdir_path, env) + nodes = self.run_dbt_ls(self.execution_config.project_path, tmpdir_path, env) self.nodes = nodes self.filtered_nodes = nodes @@ -299,14 +289,16 @@ def load_via_custom_parser(self) -> None: """ logger.info("Trying to parse the dbt project `%s` using a custom Cosmos method...", self.project.project_name) - if not self.project.dbt_project_path or not self.project.models_path or not self.project.seeds_path: - raise CosmosLoadDbtException("Unable to load dbt project without project files") + if not self.render_config.project_path or not self.execution_config.project_path: + raise CosmosLoadDbtException( + "Unable to load dbt project without RenderConfig.dbt_project_path and ExecutionConfig.dbt_project_path" + ) project = LegacyDbtProject( - project_name=self.project.dbt_project_path.stem, - dbt_root_path=self.project.dbt_project_path.parent.as_posix(), - dbt_models_dir=self.project.models_path.stem, - dbt_seeds_dir=self.project.seeds_path.stem, + project_name=self.render_config.project_path.stem, + dbt_root_path=self.render_config.project_path.parent.as_posix(), + dbt_models_dir=self.project.models_path.stem if self.project.models_path else "models", + dbt_seeds_dir=self.project.seeds_path.stem if self.project.seeds_path else "seeds", operator_args=self.operator_args, ) nodes = {} @@ -320,7 +312,11 @@ def load_via_custom_parser(self) -> None: unique_id=model_name, resource_type=DbtResourceType(model.type.value), depends_on=list(model.config.upstream_models), - file_path=model.path, + file_path=Path( + model.path.as_posix().replace( + self.render_config.project_path.as_posix(), self.execution_config.project_path.as_posix() + ) + ), tags=[], config=config, ) @@ -328,7 +324,10 @@ def load_via_custom_parser(self) -> None: self.nodes = nodes self.filtered_nodes = select_nodes( - project_dir=self.project.dbt_project_path, nodes=nodes, select=self.select, exclude=self.exclude + project_dir=self.execution_config.project_path, + nodes=nodes, + select=self.render_config.select, + exclude=self.render_config.exclude, ) self.update_node_dependency() @@ -355,6 +354,9 @@ def load_from_dbt_manifest(self) -> None: if not self.project.is_manifest_available(): raise CosmosLoadDbtException(f"Unable to load manifest using {self.project.manifest_path}") + if not self.execution_config.project_path: + raise CosmosLoadDbtException("Unable to load manifest without ExecutionConfig.dbt_project_path") + nodes = {} with open(self.project.manifest_path) as fp: # type: ignore[arg-type] manifest = json.load(fp) @@ -366,9 +368,7 @@ def load_from_dbt_manifest(self) -> None: unique_id=unique_id, resource_type=DbtResourceType(node_dict["resource_type"]), depends_on=node_dict.get("depends_on", {}).get("nodes", []), - file_path=self.project.dbt_project_path / Path(node_dict["original_file_path"]) - if self.project.dbt_project_path - else Path(node_dict["original_file_path"]), + file_path=self.execution_config.project_path / Path(node_dict["original_file_path"]), tags=node_dict["tags"], config=node_dict["config"], ) @@ -377,7 +377,10 @@ def load_from_dbt_manifest(self) -> None: self.nodes = nodes self.filtered_nodes = select_nodes( - project_dir=self.project.dbt_project_path, nodes=nodes, select=self.select, exclude=self.exclude + project_dir=self.execution_config.project_path, + nodes=nodes, + select=self.render_config.select, + exclude=self.render_config.exclude, ) self.update_node_dependency() diff --git a/docs/configuration/execution-config.rst b/docs/configuration/execution-config.rst index d1094107c..c118590d8 100644 --- a/docs/configuration/execution-config.rst +++ b/docs/configuration/execution-config.rst @@ -1,5 +1,12 @@ Execution Config ================== -Cosmos supports multiple ways of executing your dbt models. -For more information, see the `execution modes <../getting_started/execution-modes.html>`_ page. +Cosmos aims to give you control over how your dbt project is executed when running in airflow. +It does this by exposing a ``cosmos.config.ExecutionConfig`` class that you can use to configure how your DAGs are executed. + +The ``ExecutionConfig`` class takes the following arguments: + +- ``execution_mode``: The way dbt is run when executing within airflow. For more information, see the `execution modes <../getting_started/execution-modes.html>`_ page. +- ``test_indirect_selection``: The mode to configure the test behavior when performing indirect selection. +- ``dbt_executable_path``: The path to the dbt executable for dag generation. Defaults to dbt if available on the path. +- ``dbt_project_path``: Configures the DBT project location accessible on their airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM`` diff --git a/docs/configuration/render-config.rst b/docs/configuration/render-config.rst index 04e7f4c02..de0a08cdb 100644 --- a/docs/configuration/render-config.rst +++ b/docs/configuration/render-config.rst @@ -11,7 +11,10 @@ The ``RenderConfig`` class takes the following arguments: - ``test_behavior``: how to run tests. Defaults to running a model's tests immediately after the model is run. For more information, see the `Testing Behavior `_ section. - ``load_method``: how to load your dbt project. See `Parsing Methods `_ for more information. - ``select`` and ``exclude``: which models to include or exclude from your DAGs. See `Selecting & Excluding `_ for more information. +- ``dbt_deps``: A Boolean to run dbt deps when using dbt ls for dag parsing. Default True - ``node_converters``: a dictionary mapping a ``DbtResourceType`` into a callable. Users can control how to render dbt nodes in Airflow. Only supported when using ``load_method=LoadMode.DBT_MANIFEST`` or ``LoadMode.DBT_LS``. Find more information below. +- ``dbt_executable_path``: The path to the dbt executable for dag generation. Defaults to dbt if available on the path. +- ``dbt_project_path``: Configures the DBT project location accessible on their airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM`` Customizing how nodes are rendered (experimental) ------------------------------------------------- diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index b9215aafa..b108878fc 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -5,7 +5,7 @@ import pytest -from cosmos.config import ProfileConfig, ProjectConfig +from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig from cosmos.constants import DbtResourceType, ExecutionMode from cosmos.dbt.graph import ( CosmosLoadDbtException, @@ -56,7 +56,14 @@ def test_load_via_manifest_with_exclude(project_name, manifest_filepath, model_f target_name="test", profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - dbt_graph = DbtGraph(project=project_config, profile_config=profile_config, exclude=["config.materialized:table"]) + render_config = RenderConfig(exclude=["config.materialized:table"]) + execution_config = ExecutionConfig(dbt_project_path=project_config.dbt_project_path) + dbt_graph = DbtGraph( + project=project_config, + execution_config=execution_config, + profile_config=profile_config, + render_config=render_config, + ) dbt_graph.load_from_dbt_manifest() assert len(dbt_graph.nodes) == 28 @@ -205,8 +212,12 @@ def test_load_via_dbt_ls_does_not_create_target_logs_in_original_folder(mock_pop assert not (tmp_dbt_project_dir / "logs").exists() project_config = ProjectConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) + render_config = RenderConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) + execution_config = ExecutionConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) dbt_graph = DbtGraph( project=project_config, + render_config=render_config, + execution_config=execution_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -228,10 +239,14 @@ def test_load_via_dbt_ls_does_not_create_target_logs_in_original_folder(mock_pop @pytest.mark.integration def test_load_via_dbt_ls_with_exclude(): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + render_config = RenderConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, select=["*customers*"], exclude=["*orders*"] + ) + execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( project=project_config, - select=["*customers*"], - exclude=["*orders*"], + render_config=render_config, + execution_config=execution_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -273,8 +288,12 @@ def test_load_via_dbt_ls_with_exclude(): @pytest.mark.parametrize("project_name", ("jaffle_shop", "jaffle_shop_python")) def test_load_via_dbt_ls_without_exclude(project_name): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name) + render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( project=project_config, + render_config=render_config, + execution_config=execution_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -292,30 +311,48 @@ def test_load_via_dbt_ls_without_exclude(project_name): def test_load_via_custom_without_project_path(): project_config = ProjectConfig(manifest_path=SAMPLE_MANIFEST, project_name="test") - dbt_graph = DbtGraph(dbt_cmd="/inexistent/dbt", project=project_config) + execution_config = ExecutionConfig() + render_config = RenderConfig() + dbt_graph = DbtGraph( + dbt_cmd="/inexistent/dbt", + project=project_config, + execution_config=execution_config, + render_config=render_config, + ) with pytest.raises(CosmosLoadDbtException) as err_info: dbt_graph.load_via_custom_parser() - expected = "Unable to load dbt project without project files" + expected = "Unable to load dbt project without RenderConfig.dbt_project_path and ExecutionConfig.dbt_project_path" assert err_info.value.args[0] == expected def test_load_via_dbt_ls_without_profile(): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) - dbt_graph = DbtGraph(dbt_cmd="/inexistent/dbt", project=project_config) + execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + dbt_graph = DbtGraph( + dbt_cmd="/inexistent/dbt", + project=project_config, + execution_config=execution_config, + render_config=render_config, + ) with pytest.raises(CosmosLoadDbtException) as err_info: dbt_graph.load_via_dbt_ls() - expected = "Unable to load dbt project without project files and a profile config" + expected = "Unable to load project via dbt ls without a profile config." assert err_info.value.args[0] == expected def test_load_via_dbt_ls_with_invalid_dbt_path(): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) with patch("pathlib.Path.exists", return_value=True): dbt_graph = DbtGraph( dbt_cmd="/inexistent/dbt", project=project_config, + execution_config=execution_config, + render_config=render_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -335,11 +372,12 @@ def test_load_via_dbt_ls_with_invalid_dbt_path(): def test_load_via_dbt_ls_with_sources(load_method): project_name = "simple" dbt_graph = DbtGraph( - dbt_deps=False, project=ProjectConfig( dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name, manifest_path=SAMPLE_MANIFEST_SOURCE if load_method == "load_from_dbt_manifest" else None, ), + render_config=RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name, dbt_deps=False), + execution_config=ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name), profile_config=ProfileConfig( profile_name="simple", target_name="dev", @@ -355,9 +393,12 @@ def test_load_via_dbt_ls_with_sources(load_method): @pytest.mark.integration def test_load_via_dbt_ls_without_dbt_deps(): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, dbt_deps=False) + execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( - dbt_deps=False, project=project_config, + render_config=render_config, + execution_config=execution_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -371,7 +412,7 @@ def test_load_via_dbt_ls_without_dbt_deps(): with pytest.raises(CosmosLoadDbtException) as err_info: dbt_graph.load_via_dbt_ls() - expected = "Unable to run dbt ls command due to missing dbt_packages. Set render_config.dbt_deps=True." + expected = "Unable to run dbt ls command due to missing dbt_packages. Set RenderConfig.dbt_deps=True." assert err_info.value.args[0] == expected @@ -382,8 +423,12 @@ def test_load_via_dbt_ls_with_zero_returncode_and_non_empty_stderr(mock_popen, t mock_popen().returncode = 0 project_config = ProjectConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) + render_config = RenderConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) + execution_config = ExecutionConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) dbt_graph = DbtGraph( project=project_config, + render_config=render_config, + execution_config=execution_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -404,8 +449,12 @@ def test_load_via_dbt_ls_with_non_zero_returncode(mock_popen): mock_popen().returncode = 1 project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( project=project_config, + render_config=render_config, + execution_config=execution_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -425,8 +474,12 @@ def test_load_via_dbt_ls_with_non_zero_returncode(mock_popen): def test_load_via_dbt_ls_with_runtime_error_in_stdout(mock_popen_communicate): # It may seem strange, but at least until dbt 1.6.0, there are circumstances when it outputs errors to stdout project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( project=project_config, + render_config=render_config, + execution_config=execution_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -446,12 +499,19 @@ def test_load_via_dbt_ls_with_runtime_error_in_stdout(mock_popen_communicate): @pytest.mark.parametrize("project_name", ("jaffle_shop", "jaffle_shop_python")) def test_load_via_load_via_custom_parser(project_name): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name) + execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) profile_config = ProfileConfig( profile_name="test", target_name="test", profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) + dbt_graph = DbtGraph( + project=project_config, + profile_config=profile_config, + render_config=render_config, + execution_config=execution_config, + ) dbt_graph.load_via_custom_parser() @@ -464,12 +524,13 @@ def test_update_node_dependency_called(mock_update_node_dependency): project_config = ProjectConfig( dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, manifest_path=SAMPLE_MANIFEST ) + execution_config = ExecutionConfig(dbt_project_path=project_config.dbt_project_path) profile_config = ProfileConfig( profile_name="test", target_name="test", profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) + dbt_graph = DbtGraph(project=project_config, execution_config=execution_config, profile_config=profile_config) dbt_graph.load() assert mock_update_node_dependency.called @@ -484,7 +545,8 @@ def test_update_node_dependency_target_exist(): target_name="test", profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) + execution_config = ExecutionConfig(dbt_project_path=project_config.dbt_project_path) + dbt_graph = DbtGraph(project=project_config, execution_config=execution_config, profile_config=profile_config) dbt_graph.load() for _, nodes in dbt_graph.nodes.items(): @@ -502,7 +564,14 @@ def test_update_node_dependency_test_not_exist(): target_name="test", profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - dbt_graph = DbtGraph(project=project_config, profile_config=profile_config, exclude=["config.materialized:test"]) + render_config = RenderConfig(exclude=["config.materialized:test"]) + execution_config = ExecutionConfig(dbt_project_path=project_config.dbt_project_path) + dbt_graph = DbtGraph( + project=project_config, + execution_config=execution_config, + profile_config=profile_config, + render_config=render_config, + ) dbt_graph.load_from_dbt_manifest() for _, nodes in dbt_graph.filtered_nodes.items(): @@ -518,7 +587,14 @@ def test_tag_selected_node_test_exist(): target_name="test", profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - dbt_graph = DbtGraph(project=project_config, profile_config=profile_config, select=["tag:test_tag"]) + render_config = RenderConfig(select=["tag:test_tag"]) + execution_config = ExecutionConfig(dbt_project_path=project_config.dbt_project_path) + dbt_graph = DbtGraph( + project=project_config, + execution_config=execution_config, + profile_config=profile_config, + render_config=render_config, + ) dbt_graph.load_from_dbt_manifest() assert len(dbt_graph.filtered_nodes) > 0 @@ -537,6 +613,8 @@ def test_load_dbt_ls_and_manifest_with_model_version(load_method): dbt_project_path=DBT_PROJECTS_ROOT_DIR / "model_version", manifest_path=SAMPLE_MANIFEST_MODEL_VERSION if load_method == "load_from_dbt_manifest" else None, ), + render_config=RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / "model_version"), + execution_config=ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / "model_version"), profile_config=ProfileConfig( profile_name="default", target_name="default", diff --git a/tests/test_config.py b/tests/test_config.py index c0ae32101..9eec48055 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -36,17 +36,14 @@ def test_init_with_manifest_path_and_project_path_succeeds(): def test_init_with_no_params(): """ - The constructor now validates that the required base fields are present - As such, we should test here that the correct exception is raised if these are not correctly defined - This functionality has been moved from the validate method + With the implementation of dbt_project_path in RenderConfig and ExecutionConfig + dbt_project_path becomes optional here. The only requirement is that if one of + manifest_path or project_name is defined, they should both be defined. + We used to enforce dbt_project_path or manifest_path and project_name, but this is + No longer the case """ - with pytest.raises(CosmosValueError) as err_info: - ProjectConfig() - print(err_info.value.args[0]) - assert err_info.value.args[0] == ( - "ProjectConfig requires dbt_project_path and/or manifest_path to be defined." - "If only manifest_path is defined, project_name must also be defined." - ) + project_config = ProjectConfig() + assert project_config def test_init_with_manifest_path_and_not_project_path_and_not_project_name_fails(): @@ -55,11 +52,9 @@ def test_init_with_manifest_path_and_not_project_path_and_not_project_name_fails """ with pytest.raises(CosmosValueError) as err_info: ProjectConfig(manifest_path=DBT_PROJECTS_ROOT_DIR / "manifest.json") - print(err_info.value.args[0]) - assert err_info.value.args[0] == ( - "ProjectConfig requires dbt_project_path and/or manifest_path to be defined." - "If only manifest_path is defined, project_name must also be defined." - ) + assert err_info.value.args[0] == ( + "If ProjectConfig.dbt_project_path is not defined, ProjectConfig.manifest_path and ProjectConfig.project_name must be defined together, or both left undefined." + ) def test_validate_with_project_path_and_manifest_path_succeeds(): @@ -95,7 +90,7 @@ def test_validate_project_missing_fails(): project_config = ProjectConfig(dbt_project_path=Path("/tmp")) with pytest.raises(CosmosValueError) as err_info: assert project_config.validate_project() is None - assert err_info.value.args[0] == "Could not find dbt_project.yml at /tmp/dbt_project.yml" + assert err_info.value.args[0] == "Could not find dbt_project.yml at /tmp/dbt_project.yml" def test_is_manifest_available_is_true(): @@ -118,13 +113,11 @@ def test_project_name(): def test_profile_config_post_init(): with pytest.raises(CosmosValueError) as err_info: ProfileConfig(profiles_yml_filepath="/tmp/some-profile", profile_name="test", target_name="test") - assert err_info.value.args[0] == "The file /tmp/some-profile does not exist." + assert err_info.value.args[0] == "The file /tmp/some-profile does not exist." def test_profile_config_validate(): with pytest.raises(CosmosValueError) as err_info: profile_config = ProfileConfig(profile_name="test", target_name="test") assert profile_config.validate_profile() is None - assert ( - err_info.value.args[0] == "Either profiles_yml_filepath or profile_mapping must be set to render a profile" - ) + assert err_info.value.args[0] == "Either profiles_yml_filepath or profile_mapping must be set to render a profile" diff --git a/tests/test_converter.py b/tests/test_converter.py index 5fb50a00f..5d89513b3 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -113,7 +113,7 @@ def test_converter_creates_dag_with_project_path_str(mock_load_dbt_graph, execut ) @patch("cosmos.converter.DbtGraph.filtered_nodes", nodes) @patch("cosmos.converter.DbtGraph.load") -def test_converter_fails_no_project_dir(mock_load_dbt_graph, execution_mode, operator_args): +def test_converter_fails_execution_config_no_project_dir(mock_load_dbt_graph, execution_mode, operator_args): """ This test validates that a project, given a manifest path and project name, with seeds is able to successfully generate a converter @@ -135,4 +135,83 @@ def test_converter_fails_no_project_dir(mock_load_dbt_graph, execution_mode, ope render_config=render_config, operator_args=operator_args, ) - assert err_info.value.args[0] == "A Project Path in ProjectConfig is required for generating a Task Operators." + assert ( + err_info.value.args[0] + == "ExecutionConfig.dbt_project_path is required for the execution of dbt tasks in all execution modes." + ) + + +@pytest.mark.parametrize( + "execution_mode,operator_args", + [ + (ExecutionMode.KUBERNETES, {}), + # (ExecutionMode.DOCKER, {"image": "sample-image"}), + ], +) +@patch("cosmos.converter.DbtGraph.filtered_nodes", nodes) +@patch("cosmos.converter.DbtGraph.load") +def test_converter_fails_project_config_path_and_execution_config_path( + mock_load_dbt_graph, execution_mode, operator_args +): + """ + This test ensures that we fail if we defined project path in ProjectConfig and ExecutionConfig + They are mutually exclusive, so this should be allowed. + """ + project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT.as_posix()) + execution_config = ExecutionConfig(execution_mode=execution_mode, dbt_project_path=SAMPLE_DBT_PROJECT.as_posix()) + render_config = RenderConfig(emit_datasets=True) + profile_config = ProfileConfig( + profile_name="my_profile_name", + target_name="my_target_name", + profiles_yml_filepath=SAMPLE_PROFILE_YML, + ) + with pytest.raises(CosmosValueError) as err_info: + DbtToAirflowConverter( + nodes=nodes, + project_config=project_config, + profile_config=profile_config, + execution_config=execution_config, + render_config=render_config, + operator_args=operator_args, + ) + assert ( + err_info.value.args[0] + == "ProjectConfig.dbt_project_path is mutually exclusive with RenderConfig.dbt_project_path and ExecutionConfig.dbt_project_path.If using RenderConfig.dbt_project_path or ExecutionConfig.dbt_project_path, ProjectConfig.dbt_project_path should be None" + ) + + +@pytest.mark.parametrize( + "execution_mode,operator_args", + [ + (ExecutionMode.KUBERNETES, {}), + # (ExecutionMode.DOCKER, {"image": "sample-image"}), + ], +) +@patch("cosmos.converter.DbtGraph.filtered_nodes", nodes) +@patch("cosmos.converter.DbtGraph.load") +def test_converter_fails_no_manifest_no_render_config(mock_load_dbt_graph, execution_mode, operator_args): + """ + This test ensures that we fail if we define project path in ProjectConfig and ExecutionConfig + They are mutually exclusive, so this should be allowed. + """ + project_config = ProjectConfig() + execution_config = ExecutionConfig(execution_mode=execution_mode, dbt_project_path=SAMPLE_DBT_PROJECT.as_posix()) + render_config = RenderConfig(emit_datasets=True) + profile_config = ProfileConfig( + profile_name="my_profile_name", + target_name="my_target_name", + profiles_yml_filepath=SAMPLE_PROFILE_YML, + ) + with pytest.raises(CosmosValueError) as err_info: + DbtToAirflowConverter( + nodes=nodes, + project_config=project_config, + profile_config=profile_config, + execution_config=execution_config, + render_config=render_config, + operator_args=operator_args, + ) + assert ( + err_info.value.args[0] + == "RenderConfig.dbt_project_path is required for rendering an airflow DAG from a DBT Graph if no manifest is provided." + )