diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index aaad4e259..52428753e 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -40,9 +40,7 @@ FullOutputSubprocessHook, FullOutputSubprocessResult, ) -from cosmos.dbt.parser.output import ( - extract_log_issues, -) +from cosmos.dbt.parser.output import extract_log_issues, parse_output logger = get_logger(__name__) @@ -276,7 +274,7 @@ def calculate_openlineage_events_completes( try: events = openlineage_processor.parse() self.openlineage_events_completes = events.completes - except (FileNotFoundError, NotImplementedError): + except (FileNotFoundError, NotImplementedError, ValueError): logger.debug("Unable to parse OpenLineage events", stack_info=True) def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]: @@ -349,11 +347,12 @@ def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> Ope job_facets=job_facets, ) - def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> None: + def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> FullOutputSubprocessResult: dbt_cmd, env = self.build_cmd(context=context, cmd_flags=cmd_flags) dbt_cmd = dbt_cmd or [] result = self.run_command(cmd=dbt_cmd, env=env, context=context) logger.info(result.output) + return result def execute(self, context: Context) -> None: self.build_and_run_cmd(context=context) @@ -460,20 +459,6 @@ def __init__( self.base_cmd = ["test"] self.on_warning_callback = on_warning_callback - def _should_run_tests( - self, - result: FullOutputSubprocessResult, - no_tests_message: str = "Nothing to do", - ) -> bool: - """ - Check if any tests are defined to run in the DAG. If tests are defined - and on_warning_callback is set, then function returns True. - - :param result: The output from the build and run command. - """ - - return self.on_warning_callback is not None and no_tests_message not in result.output - def _handle_warnings(self, result: FullOutputSubprocessResult, context: Context) -> None: """ Handles warnings by extracting log issues, creating additional context, and calling the @@ -491,6 +476,13 @@ def _handle_warnings(self, result: FullOutputSubprocessResult, context: Context) if self.on_warning_callback: self.on_warning_callback(warning_context) + def execute(self, context: Context) -> None: + result = self.build_and_run_cmd(context=context) + if self.on_warning_callback and "WARN" in result.output: + warnings = parse_output(result, "WARN") + if warnings > 0: + self._handle_warnings(result, context) + class DbtRunOperationLocalOperator(DbtLocalBaseOperator): """ diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 94b0f8e27..728eea079 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -1,3 +1,6 @@ +import os +import shutil +import tempfile from pathlib import Path from unittest.mock import MagicMock, patch @@ -28,6 +31,9 @@ DBT_PROJ_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt/jaffle_shop" +MINI_DBT_PROJ_DIR = Path(__file__).parent.parent / "sample/mini" +MINI_DBT_PROJ_DIR_FAILING_SCHEMA = MINI_DBT_PROJ_DIR / "schema_failing_test.yml" +MINI_DBT_PROJ_PROFILE = MINI_DBT_PROJ_DIR / "profiles.yml" profile_config = ProfileConfig( profile_name="default", @@ -44,6 +50,20 @@ ), ) +mini_profile_config = ProfileConfig(profile_name="mini", target_name="dev", profiles_yml_filepath=MINI_DBT_PROJ_PROFILE) + + +@pytest.fixture +def failing_test_dbt_project(tmp_path): + tmp_dir = tempfile.TemporaryDirectory() + tmp_dir_path = Path(tmp_dir.name) / "mini" + shutil.copytree(MINI_DBT_PROJ_DIR, tmp_dir_path) + target_schema = tmp_dir_path / "models/schema.yml" + target_schema.exists() and os.remove(target_schema) + shutil.copy(MINI_DBT_PROJ_DIR_FAILING_SCHEMA, target_schema) + yield tmp_dir_path + tmp_dir.cleanup() + def test_dbt_base_operator_add_global_flags() -> None: dbt_base_operator = DbtLocalBaseOperator( @@ -167,6 +187,7 @@ def test_run_operator_dataset_inlets_and_outlets(): task_id="run", dbt_cmd_flags=["--models", "stg_customers"], install_deps=True, + append_env=True, ) test_operator = DbtTestLocalOperator( profile_config=real_profile_config, @@ -174,6 +195,7 @@ def test_run_operator_dataset_inlets_and_outlets(): task_id="test", dbt_cmd_flags=["--models", "stg_customers"], install_deps=True, + append_env=True, ) run_operator run_test_dag(dag) @@ -183,6 +205,52 @@ def test_run_operator_dataset_inlets_and_outlets(): assert test_operator.outlets == [] +@pytest.mark.integration +def test_run_test_operator_with_callback(failing_test_dbt_project): + on_warning_callback = MagicMock() + + with DAG("test-id-2", start_date=datetime(2022, 1, 1)) as dag: + run_operator = DbtSeedLocalOperator( + profile_config=mini_profile_config, + project_dir=failing_test_dbt_project, + task_id="run", + append_env=True, + ) + test_operator = DbtTestLocalOperator( + profile_config=mini_profile_config, + project_dir=failing_test_dbt_project, + task_id="test", + append_env=True, + on_warning_callback=on_warning_callback, + ) + run_operator >> test_operator + run_test_dag(dag) + assert on_warning_callback.called + + +@pytest.mark.integration +def test_run_test_operator_without_callback(): + on_warning_callback = MagicMock() + + with DAG("test-id-3", start_date=datetime(2022, 1, 1)) as dag: + run_operator = DbtSeedLocalOperator( + profile_config=mini_profile_config, + project_dir=MINI_DBT_PROJ_DIR, + task_id="run", + append_env=True, + ) + test_operator = DbtTestLocalOperator( + profile_config=mini_profile_config, + project_dir=MINI_DBT_PROJ_DIR, + task_id="test", + append_env=True, + on_warning_callback=on_warning_callback, + ) + run_operator >> test_operator + run_test_dag(dag) + assert not on_warning_callback.called + + @pytest.mark.integration def test_run_operator_emits_events(): class MockRun: diff --git a/tests/sample/mini/dbt_project.yml b/tests/sample/mini/dbt_project.yml new file mode 100644 index 000000000..eaa39d188 --- /dev/null +++ b/tests/sample/mini/dbt_project.yml @@ -0,0 +1,20 @@ +name: 'mini' + +config-version: 2 +version: '0.1' + +profile: 'mini' + +model-paths: ["models"] +seed-paths: ["seeds"] +test-paths: ["tests"] +analysis-paths: ["analysis"] +macro-paths: ["macros"] + +target-path: "target" +clean-targets: + - "target" + - "dbt_modules" + - "logs" + +require-dbt-version: [">=1.0.0", "<2.0.0"] diff --git a/tests/sample/mini/models/.gitkeep b/tests/sample/mini/models/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/tests/sample/mini/profiles.yml b/tests/sample/mini/profiles.yml new file mode 100644 index 000000000..0c53a3894 --- /dev/null +++ b/tests/sample/mini/profiles.yml @@ -0,0 +1,12 @@ +mini: + target: dev + outputs: + dev: + type: postgres + host: "{{ env_var('POSTGRES_HOST') }}" + user: "{{ env_var('POSTGRES_USER') }}" + password: "{{ env_var('POSTGRES_PASSWORD') }}" + port: "{{ env_var('POSTGRES_PORT') | int }}" + dbname: "{{ env_var('POSTGRES_DB') }}" + schema: "{{ env_var('POSTGRES_SCHEMA') }}" + threads: 4 diff --git a/tests/sample/mini/schema_failing_test.yml b/tests/sample/mini/schema_failing_test.yml new file mode 100644 index 000000000..d97d1733d --- /dev/null +++ b/tests/sample/mini/schema_failing_test.yml @@ -0,0 +1,18 @@ +version: 2 + +seeds: + + - name: mini_orders + description: This table has basic information about orders, as well as some derived facts based on payments + + columns: + + - name: status + description: 'Order status' + tests: + - accepted_values: + # this will intentionally fail, since the seed has other values for this column + values: ['placed'] + config: + severity: warn + warn_if: ">1" diff --git a/tests/sample/mini/seeds/.gitkeep b/tests/sample/mini/seeds/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/tests/sample/mini/seeds/mini_orders.csv b/tests/sample/mini/seeds/mini_orders.csv new file mode 100644 index 000000000..31ac98589 --- /dev/null +++ b/tests/sample/mini/seeds/mini_orders.csv @@ -0,0 +1,10 @@ +id,user_id,order_date,status +1,1,2018-01-01,returned +2,3,2018-01-02,completed +3,22,2018-01-26,return_pending +4,9,2018-03-17,shipped +75,69,2018-03-18,completed +76,25,2018-03-20,completed +77,35,2018-03-21,shipped +78,90,2018-03-23,shipped +6,68,2018-03-26,placed diff --git a/tests/sample/mini/snapshots/.gitkeep b/tests/sample/mini/snapshots/.gitkeep new file mode 100644 index 000000000..e69de29bb