From ae0f45586912f1ccbfa7677c18720bdc781ea139 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 9 Aug 2024 19:30:01 +0530 Subject: [PATCH 01/14] Add support to store and fetch dbt ls cache in remote stores --- cosmos/dbt/graph.py | 24 ++++++++++++++++++++++-- cosmos/settings.py | 40 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 61 insertions(+), 3 deletions(-) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 5240a8ed6..b6caef585 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -281,7 +281,23 @@ def save_dbt_ls_cache(self, dbt_ls_output: str) -> None: "last_modified": datetime.datetime.now(datetime.timezone.utc).isoformat(), **self.airflow_metadata, } - Variable.set(self.dbt_ls_cache_key, cache_dict, serialize_json=True) + if settings.remote_cache_path: + remote_cache_key_path = settings.remote_cache_path / self.dbt_ls_cache_key / "dbt_ls_cache.json" + with remote_cache_key_path.open("w") as fp: + json.dump(cache_dict, fp) + else: + Variable.set(self.dbt_ls_cache_key, cache_dict, serialize_json=True) + + def _get_dbt_ls_remote_cache(self) -> dict[str, str]: + """Loads the remote cache for dbt ls.""" + cache_dict: dict[str, str] = {} + if settings.remote_cache_path is None: + return cache_dict + remote_cache_key_path = settings.remote_cache_path / self.dbt_ls_cache_key / "dbt_ls_cache.json" + if remote_cache_key_path.exists(): + with remote_cache_key_path.open("r") as fp: + cache_dict = json.load(fp) + return cache_dict def get_dbt_ls_cache(self) -> dict[str, str]: """ @@ -296,7 +312,11 @@ def get_dbt_ls_cache(self) -> dict[str, str]: """ cache_dict: dict[str, str] = {} try: - cache_dict = Variable.get(self.dbt_ls_cache_key, deserialize_json=True) + cache_dict = ( + self._get_dbt_ls_remote_cache() + if settings.remote_cache_path + else Variable.get(self.dbt_ls_cache_key, deserialize_json=True) + ) except (json.decoder.JSONDecodeError, KeyError): return cache_dict else: diff --git a/cosmos/settings.py b/cosmos/settings.py index 71387de6e..1259a15f2 100644 --- a/cosmos/settings.py +++ b/cosmos/settings.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import os import tempfile from pathlib import Path @@ -7,7 +9,12 @@ from airflow.version import version as airflow_version from packaging.version import Version -from cosmos.constants import DEFAULT_COSMOS_CACHE_DIR_NAME, DEFAULT_OPENLINEAGE_NAMESPACE +from cosmos.constants import ( + DEFAULT_COSMOS_CACHE_DIR_NAME, + DEFAULT_OPENLINEAGE_NAMESPACE, + FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP, +) +from cosmos.exceptions import CosmosValueError # In MacOS users may want to set the envvar `TMPDIR` if they do not want the value of the temp directory to change DEFAULT_CACHE_DIR = Path(tempfile.gettempdir(), DEFAULT_COSMOS_CACHE_DIR_NAME) @@ -29,3 +36,34 @@ LINEAGE_NAMESPACE = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE) AIRFLOW_IO_AVAILABLE = Version(airflow_version) >= Version("2.8.0") + + +def _configure_remote_cache_path() -> Path | None: + remote_cache_path_str = str(conf.get("cosmos", "remote_cache_path", fallback="")) + remote_cache_conn_id = str(conf.get("cosmos", "remote_cache_conn_id", fallback="")) + cache_path = None + + if remote_cache_path_str and not AIRFLOW_IO_AVAILABLE: + raise CosmosValueError( + f"You're trying to specify dbt_ls_cache_remote_path {remote_cache_path_str}, but the required Object " + f"Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to " + f"Airflow 2.8 or later." 
+ ) + elif remote_cache_path_str: + from airflow.io.path import ObjectStoragePath + + if not remote_cache_conn_id: + remote_cache_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get( + remote_cache_path_str.split("://")[0], None + ) + + cache_path = ObjectStoragePath(remote_cache_path_str, conn_id=remote_cache_conn_id) + if not cache_path.exists(): # type: ignore[no-untyped-call] + raise CosmosValueError( + f"`remote_cache_path` {remote_cache_path_str} does not exist or is not accessible using " + f"`remote_cache_conn_id` {remote_cache_conn_id}" + ) + return cache_path + + +remote_cache_path = _configure_remote_cache_path() From ffa28d50035e2cd531f6212b4ff98d6673c47ac3 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 9 Aug 2024 20:47:41 +0530 Subject: [PATCH 02/14] Release 1.6.0a5 --- cosmos/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 1a00036f1..c83bc7d74 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -5,7 +5,7 @@ Contains dags, task groups, and operators. """ -__version__ = "1.5.1" +__version__ = "1.6.0a5" from cosmos.airflow.dag import DbtDag From 0f5e28bb2a12a108e6137b9f2cebdcbb5d28531b Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Sat, 10 Aug 2024 23:24:36 +0530 Subject: [PATCH 03/14] Configure remote_cache_path in ProjectConfig and use that --- cosmos/config.py | 32 +++++++++++++++++++++++++++++++- cosmos/dbt/graph.py | 8 ++++---- cosmos/settings.py | 36 +++--------------------------------- 3 files changed, 38 insertions(+), 38 deletions(-) diff --git a/cosmos/config.py b/cosmos/config.py index 62557de63..3466822cc 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -27,7 +27,8 @@ from cosmos.exceptions import CosmosValueError from cosmos.log import get_logger from cosmos.profiles import BaseProfileMapping -from cosmos.settings import AIRFLOW_IO_AVAILABLE +from cosmos.settings import AIRFLOW_IO_AVAILABLE, remote_cache_conn_id +from cosmos.settings import remote_cache_path as settings_remote_cache_path logger = get_logger(__name__) @@ -146,6 +147,7 @@ class ProjectConfig: seeds_path: Path | None = None snapshots_path: Path | None = None project_name: str + remote_cache_path: Path | None = None def __init__( self, @@ -203,6 +205,34 @@ def __init__( self.env_vars = env_vars self.dbt_vars = dbt_vars self.partial_parse = partial_parse + self.remote_cache_path = self._configure_remote_cache_path() + + @staticmethod + def _configure_remote_cache_path() -> Path | None: + """Configure the remote cache path if it is provided.""" + cache_path = None + + if settings_remote_cache_path and not AIRFLOW_IO_AVAILABLE: + raise CosmosValueError( + f"You're trying to specify dbt_ls_cache_remote_path {settings_remote_cache_path}, but the required " + f"Object Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to " + "Airflow 2.8 or later." 
+ ) + elif settings_remote_cache_path: + from airflow.io.path import ObjectStoragePath + + remote_conn_id = remote_cache_conn_id or FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get( + settings_remote_cache_path.split("://")[0], None + ) + + cache_path = ObjectStoragePath(settings_remote_cache_path, conn_id=remote_conn_id) + if not cache_path.exists(): # type: ignore[no-untyped-call] + raise CosmosValueError( + f"remote_cache_path `{settings_remote_cache_path}` does not exist or is not accessible using " + f"remote_cache_conn_id `{remote_conn_id}`" + ) + + return cache_path def validate_project(self) -> None: """ diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index b6caef585..013a88351 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -281,8 +281,8 @@ def save_dbt_ls_cache(self, dbt_ls_output: str) -> None: "last_modified": datetime.datetime.now(datetime.timezone.utc).isoformat(), **self.airflow_metadata, } - if settings.remote_cache_path: - remote_cache_key_path = settings.remote_cache_path / self.dbt_ls_cache_key / "dbt_ls_cache.json" + if self.project.remote_cache_path: + remote_cache_key_path = self.project.remote_cache_path / self.dbt_ls_cache_key / "dbt_ls_cache.json" with remote_cache_key_path.open("w") as fp: json.dump(cache_dict, fp) else: @@ -291,9 +291,9 @@ def save_dbt_ls_cache(self, dbt_ls_output: str) -> None: def _get_dbt_ls_remote_cache(self) -> dict[str, str]: """Loads the remote cache for dbt ls.""" cache_dict: dict[str, str] = {} - if settings.remote_cache_path is None: + if self.project.remote_cache_path is None: return cache_dict - remote_cache_key_path = settings.remote_cache_path / self.dbt_ls_cache_key / "dbt_ls_cache.json" + remote_cache_key_path = self.project.remote_cache_path / self.dbt_ls_cache_key / "dbt_ls_cache.json" if remote_cache_key_path.exists(): with remote_cache_key_path.open("r") as fp: cache_dict = json.load(fp) diff --git a/cosmos/settings.py b/cosmos/settings.py index 1259a15f2..3829f5373 100644 --- a/cosmos/settings.py +++ b/cosmos/settings.py @@ -12,9 +12,7 @@ from cosmos.constants import ( DEFAULT_COSMOS_CACHE_DIR_NAME, DEFAULT_OPENLINEAGE_NAMESPACE, - FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP, ) -from cosmos.exceptions import CosmosValueError # In MacOS users may want to set the envvar `TMPDIR` if they do not want the value of the temp directory to change DEFAULT_CACHE_DIR = Path(tempfile.gettempdir(), DEFAULT_COSMOS_CACHE_DIR_NAME) @@ -30,40 +28,12 @@ enable_cache_profile = conf.getboolean("cosmos", "enable_cache_profile", fallback=True) dbt_profile_cache_dir_name = conf.get("cosmos", "profile_cache_dir_name", fallback="profile") +remote_cache_path = conf.get("cosmos", "remote_cache_path", fallback=None) +remote_cache_conn_id = conf.get("cosmos", "remote_cache_conn_id", fallback=None) + try: LINEAGE_NAMESPACE = conf.get("openlineage", "namespace") except airflow.exceptions.AirflowConfigException: LINEAGE_NAMESPACE = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE) AIRFLOW_IO_AVAILABLE = Version(airflow_version) >= Version("2.8.0") - - -def _configure_remote_cache_path() -> Path | None: - remote_cache_path_str = str(conf.get("cosmos", "remote_cache_path", fallback="")) - remote_cache_conn_id = str(conf.get("cosmos", "remote_cache_conn_id", fallback="")) - cache_path = None - - if remote_cache_path_str and not AIRFLOW_IO_AVAILABLE: - raise CosmosValueError( - f"You're trying to specify dbt_ls_cache_remote_path {remote_cache_path_str}, but the required Object " - f"Storage feature is unavailable in Airflow version 
{airflow_version}. Please upgrade to " - f"Airflow 2.8 or later." - ) - elif remote_cache_path_str: - from airflow.io.path import ObjectStoragePath - - if not remote_cache_conn_id: - remote_cache_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get( - remote_cache_path_str.split("://")[0], None - ) - - cache_path = ObjectStoragePath(remote_cache_path_str, conn_id=remote_cache_conn_id) - if not cache_path.exists(): # type: ignore[no-untyped-call] - raise CosmosValueError( - f"`remote_cache_path` {remote_cache_path_str} does not exist or is not accessible using " - f"`remote_cache_conn_id` {remote_cache_conn_id}" - ) - return cache_path - - -remote_cache_path = _configure_remote_cache_path() From dfb23463767fdcdd18797159e383a169bd68863f Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Sun, 11 Aug 2024 00:00:32 +0530 Subject: [PATCH 04/14] Release 1.6.0a6 --- cosmos/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/__init__.py b/cosmos/__init__.py index c83bc7d74..86d29576a 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -5,7 +5,7 @@ Contains dags, task groups, and operators. """ -__version__ = "1.6.0a5" +__version__ = "1.6.0a6" from cosmos.airflow.dag import DbtDag From 4f001482520ab903787207b3be9f0d0ee595f6db Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 12 Aug 2024 14:53:08 +0530 Subject: [PATCH 05/14] Update cosmos/dbt/graph.py --- cosmos/dbt/graph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 013a88351..6052eeb52 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -314,7 +314,7 @@ def get_dbt_ls_cache(self) -> dict[str, str]: try: cache_dict = ( self._get_dbt_ls_remote_cache() - if settings.remote_cache_path + if self.project.remote_cache_path else Variable.get(self.dbt_ls_cache_key, deserialize_json=True) ) except (json.decoder.JSONDecodeError, KeyError): From 48e34bcf67576ea860b2be05fc247c5bd00a11d7 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 13 Aug 2024 02:24:41 +0530 Subject: [PATCH 06/14] Add docs and tests --- docs/configuration/caching.rst | 15 ++++++-- docs/configuration/cosmos-conf.rst | 21 ++++++++++ tests/dbt/test_graph.py | 61 ++++++++++++++++++++++++++++++ tests/test_config.py | 56 ++++++++++++++++++++++++++- 4 files changed, 148 insertions(+), 5 deletions(-) diff --git a/docs/configuration/caching.rst b/docs/configuration/caching.rst index 7dba9933f..656ca5481 100644 --- a/docs/configuration/caching.rst +++ b/docs/configuration/caching.rst @@ -19,7 +19,7 @@ It is possible to turn off any cache in Cosmos by exporting the environment vari Disabling individual types of cache in Cosmos is also possible, as explained below. Caching the dbt ls output -~~~~~~~~~~~~~ +~~~~~~~~~~~~~~~~~~~~~~~~~ (Introduced in Cosmos 1.5) @@ -29,13 +29,22 @@ also the tasks queueing time. Cosmos 1.5 introduced a feature to mitigate the performance issue associated with ``LoadMode.DBT_LS`` by caching the output of this command as an `Airflow Variable `_. -Based on an initial `analysis `_, enabling this setting reduced some DAGs ask queueing from 30s to 0s. Additionally, some users `reported improvements of 84% `_ in the DAG run time. +Based on an initial `analysis `_, enabling this setting reduced some DAGs task queueing from 30s to 0s. Additionally, some users `reported improvements of 84% `_ in the DAG run time. This feature is on by default. To turn it off, export the following environment variable: ``AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS=0``. 
+(Introduced in Cosmos 1.6) + +Starting with Cosmos 1.6.0, users can also set a remote path to store this cache instead of using Airflow Variables. +To do so, you need to configure a remote cache path. See :ref:`remote_cache_path` and :ref:`remote_cache_conn_id` for +more information. + **How the cache is refreshed** -Users can purge or delete the cache via Airflow UI by identifying and deleting the cache key. +If using the default Variables cache approach, users can purge or delete the cache via Airflow UI by identifying and +deleting the cache key. In case you're using the alternative approach by setting the ``remote_cache_path`` introduced +in Cosmos 1.6.0, you can delete the cache by removing the specific files by identifying them using your configured path +in the remote store. Cosmos will refresh the cache in a few circumstances: diff --git a/docs/configuration/cosmos-conf.rst b/docs/configuration/cosmos-conf.rst index 037a43d3b..e4abb8b7c 100644 --- a/docs/configuration/cosmos-conf.rst +++ b/docs/configuration/cosmos-conf.rst @@ -94,6 +94,27 @@ This page lists all available Airflow configurations that affect ``astronomer-co - Default: ``profile`` - Environment Variable: ``AIRFLOW__COSMOS__PROFILE_CACHE_DIR_NAME`` +.. _remote_cache_path: + +`remote_cache_path`_: + The remote path to store the DBT cache. Starting with Cosmos 1.6.0, you can store the dbt ls output as cache in a + remote location (an alternative to the Variable cache approach released previously since Cosmos 1.5.0) + using this configuration. The remote paths are all schemes supported by the + `Airflow Object Store `_ + feature introduced in Airflow 2.8.0 (e.g. ``s3://your_s3_bucket/cache_dir/``, ``gs://your_gs_bucket/cache_dir/``, + ``abfs://your_azure_container/cache_r``, etc.) + + - Default: ``None`` + - Environment Variable: ``AIRFLOW__COSMOS__REMOTE_CACHE_PATH`` + +.. _remote_cache_conn_id: + +`remote_cache_conn_id`_: + The connection ID for the remote cache path. If this is not set, the default Airflow connection ID identified for + the scheme will be used. 
+ + - Default: ``None`` + - Environment Variable: ``AIRFLOW__COSMOS__REMOTE_CACHE_CONN_ID`` [openlineage] ~~~~~~~~~~~~~ diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 0e7b0b05a..fb0f11b05 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -1,9 +1,12 @@ +import base64 import importlib +import json import logging import os import shutil import sys import tempfile +import zlib from datetime import datetime from pathlib import Path from subprocess import PIPE, Popen @@ -24,6 +27,7 @@ run_command, ) from cosmos.profiles import PostgresUserPasswordProfileMapping +from cosmos.settings import AIRFLOW_IO_AVAILABLE DBT_PROJECTS_ROOT_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt" DBT_PROJECT_NAME = "jaffle_shop" @@ -1532,3 +1536,60 @@ def test_should_use_dbt_ls_cache(enable_cache, enable_cache_dbt_ls, cache_id, sh graph = DbtGraph(cache_identifier=cache_id, project=ProjectConfig(dbt_project_path="/tmp")) graph.should_use_dbt_ls_cache.cache_clear() assert graph.should_use_dbt_ls_cache() == should_use + + +@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") +@patch("airflow.io.path.ObjectStoragePath") +@patch("cosmos.config.ProjectConfig") +def test_save_dbt_ls_cache_remote_cache_path(mock_project_config, mock_object_storage_path): + mock_remote_cache_path = mock_object_storage_path.return_value + mock_remote_cache_path.exists.return_value = True + + mock_project_config.remote_cache_path = mock_remote_cache_path + mock_project_config.dbt_vars = {"var1": "value1"} + mock_project_config.env_vars = {"var1": "value1"} + mock_project_config._calculate_dbt_ls_cache_current_version.return_value = "mock_version" + + dbt_ls_output = "sample dbt ls output" + dbt_graph = DbtGraph(project=mock_project_config) + + dbt_graph.save_dbt_ls_cache(dbt_ls_output) + + mock_remote_cache_key_path = mock_remote_cache_path / dbt_graph.dbt_ls_cache_key / "dbt_ls_cache.json" + mock_remote_cache_key_path.open.assert_called_once_with("w") + + +@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") +@patch("airflow.io.path.ObjectStoragePath") +@patch("cosmos.config.ProjectConfig") +def test_get_dbt_ls_cache_remote_cache_path(mock_project_config, mock_object_storage_path): + mock_remote_cache_path = mock_object_storage_path.return_value + mock_remote_cache_path.exists.return_value = True + + mock_project_config.remote_cache_path = mock_remote_cache_path + + dbt_ls_output = "sample dbt ls output" + compressed_data = zlib.compress(dbt_ls_output.encode("utf-8")) + encoded_data = base64.b64encode(compressed_data).decode("utf-8") + + cache_dict = { + "version": "cache-version", + "dbt_ls_compressed": encoded_data, + "last_modified": "2024-08-13T12:34:56Z", + } + + mock_remote_cache_key_path = mock_remote_cache_path / "some_cache_key" / "dbt_ls_cache.json" + mock_remote_cache_key_path.exists.return_value = True + mock_remote_cache_key_path.open.return_value.__enter__.return_value.read.return_value = json.dumps(cache_dict) + + dbt_graph = DbtGraph(project=mock_project_config) + + result = dbt_graph.get_dbt_ls_cache() + + expected_result = { + "version": "cache-version", + "dbt_ls": dbt_ls_output, + "last_modified": "2024-08-13T12:34:56Z", + } + + assert result == expected_result diff --git a/tests/test_config.py b/tests/test_config.py index d557dd4fc..398bc1711 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,11 +1,11 @@ from contextlib import 
nullcontext as does_not_raise from pathlib import Path -from unittest.mock import Mock, PropertyMock, call, patch +from unittest.mock import MagicMock, Mock, PropertyMock, call, patch import pytest from cosmos.config import CosmosConfigException, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig -from cosmos.constants import ExecutionMode, InvocationMode +from cosmos.constants import ExecutionMode, InvocationMode, _default_s3_conn from cosmos.exceptions import CosmosValueError from cosmos.profiles.athena.access_key import AthenaAccessKeyProfileMapping from cosmos.profiles.postgres.user_pass import PostgresUserPasswordProfileMapping @@ -331,3 +331,55 @@ def test_remote_manifest_path(manifest_path, given_manifest_conn_id, used_manife _ = ProjectConfig( dbt_project_path="/tmp/some-path", manifest_path=manifest_path, manifest_conn_id=given_manifest_conn_id ) + + +@patch("cosmos.config.settings_remote_cache_path", new=None) +def test_remote_cache_path_initialization_no_remote_cache(): + config = ProjectConfig(dbt_project_path="/some/path", project_name="test_project") + assert config.remote_cache_path is None + + +@patch("cosmos.config.settings_remote_cache_path", new="s3://some-bucket/cache") +@patch("cosmos.config.AIRFLOW_IO_AVAILABLE", new=False) +def test_remote_cache_path_initialization_object_storage_unavailable_on_earlier_airflow_versions(): + with pytest.raises(CosmosValueError, match="Object Storage feature is unavailable"): + ProjectConfig(dbt_project_path="/some/path", project_name="test_project") + + +@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") +@patch("cosmos.config.settings_remote_cache_path", new="s3://some-bucket/cache") +@patch("airflow.io.path.ObjectStoragePath") +def test_remote_cache_path_initialization_path_available_default_connection(mock_object_storage_path): + mock_cache_path = MagicMock() + mock_cache_path.exists.return_value = True + mock_object_storage_path.return_value = mock_cache_path + + config = ProjectConfig(dbt_project_path="/some/path", project_name="test_project") + mock_object_storage_path.assert_called_with("s3://some-bucket/cache", conn_id=_default_s3_conn) + assert config.remote_cache_path == mock_cache_path + + +@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") +@patch("cosmos.config.settings_remote_cache_path", new="s3://some-bucket/cache") +@patch("airflow.io.path.ObjectStoragePath") +def test_remote_cache_path_initialization_path_not_exist(mock_object_storage_path): + mock_cache_path = MagicMock() + mock_cache_path.exists.return_value = False + mock_object_storage_path.return_value = mock_cache_path + + with pytest.raises(CosmosValueError, match="remote_cache_path `s3://some-bucket/cache` does not exist"): + ProjectConfig(dbt_project_path="/some/path", project_name="test_project") + + +@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") +@patch("cosmos.config.settings_remote_cache_path", new="s3://some-bucket/cache") +@patch("cosmos.config.remote_cache_conn_id", new="my_conn_id") +@patch("airflow.io.path.ObjectStoragePath") +def test_remote_cache_path_initialization_with_conn_id(mock_object_storage_path): + mock_cache_path = MagicMock() + mock_cache_path.exists.return_value = True + mock_object_storage_path.return_value = mock_cache_path + + config = ProjectConfig(dbt_project_path="/some/path", project_name="test_project") + 
mock_object_storage_path.assert_called_with("s3://some-bucket/cache", conn_id="my_conn_id") + assert config.remote_cache_path == mock_cache_path From 4ea9daeaed50301ba5a1beeb0389c03ef09278fe Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 13 Aug 2024 02:47:05 +0530 Subject: [PATCH 07/14] Add integration tests --- .github/workflows/test.yml | 78 +++++++++++++++++++++++++++++++++++++- 1 file changed, 77 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f19831676..7d1b08c8c 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,7 +2,7 @@ name: test on: push: # Run on pushes to the default branch - branches: [main] + branches: [main,remote-db-ls-cache] pull_request_target: # Also run on pull requests originated from forks branches: [main] @@ -170,6 +170,81 @@ jobs: name: coverage-integration-test-${{ matrix.python-version }}-${{ matrix.airflow-version }} path: .coverage + Run-Integration-Tests-Remote-Cache-Path: + needs: Authorize + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [ "3.8", "3.9", "3.10", "3.11" ] + airflow-version: [ "2.8", "2.9" ] + services: + postgres: + image: postgres + env: + POSTGRES_PASSWORD: postgres + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 5432:5432 + steps: + - uses: actions/checkout@v3 + with: + ref: ${{ github.event.pull_request.head.sha || github.ref }} + + - uses: actions/cache@v3 + with: + path: | + ~/.cache/pip + .local/share/hatch/ + key: integration-remote-cache-path-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + + - name: Install packages and dependencies + run: | + python -m pip install uv + uv pip install --system hatch + hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }} run pip freeze + + - name: Test Cosmos against Airflow ${{ matrix.airflow-version }} and Python ${{ matrix.python-version }} + run: | + hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration-setup + hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration + env: + AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS: 0 + AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/ + AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres + AIRFLOW_CONN_AWS_S3_CONN: ${{ secrets.AIRFLOW_CONN_AWS_S3_CONN }} + AIRFLOW_CONN_GCP_GS_CONN: ${{ secrets.AIRFLOW_CONN_GCP_GS_CONN }} + AIRFLOW_CONN_AZURE_ABFS_CONN: ${{ secrets.AIRFLOW_CONN_AZURE_ABFS_CONN }} + DATABRICKS_HOST: mock + DATABRICKS_WAREHOUSE_ID: mock + DATABRICKS_TOKEN: mock + DATABRICKS_CLUSTER_ID: mock + AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0 + PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH + COSMOS_CONN_POSTGRES_PASSWORD: ${{ secrets.COSMOS_CONN_POSTGRES_PASSWORD }} + POSTGRES_HOST: localhost + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_DB: postgres + POSTGRES_SCHEMA: public + POSTGRES_PORT: 5432 + AIRFLOW__COSMOS__REMOTE_CACHE_PATH: s3://cosmos-remote-cache/cache_dir/ + AIRFLOW__COSMOS__REMOTE_CACHE_CONN_ID: aws_s3_conn + + - name: Upload coverage to Github + uses: actions/upload-artifact@v2 + with: + name: 
coverage-integration-test-remote-cache-path-${{ matrix.python-version }}-${{ matrix.airflow-version }} + path: .coverage + Run-Integration-Tests-Expensive: needs: Authorize runs-on: ubuntu-latest @@ -463,6 +538,7 @@ jobs: needs: - Run-Unit-Tests - Run-Integration-Tests + - Run-Integration-Tests-Remote-Cache-Path - Run-Integration-Tests-Expensive runs-on: ubuntu-latest steps: From 57e2c7bbfddf4ef0a9a937fbfe245b947dad82d5 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 13 Aug 2024 03:23:51 +0530 Subject: [PATCH 08/14] Enable db ls cache for remote path tests --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7d1b08c8c..21116de1f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -217,7 +217,7 @@ jobs: hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration-setup hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration env: - AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS: 0 + AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS: 1 AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/ AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres AIRFLOW_CONN_AWS_S3_CONN: ${{ secrets.AIRFLOW_CONN_AWS_S3_CONN }} From 398ecd7abc91e98cb6889bb05c4be70af4867039 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 13 Aug 2024 03:24:39 +0530 Subject: [PATCH 09/14] Update .github/workflows/test.yml --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 21116de1f..c0ea11ecd 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -176,7 +176,7 @@ jobs: strategy: matrix: python-version: [ "3.8", "3.9", "3.10", "3.11" ] - airflow-version: [ "2.8", "2.9" ] + airflow-version: [ "2.8" ] services: postgres: image: postgres From c26a95c01d6139a58b60e8071498ac6da5d2c538 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 15 Aug 2024 22:45:18 +0530 Subject: [PATCH 10/14] Address review feedback --- .github/workflows/test.yml | 78 +------------------------------------- cosmos/cache.py | 45 ++++++++++++++++++++++ cosmos/config.py | 32 +--------------- cosmos/dbt/graph.py | 17 +++++---- cosmos/settings.py | 6 ++- 5 files changed, 60 insertions(+), 118 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index c0ea11ecd..f19831676 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,7 +2,7 @@ name: test on: push: # Run on pushes to the default branch - branches: [main,remote-db-ls-cache] + branches: [main] pull_request_target: # Also run on pull requests originated from forks branches: [main] @@ -170,81 +170,6 @@ jobs: name: coverage-integration-test-${{ matrix.python-version }}-${{ matrix.airflow-version }} path: .coverage - Run-Integration-Tests-Remote-Cache-Path: - needs: Authorize - runs-on: ubuntu-latest - strategy: - matrix: - python-version: [ "3.8", "3.9", "3.10", "3.11" ] - airflow-version: [ "2.8" ] - services: - postgres: - image: postgres - env: - POSTGRES_PASSWORD: postgres - options: >- - --health-cmd pg_isready - --health-interval 10s - --health-timeout 5s - --health-retries 5 - ports: - - 5432:5432 - steps: - - uses: actions/checkout@v3 - with: - ref: ${{ github.event.pull_request.head.sha || github.ref }} - - - uses: actions/cache@v3 - with: - path: | - ~/.cache/pip - .local/share/hatch/ - key: 
integration-remote-cache-path-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} - - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v4 - with: - python-version: ${{ matrix.python-version }} - - - name: Install packages and dependencies - run: | - python -m pip install uv - uv pip install --system hatch - hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }} run pip freeze - - - name: Test Cosmos against Airflow ${{ matrix.airflow-version }} and Python ${{ matrix.python-version }} - run: | - hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration-setup - hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration - env: - AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS: 1 - AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/ - AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres - AIRFLOW_CONN_AWS_S3_CONN: ${{ secrets.AIRFLOW_CONN_AWS_S3_CONN }} - AIRFLOW_CONN_GCP_GS_CONN: ${{ secrets.AIRFLOW_CONN_GCP_GS_CONN }} - AIRFLOW_CONN_AZURE_ABFS_CONN: ${{ secrets.AIRFLOW_CONN_AZURE_ABFS_CONN }} - DATABRICKS_HOST: mock - DATABRICKS_WAREHOUSE_ID: mock - DATABRICKS_TOKEN: mock - DATABRICKS_CLUSTER_ID: mock - AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0 - PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH - COSMOS_CONN_POSTGRES_PASSWORD: ${{ secrets.COSMOS_CONN_POSTGRES_PASSWORD }} - POSTGRES_HOST: localhost - POSTGRES_USER: postgres - POSTGRES_PASSWORD: postgres - POSTGRES_DB: postgres - POSTGRES_SCHEMA: public - POSTGRES_PORT: 5432 - AIRFLOW__COSMOS__REMOTE_CACHE_PATH: s3://cosmos-remote-cache/cache_dir/ - AIRFLOW__COSMOS__REMOTE_CACHE_CONN_ID: aws_s3_conn - - - name: Upload coverage to Github - uses: actions/upload-artifact@v2 - with: - name: coverage-integration-test-remote-cache-path-${{ matrix.python-version }}-${{ matrix.airflow-version }} - path: .coverage - Run-Integration-Tests-Expensive: needs: Authorize runs-on: ubuntu-latest @@ -538,7 +463,6 @@ jobs: needs: - Run-Unit-Tests - Run-Integration-Tests - - Run-Integration-Tests-Remote-Cache-Path - Run-Integration-Tests-Expensive runs-on: ubuntu-latest steps: diff --git a/cosmos/cache.py b/cosmos/cache.py index 102186423..2c4ccf562 100644 --- a/cosmos/cache.py +++ b/cosmos/cache.py @@ -17,6 +17,7 @@ from airflow.models.dag import DAG from airflow.utils.session import provide_session from airflow.utils.task_group import TaskGroup +from airflow.version import version as airflow_version from sqlalchemy import select from sqlalchemy.orm import Session @@ -25,22 +26,66 @@ DBT_MANIFEST_FILE_NAME, DBT_TARGET_DIR_NAME, DEFAULT_PROFILES_FILE_NAME, + FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP, PACKAGE_LOCKFILE_YML, ) from cosmos.dbt.project import get_partial_parse_path +from cosmos.exceptions import CosmosValueError from cosmos.log import get_logger from cosmos.settings import ( + AIRFLOW_IO_AVAILABLE, cache_dir, dbt_profile_cache_dir_name, enable_cache, enable_cache_package_lockfile, enable_cache_profile, + remote_cache_dir_conn_id, ) +from cosmos.settings import remote_cache_dir as settings_remote_cache_dir logger = get_logger(__name__) VAR_KEY_CACHE_PREFIX = "cosmos_cache__" +def _configure_remote_cache_dir() -> Path | None: + """Configure the remote cache dir if it is provided.""" + if not settings_remote_cache_dir: + return None + + _configured_cache_dir = None + + 
cache_dir_str = str(settings_remote_cache_dir) + + remote_cache_conn_id = remote_cache_dir_conn_id + if not remote_cache_conn_id: + cache_dir_schema = cache_dir_str.split("://")[0] + remote_cache_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(cache_dir_schema, None) # type: ignore[assignment] + if remote_cache_conn_id is None: + return _configured_cache_dir + + if not AIRFLOW_IO_AVAILABLE: + raise CosmosValueError( + f"You're trying to specify remote cache_dir {cache_dir_str}, but the required " + f"Object Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to " + "Airflow 2.8 or later." + ) + + from airflow.io.path import ObjectStoragePath + + _configured_cache_dir = ObjectStoragePath(cache_dir_str, conn_id=remote_cache_conn_id) + + if not _configured_cache_dir.exists(): # type: ignore[no-untyped-call] + # TODO: Check if we should raise an error instead in case the provided path does not exist. + _configured_cache_dir.mkdir(parents=True, exist_ok=True) + + # raise CosmosValueError( + # f"remote_cache_path `{cache_dir_str}` does not exist or is not accessible using " + # f"remote_cache_conn_id `{remote_cache_conn_id}`" + # ) + + return _configured_cache_dir + + def _get_airflow_metadata(dag: DAG, task_group: TaskGroup | None) -> dict[str, str | None]: dag_id = None task_group_id = None diff --git a/cosmos/config.py b/cosmos/config.py index 3466822cc..62557de63 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -27,8 +27,7 @@ from cosmos.exceptions import CosmosValueError from cosmos.log import get_logger from cosmos.profiles import BaseProfileMapping -from cosmos.settings import AIRFLOW_IO_AVAILABLE, remote_cache_conn_id -from cosmos.settings import remote_cache_path as settings_remote_cache_path +from cosmos.settings import AIRFLOW_IO_AVAILABLE logger = get_logger(__name__) @@ -147,7 +146,6 @@ class ProjectConfig: seeds_path: Path | None = None snapshots_path: Path | None = None project_name: str - remote_cache_path: Path | None = None def __init__( self, @@ -205,34 +203,6 @@ def __init__( self.env_vars = env_vars self.dbt_vars = dbt_vars self.partial_parse = partial_parse - self.remote_cache_path = self._configure_remote_cache_path() - - @staticmethod - def _configure_remote_cache_path() -> Path | None: - """Configure the remote cache path if it is provided.""" - cache_path = None - - if settings_remote_cache_path and not AIRFLOW_IO_AVAILABLE: - raise CosmosValueError( - f"You're trying to specify dbt_ls_cache_remote_path {settings_remote_cache_path}, but the required " - f"Object Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to " - "Airflow 2.8 or later." 
- ) - elif settings_remote_cache_path: - from airflow.io.path import ObjectStoragePath - - remote_conn_id = remote_cache_conn_id or FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get( - settings_remote_cache_path.split("://")[0], None - ) - - cache_path = ObjectStoragePath(settings_remote_cache_path, conn_id=remote_conn_id) - if not cache_path.exists(): # type: ignore[no-untyped-call] - raise CosmosValueError( - f"remote_cache_path `{settings_remote_cache_path}` does not exist or is not accessible using " - f"remote_cache_conn_id `{remote_conn_id}`" - ) - - return cache_path def validate_project(self) -> None: """ diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 6052eeb52..857437d86 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -19,6 +19,7 @@ from cosmos import cache, settings from cosmos.cache import ( + _configure_remote_cache_dir, _copy_cached_package_lockfile_to_project, _get_latest_cached_package_lockfile, is_cache_package_lockfile_enabled, @@ -281,19 +282,18 @@ def save_dbt_ls_cache(self, dbt_ls_output: str) -> None: "last_modified": datetime.datetime.now(datetime.timezone.utc).isoformat(), **self.airflow_metadata, } - if self.project.remote_cache_path: - remote_cache_key_path = self.project.remote_cache_path / self.dbt_ls_cache_key / "dbt_ls_cache.json" + remote_cache_dir = _configure_remote_cache_dir() + if remote_cache_dir: + remote_cache_key_path = remote_cache_dir / self.dbt_ls_cache_key / "dbt_ls_cache.json" with remote_cache_key_path.open("w") as fp: json.dump(cache_dict, fp) else: Variable.set(self.dbt_ls_cache_key, cache_dict, serialize_json=True) - def _get_dbt_ls_remote_cache(self) -> dict[str, str]: + def _get_dbt_ls_remote_cache(self, remote_cache_dir: Path) -> dict[str, str]: """Loads the remote cache for dbt ls.""" cache_dict: dict[str, str] = {} - if self.project.remote_cache_path is None: - return cache_dict - remote_cache_key_path = self.project.remote_cache_path / self.dbt_ls_cache_key / "dbt_ls_cache.json" + remote_cache_key_path = remote_cache_dir / self.dbt_ls_cache_key / "dbt_ls_cache.json" if remote_cache_key_path.exists(): with remote_cache_key_path.open("r") as fp: cache_dict = json.load(fp) @@ -312,9 +312,10 @@ def get_dbt_ls_cache(self) -> dict[str, str]: """ cache_dict: dict[str, str] = {} try: + remote_cache_dir = _configure_remote_cache_dir() cache_dict = ( - self._get_dbt_ls_remote_cache() - if self.project.remote_cache_path + self._get_dbt_ls_remote_cache(remote_cache_dir) + if remote_cache_dir else Variable.get(self.dbt_ls_cache_key, deserialize_json=True) ) except (json.decoder.JSONDecodeError, KeyError): diff --git a/cosmos/settings.py b/cosmos/settings.py index 3829f5373..3a01ebe0e 100644 --- a/cosmos/settings.py +++ b/cosmos/settings.py @@ -28,8 +28,10 @@ enable_cache_profile = conf.getboolean("cosmos", "enable_cache_profile", fallback=True) dbt_profile_cache_dir_name = conf.get("cosmos", "profile_cache_dir_name", fallback="profile") -remote_cache_path = conf.get("cosmos", "remote_cache_path", fallback=None) -remote_cache_conn_id = conf.get("cosmos", "remote_cache_conn_id", fallback=None) +# Experimentally adding `remote_cache_dir` as a separate entity in the Cosmos 1.6 release to gather feedback. +# This will be merged with the `cache_dir` config parameter in upcoming releases. 
+remote_cache_dir = conf.get("cosmos", "remote_cache_dir", fallback=None) +remote_cache_dir_conn_id = conf.get("cosmos", "remote_cache_dir_conn_id", fallback=None) try: LINEAGE_NAMESPACE = conf.get("openlineage", "namespace") From 6ca7d26ae45b0bce2ea2af27db8c192c15038145 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 15 Aug 2024 23:21:48 +0530 Subject: [PATCH 11/14] Refactor docs to accomodate changes as per review comments --- docs/configuration/caching.rst | 7 ++++--- docs/configuration/cosmos-conf.rst | 5 ++++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/docs/configuration/caching.rst b/docs/configuration/caching.rst index 656ca5481..625f25452 100644 --- a/docs/configuration/caching.rst +++ b/docs/configuration/caching.rst @@ -33,11 +33,12 @@ Based on an initial `analysis `_ feature introduced in Airflow 2.8.0 (e.g. ``s3://your_s3_bucket/cache_dir/``, ``gs://your_gs_bucket/cache_dir/``, - ``abfs://your_azure_container/cache_r``, etc.) + ``abfs://your_azure_container/cache_dir``, etc.) + + This is an experimental feature available since Cosmos 1.6 to gather user feedback and will be merged into the + ``cache_dir`` setting in upcoming releases. - Default: ``None`` - Environment Variable: ``AIRFLOW__COSMOS__REMOTE_CACHE_DIR`` From 78f47272082a23d1e38e99005c7b78f8724b17b7 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 15 Aug 2024 23:29:53 +0530 Subject: [PATCH 12/14] Correct usage for remote_cache_dir in the docs --- docs/configuration/caching.rst | 11 ++++++----- docs/configuration/cosmos-conf.rst | 12 ++++++------ 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/docs/configuration/caching.rst b/docs/configuration/caching.rst index 625f25452..cc518f748 100644 --- a/docs/configuration/caching.rst +++ b/docs/configuration/caching.rst @@ -35,15 +35,16 @@ This feature is on by default. To turn it off, export the following environment (Introduced in Cosmos 1.6 - Experimental feature) -Starting with Cosmos 1.6.0, users can also set a remote path to store this cache instead of using Airflow Variables. -To do so, you need to configure a remote cache path. See :ref:`remote_cache_dir` and :ref:`remote_cache_dir_conn_id` for -more information. This is an experimental feature introduced in 1.6.0 to gather user feedback. The ``remote_cache_dir`` -will eventually be merged into the :ref:`cache_dir` setting in upcoming releases. +Starting with Cosmos 1.6.0, users can also set a remote directory path to store this cache instead of using Airflow +Variables. To do so, you need to configure a remote cache directory. See :ref:`remote_cache_dir` and +:ref:`remote_cache_dir_conn_id` for more information. This is an experimental feature introduced in 1.6.0 to gather +user feedback. The ``remote_cache_dir`` will eventually be merged into the :ref:`cache_dir` setting in upcoming +releases. **How the cache is refreshed** If using the default Variables cache approach, users can purge or delete the cache via Airflow UI by identifying and -deleting the cache key. In case you're using the alternative approach by setting the ``remote_cache_path`` introduced +deleting the cache key. In case you're using the alternative approach by setting the ``remote_cache_dir`` introduced in Cosmos 1.6.0, you can delete the cache by removing the specific files by identifying them using your configured path in the remote store. 
diff --git a/docs/configuration/cosmos-conf.rst b/docs/configuration/cosmos-conf.rst index 39dd6e43a..6b05ec814 100644 --- a/docs/configuration/cosmos-conf.rst +++ b/docs/configuration/cosmos-conf.rst @@ -97,10 +97,10 @@ This page lists all available Airflow configurations that affect ``astronomer-co .. _remote_cache_dir: `remote_cache_dir`_: - The remote path to store the dbt cache. Starting with Cosmos 1.6.0, you can store the `dbt ls` output as cache in a - remote location (an alternative to the Variable cache approach released previously since Cosmos 1.5.0) - using this configuration. The remote cache dir supports all schemes that are supported by the - `Airflow Object Store `_ + The remote directory to store the dbt cache. Starting with Cosmos 1.6.0, you can store the `dbt ls` output as cache + in a remote location (an alternative to the Variable cache approach released previously since Cosmos 1.5.0) + using this configuration. The value for the remote cache directory can be any of the schemes that are supported by + the `Airflow Object Store `_ feature introduced in Airflow 2.8.0 (e.g. ``s3://your_s3_bucket/cache_dir/``, ``gs://your_gs_bucket/cache_dir/``, ``abfs://your_azure_container/cache_dir``, etc.) @@ -113,8 +113,8 @@ This page lists all available Airflow configurations that affect ``astronomer-co .. _remote_cache_dir_conn_id: `remote_cache_dir_conn_id`_: - The connection ID for the remote cache path. If this is not set, the default Airflow connection ID identified for - the scheme will be used. + The connection ID for the remote cache directory. If this is not set, the default Airflow connection ID identified + for the scheme will be used. - Default: ``None`` - Environment Variable: ``AIRFLOW__COSMOS__REMOTE_CACHE_DIR_CONN_ID`` From a6cd938e7c72f1adf7431d203658e4ec8ebd197a Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 16 Aug 2024 00:29:28 +0530 Subject: [PATCH 13/14] Refactor tests as per applied review suggestion changes --- tests/dbt/test_graph.py | 31 +++++++++++--------- tests/test_cache.py | 65 +++++++++++++++++++++++++++++++++++++++-- tests/test_config.py | 56 ++--------------------------------- 3 files changed, 82 insertions(+), 70 deletions(-) diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 66420bc3e..879663009 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -1637,32 +1637,37 @@ def test_should_use_dbt_ls_cache(enable_cache, enable_cache_dbt_ls, cache_id, sh @pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch("airflow.io.path.ObjectStoragePath") @patch("cosmos.config.ProjectConfig") -def test_save_dbt_ls_cache_remote_cache_path(mock_project_config, mock_object_storage_path): - mock_remote_cache_path = mock_object_storage_path.return_value - mock_remote_cache_path.exists.return_value = True +@patch("cosmos.dbt.graph._configure_remote_cache_dir") +def test_save_dbt_ls_cache_remote_cache_dir( + mock_configure_remote_cache_dir, mock_project_config, mock_object_storage_path +): + mock_remote_cache_dir_path = mock_object_storage_path.return_value + mock_remote_cache_dir_path.exists.return_value = True + + mock_configure_remote_cache_dir.return_value = mock_remote_cache_dir_path - mock_project_config.remote_cache_path = mock_remote_cache_path + dbt_ls_output = "sample dbt ls output" mock_project_config.dbt_vars = {"var1": "value1"} mock_project_config.env_vars = {"var1": "value1"} mock_project_config._calculate_dbt_ls_cache_current_version.return_value = 
"mock_version" - - dbt_ls_output = "sample dbt ls output" dbt_graph = DbtGraph(project=mock_project_config) dbt_graph.save_dbt_ls_cache(dbt_ls_output) - mock_remote_cache_key_path = mock_remote_cache_path / dbt_graph.dbt_ls_cache_key / "dbt_ls_cache.json" + mock_remote_cache_key_path = mock_remote_cache_dir_path / dbt_graph.dbt_ls_cache_key / "dbt_ls_cache.json" mock_remote_cache_key_path.open.assert_called_once_with("w") @pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch("airflow.io.path.ObjectStoragePath") @patch("cosmos.config.ProjectConfig") -def test_get_dbt_ls_cache_remote_cache_path(mock_project_config, mock_object_storage_path): - mock_remote_cache_path = mock_object_storage_path.return_value - mock_remote_cache_path.exists.return_value = True - - mock_project_config.remote_cache_path = mock_remote_cache_path +@patch("cosmos.dbt.graph._configure_remote_cache_dir") +def test_get_dbt_ls_cache_remote_cache_dir( + mock_configure_remote_cache_dir, mock_project_config, mock_object_storage_path +): + mock_remote_cache_dir_path = mock_object_storage_path.return_value + mock_remote_cache_dir_path.exists.return_value = True + mock_configure_remote_cache_dir.return_value = mock_remote_cache_dir_path dbt_ls_output = "sample dbt ls output" compressed_data = zlib.compress(dbt_ls_output.encode("utf-8")) @@ -1674,7 +1679,7 @@ def test_get_dbt_ls_cache_remote_cache_path(mock_project_config, mock_object_sto "last_modified": "2024-08-13T12:34:56Z", } - mock_remote_cache_key_path = mock_remote_cache_path / "some_cache_key" / "dbt_ls_cache.json" + mock_remote_cache_key_path = mock_remote_cache_dir_path / "some_cache_key" / "dbt_ls_cache.json" mock_remote_cache_key_path.exists.return_value = True mock_remote_cache_key_path.open.return_value.__enter__.return_value.read.return_value = json.dumps(cache_dict) diff --git a/tests/test_cache.py b/tests/test_cache.py index b9ab087a7..c4c7e45c9 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -4,7 +4,7 @@ import time from datetime import datetime, timedelta, timezone from pathlib import Path -from unittest.mock import call, patch +from unittest.mock import MagicMock, call, patch import pytest from airflow import DAG @@ -13,6 +13,7 @@ from airflow.utils.task_group import TaskGroup from cosmos.cache import ( + _configure_remote_cache_dir, _copy_partial_parse_to_project, _create_cache_identifier, _create_folder_version_hash, @@ -27,8 +28,14 @@ is_cache_package_lockfile_enabled, is_profile_cache_enabled, ) -from cosmos.constants import DBT_PARTIAL_PARSE_FILE_NAME, DBT_TARGET_DIR_NAME, DEFAULT_PROFILES_FILE_NAME -from cosmos.settings import dbt_profile_cache_dir_name +from cosmos.constants import ( + DBT_PARTIAL_PARSE_FILE_NAME, + DBT_TARGET_DIR_NAME, + DEFAULT_PROFILES_FILE_NAME, + _default_s3_conn, +) +from cosmos.exceptions import CosmosValueError +from cosmos.settings import AIRFLOW_IO_AVAILABLE, dbt_profile_cache_dir_name START_DATE = datetime(2024, 4, 16) example_dag = DAG("dag", start_date=START_DATE) @@ -406,3 +413,55 @@ def test_get_latest_cached_lockfile_with_no_cache(mock_get_sha): # Test case where there is a cached file result = _get_latest_cached_package_lockfile(project_dir) assert result.exists() + + +@patch("cosmos.cache.settings_remote_cache_dir", new=None) +def test_remote_cache_path_initialization_no_remote_cache_dir(): + configured_remote_cache_dir = _configure_remote_cache_dir() + assert configured_remote_cache_dir is None + + 
+@patch("cosmos.cache.settings_remote_cache_dir", new="s3://some-bucket/cache") +@patch("cosmos.cache.AIRFLOW_IO_AVAILABLE", new=False) +def test_remote_cache_path_initialization_object_storage_unavailable_on_earlier_airflow_versions(): + with pytest.raises(CosmosValueError, match="Object Storage feature is unavailable"): + _configure_remote_cache_dir() + + +@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") +@patch("cosmos.cache.settings_remote_cache_dir", new="s3://some-bucket/cache") +@patch("airflow.io.path.ObjectStoragePath") +def test_remote_cache_path_initialization_path_available_default_connection(mock_object_storage_path): + mock_cache_dir_path = MagicMock() + mock_cache_dir_path.exists.return_value = True + mock_object_storage_path.return_value = mock_cache_dir_path + + configured_remote_cache_dir = _configure_remote_cache_dir() + mock_object_storage_path.assert_called_with("s3://some-bucket/cache", conn_id=_default_s3_conn) + assert configured_remote_cache_dir == mock_cache_dir_path + + +@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") +@patch("cosmos.cache.settings_remote_cache_dir", new="s3://some-bucket/cache") +@patch("airflow.io.path.ObjectStoragePath") +def test_remote_cache_dir_initialization_path_not_exist_creates_path(mock_object_storage_path): + mock_cache_dir_path = MagicMock() + mock_cache_dir_path.exists.return_value = False + mock_object_storage_path.return_value = mock_cache_dir_path + + _ = _configure_remote_cache_dir() + mock_cache_dir_path.mkdir.assert_called_once_with(parents=True, exist_ok=True) + + +@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") +@patch("cosmos.cache.settings_remote_cache_dir", new="s3://some-bucket/cache") +@patch("cosmos.cache.remote_cache_dir_conn_id", new="my_conn_id") +@patch("airflow.io.path.ObjectStoragePath") +def test_remote_cache_path_initialization_with_conn_id(mock_object_storage_path): + mock_cache_path = MagicMock() + mock_cache_path.exists.return_value = True + mock_object_storage_path.return_value = mock_cache_path + + configured_remote_cache_dir = _configure_remote_cache_dir() + mock_object_storage_path.assert_called_with("s3://some-bucket/cache", conn_id="my_conn_id") + assert configured_remote_cache_dir == mock_cache_path diff --git a/tests/test_config.py b/tests/test_config.py index 398bc1711..d557dd4fc 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,11 +1,11 @@ from contextlib import nullcontext as does_not_raise from pathlib import Path -from unittest.mock import MagicMock, Mock, PropertyMock, call, patch +from unittest.mock import Mock, PropertyMock, call, patch import pytest from cosmos.config import CosmosConfigException, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig -from cosmos.constants import ExecutionMode, InvocationMode, _default_s3_conn +from cosmos.constants import ExecutionMode, InvocationMode from cosmos.exceptions import CosmosValueError from cosmos.profiles.athena.access_key import AthenaAccessKeyProfileMapping from cosmos.profiles.postgres.user_pass import PostgresUserPasswordProfileMapping @@ -331,55 +331,3 @@ def test_remote_manifest_path(manifest_path, given_manifest_conn_id, used_manife _ = ProjectConfig( dbt_project_path="/tmp/some-path", manifest_path=manifest_path, manifest_conn_id=given_manifest_conn_id ) - - -@patch("cosmos.config.settings_remote_cache_path", new=None) 
-def test_remote_cache_path_initialization_no_remote_cache(): - config = ProjectConfig(dbt_project_path="/some/path", project_name="test_project") - assert config.remote_cache_path is None - - -@patch("cosmos.config.settings_remote_cache_path", new="s3://some-bucket/cache") -@patch("cosmos.config.AIRFLOW_IO_AVAILABLE", new=False) -def test_remote_cache_path_initialization_object_storage_unavailable_on_earlier_airflow_versions(): - with pytest.raises(CosmosValueError, match="Object Storage feature is unavailable"): - ProjectConfig(dbt_project_path="/some/path", project_name="test_project") - - -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") -@patch("cosmos.config.settings_remote_cache_path", new="s3://some-bucket/cache") -@patch("airflow.io.path.ObjectStoragePath") -def test_remote_cache_path_initialization_path_available_default_connection(mock_object_storage_path): - mock_cache_path = MagicMock() - mock_cache_path.exists.return_value = True - mock_object_storage_path.return_value = mock_cache_path - - config = ProjectConfig(dbt_project_path="/some/path", project_name="test_project") - mock_object_storage_path.assert_called_with("s3://some-bucket/cache", conn_id=_default_s3_conn) - assert config.remote_cache_path == mock_cache_path - - -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") -@patch("cosmos.config.settings_remote_cache_path", new="s3://some-bucket/cache") -@patch("airflow.io.path.ObjectStoragePath") -def test_remote_cache_path_initialization_path_not_exist(mock_object_storage_path): - mock_cache_path = MagicMock() - mock_cache_path.exists.return_value = False - mock_object_storage_path.return_value = mock_cache_path - - with pytest.raises(CosmosValueError, match="remote_cache_path `s3://some-bucket/cache` does not exist"): - ProjectConfig(dbt_project_path="/some/path", project_name="test_project") - - -@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") -@patch("cosmos.config.settings_remote_cache_path", new="s3://some-bucket/cache") -@patch("cosmos.config.remote_cache_conn_id", new="my_conn_id") -@patch("airflow.io.path.ObjectStoragePath") -def test_remote_cache_path_initialization_with_conn_id(mock_object_storage_path): - mock_cache_path = MagicMock() - mock_cache_path.exists.return_value = True - mock_object_storage_path.return_value = mock_cache_path - - config = ProjectConfig(dbt_project_path="/some/path", project_name="test_project") - mock_object_storage_path.assert_called_with("s3://some-bucket/cache", conn_id="my_conn_id") - assert config.remote_cache_path == mock_cache_path From 94eebc135d1331a2cd1cbf481272fac6b8ade03d Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 16 Aug 2024 02:09:52 +0530 Subject: [PATCH 14/14] Add utility to delete unused remote cache files and include it in example DAG --- cosmos/cache.py | 58 ++++++++++++++++++++++++++ dev/dags/example_cosmos_cleanup_dag.py | 11 ++++- 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/cosmos/cache.py b/cosmos/cache.py index 2c4ccf562..16ca7709d 100644 --- a/cosmos/cache.py +++ b/cosmos/cache.py @@ -411,6 +411,64 @@ def delete_unused_dbt_ls_cache( return deleted_cosmos_variables +# TODO: Add integration tests once remote cache is supported in the CI pipeline +@provide_session +def delete_unused_dbt_ls_remote_cache_files( # pragma: no cover + max_age_last_usage: timedelta = timedelta(days=30), session: 
Session | None = None +) -> int: + """ + Delete Cosmos cache stored in remote storage based on the last execution of their associated DAGs. + """ + if session is None: + return 0 + + logger.info(f"Delete the Cosmos cache stored remotely that hasn't been used for {max_age_last_usage}") + cosmos_dags_ids_remote_cache_files = defaultdict(list) + + configured_remote_cache_dir = _configure_remote_cache_dir() + if not configured_remote_cache_dir: + logger.info( + "No remote cache directory configured. Skipping the deletion of the dbt ls cache files in remote storage." + ) + return 0 + + dirs = [obj for obj in configured_remote_cache_dir.iterdir() if obj.is_dir()] + files = [f for label in dirs for f in label.iterdir() if f.is_file()] + + total_cosmos_remote_cache_files = 0 + for file in files: + prefix_path = (configured_remote_cache_dir / VAR_KEY_CACHE_PREFIX).as_uri() + if file.as_uri().startswith(prefix_path): + with file.open("r") as fp: + cache_dict = json.load(fp) + cosmos_dags_ids_remote_cache_files[cache_dict["dag_id"]].append(file) + total_cosmos_remote_cache_files += 1 + + deleted_cosmos_remote_cache_files = 0 + + for dag_id, files in cosmos_dags_ids_remote_cache_files.items(): + last_dag_run = ( + session.query(DagRun) + .filter( + DagRun.dag_id == dag_id, + ) + .order_by(DagRun.execution_date.desc()) + .first() + ) + if last_dag_run and last_dag_run.execution_date < (datetime.now(timezone.utc) - max_age_last_usage): + for file in files: + logger.info(f"Removing the dbt ls cache remote file {file}") + file.unlink() + deleted_cosmos_remote_cache_files += 1 + logger.info( + "Deleted %s/%s dbt ls cache files in remote storage.", + deleted_cosmos_remote_cache_files, + total_cosmos_remote_cache_files, + ) + + return deleted_cosmos_remote_cache_files + + def is_profile_cache_enabled() -> bool: """Return True if global and profile cache is enable""" return enable_cache and enable_cache_profile diff --git a/dev/dags/example_cosmos_cleanup_dag.py b/dev/dags/example_cosmos_cleanup_dag.py index c93bdf002..1d37589a0 100644 --- a/dev/dags/example_cosmos_cleanup_dag.py +++ b/dev/dags/example_cosmos_cleanup_dag.py @@ -8,7 +8,7 @@ from airflow.decorators import dag, task -from cosmos.cache import delete_unused_dbt_ls_cache +from cosmos.cache import delete_unused_dbt_ls_cache, delete_unused_dbt_ls_remote_cache_files @dag( @@ -28,6 +28,15 @@ def clear_db_ls_cache(session=None): clear_db_ls_cache() + @task() + def clear_db_ls_remote_cache(session=None): + """ + Delete the dbt ls remote cache files that have not been used for the last five days. + """ + delete_unused_dbt_ls_remote_cache_files(max_age_last_usage=timedelta(days=5)) + + clear_db_ls_remote_cache() + # [END cache_example]
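
Reviewer note (not part of the patches above): the sketch below illustrates how a deployment might exercise the remote dbt ls cache added in this series. It assumes Airflow 2.8+ (for the Object Storage feature), an existing S3 connection named "aws_s3_conn", and a placeholder bucket and cache key; adjust these to your environment.

    # Deployment configuration (set before the scheduler/workers start), per the
    # cosmos-conf.rst changes in this series:
    #   AIRFLOW__COSMOS__REMOTE_CACHE_DIR=s3://my-bucket/cosmos_cache/
    #   AIRFLOW__COSMOS__REMOTE_CACHE_DIR_CONN_ID=aws_s3_conn
    import json

    from airflow.io.path import ObjectStoragePath

    # Cosmos stores one JSON document per cached ``dbt ls`` invocation at
    # <remote_cache_dir>/<dbt_ls_cache_key>/dbt_ls_cache.json (see DbtGraph.save_dbt_ls_cache).
    cache_root = ObjectStoragePath("s3://my-bucket/cosmos_cache/", conn_id="aws_s3_conn")
    entry = cache_root / "cosmos_cache__example_dag" / "dbt_ls_cache.json"  # hypothetical cache key
    if entry.exists():
        with entry.open("r") as fp:
            cache_dict = json.load(fp)
        # "dbt_ls_compressed" holds the zlib-compressed, base64-encoded dbt ls output, while
        # "version" and "last_modified" drive cache invalidation and the cleanup utility.
        print(cache_dict["version"], cache_dict["last_modified"])

The same <cache_key>/dbt_ls_cache.json layout is what delete_unused_dbt_ls_remote_cache_files() walks when pruning entries for DAGs that have not run within the configured retention window.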