Skip to content

Commit

Permalink
Update airflow resources and split out dbt tests and alerts (#433)
Browse files: browse the repository at this point in the history
Update airflow resources and split out dbt tests and alerts
  • Loading branch information
chowbao authored Jul 18, 2024
1 parent 1475901 commit ceec401
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 48 deletions.
29 changes: 11 additions & 18 deletions airflow_variables_dev.json
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@
"partnership_assets__account_holders_activity_fact": false,
"partnership_assets__asset_activity_fact": false
},
"dbt_image_name": "stellar/stellar-dbt:1a777f9",
"dbt_image_name": "stellar/stellar-dbt:5c7c924",
"dbt_internal_source_db": "test-hubble-319619",
"dbt_internal_source_schema": "test_crypto_stellar_internal",
"dbt_job_execution_timeout_seconds": 300,
Expand Down Expand Up @@ -154,7 +154,7 @@
},
"gcs_exported_data_bucket_name": "us-central1-test-hubble-2-5f1f2dbf-bucket",
"gcs_exported_object_prefix": "dag-exported",
"image_name": "stellar/stellar-etl:98bea9a",
"image_name": "stellar/stellar-etl:ab57fa4",
"image_output_path": "/etl/exported_data/",
"image_pull_policy": "IfNotPresent",
"k8s_namespace": "hubble-composer",
Expand Down Expand Up @@ -283,32 +283,25 @@
"public_dataset": "test_crypto_stellar",
"public_project": "test-hubble-319619",
"resources": {
"cc": {
"dbt": {
"requests": {
"cpu": "0.3",
"ephemeral-storage": "1Gi",
"memory": "900Mi"
"cpu": "1",
"ephemeral-storage": "500Mi",
"memory": "600Mi"
}
},
"default": {
"requests": {
"cpu": "0.3",
"ephemeral-storage": "1Gi",
"memory": "900Mi"
}
},
"state": {
"requests": {
"cpu": "0.3",
"ephemeral-storage": "1Gi",
"memory": "900Mi"
"ephemeral-storage": "500Mi",
"memory": "600Mi"
}
},
"wocc": {
"stellaretl": {
"requests": {
"cpu": "0.3",
"ephemeral-storage": "1Gi",
"memory": "900Mi"
"ephemeral-storage": "500Mi",
"memory": "600Mi"
}
}
},
Expand Down
31 changes: 12 additions & 19 deletions airflow_variables_prod.json
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@
"partnership_assets__asset_activity_fact": false,
"trade_agg": false
},
"dbt_image_name": "stellar/stellar-dbt:1a777f9",
"dbt_image_name": "stellar/stellar-dbt:5c7c924",
"dbt_internal_source_db": "hubble-261722",
"dbt_internal_source_schema": "crypto_stellar_internal_2",
"dbt_job_execution_timeout_seconds": 1800,
Expand Down Expand Up @@ -155,7 +155,7 @@
},
"gcs_exported_data_bucket_name": "us-central1-hubble-14c4ca64-bucket",
"gcs_exported_object_prefix": "dag-exported",
"image_name": "stellar/stellar-etl:98bea9a",
"image_name": "stellar/stellar-etl:ab57fa4",
"image_output_path": "/etl/exported_data/",
"image_pull_policy": "IfNotPresent",
"k8s_namespace": "hubble-composer",
Expand Down Expand Up @@ -281,32 +281,25 @@
"public_dataset": "crypto_stellar",
"public_project": "crypto-stellar",
"resources": {
"cc": {
"dbt": {
"requests": {
"cpu": "3.5",
"ephemeral-storage": "10Gi",
"memory": "15Gi"
"cpu": "1",
"ephemeral-storage": "1Gi",
"memory": "1Gi"
}
},
"default": {
"requests": {
"cpu": "3.5",
"cpu": "0.5",
"ephemeral-storage": "1Gi",
"memory": "5Gi"
"memory": "1Gi"
}
},
"state": {
"stellaretl": {
"requests": {
"cpu": "3.5",
"ephemeral-storage": "12Gi",
"memory": "20Gi"
}
},
"wocc": {
"requests": {
"cpu": "3.5",
"ephemeral-storage": "10Gi",
"memory": "15Gi"
"cpu": "0.5",
"ephemeral-storage": "1Gi",
"memory": "1Gi"
}
}
},
Expand Down
15 changes: 4 additions & 11 deletions dags/dbt_data_quality_alerts_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
default_args=get_default_dag_args(),
start_date=datetime(2024, 6, 25, 0, 0),
description="This DAG runs dbt tests and Elementary alerts at a half-hourly cadence",
schedule="*/15,*/45 * * * *", # Runs every 15th minute and every 45th minute
schedule="*/15 * * * *", # Runs every 15 minutes
user_defined_filters={
"container_resources": lambda s: k8s.V1ResourceRequirements(requests=s),
},
Expand All @@ -28,14 +28,7 @@
# sla_miss_callback=alert_sla_miss,
) as dag:

# DBT tests to run
singular_tests = dbt_task(
dag,
command_type="test",
tag="singular_test",
)
singular_tests_elementary_alerts = elementary_task(dag, "dbt_data_quality")
start_tests = EmptyOperator(task_id="start_tests_task")
# Trigger elementary
elementary_alerts = elementary_task(dag, "dbt_data_quality", resource_cfg="dbt")

# DAG task graph
start_tests >> singular_tests >> singular_tests_elementary_alerts
elementary_alerts
39 changes: 39 additions & 0 deletions dags/dbt_singular_tests_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from kubernetes.client import models as k8s
from stellar_etl_airflow.build_dbt_task import dbt_task
from stellar_etl_airflow.build_elementary_slack_alert_task import elementary_task
from stellar_etl_airflow.default import (
alert_sla_miss,
get_default_dag_args,
init_sentry,
)

# Call the project's Sentry initialiser before the DAG object is built
# (presumably wires up error reporting -- see stellar_etl_airflow.default).
init_sentry()

# DAG definition: a single dbt "test" task, scheduled every 30 minutes.
with DAG(
    "dbt_singular_tests",
    description="This DAG runs non-model dbt tests half-hourly cadence",
    tags=["dbt-data-quality"],
    default_args=get_default_dag_args(),
    # Cron expression: fire at minute 0 and minute 30 of every hour.
    schedule="*/30 * * * *",
    start_date=datetime(year=2024, month=6, day=25, hour=0, minute=0),
    # Do not backfill runs for intervals missed since start_date.
    catchup=False,
    # Never let two runs of this DAG overlap.
    max_active_runs=1,
    # Template filter that wraps a mapping of resource requests into a
    # Kubernetes V1ResourceRequirements object.
    user_defined_filters={
        "container_resources": lambda requests: k8s.V1ResourceRequirements(
            requests=requests
        ),
    },
    # sla_miss_callback=alert_sla_miss,
) as dag:

    # The only task: run the dbt tests tagged "singular_test".
    # resource_cfg="dbt" presumably selects the "dbt" entry of the
    # "resources" Airflow variable -- confirm against build_dbt_task.
    singular_tests = dbt_task(
        dag,
        tag="singular_test",
        command_type="test",
        resource_cfg="dbt",
    )

    # DAG task graph (one task, no dependencies to declare).
    singular_tests
7 changes: 7 additions & 0 deletions dags/history_tables_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
use_gcs=True,
use_captive_core=use_captive_core,
txmeta_datastore_path=txmeta_datastore_path,
resource_cfg="stellaretl",
)

trade_export_task = build_export_task(
Expand All @@ -124,6 +125,7 @@
use_gcs=True,
use_captive_core=use_captive_core,
txmeta_datastore_path=txmeta_datastore_path,
resource_cfg="stellaretl",
)

effects_export_task = build_export_task(
Expand All @@ -136,6 +138,7 @@
use_gcs=True,
use_captive_core=use_captive_core,
txmeta_datastore_path=txmeta_datastore_path,
resource_cfg="stellaretl",
)

tx_export_task = build_export_task(
Expand All @@ -148,6 +151,7 @@
use_gcs=True,
use_captive_core=use_captive_core,
txmeta_datastore_path=txmeta_datastore_path,
resource_cfg="stellaretl",
)

ledger_export_task = build_export_task(
Expand All @@ -160,6 +164,7 @@
use_gcs=True,
use_captive_core=use_captive_core,
txmeta_datastore_path=txmeta_datastore_path,
resource_cfg="stellaretl",
)

asset_export_task = build_export_task(
Expand All @@ -172,6 +177,7 @@
use_gcs=True,
use_captive_core=use_captive_core,
txmeta_datastore_path=txmeta_datastore_path,
resource_cfg="stellaretl",
)

contract_events_export_task = build_export_task(
Expand All @@ -184,6 +190,7 @@
use_gcs=True,
use_captive_core=use_captive_core,
txmeta_datastore_path=txmeta_datastore_path,
resource_cfg="stellaretl",
)

"""
Expand Down
1 change: 1 addition & 0 deletions dags/state_table_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
use_gcs=True,
use_captive_core=use_captive_core,
txmeta_datastore_path=txmeta_datastore_path,
resource_cfg="stellaretl",
)

"""
Expand Down

0 comments on commit ceec401

Please sign in to comment.