Skip to content

Commit

Permalink
Clean up executors (#1193)
Browse files Browse the repository at this point in the history
* Clean up executors and make sequential executor truly synchronous.

Rename "test_pickling" to "multiprocessing_with_pickling" and "debug_sequential"
to "sequential_with_pickling". No longer use a multiprocessing pool of size 1
for "sequential" executor by replacing it with the former "debug_sequential" executor.

Clean up and speed up multiprocessing and non-scheduling executor tests.

* Remove MULTIPROCESSING_VIA_IO which was only needed for python <3.8

* Make Future forward references normal types

* Remove unused map_unordered methods

* Remove todo comment

* clean up tests some more

* Revert "Remove unused map_unordered methods"

This reverts commit f0b0dbe.

* Reapply "Remove unused map_unordered methods"

This reverts commit 98929e8.

* Fix slurm test by indirectly parametrizing the tests. Also update pytest to the newest version.

* Fix linting and typing

* avoid flaky tests

* Apply suggestions from code review

Co-authored-by: Philipp Otto <[email protected]>

* PR feedback: Add deprecation warning for renamed executor strategies. Try to combat test flakiness for slurm sleep tests

* update changelog

* Apply PR feedback for execution order tests

---------

Co-authored-by: Philipp Otto <[email protected]>
  • Loading branch information
daniel-wer and philippotto authored Nov 27, 2024
1 parent f24f24c commit 1b9a068
Show file tree
Hide file tree
Showing 21 changed files with 376 additions and 429 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ jobs:
if: ${{ matrix.executors == 'multiprocessing' }}
run: |
cd tests
PYTEST_EXECUTORS=multiprocessing,sequential,test_pickling,debug_sequential \
PYTEST_EXECUTORS=multiprocessing,sequential,multiprocessing_with_pickling,sequential_with_pickling \
uv run --frozen python -m pytest -sv test_all.py test_multiprocessing.py
- name: Run slurm tests
Expand Down
3 changes: 3 additions & 0 deletions cluster_tools/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ For upgrade instructions, please check the respective *Breaking Changes* section
[Commits](https://github.com/scalableminds/webknossos-libs/compare/v0.15.11...HEAD)

### Breaking Changes
- Removed the `map_unordered` function of executors. [#1193](https://github.com/scalableminds/webknossos-libs/pull/1193)

### Added

### Changed
- Deprecated the `test_pickling` and `debug_sequential` executor strategies. The strategies `multiprocessing_with_pickling` and `sequential` should be used instead. [#1193](https://github.com/scalableminds/webknossos-libs/pull/1193)
- The `sequential` executor strategy no longer uses multiprocessing functionality internally and instead executes functions sequentially and synchronously in the same process. [#1193](https://github.com/scalableminds/webknossos-libs/pull/1193)

### Fixed

Expand Down
47 changes: 38 additions & 9 deletions cluster_tools/cluster_tools/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import warnings
from typing import Any, Literal, overload

from cluster_tools.executor_protocol import Executor
from cluster_tools.executors.dask import DaskExecutor
from cluster_tools.executors.debug_sequential import DebugSequentialExecutor
from cluster_tools.executors.multiprocessing_ import MultiprocessingExecutor
from cluster_tools.executors.pickle_ import PickleExecutor
from cluster_tools.executors.multiprocessing_pickle import MultiprocessingPickleExecutor
from cluster_tools.executors.sequential import SequentialExecutor
from cluster_tools.executors.sequential_pickle import SequentialPickleExecutor
from cluster_tools.schedulers.cluster_executor import (
ClusterExecutor, # noqa: F401 `cluster_tools.schedulers.cluster_executor.ClusterExecutor` imported but unused;
RemoteOutOfMemoryException, # noqa: F401 `cluster_tools.schedulers.cluster_executor.ClusterExecutor` imported but unused;
Expand All @@ -16,7 +17,7 @@
from cluster_tools.schedulers.pbs import PBSExecutor
from cluster_tools.schedulers.slurm import SlurmExecutor

# For backwards-compatibility:
# For backwards-compatibility, remove in version 2.0:
WrappedProcessPoolExecutor = MultiprocessingExecutor


Expand Down Expand Up @@ -77,6 +78,18 @@ def get_executor(
) -> MultiprocessingExecutor: ...


@overload
def get_executor(
environment: Literal["multiprocessing_with_pickling"], **kwargs: Any
) -> MultiprocessingPickleExecutor: ...


@overload
def get_executor(
environment: Literal["test_pickling"], **kwargs: Any
) -> MultiprocessingPickleExecutor: ...


@overload
def get_executor(
environment: Literal["sequential"], **kwargs: Any
Expand All @@ -86,13 +99,13 @@ def get_executor(
@overload
def get_executor(
environment: Literal["debug_sequential"], **kwargs: Any
) -> DebugSequentialExecutor: ...
) -> SequentialExecutor: ...


@overload
def get_executor(
environment: Literal["test_pickling"], **kwargs: Any
) -> PickleExecutor: ...
environment: Literal["sequential_with_pickling"], **kwargs: Any
) -> SequentialPickleExecutor: ...


def get_executor(environment: str, **kwargs: Any) -> "Executor":
Expand All @@ -116,8 +129,24 @@ def get_executor(environment: str, **kwargs: Any) -> "Executor":
return MultiprocessingExecutor(**kwargs)
elif environment == "sequential":
return SequentialExecutor(**kwargs)
elif environment == "debug_sequential":
return DebugSequentialExecutor(**kwargs)
elif environment == "sequential_with_pickling":
return SequentialPickleExecutor(**kwargs)
elif environment == "multiprocessing_with_pickling":
return MultiprocessingPickleExecutor(**kwargs)
elif environment == "test_pickling":
return PickleExecutor(**kwargs)
# For backwards-compatibility, remove in version 2.0:
warnings.warn(
"The test_pickling execution strategy is deprecated and will be removed in version 2.0. Use multiprocessing_with_pickling instead.",
DeprecationWarning,
stacklevel=2,
)
return MultiprocessingPickleExecutor(**kwargs)
elif environment == "debug_sequential":
# For backwards-compatibility, remove in version 2.0:
warnings.warn(
"The debug_sequential execution strategy is deprecated and will be removed in version 2.0. Use sequential instead.",
DeprecationWarning,
stacklevel=2,
)
return SequentialExecutor(**kwargs)
raise Exception("Unknown executor: {}".format(environment))
12 changes: 4 additions & 8 deletions cluster_tools/cluster_tools/executor_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,22 @@

class Executor(Protocol, ContextManager["Executor"]):
@classmethod
def as_completed(cls, futures: List["Future[_T]"]) -> Iterator["Future[_T]"]: ...
def as_completed(cls, futures: List[Future[_T]]) -> Iterator[Future[_T]]: ...

def submit(
self,
__fn: Callable[_P, _T],
/,
*args: _P.args,
**kwargs: _P.kwargs,
) -> "Future[_T]": ...

def map_unordered(
self, fn: Callable[[_S], _T], args: Iterable[_S]
) -> Iterator[_T]: ...
) -> Future[_T]: ...

def map_to_futures(
self,
fn: Callable[[_S], _T],
args: Iterable[_S],
output_pickle_path_getter: Optional[Callable[[_S], PathLike]] = None,
) -> List["Future[_T]"]: ...
) -> List[Future[_T]]: ...

def map(
self,
Expand All @@ -49,6 +45,6 @@ def map(
chunksize: Optional[int] = None,
) -> Iterator[_T]: ...

def forward_log(self, fut: "Future[_T]") -> _T: ...
def forward_log(self, fut: Future[_T]) -> _T: ...

def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None: ...
20 changes: 4 additions & 16 deletions cluster_tools/cluster_tools/executors/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def from_config(
return cls(client, job_resources=job_resources)

@classmethod
def as_completed(cls, futures: List["Future[_T]"]) -> Iterator["Future[_T]"]:
def as_completed(cls, futures: List[Future[_T]]) -> Iterator[Future[_T]]:
from distributed import as_completed

return as_completed(futures)
Expand All @@ -172,7 +172,7 @@ def submit( # type: ignore[override]
__fn: Callable[_P, _T],
*args: _P.args,
**kwargs: _P.kwargs,
) -> "Future[_T]":
) -> Future[_T]:
if "__cfut_options" in kwargs:
output_pickle_path = cast(CFutDict, kwargs["__cfut_options"])[
"output_pickle_path"
Expand Down Expand Up @@ -236,26 +236,14 @@ def check_resources(
enrich_future_with_uncaught_warning(fut)
return fut

def map_unordered(self, fn: Callable[[_S], _T], args: Iterable[_S]) -> Iterator[_T]:
futs: List["Future[_T]"] = self.map_to_futures(fn, args)

# Return a separate generator to avoid that map_unordered
# is executed lazily (otherwise, jobs would be submitted
# lazily, as well).
def result_generator() -> Iterator:
for fut in self.as_completed(futs):
yield fut.result()

return result_generator()

def map_to_futures(
self,
fn: Callable[[_S], _T],
args: Iterable[
_S
], # TODO change: allow more than one arg per call # noqa FIX002 Line contains TODO
output_pickle_path_getter: Optional[Callable[[_S], os.PathLike]] = None,
) -> List["Future[_T]"]:
) -> List[Future[_T]]:
if output_pickle_path_getter is not None:
futs = [
self.submit( # type: ignore[call-arg]
Expand Down Expand Up @@ -283,7 +271,7 @@ def map( # type: ignore[override]
chunksize = 1
return super().map(fn, iterables, timeout=timeout, chunksize=chunksize)

def forward_log(self, fut: "Future[_T]") -> _T:
def forward_log(self, fut: Future[_T]) -> _T:
return fut.result()

def handle_kill(
Expand Down
44 changes: 0 additions & 44 deletions cluster_tools/cluster_tools/executors/debug_sequential.py

This file was deleted.

Loading

0 comments on commit 1b9a068

Please sign in to comment.