Feature/add slas (#359)
* Created alert_sla_miss callback function

* Defined test value for export task SLA

* Defined sla_miss_callback for DAG level parameter

* Increased build_time_task timeout value

* Added dev Airflow variable for task SLA values

* Added slack notification logic for the SLA miss callback

* Added SLA parameter to the relevant task building functions

* Removed SLA miss callback function and related logic

* Added refactored logic for SLA miss callback function

* Added SLA miss callback to desired DAGs

* Adjusted callback logic for Sentry integration

* Added task SLA values for prod Airflow variables

* Changed alert message to better distinguish SLA miss from task fail alerts

* Updated dev task SLA and timeout values

* Updated prod task SLA and timeout values

* Added default sla value to dag default args

* Added missing SLA param to build task functions

* Standardized logic for fetching SLA param value

* Added SLA miss callback reference

* Removed default SLA from default args

* Set missing SLA params on tasks

* Updated values for SLA variables

* Updated build_dbt_task timeout value

* Changed variable names to match new naming reference

---------

Co-authored-by: chowbao <[email protected]>
edualvess and chowbao authored Jun 12, 2024
1 parent 599ba05 commit b7156c3
Showing 24 changed files with 207 additions and 12 deletions.
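The changes below wire a new alert_sla_miss callback into each DAG's sla_miss_callback parameter and attach per-task sla values read from a new task_sla Airflow variable. The callback itself lives in stellar_etl_airflow/default.py, which this commit view does not load. As orientation only, a minimal sketch of an SLA-miss callback using the five-argument signature Airflow passes to sla_miss_callback could look like this; the alert delivery is a placeholder, not the repository's actual Slack/Sentry logic:

import logging


def alert_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Invoked by the scheduler when tasks in `dag` miss their SLA."""
    for sla in slas:  # each entry is an Airflow SlaMiss record
        # Phrased differently from task-failure alerts so the two are easy
        # to tell apart (see the "Changed alert message..." commit above).
        logging.warning(
            "SLA missed: dag=%s task=%s scheduled=%s",
            sla.dag_id,
            sla.task_id,
            sla.execution_date,
        )
        # Placeholder: forward the message to Slack and tag it in Sentry.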
36 changes: 33 additions & 3 deletions airflow_variables_dev.json
@@ -240,11 +240,41 @@
"build_batch_stats": 180,
"build_bq_insert_job": 180,
"build_copy_table": 180,
"build_dbt_task": 300,
"build_dbt_task": 960,
"build_delete_data_task": 180,
"build_export_task": 180,
"build_export_task": 420,
"build_gcs_to_bq_task": 300,
"build_time_task": 120
"build_time_task": 480
},
"task_sla": {
"default": 60,
"build_time_task": 480,
"build_export_task": 840,
"enriched_history_operations": 540,
"current_state": 540,
"elementary_dbt_enriched_base_tables": 840,
"ohlc": 720,
"liquidity_pool_trade_volume": 1140,
"mgi": 660,
"liquidity_providers": 720,
"liquidity_pools_value": 840,
"liquidity_pools_value_history": 600,
"trade_agg": 720,
"fee_stats": 840,
"asset_stats": 720,
"network_stats": 720,
"partnership_assets": 660,
"history_assets": 720,
"soroban": 720,
"snapshot_state": 600,
"elementary_dbt_stellar_marts": 1620,
"create_sandbox": 2400,
"update_sandbox": 60,
"cleanup_metadata": 60,
"build_delete_data_task": 1020,
"build_batch_stats": 840,
"build_bq_insert_job": 1080,
"build_gcs_to_bq_task": 960
},
"dbt_tables": {
"signers_current": "account_signers_current",
36 changes: 33 additions & 3 deletions airflow_variables_prod.json
@@ -235,11 +235,41 @@
"build_batch_stats": 180,
"build_bq_insert_job": 180,
"build_copy_table": 180,
"build_dbt_task": 6000,
"build_dbt_task": 1800,
"build_delete_data_task": 180,
"build_export_task": 180,
"build_export_task": 300,
"build_gcs_to_bq_task": 300,
"build_time_task": 120
"build_time_task": 360
},
"task_sla": {
"default": 60,
"build_time_task": 300,
"build_export_task": 600,
"enriched_history_operations": 960,
"current_state": 600,
"elementary_dbt_enriched_base_tables": 1080,
"ohlc": 960,
"liquidity_pool_trade_volume": 1200,
"mgi": 1020,
"liquidity_providers": 720,
"liquidity_pools_value": 360,
"liquidity_pools_value_history": 360,
"trade_agg": 1020,
"fee_stats": 360,
"asset_stats": 420,
"network_stats": 360,
"partnership_assets": 1380,
"history_assets": 360,
"soroban": 420,
"snapshot_state": 840,
"elementary_dbt_stellar_marts": 1560,
"create_sandbox": 1020,
"update_sandbox": 5460,
"cleanup_metadata": 60,
"build_delete_data_task": 780,
"build_batch_stats": 600,
"build_bq_insert_job": 840,
"build_gcs_to_bq_task": 660
},
"dbt_tables": {
"signers_current": "account_signers_current",
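Both variable files store SLA values in seconds, keyed by a task-builder function or pipeline name, with a catch-all "default" entry. The DAG changes below inline the same fetch expression at every call site; a hypothetical helper that centralizes the lookup and falls back to "default" for unlisted names might read:

from datetime import timedelta

from airflow.models import Variable


def get_task_sla(name: str) -> timedelta:
    """Sketch only: the repository inlines this lookup rather than wrapping it."""
    slas = Variable.get("task_sla", deserialize_json=True)
    # Values are stored as seconds; unknown names use the "default" entry.
    return timedelta(seconds=slas.get(name, slas["default"]))

A task would then pass sla=get_task_sla("cleanup_metadata") instead of repeating the timedelta(seconds=Variable.get(...)[...]) expression seen throughout this diff.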
14 changes: 14 additions & 0 deletions dags/cleanup_metadata_dag.py
@@ -27,6 +27,7 @@
 from sqlalchemy.orm import load_only
 from stellar_etl_airflow.default import (
     alert_after_max_retries,
+    alert_sla_miss,
     get_default_dag_args,
     init_sentry,
 )
@@ -207,6 +208,7 @@
     default_args=get_default_dag_args(),
     schedule_interval="@daily",
     start_date=START_DATE,
+    sla_miss_callback=alert_sla_miss,
 )
 if hasattr(dag, "doc_md"):
     dag.doc_md = __doc__
Expand Down Expand Up @@ -249,6 +251,9 @@ def print_configuration_function(**context):
     python_callable=print_configuration_function,
     provide_context=True,
     dag=dag,
+    sla=timedelta(
+        seconds=Variable.get("task_sla", deserialize_json=True)["cleanup_metadata"]
+    ),
 )
 
 
@@ -444,6 +449,9 @@ def analyze_db():
     provide_context=True,
     on_failure_callback=alert_after_max_retries,
     dag=dag,
+    sla=timedelta(
+        seconds=Variable.get("task_sla", deserialize_json=True)["cleanup_metadata"]
+    ),
 )
 
 cleanup_session_op = PythonOperator(
@@ -452,6 +460,9 @@ def analyze_db():
     provide_context=True,
     on_failure_callback=alert_after_max_retries,
     dag=dag,
+    sla=timedelta(
+        seconds=Variable.get("task_sla", deserialize_json=True)["cleanup_metadata"]
+    ),
 )
 
 cleanup_session_op.set_downstream(analyze_op)
@@ -464,6 +475,9 @@ def analyze_db():
     provide_context=True,
     on_failure_callback=alert_after_max_retries,
     dag=dag,
+    sla=timedelta(
+        seconds=Variable.get("task_sla", deserialize_json=True)["cleanup_metadata"]
+    ),
 )
 
 print_configuration.set_downstream(cleanup_op)
3 changes: 2 additions & 1 deletion dags/daily_euro_ohlc_dag.py
@@ -14,7 +14,7 @@
 )
 from stellar_etl_airflow.build_apply_gcs_changes_to_bq_task import read_local_schema
 from stellar_etl_airflow.build_coingecko_api_to_gcs_task import response_to_gcs
-from stellar_etl_airflow.default import alert_after_max_retries
+from stellar_etl_airflow.default import alert_after_max_retries, alert_sla_miss
 
 with DAG(
     dag_id="daily_euro_ohlc_dag",
@@ -26,6 +26,7 @@
     },
     user_defined_filters={"fromjson": lambda s: loads(s)},
     catchup=False,
+    sla_miss_callback=alert_sla_miss,
 ) as dag:
     currency_ohlc = Variable.get("currency_ohlc", deserialize_json=True)
     project_name = Variable.get("bq_project")
7 changes: 6 additions & 1 deletion dags/dataset_reset_dag.py
@@ -12,7 +12,11 @@
     path_to_execute,
 )
 from stellar_etl_airflow.build_delete_data_for_reset_task import build_delete_data_task
-from stellar_etl_airflow.default import get_default_dag_args, init_sentry
+from stellar_etl_airflow.default import (
+    alert_sla_miss,
+    get_default_dag_args,
+    init_sentry,
+)
 
 init_sentry()
 
@@ -27,6 +31,7 @@
"alias": "testnet-reset",
},
user_defined_filters={"fromjson": lambda s: loads(s)},
sla_miss_callback=alert_sla_miss,
)

internal_project = "test-hubble-319619"
7 changes: 6 additions & 1 deletion dags/dbt_enriched_base_tables_dag.py
@@ -5,7 +5,11 @@
 from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps
 from stellar_etl_airflow.build_dbt_task import dbt_task
 from stellar_etl_airflow.build_elementary_slack_alert_task import elementary_task
-from stellar_etl_airflow.default import get_default_dag_args, init_sentry
+from stellar_etl_airflow.default import (
+    alert_sla_miss,
+    get_default_dag_args,
+    init_sentry,
+)
 
 init_sentry()
 
@@ -21,6 +25,7 @@
     max_active_runs=1,
     catchup=False,
     tags=["dbt-enriched-base-tables"],
+    sla_miss_callback=alert_sla_miss,
 )
 
 # Wait on ingestion DAGs
7 changes: 6 additions & 1 deletion dags/dbt_stellar_marts_dag.py
@@ -5,7 +5,11 @@
 from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps
 from stellar_etl_airflow.build_dbt_task import dbt_task
 from stellar_etl_airflow.build_elementary_slack_alert_task import elementary_task
-from stellar_etl_airflow.default import get_default_dag_args, init_sentry
+from stellar_etl_airflow.default import (
+    alert_sla_miss,
+    get_default_dag_args,
+    init_sentry,
+)
 
 init_sentry()
 
@@ -21,6 +25,7 @@
     max_active_runs=3,
     catchup=True,
     tags=["dbt-stellar-marts"],
+    sla_miss_callback=alert_sla_miss,
 )
 
 # Wait on ingestion DAGs
7 changes: 6 additions & 1 deletion dags/history_tables_dag.py
@@ -17,7 +17,11 @@
 from stellar_etl_airflow.build_export_task import build_export_task
 from stellar_etl_airflow.build_gcs_to_bq_task import build_gcs_to_bq_task
 from stellar_etl_airflow.build_time_task import build_time_task
-from stellar_etl_airflow.default import get_default_dag_args, init_sentry
+from stellar_etl_airflow.default import (
+    alert_sla_miss,
+    get_default_dag_args,
+    init_sentry,
+)
 
 init_sentry()
 
@@ -42,6 +46,7 @@
"subtract_data_interval": macros.subtract_data_interval,
"batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string,
},
sla_miss_callback=alert_sla_miss,
)


2 changes: 2 additions & 0 deletions dags/partner_pipeline_dag.py
@@ -13,6 +13,7 @@
 from stellar_etl_airflow.build_apply_gcs_changes_to_bq_task import read_local_schema
 from stellar_etl_airflow.default import (
     alert_after_max_retries,
+    alert_sla_miss,
     get_default_dag_args,
     init_sentry,
 )
@@ -30,6 +31,7 @@
     },
     render_template_as_native_obj=True,
     catchup=False,
+    sla_miss_callback=alert_sla_miss,
 ) as dag:
     PROJECT = "{{ var.value.bq_project }}"
     DATASET = "{{ var.value.bq_dataset }}"
13 changes: 13 additions & 0 deletions dags/sandbox_create_dag.py
@@ -1,6 +1,7 @@
"""
This DAG creates the sandbox dataset with transactions tables, state tables with history and views.
"""
from datetime import timedelta
from json import loads

from airflow import DAG
@@ -13,6 +14,7 @@
 )
 from stellar_etl_airflow.default import (
     alert_after_max_retries,
+    alert_sla_miss,
     get_default_dag_args,
     init_sentry,
 )
@@ -29,6 +31,7 @@
"fromjson": lambda s: loads(s),
},
catchup=False,
sla_miss_callback=alert_sla_miss,
) as dag:
PROJECT = Variable.get("public_project")
DATASET = Variable.get("public_dataset")
@@ -61,6 +64,11 @@
             }
         },
         on_failure_callback=alert_after_max_retries,
+        sla=timedelta(
+            seconds=Variable.get("task_sla", deserialize_json=True)[
+                "create_sandbox"
+            ]
+        ),
     )
 
     start_tables_task >> tables_create_task
@@ -85,5 +93,10 @@
             }
         },
         on_failure_callback=alert_after_max_retries,
+        sla=timedelta(
+            seconds=Variable.get("task_sla", deserialize_json=True)[
+                "create_sandbox"
+            ]
+        ),
     )
     start_views_task >> dbt_tables_create_task
2 changes: 2 additions & 0 deletions dags/sandbox_update_dag.py
@@ -16,6 +16,7 @@
 from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps
 from stellar_etl_airflow.default import (
     alert_after_max_retries,
+    alert_sla_miss,
     get_default_dag_args,
     init_sentry,
 )
@@ -36,6 +37,7 @@
"subtract_data_interval": macros.subtract_data_interval,
"batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string,
},
sla_miss_callback=alert_sla_miss,
) as dag:
TABLES_ID = Variable.get("table_ids", deserialize_json=True)
PROJECT = Variable.get("public_project")
7 changes: 6 additions & 1 deletion dags/state_table_dag.py
@@ -15,7 +15,11 @@
 from stellar_etl_airflow.build_export_task import build_export_task
 from stellar_etl_airflow.build_gcs_to_bq_task import build_gcs_to_bq_task
 from stellar_etl_airflow.build_time_task import build_time_task
-from stellar_etl_airflow.default import get_default_dag_args, init_sentry
+from stellar_etl_airflow.default import (
+    alert_sla_miss,
+    get_default_dag_args,
+    init_sentry,
+)
 
 init_sentry()
 
@@ -40,6 +44,7 @@
"batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string,
},
catchup=True,
sla_miss_callback=alert_sla_miss,
)


dags/stellar_etl_airflow/build_apply_gcs_changes_to_bq_task.py
@@ -3,6 +3,7 @@
 a file in Google Cloud storage into a BigQuery table.
 """
 import logging
+from datetime import timedelta
 from json import loads
 from os.path import basename, join, splitext
 
@@ -281,4 +282,7 @@ def build_apply_gcs_changes_to_bq_task(dag, data_type):
         dag=dag,
         provide_context=True,
         on_failure_callback=alert_after_max_retries,
+        sla=timedelta(
+            seconds=Variable.get("task_sla", deserialize_json=True)["default"]
+        ),
     )
5 changes: 5 additions & 0 deletions dags/stellar_etl_airflow/build_batch_stats.py
@@ -35,4 +35,9 @@ def build_batch_stats(dag, table):
"useLegacySql": False,
}
},
sla=timedelta(
seconds=Variable.get("task_sla", deserialize_json=True)[
build_batch_stats.__name__
]
),
)
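Note the lookup key here: build_batch_stats.__name__ evaluates to the string "build_batch_stats", so each builder indexes the task_sla variable by its own function name rather than a hard-coded literal (the "standardized" fetch logic from the commit list). A quick illustration of the implication:

def build_batch_stats(dag, table):
    ...  # builder body elided


# The function name doubles as the JSON key:
assert build_batch_stats.__name__ == "build_batch_stats"
# Renaming the builder silently changes which task_sla entry it reads,
# so the keys in airflow_variables_dev.json and airflow_variables_prod.json
# must be renamed in lockstep.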
5 changes: 5 additions & 0 deletions dags/stellar_etl_airflow/build_bq_insert_job_task.py
@@ -85,4 +85,9 @@ def build_bq_insert_job(
         ),
         on_failure_callback=alert_after_max_retries,
         configuration=configuration,
+        sla=timedelta(
+            seconds=Variable.get("task_sla", deserialize_json=True)[
+                build_bq_insert_job.__name__
+            ]
+        ),
     )
3 changes: 3 additions & 0 deletions dags/stellar_etl_airflow/build_copy_table_task.py
@@ -58,5 +58,8 @@ def build_copy_table(
             ]
         ),
         on_failure_callback=alert_after_max_retries,
+        sla=timedelta(
+            seconds=Variable.get("task_sla", deserialize_json=True)["default"]
+        ),
         configuration=configuration,
     )