Skip to content

Commit

Permalink
feat: minimum scheduler age before task-creation-check (#612)
Browse files Browse the repository at this point in the history
Signed-off-by: Mathew Wicks <[email protected]>
  • Loading branch information
thesuperzapper authored Jun 22, 2022
1 parent 726af25 commit afcbe79
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 25 deletions.
21 changes: 13 additions & 8 deletions charts/airflow/docs/faq/monitoring/scheduler-liveness-probe.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,6 @@ for each airflow scheduler which regularly queries the Airflow Metadata Database
A scheduler is "healthy" if it has had a "heartbeat" in the last `AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD` seconds.
Each scheduler will perform a "heartbeat" every `AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC` seconds by updating the `latest_heartbeat` of its `SchedulerJob` in the Airflow Metadata `jobs` table.

> 🟥 __Warning__ 🟥
>
> A scheduler can have a "heartbeat" but be deadlocked such that it's unable to schedule new tasks,
> we provide the [`scheduler.livenessProbe.taskCreationCheck.*`](#scheduler-task-creation-check) values to automatically restart the scheduler in these cases.
>
> - https://github.com/apache/airflow/issues/7935 - patched in airflow `2.0.2`
> - https://github.com/apache/airflow/issues/15938 - patched in airflow `2.1.1`
By default, the chart runs a liveness probe every __30 seconds__ (`periodSeconds`), and will restart a scheduler if __5 probe failures__ (`failureThreshold`) occur in a row.
This means a scheduler must be unhealthy for at least `30 x 5 = 150` seconds before Kubernetes will automatically restart a scheduler Pod.

Expand Down Expand Up @@ -49,6 +41,14 @@ scheduler:
failureThreshold: 5
```
> 🟥 __Warning__ 🟥
>
> A scheduler can have a "heartbeat" but be deadlocked such that it's unable to schedule new tasks,
> the ["task creation check"](#scheduler-task-creation-check) should detect these situations and force a scheduler restart.
>
> - https://github.com/apache/airflow/issues/7935 - patched in airflow `2.0.2`
> - https://github.com/apache/airflow/issues/15938 - patched in airflow `2.1.1`

## Scheduler "Task Creation Check"

The liveness probe can additionally check if the Scheduler is creating new [tasks](https://airflow.apache.org/docs/apache-airflow/stable/concepts/tasks.html) as an indication of its health.
Expand All @@ -73,6 +73,11 @@ scheduler:
## WARNING: must be AT LEAST equal to your shortest DAG schedule_interval
## WARNING: DummyOperator tasks will NOT be seen by this probe
thresholdSeconds: 300
## minimum number of seconds the scheduler must have run before the task creation check begins
## WARNING: must be long enough for the scheduler to boot and create a task
##
schedulerAgeBeforeCheck: 180
```

> 🟦 __Tip__ 🟦
Expand Down
1 change: 1 addition & 0 deletions charts/airflow/examples/google-gke/custom-values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ scheduler:
taskCreationCheck:
enabled: false
thresholdSeconds: 300
schedulerAgeBeforeCheck: 180

###################################
## COMPONENT | Airflow Webserver
Expand Down
1 change: 1 addition & 0 deletions charts/airflow/examples/minikube/custom-values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ scheduler:
taskCreationCheck:
enabled: false
thresholdSeconds: 300
schedulerAgeBeforeCheck: 180

###################################
## COMPONENT | Airflow Webserver
Expand Down
1 change: 1 addition & 0 deletions charts/airflow/sample-values-CeleryExecutor.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ scheduler:
taskCreationCheck:
enabled: false
thresholdSeconds: 300
schedulerAgeBeforeCheck: 180

###################################
## COMPONENT | Airflow Webserver
Expand Down
1 change: 1 addition & 0 deletions charts/airflow/sample-values-CeleryKubernetesExecutor.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ scheduler:
taskCreationCheck:
enabled: false
thresholdSeconds: 300
schedulerAgeBeforeCheck: 180

###################################
## COMPONENT | Airflow Webserver
Expand Down
1 change: 1 addition & 0 deletions charts/airflow/sample-values-KubernetesExecutor.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ scheduler:
taskCreationCheck:
enabled: false
thresholdSeconds: 300
schedulerAgeBeforeCheck: 180

###################################
## COMPONENT | Airflow Webserver
Expand Down
45 changes: 28 additions & 17 deletions charts/airflow/templates/scheduler/scheduler-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ spec:
{{- end }}
with create_session() as session:
########################
# heartbeat check
########################
# ensure the SchedulerJob with most recent heartbeat for this `hostname` is alive
hostname = get_hostname()
scheduler_job = session \
Expand All @@ -161,29 +164,37 @@ spec:
pass
else:
sys.exit(f"The SchedulerJob (id={scheduler_job.id}) for hostname '{hostname}' is not alive")
{{- if .Values.scheduler.livenessProbe.taskCreationCheck.enabled }}
{{- $min_scheduler_age := .Values.scheduler.livenessProbe.taskCreationCheck.schedulerAgeBeforeCheck }}
{{- if not (or (typeIs "float64" $min_scheduler_age) (typeIs "int64" $min_scheduler_age)) }}
{{- /* the type of a number could be float64 or int64 depending on how it was set (values.yaml, or --set) */ -}}
{{ required (printf "`scheduler.livenessProbe.taskCreationCheck.schedulerAgeBeforeCheck` must be int-type, but got %s!" (typeOf $min_scheduler_age)) nil }}
{{- end }}
{{- $task_job_threshold := .Values.scheduler.livenessProbe.taskCreationCheck.thresholdSeconds }}
{{- if not (or (typeIs "float64" $task_job_threshold) (typeIs "int64" $task_job_threshold)) }}
{{- /* the type of a number could be float64 or int64 depending on how it was set (values.yaml, or --set) */ -}}
{{ required (printf "`scheduler.livenessProbe.taskCreationCheck.thresholdSeconds` must be int-type, but got %s!" (typeOf $task_job_threshold)) nil }}
{{- end }}
# ensure the most recent LocalTaskJob had a start_date in the last `task_job_threshold` seconds
task_job_threshold = {{ $task_job_threshold }}
task_job = session \
.query(LocalTaskJob) \
.order_by(LocalTaskJob.id.desc()) \
.limit(1) \
.first()
if task_job is not None:
if (timezone.utcnow() - task_job.start_date).total_seconds() < task_job_threshold:
pass
else:
sys.exit(
f"The most recent LocalTaskJob (id={task_job.id}, dag_id={task_job.dag_id}) "
f"started over {task_job_threshold} seconds ago"
)
########################
# task creation check
########################
min_scheduler_age = {{ $min_scheduler_age }}
if (timezone.utcnow() - scheduler_job.start_date).total_seconds() > min_scheduler_age:
# ensure the most recent LocalTaskJob had a start_date in the last `task_job_threshold` seconds
task_job_threshold = {{ $task_job_threshold }}
task_job = session \
.query(LocalTaskJob) \
.order_by(LocalTaskJob.id.desc()) \
.limit(1) \
.first()
if task_job is not None:
if (timezone.utcnow() - task_job.start_date).total_seconds() < task_job_threshold:
pass
else:
sys.exit(
f"The most recent LocalTaskJob (id={task_job.id}, dag_id={task_job.dag_id}) "
f"started over {task_job_threshold} seconds ago"
)
{{- end }}
{{- end }}
{{- if or ($volumeMounts) (include "airflow.executor.kubernetes_like" .) }}
Expand Down
5 changes: 5 additions & 0 deletions charts/airflow/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,11 @@ scheduler:
##
thresholdSeconds: 300

## minimum number of seconds the scheduler must have run before the task creation check begins
## - [WARNING] must be long enough for the scheduler to boot and create a task
##
schedulerAgeBeforeCheck: 180

## extra pip packages to install in the scheduler Pods
##
## ____ EXAMPLE _______________
Expand Down

0 comments on commit afcbe79

Please sign in to comment.