diff --git a/airflow/utils/db.py b/airflow/utils/db.py index b5cdb4b63589..c1f934105f8b 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -1092,30 +1092,30 @@ def _task_instance_exists(session, source_table, dag_run, task_instance): """ if 'run_id' not in task_instance.c: # db is < 2.2.0 - source_to_ti_join_cond = and_( + where_clause = and_( source_table.c.dag_id == task_instance.c.dag_id, source_table.c.task_id == task_instance.c.task_id, source_table.c.execution_date == task_instance.c.execution_date, ) ti_to_dr_join_cond = and_( - source_table.c.dag_id == task_instance.c.dag_id, - source_table.c.execution_date == task_instance.c.execution_date, + dag_run.c.dag_id == task_instance.c.dag_id, + dag_run.c.execution_date == task_instance.c.execution_date, ) else: # db is 2.2.0 <= version < 2.3.0 - source_to_ti_join_cond = and_( + where_clause = and_( source_table.c.dag_id == task_instance.c.dag_id, source_table.c.task_id == task_instance.c.task_id, + source_table.c.execution_date == dag_run.c.execution_date, ) ti_to_dr_join_cond = and_( - source_table.c.dag_id == task_instance.c.dag_id, + dag_run.c.dag_id == task_instance.c.dag_id, dag_run.c.run_id == task_instance.c.run_id, - source_table.c.execution_date == dag_run.c.execution_date, ) exists_subquery = ( session.query(text('1')) .select_from(task_instance.join(dag_run, onclause=ti_to_dr_join_cond)) - .filter(source_to_ti_join_cond) + .filter(where_clause) ) return exists_subquery