From d46cb7720c585b2d1229005d2206292825b16c77 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 26 Sep 2023 09:57:04 +0100 Subject: [PATCH 1/7] Fix on_warning_callback issue Since 1.1.0, the on_warning_callback functionality no longer works, it worked on 1.0.5 Closes: #549 --- cosmos/operators/local.py | 15 +++-- tests/operators/test_local.py | 44 ++++++++++++++- tests/sample/schema_failing_test.yml | 83 ++++++++++++++++++++++++++++ 3 files changed, 136 insertions(+), 6 deletions(-) create mode 100644 tests/sample/schema_failing_test.yml diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index aaad4e259..56fc8ec7a 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -40,9 +40,7 @@ FullOutputSubprocessHook, FullOutputSubprocessResult, ) -from cosmos.dbt.parser.output import ( - extract_log_issues, -) +from cosmos.dbt.parser.output import extract_log_issues, parse_output logger = get_logger(__name__) @@ -349,11 +347,12 @@ def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> Ope job_facets=job_facets, ) - def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> None: + def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> FullOutputSubprocessResult: dbt_cmd, env = self.build_cmd(context=context, cmd_flags=cmd_flags) dbt_cmd = dbt_cmd or [] result = self.run_command(cmd=dbt_cmd, env=env, context=context) logger.info(result.output) + return result def execute(self, context: Context) -> None: self.build_and_run_cmd(context=context) @@ -471,7 +470,6 @@ def _should_run_tests( :param result: The output from the build and run command. """ - return self.on_warning_callback is not None and no_tests_message not in result.output def _handle_warnings(self, result: FullOutputSubprocessResult, context: Context) -> None: @@ -491,6 +489,13 @@ def _handle_warnings(self, result: FullOutputSubprocessResult, context: Context) if self.on_warning_callback: self.on_warning_callback(warning_context) + def execute(self, context: Context) -> None: + result = self.build_and_run_cmd(context=context) + if self._should_run_tests(result): + warnings = parse_output(result, "WARN") + if warnings > 0: + self._handle_warnings(result, context) + class DbtRunOperationLocalOperator(DbtLocalBaseOperator): """ diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 94b0f8e27..223305fbe 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -1,3 +1,6 @@ +import os +import shutil +import tempfile from pathlib import Path from unittest.mock import MagicMock, patch @@ -28,6 +31,8 @@ DBT_PROJ_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt/jaffle_shop" +SCHEMA_FAILING_TEST = Path(__file__).parent.parent / "sample/schema_failing_test.yml" + profile_config = ProfileConfig( profile_name="default", @@ -45,6 +50,18 @@ ) +@pytest.fixture +def failing_test_dbt_project(tmp_path): + tmp_dir = tempfile.TemporaryDirectory() + tmp_dir_path = Path(tmp_dir.name) / "jaffle_shop" + shutil.copytree(DBT_PROJ_DIR, tmp_dir_path) + target_schema = tmp_dir_path / "models/schema.yml" + os.remove(target_schema) + shutil.copy(SCHEMA_FAILING_TEST, target_schema) + yield tmp_dir_path + tmp_dir.cleanup() + + def test_dbt_base_operator_add_global_flags() -> None: dbt_base_operator = DbtLocalBaseOperator( profile_config=profile_config, @@ -175,7 +192,7 @@ def test_run_operator_dataset_inlets_and_outlets(): dbt_cmd_flags=["--models", "stg_customers"], install_deps=True, ) - run_operator + run_operator >> test_operator run_test_dag(dag) assert run_operator.inlets == [] assert run_operator.outlets == [Dataset(uri="postgres://0.0.0.0:5432/postgres.public.stg_customers", extra=None)] @@ -183,6 +200,31 @@ def test_run_operator_dataset_inlets_and_outlets(): assert test_operator.outlets == [] +@pytest.mark.integration +def test_run_test_operator_with_callback(failing_test_dbt_project): + on_warning_callback = MagicMock() + + with DAG("test-id-2", start_date=datetime(2022, 1, 1)) as dag: + run_operator = DbtRunLocalOperator( + profile_config=real_profile_config, + project_dir=failing_test_dbt_project, + task_id="run", + dbt_cmd_flags=["--models", "orders"], + install_deps=True, + ) + test_operator = DbtTestLocalOperator( + profile_config=real_profile_config, + project_dir=failing_test_dbt_project, + task_id="test", + dbt_cmd_flags=["--models", "orders"], + install_deps=True, + on_warning_callback=on_warning_callback, + ) + run_operator >> test_operator + run_test_dag(dag) + assert on_warning_callback.called + + @pytest.mark.integration def test_run_operator_emits_events(): class MockRun: diff --git a/tests/sample/schema_failing_test.yml b/tests/sample/schema_failing_test.yml new file mode 100644 index 000000000..c75df8152 --- /dev/null +++ b/tests/sample/schema_failing_test.yml @@ -0,0 +1,83 @@ +version: 2 + +models: + - name: customers + description: This table has basic information about a customer, as well as some derived facts based on a customer's orders + + columns: + - name: customer_id + description: This is a unique identifier for a customer + tests: + - unique + - not_null + + - name: first_name + description: Customer's first name. PII. + + - name: last_name + description: Customer's last name. PII. + + - name: first_order + description: Date (UTC) of a customer's first order + + - name: most_recent_order + description: Date (UTC) of a customer's most recent order + + - name: number_of_orders + description: Count of the number of orders a customer has placed + + - name: total_order_amount + description: Total value (AUD) of a customer's orders + + - name: orders + description: This table has basic information about orders, as well as some derived facts based on payments + + columns: + - name: order_id + tests: + - unique + - not_null + description: This is a unique identifier for an order + + - name: customer_id + description: Foreign key to the customers table + tests: + - not_null + - relationships: + to: ref('customers') + field: customer_id + + - name: order_date + description: Date (UTC) that the order was placed + + - name: status + description: '{{ doc("orders_status") }}' + tests: + - accepted_values: + # this test will fail, since this column has more values + values: ['placed'] + + - name: amount + description: Total amount (AUD) of the order + tests: + - not_null + + - name: credit_card_amount + description: Amount of the order (AUD) paid for by credit card + tests: + - not_null + + - name: coupon_amount + description: Amount of the order (AUD) paid for by coupon + tests: + - not_null + + - name: bank_transfer_amount + description: Amount of the order (AUD) paid for by bank transfer + tests: + - not_null + + - name: gift_card_amount + description: Amount of the order (AUD) paid for by gift card + tests: + - not_null From 1989c63f84b557db2a6f27c182f0069cabc9ad06 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 26 Sep 2023 23:38:04 +0100 Subject: [PATCH 2/7] Fix callback test --- tests/sample/schema_failing_test.yml | 71 ++-------------------------- 1 file changed, 3 insertions(+), 68 deletions(-) diff --git a/tests/sample/schema_failing_test.yml b/tests/sample/schema_failing_test.yml index c75df8152..f19639355 100644 --- a/tests/sample/schema_failing_test.yml +++ b/tests/sample/schema_failing_test.yml @@ -1,54 +1,11 @@ version: 2 models: - - name: customers - description: This table has basic information about a customer, as well as some derived facts based on a customer's orders - - columns: - - name: customer_id - description: This is a unique identifier for a customer - tests: - - unique - - not_null - - - name: first_name - description: Customer's first name. PII. - - - name: last_name - description: Customer's last name. PII. - - - name: first_order - description: Date (UTC) of a customer's first order - - - name: most_recent_order - description: Date (UTC) of a customer's most recent order - - - name: number_of_orders - description: Count of the number of orders a customer has placed - - - name: total_order_amount - description: Total value (AUD) of a customer's orders - name: orders description: This table has basic information about orders, as well as some derived facts based on payments columns: - - name: order_id - tests: - - unique - - not_null - description: This is a unique identifier for an order - - - name: customer_id - description: Foreign key to the customers table - tests: - - not_null - - relationships: - to: ref('customers') - field: customer_id - - - name: order_date - description: Date (UTC) that the order was placed - name: status description: '{{ doc("orders_status") }}' @@ -56,28 +13,6 @@ models: - accepted_values: # this test will fail, since this column has more values values: ['placed'] - - - name: amount - description: Total amount (AUD) of the order - tests: - - not_null - - - name: credit_card_amount - description: Amount of the order (AUD) paid for by credit card - tests: - - not_null - - - name: coupon_amount - description: Amount of the order (AUD) paid for by coupon - tests: - - not_null - - - name: bank_transfer_amount - description: Amount of the order (AUD) paid for by bank transfer - tests: - - not_null - - - name: gift_card_amount - description: Amount of the order (AUD) paid for by gift card - tests: - - not_null + config: + severity: warn + warn_if: ">1" From 920cdda2d198190516f42dd8f5fd4e83a60fbd46 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 26 Sep 2023 23:54:40 +0100 Subject: [PATCH 3/7] Add test for when callback is not called --- tests/operators/test_local.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 223305fbe..c41a627fb 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -225,6 +225,31 @@ def test_run_test_operator_with_callback(failing_test_dbt_project): assert on_warning_callback.called +@pytest.mark.integration +def test_run_test_operator_without_callback(): + on_warning_callback = MagicMock() + + with DAG("test-id-3", start_date=datetime(2022, 1, 1)) as dag: + run_operator = DbtRunLocalOperator( + profile_config=real_profile_config, + project_dir=DBT_PROJ_DIR, + task_id="run", + dbt_cmd_flags=["--models", "orders"], + install_deps=True, + ) + test_operator = DbtTestLocalOperator( + profile_config=real_profile_config, + project_dir=DBT_PROJ_DIR, + task_id="test", + dbt_cmd_flags=["--models", "orders"], + install_deps=True, + exclude="relationships_orders_customer_id__customer_id__ref_customers_", + ) + run_operator >> test_operator + run_test_dag(dag) + assert not on_warning_callback.called + + @pytest.mark.integration def test_run_operator_emits_events(): class MockRun: From 6a09ce8660fe4814a2bd991e5b8b60f88a7411dd Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 27 Sep 2023 10:48:12 +0100 Subject: [PATCH 4/7] Improve tests for callback / dbt project fixture --- cosmos/operators/local.py | 2 +- tests/operators/test_local.py | 49 +++++++++---------- tests/sample/mini/dbt_project.yml | 20 ++++++++ tests/sample/mini/models/.gitkeep | 0 tests/sample/mini/profiles.yml | 12 +++++ .../sample/{ => mini}/schema_failing_test.yml | 8 +-- tests/sample/mini/seeds/.gitkeep | 0 tests/sample/mini/seeds/mini_orders.csv | 10 ++++ tests/sample/mini/snapshots/.gitkeep | 0 9 files changed, 71 insertions(+), 30 deletions(-) create mode 100644 tests/sample/mini/dbt_project.yml create mode 100644 tests/sample/mini/models/.gitkeep create mode 100644 tests/sample/mini/profiles.yml rename tests/sample/{ => mini}/schema_failing_test.yml (66%) create mode 100644 tests/sample/mini/seeds/.gitkeep create mode 100644 tests/sample/mini/seeds/mini_orders.csv create mode 100644 tests/sample/mini/snapshots/.gitkeep diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 56fc8ec7a..14f5f81b0 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -274,7 +274,7 @@ def calculate_openlineage_events_completes( try: events = openlineage_processor.parse() self.openlineage_events_completes = events.completes - except (FileNotFoundError, NotImplementedError): + except (FileNotFoundError, NotImplementedError, ValueError): logger.debug("Unable to parse OpenLineage events", stack_info=True) def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]: diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index c41a627fb..d03b16943 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -31,8 +31,9 @@ DBT_PROJ_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt/jaffle_shop" -SCHEMA_FAILING_TEST = Path(__file__).parent.parent / "sample/schema_failing_test.yml" - +MINI_DBT_PROJ_DIR = Path(__file__).parent.parent / "sample/mini" +MINI_DBT_PROJ_DIR_FAILING_SCHEMA = MINI_DBT_PROJ_DIR / "schema_failing_test.yml" +MINI_DBT_PROJ_PROFILE = MINI_DBT_PROJ_DIR / "profiles.yml" profile_config = ProfileConfig( profile_name="default", @@ -49,15 +50,17 @@ ), ) +mini_profile_config = ProfileConfig(profile_name="mini", target_name="dev", profiles_yml_filepath=MINI_DBT_PROJ_PROFILE) + @pytest.fixture def failing_test_dbt_project(tmp_path): tmp_dir = tempfile.TemporaryDirectory() - tmp_dir_path = Path(tmp_dir.name) / "jaffle_shop" - shutil.copytree(DBT_PROJ_DIR, tmp_dir_path) + tmp_dir_path = Path(tmp_dir.name) / "mini" + shutil.copytree(MINI_DBT_PROJ_DIR, tmp_dir_path) target_schema = tmp_dir_path / "models/schema.yml" - os.remove(target_schema) - shutil.copy(SCHEMA_FAILING_TEST, target_schema) + target_schema.exists() and os.remove(target_schema) + shutil.copy(MINI_DBT_PROJ_DIR_FAILING_SCHEMA, target_schema) yield tmp_dir_path tmp_dir.cleanup() @@ -179,14 +182,14 @@ def test_run_operator_dataset_inlets_and_outlets(): with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag: run_operator = DbtRunLocalOperator( - profile_config=real_profile_config, + profile_config=mini_profile_config, project_dir=DBT_PROJ_DIR, task_id="run", dbt_cmd_flags=["--models", "stg_customers"], install_deps=True, ) test_operator = DbtTestLocalOperator( - profile_config=real_profile_config, + profile_config=mini_profile_config, project_dir=DBT_PROJ_DIR, task_id="test", dbt_cmd_flags=["--models", "stg_customers"], @@ -205,19 +208,17 @@ def test_run_test_operator_with_callback(failing_test_dbt_project): on_warning_callback = MagicMock() with DAG("test-id-2", start_date=datetime(2022, 1, 1)) as dag: - run_operator = DbtRunLocalOperator( - profile_config=real_profile_config, + run_operator = DbtSeedLocalOperator( + profile_config=mini_profile_config, project_dir=failing_test_dbt_project, task_id="run", - dbt_cmd_flags=["--models", "orders"], - install_deps=True, + append_env=True, ) test_operator = DbtTestLocalOperator( - profile_config=real_profile_config, + profile_config=mini_profile_config, project_dir=failing_test_dbt_project, task_id="test", - dbt_cmd_flags=["--models", "orders"], - install_deps=True, + append_env=True, on_warning_callback=on_warning_callback, ) run_operator >> test_operator @@ -230,20 +231,18 @@ def test_run_test_operator_without_callback(): on_warning_callback = MagicMock() with DAG("test-id-3", start_date=datetime(2022, 1, 1)) as dag: - run_operator = DbtRunLocalOperator( - profile_config=real_profile_config, - project_dir=DBT_PROJ_DIR, + run_operator = DbtSeedLocalOperator( + profile_config=mini_profile_config, + project_dir=MINI_DBT_PROJ_DIR, task_id="run", - dbt_cmd_flags=["--models", "orders"], - install_deps=True, + append_env=True, ) test_operator = DbtTestLocalOperator( - profile_config=real_profile_config, - project_dir=DBT_PROJ_DIR, + profile_config=mini_profile_config, + project_dir=MINI_DBT_PROJ_DIR, task_id="test", - dbt_cmd_flags=["--models", "orders"], - install_deps=True, - exclude="relationships_orders_customer_id__customer_id__ref_customers_", + append_env=True, + on_warning_callback=on_warning_callback, ) run_operator >> test_operator run_test_dag(dag) diff --git a/tests/sample/mini/dbt_project.yml b/tests/sample/mini/dbt_project.yml new file mode 100644 index 000000000..eaa39d188 --- /dev/null +++ b/tests/sample/mini/dbt_project.yml @@ -0,0 +1,20 @@ +name: 'mini' + +config-version: 2 +version: '0.1' + +profile: 'mini' + +model-paths: ["models"] +seed-paths: ["seeds"] +test-paths: ["tests"] +analysis-paths: ["analysis"] +macro-paths: ["macros"] + +target-path: "target" +clean-targets: + - "target" + - "dbt_modules" + - "logs" + +require-dbt-version: [">=1.0.0", "<2.0.0"] diff --git a/tests/sample/mini/models/.gitkeep b/tests/sample/mini/models/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/tests/sample/mini/profiles.yml b/tests/sample/mini/profiles.yml new file mode 100644 index 000000000..0c53a3894 --- /dev/null +++ b/tests/sample/mini/profiles.yml @@ -0,0 +1,12 @@ +mini: + target: dev + outputs: + dev: + type: postgres + host: "{{ env_var('POSTGRES_HOST') }}" + user: "{{ env_var('POSTGRES_USER') }}" + password: "{{ env_var('POSTGRES_PASSWORD') }}" + port: "{{ env_var('POSTGRES_PORT') | int }}" + dbname: "{{ env_var('POSTGRES_DB') }}" + schema: "{{ env_var('POSTGRES_SCHEMA') }}" + threads: 4 diff --git a/tests/sample/schema_failing_test.yml b/tests/sample/mini/schema_failing_test.yml similarity index 66% rename from tests/sample/schema_failing_test.yml rename to tests/sample/mini/schema_failing_test.yml index f19639355..d97d1733d 100644 --- a/tests/sample/schema_failing_test.yml +++ b/tests/sample/mini/schema_failing_test.yml @@ -1,17 +1,17 @@ version: 2 -models: +seeds: - - name: orders + - name: mini_orders description: This table has basic information about orders, as well as some derived facts based on payments columns: - name: status - description: '{{ doc("orders_status") }}' + description: 'Order status' tests: - accepted_values: - # this test will fail, since this column has more values + # this will intentionally fail, since the seed has other values for this column values: ['placed'] config: severity: warn diff --git a/tests/sample/mini/seeds/.gitkeep b/tests/sample/mini/seeds/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/tests/sample/mini/seeds/mini_orders.csv b/tests/sample/mini/seeds/mini_orders.csv new file mode 100644 index 000000000..31ac98589 --- /dev/null +++ b/tests/sample/mini/seeds/mini_orders.csv @@ -0,0 +1,10 @@ +id,user_id,order_date,status +1,1,2018-01-01,returned +2,3,2018-01-02,completed +3,22,2018-01-26,return_pending +4,9,2018-03-17,shipped +75,69,2018-03-18,completed +76,25,2018-03-20,completed +77,35,2018-03-21,shipped +78,90,2018-03-23,shipped +6,68,2018-03-26,placed diff --git a/tests/sample/mini/snapshots/.gitkeep b/tests/sample/mini/snapshots/.gitkeep new file mode 100644 index 000000000..e69de29bb From 220bec1449966d1e985525cd53d2cb8b814a1ec9 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 27 Sep 2023 11:18:50 +0100 Subject: [PATCH 5/7] Remove unnecessary method based on @edgga's comment on d70e6d7 and PR #556 --- cosmos/operators/local.py | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 14f5f81b0..bb71c16d6 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -459,19 +459,6 @@ def __init__( self.base_cmd = ["test"] self.on_warning_callback = on_warning_callback - def _should_run_tests( - self, - result: FullOutputSubprocessResult, - no_tests_message: str = "Nothing to do", - ) -> bool: - """ - Check if any tests are defined to run in the DAG. If tests are defined - and on_warning_callback is set, then function returns True. - - :param result: The output from the build and run command. - """ - return self.on_warning_callback is not None and no_tests_message not in result.output - def _handle_warnings(self, result: FullOutputSubprocessResult, context: Context) -> None: """ Handles warnings by extracting log issues, creating additional context, and calling the @@ -491,7 +478,7 @@ def _handle_warnings(self, result: FullOutputSubprocessResult, context: Context) def execute(self, context: Context) -> None: result = self.build_and_run_cmd(context=context) - if self._should_run_tests(result): + if self.on_warning_callback: warnings = parse_output(result, "WARN") if warnings > 0: self._handle_warnings(result, context) From 2574e2bf21a5e2f52678e8ab4ab53d469fa86268 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 27 Sep 2023 12:01:12 +0100 Subject: [PATCH 6/7] Undo test change out of scope --- tests/operators/test_local.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index d03b16943..3aa17e0f2 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -195,7 +195,7 @@ def test_run_operator_dataset_inlets_and_outlets(): dbt_cmd_flags=["--models", "stg_customers"], install_deps=True, ) - run_operator >> test_operator + run_operator run_test_dag(dag) assert run_operator.inlets == [] assert run_operator.outlets == [Dataset(uri="postgres://0.0.0.0:5432/postgres.public.stg_customers", extra=None)] From 9f77ea0049a79b28189040b6da19a61e1ba6c493 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 27 Sep 2023 13:37:13 +0100 Subject: [PATCH 7/7] Fix test issue --- cosmos/operators/local.py | 2 +- tests/operators/test_local.py | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index bb71c16d6..52428753e 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -478,7 +478,7 @@ def _handle_warnings(self, result: FullOutputSubprocessResult, context: Context) def execute(self, context: Context) -> None: result = self.build_and_run_cmd(context=context) - if self.on_warning_callback: + if self.on_warning_callback and "WARN" in result.output: warnings = parse_output(result, "WARN") if warnings > 0: self._handle_warnings(result, context) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 3aa17e0f2..728eea079 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -182,18 +182,20 @@ def test_run_operator_dataset_inlets_and_outlets(): with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag: run_operator = DbtRunLocalOperator( - profile_config=mini_profile_config, + profile_config=real_profile_config, project_dir=DBT_PROJ_DIR, task_id="run", dbt_cmd_flags=["--models", "stg_customers"], install_deps=True, + append_env=True, ) test_operator = DbtTestLocalOperator( - profile_config=mini_profile_config, + profile_config=real_profile_config, project_dir=DBT_PROJ_DIR, task_id="test", dbt_cmd_flags=["--models", "stg_customers"], install_deps=True, + append_env=True, ) run_operator run_test_dag(dag)