Merge branch 'main' into dataset-alias
tatiana authored Sep 30, 2024
2 parents 29adced + e0a9fd3 commit 7ca4067
Showing 18 changed files with 818 additions and 8 deletions.
6 changes: 6 additions & 0 deletions README.rst
@@ -77,3 +77,9 @@ License
_______

`Apache License 2.0 <https://github.com/astronomer/astronomer-cosmos/blob/main/LICENSE>`_


Privacy Notice
______________

This project follows `Astronomer's Privacy Policy <https://www.astronomer.io/privacy/>`_
42 changes: 42 additions & 0 deletions cosmos/__init__.py
@@ -166,6 +166,40 @@
DbtTestAwsEksOperator = MissingPackage("cosmos.operators.aws_eks.DbtTestAwsEksOperator", "aws_eks")


try:
    from cosmos.operators.gcp_cloud_run_job import (
        DbtBuildGcpCloudRunJobOperator,
        DbtLSGcpCloudRunJobOperator,
        DbtRunGcpCloudRunJobOperator,
        DbtRunOperationGcpCloudRunJobOperator,
        DbtSeedGcpCloudRunJobOperator,
        DbtSnapshotGcpCloudRunJobOperator,
        DbtTestGcpCloudRunJobOperator,
    )
except (ImportError, AttributeError):
    DbtBuildGcpCloudRunJobOperator = MissingPackage(
        "cosmos.operators.gcp_cloud_run_job.DbtBuildGcpCloudRunJobOperator", "gcp-cloud-run-job"
    )
    DbtLSGcpCloudRunJobOperator = MissingPackage(
        "cosmos.operators.gcp_cloud_run_job.DbtLSGcpCloudRunJobOperator", "gcp-cloud-run-job"
    )
    DbtRunGcpCloudRunJobOperator = MissingPackage(
        "cosmos.operators.gcp_cloud_run_job.DbtRunGcpCloudRunJobOperator", "gcp-cloud-run-job"
    )
    DbtRunOperationGcpCloudRunJobOperator = MissingPackage(
        "cosmos.operators.gcp_cloud_run_job.DbtRunOperationGcpCloudRunJobOperator", "gcp-cloud-run-job"
    )
    DbtSeedGcpCloudRunJobOperator = MissingPackage(
        "cosmos.operators.gcp_cloud_run_job.DbtSeedGcpCloudRunJobOperator", "gcp-cloud-run-job"
    )
    DbtSnapshotGcpCloudRunJobOperator = MissingPackage(
        "cosmos.operators.gcp_cloud_run_job.DbtSnapshotGcpCloudRunJobOperator", "gcp-cloud-run-job"
    )
    DbtTestGcpCloudRunJobOperator = MissingPackage(
        "cosmos.operators.gcp_cloud_run_job.DbtTestGcpCloudRunJobOperator", "gcp-cloud-run-job"
    )


__all__ = [
    "ProjectConfig",
    "ProfileConfig",
@@ -221,6 +255,14 @@
    "DbtSeedAwsEksOperator",
    "DbtSnapshotAwsEksOperator",
    "DbtTestAwsEksOperator",
    # GCP Cloud Run Job Execution Mode
    "DbtBuildGcpCloudRunJobOperator",
    "DbtLSGcpCloudRunJobOperator",
    "DbtRunGcpCloudRunJobOperator",
    "DbtRunOperationGcpCloudRunJobOperator",
    "DbtSeedGcpCloudRunJobOperator",
    "DbtSnapshotGcpCloudRunJobOperator",
    "DbtTestGcpCloudRunJobOperator",
]

"""
1 change: 1 addition & 0 deletions cosmos/constants.py
@@ -91,6 +91,7 @@ class ExecutionMode(Enum):
    AWS_EKS = "aws_eks"
    VIRTUALENV = "virtualenv"
    AZURE_CONTAINER_INSTANCE = "azure_container_instance"
    GCP_CLOUD_RUN_JOB = "gcp_cloud_run_job"


class InvocationMode(Enum):
7 changes: 3 additions & 4 deletions cosmos/converter.py
@@ -116,10 +116,9 @@ def validate_initial_user_config(
    :param render_config: Configuration related to how to convert the dbt workflow into an Airflow DAG
    :param operator_args: Arguments to pass to the underlying operators.
    """
-    if profile_config is None and execution_config.execution_mode not in (
-        ExecutionMode.KUBERNETES,
-        ExecutionMode.AWS_EKS,
-        ExecutionMode.DOCKER,
+    if profile_config is None and execution_config.execution_mode in (
+        ExecutionMode.LOCAL,
+        ExecutionMode.VIRTUALENV,
    ):
        raise CosmosValueError(f"The profile_config is mandatory when using {execution_config.execution_mode}")

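In practice, this relaxed check means ``profile_config`` is now only mandatory for the ``local`` and ``virtualenv`` execution modes; container-based modes such as the new ``gcp_cloud_run_job`` can omit it, since ``profiles.yml`` is expected to live inside the Docker image. A minimal sketch, with illustrative DAG id, project path, and GCP settings:

.. code-block:: python

    from cosmos import DbtDag, ExecutionConfig, ProjectConfig
    from cosmos.constants import ExecutionMode

    # No profile_config here: the image behind the Cloud Run Job is assumed
    # to already contain profiles.yml, so the local validation no longer applies.
    dag = DbtDag(
        dag_id="jaffle_shop_gcp_cloud_run_job",
        project_config=ProjectConfig("/usr/local/airflow/dags/dbt/jaffle_shop"),
        execution_config=ExecutionConfig(execution_mode=ExecutionMode.GCP_CLOUD_RUN_JOB),
        operator_args={
            "project_id": "my-gcp-project-id",
            "region": "europe-west1",
            "job_name": "my-dbt-job",
        },
    )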
172 changes: 172 additions & 0 deletions cosmos/operators/gcp_cloud_run_job.py
@@ -0,0 +1,172 @@
from __future__ import annotations

import inspect
from typing import Any, Callable, Sequence

from airflow.utils.context import Context

from cosmos.config import ProfileConfig
from cosmos.log import get_logger
from cosmos.operators.base import (
    AbstractDbtBaseOperator,
    DbtBuildMixin,
    DbtLSMixin,
    DbtRunMixin,
    DbtRunOperationMixin,
    DbtSeedMixin,
    DbtSnapshotMixin,
    DbtTestMixin,
)

logger = get_logger(__name__)

DEFAULT_ENVIRONMENT_VARIABLES: dict[str, str] = {}

try:
    from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator

    # The overrides parameter needed to pass the dbt command was added in apache-airflow-providers-google==10.13.0
    init_signature = inspect.signature(CloudRunExecuteJobOperator.__init__)
    if "overrides" not in init_signature.parameters:
        raise AttributeError(
            "CloudRunExecuteJobOperator does not have `overrides` attribute. Ensure you've installed apache-airflow-providers-google of at least 10.13.0 "
            "separately or with `pip install astronomer-cosmos[...,gcp-cloud-run-job]`."
        )
except ImportError:
    raise ImportError(
        "Could not import CloudRunExecuteJobOperator. Ensure you've installed the Google Cloud provider "
        "separately or with `pip install astronomer-cosmos[...,gcp-cloud-run-job]`."
    )


class DbtGcpCloudRunJobBaseOperator(AbstractDbtBaseOperator, CloudRunExecuteJobOperator):  # type: ignore
    """
    Executes a dbt core cli command in a Cloud Run Job instance with dbt installed in it.
    """

    template_fields: Sequence[str] = tuple(
        list(AbstractDbtBaseOperator.template_fields) + list(CloudRunExecuteJobOperator.template_fields)
    )

    intercept_flag = False

    def __init__(
        self,
        # arguments required by CloudRunExecuteJobOperator
        project_id: str,
        region: str,
        job_name: str,
        #
        profile_config: ProfileConfig | None = None,
        command: list[str] | None = None,
        environment_variables: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> None:
        self.profile_config = profile_config
        self.command = command
        self.environment_variables = environment_variables or DEFAULT_ENVIRONMENT_VARIABLES
        super().__init__(project_id=project_id, region=region, job_name=job_name, **kwargs)

    def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> Any:
        self.build_command(context, cmd_flags)
        self.log.info(f"Running command: {self.command}")
        result = CloudRunExecuteJobOperator.execute(self, context)
        logger.info(result)

    def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> None:
        # For the first round, we're going to assume that the command is dbt
        # This means that we don't have openlineage support, but we will create a ticket
        # to add that in the future
        self.dbt_executable_path = "dbt"
        dbt_cmd, env_vars = self.build_cmd(context=context, cmd_flags=cmd_flags)
        self.environment_variables = {**env_vars, **self.environment_variables}
        self.command = dbt_cmd
        # Override Cloud Run Job default arguments with dbt command
        self.overrides = {
            "container_overrides": [
                {
                    "args": self.command,
                    "env": [{"name": key, "value": value} for key, value in self.environment_variables.items()],
                }
            ],
        }


class DbtBuildGcpCloudRunJobOperator(DbtBuildMixin, DbtGcpCloudRunJobBaseOperator):
    """
    Executes a dbt core build command.
    """

    template_fields: Sequence[str] = DbtGcpCloudRunJobBaseOperator.template_fields + DbtBuildMixin.template_fields  # type: ignore[operator]

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)


class DbtLSGcpCloudRunJobOperator(DbtLSMixin, DbtGcpCloudRunJobBaseOperator):
    """
    Executes a dbt core ls command.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)


class DbtSeedGcpCloudRunJobOperator(DbtSeedMixin, DbtGcpCloudRunJobBaseOperator):
    """
    Executes a dbt core seed command.

    :param full_refresh: dbt optional arg - dbt will treat incremental models as table models
    """

    template_fields: Sequence[str] = DbtGcpCloudRunJobBaseOperator.template_fields + DbtSeedMixin.template_fields  # type: ignore[operator]

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)


class DbtSnapshotGcpCloudRunJobOperator(DbtSnapshotMixin, DbtGcpCloudRunJobBaseOperator):
    """
    Executes a dbt core snapshot command.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)


class DbtRunGcpCloudRunJobOperator(DbtRunMixin, DbtGcpCloudRunJobBaseOperator):
    """
    Executes a dbt core run command.
    """

    template_fields: Sequence[str] = DbtGcpCloudRunJobBaseOperator.template_fields + DbtRunMixin.template_fields  # type: ignore[operator]

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)


class DbtTestGcpCloudRunJobOperator(DbtTestMixin, DbtGcpCloudRunJobBaseOperator):
    """
    Executes a dbt core test command.
    """

    def __init__(self, on_warning_callback: Callable[..., Any] | None = None, **kwargs: str) -> None:
        super().__init__(**kwargs)
        # as of now, on_warning_callback in the Cloud Run Job operator does nothing
        self.on_warning_callback = on_warning_callback


class DbtRunOperationGcpCloudRunJobOperator(DbtRunOperationMixin, DbtGcpCloudRunJobBaseOperator):
    """
    Executes a dbt core run-operation command.

    :param macro_name: name of macro to execute
    :param args: Supply arguments to the macro. This dictionary will be mapped to the keyword arguments defined in the
        selected macro.
    """

    template_fields: Sequence[str] = DbtGcpCloudRunJobBaseOperator.template_fields + DbtRunOperationMixin.template_fields  # type: ignore[operator]

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
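For reference, a minimal standalone usage sketch of one of these operators (task id, job name, GCP project, and the dbt project path inside the container image are all illustrative assumptions, not values from this commit):

.. code-block:: python

    from cosmos.operators.gcp_cloud_run_job import DbtRunGcpCloudRunJobOperator

    # Assumes the Cloud Run Job "my-dbt-job" already exists and its image
    # contains the dbt project and profiles.
    run_models = DbtRunGcpCloudRunJobOperator(
        task_id="dbt_run_jaffle_shop",
        project_id="my-gcp-project-id",
        region="europe-west1",
        job_name="my-dbt-job",
        project_dir="/usr/app/dbt/jaffle_shop",
    )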
3 changes: 2 additions & 1 deletion cosmos/operators/local.py
@@ -3,6 +3,7 @@
import json
import os
import tempfile
import urllib.parse
import warnings
from abc import ABC, abstractmethod
from functools import cached_property
@@ -468,7 +469,7 @@ def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]:
        uris = []
        for completed in self.openlineage_events_completes:
            for output in getattr(completed, source):
-                dataset_uri = output.namespace + "/" + output.name
+                dataset_uri = output.namespace + "/" + urllib.parse.quote(output.name)
                uris.append(dataset_uri)
        self.log.debug("URIs to be converted to Dataset: %s", uris)

2 changes: 2 additions & 0 deletions dev/dags/dbt/simple/models/multibyte.sql
@@ -0,0 +1,2 @@
select
'TEST_FOR_MULTIBYTE_CHARCTERS'
Binary file added docs/_static/cosmos_gcp_crj_schematic.png
Binary file added docs/_static/jaffle_shop_big_query.png
Binary file added docs/_static/jaffle_shop_gcp_cloud_run_job.png
2 changes: 1 addition & 1 deletion docs/configuration/render-config.rst
@@ -7,7 +7,7 @@ It does this by exposing a ``cosmos.config.RenderConfig`` class that you can use

The ``RenderConfig`` class takes the following arguments:

- - ``emit_datasets``: whether or not to emit Airflow datasets to be used for data-aware scheduling. Defaults to True. Depends on `additional dependencies <lineage.html>`_.
+ - ``emit_datasets``: whether or not to emit Airflow datasets to be used for data-aware scheduling. Defaults to True. Depends on `additional dependencies <lineage.html>`_. If a model in the project has a name containing multibyte characters, the dataset name will be URL-encoded (see the encoding sketch after this list).
- ``test_behavior``: how to run tests. Defaults to running a model's tests immediately after the model is run. For more information, see the `Testing Behavior <testing-behavior.html>`_ section.
- ``load_method``: how to load your dbt project. See `Parsing Methods <parsing-methods.html>`_ for more information.
- ``select`` and ``exclude``: which models to include or exclude from your DAGs. See `Selecting & Excluding <selecting-excluding.html>`_ for more information.
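A minimal sketch of the URL-encoding behaviour behind the updated ``emit_datasets`` note, using an assumed namespace and an assumed model name with multibyte characters (neither is taken from this commit):

.. code-block:: python

    import urllib.parse

    namespace = "postgres://db-host:5432"  # assumed OpenLineage namespace
    model_name = "public.注文"  # assumed model name containing multibyte characters

    # Cosmos quotes the name before building the Dataset URI, so the resulting
    # URI contains only URL-safe characters.
    dataset_uri = namespace + "/" + urllib.parse.quote(model_name)
    print(dataset_uri)  # postgres://db-host:5432/public.%E6%B3%A8%E6%96%87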
28 changes: 28 additions & 0 deletions docs/getting_started/execution-modes.rst
@@ -11,6 +11,7 @@ Cosmos can run ``dbt`` commands using five different approaches, called ``execut
4. **kubernetes**: Run ``dbt`` commands from Kubernetes Pods managed by Cosmos (requires a pre-existing Docker image)
5. **aws_eks**: Run ``dbt`` commands from AWS EKS Pods managed by Cosmos (requires a pre-existing Docker image)
6. **azure_container_instance**: Run ``dbt`` commands from Azure Container Instances managed by Cosmos (requires a pre-existing Docker image)
7. **gcp_cloud_run_job**: Run ``dbt`` commands from GCP Cloud Run Job instances managed by Cosmos (requires a pre-existing Docker image)

The choice of the ``execution mode`` can vary based on each user's needs and concerns. For more details, check each execution mode described below.

@@ -47,6 +48,10 @@ The choice of the ``execution mode`` can vary based on each user's needs and con
     - Slow
     - High
     - No
   * - GCP Cloud Run Job Instance
     - Slow
     - High
     - No

Local
-----
@@ -209,6 +214,29 @@ Each task will create a new container on Azure, giving full isolation. This, how
        },
    )

GCP Cloud Run Job
------------------------
.. versionadded:: 1.7

The ``gcp_cloud_run_job`` execution mode is particularly useful for users who prefer to run their ``dbt`` commands on Google Cloud infrastructure, taking advantage of Cloud Run's scalability, isolation, and managed service capabilities.

For the ``gcp_cloud_run_job`` execution mode to work, a Cloud Run Job instance must first be created using a previously built Docker container. This container should include the latest ``dbt`` pipelines and profiles. You can find more details in the `Cloud Run Job creation guide <https://cloud.google.com/run/docs/create-jobs>`__ .

This execution mode allows users to run ``dbt`` core CLI commands in a Google Cloud Run Job instance. This mode leverages the ``CloudRunExecuteJobOperator`` from the Google Cloud Airflow provider to execute commands within a Cloud Run Job instance, where ``dbt`` is already installed. Similarly to the ``Docker`` and ``Kubernetes`` execution modes, a Docker container should be available, containing the up-to-date ``dbt`` pipelines and profiles.

Each task will create a new Cloud Run Job execution, giving full isolation. The separation of tasks adds extra overhead; however, that can be mitigated by using the ``concurrency`` parameter in ``DbtDag``, which will result in parallelized execution of ``dbt`` models.


.. code-block:: python

    gcp_cloud_run_job_cosmos_dag = DbtDag(
        # ...
        execution_config=ExecutionConfig(execution_mode=ExecutionMode.GCP_CLOUD_RUN_JOB),
        operator_args={
            "project_id": "my-gcp-project-id",
            "region": "europe-west1",
            "job_name": "my-crj-{{ ti.task_id.replace('.','-').replace('_','-') }}",
        },
    )
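Under the hood, each task builds the ``dbt`` command and injects it, together with the dbt environment variables, into the Cloud Run Job's ``overrides`` payload before delegating to ``CloudRunExecuteJobOperator``. A rough sketch of that payload for a ``dbt run`` task, with an illustrative command and environment (the real values are computed by Cosmos at runtime):

.. code-block:: python

    # Illustrative values only.
    command = ["dbt", "run", "--project-dir", "/usr/app/dbt/jaffle_shop"]
    environment_variables = {"DBT_TARGET": "prod"}

    overrides = {
        "container_overrides": [
            {
                "args": command,
                "env": [{"name": key, "value": value} for key, value in environment_variables.items()],
            }
        ],
    }
    # The pre-built Cloud Run Job is then executed with these overrides, so the
    # container runs the dbt command instead of its default arguments.
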
.. _invocation_modes:
Invocation Modes