Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Full refactor of sensor error management #41047

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions airflow/example_dags/example_sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.base import FailPolicy
from airflow.sensors.bash import BashSensor
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.python import PythonSensor
Expand Down Expand Up @@ -68,7 +69,7 @@ def failure_callable():
t2 = TimeSensor(
task_id="timeout_after_second_date_in_the_future",
timeout=1,
soft_fail=True,
fail_policy=FailPolicy.SKIP_ON_TIMEOUT,
target_time=(datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(hours=1)).time(),
)
# [END example_time_sensors]
Expand All @@ -81,15 +82,20 @@ def failure_callable():
t2a = TimeSensorAsync(
task_id="timeout_after_second_date_in_the_future_async",
timeout=1,
soft_fail=True,
fail_policy=FailPolicy.SKIP_ON_TIMEOUT,
target_time=(datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(hours=1)).time(),
)
# [END example_time_sensors_async]

# [START example_bash_sensors]
t3 = BashSensor(task_id="Sensor_succeeds", bash_command="exit 0")

t4 = BashSensor(task_id="Sensor_fails_after_3_seconds", timeout=3, soft_fail=True, bash_command="exit 1")
t4 = BashSensor(
task_id="Sensor_fails_after_3_seconds",
timeout=3,
fail_policy=FailPolicy.SKIP_ON_TIMEOUT,
bash_command="exit 1",
)
# [END example_bash_sensors]

t5 = BashOperator(task_id="remove_file", bash_command="rm -rf /tmp/temporary_file_for_testing")
Expand All @@ -112,13 +118,19 @@ def failure_callable():
t9 = PythonSensor(task_id="success_sensor_python", python_callable=success_callable)

t10 = PythonSensor(
task_id="failure_timeout_sensor_python", timeout=3, soft_fail=True, python_callable=failure_callable
task_id="failure_timeout_sensor_python",
timeout=3,
fail_policy=FailPolicy.SKIP_ON_TIMEOUT,
python_callable=failure_callable,
)
# [END example_python_sensors]

# [START example_day_of_week_sensor]
t11 = DayOfWeekSensor(
task_id="week_day_sensor_failing_on_timeout", timeout=3, soft_fail=True, week_day=WeekDay.MONDAY
task_id="week_day_sensor_failing_on_timeout",
timeout=3,
fail_policy=FailPolicy.SKIP_ON_TIMEOUT,
week_day=WeekDay.MONDAY,
)
# [END example_day_of_week_sensor]

Expand Down
15 changes: 15 additions & 0 deletions airflow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ class AirflowSensorTimeout(AirflowException):
"""Raise when there is a timeout on sensor polling."""


class AirflowPokeFailException(AirflowException):
"""Raise when a sensor must not try to poke again."""


class AirflowRescheduleException(AirflowException):
"""
Raise when the task should be re-scheduled at a later time.
Expand Down Expand Up @@ -439,6 +443,17 @@ class PodReconciliationError(AirflowException): # type: ignore[no-redef]
"""Raised when an error is encountered while trying to merge pod configs."""


class RemovedInAirflow3SoftWarning(DeprecationWarning):
"""
Issued for usage of deprecated features that will be removed in Airflow3.

Unlike RemovedInAirflow3Warning, these warnings do not fail the tests.
"""

deprecated_since: str | None = None
"Indicates the airflow version that started raising this deprecation warning"


class RemovedInAirflow3Warning(DeprecationWarning):
"""Issued for usage of deprecated features that will be removed in Airflow3."""

Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/amazon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
__version__ = "8.27.0"

if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
"2.7.0"
"2.7.1"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you change the min version?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cause I removed

# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1

):
raise RuntimeError(
f"The package `apache-airflow-providers-amazon:{__version__}` needs Apache Airflow 2.7.0+"
f"The package `apache-airflow-providers-amazon:{__version__}` needs Apache Airflow 2.7.1+"
)
11 changes: 1 addition & 10 deletions airflow/providers/amazon/aws/sensors/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
if TYPE_CHECKING:
from airflow.utils.context import Context

from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowSkipException
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.triggers.s3 import S3KeysUnchangedTrigger, S3KeyTrigger
from airflow.sensors.base import BaseSensorOperator, poke_mode_only
Expand Down Expand Up @@ -219,9 +219,6 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> None:
if not found_keys:
self._defer()
elif event["status"] == "error":
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
if self.soft_fail:
raise AirflowSkipException(event["message"])
raise AirflowException(event["message"])

@deprecated(reason="use `hook` property instead.", category=AirflowProviderDeprecationWarning)
Expand Down Expand Up @@ -342,13 +339,10 @@ def is_keys_unchanged(self, current_objects: set[str]) -> bool:
)
return False

# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
message = (
f"Illegal behavior: objects were deleted in"
f" {os.path.join(self.bucket_name, self.prefix)} between pokes."
)
if self.soft_fail:
raise AirflowSkipException(message)
raise AirflowException(message)

if self.last_activity_time:
Expand Down Expand Up @@ -411,8 +405,5 @@ def execute_complete(self, context: Context, event: dict[str, Any] | None = None
event = validate_execute_complete_event(event)

if event and event["status"] == "error":
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
if self.soft_fail:
raise AirflowSkipException(event["message"])
raise AirflowException(event["message"])
return None
3 changes: 1 addition & 2 deletions airflow/providers/amazon/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,10 @@ versions:
- 1.0.0

dependencies:
- apache-airflow>=2.7.0
- apache-airflow>=2.7.1
- apache-airflow-providers-common-compat>=1.1.0
- apache-airflow-providers-common-sql>=1.3.1
- apache-airflow-providers-http
- apache-airflow-providers-common-compat>=1.1.0
# We should update minimum version of boto3 and here regularly to avoid `pip` backtracking with the number
# of candidates to consider. Make sure to configure boto3 version here as well as in all the tools below
# in the `devel-dependencies` section to be the same minimum version.
Expand Down
6 changes: 6 additions & 0 deletions airflow/providers/common/compat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,9 @@
raise RuntimeError(
f"The package `apache-airflow-providers-common-compat:{__version__}` needs Apache Airflow 2.7.0+"
)


def is_at_least_2_10_0() -> bool:
return packaging.version.parse(
packaging.version.parse(airflow_version).base_version
) >= packaging.version.parse("2.10.0")
4 changes: 2 additions & 2 deletions airflow/providers/ftp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
__version__ = "3.10.0"

if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
"2.7.0"
"2.7.1"
):
raise RuntimeError(
f"The package `apache-airflow-providers-ftp:{__version__}` needs Apache Airflow 2.7.0+"
f"The package `apache-airflow-providers-ftp:{__version__}` needs Apache Airflow 2.7.1+"
)
2 changes: 1 addition & 1 deletion airflow/providers/ftp/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ versions:
- 1.0.0

dependencies:
- apache-airflow>=2.7.0
- apache-airflow>=2.7.1

integrations:
- integration-name: File Transfer Protocol (FTP)
Expand Down
7 changes: 3 additions & 4 deletions airflow/providers/ftp/sensors/ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import re
from typing import TYPE_CHECKING, Sequence

from airflow.exceptions import AirflowSkipException
from airflow.exceptions import AirflowSensorTimeout
from airflow.providers.ftp.hooks.ftp import FTPHook, FTPSHook
from airflow.sensors.base import BaseSensorOperator

Expand Down Expand Up @@ -83,9 +83,8 @@ def poke(self, context: Context) -> bool:
if (error_code != 550) and (
self.fail_on_transient_errors or (error_code not in self.transient_errors)
):
if self.soft_fail:
raise AirflowSkipException from e
raise e
# TODO: replace with AirflowPokeFailException when min_airflow_version is set to at least 2.10.0
raise AirflowSensorTimeout from e

return False

Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/http/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
__version__ = "4.12.0"

if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
"2.7.0"
"2.7.1"
):
raise RuntimeError(
f"The package `apache-airflow-providers-http:{__version__}` needs Apache Airflow 2.7.0+"
f"The package `apache-airflow-providers-http:{__version__}` needs Apache Airflow 2.7.1+"
)
2 changes: 1 addition & 1 deletion airflow/providers/http/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ versions:
- 1.0.0

dependencies:
- apache-airflow>=2.7.0
- apache-airflow>=2.7.1
# The 2.26.0 release of requests got rid of the chardet LGPL mandatory dependency, allowing us to
# release it as a requirement for airflow
- requests>=2.27.0,<3
Expand Down
6 changes: 1 addition & 5 deletions airflow/providers/http/sensors/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from typing import TYPE_CHECKING, Any, Callable, Sequence

from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.http.triggers.http import HttpSensorTrigger
from airflow.sensors.base import BaseSensorOperator
Expand Down Expand Up @@ -151,10 +151,6 @@ def poke(self, context: Context) -> bool:
except AirflowException as exc:
if str(exc).startswith(self.response_error_codes_allowlist):
return False
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
if self.soft_fail:
raise AirflowSkipException from exc

raise exc

return True
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/sftp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
__version__ = "4.10.2"

if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
"2.7.0"
"2.7.1"
):
raise RuntimeError(
f"The package `apache-airflow-providers-sftp:{__version__}` needs Apache Airflow 2.7.0+"
f"The package `apache-airflow-providers-sftp:{__version__}` needs Apache Airflow 2.7.1+"
)
2 changes: 1 addition & 1 deletion airflow/providers/sftp/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ versions:
- 1.0.0

dependencies:
- apache-airflow>=2.7.0
- apache-airflow>=2.7.1
- apache-airflow-providers-ssh>=2.1.0
- paramiko>=2.9.0
- asyncssh>=2.12.0
Expand Down
8 changes: 3 additions & 5 deletions airflow/providers/sftp/sensors/sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from paramiko.sftp import SFTP_NO_SUCH_FILE

from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.exceptions import AirflowException, AirflowSensorTimeout
from airflow.providers.sftp.hooks.sftp import SFTPHook
from airflow.providers.sftp.triggers.sftp import SFTPTrigger
from airflow.sensors.base import BaseSensorOperator, PokeReturnValue
Expand Down Expand Up @@ -98,10 +98,8 @@ def poke(self, context: Context) -> PokeReturnValue | bool:
self.log.info("Found File %s last modified: %s", actual_file_to_check, mod_time)
except OSError as e:
if e.errno != SFTP_NO_SUCH_FILE:
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
if self.soft_fail:
raise AirflowSkipException from e
raise e
# TODO: replace with AirflowPokeFailException when min_airflow_version is set to at least 2.10.0
raise AirflowSensorTimeout from e
continue

if self.newer_than:
Expand Down
Loading