From ceec401e63c04a5f12b415d2bcf4ea9ce641c3a0 Mon Sep 17 00:00:00 2001 From: chowbao Date: Thu, 18 Jul 2024 09:53:27 -0400 Subject: [PATCH] Update airflow resources and split out dbt tests and alerts (#433) Update airflow resources and split out dbt tests and alerts --- airflow_variables_dev.json | 29 ++++++++------------- airflow_variables_prod.json | 31 +++++++++-------------- dags/dbt_data_quality_alerts_dag.py | 15 +++-------- dags/dbt_singular_tests_dag.py | 39 +++++++++++++++++++++++++++++ dags/history_tables_dag.py | 7 ++++++ dags/state_table_dag.py | 1 + 6 files changed, 74 insertions(+), 48 deletions(-) create mode 100644 dags/dbt_singular_tests_dag.py diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json index 496e1e3a..ac7dd616 100644 --- a/airflow_variables_dev.json +++ b/airflow_variables_dev.json @@ -122,7 +122,7 @@ "partnership_assets__account_holders_activity_fact": false, "partnership_assets__asset_activity_fact": false }, - "dbt_image_name": "stellar/stellar-dbt:1a777f9", + "dbt_image_name": "stellar/stellar-dbt:5c7c924", "dbt_internal_source_db": "test-hubble-319619", "dbt_internal_source_schema": "test_crypto_stellar_internal", "dbt_job_execution_timeout_seconds": 300, @@ -154,7 +154,7 @@ }, "gcs_exported_data_bucket_name": "us-central1-test-hubble-2-5f1f2dbf-bucket", "gcs_exported_object_prefix": "dag-exported", - "image_name": "stellar/stellar-etl:98bea9a", + "image_name": "stellar/stellar-etl:ab57fa4", "image_output_path": "/etl/exported_data/", "image_pull_policy": "IfNotPresent", "k8s_namespace": "hubble-composer", @@ -283,32 +283,25 @@ "public_dataset": "test_crypto_stellar", "public_project": "test-hubble-319619", "resources": { - "cc": { + "dbt": { "requests": { - "cpu": "0.3", - "ephemeral-storage": "1Gi", - "memory": "900Mi" + "cpu": "1", + "ephemeral-storage": "500Mi", + "memory": "600Mi" } }, "default": { "requests": { "cpu": "0.3", - "ephemeral-storage": "1Gi", - "memory": "900Mi" - } - }, - "state": { - "requests": { - "cpu": "0.3", - "ephemeral-storage": "1Gi", - "memory": "900Mi" + "ephemeral-storage": "500Mi", + "memory": "600Mi" } }, - "wocc": { + "stellaretl": { "requests": { "cpu": "0.3", - "ephemeral-storage": "1Gi", - "memory": "900Mi" + "ephemeral-storage": "500Mi", + "memory": "600Mi" } } }, diff --git a/airflow_variables_prod.json b/airflow_variables_prod.json index b05d2b70..6ef19808 100644 --- a/airflow_variables_prod.json +++ b/airflow_variables_prod.json @@ -123,7 +123,7 @@ "partnership_assets__asset_activity_fact": false, "trade_agg": false }, - "dbt_image_name": "stellar/stellar-dbt:1a777f9", + "dbt_image_name": "stellar/stellar-dbt:5c7c924", "dbt_internal_source_db": "hubble-261722", "dbt_internal_source_schema": "crypto_stellar_internal_2", "dbt_job_execution_timeout_seconds": 1800, @@ -155,7 +155,7 @@ }, "gcs_exported_data_bucket_name": "us-central1-hubble-14c4ca64-bucket", "gcs_exported_object_prefix": "dag-exported", - "image_name": "stellar/stellar-etl:98bea9a", + "image_name": "stellar/stellar-etl:ab57fa4", "image_output_path": "/etl/exported_data/", "image_pull_policy": "IfNotPresent", "k8s_namespace": "hubble-composer", @@ -281,32 +281,25 @@ "public_dataset": "crypto_stellar", "public_project": "crypto-stellar", "resources": { - "cc": { + "dbt": { "requests": { - "cpu": "3.5", - "ephemeral-storage": "10Gi", - "memory": "15Gi" + "cpu": "1", + "ephemeral-storage": "1Gi", + "memory": "1Gi" } }, "default": { "requests": { - "cpu": "3.5", + "cpu": "0.5", "ephemeral-storage": "1Gi", - "memory": "5Gi" + "memory": "1Gi" } }, - "state": { + "stellaretl": { "requests": { - "cpu": "3.5", - "ephemeral-storage": "12Gi", - "memory": "20Gi" - } - }, - "wocc": { - "requests": { - "cpu": "3.5", - "ephemeral-storage": "10Gi", - "memory": "15Gi" + "cpu": "0.5", + "ephemeral-storage": "1Gi", + "memory": "1Gi" } } }, diff --git a/dags/dbt_data_quality_alerts_dag.py b/dags/dbt_data_quality_alerts_dag.py index 1d3a195c..3a599218 100644 --- a/dags/dbt_data_quality_alerts_dag.py +++ b/dags/dbt_data_quality_alerts_dag.py @@ -18,7 +18,7 @@ default_args=get_default_dag_args(), start_date=datetime(2024, 6, 25, 0, 0), description="This DAG runs dbt tests and Elementary alerts at a half-hourly cadence", - schedule="*/15,*/45 * * * *", # Runs every 15th minute and every 45th minute + schedule="*/15 * * * *", # Runs every 15 minutes user_defined_filters={ "container_resources": lambda s: k8s.V1ResourceRequirements(requests=s), }, @@ -28,14 +28,7 @@ # sla_miss_callback=alert_sla_miss, ) as dag: - # DBT tests to run - singular_tests = dbt_task( - dag, - command_type="test", - tag="singular_test", - ) - singular_tests_elementary_alerts = elementary_task(dag, "dbt_data_quality") - start_tests = EmptyOperator(task_id="start_tests_task") + # Trigger elementary + elementary_alerts = elementary_task(dag, "dbt_data_quality", resource_cfg="dbt") - # DAG task graph - start_tests >> singular_tests >> singular_tests_elementary_alerts + elementary_alerts diff --git a/dags/dbt_singular_tests_dag.py b/dags/dbt_singular_tests_dag.py new file mode 100644 index 00000000..bb828870 --- /dev/null +++ b/dags/dbt_singular_tests_dag.py @@ -0,0 +1,39 @@ +from datetime import datetime + +from airflow import DAG +from airflow.operators.empty import EmptyOperator +from kubernetes.client import models as k8s +from stellar_etl_airflow.build_dbt_task import dbt_task +from stellar_etl_airflow.build_elementary_slack_alert_task import elementary_task +from stellar_etl_airflow.default import ( + alert_sla_miss, + get_default_dag_args, + init_sentry, +) + +init_sentry() + +with DAG( + "dbt_singular_tests", + default_args=get_default_dag_args(), + start_date=datetime(2024, 6, 25, 0, 0), + description="This DAG runs non-model dbt tests half-hourly cadence", + schedule="*/30 * * * *", # Runs every 30 minutes + user_defined_filters={ + "container_resources": lambda s: k8s.V1ResourceRequirements(requests=s), + }, + max_active_runs=1, + catchup=False, + tags=["dbt-data-quality"], + # sla_miss_callback=alert_sla_miss, +) as dag: + + # DBT tests to run + singular_tests = dbt_task( + dag, + command_type="test", + tag="singular_test", + resource_cfg="dbt", + ) + + singular_tests diff --git a/dags/history_tables_dag.py b/dags/history_tables_dag.py index 68b7cbbd..2d073f04 100644 --- a/dags/history_tables_dag.py +++ b/dags/history_tables_dag.py @@ -112,6 +112,7 @@ use_gcs=True, use_captive_core=use_captive_core, txmeta_datastore_path=txmeta_datastore_path, + resource_cfg="stellaretl", ) trade_export_task = build_export_task( @@ -124,6 +125,7 @@ use_gcs=True, use_captive_core=use_captive_core, txmeta_datastore_path=txmeta_datastore_path, + resource_cfg="stellaretl", ) effects_export_task = build_export_task( @@ -136,6 +138,7 @@ use_gcs=True, use_captive_core=use_captive_core, txmeta_datastore_path=txmeta_datastore_path, + resource_cfg="stellaretl", ) tx_export_task = build_export_task( @@ -148,6 +151,7 @@ use_gcs=True, use_captive_core=use_captive_core, txmeta_datastore_path=txmeta_datastore_path, + resource_cfg="stellaretl", ) ledger_export_task = build_export_task( @@ -160,6 +164,7 @@ use_gcs=True, use_captive_core=use_captive_core, txmeta_datastore_path=txmeta_datastore_path, + resource_cfg="stellaretl", ) asset_export_task = build_export_task( @@ -172,6 +177,7 @@ use_gcs=True, use_captive_core=use_captive_core, txmeta_datastore_path=txmeta_datastore_path, + resource_cfg="stellaretl", ) contract_events_export_task = build_export_task( @@ -184,6 +190,7 @@ use_gcs=True, use_captive_core=use_captive_core, txmeta_datastore_path=txmeta_datastore_path, + resource_cfg="stellaretl", ) """ diff --git a/dags/state_table_dag.py b/dags/state_table_dag.py index 236027f1..437db4f5 100644 --- a/dags/state_table_dag.py +++ b/dags/state_table_dag.py @@ -102,6 +102,7 @@ use_gcs=True, use_captive_core=use_captive_core, txmeta_datastore_path=txmeta_datastore_path, + resource_cfg="stellaretl", ) """