Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/add slas #359

Merged
merged 30 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
8257b56
Created alert_sla_miss callback funcion
edualvess May 8, 2024
99f6204
Defined test value for export task SLA
edualvess May 8, 2024
e1acb56
Defined sla_miss_callback for DAG level parameter
edualvess May 8, 2024
9a0852f
Merge branch 'master' into feature/add_slas
edualvess May 8, 2024
6f85078
Increased time task timeout value
edualvess May 9, 2024
6d01a09
Added dev Airflow variable for tasks SLA values
edualvess May 9, 2024
9911c3b
Added slack notification logic for the SLA miss callback
edualvess May 9, 2024
8982c76
Added SLA parameter to the relavant task building functions
edualvess May 9, 2024
e0e0ea7
Removed SLA miss callback function and related logic
edualvess May 9, 2024
88c9da2
Added refactored logic for sla miss callback function
edualvess May 9, 2024
accb888
Added sla miss callback to desired dags
edualvess May 9, 2024
bc45c40
Adjusted callback logic for sentry integration
edualvess May 9, 2024
ad5a1b4
Added task SLA values for prod Airflow variables
edualvess May 15, 2024
f15c2d6
Changed alert message to better distinguish SLA miss from task fail a…
edualvess May 15, 2024
c5fa61e
Merge branch 'master' into feature/add_slas
chowbao May 15, 2024
e8c7ca2
Merge branch 'master' into feature/add_slas
chowbao May 15, 2024
7660175
Merge branch 'master' into feature/add_slas
edualvess May 20, 2024
9d5b32a
Update dev tasks sla and timeout values
edualvess May 22, 2024
fb1d4e8
Update prod tasks sla and timeout values
edualvess May 22, 2024
efd34e2
Added default sla value to dag default args
edualvess May 22, 2024
ffc003b
Added missing SLA param to build task functions
edualvess May 22, 2024
1dfdd23
Standardizing logic for fetching SLA param value
edualvess May 22, 2024
7beb0aa
Added sla miss callback reference
edualvess May 22, 2024
7f0f421
Remove default SLA from default args
edualvess May 24, 2024
6ef6453
Set missing sla params to tasks
edualvess Jun 4, 2024
48d2e69
Updated values for sla variables
edualvess Jun 5, 2024
b91062d
Merge branch 'master' into feature/add_slas
edualvess Jun 5, 2024
f28457d
Updated build_dbt_task timeout value
edualvess Jun 5, 2024
ee2b3ff
Merge branch 'master' into feature/add_slas
edualvess Jun 12, 2024
e85dba9
Changed variable names to match new naming reference
edualvess Jun 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion airflow_variables_dev.json
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,29 @@
"build_delete_data_task": 180,
"build_export_task": 180,
"build_gcs_to_bq_task": 300,
"build_time_task": 120
"build_time_task": 300
},
"task_sla": {
"get_ledger_range_from_times": 240,
"export_task": 240,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to adjust some of these sla times. Like for instance this export_task sla 240 will never be hit because the task_timeout for export_task is 180.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also the current sla times are too aggressive from all the alerts we are getting in the slack channel #alerts-hubble-testnet

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to adjust some of these sla times. Like for instance this export_task sla 240 will never be hit because the task_timeout for export_task is 180.

There's an interesting behavior going on for the export_task. Even though the task_timeout is set for 180 seconds, the tasks usually run for around 220 seconds without triggering the timeout. Differently from time_task, where the timeout triggers in the middle of the job.

I'll push the commits where I increase the timeout for export_task too, which makes more sense to the actual behavior of the pipeline.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also the current sla times are too aggressive from all the alerts we are getting in the slack channel #alerts-hubble-testnet

Those have been reviewed through the suggested buffer and will be updated

"enriched_history_operations": 480,
"current_state": 480,
"elementary_dbt_enriched_base_tables": 240,
"ohlc": 720,
"liquidity_pool_trade_volume": 840,
"mgi": 480,
"liquidity_providers": 720,
"liquidity_pools_value": 840,
"liquidity_pools_value_history": 600,
"trade_agg": 720,
"fee_stats": 840,
"asset_stats": 600,
"network_stats": 720,
"partnership_assets": 600,
"history_assets": 720,
"soroban": 720,
"snapshot_state": 600,
"elementary_dbt_sdf_marts": 300
},
"dbt_tables": {
"signers_current": "account_signers_current",
Expand Down
24 changes: 23 additions & 1 deletion airflow_variables_prod.json
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,29 @@
"build_delete_data_task": 180,
"build_export_task": 180,
"build_gcs_to_bq_task": 300,
"build_time_task": 120
"build_time_task": 300
},
"task_sla": {
"get_ledger_range_from_times": 240,
"export_task": 240,
"enriched_history_operations": 840,
"current_state": 600,
"elementary_dbt_enriched_base_tables": 240,
"ohlc": 720,
"liquidity_pool_trade_volume": 240,
"mgi": 720,
"liquidity_providers": 600,
"liquidity_pools_value": 300,
"liquidity_pools_value_history": 300,
"trade_agg": 720,
"fee_stats": 300,
"asset_stats": 360,
"network_stats": 300,
"partnership_assets": 1200,
"history_assets": 360,
"soroban": 360,
"snapshot_state": 840,
"elementary_dbt_sdf_marts": 120
},
"dbt_tables": {
"signers_current": "account_signers_current",
Expand Down
7 changes: 6 additions & 1 deletion dags/dbt_enriched_base_tables_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps
from stellar_etl_airflow.build_dbt_task import dbt_task
from stellar_etl_airflow.build_elementary_slack_alert_task import elementary_task
from stellar_etl_airflow.default import get_default_dag_args, init_sentry
from stellar_etl_airflow.default import (
alert_sla_miss,
get_default_dag_args,
init_sentry,
)

init_sentry()

Expand All @@ -21,6 +25,7 @@
max_active_runs=1,
catchup=False,
tags=["dbt-enriched-base-tables"],
sla_miss_callback=alert_sla_miss,
)

# Wait on ingestion DAGs
Expand Down
7 changes: 6 additions & 1 deletion dags/dbt_sdf_marts_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps
from stellar_etl_airflow.build_dbt_task import dbt_task
from stellar_etl_airflow.build_elementary_slack_alert_task import elementary_task
from stellar_etl_airflow.default import get_default_dag_args, init_sentry
from stellar_etl_airflow.default import (
alert_sla_miss,
get_default_dag_args,
init_sentry,
)

init_sentry()

Expand All @@ -21,6 +25,7 @@
max_active_runs=3,
catchup=True,
tags=["dbt-sdf-marts"],
sla_miss_callback=alert_sla_miss,
)

# Wait on ingestion DAGs
Expand Down
7 changes: 6 additions & 1 deletion dags/history_tables_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
from stellar_etl_airflow.build_export_task import build_export_task
from stellar_etl_airflow.build_gcs_to_bq_task import build_gcs_to_bq_task
from stellar_etl_airflow.build_time_task import build_time_task
from stellar_etl_airflow.default import get_default_dag_args, init_sentry
from stellar_etl_airflow.default import (
alert_sla_miss,
get_default_dag_args,
init_sentry,
)

init_sentry()

Expand All @@ -42,6 +46,7 @@
"subtract_data_interval": macros.subtract_data_interval,
"batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string,
},
sla_miss_callback=alert_sla_miss,
)


Expand Down
7 changes: 6 additions & 1 deletion dags/state_table_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
from stellar_etl_airflow.build_export_task import build_export_task
from stellar_etl_airflow.build_gcs_to_bq_task import build_gcs_to_bq_task
from stellar_etl_airflow.build_time_task import build_time_task
from stellar_etl_airflow.default import get_default_dag_args, init_sentry
from stellar_etl_airflow.default import (
alert_sla_miss,
get_default_dag_args,
init_sentry,
)

init_sentry()

Expand All @@ -40,6 +44,7 @@
"batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string,
},
catchup=True,
sla_miss_callback=alert_sla_miss,
)


Expand Down
3 changes: 3 additions & 0 deletions dags/stellar_etl_airflow/build_dbt_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ def dbt_task(
on_failure_callback=alert_after_max_retries,
image_pull_policy="IfNotPresent",
image_pull_secrets=[k8s.V1LocalObjectReference("private-docker-auth")],
sla=timedelta(
seconds=Variable.get("task_sla", deserialize_json=True)[task_name]
),
edualvess marked this conversation as resolved.
Show resolved Hide resolved
)


Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import base64
import logging
from datetime import timedelta

from airflow.configuration import conf
from airflow.models import Variable
Expand Down Expand Up @@ -89,4 +90,9 @@ def elementary_task(
on_failure_callback=alert_after_max_retries,
image_pull_policy="IfNotPresent",
image_pull_secrets=[k8s.V1LocalObjectReference("private-docker-auth")],
sla=timedelta(
seconds=Variable.get("task_sla", deserialize_json=True)[
f"elementary_{task_name}"
]
),
)
3 changes: 3 additions & 0 deletions dags/stellar_etl_airflow/build_export_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,4 +239,7 @@ def build_export_task(
affinity=affinity,
on_failure_callback=alert_after_max_retries,
image_pull_policy=Variable.get("image_pull_policy"),
sla=timedelta(
seconds=Variable.get("task_sla", deserialize_json=True)["export_task"]
),
)
5 changes: 5 additions & 0 deletions dags/stellar_etl_airflow/build_time_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,9 @@ def build_time_task(
container_resources=resources_requests,
on_failure_callback=alert_after_max_retries,
image_pull_policy=Variable.get("image_pull_policy"),
sla=timedelta(
seconds=Variable.get("task_sla", deserialize_json=True)[
"get_ledger_range_from_times"
]
),
)
21 changes: 21 additions & 0 deletions dags/stellar_etl_airflow/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,24 @@ def alert_after_max_retries(context):
f"The task {ti.task_id} belonging to DAG {ti.dag_id} failed after max retries.",
"fatal",
)


def alert_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
"""
When a task takes longer then expected to run while having a defined SLA,
it misses it.
This alerts the IT team about the unexpected behavior in order
to enable faster response in case of underlying infrastructure issues.
"""
dag_id = slas[0].dag_id
task_id = slas[0].task_id
execution_date = slas[0].execution_date.isoformat()

with push_scope() as scope:
scope.set_extra("dag_id", dag_id)
scope.set_extra("task_id", task_id)
scope.set_extra("execution_date", execution_date)
capture_message(
f"SLA Miss! The task {task_id} belonging to DAG {dag_id} missed its SLA for the run date {execution_date}.",
"warn",
)
Loading