From afcbe796e7e2d979abcb172f7ebe011ab55ce44c Mon Sep 17 00:00:00 2001 From: Mathew Wicks Date: Wed, 22 Jun 2022 17:24:07 +1000 Subject: [PATCH] feat: minimum scheduler age before task-creation-check (#612) Signed-off-by: Mathew Wicks --- .../monitoring/scheduler-liveness-probe.md | 21 +++++---- .../examples/google-gke/custom-values.yaml | 1 + .../examples/minikube/custom-values.yaml | 1 + .../airflow/sample-values-CeleryExecutor.yaml | 1 + ...ample-values-CeleryKubernetesExecutor.yaml | 1 + .../sample-values-KubernetesExecutor.yaml | 1 + .../scheduler/scheduler-deployment.yaml | 45 ++++++++++++------- charts/airflow/values.yaml | 5 +++ 8 files changed, 51 insertions(+), 25 deletions(-) diff --git a/charts/airflow/docs/faq/monitoring/scheduler-liveness-probe.md b/charts/airflow/docs/faq/monitoring/scheduler-liveness-probe.md index ac9ae1bd..93049335 100644 --- a/charts/airflow/docs/faq/monitoring/scheduler-liveness-probe.md +++ b/charts/airflow/docs/faq/monitoring/scheduler-liveness-probe.md @@ -12,14 +12,6 @@ for each airflow scheduler which regularly queries the Airflow Metadata Database A scheduler is "healthy" if it has had a "heartbeat" in the last `AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD` seconds. Each scheduler will perform a "heartbeat" every `AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC` seconds by updating the `latest_heartbeat` of its `SchedulerJob` in the Airflow Metadata `jobs` table. -> 🟥 __Warning__ 🟥 -> -> A scheduler can have a "heartbeat" but be deadlocked such that it's unable to schedule new tasks, -> we provide the [`scheduler.livenessProbe.taskCreationCheck.*`](#scheduler-task-creation-check) values to automatically restart the scheduler in these cases. -> -> - https://github.com/apache/airflow/issues/7935 - patched in airflow `2.0.2` -> - https://github.com/apache/airflow/issues/15938 - patched in airflow `2.1.1` - By default, the chart runs a liveness probe every __30 seconds__ (`periodSeconds`), and will restart a scheduler if __5 probe failures__ (`failureThreshold`) occur in a row. This means a scheduler must be unhealthy for at least `30 x 5 = 150` seconds before Kubernetes will automatically restart a scheduler Pod. @@ -49,6 +41,14 @@ scheduler: failureThreshold: 5 ``` +> 🟥 __Warning__ 🟥 +> +> A scheduler can have a "heartbeat" but be deadlocked such that it's unable to schedule new tasks, +> the ["task creation check"](#scheduler-task-creation-check) should detect these situations and force a scheduler restart. +> +> - https://github.com/apache/airflow/issues/7935 - patched in airflow `2.0.2` +> - https://github.com/apache/airflow/issues/15938 - patched in airflow `2.1.1` + ## Scheduler "Task Creation Check" The liveness probe can additionally check if the Scheduler is creating new [tasks](https://airflow.apache.org/docs/apache-airflow/stable/concepts/tasks.html) as an indication of its health. @@ -73,6 +73,11 @@ scheduler: ## WARNING: must be AT LEAST equal to your shortest DAG schedule_interval ## WARNING: DummyOperator tasks will NOT be seen by this probe thresholdSeconds: 300 + + ## minimum number of seconds the scheduler must have run before the task creation check begins + ## WARNING: must be long enough for the scheduler to boot and create a task + ## + schedulerAgeBeforeCheck: 180 ``` > 🟦 __Tip__ 🟦 diff --git a/charts/airflow/examples/google-gke/custom-values.yaml b/charts/airflow/examples/google-gke/custom-values.yaml index c24ed9a1..91e807f3 100644 --- a/charts/airflow/examples/google-gke/custom-values.yaml +++ b/charts/airflow/examples/google-gke/custom-values.yaml @@ -154,6 +154,7 @@ scheduler: taskCreationCheck: enabled: false thresholdSeconds: 300 + schedulerAgeBeforeCheck: 180 ################################### ## COMPONENT | Airflow Webserver diff --git a/charts/airflow/examples/minikube/custom-values.yaml b/charts/airflow/examples/minikube/custom-values.yaml index fb9e4c3f..eed78ef8 100644 --- a/charts/airflow/examples/minikube/custom-values.yaml +++ b/charts/airflow/examples/minikube/custom-values.yaml @@ -118,6 +118,7 @@ scheduler: taskCreationCheck: enabled: false thresholdSeconds: 300 + schedulerAgeBeforeCheck: 180 ################################### ## COMPONENT | Airflow Webserver diff --git a/charts/airflow/sample-values-CeleryExecutor.yaml b/charts/airflow/sample-values-CeleryExecutor.yaml index 613bce42..020f3479 100644 --- a/charts/airflow/sample-values-CeleryExecutor.yaml +++ b/charts/airflow/sample-values-CeleryExecutor.yaml @@ -98,6 +98,7 @@ scheduler: taskCreationCheck: enabled: false thresholdSeconds: 300 + schedulerAgeBeforeCheck: 180 ################################### ## COMPONENT | Airflow Webserver diff --git a/charts/airflow/sample-values-CeleryKubernetesExecutor.yaml b/charts/airflow/sample-values-CeleryKubernetesExecutor.yaml index 8f053dbd..71cc193b 100644 --- a/charts/airflow/sample-values-CeleryKubernetesExecutor.yaml +++ b/charts/airflow/sample-values-CeleryKubernetesExecutor.yaml @@ -126,6 +126,7 @@ scheduler: taskCreationCheck: enabled: false thresholdSeconds: 300 + schedulerAgeBeforeCheck: 180 ################################### ## COMPONENT | Airflow Webserver diff --git a/charts/airflow/sample-values-KubernetesExecutor.yaml b/charts/airflow/sample-values-KubernetesExecutor.yaml index 19bcf9c5..06e997e7 100644 --- a/charts/airflow/sample-values-KubernetesExecutor.yaml +++ b/charts/airflow/sample-values-KubernetesExecutor.yaml @@ -126,6 +126,7 @@ scheduler: taskCreationCheck: enabled: false thresholdSeconds: 300 + schedulerAgeBeforeCheck: 180 ################################### ## COMPONENT | Airflow Webserver diff --git a/charts/airflow/templates/scheduler/scheduler-deployment.yaml b/charts/airflow/templates/scheduler/scheduler-deployment.yaml index f6ccd3eb..529aa2ce 100644 --- a/charts/airflow/templates/scheduler/scheduler-deployment.yaml +++ b/charts/airflow/templates/scheduler/scheduler-deployment.yaml @@ -149,6 +149,9 @@ spec: {{- end }} with create_session() as session: + ######################## + # heartbeat check + ######################## # ensure the SchedulerJob with most recent heartbeat for this `hostname` is alive hostname = get_hostname() scheduler_job = session \ @@ -161,29 +164,37 @@ spec: pass else: sys.exit(f"The SchedulerJob (id={scheduler_job.id}) for hostname '{hostname}' is not alive") - {{- if .Values.scheduler.livenessProbe.taskCreationCheck.enabled }} + {{- $min_scheduler_age := .Values.scheduler.livenessProbe.taskCreationCheck.schedulerAgeBeforeCheck }} + {{- if not (or (typeIs "float64" $min_scheduler_age) (typeIs "int64" $min_scheduler_age)) }} + {{- /* the type of a number could be float64 or int64 depending on how it was set (values.yaml, or --set) */ -}} + {{ required (printf "`scheduler.livenessProbe.taskCreationCheck.schedulerAgeBeforeCheck` must be int-type, but got %s!" (typeOf $min_scheduler_age)) nil }} + {{- end }} {{- $task_job_threshold := .Values.scheduler.livenessProbe.taskCreationCheck.thresholdSeconds }} {{- if not (or (typeIs "float64" $task_job_threshold) (typeIs "int64" $task_job_threshold)) }} {{- /* the type of a number could be float64 or int64 depending on how it was set (values.yaml, or --set) */ -}} {{ required (printf "`scheduler.livenessProbe.taskCreationCheck.thresholdSeconds` must be int-type, but got %s!" (typeOf $task_job_threshold)) nil }} {{- end }} - - # ensure the most recent LocalTaskJob had a start_date in the last `task_job_threshold` seconds - task_job_threshold = {{ $task_job_threshold }} - task_job = session \ - .query(LocalTaskJob) \ - .order_by(LocalTaskJob.id.desc()) \ - .limit(1) \ - .first() - if task_job is not None: - if (timezone.utcnow() - task_job.start_date).total_seconds() < task_job_threshold: - pass - else: - sys.exit( - f"The most recent LocalTaskJob (id={task_job.id}, dag_id={task_job.dag_id}) " - f"started over {task_job_threshold} seconds ago" - ) + ######################## + # task creation check + ######################## + min_scheduler_age = {{ $min_scheduler_age }} + if (timezone.utcnow() - scheduler_job.start_date).total_seconds() > min_scheduler_age: + # ensure the most recent LocalTaskJob had a start_date in the last `task_job_threshold` seconds + task_job_threshold = {{ $task_job_threshold }} + task_job = session \ + .query(LocalTaskJob) \ + .order_by(LocalTaskJob.id.desc()) \ + .limit(1) \ + .first() + if task_job is not None: + if (timezone.utcnow() - task_job.start_date).total_seconds() < task_job_threshold: + pass + else: + sys.exit( + f"The most recent LocalTaskJob (id={task_job.id}, dag_id={task_job.dag_id}) " + f"started over {task_job_threshold} seconds ago" + ) {{- end }} {{- end }} {{- if or ($volumeMounts) (include "airflow.executor.kubernetes_like" .) }} diff --git a/charts/airflow/values.yaml b/charts/airflow/values.yaml index 841c9ff6..9b7b987d 100644 --- a/charts/airflow/values.yaml +++ b/charts/airflow/values.yaml @@ -624,6 +624,11 @@ scheduler: ## thresholdSeconds: 300 + ## minimum number of seconds the scheduler must have run before the task creation check begins + ## - [WARNING] must be long enough for the scheduler to boot and create a task + ## + schedulerAgeBeforeCheck: 180 + ## extra pip packages to install in the scheduler Pods ## ## ____ EXAMPLE _______________