Merge branch 'main' into feature/global_flag_list
ms32035 authored Apr 5, 2024
2 parents c76529d + 7c4d7d6 commit 42a5ca4
Showing 17 changed files with 152 additions and 47 deletions.
8 changes: 4 additions & 4 deletions .pre-commit-config.yaml
@@ -47,20 +47,20 @@ repos:
- id: remove-tabs
exclude: ^docs/make.bat$|^docs/Makefile$|^dev/dags/dbt/jaffle_shop/seeds/raw_orders.csv$
- repo: https://github.com/asottile/pyupgrade
rev: v3.15.1
rev: v3.15.2
hooks:
- id: pyupgrade
args:
- --py37-plus
- --keep-runtime-typing
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.2.2
rev: v0.3.5
hooks:
- id: ruff
args:
- --fix
- repo: https://github.com/psf/black
rev: 24.2.0
rev: 24.3.0
hooks:
- id: black
args: ["--config", "./pyproject.toml"]
@@ -71,7 +71,7 @@ repos:
alias: black
additional_dependencies: [black>=22.10.0]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: "v1.8.0"
rev: "v1.9.0"

hooks:
- id: mypy
6 changes: 3 additions & 3 deletions cosmos/converter.py
@@ -234,14 +234,14 @@ def __init__(
# To keep this logic working, if converter is given no ProfileConfig,
# we can create a default retaining this value to preserve this functionality.
# We may want to consider defaulting this value in our actual ProjectConfig class?
dbt_graph = DbtGraph(
self.dbt_graph = DbtGraph(
project=project_config,
render_config=render_config,
execution_config=execution_config,
profile_config=profile_config,
dbt_vars=dbt_vars,
)
dbt_graph.load(method=render_config.load_method, execution_mode=execution_config.execution_mode)
self.dbt_graph.load(method=render_config.load_method, execution_mode=execution_config.execution_mode)

task_args = {
**operator_args,
@@ -266,7 +266,7 @@ def __init__(
)

build_airflow_graph(
nodes=dbt_graph.filtered_nodes,
nodes=self.dbt_graph.filtered_nodes,
dag=dag or (task_group and task_group.dag),
task_group=task_group,
execution_mode=execution_config.execution_mode,
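The change above keeps the parsed DbtGraph on the converter as self.dbt_graph instead of a throwaway local, so the graph remains inspectable after the DAG has been built. A minimal sketch of how that attribute might be used — only the attribute name comes from this diff; the configuration values below are placeholders:

# Hedged sketch: only the `dbt_graph` attribute is taken from this change;
# paths and config values are placeholders.
from cosmos import DbtDag, ProfileConfig, ProjectConfig

dag = DbtDag(
    dag_id="jaffle_shop",
    project_config=ProjectConfig("/usr/local/airflow/dags/dbt/jaffle_shop"),
    profile_config=ProfileConfig(
        profile_name="default",
        target_name="dev",
        profiles_yml_filepath="/usr/local/airflow/dags/dbt/jaffle_shop/profiles.yml",
    ),
)

# DbtDag builds on DbtToAirflowConverter, so the loaded graph is now reachable here,
# e.g. for debugging or for assertions in tests.
for unique_id, node in dag.dbt_graph.filtered_nodes.items():
    print(unique_id, node.resource_type)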
16 changes: 8 additions & 8 deletions cosmos/operators/azure_container_instance.py
@@ -28,7 +28,7 @@
)


class DbtAzureContainerInstanceBaseOperator(AzureContainerInstancesOperator, AbstractDbtBaseOperator): # type: ignore
class DbtAzureContainerInstanceBaseOperator(AbstractDbtBaseOperator, AzureContainerInstancesOperator): # type: ignore
"""
Executes a dbt core cli command in an Azure Container Instance
"""
@@ -66,7 +66,7 @@ def __init__(
def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> None:
self.build_command(context, cmd_flags)
self.log.info(f"Running command: {self.command}")
result = super().execute(context)
result = AzureContainerInstancesOperator.execute(self, context)
logger.info(result)

def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> None:
@@ -79,13 +79,13 @@ def build_command(self, context: Context, cmd_flags: list[str] | None = None) ->
self.command: list[str] = dbt_cmd


class DbtLSAzureContainerInstanceOperator(DbtLSMixin, DbtAzureContainerInstanceBaseOperator):
class DbtLSAzureContainerInstanceOperator(DbtLSMixin, DbtAzureContainerInstanceBaseOperator): # type: ignore
"""
Executes a dbt core ls command.
"""


class DbtSeedAzureContainerInstanceOperator(DbtSeedMixin, DbtAzureContainerInstanceBaseOperator):
class DbtSeedAzureContainerInstanceOperator(DbtSeedMixin, DbtAzureContainerInstanceBaseOperator): # type: ignore
"""
Executes a dbt core seed command.
@@ -95,22 +95,22 @@ class DbtSeedAzureContainerInstanceOperator(DbtSeedMixin, DbtAzureContainerInsta
template_fields: Sequence[str] = DbtAzureContainerInstanceBaseOperator.template_fields + DbtRunMixin.template_fields # type: ignore[operator]


class DbtSnapshotAzureContainerInstanceOperator(DbtSnapshotMixin, DbtAzureContainerInstanceBaseOperator):
class DbtSnapshotAzureContainerInstanceOperator(DbtSnapshotMixin, DbtAzureContainerInstanceBaseOperator): # type: ignore
"""
Executes a dbt core snapshot command.
"""


class DbtRunAzureContainerInstanceOperator(DbtRunMixin, DbtAzureContainerInstanceBaseOperator):
class DbtRunAzureContainerInstanceOperator(DbtRunMixin, DbtAzureContainerInstanceBaseOperator): # type: ignore
"""
Executes a dbt core run command.
"""

template_fields: Sequence[str] = DbtAzureContainerInstanceBaseOperator.template_fields + DbtRunMixin.template_fields # type: ignore[operator]


class DbtTestAzureContainerInstanceOperator(DbtTestMixin, DbtAzureContainerInstanceBaseOperator):
class DbtTestAzureContainerInstanceOperator(DbtTestMixin, DbtAzureContainerInstanceBaseOperator): # type: ignore
"""
Executes a dbt core test command.
"""
@@ -121,7 +121,7 @@ def __init__(self, on_warning_callback: Callable[..., Any] | None = None, **kwar
self.on_warning_callback = on_warning_callback


class DbtRunOperationAzureContainerInstanceOperator(DbtRunOperationMixin, DbtAzureContainerInstanceBaseOperator):
class DbtRunOperationAzureContainerInstanceOperator(DbtRunOperationMixin, DbtAzureContainerInstanceBaseOperator): # type: ignore
"""
Executes a dbt core run-operation command.
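Reordering the base classes puts AbstractDbtBaseOperator first in the MRO, so its attributes and defaults win during initialisation; as a consequence, build_and_run_cmd now calls AzureContainerInstancesOperator.execute explicitly, because super().execute() would resolve to the dbt base class first. A minimal, generic sketch of why the order matters (stand-in classes, not the real Airflow/Cosmos operators):

# Generic MRO illustration with stand-in classes (not the real Airflow/Cosmos ones).
class AbstractDbtBase:
    def execute(self, context):
        return "dbt base execute"


class ContainerOperator:
    def execute(self, context):
        return "container execute"


class DbtContainerOperator(AbstractDbtBase, ContainerOperator):
    def build_and_run_cmd(self, context):
        # super().execute(context) would hit AbstractDbtBase.execute first,
        # so the container behaviour has to be invoked explicitly:
        return ContainerOperator.execute(self, context)


print([cls.__name__ for cls in DbtContainerOperator.__mro__])
# ['DbtContainerOperator', 'AbstractDbtBase', 'ContainerOperator', 'object']
print(DbtContainerOperator().build_and_run_cmd(context={}))  # container execute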
9 changes: 5 additions & 4 deletions cosmos/operators/base.py
@@ -43,10 +43,11 @@ class AbstractDbtBaseOperator(BaseOperator, metaclass=ABCMeta):
environment variables for the new process; these are used instead
of inheriting the current process environment, which is the default
behavior. (templated)
:param append_env: If False(default) uses the environment variables passed in env params
and does not inherit the current process environment. If True, inherits the environment variables
:param append_env: If True (default), inherits the environment variables
from the current process, and the environment variables passed by the user
either update the existing inherited environment variables or are appended to them.
If False, only uses the environment variables passed in env params
and does not inherit the current process environment.
:param output_encoding: Output encoding of bash command
:param skip_exit_code: If task exits with this exit code, leave the task
in ``skipped`` state (default: 99). If set to ``None``, any non-zero
@@ -99,7 +100,7 @@ def __init__(
db_name: str | None = None,
schema: str | None = None,
env: dict[str, Any] | None = None,
append_env: bool = False,
append_env: bool = True,
output_encoding: str = "utf-8",
skip_exit_code: int = 99,
partial_parse: bool = True,
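Flipping the append_env default to True means operators now start from the worker process environment and layer the user-supplied env on top; passing append_env=False restores the previous isolated behaviour. A rough sketch of the merge semantics (illustrative only, not the actual Cosmos implementation):

import os


def build_env(user_env: dict[str, str], append_env: bool = True) -> dict[str, str]:
    """Illustrative merge semantics, not the actual Cosmos code."""
    if append_env:
        # Inherit the process environment; user values override inherited keys.
        return {**os.environ, **user_env}
    # Only what the user passed in, nothing inherited.
    return dict(user_env)


print("PATH" in build_env({"DBT_TARGET": "dev"}, append_env=True))   # True
print("PATH" in build_env({"DBT_TARGET": "dev"}, append_env=False))  # False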
10 changes: 7 additions & 3 deletions cosmos/operators/local.py
@@ -298,6 +298,8 @@ def run_command(
env.update(env_vars)

flags = [
"--project-dir",
str(tmp_project_dir),
"--profiles-dir",
str(profile_path.parent),
"--profile",
@@ -568,19 +570,21 @@ class DbtDocsLocalOperator(DbtLocalBaseOperator):
"""

ui_color = "#8194E0"
required_files = ["index.html", "manifest.json", "graph.gpickle", "catalog.json"]
required_files = ["index.html", "manifest.json", "catalog.json"]
base_cmd = ["docs", "generate"]

def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.check_static_flag()

def check_static_flag(self) -> None:
flag = "--static"
if self.dbt_cmd_flags:
if flag in self.dbt_cmd_flags:
if "--static" in self.dbt_cmd_flags:
# For the --static flag we only upload the generated static_index.html file
self.required_files = ["static_index.html"]
if self.dbt_cmd_global_flags:
if "--no-write-json" in self.dbt_cmd_global_flags and "graph.gpickle" in self.required_files:
self.required_files.remove("graph.gpickle")


class DbtDocsCloudLocalOperator(DbtDocsLocalOperator, ABC):
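With this change, graph.gpickle is no longer uploaded by default, --static still narrows the upload to static_index.html, and the new dbt_cmd_global_flags list is now consulted as well. A hedged usage sketch — the flag names come from the diff above, the remaining constructor arguments are placeholders:

# Hedged sketch: the flag handling is from the diff above; other arguments are placeholders.
from cosmos.operators.local import DbtDocsLocalOperator

docs_task = DbtDocsLocalOperator(
    task_id="generate_dbt_docs",
    project_dir="/usr/local/airflow/dags/dbt/jaffle_shop",  # placeholder path
    profile_config=profile_config,  # assumed to be defined elsewhere in the DAG file
    dbt_cmd_flags=["--static"],  # only static_index.html will be uploaded
    # dbt_cmd_global_flags=["--no-write-json"],  # would drop graph.gpickle if it were still listed
)
print(docs_task.required_files)  # ["static_index.html"]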
6 changes: 5 additions & 1 deletion cosmos/operators/virtualenv.py
@@ -37,6 +37,7 @@ class DbtVirtualenvBaseOperator(DbtLocalBaseOperator):
:param py_requirements: If defined, creates a virtual environment with the specified dependencies. Example:
["dbt-postgres==1.5.0"]
:param pip_install_options: Pip options to use when installing Python dependencies. Example: ["--upgrade", "--no-cache-dir"]
:param py_system_site_packages: Whether or not all the Python packages from the Airflow instance will be accessible
within the virtual environment (if py_requirements argument is specified).
Avoid using unless the dbt job requires it.
@@ -45,10 +46,12 @@ def __init__(
def __init__(
self,
py_requirements: list[str] | None = None,
pip_install_options: list[str] | None = None,
py_system_site_packages: bool = False,
**kwargs: Any,
) -> None:
self.py_requirements = py_requirements or []
self.pip_install_options = pip_install_options or []
self.py_system_site_packages = py_system_site_packages
super().__init__(**kwargs)
self._venv_tmp_dir: None | TemporaryDirectory[str] = None
@@ -61,7 +64,7 @@ def venv_dbt_path(
Path to the dbt binary within a Python virtualenv.
The first time this property is called, it creates a virtualenv and installs the dependencies based on the
self.py_requirements and self.py_system_site_packages. This value is cached for future calls.
self.py_requirements, self.pip_install_options, and self.py_system_site_packages. This value is cached for future calls.
"""
# We are reusing the virtualenv directory for all subprocess calls within this task/operator.
# For this reason, we are not using contexts at this point.
@@ -72,6 +75,7 @@
python_bin=PY_INTERPRETER,
system_site_packages=self.py_system_site_packages,
requirements=self.py_requirements,
pip_install_options=self.pip_install_options,
)
dbt_binary = Path(py_interpreter).parent / "dbt"
cmd_output = self.subprocess_hook.run_command(
2 changes: 1 addition & 1 deletion cosmos/plugin/__init__.py
@@ -136,7 +136,7 @@ def open_file(path: str) -> str:
// Model page
getMaxElement('bottom', 'section.section') + 75,
// Search page
getMaxElement('bottom', 'div.result-body') + 110
getMaxElement('bottom', 'div.result-body') + 125
)
}
}
18 changes: 11 additions & 7 deletions dev/dags/example_cosmos_sources.py
@@ -17,7 +17,11 @@
from pathlib import Path

from airflow.models.dag import DAG
from airflow.operators.dummy import DummyOperator

try: # available since Airflow 2.4.0
from airflow.operators.empty import EmptyOperator
except ImportError:
from airflow.operators.dummy import DummyOperator as EmptyOperator
from airflow.utils.task_group import TaskGroup

from cosmos import DbtDag, ProfileConfig, ProjectConfig, RenderConfig
@@ -38,21 +42,21 @@


# [START custom_dbt_nodes]
# Cosmos will use this function to generate a DummyOperator task when it finds a source node, in the manifest.
# Cosmos will use this function to generate an empty task when it finds a source node in the manifest.
# A more realistic use case could be to use an Airflow sensor to represent a source.
def convert_source(dag: DAG, task_group: TaskGroup, node: DbtNode, **kwargs):
"""
Return an instance of DummyOperator to represent a dbt "source" node.
Return an instance of a desired operator to represent a dbt "source" node.
"""
return DummyOperator(dag=dag, task_group=task_group, task_id=f"{node.name}_source")
return EmptyOperator(dag=dag, task_group=task_group, task_id=f"{node.name}_source")


# Cosmos will use this function to generate a DummyOperator task when it finds a exposure node, in the manifest.
# Cosmos will use this function to generate an empty task when it finds an exposure node in the manifest.
def convert_exposure(dag: DAG, task_group: TaskGroup, node: DbtNode, **kwargs):
"""
Return an instance of DummyOperator to represent a dbt "exposure" node.
Return an instance of a desired operator to represent a dbt "exposure" node.
"""
return DummyOperator(dag=dag, task_group=task_group, task_id=f"{node.name}_exposure")
return EmptyOperator(dag=dag, task_group=task_group, task_id=f"{node.name}_exposure")


# Use `RenderConfig` to tell Cosmos, given a node type, how to convert a dbt node into an Airflow task or task group.
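For context, the converter callables above are wired into Cosmos through RenderConfig; a hedged sketch of that mapping follows (the node_converters argument and the DbtResourceType values reflect the example's intent but are not shown in this excerpt):

# Hedged sketch: wiring the callables above into RenderConfig; argument names are
# assumptions based on the example's intent, not shown in this excerpt.
from cosmos import DbtDag, ProjectConfig, RenderConfig
from cosmos.constants import DbtResourceType

render_config = RenderConfig(
    node_converters={
        DbtResourceType("source"): convert_source,  # defined above
        DbtResourceType("exposure"): convert_exposure,  # defined above
    }
)

example_cosmos_sources = DbtDag(
    dag_id="example_cosmos_sources",
    project_config=ProjectConfig(DBT_ROOT_PATH / "jaffle_shop"),  # placeholder path
    profile_config=profile_config,  # assumed to be defined earlier in the DAG file
    render_config=render_config,
)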
20 changes: 19 additions & 1 deletion docs/configuration/hosting-docs.rst
@@ -105,11 +105,29 @@ For example, if your dbt project directory is ``/usr/local/airflow/dags/my_dbt_p
AIRFLOW__COSMOS__DBT_DOCS_DIR="/usr/local/airflow/dags/my_dbt_project/target"
Using docs out of local storage has the downside that some values in the dbt docs can become stale unless the docs are periodically refreshed and redeployed:
Using docs out of local storage has a couple of downsides. First, some values in the dbt docs can become stale unless the docs are periodically refreshed and redeployed:

- Counts of the numbers of rows.
- The compiled SQL for incremental models before and after the first run.

Second, deployment from local storage may only be partially compatible with some managed Airflow systems.
Compatibility will depend on the managed Airflow system, as each one works differently.

For example, Astronomer does not update the resources available to the webserver instance when ``--dags`` is specified during deployment, meaning that the dbt docs will not be updated when this flag is used.

.. note::
Managed Airflow on Astronomer Cloud does not provide the webserver access to the DAGs folder.
If you want to host your docs in local storage with Astro, you should host them in a directory other than ``dags/``.
For example, you can set your ``AIRFLOW__COSMOS__DBT_DOCS_DIR`` to ``/usr/local/airflow/dbt_docs_dir`` with the following pre-deployment script:

.. code-block:: bash
dbt docs generate
mkdir dbt_docs_dir
cp dags/dbt/target/manifest.json dbt_docs_dir/manifest.json
cp dags/dbt/target/catalog.json dbt_docs_dir/catalog.json
cp dags/dbt/target/index.html dbt_docs_dir/index.html
Host from HTTP/HTTPS
~~~~~~~~~~~~~~~~~~~~

4 changes: 2 additions & 2 deletions docs/contributing.rst
@@ -35,8 +35,8 @@ Then install ``airflow`` and ``astronomer-cosmos`` using python-venv:
.. code-block:: bash
python3 -m venv env && source env/bin/activate
pip3 install apache-airflow[cncf.kubernetes,openlineage]
pip3 install -e .[dbt-postgres,dbt-databricks]
pip3 install "apache-airflow[cncf.kubernetes,openlineage]"
pip3 install -e ".[dbt-postgres,dbt-databricks]"
Set airflow home to the ``dev/`` directory and disable loading example DAGs:

2 changes: 2 additions & 0 deletions docs/getting_started/gcc.rst
@@ -49,6 +49,8 @@ In your ``my_cosmos_dag.py`` file, import the ``DbtDag`` class from Cosmos and c

Make sure to rename the ``<your-adapter>`` value below to your adapter's Python package (e.g. ``dbt-snowflake`` or ``dbt-bigquery``)

If you need to modify the pip install options, you can do so by adding ``pip_install_options`` to the ``operator_args`` dictionary. For example, if you wanted to install packages from local wheels you could set it to ``["--no-index", "--find-links=/path/to/wheels"]``. All options can be found here: https://pip.pypa.io/en/stable/cli/pip_install/

.. code-block:: python
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
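    # --- Hedged sketch (not part of the original guide): passing pip install options ---
    # Only py_requirements and pip_install_options are the point here; the remaining
    # DbtDag arguments are placeholders to replace with your own configuration.
    my_cosmos_dag = DbtDag(
        dag_id="my_cosmos_dag",
        project_config=ProjectConfig("/usr/local/airflow/dags/my_dbt_project"),
        profile_config=profile_config,  # assumed to be defined as shown in this guide
        operator_args={
            "py_requirements": ["dbt-postgres"],  # placeholder adapter package
            "pip_install_options": ["--no-index", "--find-links=/path/to/wheels"],
        },
    )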
24 changes: 24 additions & 0 deletions tests/operators/test_azure_container_instance.py
@@ -53,6 +53,7 @@ def test_dbt_azure_container_instance_operator_get_env(p_context_to_airflow_vars
name="my-aci",
resource_group="my-rg",
project_dir="my/dir",
append_env=False,
)
dbt_base_operator.env = {
"start_date": "20220101",
@@ -90,6 +91,7 @@ def test_dbt_azure_container_instance_operator_check_environment_variables(
resource_group="my-rg",
project_dir="my/dir",
environment_variables={"FOO": "BAR"},
append_env=False,
)
dbt_base_operator.env = {
"start_date": "20220101",
@@ -143,3 +145,25 @@ def test_dbt_azure_container_instance_build_command():
"start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n",
"--no-version-check",
]


@patch("cosmos.operators.azure_container_instance.AzureContainerInstancesOperator.execute")
def test_dbt_azure_container_instance_build_and_run_cmd(mock_execute):
dbt_base_operator = ConcreteDbtAzureContainerInstanceOperator(
ci_conn_id="my_airflow_connection",
task_id="my-task",
image="my_image",
region="Mordor",
name="my-aci",
resource_group="my-rg",
project_dir="my/dir",
environment_variables={"FOO": "BAR"},
)
mock_build_command = MagicMock()
dbt_base_operator.build_command = mock_build_command

mock_context = MagicMock()
dbt_base_operator.build_and_run_cmd(context=mock_context)

mock_build_command.assert_called_with(mock_context, None)
mock_execute.assert_called_once_with(dbt_base_operator, mock_context)
5 changes: 1 addition & 4 deletions tests/operators/test_docker.py
@@ -73,10 +73,7 @@ def test_dbt_docker_operator_get_env(p_context_to_airflow_vars: MagicMock, base_
If an end user passes in a
"""
dbt_base_operator = base_operator(
conn_id="my_airflow_connection",
task_id="my-task",
image="my_image",
project_dir="my/dir",
conn_id="my_airflow_connection", task_id="my-task", image="my_image", project_dir="my/dir", append_env=False
)
dbt_base_operator.env = {
"start_date": "20220101",
7 changes: 2 additions & 5 deletions tests/operators/test_kubernetes.py
@@ -81,10 +81,7 @@ def test_dbt_kubernetes_operator_get_env(p_context_to_airflow_vars: MagicMock, b
If an end user passes in a
"""
dbt_kube_operator = base_operator(
conn_id="my_airflow_connection",
task_id="my-task",
image="my_image",
project_dir="my/dir",
conn_id="my_airflow_connection", task_id="my-task", image="my_image", project_dir="my/dir", append_env=False
)
dbt_kube_operator.env = {
"start_date": "20220101",
@@ -254,7 +251,7 @@ def cleanup(pod: str, remote_pod: str):


def test_created_pod():
ls_kwargs = {"env_vars": {"FOO": "BAR"}, "namespace": "foo"}
ls_kwargs = {"env_vars": {"FOO": "BAR"}, "namespace": "foo", "append_env": False}
ls_kwargs.update(base_kwargs)
ls_operator = DbtLSKubernetesOperator(**ls_kwargs)
ls_operator.hook = MagicMock()