Skip to content

Commit

Permalink
pr feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
normanrz committed Oct 24, 2023
1 parent d5d5f5e commit a9f30ab
Showing 1 changed file with 16 additions and 10 deletions.
26 changes: 16 additions & 10 deletions cluster_tools/cluster_tools/executors/multiprocessing_.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,22 @@ def submit( # type: ignore[override]
else:
submit_fn = super().submit # type: ignore[assignment]

# Depending on the start_method and output_pickle_path, setup functions may need to be
# executed in the new process context, before the actual code is ran.
# These wrapper functions consume their arguments from *args, **kwargs and assume
# that the next and last argument will be another function that is then called.
# Eventually, the actually submitted function will be called.

if output_pickle_path is not None:
__fn = cast(
Callable[_P, _T],
partial(
MultiprocessingExecutor._execute_and_persist_function,
Path(output_pickle_path),
__fn,
),
)

if self._mp_logging_handler_pool is not None:
# If a start_method other than the default "fork" is used, logging needs to be re-setup,
# because the programming context is not inherited in those cases.
Expand All @@ -128,16 +144,6 @@ def submit( # type: ignore[override]
),
)

if output_pickle_path is not None:
__fn = cast(
Callable[_P, _T],
partial(
MultiprocessingExecutor._execute_and_persist_function,
Path(output_pickle_path),
__fn,
),
)

fut = submit_fn(__fn, *args, **kwargs)

enrich_future_with_uncaught_warning(fut)
Expand Down

0 comments on commit a9f30ab

Please sign in to comment.