Feature/add slas #359

Merged: 30 commits from feature/add_slas into master, Jun 12, 2024
Commits
8257b56
Created alert_sla_miss callback function
edualvess May 8, 2024
99f6204
Defined test value for export task SLA
edualvess May 8, 2024
e1acb56
Defined sla_miss_callback for DAG level parameter
edualvess May 8, 2024
9a0852f
Merge branch 'master' into feature/add_slas
edualvess May 8, 2024
6f85078
Increased time task timeout value
edualvess May 9, 2024
6d01a09
Added dev Airflow variable for tasks SLA values
edualvess May 9, 2024
9911c3b
Added slack notification logic for the SLA miss callback
edualvess May 9, 2024
8982c76
Added SLA parameter to the relevant task building functions
edualvess May 9, 2024
e0e0ea7
Removed SLA miss callback function and related logic
edualvess May 9, 2024
88c9da2
Added refactored logic for sla miss callback function
edualvess May 9, 2024
accb888
Added sla miss callback to desired dags
edualvess May 9, 2024
bc45c40
Adjusted callback logic for sentry integration
edualvess May 9, 2024
ad5a1b4
Added task SLA values for prod Airflow variables
edualvess May 15, 2024
f15c2d6
Changed alert message to better distinguish SLA miss from task fail a…
edualvess May 15, 2024
c5fa61e
Merge branch 'master' into feature/add_slas
chowbao May 15, 2024
e8c7ca2
Merge branch 'master' into feature/add_slas
chowbao May 15, 2024
7660175
Merge branch 'master' into feature/add_slas
edualvess May 20, 2024
9d5b32a
Update dev tasks sla and timeout values
edualvess May 22, 2024
fb1d4e8
Update prod tasks sla and timeout values
edualvess May 22, 2024
efd34e2
Added default sla value to dag default args
edualvess May 22, 2024
ffc003b
Added missing SLA param to build task functions
edualvess May 22, 2024
1dfdd23
Standardizing logic for fetching SLA param value
edualvess May 22, 2024
7beb0aa
Added sla miss callback reference
edualvess May 22, 2024
7f0f421
Remove default SLA from default args
edualvess May 24, 2024
6ef6453
Set missing sla params to tasks
edualvess Jun 4, 2024
48d2e69
Updated values for sla variables
edualvess Jun 5, 2024
b91062d
Merge branch 'master' into feature/add_slas
edualvess Jun 5, 2024
f28457d
Updated build_dbt_task timeout value
edualvess Jun 5, 2024
ee2b3ff
Merge branch 'master' into feature/add_slas
edualvess Jun 12, 2024
e85dba9
Changed variable names to match new naming reference
edualvess Jun 12, 2024
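
Airflow invokes a DAG-level sla_miss_callback with five arguments. A minimal sketch of the alert_sla_miss callback these commits add to dags/stellar_etl_airflow/default.py; the Slack and Sentry forwarding mentioned in the commit messages is left as a comment, since that code is not shown in this diff:

import logging


def alert_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Report tasks that exceeded their SLA (distinct from a task failure)."""
    for sla in slas:
        logging.warning(
            "SLA miss (not a task failure): dag=%s task=%s scheduled=%s",
            sla.dag_id,
            sla.task_id,
            sla.execution_date,
        )
        # The commits mention forwarding these alerts to Slack and adjusting
        # the logic for Sentry integration; those details are assumptions
        # here, not code from this PR.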
36 changes: 33 additions & 3 deletions airflow_variables_dev.json
@@ -240,11 +240,41 @@
     "build_batch_stats": 180,
     "build_bq_insert_job": 180,
     "build_copy_table": 180,
-    "build_dbt_task": 300,
+    "build_dbt_task": 960,
     "build_delete_data_task": 180,
-    "build_export_task": 180,
+    "build_export_task": 420,
     "build_gcs_to_bq_task": 300,
-    "build_time_task": 120
+    "build_time_task": 480
   },
+  "task_sla": {
+    "default": 60,
+    "build_time_task": 480,
+    "build_export_task": 840,
+    "enriched_history_operations": 540,
+    "current_state": 540,
+    "elementary_dbt_enriched_base_tables": 840,
+    "ohlc": 720,
+    "liquidity_pool_trade_volume": 1140,
+    "mgi": 660,
+    "liquidity_providers": 720,
+    "liquidity_pools_value": 840,
+    "liquidity_pools_value_history": 600,
+    "trade_agg": 720,
+    "fee_stats": 840,
+    "asset_stats": 720,
+    "network_stats": 720,
+    "partnership_assets": 660,
+    "history_assets": 720,
+    "soroban": 720,
+    "snapshot_state": 600,
+    "elementary_dbt_stellar_marts": 1620,
+    "create_sandbox": 2400,
+    "update_sandbox": 60,
+    "cleanup_metadata": 60,
+    "build_delete_data_task": 1020,
+    "build_batch_stats": 840,
+    "build_bq_insert_job": 1080,
+    "build_gcs_to_bq_task": 960
+  },
   "dbt_tables": {
     "signers_current": "account_signers_current",
36 changes: 33 additions & 3 deletions airflow_variables_prod.json
@@ -235,11 +235,41 @@
     "build_batch_stats": 180,
     "build_bq_insert_job": 180,
     "build_copy_table": 180,
-    "build_dbt_task": 6000,
+    "build_dbt_task": 1800,
     "build_delete_data_task": 180,
-    "build_export_task": 180,
+    "build_export_task": 300,
     "build_gcs_to_bq_task": 300,
-    "build_time_task": 120
+    "build_time_task": 360
   },
+  "task_sla": {
+    "default": 60,
+    "build_time_task": 300,
+    "build_export_task": 600,
+    "enriched_history_operations": 960,
+    "current_state": 600,
+    "elementary_dbt_enriched_base_tables": 1080,
+    "ohlc": 960,
+    "liquidity_pool_trade_volume": 1200,
+    "mgi": 1020,
+    "liquidity_providers": 720,
+    "liquidity_pools_value": 360,
+    "liquidity_pools_value_history": 360,
+    "trade_agg": 1020,
+    "fee_stats": 360,
+    "asset_stats": 420,
+    "network_stats": 360,
+    "partnership_assets": 1380,
+    "history_assets": 360,
+    "soroban": 420,
+    "snapshot_state": 840,
+    "elementary_dbt_stellar_marts": 1560,
+    "create_sandbox": 1020,
+    "update_sandbox": 5460,
+    "cleanup_metadata": 60,
+    "build_delete_data_task": 780,
+    "build_batch_stats": 600,
+    "build_bq_insert_job": 840,
+    "build_gcs_to_bq_task": 660
+  },
   "dbt_tables": {
     "signers_current": "account_signers_current",
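
Every task-level change below reads its SLA from this new task_sla variable with the same lookup, so thresholds can be tuned per environment without touching DAG code. The recurring pattern, restated outside the diff context:

from datetime import timedelta

from airflow.models import Variable

# The key ("build_export_task", "cleanup_metadata", ...) varies per task;
# "default" is used where no dedicated entry exists.
sla = timedelta(
    seconds=Variable.get("task_sla", deserialize_json=True)["default"]
)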
14 changes: 14 additions & 0 deletions dags/cleanup_metadata_dag.py
@@ -27,6 +27,7 @@
 from sqlalchemy.orm import load_only
 from stellar_etl_airflow.default import (
     alert_after_max_retries,
+    alert_sla_miss,
     get_default_dag_args,
     init_sentry,
 )
@@ -207,6 +208,7 @@
     default_args=get_default_dag_args(),
     schedule_interval="@daily",
     start_date=START_DATE,
+    sla_miss_callback=alert_sla_miss,
 )
 if hasattr(dag, "doc_md"):
     dag.doc_md = __doc__
@@ -249,6 +251,9 @@ def print_configuration_function(**context):
     python_callable=print_configuration_function,
     provide_context=True,
     dag=dag,
+    sla=timedelta(
+        seconds=Variable.get("task_sla", deserialize_json=True)["cleanup_metadata"]
+    ),
 )


@@ -444,6 +449,9 @@ def analyze_db():
     provide_context=True,
     on_failure_callback=alert_after_max_retries,
     dag=dag,
+    sla=timedelta(
+        seconds=Variable.get("task_sla", deserialize_json=True)["cleanup_metadata"]
+    ),
 )

 cleanup_session_op = PythonOperator(
@@ -452,6 +460,9 @@ def analyze_db():
     provide_context=True,
     on_failure_callback=alert_after_max_retries,
     dag=dag,
+    sla=timedelta(
+        seconds=Variable.get("task_sla", deserialize_json=True)["cleanup_metadata"]
+    ),
 )

 cleanup_session_op.set_downstream(analyze_op)
@@ -464,6 +475,9 @@ def analyze_db():
     provide_context=True,
     on_failure_callback=alert_after_max_retries,
     dag=dag,
+    sla=timedelta(
+        seconds=Variable.get("task_sla", deserialize_json=True)["cleanup_metadata"]
+    ),
 )

 print_configuration.set_downstream(cleanup_op)
3 changes: 2 additions & 1 deletion dags/daily_euro_ohlc_dag.py
@@ -14,7 +14,7 @@
 )
 from stellar_etl_airflow.build_apply_gcs_changes_to_bq_task import read_local_schema
 from stellar_etl_airflow.build_coingecko_api_to_gcs_task import response_to_gcs
-from stellar_etl_airflow.default import alert_after_max_retries
+from stellar_etl_airflow.default import alert_after_max_retries, alert_sla_miss

 with DAG(
     dag_id="daily_euro_ohlc_dag",
@@ -26,6 +26,7 @@
     },
     user_defined_filters={"fromjson": lambda s: loads(s)},
     catchup=False,
+    sla_miss_callback=alert_sla_miss,
 ) as dag:
     currency_ohlc = Variable.get("currency_ohlc", deserialize_json=True)
     project_name = Variable.get("bq_project")
7 changes: 6 additions & 1 deletion dags/dataset_reset_dag.py
@@ -12,7 +12,11 @@
     path_to_execute,
 )
 from stellar_etl_airflow.build_delete_data_for_reset_task import build_delete_data_task
-from stellar_etl_airflow.default import get_default_dag_args, init_sentry
+from stellar_etl_airflow.default import (
+    alert_sla_miss,
+    get_default_dag_args,
+    init_sentry,
+)

 init_sentry()

@@ -27,6 +31,7 @@
         "alias": "testnet-reset",
     },
     user_defined_filters={"fromjson": lambda s: loads(s)},
+    sla_miss_callback=alert_sla_miss,
 )

 internal_project = "test-hubble-319619"
7 changes: 6 additions & 1 deletion dags/dbt_enriched_base_tables_dag.py
@@ -5,7 +5,11 @@
 from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps
 from stellar_etl_airflow.build_dbt_task import dbt_task
 from stellar_etl_airflow.build_elementary_slack_alert_task import elementary_task
-from stellar_etl_airflow.default import get_default_dag_args, init_sentry
+from stellar_etl_airflow.default import (
+    alert_sla_miss,
+    get_default_dag_args,
+    init_sentry,
+)

 init_sentry()

@@ -21,6 +25,7 @@
     max_active_runs=1,
     catchup=False,
     tags=["dbt-enriched-base-tables"],
+    sla_miss_callback=alert_sla_miss,
 )

 # Wait on ingestion DAGs
7 changes: 6 additions & 1 deletion dags/dbt_stellar_marts_dag.py
@@ -5,7 +5,11 @@
 from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps
 from stellar_etl_airflow.build_dbt_task import dbt_task
 from stellar_etl_airflow.build_elementary_slack_alert_task import elementary_task
-from stellar_etl_airflow.default import get_default_dag_args, init_sentry
+from stellar_etl_airflow.default import (
+    alert_sla_miss,
+    get_default_dag_args,
+    init_sentry,
+)

 init_sentry()

@@ -21,6 +25,7 @@
     max_active_runs=3,
     catchup=True,
     tags=["dbt-stellar-marts"],
+    sla_miss_callback=alert_sla_miss,
 )

 # Wait on ingestion DAGs
7 changes: 6 additions & 1 deletion dags/history_tables_dag.py
@@ -17,7 +17,11 @@
 from stellar_etl_airflow.build_export_task import build_export_task
 from stellar_etl_airflow.build_gcs_to_bq_task import build_gcs_to_bq_task
 from stellar_etl_airflow.build_time_task import build_time_task
-from stellar_etl_airflow.default import get_default_dag_args, init_sentry
+from stellar_etl_airflow.default import (
+    alert_sla_miss,
+    get_default_dag_args,
+    init_sentry,
+)

 init_sentry()

@@ -42,6 +46,7 @@
         "subtract_data_interval": macros.subtract_data_interval,
         "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string,
     },
+    sla_miss_callback=alert_sla_miss,
 )
2 changes: 2 additions & 0 deletions dags/partner_pipeline_dag.py
@@ -13,6 +13,7 @@
 from stellar_etl_airflow.build_apply_gcs_changes_to_bq_task import read_local_schema
 from stellar_etl_airflow.default import (
     alert_after_max_retries,
+    alert_sla_miss,
     get_default_dag_args,
     init_sentry,
 )
@@ -30,6 +31,7 @@
     },
     render_template_as_native_obj=True,
     catchup=False,
+    sla_miss_callback=alert_sla_miss,
 ) as dag:
     PROJECT = "{{ var.value.bq_project }}"
     DATASET = "{{ var.value.bq_dataset }}"
13 changes: 13 additions & 0 deletions dags/sandbox_create_dag.py
@@ -1,6 +1,7 @@
 """
 This DAG creates the sandbox dataset with transactions tables, state tables with history and views.
 """
+from datetime import timedelta
 from json import loads

 from airflow import DAG
@@ -13,6 +14,7 @@
 )
 from stellar_etl_airflow.default import (
     alert_after_max_retries,
+    alert_sla_miss,
     get_default_dag_args,
     init_sentry,
 )
@@ -29,6 +31,7 @@
         "fromjson": lambda s: loads(s),
     },
     catchup=False,
+    sla_miss_callback=alert_sla_miss,
 ) as dag:
     PROJECT = Variable.get("public_project")
     DATASET = Variable.get("public_dataset")
@@ -61,6 +64,11 @@
             }
         },
         on_failure_callback=alert_after_max_retries,
+        sla=timedelta(
+            seconds=Variable.get("task_sla", deserialize_json=True)[
+                "create_sandbox"
+            ]
+        ),
     )

     start_tables_task >> tables_create_task
@@ -85,5 +93,10 @@
             }
         },
         on_failure_callback=alert_after_max_retries,
+        sla=timedelta(
+            seconds=Variable.get("task_sla", deserialize_json=True)[
+                "create_sandbox"
+            ]
+        ),
     )
     start_views_task >> dbt_tables_create_task
2 changes: 2 additions & 0 deletions dags/sandbox_update_dag.py
@@ -16,6 +16,7 @@
 from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps
 from stellar_etl_airflow.default import (
     alert_after_max_retries,
+    alert_sla_miss,
     get_default_dag_args,
     init_sentry,
 )
@@ -36,6 +37,7 @@
         "subtract_data_interval": macros.subtract_data_interval,
         "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string,
     },
+    sla_miss_callback=alert_sla_miss,
 ) as dag:
     TABLES_ID = Variable.get("table_ids", deserialize_json=True)
     PROJECT = Variable.get("public_project")
7 changes: 6 additions & 1 deletion dags/state_table_dag.py
@@ -15,7 +15,11 @@
 from stellar_etl_airflow.build_export_task import build_export_task
 from stellar_etl_airflow.build_gcs_to_bq_task import build_gcs_to_bq_task
 from stellar_etl_airflow.build_time_task import build_time_task
-from stellar_etl_airflow.default import get_default_dag_args, init_sentry
+from stellar_etl_airflow.default import (
+    alert_sla_miss,
+    get_default_dag_args,
+    init_sentry,
+)

 init_sentry()

@@ -40,6 +44,7 @@
         "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string,
     },
     catchup=True,
+    sla_miss_callback=alert_sla_miss,
 )
dags/stellar_etl_airflow/build_apply_gcs_changes_to_bq_task.py
@@ -3,6 +3,7 @@
     a file in Google Cloud storage into a BigQuery table.
 """
 import logging
+from datetime import timedelta
 from json import loads
 from os.path import basename, join, splitext

@@ -281,4 +282,7 @@ def build_apply_gcs_changes_to_bq_task(dag, data_type):
         dag=dag,
         provide_context=True,
         on_failure_callback=alert_after_max_retries,
+        sla=timedelta(
+            seconds=Variable.get("task_sla", deserialize_json=True)["default"]
+        ),
     )
5 changes: 5 additions & 0 deletions dags/stellar_etl_airflow/build_batch_stats.py
@@ -35,4 +35,9 @@ def build_batch_stats(dag, table):
                 "useLegacySql": False,
             }
         },
+        sla=timedelta(
+            seconds=Variable.get("task_sla", deserialize_json=True)[
+                build_batch_stats.__name__
+            ]
+        ),
     )
5 changes: 5 additions & 0 deletions dags/stellar_etl_airflow/build_bq_insert_job_task.py
@@ -85,4 +85,9 @@ def build_bq_insert_job(
         ),
         on_failure_callback=alert_after_max_retries,
         configuration=configuration,
+        sla=timedelta(
+            seconds=Variable.get("task_sla", deserialize_json=True)[
+                build_bq_insert_job.__name__
+            ]
+        ),
     )
3 changes: 3 additions & 0 deletions dags/stellar_etl_airflow/build_copy_table_task.py
@@ -58,5 +58,8 @@ def build_copy_table(
             ]
         ),
         on_failure_callback=alert_after_max_retries,
+        sla=timedelta(
+            seconds=Variable.get("task_sla", deserialize_json=True)["default"]
+        ),
         configuration=configuration,
     )