WIP: skeleton for PythonOperator #16

Closed


YeungOnion

Looking for some feedback.

I wrote a few tests covering builtin and importable callables, including failure cases.

  • __call__ is unimplemented, need to know more about behavior, discuss in Python Operator #12
  • where should failure occur if the callable wouldn't work regardless of the arguments provided?

@YeungOnion YeungOnion marked this pull request as draft February 25, 2024 19:18
@YeungOnion YeungOnion changed the title WIP: skeleton for PythonOperator, related to issue #12 WIP: skeleton for PythonOperator Feb 25, 2024
@YeungOnion
Author

Hi, @kreczko could I get some of your input here?

I've got an implementation, but I'm unsure if it's a good way to go. Additionally, I'm not sure how the output and side effects of __call__ should be captured. Print-based logging isn't great, though it should work; should that output be passed back to the main process? Comparing to the BashOperator, the output is a dictionary of stdout, stderr, and exit code, how should it work for PythonOperator?

@kreczko
Contributor

kreczko commented Mar 25, 2024

Comparing to the BashOperator, the output is a dictionary of stdout, stderr, and exit code, how should it work for PythonOperator?

For now, we can do things slightly differently from BashOperator; we will normalize outputs later.
With Python there are three main cases:

  1. Return value directly
  2. python function outputs to stdout or stderr
  3. An error occurs.

The dictionary return I would propose:

{
  'stderr': <captured stderr>,
  'stdout': <captured stdout>,
  'data': <return value>,
  'error': {
    'msg': <error message>,
    'type': <e.g. type of exception>
  }
}

You can capture the stdout and stderr via StringIO and the return value directly (see example).
For error message you will need to capture the exception.
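
A minimal sketch of the capture described above, assuming the standard-library contextlib.redirect_stdout and contextlib.redirect_stderr are acceptable for the redirection. The helper name run_captured is hypothetical and not part of fasthep-flow. The keys follow the proposed dictionary; setting 'error' to None when nothing is raised is an assumption.

```python
import io
from contextlib import redirect_stderr, redirect_stdout
from typing import Any, Callable


def run_captured(func: Callable[..., Any], *args: Any, **kwargs: Any) -> dict[str, Any]:
    """Call func and collect stdout, stderr, the return value and any exception."""
    stdout, stderr = io.StringIO(), io.StringIO()
    data, error = None, None
    with redirect_stdout(stdout), redirect_stderr(stderr):
        try:
            data = func(*args, **kwargs)
        except Exception as exc:  # report the failure instead of propagating it
            error = {"msg": str(exc), "type": type(exc).__name__}
    return {
        "stderr": stderr.getvalue(),
        "stdout": stdout.getvalue(),
        "data": data,
        "error": error,
    }
```

For example, run_captured(print, "Hello world") fills 'stdout', while run_captured(sum, [1, 2]) fills 'data'. Note that this only redirects Python-level sys.stdout and sys.stderr; output written by subprocesses or C extensions directly to the file descriptors is not captured.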

BTW: Instead of using exec, it would be better to try and import the python function directly.
You can use a similar block to https://github.com/FAST-HEP/fasthep-flow/blob/main/src/fasthep_flow/config.py#L32 but will have to search the builtins as well, e.g.

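import builtins
import importlib

builtin_functions = dir(builtins)

if callable_str in builtin_functions:
    # named func to avoid shadowing the builtin callable()
    func = getattr(builtins, callable_str)
else:
    # use importlib: split "package.module.name" into module path and attribute
    module_path, _, name = callable_str.rpartition(".")
    func = getattr(importlib.import_module(module_path), name)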
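
The lookup above could be wrapped in a helper that fails as soon as the name cannot be resolved, which is one possible answer to where a failure should occur for a callable that can never work. This is only a sketch: resolve_callable is a hypothetical name, and raising ValueError for every unresolvable name is an assumption, not the project's behavior.

```python
import builtins
import importlib
from typing import Any, Callable


def resolve_callable(callable_str: str) -> Callable[..., Any]:
    """Resolve a builtin name or a dotted 'module.attr' path to a callable."""
    if hasattr(builtins, callable_str):
        obj = getattr(builtins, callable_str)
    else:
        # Split "package.module.name" into the module path and the attribute.
        module_path, _, attr = callable_str.rpartition(".")
        if not module_path:
            raise ValueError(f"{callable_str!r} is neither a builtin nor a dotted path")
        try:
            module = importlib.import_module(module_path)
        except ImportError as exc:
            raise ValueError(f"cannot import module {module_path!r}") from exc
        obj = getattr(module, attr, None)
    if not callable(obj):
        raise ValueError(f"{callable_str!r} does not resolve to a callable")
    return obj
```

Calling this when the operator is constructed, rather than inside __call__, would surface a misspelled name before any task runs.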

@YeungOnion
Author

Thanks for the suggestion on the direction to go in for output. I've got a question about the return from calling PythonOperator,

  1. Return value directly
  2. python function outputs to stdout or stderr
  3. An error occurs.

Would you expect to call a function that has more than one of these possibilities?

I ask because if it's the case that they will always be exclusive, then a dictionary seems like it's unneeded flexibility. Perhaps a match would work well, mypy can test exhaustiveness as well.

Additionally, I think it may be useful for later generalizing logging beyond cases of standard out and standard error.

Those are my thoughts, let me know what you think.


Thanks for examples to not rely on exec! Seems better this way, keep the python definitions in code and the analysis description in the YAML.

@kreczko
Contributor

kreczko commented Mar 28, 2024

Would you expect to call a function that has more than one of these possibilities?

In principle, yes. In practice I would want to avoid that.

I ask because if it's the case that they will always be exclusive, then a dictionary seems like it's unneeded flexibility. Perhaps a match would work well, mypy can test exhaustiveness as well.

I guess the different test cases could be:

  • print("Hello world") → stdout
  • sum([1, 2]) → return value
  • requests.get("https://status.example.org/") → requests.exceptions.ConnectionError

Additionally, I think it may be useful for later generalizing logging beyond cases of standard out and standard error.

I am curious to see the implementation. Happy to try something I did not think of :)
