Add support to store and fetch dbt ls cache in remote stores #1147

Merged · 15 commits · Aug 16, 2024
2 changes: 1 addition & 1 deletion cosmos/__init__.py
@@ -5,7 +5,7 @@

Contains dags, task groups, and operators.
"""
__version__ = "1.5.1"
__version__ = "1.6.0a6"
tatiana marked this conversation as resolved.
Show resolved Hide resolved


from cosmos.airflow.dag import DbtDag
103 changes: 103 additions & 0 deletions cosmos/cache.py
@@ -17,6 +17,7 @@
from airflow.models.dag import DAG
from airflow.utils.session import provide_session
from airflow.utils.task_group import TaskGroup
from airflow.version import version as airflow_version
from sqlalchemy import select
from sqlalchemy.orm import Session

@@ -25,22 +26,66 @@
    DBT_MANIFEST_FILE_NAME,
    DBT_TARGET_DIR_NAME,
    DEFAULT_PROFILES_FILE_NAME,
    FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP,
    PACKAGE_LOCKFILE_YML,
)
from cosmos.dbt.project import get_partial_parse_path
from cosmos.exceptions import CosmosValueError
from cosmos.log import get_logger
from cosmos.settings import (
    AIRFLOW_IO_AVAILABLE,
    cache_dir,
    dbt_profile_cache_dir_name,
    enable_cache,
    enable_cache_package_lockfile,
    enable_cache_profile,
    remote_cache_dir_conn_id,
)
from cosmos.settings import remote_cache_dir as settings_remote_cache_dir

logger = get_logger(__name__)
VAR_KEY_CACHE_PREFIX = "cosmos_cache__"


def _configure_remote_cache_dir() -> Path | None:
    """Configure the remote cache dir if it is provided."""
    if not settings_remote_cache_dir:
        return None

    _configured_cache_dir = None

    cache_dir_str = str(settings_remote_cache_dir)

    remote_cache_conn_id = remote_cache_dir_conn_id
    if not remote_cache_conn_id:
        cache_dir_schema = cache_dir_str.split("://")[0]
        remote_cache_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(cache_dir_schema, None)  # type: ignore[assignment]
        if remote_cache_conn_id is None:
            return _configured_cache_dir

    if not AIRFLOW_IO_AVAILABLE:
        raise CosmosValueError(
            f"You're trying to specify remote cache_dir {cache_dir_str}, but the required "
            f"Object Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to "
            "Airflow 2.8 or later."
        )

    from airflow.io.path import ObjectStoragePath

    _configured_cache_dir = ObjectStoragePath(cache_dir_str, conn_id=remote_cache_conn_id)

    if not _configured_cache_dir.exists():  # type: ignore[no-untyped-call]
        # TODO: Check if we should raise an error instead in case the provided path does not exist.
        _configured_cache_dir.mkdir(parents=True, exist_ok=True)

        # raise CosmosValueError(
        #     f"remote_cache_path `{cache_dir_str}` does not exist or is not accessible using "
        #     f"remote_cache_conn_id `{remote_cache_conn_id}`"
        # )

    return _configured_cache_dir


def _get_airflow_metadata(dag: DAG, task_group: TaskGroup | None) -> dict[str, str | None]:
    dag_id = None
    task_group_id = None
@@ -366,6 +411,64 @@
    return deleted_cosmos_variables


# TODO: Add integration tests once remote cache is supported in the CI pipeline
@provide_session
def delete_unused_dbt_ls_remote_cache_files(  # pragma: no cover
    max_age_last_usage: timedelta = timedelta(days=30), session: Session | None = None
) -> int:
    """
    Delete Cosmos cache files stored in remote storage based on the last execution of their associated DAGs.
    """
    if session is None:
        return 0

    logger.info(f"Deleting the remotely stored Cosmos cache files that haven't been used for {max_age_last_usage}")
    cosmos_dags_ids_remote_cache_files = defaultdict(list)

    configured_remote_cache_dir = _configure_remote_cache_dir()
    if not configured_remote_cache_dir:
        logger.info(
            "No remote cache directory configured. Skipping the deletion of the dbt ls cache files in remote storage."
        )
        return 0

    dirs = [obj for obj in configured_remote_cache_dir.iterdir() if obj.is_dir()]
    files = [f for label in dirs for f in label.iterdir() if f.is_file()]

    total_cosmos_remote_cache_files = 0
    for file in files:
        prefix_path = (configured_remote_cache_dir / VAR_KEY_CACHE_PREFIX).as_uri()
        if file.as_uri().startswith(prefix_path):
            with file.open("r") as fp:
                cache_dict = json.load(fp)
            cosmos_dags_ids_remote_cache_files[cache_dict["dag_id"]].append(file)
            total_cosmos_remote_cache_files += 1

    deleted_cosmos_remote_cache_files = 0

    for dag_id, files in cosmos_dags_ids_remote_cache_files.items():
        last_dag_run = (
            session.query(DagRun)
            .filter(
                DagRun.dag_id == dag_id,
            )
            .order_by(DagRun.execution_date.desc())
            .first()
        )
        if last_dag_run and last_dag_run.execution_date < (datetime.now(timezone.utc) - max_age_last_usage):
            for file in files:
                logger.info(f"Removing the dbt ls cache remote file {file}")
                file.unlink()
                deleted_cosmos_remote_cache_files += 1
    logger.info(
        "Deleted %s/%s dbt ls cache files in remote storage.",
        deleted_cosmos_remote_cache_files,
        total_cosmos_remote_cache_files,
    )

    return deleted_cosmos_remote_cache_files


def is_profile_cache_enabled() -> bool:
    """Return True if both the global and the profile cache are enabled."""
    return enable_cache and enable_cache_profile
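
To make the new helper concrete, below is a minimal sketch (not part of this PR) of how _configure_remote_cache_dir could be exercised. The bucket and connection ID are placeholders, and the environment variables must be set before cosmos.settings is first imported, since the [cosmos] config section is read at module import time:

# Hedged sketch with placeholder values; requires Airflow >= 2.8 (Object Storage)
# and the matching provider installed. Note: it touches the remote store for real.
import os

# Set these before importing anything from cosmos, because cosmos.settings
# reads the [cosmos] config section at import time.
os.environ["AIRFLOW__COSMOS__REMOTE_CACHE_DIR"] = "s3://my-bucket/cosmos_cache/"  # placeholder bucket
os.environ["AIRFLOW__COSMOS__REMOTE_CACHE_DIR_CONN_ID"] = "aws_default"  # placeholder connection ID

from cosmos.cache import _configure_remote_cache_dir

remote_dir = _configure_remote_cache_dir()  # ObjectStoragePath, or None when no remote dir is set
if remote_dir is not None:
    print(remote_dir.as_uri())
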
25 changes: 23 additions & 2 deletions cosmos/dbt/graph.py
@@ -19,6 +19,7 @@

from cosmos import cache, settings
from cosmos.cache import (
    _configure_remote_cache_dir,
    _copy_cached_package_lockfile_to_project,
    _get_latest_cached_package_lockfile,
    is_cache_package_lockfile_enabled,
@@ -312,7 +313,22 @@ def save_dbt_ls_cache(self, dbt_ls_output: str) -> None:
"last_modified": datetime.datetime.now(datetime.timezone.utc).isoformat(),
**self.airflow_metadata,
}
Variable.set(self.dbt_ls_cache_key, cache_dict, serialize_json=True)
remote_cache_dir = _configure_remote_cache_dir()
if remote_cache_dir:
remote_cache_key_path = remote_cache_dir / self.dbt_ls_cache_key / "dbt_ls_cache.json"
with remote_cache_key_path.open("w") as fp:
json.dump(cache_dict, fp)
else:
Variable.set(self.dbt_ls_cache_key, cache_dict, serialize_json=True)

    def _get_dbt_ls_remote_cache(self, remote_cache_dir: Path) -> dict[str, str]:
        """Loads the remote cache for dbt ls."""
        cache_dict: dict[str, str] = {}
        remote_cache_key_path = remote_cache_dir / self.dbt_ls_cache_key / "dbt_ls_cache.json"
        if remote_cache_key_path.exists():
            with remote_cache_key_path.open("r") as fp:
                cache_dict = json.load(fp)
        return cache_dict

    def get_dbt_ls_cache(self) -> dict[str, str]:
        """
@@ -327,7 +343,12 @@ def get_dbt_ls_cache(self) -> dict[str, str]:
        """
        cache_dict: dict[str, str] = {}
        try:
-            cache_dict = Variable.get(self.dbt_ls_cache_key, deserialize_json=True)
+            remote_cache_dir = _configure_remote_cache_dir()
+            cache_dict = (
+                self._get_dbt_ls_remote_cache(remote_cache_dir)
+                if remote_cache_dir
+                else Variable.get(self.dbt_ls_cache_key, deserialize_json=True)
+            )
        except (json.decoder.JSONDecodeError, KeyError):
            return cache_dict
        else:
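
For reference, a sketch (not part of this PR; bucket, connection ID, and cache key are hypothetical) of the layout these methods read and write: one <remote_cache_dir>/<dbt_ls_cache_key>/dbt_ls_cache.json file per cache key, whose JSON payload carries at least last_modified plus the Airflow metadata such as dag_id (the diff elides the payload's remaining fields):

import json

from airflow.io.path import ObjectStoragePath  # Airflow >= 2.8

# Placeholder values; the real cache key is computed by Cosmos and starts with
# the "cosmos_cache__" prefix.
remote_cache_dir = ObjectStoragePath("s3://my-bucket/cosmos_cache/", conn_id="aws_default")
cache_key = "cosmos_cache__example_dag"  # hypothetical dbt_ls_cache_key

entry = remote_cache_dir / cache_key / "dbt_ls_cache.json"
if entry.exists():
    with entry.open("r") as fp:
        cache_dict = json.load(fp)
    print(cache_dict["last_modified"], cache_dict["dag_id"])
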
12 changes: 11 additions & 1 deletion cosmos/settings.py
@@ -1,3 +1,5 @@
from __future__ import annotations

import os
import tempfile
from pathlib import Path
@@ -7,7 +9,10 @@
from airflow.version import version as airflow_version
from packaging.version import Version

-from cosmos.constants import DEFAULT_COSMOS_CACHE_DIR_NAME, DEFAULT_OPENLINEAGE_NAMESPACE
+from cosmos.constants import (
+    DEFAULT_COSMOS_CACHE_DIR_NAME,
+    DEFAULT_OPENLINEAGE_NAMESPACE,
+)

# On macOS, users may want to set the envvar `TMPDIR` if they do not want the value of the temp directory to change
DEFAULT_CACHE_DIR = Path(tempfile.gettempdir(), DEFAULT_COSMOS_CACHE_DIR_NAME)
@@ -23,6 +28,11 @@
enable_cache_profile = conf.getboolean("cosmos", "enable_cache_profile", fallback=True)
dbt_profile_cache_dir_name = conf.get("cosmos", "profile_cache_dir_name", fallback="profile")

# Experimentally adding `remote_cache_dir` as a separate entity in the Cosmos 1.6 release to gather feedback.
# This will be merged with the `cache_dir` config parameter in upcoming releases.
remote_cache_dir = conf.get("cosmos", "remote_cache_dir", fallback=None)
remote_cache_dir_conn_id = conf.get("cosmos", "remote_cache_dir_conn_id", fallback=None)

try:
    LINEAGE_NAMESPACE = conf.get("openlineage", "namespace")
except airflow.exceptions.AirflowConfigException:
11 changes: 10 additions & 1 deletion dev/dags/example_cosmos_cleanup_dag.py
@@ -8,7 +8,7 @@

from airflow.decorators import dag, task

-from cosmos.cache import delete_unused_dbt_ls_cache
+from cosmos.cache import delete_unused_dbt_ls_cache, delete_unused_dbt_ls_remote_cache_files


@dag(
@@ -28,6 +28,15 @@ def clear_db_ls_cache(session=None):

    clear_db_ls_cache()

    @task()
    def clear_db_ls_remote_cache(session=None):
        """
        Delete the dbt ls remote cache files that have not been used for the last five days.
        """
        delete_unused_dbt_ls_remote_cache_files(max_age_last_usage=timedelta(days=5))

    clear_db_ls_remote_cache()


# [END cache_example]

17 changes: 14 additions & 3 deletions docs/configuration/caching.rst
@@ -19,7 +19,7 @@ It is possible to turn off any cache in Cosmos by exporting the environment vari
Disabling individual types of cache in Cosmos is also possible, as explained below.

Caching the dbt ls output
-~~~~~~~~~~~~~
+~~~~~~~~~~~~~~~~~~~~~~~~~

(Introduced in Cosmos 1.5)

@@ -29,13 +29,24 @@

Cosmos 1.5 introduced a feature to mitigate the performance issue associated with ``LoadMode.DBT_LS`` by caching the output
of this command as an `Airflow Variable <https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/variables.html>`_.
-Based on an initial `analysis <https://github.com/astronomer/astronomer-cosmos/pull/1014>`_, enabling this setting reduced some DAGs ask queueing from 30s to 0s. Additionally, some users `reported improvements of 84% <https://github.com/astronomer/astronomer-cosmos/pull/1014#issuecomment-2168185343>`_ in the DAG run time.
+Based on an initial `analysis <https://github.com/astronomer/astronomer-cosmos/pull/1014>`_, enabling this setting reduced some DAGs' task queueing from 30s to 0s. Additionally, some users `reported improvements of 84% <https://github.com/astronomer/astronomer-cosmos/pull/1014#issuecomment-2168185343>`_ in the DAG run time.

This feature is on by default. To turn it off, export the following environment variable: ``AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS=0``.

(Introduced in Cosmos 1.6 - Experimental feature)

Starting with Cosmos 1.6.0, users can also store this cache in a remote directory instead of in Airflow Variables.
To do so, configure a remote cache directory: see :ref:`remote_cache_dir` and :ref:`remote_cache_dir_conn_id` for
more information. This is an experimental feature introduced in 1.6.0 to gather user feedback. The
``remote_cache_dir`` setting will eventually be merged into :ref:`cache_dir` in upcoming releases.
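
For example, assuming a placeholder S3 bucket and Airflow connection ID, the remote cache could be enabled by
exporting::

    AIRFLOW__COSMOS__REMOTE_CACHE_DIR="s3://my-bucket/cosmos_cache/"
    AIRFLOW__COSMOS__REMOTE_CACHE_DIR_CONN_ID="aws_default"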

**How the cache is refreshed**

-Users can purge or delete the cache via Airflow UI by identifying and deleting the cache key.
+If you are using the default Airflow Variable cache approach, you can purge or delete the cache via the Airflow UI by
+identifying and deleting the cache key. If you are using the alternative ``remote_cache_dir`` approach introduced in
+Cosmos 1.6.0, you can delete the cache by removing the corresponding files under the configured path in the remote
+store.

Cosmos will refresh the cache in a few circumstances:

25 changes: 25 additions & 0 deletions docs/configuration/cosmos-conf.rst
@@ -94,6 +94,31 @@ This page lists all available Airflow configurations that affect ``astronomer-co
    - Default: ``profile``
    - Environment Variable: ``AIRFLOW__COSMOS__PROFILE_CACHE_DIR_NAME``

.. _remote_cache_dir:

`remote_cache_dir`_:
    The remote directory to store the dbt ls cache. Starting with Cosmos 1.6.0, you can store the ``dbt ls`` output as
    cache in a remote location (an alternative to the Airflow Variable cache approach available since Cosmos 1.5.0)
    using this configuration. The value for the remote cache directory can be any of the schemes supported by the
    `Airflow Object Store <https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/objectstorage.html>`_
    feature introduced in Airflow 2.8.0 (e.g. ``s3://your_s3_bucket/cache_dir/``, ``gs://your_gs_bucket/cache_dir/``,
    ``abfs://your_azure_container/cache_dir``, etc.).

    This is an experimental feature available since Cosmos 1.6 to gather user feedback, and it will be merged into the
    ``cache_dir`` setting in upcoming releases.

    - Default: ``None``
    - Environment Variable: ``AIRFLOW__COSMOS__REMOTE_CACHE_DIR``

.. _remote_cache_dir_conn_id:

`remote_cache_dir_conn_id`_:
    The connection ID for the remote cache directory. If this is not set, the default Airflow connection ID identified
    for the scheme will be used.

    - Default: ``None``
    - Environment Variable: ``AIRFLOW__COSMOS__REMOTE_CACHE_DIR_CONN_ID``

[openlineage]
~~~~~~~~~~~~~
