full refacto sensors error management #41047

Closed
wants to merge 8 commits into from

Conversation

raphaelauv
Contributor

@raphaelauv raphaelauv commented Jul 26, 2024

fix: #40787

  • introduce fail_policy parameter

  • deprecate soft_fail and silent_fail parameters

  • introduce SKIP_ON_ANY_ERROR

  • replace soft_fail with SKIP_ON_TIMEOUT, which no longer ignores technical errors

  • replace silent_fail with IGNORE_ERROR

Why? Because we currently can't trust the soft_fail parameter for at-least-once delivery: with soft_fail, a sensor can be skipped on any technical error (which means we can skip data that was actually available or present).
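
To make the intended semantics concrete, here is a minimal, self-contained sketch of how the three policies named above could map an error to an outcome. It is not the code from this PR: the `NONE` member, the string values, the `SensorTimeout` stand-in class and the `outcome` helper are all assumptions made for illustration, and the `IGNORE_ERROR` branch reflects my reading of the existing `silent_fail` behaviour (log the error and keep poking).

```python
from enum import Enum


class FailPolicy(str, Enum):
    """Policy names from the PR description; NONE is an assumed default."""

    NONE = "none"
    SKIP_ON_TIMEOUT = "skip_on_timeout"
    SKIP_ON_ANY_ERROR = "skip_on_any_error"
    IGNORE_ERROR = "ignore_error"


class SensorTimeout(Exception):
    """Stand-in for airflow.exceptions.AirflowSensorTimeout (assumption)."""


def outcome(policy: FailPolicy, error: Exception) -> str:
    """Return 'skip', 'keep_poking' or 'fail' for `error` under `policy`."""
    if policy is FailPolicy.SKIP_ON_ANY_ERROR:
        # Old soft_fail behaviour: every error, technical or not, skips.
        return "skip"
    if isinstance(error, SensorTimeout):
        # Only a genuine "data never arrived" timeout may skip.
        return "skip" if policy is FailPolicy.SKIP_ON_TIMEOUT else "fail"
    if policy is FailPolicy.IGNORE_ERROR:
        # Old silent_fail behaviour: swallow the error and poke again.
        return "keep_poking"
    return "fail"
```

Under this sketch a `ValueError` raised while poking fails the task with `SKIP_ON_TIMEOUT`, which is the at-least-once guarantee the description asks for, while `SKIP_ON_ANY_ERROR` keeps the old behaviour for users who want it.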

@raphaelauv raphaelauv changed the title full refacto sensors full refacto sensors error management Jul 26, 2024
@raphaelauv
Contributor Author

raphaelauv commented Jul 26, 2024

@potiuk should I make the warning conditional on the minimum airflow-core version compatible with the package of the superclass?

If yes, do you have an elegant idea how? Thanks.

(But doesn't doing so break the OOP hierarchy?)

@potiuk
Member

potiuk commented Jul 26, 2024

Yes. You cannot show a warning on 2.7-2.9 when users have no option to remove it other than upgrading.

Yes, the idea is simple: check the Airflow version and do it conditionally. Every provider already does it in __init__.py:

import packaging.version

from airflow import __version__ as airflow_version

__all__ = ["__version__"]

__version__ = "1.3.2"

if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
    "2.8.0"
):
    raise RuntimeError(
        f"The package `apache-airflow-providers-common-io:{__version__}` needs Apache Airflow 2.8.0+"
    )

@raphaelauv raphaelauv force-pushed the fix_full_refacto_sensors branch from fd66a10 to f3e233b Compare July 26, 2024 16:07
@raphaelauv
Contributor Author

Thanks @potiuk for the guidance, but I still don't know how to do that in the BaseSensorOperator. I'm quite lost 😅

@potiuk
Member

potiuk commented Jul 26, 2024

You can't do it in BaseOperator. The compatibility code should be implemented in those operators that become incompatible (because eventually you will have, say, a 2.7.1 BaseSensor together with a new, say, Http operator).

We recently added common.compat provider for that purpose - so such common code (raising warnings) could be placed there and called from each operator. In this case that code should raise the warning on 2.10 and not raise warning on <2.10. This way we avoid logic copy-pasting between providers.
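
As a rough illustration of the shared helper described above, the sketch below warns only when the running Airflow version is 2.10 or newer. It is an assumption-laden stand-in, not code from common.compat: the function names are hypothetical (`is_at_least_2_10_0` merely echoes a helper name mentioned later in this thread), the version is passed in as a string instead of being imported from airflow, and a stdlib regex replaces `packaging.version` so the snippet runs without extra dependencies.

```python
import re
import warnings


def base_version(version: str) -> tuple[int, ...]:
    """Keep only the leading numeric release, e.g. '2.10.0.dev0' -> (2, 10, 0)."""
    match = re.match(r"\d+(?:\.\d+)*", version)
    if match is None:
        raise ValueError(f"unparseable version: {version!r}")
    release = [int(part) for part in match.group().split(".")]
    # Pad short versions such as '2.10' so tuple comparison behaves as expected.
    release += [0] * max(0, 3 - len(release))
    return tuple(release)


def is_at_least_2_10_0(airflow_version: str) -> bool:
    """True when the given Airflow version is 2.10.0 or newer (pre-releases included)."""
    return base_version(airflow_version) >= (2, 10, 0)


def warn_soft_fail_deprecated(airflow_version: str) -> bool:
    """Warn only where the user can act on it; return whether a warning was emitted."""
    if not is_at_least_2_10_0(airflow_version):
        # On older cores there is no replacement parameter yet, so stay quiet.
        return False
    warnings.warn(
        "`soft_fail` is deprecated, use `fail_policy` instead",  # hypothetical wording
        DeprecationWarning,
        stacklevel=2,
    )
    return True
```

Each affected operator would call such a helper from its constructor, which is why every provider using it would need to depend on the common.compat release that ships it.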

Then each of those providers will have to depend on the next version of common.compat, in order to make sure that code is available.

For example we have get_hook_lineage_collector added in 2.10 : https://github.com/apache/airflow/blob/main/airflow/providers/common/compat/lineage/hook.py#L20 and a number of providers using it import it (until they reach 2.10 minimum level) from there

from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector

And the aws provider has a dependency on common.compat: 1.1.0

- apache-airflow-providers-common-compat>=1.1.0

The 1.1.0 compat has not been released yet https://airflow.apache.org/docs/apache-airflow-providers-common-compat/stable/index.html - so you can still add the code to common.compat and set all the providers to depend on 1.1.0 of common compat.

@raphaelauv
Contributor Author

But that means only operators from official providers of the apache-airflow project will show the warnings. Isn't that a problem?

@potiuk
Member

potiuk commented Jul 26, 2024

But that means only operators from official providers of the apache-airflow project will show the warnings. Isn't that a problem?

Well. It's logically impossible to influence the behaviour of an already released Airflow version. If you have a 3rd-party operator, you have absolutely no way to influence its behaviour on past Airflow versions.

@raphaelauv
Contributor Author

We could create a new DeprecatedAirflowCoreException that we ignore in unit tests and that gets shown to users of "old" provider versions.

WDYT?

And if not, is my commit "first try" what you expect? Thanks

@potiuk
Member

potiuk commented Jul 26, 2024

The code looks good - just move it into the common.compat provider and use it from there for every provider that is affected (and add a dependency on "apache-airflow-providers-common-compat" for those providers).

@raphaelauv
Contributor Author

raphaelauv commented Jul 26, 2024

Okay. Also, all the providers are affected. Could you confirm that the design is okay before I do the big refactoring? Thanks.

Because I would need to put these lines in every sensor -> https://github.com/apache/airflow/blob/f5b2364b4a5d965d07da0f79f834507f716c5de1/airflow/providers/sftp/sensors/sftp.py#L70

And that means non-official providers will not have the warning.

@raphaelauv raphaelauv force-pushed the fix_full_refacto_sensors branch from f5b2364 to 26b74ea Compare July 28, 2024 00:01
@@ -32,8 +32,8 @@
 __version__ = "8.26.0"

 if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
-    "2.7.0"
+    "2.7.1"
Contributor

Why do you change the min version?

Contributor Author

Because I removed:

# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1

@raphaelauv
Contributor Author

raphaelauv commented Jul 28, 2024

Okay. Also, all the providers are affected. Could you confirm that the design is okay before I do the big refactoring? Thanks.

Because I would need to put these lines in every sensor ->

https://github.com/apache/airflow/blob/f5b2364b4a5d965d07da0f79f834507f716c5de1/airflow/providers/sftp/sensors/sftp.py#L70

And that means non-official providers will not have the warning.

I pushed a commit with another design proposal that has multiple advantages:

  • no need to add the common.compat provider as a dependency of all providers
  • no need to add a proactive call to _is_at_least_2_10_0() in all existing sensors
  • all non-official providers will show a warning from Airflow 2.10.0
  • no need to convert all sensors to the new parameters right now; a big bang is error prone
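
One way to read the proposal in the list above is that the base sensor itself translates the deprecated flags, so subclasses need no changes. The sketch below illustrates that idea only; it is not the commit from this PR. `SketchSensor`, `ThirdPartySensor`, the string policy values and the mapping of `soft_fail` to "skip_on_timeout" and `silent_fail` to "ignore_error" are assumptions derived from the PR description.

```python
import warnings


class SketchSensor:
    """Hypothetical stand-in for a base sensor; not Airflow's BaseSensorOperator."""

    def __init__(self, *, fail_policy: str = "none", soft_fail: bool = False, silent_fail: bool = False):
        if soft_fail:
            # Deprecated flag: warn once here so every subclass inherits the warning.
            warnings.warn(
                "`soft_fail` is deprecated; use fail_policy='skip_on_timeout'",
                DeprecationWarning,
                stacklevel=2,
            )
            fail_policy = "skip_on_timeout"
        elif silent_fail:
            warnings.warn(
                "`silent_fail` is deprecated; use fail_policy='ignore_error'",
                DeprecationWarning,
                stacklevel=2,
            )
            fail_policy = "ignore_error"
        self.fail_policy = fail_policy


class ThirdPartySensor(SketchSensor):
    """A provider sensor written before fail_policy existed; it needs no changes."""

    def __init__(self, path: str, **kwargs):
        super().__init__(**kwargs)
        self.path = path
```

Because the translation lives in the base class, a sensor from a non-official provider that still passes `soft_fail=True` would get the deprecation warning as soon as the core is upgraded, without depending on common.compat.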

wdyt @eladkal @potiuk thanks

@raphaelauv raphaelauv force-pushed the fix_full_refacto_sensors branch 2 times, most recently from 824acb9 to 4285487 Compare July 29, 2024 22:22
@raphaelauv raphaelauv force-pushed the fix_full_refacto_sensors branch from f9a21c7 to caaa606 Compare July 30, 2024 23:12
@raphaelauv
Contributor Author

Gentle ping @eladkal @potiuk , thanks again for your review

@potiuk potiuk force-pushed the fix_full_refacto_sensors branch from caaa606 to d6314b4 Compare August 1, 2024 23:37
@potiuk
Member

potiuk commented Aug 1, 2024

Generally - we rarely look at changes that have errors :) - and here I guess the problem is still back-compatibility ( I rebased to double-check)

@raphaelauv raphaelauv force-pushed the fix_full_refacto_sensors branch from d6314b4 to 140de1b Compare August 2, 2024 08:44
@raphaelauv raphaelauv force-pushed the fix_full_refacto_sensors branch 2 times, most recently from 88598b2 to e01cc3b Compare August 2, 2024 10:49
@raphaelauv raphaelauv force-pushed the fix_full_refacto_sensors branch from e01cc3b to 20a55fe Compare August 3, 2024 14:45
@raphaelauv raphaelauv force-pushed the fix_full_refacto_sensors branch from 20a55fe to 7ae0db7 Compare August 3, 2024 15:21
@raphaelauv
Contributor Author

Fixed the tests. Could you please re-review? Thanks

@potiuk
Member

potiuk commented Aug 5, 2024

Will take a look later today.

@potiuk
Member

potiuk commented Aug 5, 2024

It looks good, but I would love to see if others have any concerns. That seems like a good one to leave a clean solution for Airflow 3 without deprecations, so it would be good to have it this way. @eladkal - WDYT? - also @kaxil -> that one looks like a "biggish" change in terms of DAG incompatibility, so maybe we should consider that one as a candidate to mention at the next dev call on Thursday?

@potiuk potiuk requested a review from kaxil August 5, 2024 12:21
@raphaelauv
Contributor Author

raphaelauv commented Aug 5, 2024

Thanks @potiuk for the review. This PR is mostly about restoring how sensors had worked from the beginning, before the "fix" in 2.7.1: https://github.com/apache/airflow/pull/33401/files

@potiuk
Member

potiuk commented Aug 5, 2024

Partially yes. But we should assume that most people are at 2.7.1+ now (our providers are soon going to be 2.8.0+ compatible only) - and removing soft_fail in Airflow 3 will mean that you will need to rewrite your DAGs that use it.

@raphaelauv
Contributor Author

raphaelauv commented Aug 5, 2024

But we should assume that most people are at 2.7.1 + now
So most people suffer from this silent bug and should not trust soft_fail. (I had to remove soft_fail from all sensors and add a ShortCircuitOperator that checks the status of the previous task and whether it failed with an AirflowSensorTimeout.) ->

from airflow.exceptions import AirflowException
from airflow.operators.empty import EmptyOperator
from airflow.sensors.python import PythonSensor
from airflow.operators.python import ShortCircuitOperator
from airflow.utils.trigger_rule import TriggerRule
from pendulum import today
from airflow import DAG


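def poke_data():
    # return False  # sensor times out -> task "c" is skipped
    raise ValueError("toto")  # technical error -> task "c" does not run
    # return True  # data is available -> task "c" runs


def should_continue(error: str):
    # `error` comes from a rendered Jinja template, so a missing XCom arrives as the string "None".
    if error == "AirflowSensorTimeout":
        return False  # genuine timeout: short-circuit, downstream tasks are skipped
    elif error == "None":
        return True  # the sensor succeeded: continue
    else:
        raise AirflowException(f"sensor failed with {error}")  # technical error: fail loudly


def save_error(context):
    # Store the exception class name so the downstream task can inspect it.
    exception_name = context["exception"].__class__.__name__
    context["task_instance"].xcom_push("error", exception_name)


with DAG(
    dag_id="example",
    schedule_interval="0 0 * * *",
    start_date=today("UTC").add(days=-1),
):
    a = PythonSensor(
        task_id="a", python_callable=poke_data, on_failure_callback=save_error, timeout=5, poke_interval=5
    )

    # ALL_DONE lets "b" run even when the sensor failed, so it can decide what to do.
    b = ShortCircuitOperator(
        task_id="b",
        python_callable=should_continue,
        trigger_rule=TriggerRule.ALL_DONE,
        op_kwargs={"error": "{{ ti.xcom_pull(task_ids='a', key='error') }}"},
    )

    c = EmptyOperator(task_id="c")

    a >> b >> c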

@raphaelauv
Contributor Author

@potiuk what should we do ? thanks

@potiuk
Member

potiuk commented Aug 13, 2024

@potiuk what should we do ? thanks

Rebase / review - potentially simplify and merge it as a breaking change to main (which is already Airflow 3). We likely won't backport it to 2.10.

@raphaelauv
Contributor Author

Okay, so I am closing this in favor of PR #41463, which is based on the main branch (Airflow 3).

@raphaelauv raphaelauv closed this Aug 14, 2024
Development

Successfully merging this pull request may close these issues.

soft_fail | operator is skipped in all cases and not only "data" related fail
3 participants