diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json index f51733e0..bd88d1e4 100644 --- a/airflow_variables_dev.json +++ b/airflow_variables_dev.json @@ -240,11 +240,41 @@ "build_batch_stats": 180, "build_bq_insert_job": 180, "build_copy_table": 180, - "build_dbt_task": 300, + "build_dbt_task": 960, "build_delete_data_task": 180, - "build_export_task": 180, + "build_export_task": 420, "build_gcs_to_bq_task": 300, - "build_time_task": 120 + "build_time_task": 480 + }, + "task_sla": { + "default": 60, + "build_time_task": 480, + "build_export_task": 840, + "enriched_history_operations": 540, + "current_state": 540, + "elementary_dbt_enriched_base_tables": 840, + "ohlc": 720, + "liquidity_pool_trade_volume": 1140, + "mgi": 660, + "liquidity_providers": 720, + "liquidity_pools_value": 840, + "liquidity_pools_value_history": 600, + "trade_agg": 720, + "fee_stats": 840, + "asset_stats": 720, + "network_stats": 720, + "partnership_assets": 660, + "history_assets": 720, + "soroban": 720, + "snapshot_state": 600, + "elementary_dbt_stellar_marts": 1620, + "create_sandbox": 2400, + "update_sandbox": 60, + "cleanup_metadata": 60, + "build_delete_data_task": 1020, + "build_batch_stats": 840, + "build_bq_insert_job": 1080, + "build_gcs_to_bq_task": 960 }, "dbt_tables": { "signers_current": "account_signers_current", diff --git a/airflow_variables_prod.json b/airflow_variables_prod.json index 9608b756..7bd39a04 100644 --- a/airflow_variables_prod.json +++ b/airflow_variables_prod.json @@ -235,11 +235,41 @@ "build_batch_stats": 180, "build_bq_insert_job": 180, "build_copy_table": 180, - "build_dbt_task": 6000, + "build_dbt_task": 1800, "build_delete_data_task": 180, - "build_export_task": 180, + "build_export_task": 300, "build_gcs_to_bq_task": 300, - "build_time_task": 120 + "build_time_task": 360 + }, + "task_sla": { + "default": 60, + "build_time_task": 300, + "build_export_task": 600, + "enriched_history_operations": 960, + "current_state": 600, + "elementary_dbt_enriched_base_tables": 1080, + "ohlc": 960, + "liquidity_pool_trade_volume": 1200, + "mgi": 1020, + "liquidity_providers": 720, + "liquidity_pools_value": 360, + "liquidity_pools_value_history": 360, + "trade_agg": 1020, + "fee_stats": 360, + "asset_stats": 420, + "network_stats": 360, + "partnership_assets": 1380, + "history_assets": 360, + "soroban": 420, + "snapshot_state": 840, + "elementary_dbt_stellar_marts": 1560, + "create_sandbox": 1020, + "update_sandbox": 5460, + "cleanup_metadata": 60, + "build_delete_data_task": 780, + "build_batch_stats": 600, + "build_bq_insert_job": 840, + "build_gcs_to_bq_task": 660 }, "dbt_tables": { "signers_current": "account_signers_current", diff --git a/dags/cleanup_metadata_dag.py b/dags/cleanup_metadata_dag.py index 74859975..5a058a94 100644 --- a/dags/cleanup_metadata_dag.py +++ b/dags/cleanup_metadata_dag.py @@ -27,6 +27,7 @@ from sqlalchemy.orm import load_only from stellar_etl_airflow.default import ( alert_after_max_retries, + alert_sla_miss, get_default_dag_args, init_sentry, ) @@ -207,6 +208,7 @@ default_args=get_default_dag_args(), schedule_interval="@daily", start_date=START_DATE, + sla_miss_callback=alert_sla_miss, ) if hasattr(dag, "doc_md"): dag.doc_md = __doc__ @@ -249,6 +251,9 @@ def print_configuration_function(**context): python_callable=print_configuration_function, provide_context=True, dag=dag, + sla=timedelta( + seconds=Variable.get("task_sla", deserialize_json=True)["cleanup_metadata"] + ), ) @@ -444,6 +449,9 @@ def analyze_db(): provide_context=True, 
on_failure_callback=alert_after_max_retries, dag=dag, + sla=timedelta( + seconds=Variable.get("task_sla", deserialize_json=True)["cleanup_metadata"] + ), ) cleanup_session_op = PythonOperator( @@ -452,6 +460,9 @@ def analyze_db(): provide_context=True, on_failure_callback=alert_after_max_retries, dag=dag, + sla=timedelta( + seconds=Variable.get("task_sla", deserialize_json=True)["cleanup_metadata"] + ), ) cleanup_session_op.set_downstream(analyze_op) @@ -464,6 +475,9 @@ def analyze_db(): provide_context=True, on_failure_callback=alert_after_max_retries, dag=dag, + sla=timedelta( + seconds=Variable.get("task_sla", deserialize_json=True)["cleanup_metadata"] + ), ) print_configuration.set_downstream(cleanup_op) diff --git a/dags/daily_euro_ohlc_dag.py b/dags/daily_euro_ohlc_dag.py index 20213934..38db7a30 100644 --- a/dags/daily_euro_ohlc_dag.py +++ b/dags/daily_euro_ohlc_dag.py @@ -14,7 +14,7 @@ ) from stellar_etl_airflow.build_apply_gcs_changes_to_bq_task import read_local_schema from stellar_etl_airflow.build_coingecko_api_to_gcs_task import response_to_gcs -from stellar_etl_airflow.default import alert_after_max_retries +from stellar_etl_airflow.default import alert_after_max_retries, alert_sla_miss with DAG( dag_id="daily_euro_ohlc_dag", @@ -26,6 +26,7 @@ }, user_defined_filters={"fromjson": lambda s: loads(s)}, catchup=False, + sla_miss_callback=alert_sla_miss, ) as dag: currency_ohlc = Variable.get("currency_ohlc", deserialize_json=True) project_name = Variable.get("bq_project") diff --git a/dags/dataset_reset_dag.py b/dags/dataset_reset_dag.py index 8e1e2fcc..5632b339 100644 --- a/dags/dataset_reset_dag.py +++ b/dags/dataset_reset_dag.py @@ -12,7 +12,11 @@ path_to_execute, ) from stellar_etl_airflow.build_delete_data_for_reset_task import build_delete_data_task -from stellar_etl_airflow.default import get_default_dag_args, init_sentry +from stellar_etl_airflow.default import ( + alert_sla_miss, + get_default_dag_args, + init_sentry, +) init_sentry() @@ -27,6 +31,7 @@ "alias": "testnet-reset", }, user_defined_filters={"fromjson": lambda s: loads(s)}, + sla_miss_callback=alert_sla_miss, ) internal_project = "test-hubble-319619" diff --git a/dags/dbt_enriched_base_tables_dag.py b/dags/dbt_enriched_base_tables_dag.py index 5d5f9f90..4d1ce477 100644 --- a/dags/dbt_enriched_base_tables_dag.py +++ b/dags/dbt_enriched_base_tables_dag.py @@ -5,7 +5,11 @@ from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps from stellar_etl_airflow.build_dbt_task import dbt_task from stellar_etl_airflow.build_elementary_slack_alert_task import elementary_task -from stellar_etl_airflow.default import get_default_dag_args, init_sentry +from stellar_etl_airflow.default import ( + alert_sla_miss, + get_default_dag_args, + init_sentry, +) init_sentry() @@ -21,6 +25,7 @@ max_active_runs=1, catchup=False, tags=["dbt-enriched-base-tables"], + sla_miss_callback=alert_sla_miss, ) # Wait on ingestion DAGs diff --git a/dags/dbt_stellar_marts_dag.py b/dags/dbt_stellar_marts_dag.py index 3738ebee..d0b73843 100644 --- a/dags/dbt_stellar_marts_dag.py +++ b/dags/dbt_stellar_marts_dag.py @@ -5,7 +5,11 @@ from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps from stellar_etl_airflow.build_dbt_task import dbt_task from stellar_etl_airflow.build_elementary_slack_alert_task import elementary_task -from stellar_etl_airflow.default import get_default_dag_args, init_sentry +from stellar_etl_airflow.default import ( + alert_sla_miss, + get_default_dag_args, + init_sentry, +) init_sentry() 
@@ -21,6 +25,7 @@ max_active_runs=3, catchup=True, tags=["dbt-stellar-marts"], + sla_miss_callback=alert_sla_miss, ) # Wait on ingestion DAGs diff --git a/dags/history_tables_dag.py b/dags/history_tables_dag.py index 9f4e603d..3dcda1f8 100644 --- a/dags/history_tables_dag.py +++ b/dags/history_tables_dag.py @@ -17,7 +17,11 @@ from stellar_etl_airflow.build_export_task import build_export_task from stellar_etl_airflow.build_gcs_to_bq_task import build_gcs_to_bq_task from stellar_etl_airflow.build_time_task import build_time_task -from stellar_etl_airflow.default import get_default_dag_args, init_sentry +from stellar_etl_airflow.default import ( + alert_sla_miss, + get_default_dag_args, + init_sentry, +) init_sentry() @@ -42,6 +46,7 @@ "subtract_data_interval": macros.subtract_data_interval, "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string, }, + sla_miss_callback=alert_sla_miss, ) diff --git a/dags/partner_pipeline_dag.py b/dags/partner_pipeline_dag.py index b29fdfab..b81d8178 100644 --- a/dags/partner_pipeline_dag.py +++ b/dags/partner_pipeline_dag.py @@ -13,6 +13,7 @@ from stellar_etl_airflow.build_apply_gcs_changes_to_bq_task import read_local_schema from stellar_etl_airflow.default import ( alert_after_max_retries, + alert_sla_miss, get_default_dag_args, init_sentry, ) @@ -30,6 +31,7 @@ }, render_template_as_native_obj=True, catchup=False, + sla_miss_callback=alert_sla_miss, ) as dag: PROJECT = "{{ var.value.bq_project }}" DATASET = "{{ var.value.bq_dataset }}" diff --git a/dags/sandbox_create_dag.py b/dags/sandbox_create_dag.py index e1b7b0b0..de047517 100644 --- a/dags/sandbox_create_dag.py +++ b/dags/sandbox_create_dag.py @@ -1,6 +1,7 @@ """ This DAG creates the sandbox dataset with transactions tables, state tables with history and views. 
""" +from datetime import timedelta from json import loads from airflow import DAG @@ -13,6 +14,7 @@ ) from stellar_etl_airflow.default import ( alert_after_max_retries, + alert_sla_miss, get_default_dag_args, init_sentry, ) @@ -29,6 +31,7 @@ "fromjson": lambda s: loads(s), }, catchup=False, + sla_miss_callback=alert_sla_miss, ) as dag: PROJECT = Variable.get("public_project") DATASET = Variable.get("public_dataset") @@ -61,6 +64,11 @@ } }, on_failure_callback=alert_after_max_retries, + sla=timedelta( + seconds=Variable.get("task_sla", deserialize_json=True)[ + "create_sandbox" + ] + ), ) start_tables_task >> tables_create_task @@ -85,5 +93,10 @@ } }, on_failure_callback=alert_after_max_retries, + sla=timedelta( + seconds=Variable.get("task_sla", deserialize_json=True)[ + "create_sandbox" + ] + ), ) start_views_task >> dbt_tables_create_task diff --git a/dags/sandbox_update_dag.py b/dags/sandbox_update_dag.py index 2e7e662e..afd04525 100644 --- a/dags/sandbox_update_dag.py +++ b/dags/sandbox_update_dag.py @@ -16,6 +16,7 @@ from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps from stellar_etl_airflow.default import ( alert_after_max_retries, + alert_sla_miss, get_default_dag_args, init_sentry, ) @@ -36,6 +37,7 @@ "subtract_data_interval": macros.subtract_data_interval, "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string, }, + sla_miss_callback=alert_sla_miss, ) as dag: TABLES_ID = Variable.get("table_ids", deserialize_json=True) PROJECT = Variable.get("public_project") diff --git a/dags/state_table_dag.py b/dags/state_table_dag.py index e8a396bd..b0aa7350 100644 --- a/dags/state_table_dag.py +++ b/dags/state_table_dag.py @@ -15,7 +15,11 @@ from stellar_etl_airflow.build_export_task import build_export_task from stellar_etl_airflow.build_gcs_to_bq_task import build_gcs_to_bq_task from stellar_etl_airflow.build_time_task import build_time_task -from stellar_etl_airflow.default import get_default_dag_args, init_sentry +from stellar_etl_airflow.default import ( + alert_sla_miss, + get_default_dag_args, + init_sentry, +) init_sentry() @@ -40,6 +44,7 @@ "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string, }, catchup=True, + sla_miss_callback=alert_sla_miss, ) diff --git a/dags/stellar_etl_airflow/build_apply_gcs_changes_to_bq_task.py b/dags/stellar_etl_airflow/build_apply_gcs_changes_to_bq_task.py index 59cdc751..5eddeb09 100644 --- a/dags/stellar_etl_airflow/build_apply_gcs_changes_to_bq_task.py +++ b/dags/stellar_etl_airflow/build_apply_gcs_changes_to_bq_task.py @@ -3,6 +3,7 @@ a file in Google Cloud storage into a BigQuery table. 
""" import logging +from datetime import timedelta from json import loads from os.path import basename, join, splitext @@ -281,4 +282,7 @@ def build_apply_gcs_changes_to_bq_task(dag, data_type): dag=dag, provide_context=True, on_failure_callback=alert_after_max_retries, + sla=timedelta( + seconds=Variable.get("task_sla", deserialize_json=True)["default"] + ), ) diff --git a/dags/stellar_etl_airflow/build_batch_stats.py b/dags/stellar_etl_airflow/build_batch_stats.py index 4ce949d8..92d97680 100644 --- a/dags/stellar_etl_airflow/build_batch_stats.py +++ b/dags/stellar_etl_airflow/build_batch_stats.py @@ -35,4 +35,9 @@ def build_batch_stats(dag, table): "useLegacySql": False, } }, + sla=timedelta( + seconds=Variable.get("task_sla", deserialize_json=True)[ + build_batch_stats.__name__ + ] + ), ) diff --git a/dags/stellar_etl_airflow/build_bq_insert_job_task.py b/dags/stellar_etl_airflow/build_bq_insert_job_task.py index e9e3b632..60548870 100644 --- a/dags/stellar_etl_airflow/build_bq_insert_job_task.py +++ b/dags/stellar_etl_airflow/build_bq_insert_job_task.py @@ -85,4 +85,9 @@ def build_bq_insert_job( ), on_failure_callback=alert_after_max_retries, configuration=configuration, + sla=timedelta( + seconds=Variable.get("task_sla", deserialize_json=True)[ + build_bq_insert_job.__name__ + ] + ), ) diff --git a/dags/stellar_etl_airflow/build_copy_table_task.py b/dags/stellar_etl_airflow/build_copy_table_task.py index 489d9eb8..25a98b32 100644 --- a/dags/stellar_etl_airflow/build_copy_table_task.py +++ b/dags/stellar_etl_airflow/build_copy_table_task.py @@ -58,5 +58,8 @@ def build_copy_table( ] ), on_failure_callback=alert_after_max_retries, + sla=timedelta( + seconds=Variable.get("task_sla", deserialize_json=True)["default"] + ), configuration=configuration, ) diff --git a/dags/stellar_etl_airflow/build_dbt_task.py b/dags/stellar_etl_airflow/build_dbt_task.py index 5dde1d7e..6c0d4a0d 100644 --- a/dags/stellar_etl_airflow/build_dbt_task.py +++ b/dags/stellar_etl_airflow/build_dbt_task.py @@ -134,6 +134,9 @@ def dbt_task( on_failure_callback=alert_after_max_retries, image_pull_policy="IfNotPresent", image_pull_secrets=[k8s.V1LocalObjectReference("private-docker-auth")], + sla=timedelta( + seconds=Variable.get("task_sla", deserialize_json=True)[task_name] + ), ) @@ -212,4 +215,7 @@ def build_dbt_task( on_failure_callback=alert_after_max_retries, image_pull_policy="IfNotPresent", image_pull_secrets=[k8s.V1LocalObjectReference("private-docker-auth")], + sla=timedelta( + seconds=Variable.get("task_sla", deserialize_json=True)[model_name] + ), ) diff --git a/dags/stellar_etl_airflow/build_delete_data_for_reset_task.py b/dags/stellar_etl_airflow/build_delete_data_for_reset_task.py index 80b26188..774110fe 100644 --- a/dags/stellar_etl_airflow/build_delete_data_for_reset_task.py +++ b/dags/stellar_etl_airflow/build_delete_data_for_reset_task.py @@ -18,6 +18,9 @@ def build_delete_data_task(dag, project, dataset, table, type_of_dataset): ] ), on_failure_callback=alert_after_max_retries, + sla=timedelta( + seconds=Variable.get("task_sla", deserialize_json=True)["default"] + ), configuration={ "query": { "query": DELETE_ROWS_QUERY, diff --git a/dags/stellar_etl_airflow/build_delete_data_task.py b/dags/stellar_etl_airflow/build_delete_data_task.py index b11d19f0..28c80c99 100644 --- a/dags/stellar_etl_airflow/build_delete_data_task.py +++ b/dags/stellar_etl_airflow/build_delete_data_task.py @@ -27,6 +27,11 @@ def build_delete_data_task(dag, project, dataset, table, dataset_type="bq"): ] ), 
on_failure_callback=alert_after_max_retries, + sla=timedelta( + seconds=Variable.get("task_sla", deserialize_json=True)[ + build_delete_data_task.__name__ + ] + ), configuration={ "query": { "query": DELETE_ROWS_QUERY, diff --git a/dags/stellar_etl_airflow/build_elementary_slack_alert_task.py b/dags/stellar_etl_airflow/build_elementary_slack_alert_task.py index c9e75f21..86b0727c 100644 --- a/dags/stellar_etl_airflow/build_elementary_slack_alert_task.py +++ b/dags/stellar_etl_airflow/build_elementary_slack_alert_task.py @@ -1,5 +1,6 @@ import base64 import logging +from datetime import timedelta from airflow.configuration import conf from airflow.models import Variable @@ -87,4 +88,9 @@ def elementary_task( on_failure_callback=alert_after_max_retries, image_pull_policy="IfNotPresent", image_pull_secrets=[k8s.V1LocalObjectReference("private-docker-auth")], + sla=timedelta( + seconds=Variable.get("task_sla", deserialize_json=True)[ + f"elementary_{task_name}" + ] + ), ) diff --git a/dags/stellar_etl_airflow/build_export_task.py b/dags/stellar_etl_airflow/build_export_task.py index 43f4b913..79f82ace 100644 --- a/dags/stellar_etl_airflow/build_export_task.py +++ b/dags/stellar_etl_airflow/build_export_task.py @@ -238,4 +238,9 @@ def build_export_task( config_file=config_file_location, on_failure_callback=alert_after_max_retries, image_pull_policy=Variable.get("image_pull_policy"), + sla=timedelta( + seconds=Variable.get("task_sla", deserialize_json=True)[ + build_export_task.__name__ + ] + ), ) diff --git a/dags/stellar_etl_airflow/build_gcs_to_bq_task.py b/dags/stellar_etl_airflow/build_gcs_to_bq_task.py index e3726e43..0ebe3d74 100644 --- a/dags/stellar_etl_airflow/build_gcs_to_bq_task.py +++ b/dags/stellar_etl_airflow/build_gcs_to_bq_task.py @@ -135,6 +135,11 @@ def build_gcs_to_bq_task( + '\')["failed_transforms"] }}', max_failed_transforms=0, on_failure_callback=alert_after_max_retries, + sla=timedelta( + seconds=Variable.get("task_sla", deserialize_json=True)[ + build_gcs_to_bq_task.__name__ + ] + ), dag=dag, ) @@ -165,5 +170,10 @@ def build_gcs_to_bq_task( time_partitioning=time_partition, cluster_fields=cluster_fields, on_failure_callback=alert_after_max_retries, + sla=timedelta( + seconds=Variable.get("task_sla", deserialize_json=True)[ + build_gcs_to_bq_task.__name__ + ] + ), dag=dag, ) diff --git a/dags/stellar_etl_airflow/build_time_task.py b/dags/stellar_etl_airflow/build_time_task.py index 6a456bfc..6f5dd471 100644 --- a/dags/stellar_etl_airflow/build_time_task.py +++ b/dags/stellar_etl_airflow/build_time_task.py @@ -86,4 +86,9 @@ def build_time_task( container_resources=resources_requests, on_failure_callback=alert_after_max_retries, image_pull_policy=Variable.get("image_pull_policy"), + sla=timedelta( + seconds=Variable.get("task_sla", deserialize_json=True)[ + build_time_task.__name__ + ] + ), ) diff --git a/dags/stellar_etl_airflow/default.py b/dags/stellar_etl_airflow/default.py index 8f6a4ba9..c2d62a6f 100644 --- a/dags/stellar_etl_airflow/default.py +++ b/dags/stellar_etl_airflow/default.py @@ -61,3 +61,24 @@ def alert_after_max_retries(context): f"The task {ti.task_id} belonging to DAG {ti.dag_id} failed after max retries.", "fatal", ) + + +def alert_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis): + """ + When a task with a defined SLA takes longer than expected to run, + it misses its SLA. + This alerts the IT team about the unexpected behavior to enable a + faster response in case of underlying infrastructure issues.
+ """ + dag_id = slas[0].dag_id + task_id = slas[0].task_id + execution_date = slas[0].execution_date.isoformat() + + with push_scope() as scope: + scope.set_extra("dag_id", dag_id) + scope.set_extra("task_id", task_id) + scope.set_extra("execution_date", execution_date) + capture_message( + f"SLA Miss! The task {task_id} belonging to DAG {dag_id} missed its SLA for the run date {execution_date}.", + "warn", + )