diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 146e07743..5e76c4a98 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -47,20 +47,20 @@ repos: - id: remove-tabs exclude: ^docs/make.bat$|^docs/Makefile$|^dev/dags/dbt/jaffle_shop/seeds/raw_orders.csv$ - repo: https://github.com/asottile/pyupgrade - rev: v3.15.1 + rev: v3.15.2 hooks: - id: pyupgrade args: - --py37-plus - --keep-runtime-typing - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.2.2 + rev: v0.3.5 hooks: - id: ruff args: - --fix - repo: https://github.com/psf/black - rev: 24.2.0 + rev: 24.3.0 hooks: - id: black args: ["--config", "./pyproject.toml"] @@ -71,7 +71,7 @@ repos: alias: black additional_dependencies: [black>=22.10.0] - repo: https://github.com/pre-commit/mirrors-mypy - rev: "v1.8.0" + rev: "v1.9.0" hooks: - id: mypy diff --git a/cosmos/converter.py b/cosmos/converter.py index 3619fecf5..fdf4d8e42 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -234,14 +234,14 @@ def __init__( # To keep this logic working, if converter is given no ProfileConfig, # we can create a default retaining this value to preserve this functionality. # We may want to consider defaulting this value in our actual ProjceConfig class? - dbt_graph = DbtGraph( + self.dbt_graph = DbtGraph( project=project_config, render_config=render_config, execution_config=execution_config, profile_config=profile_config, dbt_vars=dbt_vars, ) - dbt_graph.load(method=render_config.load_method, execution_mode=execution_config.execution_mode) + self.dbt_graph.load(method=render_config.load_method, execution_mode=execution_config.execution_mode) task_args = { **operator_args, @@ -266,7 +266,7 @@ def __init__( ) build_airflow_graph( - nodes=dbt_graph.filtered_nodes, + nodes=self.dbt_graph.filtered_nodes, dag=dag or (task_group and task_group.dag), task_group=task_group, execution_mode=execution_config.execution_mode, diff --git a/cosmos/operators/azure_container_instance.py b/cosmos/operators/azure_container_instance.py index 397a47551..d8427b2fb 100644 --- a/cosmos/operators/azure_container_instance.py +++ b/cosmos/operators/azure_container_instance.py @@ -28,7 +28,7 @@ ) -class DbtAzureContainerInstanceBaseOperator(AzureContainerInstancesOperator, AbstractDbtBaseOperator): # type: ignore +class DbtAzureContainerInstanceBaseOperator(AbstractDbtBaseOperator, AzureContainerInstancesOperator): # type: ignore """ Executes a dbt core cli command in an Azure Container Instance """ @@ -66,7 +66,7 @@ def __init__( def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> None: self.build_command(context, cmd_flags) self.log.info(f"Running command: {self.command}") - result = super().execute(context) + result = AzureContainerInstancesOperator.execute(self, context) logger.info(result) def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> None: @@ -79,13 +79,13 @@ def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> self.command: list[str] = dbt_cmd -class DbtLSAzureContainerInstanceOperator(DbtLSMixin, DbtAzureContainerInstanceBaseOperator): +class DbtLSAzureContainerInstanceOperator(DbtLSMixin, DbtAzureContainerInstanceBaseOperator): # type: ignore """ Executes a dbt core ls command. """ -class DbtSeedAzureContainerInstanceOperator(DbtSeedMixin, DbtAzureContainerInstanceBaseOperator): +class DbtSeedAzureContainerInstanceOperator(DbtSeedMixin, DbtAzureContainerInstanceBaseOperator): # type: ignore """ Executes a dbt core seed command. 
@@ -95,14 +95,14 @@ class DbtSeedAzureContainerInstanceOperator(DbtSeedMixin, DbtAzureContainerInsta template_fields: Sequence[str] = DbtAzureContainerInstanceBaseOperator.template_fields + DbtRunMixin.template_fields # type: ignore[operator] -class DbtSnapshotAzureContainerInstanceOperator(DbtSnapshotMixin, DbtAzureContainerInstanceBaseOperator): +class DbtSnapshotAzureContainerInstanceOperator(DbtSnapshotMixin, DbtAzureContainerInstanceBaseOperator): # type: ignore """ Executes a dbt core snapshot command. """ -class DbtRunAzureContainerInstanceOperator(DbtRunMixin, DbtAzureContainerInstanceBaseOperator): +class DbtRunAzureContainerInstanceOperator(DbtRunMixin, DbtAzureContainerInstanceBaseOperator): # type: ignore """ Executes a dbt core run command. """ @@ -110,7 +110,7 @@ class DbtRunAzureContainerInstanceOperator(DbtRunMixin, DbtAzureContainerInstanc template_fields: Sequence[str] = DbtAzureContainerInstanceBaseOperator.template_fields + DbtRunMixin.template_fields # type: ignore[operator] -class DbtTestAzureContainerInstanceOperator(DbtTestMixin, DbtAzureContainerInstanceBaseOperator): +class DbtTestAzureContainerInstanceOperator(DbtTestMixin, DbtAzureContainerInstanceBaseOperator): # type: ignore """ Executes a dbt core test command. """ @@ -121,7 +121,7 @@ def __init__(self, on_warning_callback: Callable[..., Any] | None = None, **kwar self.on_warning_callback = on_warning_callback -class DbtRunOperationAzureContainerInstanceOperator(DbtRunOperationMixin, DbtAzureContainerInstanceBaseOperator): +class DbtRunOperationAzureContainerInstanceOperator(DbtRunOperationMixin, DbtAzureContainerInstanceBaseOperator): # type: ignore """ Executes a dbt core run-operation command. diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index 13c99431a..b6e1797d8 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -43,10 +43,11 @@ class AbstractDbtBaseOperator(BaseOperator, metaclass=ABCMeta): environment variables for the new process; these are used instead of inheriting the current process environment, which is the default behavior. (templated) - :param append_env: If False(default) uses the environment variables passed in env params - and does not inherit the current process environment. If True, inherits the environment variables + :param append_env: If True (default), inherits the environment variables from current passes and then environment variable passed by the user will either update the existing - inherited environment variables or the new variables gets appended to it + inherited environment variables, or the new variables get appended to it. + If False, only uses the environment variables passed in env params + and does not inherit the current process environment. :param output_encoding: Output encoding of bash command :param skip_exit_code: If task exits with this exit code, leave the task in ``skipped`` state (default: 99).
If set to ``None``, any non-zero @@ -99,7 +100,7 @@ def __init__( db_name: str | None = None, schema: str | None = None, env: dict[str, Any] | None = None, - append_env: bool = False, + append_env: bool = True, output_encoding: str = "utf-8", skip_exit_code: int = 99, partial_parse: bool = True, diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index e6a09748f..995273a91 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -298,6 +298,8 @@ def run_command( env.update(env_vars) flags = [ + "--project-dir", + str(tmp_project_dir), "--profiles-dir", str(profile_path.parent), "--profile", @@ -568,7 +570,7 @@ class DbtDocsLocalOperator(DbtLocalBaseOperator): """ ui_color = "#8194E0" - required_files = ["index.html", "manifest.json", "graph.gpickle", "catalog.json"] + required_files = ["index.html", "manifest.json", "catalog.json"] base_cmd = ["docs", "generate"] def __init__(self, **kwargs: Any) -> None: @@ -576,11 +578,13 @@ def __init__(self, **kwargs: Any) -> None: self.check_static_flag() def check_static_flag(self) -> None: - flag = "--static" if self.dbt_cmd_flags: - if flag in self.dbt_cmd_flags: + if "--static" in self.dbt_cmd_flags: # For the --static flag we only upload the generated static_index.html file self.required_files = ["static_index.html"] + if self.dbt_cmd_global_flags: + if "--no-write-json" in self.dbt_cmd_global_flags and "graph.gpickle" in self.required_files: + self.required_files.remove("graph.gpickle") class DbtDocsCloudLocalOperator(DbtDocsLocalOperator, ABC): diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index b17772b88..cbe8c67e9 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -37,6 +37,7 @@ class DbtVirtualenvBaseOperator(DbtLocalBaseOperator): :param py_requirements: If defined, creates a virtual environment with the specified dependencies. Example: ["dbt-postgres==1.5.0"] + :param pip_install_options: Pip options to use when installing Python dependencies. Example: ["--upgrade", "--no-cache-dir"] :param py_system_site_packages: Whether or not all the Python packages from the Airflow instance will be accessible within the virtual environment (if py_requirements argument is specified). Avoid using unless the dbt job requires it. @@ -45,10 +46,12 @@ class DbtVirtualenvBaseOperator(DbtLocalBaseOperator): def __init__( self, py_requirements: list[str] | None = None, + pip_install_options: list[str] | None = None, py_system_site_packages: bool = False, **kwargs: Any, ) -> None: self.py_requirements = py_requirements or [] + self.pip_install_options = pip_install_options or [] self.py_system_site_packages = py_system_site_packages super().__init__(**kwargs) self._venv_tmp_dir: None | TemporaryDirectory[str] = None @@ -61,7 +64,7 @@ def venv_dbt_path( Path to the dbt binary within a Python virtualenv. The first time this property is called, it creates a virtualenv and installs the dependencies based on the - self.py_requirements and self.py_system_site_packages. This value is cached for future calls. + self.py_requirements, self.pip_install_options, and self.py_system_site_packages. This value is cached for future calls. """ # We are reusing the virtualenv directory for all subprocess calls within this task/operator. # For this reason, we are not using contexts at this point. 
@@ -72,6 +75,7 @@ def venv_dbt_path( python_bin=PY_INTERPRETER, system_site_packages=self.py_system_site_packages, requirements=self.py_requirements, + pip_install_options=self.pip_install_options, ) dbt_binary = Path(py_interpreter).parent / "dbt" cmd_output = self.subprocess_hook.run_command( diff --git a/cosmos/plugin/__init__.py b/cosmos/plugin/__init__.py index 48061b254..b82b29f5c 100644 --- a/cosmos/plugin/__init__.py +++ b/cosmos/plugin/__init__.py @@ -136,7 +136,7 @@ def open_file(path: str) -> str: // Model page getMaxElement('bottom', 'section.section') + 75, // Search page - getMaxElement('bottom', 'div.result-body') + 110 + getMaxElement('bottom', 'div.result-body') + 125 ) } } diff --git a/dev/dags/example_cosmos_sources.py b/dev/dags/example_cosmos_sources.py index 1a85b6d9f..346f37370 100644 --- a/dev/dags/example_cosmos_sources.py +++ b/dev/dags/example_cosmos_sources.py @@ -17,7 +17,11 @@ from pathlib import Path from airflow.models.dag import DAG -from airflow.operators.dummy import DummyOperator + +try: # available since Airflow 2.4.0 + from airflow.operators.empty import EmptyOperator +except ImportError: + from airflow.operators.dummy import DummyOperator as EmptyOperator from airflow.utils.task_group import TaskGroup from cosmos import DbtDag, ProfileConfig, ProjectConfig, RenderConfig @@ -38,21 +42,21 @@ # [START custom_dbt_nodes] -# Cosmos will use this function to generate a DummyOperator task when it finds a source node, in the manifest. +# Cosmos will use this function to generate an empty task when it finds a source node in the manifest. # A more realistic use case could be to use an Airflow sensor to represent a source. def convert_source(dag: DAG, task_group: TaskGroup, node: DbtNode, **kwargs): """ - Return an instance of DummyOperator to represent a dbt "source" node. + Return an instance of a desired operator to represent a dbt "source" node. """ - return DummyOperator(dag=dag, task_group=task_group, task_id=f"{node.name}_source") + return EmptyOperator(dag=dag, task_group=task_group, task_id=f"{node.name}_source") -# Cosmos will use this function to generate a DummyOperator task when it finds a exposure node, in the manifest. +# Cosmos will use this function to generate an empty task when it finds an exposure node in the manifest. def convert_exposure(dag: DAG, task_group: TaskGroup, node: DbtNode, **kwargs): """ - Return an instance of DummyOperator to represent a dbt "exposure" node. + Return an instance of a desired operator to represent a dbt "exposure" node. """ - return DummyOperator(dag=dag, task_group=task_group, task_id=f"{node.name}_exposure") + return EmptyOperator(dag=dag, task_group=task_group, task_id=f"{node.name}_exposure") # Use `RenderConfig` to tell Cosmos, given a node type, how to convert a dbt node into an Airflow task or task group. diff --git a/docs/configuration/hosting-docs.rst b/docs/configuration/hosting-docs.rst index 5143a9f67..2ab4fdf69 100644 --- a/docs/configuration/hosting-docs.rst +++ b/docs/configuration/hosting-docs.rst @@ -105,11 +105,29 @@ For example, if your dbt project directory is ``/usr/local/airflow/dags/my_dbt_p AIRFLOW__COSMOS__DBT_DOCS_DIR="/usr/local/airflow/dags/my_dbt_project/target" -Using docs out of local storage has the downside that some values in the dbt docs can become stale unless the docs are periodically refreshed and redeployed: +Using docs out of local storage has a couple of downsides.
First, some values in the dbt docs can become stale unless the docs are periodically refreshed and redeployed: - Counts of the numbers of rows. - The compiled SQL for incremental models before and after the first run. +Second, deployment from local storage may only be partially compatible with some managed Airflow systems. +Compatibility will depend on the managed Airflow system, as each one works differently. + +For example, Astronomer does not update the resources available to the webserver instance when ``--dags`` is specified during deployment, meaning that the dbt docs will not be updated when this flag is used. + +.. note:: + Managed Airflow on Astronomer Cloud does not provide the webserver access to the DAGs folder. + If you want to host your docs in local storage with Astro, you should host them in a directory other than ``dags/``. + For example, you can set your ``AIRFLOW__COSMOS__DBT_DOCS_DIR`` to ``/usr/local/airflow/dbt_docs_dir`` with the following pre-deployment script: + + .. code-block:: bash + + dbt docs generate + mkdir dbt_docs_dir + cp dags/dbt/target/manifest.json dbt_docs_dir/manifest.json + cp dags/dbt/target/catalog.json dbt_docs_dir/catalog.json + cp dags/dbt/target/index.html dbt_docs_dir/index.html + Host from HTTP/HTTPS ~~~~~~~~~~~~~~~~~~~~ diff --git a/docs/contributing.rst b/docs/contributing.rst index 3e2ab6fe3..56826cfc8 100644 --- a/docs/contributing.rst +++ b/docs/contributing.rst @@ -35,8 +35,8 @@ Then install ``airflow`` and ``astronomer-cosmos`` using python-venv: .. code-block:: bash python3 -m venv env && source env/bin/activate - pip3 install apache-airflow[cncf.kubernetes,openlineage] - pip3 install -e .[dbt-postgres,dbt-databricks] + pip3 install "apache-airflow[cncf.kubernetes,openlineage]" + pip3 install -e ".[dbt-postgres,dbt-databricks]" Set airflow home to the ``dev/`` directory and disabled loading example DAGs: diff --git a/docs/getting_started/gcc.rst b/docs/getting_started/gcc.rst index 5baa9c37e..ed4b931ce 100644 --- a/docs/getting_started/gcc.rst +++ b/docs/getting_started/gcc.rst @@ -49,6 +49,8 @@ In your ``my_cosmos_dag.py`` file, import the ``DbtDag`` class from Cosmos and c Make sure to rename the ```` value below to your adapter's Python package (i.e. ``dbt-snowflake`` or ``dbt-bigquery``) +If you need to modify the pip install options, you can do so by adding ``pip_install_options`` to the ``operator_args`` dictionary. For example, if you wanted to install packages from local wheels you could set it to: ``["--no-index", "--find-links=/path/to/wheels"]``. All options can be found in the pip documentation. + ..
code-block:: python from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig diff --git a/tests/operators/test_azure_container_instance.py b/tests/operators/test_azure_container_instance.py index 01fa3e20e..84d733ce3 100644 --- a/tests/operators/test_azure_container_instance.py +++ b/tests/operators/test_azure_container_instance.py @@ -53,6 +53,7 @@ def test_dbt_azure_container_instance_operator_get_env(p_context_to_airflow_vars name="my-aci", resource_group="my-rg", project_dir="my/dir", + append_env=False, ) dbt_base_operator.env = { "start_date": "20220101", @@ -90,6 +91,7 @@ def test_dbt_azure_container_instance_operator_check_environment_variables( resource_group="my-rg", project_dir="my/dir", environment_variables={"FOO": "BAR"}, + append_env=False, ) dbt_base_operator.env = { "start_date": "20220101", @@ -143,3 +145,25 @@ def test_dbt_azure_container_instance_build_command(): "start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n", "--no-version-check", ] + + +@patch("cosmos.operators.azure_container_instance.AzureContainerInstancesOperator.execute") +def test_dbt_azure_container_instance_build_and_run_cmd(mock_execute): + dbt_base_operator = ConcreteDbtAzureContainerInstanceOperator( + ci_conn_id="my_airflow_connection", + task_id="my-task", + image="my_image", + region="Mordor", + name="my-aci", + resource_group="my-rg", + project_dir="my/dir", + environment_variables={"FOO": "BAR"}, + ) + mock_build_command = MagicMock() + dbt_base_operator.build_command = mock_build_command + + mock_context = MagicMock() + dbt_base_operator.build_and_run_cmd(context=mock_context) + + mock_build_command.assert_called_with(mock_context, None) + mock_execute.assert_called_once_with(dbt_base_operator, mock_context) diff --git a/tests/operators/test_docker.py b/tests/operators/test_docker.py index ad3ec5485..2cfb6b835 100644 --- a/tests/operators/test_docker.py +++ b/tests/operators/test_docker.py @@ -73,10 +73,7 @@ def test_dbt_docker_operator_get_env(p_context_to_airflow_vars: MagicMock, base_ If an end user passes in a """ dbt_base_operator = base_operator( - conn_id="my_airflow_connection", - task_id="my-task", - image="my_image", - project_dir="my/dir", + conn_id="my_airflow_connection", task_id="my-task", image="my_image", project_dir="my/dir", append_env=False ) dbt_base_operator.env = { "start_date": "20220101", diff --git a/tests/operators/test_kubernetes.py b/tests/operators/test_kubernetes.py index 75739111f..d0be2acad 100644 --- a/tests/operators/test_kubernetes.py +++ b/tests/operators/test_kubernetes.py @@ -81,10 +81,7 @@ def test_dbt_kubernetes_operator_get_env(p_context_to_airflow_vars: MagicMock, b If an end user passes in a """ dbt_kube_operator = base_operator( - conn_id="my_airflow_connection", - task_id="my-task", - image="my_image", - project_dir="my/dir", + conn_id="my_airflow_connection", task_id="my-task", image="my_image", project_dir="my/dir", append_env=False ) dbt_kube_operator.env = { "start_date": "20220101", @@ -254,7 +251,7 @@ def cleanup(pod: str, remote_pod: str): def test_created_pod(): - ls_kwargs = {"env_vars": {"FOO": "BAR"}, "namespace": "foo"} + ls_kwargs = {"env_vars": {"FOO": "BAR"}, "namespace": "foo", "append_env": False} ls_kwargs.update(base_kwargs) ls_operator = DbtLSKubernetesOperator(**ls_kwargs) ls_operator.hook = MagicMock() diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 8b1893a67..0b080feb8 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -332,9 +332,7 
@@ def test_dbt_base_operator_get_env(p_context_to_airflow_vars: MagicMock) -> None If an end user passes in a """ dbt_base_operator = ConcreteDbtLocalBaseOperator( - profile_config=profile_config, - task_id="my-task", - project_dir="my/dir", + profile_config=profile_config, task_id="my-task", project_dir="my/dir", append_env=False ) dbt_base_operator.env = { "start_date": "20220101", @@ -720,18 +718,26 @@ def test_dbt_docs_gcs_local_operator(): @patch("cosmos.config.ProfileConfig.ensure_profile") @patch("cosmos.operators.local.DbtLocalBaseOperator.run_subprocess") @patch("cosmos.operators.local.DbtLocalBaseOperator.run_dbt_runner") +@patch("cosmos.operators.local.tempfile.TemporaryDirectory") @pytest.mark.parametrize("invocation_mode", [InvocationMode.SUBPROCESS, InvocationMode.DBT_RUNNER]) def test_operator_execute_deps_parameters( + mock_temporary_directory, mock_dbt_runner, mock_subprocess, mock_ensure_profile, mock_exception_handling, mock_store_compiled_sql, invocation_mode, + tmp_path, ): + project_dir = tmp_path / "mock_project_tmp_dir" + project_dir.mkdir() + expected_call_kwargs = [ "/usr/local/bin/dbt", "deps", + "--project-dir", + project_dir.as_posix(), "--profiles-dir", "/path/to", "--profile", @@ -749,6 +755,7 @@ def test_operator_execute_deps_parameters( invocation_mode=invocation_mode, ) mock_ensure_profile.return_value.__enter__.return_value = (Path("/path/to/profile"), {"ENV_VAR": "value"}) + mock_temporary_directory.return_value.__enter__.return_value = project_dir.as_posix() task.execute(context={"task_instance": MagicMock()}) if invocation_mode == InvocationMode.SUBPROCESS: assert mock_subprocess.call_args_list[0].kwargs["command"] == expected_call_kwargs @@ -767,6 +774,21 @@ def test_dbt_docs_local_operator_with_static_flag(): assert operator.required_files == ["static_index.html"] +def test_dbt_docs_local_operator_ignores_graph_gpickle(): + # Check when --no-write-json is passed, graph.gpickle is removed. + # This is only currently relevant for subclasses, but will become more generally relevant in the future. 
+ class CustomDbtDocsLocalOperator(DbtDocsLocalOperator): + required_files = ["index.html", "manifest.json", "graph.gpickle", "catalog.json"] + + operator = CustomDbtDocsLocalOperator( + task_id="fake-task", + project_dir="fake-dir", + profile_config=profile_config, + dbt_cmd_global_flags=["--no-write-json"], + ) + assert operator.required_files == ["index.html", "manifest.json", "catalog.json"] + + @patch("cosmos.hooks.subprocess.FullOutputSubprocessHook.send_sigint") def test_dbt_local_operator_on_kill_sigint(mock_send_sigint) -> None: diff --git a/tests/operators/test_virtualenv.py b/tests/operators/test_virtualenv.py index 1610d2707..5866347ab 100644 --- a/tests/operators/test_virtualenv.py +++ b/tests/operators/test_virtualenv.py @@ -50,6 +50,7 @@ def test_run_command( install_deps=True, project_dir="./dev/dags/dbt/jaffle_shop", py_system_site_packages=False, + pip_install_options=["--test-flag"], py_requirements=["dbt-postgres==1.6.0b1"], emit_datasets=False, invocation_mode=InvocationMode.SUBPROCESS, diff --git a/tests/test_converter.py b/tests/test_converter.py index f5dfe68b9..10dc37f13 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -8,7 +8,7 @@ from cosmos.config import CosmosConfigException, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig from cosmos.constants import DbtResourceType, ExecutionMode, InvocationMode, LoadMode from cosmos.converter import DbtToAirflowConverter, validate_arguments, validate_initial_user_config -from cosmos.dbt.graph import DbtNode +from cosmos.dbt.graph import DbtGraph, DbtNode from cosmos.exceptions import CosmosValueError from cosmos.profiles.postgres import PostgresUserPasswordProfileMapping @@ -468,3 +468,34 @@ def test_converter_invocation_mode_added_to_task_args( assert kwargs["task_args"]["invocation_mode"] == invocation_mode else: assert "invocation_mode" not in kwargs["task_args"] + + +@pytest.mark.parametrize( + "execution_mode,operator_args", + [ + (ExecutionMode.KUBERNETES, {}), + ], +) +@patch("cosmos.converter.DbtGraph.filtered_nodes", nodes) +@patch("cosmos.converter.DbtGraph.load") +def test_converter_contains_dbt_graph(mock_load_dbt_graph, execution_mode, operator_args): + """ + This test validates that DbtToAirflowConverter contains and exposes a DbtGraph instance + """ + project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT) + execution_config = ExecutionConfig(execution_mode=execution_mode) + render_config = RenderConfig(emit_datasets=True) + profile_config = ProfileConfig( + profile_name="my_profile_name", + target_name="my_target_name", + profiles_yml_filepath=SAMPLE_PROFILE_YML, + ) + converter = DbtToAirflowConverter( + nodes=nodes, + project_config=project_config, + profile_config=profile_config, + execution_config=execution_config, + render_config=render_config, + operator_args=operator_args, + ) + assert isinstance(converter.dbt_graph, DbtGraph)
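For the new ``pip_install_options`` argument, a minimal usage sketch in virtualenv execution mode follows; the DAG id, schedule, project path, connection id, and profile values are illustrative placeholders, not anything defined by this patch.

.. code-block:: python

    from datetime import datetime

    from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
    from cosmos.constants import ExecutionMode
    from cosmos.profiles.postgres import PostgresUserPasswordProfileMapping

    example_virtualenv_dag = DbtDag(
        dag_id="example_virtualenv_dag",  # placeholder
        start_date=datetime(2024, 1, 1),
        schedule_interval=None,
        project_config=ProjectConfig(dbt_project_path="/usr/local/airflow/dags/dbt/jaffle_shop"),  # placeholder path
        profile_config=ProfileConfig(
            profile_name="default",
            target_name="dev",
            profile_mapping=PostgresUserPasswordProfileMapping(
                conn_id="example_postgres_conn",  # placeholder connection
                profile_args={"schema": "public"},
            ),
        ),
        execution_config=ExecutionConfig(execution_mode=ExecutionMode.VIRTUALENV),
        operator_args={
            "py_requirements": ["dbt-postgres==1.6.0"],
            # Extra flags forwarded to pip when the virtualenv is created.
            "pip_install_options": ["--no-index", "--find-links=/path/to/wheels"],
        },
    )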
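Because the converter now stores the graph on ``self.dbt_graph`` instead of a local variable, the loaded and filtered nodes can be inspected after instantiation. A short sketch, continuing from the DAG above and assuming ``DbtDag`` inherits the converter attributes:

.. code-block:: python

    # DbtDag builds on DbtToAirflowConverter, so the graph loaded during
    # rendering is reachable from the finished DAG object.
    for unique_id, node in example_virtualenv_dag.dbt_graph.filtered_nodes.items():
        print(unique_id, node.resource_type)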
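With ``append_env`` now defaulting to ``True``, dbt subprocesses inherit the worker's environment and ``env`` entries are layered on top; passing ``append_env=False`` keeps only the explicit ``env``. A hedged sketch with a local operator, where the project path, variable, and ``profile_config`` object are placeholders assumed to exist:

.. code-block:: python

    from cosmos.operators.local import DbtRunLocalOperator

    run_models = DbtRunLocalOperator(
        task_id="dbt_run",
        project_dir="/usr/local/airflow/dags/dbt/jaffle_shop",  # placeholder path
        profile_config=profile_config,  # assumed to be defined elsewhere
        env={"DBT_TARGET_SCHEMA": "analytics"},  # placeholder variable
        append_env=False,  # opt back into the old behaviour: do not inherit the process environment
    )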
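The docs operator's ``required_files`` now reacts to the flags it receives: ``--static`` narrows the upload set to ``static_index.html``, and ``--no-write-json`` (as a global flag) drops ``graph.gpickle`` when a subclass still lists it. A small sketch mirroring the behaviour exercised in the tests; the path and ``profile_config`` are placeholders:

.. code-block:: python

    from cosmos.operators.local import DbtDocsLocalOperator

    generate_docs = DbtDocsLocalOperator(
        task_id="dbt_docs_generate",
        project_dir="/usr/local/airflow/dags/dbt/jaffle_shop",  # placeholder path
        profile_config=profile_config,  # assumed to be defined elsewhere
        dbt_cmd_flags=["--static"],
    )
    # With --static, only the single-file docs page is required and uploaded.
    assert generate_docs.required_files == ["static_index.html"]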
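On the Azure Container Instance operator, the base-class order is flipped so the dbt behaviour takes precedence in the MRO; the reorder changes which ``execute`` a bare ``super()`` call would reach, which is presumably why ``build_and_run_cmd`` now invokes ``AzureContainerInstancesOperator.execute`` explicitly. The resolution order can be checked with a two-line sketch:

.. code-block:: python

    from cosmos.operators.azure_container_instance import DbtRunAzureContainerInstanceOperator

    # The dbt mixin and AbstractDbtBaseOperator now come before AzureContainerInstancesOperator.
    print([klass.__name__ for klass in DbtRunAzureContainerInstanceOperator.__mro__])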