Support using DatasetAlias and fix orphaning unreferenced dataset (#1217)

**Context**

Cosmos versions between 1.1 and 1.6 supported automatically emitting
Airflow Datasets when using `ExecutionMode.LOCAL`. Although the datasets
generated by these versions of Cosmos could be utilised for
dataset-aware scheduling, the implementation had long-standing issues,
as described in #522.

The main problems were:
* Datasets being orphaned as unreferenced by the scheduler
* Dataset inlets/outlets not being displayed in the Airflow UI

These issues were caused by Cosmos defining task outlets and inlets
during Airflow task execution, a feature only partially supported before
Airflow 2.10: apache/airflow#34206.

**Solution**

Airflow 2.10 has introduced the concept of `DatasetAlias`, as described
in the [official
docs](https://airflow.apache.org/blog/airflow-2.10.0/#dynamic-dataset-scheduling-through-datasetalias),
so operators can dynamically define inlets and outlets during task
execution.

This PR uses Airflow `DatasetAlias`, when possible (Airflow 2.10 or
above), and does three things (a minimal sketch of the mechanism is
shown after the list):
1. Adds a `DatasetAlias` to every LocalOperator/VirtualenvOperator
subclass
2. Dynamically adds `Dataset` outlets during the execution of
`LocalOperator` subclasses, associating them with the task's
`DatasetAlias`
3. Exposes a function so users can retrieve Cosmos' `DatasetAlias`
names programmatically
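
For readers less familiar with the Airflow 2.10 mechanism, below is a minimal, standalone sketch of the pattern Cosmos now relies on. It is not Cosmos code: the DAG id, alias name, and dataset URI are made up. A task declares a `DatasetAlias` as an outlet at parse time, then attaches the concrete `Dataset` to that alias at execution time through `outlet_events`.

```python
from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset, DatasetAlias
from airflow.decorators import task

with DAG("example_alias_producer", start_date=datetime(2024, 1, 1), schedule=None):

    # Parse time: only the alias is known; the concrete dataset is not.
    @task(outlets=[DatasetAlias("example_alias_producer__produce")])
    def produce(*, outlet_events):
        # Execution time: attach the concrete Dataset to the alias so that
        # dataset-aware scheduling of downstream DAGs can be resolved.
        outlet_events["example_alias_producer__produce"].add(
            Dataset("postgres://0.0.0.0:5432/postgres.public.customers")
        )

    produce()
```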

**Caveats**

* Only works for Airflow 2.10 and above

This feature relies on `DatasetAlias`, which is only available in
Airflow 2.10 and above. If users are on earlier versions of Airflow,
Cosmos behaves as it did before, and the issues described above are not
solved.

* Unable to leverage DatasetAlias in `airflow dags test`

Although the feature described in this PR works well when scheduling
DAGs, triggering them via the UI, or using `airflow dags trigger`, it
does not work when users attempt to use `dags test` or `dag.test()`.
When trying to test the DAG, these commands fail with a
`sqlalchemy.orm.exc.FlushError`. This is a known issue in Airflow
2.10.0 and 2.10.1 when declaring dataset aliases, as described in
apache/airflow#42495.

To mitigate this second problem, we've introduced a new configuration
option, `enable_dataset_alias` (settable via the environment variable
`AIRFLOW__COSMOS__ENABLE_DATASET_ALIAS`), that allows users to disable
dataset aliases when running Cosmos. We recommend that users who face
the `sqlalchemy.orm.exc.FlushError` in their tests set this
configuration to `False` only while running tests, until the issue is
solved in Airflow. When this configuration is set to `False`, Cosmos
behaves as it did before the DatasetAlias feature was introduced.
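
In unit tests, the same switch can also be flipped by patching the Cosmos setting directly, which is what this PR's own tests do (see `tests/operators/test_local.py` below). A minimal sketch, assuming a hypothetical DAG id `my_cosmos_dag` that is discoverable by the `DagBag`:

```python
from unittest.mock import patch

from airflow.models import DagBag


# Forces Cosmos to fall back to the pre-DatasetAlias behaviour for this test,
# avoiding the FlushError raised by dag.test() on Airflow 2.10.0/2.10.1.
@patch("cosmos.settings.enable_dataset_alias", False)
def test_my_cosmos_dag():
    dag = DagBag(include_examples=False).get_dag("my_cosmos_dag")
    dag.test()
```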

**How this feature was validated**

TODO

**Related tickets**

Closes: #522
Closes: #1119


**Pending**

* Add docs
* Update PR description
tatiana authored Sep 30, 2024
1 parent e0a9fd3 commit 2febf3f
Showing 8 changed files with 255 additions and 20 deletions.
4 changes: 3 additions & 1 deletion cosmos/core/airflow.py
@@ -1,3 +1,5 @@
from __future__ import annotations

import importlib

from airflow.models import BaseOperator
@@ -10,7 +12,7 @@
logger = get_logger(__name__)


def get_airflow_task(task: Task, dag: DAG, task_group: "TaskGroup | None" = None) -> BaseOperator:
def get_airflow_task(task: Task, dag: DAG, task_group: TaskGroup | None = None) -> BaseOperator:
"""
Get the Airflow Operator class for a Task.
33 changes: 33 additions & 0 deletions cosmos/dataset.py
@@ -0,0 +1,33 @@
from __future__ import annotations

from airflow import DAG
from airflow.utils.task_group import TaskGroup


def get_dataset_alias_name(dag: DAG | None, task_group: TaskGroup | None, task_id: str) -> str:
    """
    Given the Airflow DAG, Airflow TaskGroup and the Airflow Task ID, return the name of the
    Airflow DatasetAlias associated to that task.
    """
    dag_id = None
    task_group_id = None

    if task_group:
        if task_group.dag_id is not None:
            dag_id = task_group.dag_id
        if task_group.group_id is not None:
            task_group_id = task_group.group_id
            task_group_id = task_group_id.replace(".", "__")
    elif dag:
        dag_id = dag.dag_id

    identifiers_list = []

    if dag_id:
        identifiers_list.append(dag_id)
    if task_group_id:
        identifiers_list.append(task_group_id)

    identifiers_list.append(task_id)

    return "__".join(identifiers_list)
68 changes: 52 additions & 16 deletions cosmos/operators/local.py
@@ -10,24 +10,26 @@
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Literal, Sequence

import airflow
import jinja2
from airflow import DAG
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.models.taskinstance import TaskInstance
from airflow.utils.context import Context
from airflow.utils.session import NEW_SESSION, create_session, provide_session
from attr import define
from packaging.version import Version

from cosmos import cache
from cosmos import cache, settings
from cosmos.cache import (
_copy_cached_package_lockfile_to_project,
_get_latest_cached_package_lockfile,
is_cache_package_lockfile_enabled,
)
from cosmos.constants import InvocationMode
from cosmos.dataset import get_dataset_alias_name
from cosmos.dbt.project import get_partial_parse_path, has_non_empty_dependencies_file
from cosmos.exceptions import AirflowCompatibilityError
from cosmos.settings import LINEAGE_NAMESPACE

try:
from airflow.datasets import Dataset
@@ -43,6 +45,7 @@
from dbt.cli.main import dbtRunner, dbtRunnerResult
from openlineage.client.run import RunEvent


from sqlalchemy.orm import Session

from cosmos.config import ProfileConfig
@@ -73,6 +76,8 @@
DbtTestMixin,
)

AIRFLOW_VERSION = Version(airflow.__version__)

logger = get_logger(__name__)

try:
@@ -126,6 +131,7 @@ class DbtLocalBaseOperator(AbstractDbtBaseOperator):

def __init__(
self,
task_id: str,
profile_config: ProfileConfig,
invocation_mode: InvocationMode | None = None,
install_deps: bool = False,
@@ -134,6 +140,7 @@ def __init__(
append_env: bool = True,
**kwargs: Any,
) -> None:
self.task_id = task_id
self.profile_config = profile_config
self.callback = callback
self.compiled_sql = ""
@@ -146,7 +153,19 @@ def __init__(
self._dbt_runner: dbtRunner | None = None
if self.invocation_mode:
self._set_invocation_methods()
super().__init__(**kwargs)

if kwargs.get("emit_datasets", True) and settings.enable_dataset_alias and AIRFLOW_VERSION >= Version("2.10"):
from airflow.datasets import DatasetAlias

# ignoring the type because older versions of Airflow raise the following error in mypy
# error: Incompatible types in assignment (expression has type "list[DatasetAlias]", target has type "str")
dag_id = kwargs.get("dag")
task_group_id = kwargs.get("task_group")
kwargs["outlets"] = [
DatasetAlias(name=get_dataset_alias_name(dag_id, task_group_id, task_id))
] # type: ignore

super().__init__(task_id=task_id, **kwargs)

# For local execution mode, we're consistent with the LoadMode.DBT_LS command in forwarding the environment
# variables to the subprocess by default. Although this behavior is designed for ExecuteMode.LOCAL and
@@ -388,7 +407,7 @@ def run_command(
outlets = self.get_datasets("outputs")
self.log.info("Inlets: %s", inlets)
self.log.info("Outlets: %s", outlets)
self.register_dataset(inlets, outlets)
self.register_dataset(inlets, outlets, context)

if self.partial_parse and self.cache_dir:
partial_parse_file = get_partial_parse_path(tmp_dir_path)
@@ -423,7 +442,7 @@ def calculate_openlineage_events_completes(

openlineage_processor = DbtLocalArtifactProcessor(
producer=OPENLINEAGE_PRODUCER,
job_namespace=LINEAGE_NAMESPACE,
job_namespace=settings.LINEAGE_NAMESPACE,
project_dir=project_dir,
profile_name=self.profile_config.profile_name,
target=self.profile_config.target_name,
@@ -469,20 +488,37 @@ def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]:
)
return datasets

def register_dataset(self, new_inlets: list[Dataset], new_outlets: list[Dataset]) -> None:
def register_dataset(self, new_inlets: list[Dataset], new_outlets: list[Dataset], context: Context) -> None:
"""
Register a list of datasets as outlets of the current task.
Register a list of datasets as outlets of the current task, when possible.
Until Airflow 2.7, there was not a better interface to associate outlets to a task during execution.
This works in Cosmos with versions before Airflow 2.10 with a few limitations, as described in the ticket:
https://github.com/astronomer/astronomer-cosmos/issues/522
Since Airflow 2.10, Cosmos uses DatasetAlias by default, to generate datasets. This resolved the limitations
described before.
The only limitation is that with Airflow 2.10.0 and 2.10.1, the `airflow dags test` command will not work
with DatasetAlias:
https://github.com/apache/airflow/issues/42495
"""
with create_session() as session:
self.outlets.extend(new_outlets)
self.inlets.extend(new_inlets)
for task in self.dag.tasks:
if task.task_id == self.task_id:
task.outlets.extend(new_outlets)
task.inlets.extend(new_inlets)
DAG.bulk_write_to_db([self.dag], session=session)
session.commit()
if AIRFLOW_VERSION < Version("2.10") or not settings.enable_dataset_alias:
logger.info("Assigning inlets/outlets without DatasetAlias")
with create_session() as session:
self.outlets.extend(new_outlets)
self.inlets.extend(new_inlets)
for task in self.dag.tasks:
if task.task_id == self.task_id:
task.outlets.extend(new_outlets)
task.inlets.extend(new_inlets)
DAG.bulk_write_to_db([self.dag], session=session)
session.commit()
else:
logger.info("Assigning inlets/outlets with DatasetAlias")
dataset_alias_name = get_dataset_alias_name(self.dag, self.task_group, self.task_id)
for outlet in new_outlets:
context["outlet_events"][dataset_alias_name].add(outlet)

def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> OperatorLineage:
"""
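With the outlets registered through the alias as above, a downstream DAG on Airflow 2.10+ can subscribe to the alias itself rather than to individual dataset URIs. A hedged sketch, not Cosmos code; the alias name is hypothetical and would normally be computed with `get_dataset_alias_name`:

```python
from datetime import datetime

from airflow import DAG
from airflow.datasets import DatasetAlias

with DAG(
    "downstream_of_cosmos",
    start_date=datetime(2024, 1, 1),
    # Triggered whenever the Cosmos task attaches a Dataset to this alias.
    schedule=[DatasetAlias("my_dag__models__run")],
):
    ...  # downstream tasks would be defined here
```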
1 change: 1 addition & 0 deletions cosmos/settings.py
@@ -18,6 +18,7 @@
DEFAULT_CACHE_DIR = Path(tempfile.gettempdir(), DEFAULT_COSMOS_CACHE_DIR_NAME)
cache_dir = Path(conf.get("cosmos", "cache_dir", fallback=DEFAULT_CACHE_DIR) or DEFAULT_CACHE_DIR)
enable_cache = conf.getboolean("cosmos", "enable_cache", fallback=True)
enable_dataset_alias = conf.getboolean("cosmos", "enable_dataset_alias", fallback=True)
enable_cache_partial_parse = conf.getboolean("cosmos", "enable_cache_partial_parse", fallback=True)
enable_cache_package_lockfile = conf.getboolean("cosmos", "enable_cache_package_lockfile", fallback=True)
enable_cache_dbt_ls = conf.getboolean("cosmos", "enable_cache_dbt_ls", fallback=True)
47 changes: 46 additions & 1 deletion docs/configuration/scheduling.rst
@@ -26,7 +26,7 @@ Data-Aware Scheduling

`Apache Airflow® <https://airflow.apache.org/>`_ 2.4 introduced the concept of `scheduling based on Datasets <https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/datasets.html>`_.

By default, if Airflow 2.4 or higher is used, Cosmos emits `Airflow Datasets <https://airflow.apache.org/docs/apache-airflow/stable/concepts/datasets.html>`_ when running dbt projects. This allows you to use Airflow's data-aware scheduling capabilities to schedule your dbt projects. Cosmos emits datasets using the OpenLineage URI format, as detailed in the `OpenLineage Naming Convention <https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md>`_.
By default, if using Airflow 2.4 or higher, Cosmos emits `Airflow Datasets <https://airflow.apache.org/docs/apache-airflow/stable/concepts/datasets.html>`_ when running dbt projects. This allows you to use Airflow's data-aware scheduling capabilities to schedule your dbt projects. Cosmos emits datasets using the OpenLineage URI format, as detailed in the `OpenLineage Naming Convention <https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md>`_.

Cosmos calculates these URIs during the task execution, by using the library `OpenLineage Integration Common <https://pypi.org/project/openlineage-integration-common/>`_.

@@ -62,3 +62,48 @@ Then, you can use Airflow's data-aware scheduling capabilities to schedule ``my_
)
In this scenario, ``project_one`` runs once a day and ``project_two`` runs immediately after ``project_one``. You can view these dependencies in Airflow's UI.

Known Limitations
.................

Airflow 2.9 and below
_____________________

If using Cosmos with Airflow 2.9 or below, users will experience the following issues:

- The task inlets and outlets generated by Cosmos will not be seen in the Airflow UI
- The scheduler logs will contain many messages saying "Orphaning unreferenced dataset"

Example of scheduler logs:

.. code-block:: text

    scheduler | [2023-09-08T10:18:34.252+0100] {scheduler_job_runner.py:1742} INFO - Orphaning unreferenced dataset 'postgres://0.0.0.0:5432/postgres.public.stg_customers'
    scheduler | [2023-09-08T10:18:34.252+0100] {scheduler_job_runner.py:1742} INFO - Orphaning unreferenced dataset 'postgres://0.0.0.0:5432/postgres.public.stg_payments'
    scheduler | [2023-09-08T10:18:34.252+0100] {scheduler_job_runner.py:1742} INFO - Orphaning unreferenced dataset 'postgres://0.0.0.0:5432/postgres.public.stg_orders'
    scheduler | [2023-09-08T10:18:34.252+0100] {scheduler_job_runner.py:1742} INFO - Orphaning unreferenced dataset 'postgres://0.0.0.0:5432/postgres.public.customers'

References about the root cause of these issues:

- https://github.com/astronomer/astronomer-cosmos/issues/522
- https://github.com/apache/airflow/issues/34206


Airflow 2.10.0 and 2.10.1
_________________________

If using Cosmos with Airflow 2.10.0 or 2.10.1, the two issues previously described are resolved, since Cosmos uses ``DatasetAlias``
to support the dynamic creation of datasets during task execution. However, users may face ``sqlalchemy.orm.exc.FlushError``
errors if they attempt to run Cosmos-powered DAGs using ``airflow dags test`` with these versions.

We've reported this issue and it will be resolved in future versions of Airflow:

- https://github.com/apache/airflow/issues/42495

For users to overcome this limitation in local tests, until the Airflow community solves this, we introduced the configuration
``AIRFLOW__COSMOS__ENABLE_DATASET_ALIAS``, which is ``True`` by default. If users want to run ``airflow dags test`` without facing ``sqlalchemy.orm.exc.FlushError``,
they can set this configuration to ``False``. It can also be set in the ``airflow.cfg`` file:

.. code-block:: ini

    [cosmos]
    enable_dataset_alias = False
81 changes: 80 additions & 1 deletion tests/operators/test_local.py
@@ -411,8 +411,10 @@ def test_dbt_test_local_operator_invocation_mode_methods(mock_extract_log_issues

@pytest.mark.skipif(
version.parse(airflow_version) < version.parse("2.4")
or version.parse(airflow_version) >= version.parse("2.10")
or version.parse(airflow_version) in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS,
reason="Airflow DAG did not have datasets until the 2.4 release, inlets and outlets do not work by default in Airflow 2.9.0 and 2.9.1",
reason="Airflow DAG did not have datasets until the 2.4 release, inlets and outlets do not work by default in Airflow 2.9.0 and 2.9.1. \n"
"From Airflow 2.10 onwards, we started using DatasetAlias, which changed this behaviour.",
)
@pytest.mark.integration
def test_run_operator_dataset_inlets_and_outlets(caplog):
@@ -453,6 +455,82 @@ def test_run_operator_dataset_inlets_and_outlets(caplog):
assert test_operator.outlets == []


@pytest.mark.skipif(
version.parse(airflow_version) < version.parse("2.10"),
reason="From Airflow 2.10 onwards, we started using DatasetAlias, which changed this behaviour.",
)
@pytest.mark.integration
def test_run_operator_dataset_inlets_and_outlets_airflow_210_onwards(caplog):
from airflow.models.dataset import DatasetAliasModel
from sqlalchemy.orm.exc import FlushError

with DAG("test_id_1", start_date=datetime(2022, 1, 1)) as dag:
seed_operator = DbtSeedLocalOperator(
profile_config=real_profile_config,
project_dir=DBT_PROJ_DIR,
task_id="seed",
dag=dag,
emit_datasets=False,
dbt_cmd_flags=["--select", "raw_customers"],
install_deps=True,
append_env=True,
)
run_operator = DbtRunLocalOperator(
profile_config=real_profile_config,
project_dir=DBT_PROJ_DIR,
task_id="run",
dag=dag,
dbt_cmd_flags=["--models", "stg_customers"],
install_deps=True,
append_env=True,
)
test_operator = DbtTestLocalOperator(
profile_config=real_profile_config,
project_dir=DBT_PROJ_DIR,
task_id="test",
dag=dag,
dbt_cmd_flags=["--models", "stg_customers"],
install_deps=True,
append_env=True,
)
seed_operator >> run_operator >> test_operator

assert seed_operator.outlets == [] # because emit_datasets=False,
assert run_operator.outlets == [DatasetAliasModel(name="test_id_1__run")]
assert test_operator.outlets == [DatasetAliasModel(name="test_id_1__test")]

with pytest.raises(FlushError):
# This is a known limitation of Airflow 2.10.0 and 2.10.1
# https://github.com/apache/airflow/issues/42495
dag_run, session = run_test_dag(dag)

# Once this issue is solved, we should do some type of check on the actual datasets being emitted,
# so we guarantee Cosmos is backwards compatible via tests using something along these lines or an alternative,
# based on the resolution of the issue logged in Airflow:
# dataset_model = session.scalars(select(DatasetModel).where(DatasetModel.uri == "<something>"))
# assert dataset_model == 1


@patch("cosmos.settings.enable_dataset_alias", 0)
@pytest.mark.skipif(
version.parse(airflow_version) < version.parse("2.10"),
reason="From Airflow 2.10 onwards, we started using DatasetAlias, which changed this behaviour.",
)
@pytest.mark.integration
def test_run_operator_dataset_inlets_and_outlets_airflow_210_onwards_disabled_via_envvar(caplog):
with DAG("test_id_2", start_date=datetime(2022, 1, 1)) as dag:
run_operator = DbtRunLocalOperator(
profile_config=real_profile_config,
project_dir=DBT_PROJ_DIR,
task_id="run",
dag=dag,
dbt_cmd_flags=["--models", "stg_customers"],
install_deps=True,
append_env=True,
)
assert run_operator.outlets == []


@pytest.mark.skipif(
version.parse(airflow_version) not in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS,
reason="Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs",
@@ -495,6 +573,7 @@ def test_run_operator_dataset_emission_is_skipped(caplog):
reason="Airflow DAG did not have datasets until the 2.4 release, inlets and outlets do not work by default in Airflow 2.9.0 and 2.9.1",
)
@pytest.mark.integration
@patch("cosmos.settings.enable_dataset_alias", 0)
def test_run_operator_dataset_url_encoded_names(caplog):
from airflow.datasets import Dataset
