diff --git a/src/prefect/flows.py b/src/prefect/flows.py index e378e0d00161..af9ae12b5f70 100644 --- a/src/prefect/flows.py +++ b/src/prefect/flows.py @@ -2116,6 +2116,48 @@ def _sanitize_and_load_flow( ) func_def.returns = None + # Remove flow decorator callback hooks + + def get_decorator_function_name(func_decorator): + """Retrieve decorator function name.""" + func_decorator = ( + func_decorator.func + if isinstance(func_decorator, ast.Call) + else func_decorator + ) + return ( + func_decorator.attr + if isinstance(func_decorator, ast.Attribute) + else func_decorator.id + ) + + if func_def.decorator_list: + # Keep only prefect flow decorator only + func_def.decorator_list = [ + func_decorator + for func_decorator in func_def.decorator_list + if get_decorator_function_name(func_decorator) == "flow" + ] + exclude_keyword_args = ( + "on_failure", + "on_completion", + "on_cancellation", + "on_crashed", + "on_running", + ) + + # Exclude callable type keyword arguments from flow decorator + for func_decorator in func_def.decorator_list: + if not hasattr(func_decorator, "keywords"): + continue + + func_decorator.keywords = list( + filter( + lambda keyword_arg: keyword_arg.arg not in exclude_keyword_args, + func_decorator.keywords, + ) + ) + # Attempt to compile the function without annotations and defaults that # can't be compiled try: diff --git a/tests/test_flows.py b/tests/test_flows.py index b68de0d96708..584771952720 100644 --- a/tests/test_flows.py +++ b/tests/test_flows.py @@ -5225,3 +5225,30 @@ def dog(): with pytest.raises(NameError, match="name 'not_a_function' is not defined"): safe_load_flow_from_entrypoint(entrypoint)() + + def test_remove_callback_hooks_for_missing_import(self, tmp_path: Path): + flow_source = dedent( + """ + from prefect import flow + + from non_existent import DEFAULT_NAME, DEFAULT_AGE + + def test_callback(): + print(DEFAULT_NAME) + def wrapper(*args, **kwargs): + pass + + return wrapper + + @flow(on_running=[test_callback()]) + def flow_function(name = DEFAULT_NAME, age = DEFAULT_AGE) -> str: + return name, age + """ + ) + + tmp_path.joinpath("flow.py").write_text(flow_source) + + entrypoint = f"{tmp_path.joinpath('flow.py')}:flow_function" + + result = safe_load_flow_from_entrypoint(entrypoint) + assert result is not None