diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 219ea2019..a5736c295 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -163,7 +163,6 @@ jobs: POSTGRES_DB: postgres POSTGRES_SCHEMA: public POSTGRES_PORT: 5432 - SOURCE_RENDERING_BEHAVIOR: all - name: Upload coverage to Github uses: actions/upload-artifact@v2 @@ -235,7 +234,6 @@ jobs: POSTGRES_DB: postgres POSTGRES_SCHEMA: public POSTGRES_PORT: 5432 - SOURCE_RENDERING_BEHAVIOR: all - name: Upload coverage to Github uses: actions/upload-artifact@v2 @@ -379,7 +377,6 @@ jobs: POSTGRES_DB: postgres POSTGRES_SCHEMA: public POSTGRES_PORT: 5432 - SOURCE_RENDERING_BEHAVIOR: all - name: Upload coverage to Github uses: actions/upload-artifact@v2 @@ -461,12 +458,82 @@ jobs: AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH + Run-Kubernetes-Tests: + needs: Authorize + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [ "3.11" ] + airflow-version: [ "2.9" ] + steps: + - uses: actions/checkout@v3 + with: + ref: ${{ github.event.pull_request.head.sha || github.ref }} + - uses: actions/cache@v3 + with: + path: | + ~/.cache/pip + .local/share/hatch/ + key: coverage-integration-kubernetes-test-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + + - name: Create KinD cluster + uses: container-tools/kind-action@v1 + + - name: Install packages and dependencies + run: | + python -m venv venv + source venv/bin/activate + pip install --upgrade pip + pip install -e ".[tests]" + pip install apache-airflow-providers-cncf-kubernetes + pip install dbt-postgres==1.8.2 psycopg2==2.9.3 pytz + pip install apache-airflow==${{ matrix.airflow-version }} + # hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }} run pip freeze + + - name: Run kubernetes tests + run: | + source venv/bin/activate + sh ./scripts/test/kubernetes-setup.sh + cd dev && sh ../scripts/test/integration-kubernetes.sh + # hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-kubernetes + env: + AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/ + AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres + AIRFLOW_CONN_AWS_S3_CONN: ${{ secrets.AIRFLOW_CONN_AWS_S3_CONN }} + AIRFLOW_CONN_GCP_GS_CONN: ${{ secrets.AIRFLOW_CONN_GCP_GS_CONN }} + AIRFLOW_CONN_AZURE_ABFS_CONN: ${{ secrets.AIRFLOW_CONN_AZURE_ABFS_CONN }} + AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0 + PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH + COSMOS_CONN_POSTGRES_PASSWORD: ${{ secrets.COSMOS_CONN_POSTGRES_PASSWORD }} + DATABRICKS_CLUSTER_ID: mock + DATABRICKS_HOST: mock + DATABRICKS_WAREHOUSE_ID: mock + DATABRICKS_TOKEN: mock + POSTGRES_HOST: localhost + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_DB: postgres + POSTGRES_SCHEMA: public + POSTGRES_PORT: 5432 + + - name: Upload coverage to Github + uses: actions/upload-artifact@v2 + with: + name: coverage-integration-kubernetes-test-${{ matrix.python-version }}-${{ matrix.airflow-version }} + path: .coverage + Code-Coverage: if: github.event.action != 'labeled' needs: - Run-Unit-Tests - Run-Integration-Tests - Run-Integration-Tests-Expensive + - Run-Kubernetes-Tests runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 diff --git a/dev/Dockerfile.postgres_profile_docker_k8s b/dev/Dockerfile.postgres_profile_docker_k8s new file mode 100644 index 000000000..ae81a5b9f --- /dev/null +++ b/dev/Dockerfile.postgres_profile_docker_k8s @@ -0,0 +1,18 @@ +FROM python:3.11 + +RUN pip install dbt-postgres==1.8.2 psycopg2==2.9.3 pytz + +ENV POSTGRES_DATABASE=postgres +ENV POSTGRES_DB=postgres +ENV POSTGRES_HOST=postgres.default.svc.cluster.local +ENV POSTGRES_PASSWORD=postgres +ENV POSTGRES_PORT=5432 +ENV POSTGRES_SCHEMA=public +ENV POSTGRES_USER=postgres + +RUN mkdir /root/.dbt +COPY dags/dbt/jaffle_shop/profiles.yml /root/.dbt/profiles.yml + +RUN mkdir dags +COPY dags dags +RUN rm dags/dbt/jaffle_shop/packages.yml diff --git a/dev/dags/dbt/jaffle_shop/profiles.yml b/dev/dags/dbt/jaffle_shop/profiles.yml index 224f565f4..3960ca84d 100644 --- a/dev/dags/dbt/jaffle_shop/profiles.yml +++ b/dev/dags/dbt/jaffle_shop/profiles.yml @@ -10,3 +10,15 @@ default: dbname: "{{ env_var('POSTGRES_DB') }}" schema: "{{ env_var('POSTGRES_SCHEMA') }}" threads: 4 + +postgres_profile: + target: dev + outputs: + dev: + type: postgres + dbname: "{{ env_var('POSTGRES_DATABASE') }}" + host: "{{ env_var('POSTGRES_HOST') }}" + pass: "{{ env_var('POSTGRES_PASSWORD') }}" + port: 5432 # "{{ env_var('POSTGRES_PORT') | as_number }}" + schema: "{{ env_var('POSTGRES_SCHEMA') }}" + user: "{{ env_var('POSTGRES_USER') }}" diff --git a/dev/dags/jaffle_shop_kubernetes.py b/dev/dags/jaffle_shop_kubernetes.py new file mode 100644 index 000000000..aaca09ad9 --- /dev/null +++ b/dev/dags/jaffle_shop_kubernetes.py @@ -0,0 +1,98 @@ +""" +## Jaffle Shop DAG +[Jaffle Shop](https://github.com/dbt-labs/jaffle_shop) is a fictional eCommerce store. This dbt project originates from +dbt labs as an example project with dummy data to demonstrate a working dbt core project. This DAG uses the cosmos dbt +parser to generate an Airflow TaskGroup from the dbt project folder. + + +The step-by-step to run this DAG are described in: +https://astronomer.github.io/astronomer-cosmos/getting_started/kubernetes.html#kubernetes + +""" + +from airflow import DAG +from airflow.providers.cncf.kubernetes.secret import Secret +from pendulum import datetime + +from cosmos import ( + DbtSeedKubernetesOperator, + DbtTaskGroup, + ExecutionConfig, + ExecutionMode, + ProfileConfig, + ProjectConfig, +) +from cosmos.profiles import PostgresUserPasswordProfileMapping + +DBT_IMAGE = "dbt-jaffle-shop:1.0.0" + +project_seeds = [{"project": "jaffle_shop", "seeds": ["raw_customers", "raw_payments", "raw_orders"]}] + +postgres_password_secret = Secret( + deploy_type="env", + deploy_target="POSTGRES_PASSWORD", + secret="postgres-secrets", + key="password", +) + +postgres_host_secret = Secret( + deploy_type="env", + deploy_target="POSTGRES_HOST", + secret="postgres-secrets", + key="host", +) + +with DAG( + dag_id="jaffle_shop_kubernetes", + start_date=datetime(2022, 11, 27), + doc_md=__doc__, + catchup=False, +) as dag: + # [START kubernetes_seed_example] + load_seeds = DbtSeedKubernetesOperator( + task_id="load_seeds", + project_dir="dags/dbt/jaffle_shop", + get_logs=True, + schema="public", + image=DBT_IMAGE, + is_delete_operator_pod=False, + secrets=[postgres_password_secret, postgres_host_secret], + profile_config=ProfileConfig( + profile_name="postgres_profile", + target_name="dev", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="postgres_default", + profile_args={ + "schema": "public", + }, + ), + ), + ) + # [END kubernetes_seed_example] + + # [START kubernetes_tg_example] + run_models = DbtTaskGroup( + profile_config=ProfileConfig( + profile_name="postgres_profile", + target_name="dev", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="postgres_default", + profile_args={ + "schema": "public", + }, + ), + ), + project_config=ProjectConfig(dbt_project_path="dags/dbt/jaffle_shop"), + execution_config=ExecutionConfig( + execution_mode=ExecutionMode.KUBERNETES, + ), + operator_args={ + "image": DBT_IMAGE, + "get_logs": True, + "is_delete_operator_pod": False, + "secrets": [postgres_password_secret, postgres_host_secret], + }, + ) + # [END kubernetes_tg_example] + + load_seeds >> run_models diff --git a/docs/getting_started/execution-modes.rst b/docs/getting_started/execution-modes.rst index 266b40f32..df9b0f26d 100644 --- a/docs/getting_started/execution-modes.rst +++ b/docs/getting_started/execution-modes.rst @@ -144,27 +144,11 @@ Check the step-by-step guide on using the ``kubernetes`` execution mode at :ref: Example DAG: -.. code-block:: python - - postgres_password_secret = Secret( - deploy_type="env", - deploy_target="POSTGRES_PASSWORD", - secret="postgres-secrets", - key="password", - ) +.. literalinclude:: ../../dev/dags/jaffle_shop_kubernetes.py + :language: python + :start-after: [START kubernetes_seed_example] + :end-before: [END kubernetes_seed_example] - docker_cosmos_dag = DbtDag( - # ... - execution_config=ExecutionConfig( - execution_mode=ExecutionMode.KUBERNETES, - ), - operator_args={ - "image": "dbt-jaffle-shop:1.0.0", - "get_logs": True, - "is_delete_operator_pod": False, - "secrets": [postgres_password_secret], - }, - ) AWS_EKS ---------- diff --git a/docs/getting_started/kubernetes.rst b/docs/getting_started/kubernetes.rst index 1ae918a9a..bfa02023d 100644 --- a/docs/getting_started/kubernetes.rst +++ b/docs/getting_started/kubernetes.rst @@ -28,30 +28,10 @@ Additional KubernetesPodOperator parameters can be added on the operator_args pa For instance, -.. code-block:: python - - run_models = DbtTaskGroup( - profile_config=ProfileConfig( - profile_name="postgres_profile", - target_name="dev", - profile_mapping=PostgresUserPasswordProfileMapping( - conn_id="postgres_default", - profile_args={ - "schema": "public", - }, - ), - ), - project_config=ProjectConfig(PROJECT_DIR), - execution_config=ExecutionConfig( - execution_mode=ExecutionMode.KUBERNETES, - ), - operator_args={ - "image": DBT_IMAGE, - "get_logs": True, - "is_delete_operator_pod": False, - "secrets": [postgres_password_secret, postgres_host_secret], - }, - ) +.. literalinclude:: ../../dev/dags/jaffle_shop_kubernetes.py + :language: python + :start-after: [START kubernetes_tg_example] + :end-before: [END kubernetes_tg_example] Step-by-step instructions +++++++++++++++++++++++++ diff --git a/pyproject.toml b/pyproject.toml index 8183d4b64..93b97541f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -168,6 +168,7 @@ freeze = "pip freeze" test = 'sh scripts/test/unit.sh' test-cov = 'sh scripts/test/unit-cov.sh' test-integration = 'sh scripts/test/integration.sh' +test-kubernetes = "sh scripts/test/integration-kubernetes.sh" test-integration-dbt-1-5-4 = 'sh scripts/test/integration-dbt-1-5-4.sh' test-integration-expensive = 'sh scripts/test/integration-expensive.sh' test-integration-setup = 'sh scripts/test/integration-setup.sh' diff --git a/scripts/test/integration-dbt-1-5-4.sh b/scripts/test/integration-dbt-1-5-4.sh index 284f60517..bb936fc21 100644 --- a/scripts/test/integration-dbt-1-5-4.sh +++ b/scripts/test/integration-dbt-1-5-4.sh @@ -10,4 +10,5 @@ pytest -vv \ --durations=0 \ -m integration \ --ignore=tests/perf \ + --ignore=tests/test_example_k8s_dags.py \ -k 'basic_cosmos_task_group' diff --git a/scripts/test/integration-expensive.sh b/scripts/test/integration-expensive.sh index 96c2388cf..a8d94546c 100644 --- a/scripts/test/integration-expensive.sh +++ b/scripts/test/integration-expensive.sh @@ -6,4 +6,5 @@ pytest -vv \ --durations=0 \ -m integration \ --ignore=tests/perf \ + --ignore=tests/test_example_k8s_dags.py \ -k 'example_cosmos_python_models or example_virtualenv' diff --git a/scripts/test/integration-kubernetes.sh b/scripts/test/integration-kubernetes.sh new file mode 100644 index 000000000..1cc305c62 --- /dev/null +++ b/scripts/test/integration-kubernetes.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +set -x +set -e + +# Reset the Airflow database to its initial state +airflow db reset -y + +# Run tests using pytest +pytest -vv \ + --cov=cosmos \ + --cov-report=term-missing \ + --cov-report=xml \ + --durations=0 \ + -m integration \ + ../tests/test_example_k8s_dags.py diff --git a/scripts/test/integration-sqlite.sh b/scripts/test/integration-sqlite.sh index dc32324d4..dab70efb4 100644 --- a/scripts/test/integration-sqlite.sh +++ b/scripts/test/integration-sqlite.sh @@ -5,4 +5,5 @@ pytest -vv \ --durations=0 \ -m integration \ --ignore=tests/perf \ + --ignore=tests/test_example_k8s_dags.py \ -k 'example_cosmos_sources or sqlite' diff --git a/scripts/test/integration.sh b/scripts/test/integration.sh index a39ef63b1..db11bff6c 100644 --- a/scripts/test/integration.sh +++ b/scripts/test/integration.sh @@ -20,4 +20,5 @@ pytest -vv \ --durations=0 \ -m integration \ --ignore=tests/perf \ - -k 'not (sqlite or example_cosmos_sources or example_cosmos_python_models or example_virtualenv)' + --ignore=tests/test_example_k8s_dags.py \ + -k 'not (sqlite or example_cosmos_sources or example_cosmos_python_models or example_virtualenv or jaffle_shop_kubernetes)' diff --git a/scripts/test/kubernetes-setup.sh b/scripts/test/kubernetes-setup.sh new file mode 100644 index 000000000..d40aaa187 --- /dev/null +++ b/scripts/test/kubernetes-setup.sh @@ -0,0 +1,34 @@ +#!/bin/bash + +# Print each command before executing it +# Exit the script immediately if any command exits with a non-zero status (for debugging purposes) +set -x +set -e + +# Create a Kubernetes secret named 'postgres-secrets' with the specified literals for host and password +kubectl create secret generic postgres-secrets \ + --from-literal=host=postgres-postgresql.default.svc.cluster.local \ + --from-literal=password=postgres + +# Apply the PostgreSQL deployment configuration from the specified YAML file +kubectl apply -f scripts/test/postgres-deployment.yaml + +# Build the Docker image with tag 'dbt-jaffle-shop:1.0.0' using the specified Dockerfile +cd dev && docker build --progress=plain --no-cache -t dbt-jaffle-shop:1.0.0 -f Dockerfile.postgres_profile_docker_k8s . + +# Load the Docker image into the local KIND cluster +kind load docker-image dbt-jaffle-shop:1.0.0 + +# Retrieve the name of the PostgreSQL pod using the label selector 'app=postgres' +# The output is filtered to get the first pod's name +POD_NAME=$(kubectl get pods -n default -l app=postgres -o jsonpath='{.items[0].metadata.name}') + +# Print the name of the PostgreSQL pod +echo "$POD_NAME" + +# Forward port 5432 from the PostgreSQL pod to the local machine's port 5432 +# This allows local access to the PostgreSQL instance running in the pod +kubectl port-forward --namespace default "$POD_NAME" 5432:5432 & + +# List all pods in the default namespace to verify the status of pods +kubectl get pod diff --git a/scripts/test/performance.sh b/scripts/test/performance.sh index ea58c1960..2023026d3 100644 --- a/scripts/test/performance.sh +++ b/scripts/test/performance.sh @@ -2,4 +2,5 @@ pytest -vv \ -s \ -m 'perf' \ --ignore=tests/test_example_dags.py \ - --ignore=tests/test_example_dags_no_connections.py + --ignore=tests/test_example_dags_no_connections.py \ + --ignore=tests/test_example_k8s_dags.py diff --git a/scripts/test/postgres-deployment.yaml b/scripts/test/postgres-deployment.yaml new file mode 100644 index 000000000..8939d7f36 --- /dev/null +++ b/scripts/test/postgres-deployment.yaml @@ -0,0 +1,59 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: postgres + namespace: default +spec: + replicas: 1 + selector: + matchLabels: + app: postgres + template: + metadata: + labels: + app: postgres + spec: + containers: + - name: postgres + image: bitnami/postgresql:latest + env: + - name: POSTGRES_DB + value: postgres + - name: POSTGRES_HOST + valueFrom: + secretKeyRef: + name: postgres-secrets + key: host + - name: POSTGRES_PASSWORD + valueFrom: + secretKeyRef: + name: postgres-secrets + key: password + ports: + - containerPort: 5432 + +--- +apiVersion: v1 +kind: Secret +metadata: + name: postgres-secrets +type: Opaque +data: + host: cG9zdGdyZXMuZGVmYXVsdC5zdmMuY2x1c3Rlci5sb2NhbA== + password: cG9zdGdyZXM= + +--- +apiVersion: v1 +kind: Service +metadata: + name: postgres + namespace: default +spec: + type: NodePort + ports: + - port: 5432 + targetPort: 5432 + protocol: TCP + nodePort: 32583 + selector: + app: postgres diff --git a/scripts/test/unit-cov.sh b/scripts/test/unit-cov.sh index 89a6244ba..d134c7494 100644 --- a/scripts/test/unit-cov.sh +++ b/scripts/test/unit-cov.sh @@ -7,4 +7,5 @@ pytest \ -m "not (integration or perf)" \ --ignore=tests/perf \ --ignore=tests/test_example_dags.py \ - --ignore=tests/test_example_dags_no_connections.py + --ignore=tests/test_example_dags_no_connections.py \ + --ignore=tests/test_example_k8s_dags.py diff --git a/scripts/test/unit.sh b/scripts/test/unit.sh index ecc1a049a..a60f3c85b 100644 --- a/scripts/test/unit.sh +++ b/scripts/test/unit.sh @@ -4,4 +4,5 @@ pytest \ -m "not (integration or perf)" \ --ignore=tests/perf \ --ignore=tests/test_example_dags.py \ - --ignore=tests/test_example_dags_no_connections.py + --ignore=tests/test_example_dags_no_connections.py \ + --ignore=tests/test_example_k8s_dags.py diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 8b526a8ab..d1891169d 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -1518,9 +1518,9 @@ def test_save_dbt_ls_cache(mock_variable_set, mock_datetime, tmp_dbt_project_dir hash_dir, hash_args = version.split(",") assert hash_args == "d41d8cd98f00b204e9800998ecf8427e" if sys.platform == "darwin": - assert hash_dir == "a9879ec2ec503b0fe023d059caf50d41" + assert hash_dir == "b556a0a268e28868971d14c98548c349" else: - assert hash_dir == "9001bedf4aa8a329f7b669c89f337c24" + assert hash_dir == "6f63493009733a7be34364a6ea3ffd3c" @pytest.mark.integration diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index e10093bb5..9f8601156 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -24,13 +24,14 @@ AIRFLOW_IGNORE_FILE = EXAMPLE_DAGS_DIR / ".airflowignore" DBT_VERSION = Version(get_dbt_version().to_version_string()[1:]) AIRFLOW_VERSION = Version(airflow.__version__) +KUBERNETES_DAGS = ["jaffle_shop_kubernetes"] MIN_VER_DAG_FILE: dict[str, list[str]] = { "2.4": ["cosmos_seed_dag.py"], "2.8": ["cosmos_manifest_example.py"], } -IGNORED_DAG_FILES = ["performance_dag.py"] +IGNORED_DAG_FILES = ["performance_dag.py", "jaffle_shop_kubernetes.py"] # Sort descending based on Versions and convert string to an actual version @@ -53,6 +54,7 @@ def session(): @cache def get_dag_bag() -> DagBag: """Create a DagBag by adding the files that are not supported to .airflowignore""" + if AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS: return DagBag(dag_folder=None, include_examples=False) @@ -92,6 +94,8 @@ def get_dag_ids() -> list[str]: @pytest.mark.integration @pytest.mark.parametrize("dag_id", get_dag_ids()) def test_example_dag(session, dag_id: str): + if dag_id in KUBERNETES_DAGS: + return dag_bag = get_dag_bag() dag = dag_bag.get_dag(dag_id) test_utils.run_dag(dag) diff --git a/tests/test_example_dags_no_connections.py b/tests/test_example_dags_no_connections.py index d075a389d..0cc560ecc 100644 --- a/tests/test_example_dags_no_connections.py +++ b/tests/test_example_dags_no_connections.py @@ -22,7 +22,7 @@ "2.8": ["cosmos_manifest_example.py"], } -IGNORED_DAG_FILES = ["performance_dag.py"] +IGNORED_DAG_FILES = ["performance_dag.py", "jaffle_shop_kubernetes.py"] # Sort descending based on Versions and convert string to an actual version MIN_VER_DAG_FILE_VER: dict[Version, list[str]] = { diff --git a/tests/test_example_k8s_dags.py b/tests/test_example_k8s_dags.py new file mode 100644 index 000000000..2b24fd466 --- /dev/null +++ b/tests/test_example_k8s_dags.py @@ -0,0 +1,44 @@ +import os +from pathlib import Path + +import pytest +from airflow.models.dagbag import DagBag +from airflow.utils.db import create_default_connections +from airflow.utils.session import provide_session + +from . import utils as test_utils + +EXAMPLE_DAGS_DIR = Path(__file__).parent.parent / "dev/dags" +AIRFLOW_IGNORE_FILE = EXAMPLE_DAGS_DIR / ".airflowignore" + +KUBERNETES_DAG_FILES = ["jaffle_shop_kubernetes.py"] + + +@provide_session +def get_session(session=None): + create_default_connections(session) + return session + + +@pytest.fixture() +def session(): + return get_session() + + +def get_all_dag_files(): + python_files = [] + for file in os.listdir(EXAMPLE_DAGS_DIR): + if file.endswith(".py") and file not in KUBERNETES_DAG_FILES: + python_files.append(file) + + with open(AIRFLOW_IGNORE_FILE, "w+") as dag_ignorefile: + dag_ignorefile.writelines([f"{file}\n" for file in python_files]) + + +@pytest.mark.integration +def test_example_dag_kubernetes(session): + get_all_dag_files() + db = DagBag(EXAMPLE_DAGS_DIR, include_examples=False) + # for dag_id in KUBERNETES_DAG_FILES: + dag = db.get_dag("jaffle_shop_kubernetes") + test_utils.run_dag(dag)