
do not push stale update to related DagRun on TI update after task execution #45348

Merged

Conversation

Contributor

@mobuchowski commented Jan 2, 2025

This is a re-targeted PR #44547, for the Airflow 2.x line only, as the fix loses relevance for Airflow 3.

When a TI is marked as failed via the UI and later fails on its own, the scheduler changes the related DagRun's state to failed twice.

(some extra log lines added for clarity)

[2024-11-28T14:48:11.276+0000] {scheduler_job_runner.py:1092} INFO - another scheduler loop 75
[2024-11-28T14:48:11.284+0000] {scheduler_job_runner.py:1666} ERROR - Scheduling dag run <DagRun wait_to_fail @ 2024-11-28 14:48:04.731025+00:00: manual__2024-11-28T14:48:04.731025+00:00, state:running, queued_at: 2024-11-28 14:48:04.739184+00:00. externally triggered: True> state running
[2024-11-28T14:48:11.287+0000] {dagrun.py:906} ERROR - Marking run <DagRun wait_to_fail @ 2024-11-28 14:48:04.731025+00:00: manual__2024-11-28T14:48:04.731025+00:00, state:running, queued_at: 2024-11-28 14:48:04.739184+00:00. externally triggered: True> failed - from state running at 2024-11-28 14:48:11.286513+00:00
[2024-11-28T14:48:11.288+0000] {dagrun.py:1101} ERROR - NOTIFYING STATE CHANGED TO failed
[2024-11-28T14:48:11.289+0000] {dagrun.py:989} INFO - DagRun Finished: dag_id=wait_to_fail, logical_date=2024-11-28 14:48:04.731025+00:00, run_id=manual__2024-11-28T14:48:04.731025+00:00, run_start_date=2024-11-28 14:48:05.186397+00:00, run_end_date=2024-11-28 14:48:11.288084+00:00, run_duration=6.101687, state=failed, external_trigger=True, run_type=manual, data_interval_start=2024-11-28 14:48:04.731025+00:00, data_interval_end=2024-11-28 14:48:04.731025+00:00, dag_version_name=wait_to_fail-1
[2024-11-28T14:48:11.299+0000] {client.py:110} INFO - OpenLineageClient will use `http` transport
[2024-11-28T14:48:12.307+0000] {scheduler_job_runner.py:1092} INFO - another scheduler loop 76
[2024-11-28T14:48:13.344+0000] {scheduler_job_runner.py:1092} INFO - another scheduler loop 77
[2024-11-28T14:48:14.382+0000] {scheduler_job_runner.py:1092} INFO - another scheduler loop 78
[2024-11-28T14:48:15.274+0000] {scheduler_job_runner.py:1092} INFO - another scheduler loop 79
[2024-11-28T14:48:16.309+0000] {scheduler_job_runner.py:1092} INFO - another scheduler loop 80
[2024-11-28T14:48:17.345+0000] {scheduler_job_runner.py:1092} INFO - another scheduler loop 81
[2024-11-28T14:48:17.361+0000] {scheduler_job_runner.py:1666} ERROR - Scheduling dag run <DagRun wait_to_fail @ 2024-11-28 14:48:04.731025+00:00: manual__2024-11-28T14:48:04.731025+00:00, state:running, queued_at: 2024-11-28 14:48:04.739184+00:00. externally triggered: True> state running
[2024-11-28T14:48:17.366+0000] {dagrun.py:906} ERROR - Marking run <DagRun wait_to_fail @ 2024-11-28 14:48:04.731025+00:00: manual__2024-11-28T14:48:04.731025+00:00, state:running, queued_at: 2024-11-28 14:48:04.739184+00:00. externally triggered: True> failed - from state running at 2024-11-28 14:48:17.364879+00:00
[2024-11-28T14:48:17.366+0000] {dagrun.py:1101} ERROR - NOTIFYING STATE CHANGED TO failed
[2024-11-28T14:48:17.367+0000] {dagrun.py:989} INFO - DagRun Finished: dag_id=wait_to_fail, logical_date=2024-11-28 14:48:04.731025+00:00, run_id=manual__2024-11-28T14:48:04.731025+00:00, run_start_date=2024-11-28 14:48:05.186397+00:00, run_end_date=2024-11-28 14:48:17.366330+00:00, run_duration=12.179933, state=failed, external_trigger=True, run_type=manual, data_interval_start=2024-11-28 14:48:04.731025+00:00, data_interval_end=2024-11-28 14:48:04.731025+00:00, dag_version_name=wait_to_fail-1
[2024-11-28T14:48:17.370+0000] {client.py:110} INFO - OpenLineageClient will use `http` transport

This happens because, in handle_failure, TaskInstance.save_to_db not only updates the state of that task instance, it also pushes a stale DagRun state: the one it read when the TI started. So the actual DagRun state goes running -> failed -> running -> failed.

This causes other unintended behavior, such as on_dag_run_failed listeners being called twice.

The fix here simply reloads the DagRun state from the DB before pushing the TI state; a sketch of the mechanism is below. There is probably a better solution that someone with more SQLAlchemy knowledge could help with.
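For illustration, here is a minimal, self-contained SQLAlchemy sketch of the stale-write mechanism and of the reload-before-commit idea. The DagRun/TaskInstance models, the sqlite engine, and the session handling are simplified stand-ins (assuming SQLAlchemy 2.x), not the actual Airflow models or the exact diff in this PR:

```python
# Standalone sketch -- all names are illustrative stand-ins, not Airflow's
# real models; assumes SQLAlchemy 2.x.
from sqlalchemy import ForeignKey, String, create_engine
from sqlalchemy.orm import (
    DeclarativeBase,
    Mapped,
    Session,
    mapped_column,
    relationship,
)


class Base(DeclarativeBase):
    pass


class DagRun(Base):
    __tablename__ = "dag_run"
    id: Mapped[int] = mapped_column(primary_key=True)
    state: Mapped[str] = mapped_column(String(20))


class TaskInstance(Base):
    __tablename__ = "task_instance"
    id: Mapped[int] = mapped_column(primary_key=True)
    state: Mapped[str] = mapped_column(String(20))
    dag_run_id: Mapped[int] = mapped_column(ForeignKey("dag_run.id"))
    # The default relationship cascade includes "merge", which is what
    # drags the stale DagRun state along when the TI is merged back.
    dag_run: Mapped[DagRun] = relationship()


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as s:
    run = DagRun(id=1, state="running")
    s.add_all([run, TaskInstance(id=1, state="running", dag_run=run)])
    s.commit()

# The worker loads the TI (and, through the relationship, a "running"
# DagRun), then holds both objects detached while the task executes.
with Session(engine) as s:
    ti = s.get(TaskInstance, 1)
    _ = ti.dag_run.state  # the DagRun is now loaded as "running"
    s.expunge_all()       # detach, like objects living across the task run

# Meanwhile the run is marked failed via the UI (a different session).
with Session(engine) as s:
    s.get(DagRun, 1).state = "failed"
    s.commit()

# The task itself now fails and is saved back, roughly what save_to_db
# does via session.merge(). The merge cascades to ti.dag_run and queues
# the stale "running" state for flush -- without the refresh below, the
# commit would flip the run back to running until the scheduler fails it
# again on the next loop.
ti.state = "failed"
with Session(engine) as s:
    merged = s.merge(ti)
    # The fix's idea: re-read the DagRun from the DB before committing.
    # refresh() discards the un-flushed stale "running" value, so only
    # the TI row is actually updated.
    s.refresh(merged.dag_run)
    s.commit()
    assert s.get(DagRun, 1).state == "failed"
```

The key point the sketch shows is that session.merge() cascades to loaded relationships by default, so saving the TI silently queues an UPDATE for the DagRun carrying whatever state the worker last saw.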

Link to discussion on Airflow slack: https://apache-airflow.slack.com/archives/C06K9Q5G2UA/p1732805503889679

@boring-cyborg bot added the area:core-operators (Operators, Sensors and hooks within Core Airflow) label Jan 2, 2025
@mobuchowski force-pushed the do-not-update-dr-after-ti-fails-2.10 branch 4 times, most recently from 2d536fc to 4e4906b on January 3, 2025
@mobuchowski force-pushed the do-not-update-dr-after-ti-fails-2.10 branch from 4e4906b to 295a85d on January 7, 2025
@potiuk merged commit 586f1ea into apache:v2-10-test on Jan 8, 2025
48 checks passed
@potiuk added this to the Airflow 2.10.5 milestone on Jan 8, 2025