From 14d5e0b631d6766a5e342523f571c5b8dd62e3d6 Mon Sep 17 00:00:00 2001 From: Spencer horton Date: Sat, 9 Dec 2023 08:12:46 +1000 Subject: [PATCH] feat: add example dag + update dev env --- dev/Dockerfile | 2 +- dev/dags/example_athena_profile.py | 64 ++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 1 deletion(-) create mode 100644 dev/dags/example_athena_profile.py diff --git a/dev/Dockerfile b/dev/Dockerfile index 90c49ed6c..4b2a7e3b0 100644 --- a/dev/Dockerfile +++ b/dev/Dockerfile @@ -7,7 +7,7 @@ COPY ./README.rst ${AIRFLOW_HOME}/astronomer_cosmos/ COPY ./cosmos/ ${AIRFLOW_HOME}/astronomer_cosmos/cosmos/ # install the package in editable mode -RUN pip install -e "${AIRFLOW_HOME}/astronomer_cosmos"[dbt-postgres,dbt-databricks] +RUN pip install -e "${AIRFLOW_HOME}/astronomer_cosmos"[dbt-postgres,dbt-databricks,dbt-athena] # make sure astro user owns the package RUN chown -R astro:astro ${AIRFLOW_HOME}/astronomer_cosmos diff --git a/dev/dags/example_athena_profile.py b/dev/dags/example_athena_profile.py new file mode 100644 index 000000000..45b2fb23c --- /dev/null +++ b/dev/dags/example_athena_profile.py @@ -0,0 +1,64 @@ +""" +An example DAG that uses Cosmos to render a dbt project as a TaskGroup. + +It uses the automatic profile rendering from an Airflow connection. +""" +import os +from datetime import datetime +from pathlib import Path + +from airflow.decorators import dag +from airflow.operators.empty import EmptyOperator + +from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig +from cosmos.profiles import get_automatic_profile_mapping +from cosmos.profiles.athena.access_key import AthenaAccessKeyProfileMapping + +DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" +DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) + +DATABASE = "AwsDataCatalog" +REGION_NAME = "us-east-1" +S3_STAGING_DIR = "s3://staging-dir-example/" +SCHEMA = "example_schema" + + +@dag( + schedule_interval="@daily", + start_date=datetime(2023, 1, 1), + catchup=False, +) +def example_athena_profile() -> None: + """ + Turns a dbt project into a TaskGroup with a profile mapping. + """ + pre_dbt = EmptyOperator(task_id="pre_dbt") + + jaffle_shop = DbtTaskGroup( + project_config=ProjectConfig( + DBT_ROOT_PATH / "jaffle_shop", + ), + profile_config=ProfileConfig( + profile_name="athena", + target_name="dev", + profile_mapping=AthenaAccessKeyProfileMapping( + conn_id="athena_db", + profile_args={ + "database": DATABASE, + "region_name": REGION_NAME, + "s3_staging_dir": S3_STAGING_DIR, + "schema": SCHEMA, + "threads": 4, + }, + ), + ), + operator_args={"install_deps": True}, + default_args={"retries": 2}, + ) + + post_dbt = EmptyOperator(task_id="post_dbt") + + pre_dbt >> jaffle_shop >> post_dbt + + +example_athena_profile()