From 1103c8201d887ce3f090cba707c5552e2d93e53a Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 14 Oct 2021 16:09:38 +0100 Subject: [PATCH] Row lock TI query in SchedulerJob._process_executor_events (#18975) Using multiple schedulers causes Deadlock in _process_executor_events. This PR fixes it. Co-authored-by: Tzu-ping Chung (cherry picked from commit 52cc84c66afc2c73e8a2a80aae46f2208f07c4cb) (cherry picked from commit d161134d540be46d6a641dd85df1a31a2cd80779) --- airflow/jobs/scheduler_job.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 2bfd56a3810b..4062edd67075 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -26,7 +26,7 @@ import time from collections import defaultdict from datetime import timedelta -from typing import DefaultDict, Dict, Iterable, List, Optional, Tuple +from typing import DefaultDict, Dict, Iterable, Iterator, List, Optional, Tuple from sqlalchemy import and_, func, not_, or_, tuple_ from sqlalchemy.exc import OperationalError @@ -626,7 +626,15 @@ def _process_executor_events(self, session: Session = None) -> int: # Check state of finished tasks filter_for_tis = TI.filter_for_tis(tis_with_right_state) - tis: List[TI] = session.query(TI).filter(filter_for_tis).options(selectinload('dag_model')).all() + query = session.query(TI).filter(filter_for_tis).options(selectinload('dag_model')) + # row lock this entire set of taskinstances to make sure the scheduler doesn't fail when we have + # multi-schedulers + tis: Iterator[TI] = with_row_locks( + query, + of=TI, + session=session, + **skip_locked(session=session), + ) for ti in tis: try_number = ti_primary_key_to_try_number_map[ti.key.primary] buffer_key = ti.key.with_try_number(try_number)