Skip to content

Commit

Permalink
refactor Executor type to protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
normanrz committed Sep 26, 2023
1 parent 049555a commit efc1e67
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 17 deletions.
1 change: 1 addition & 0 deletions cluster_tools/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ For upgrade instructions, please check the respective *Breaking Changes* section
- Added `DaskScheduler` (only Python >= 3.9). [#943](https://github.com/scalableminds/webknossos-libs/pull/943)

### Changed
- The exported `Executor` type is now implemented as a protocol. [#943](https://github.com/scalableminds/webknossos-libs/pull/943)

### Fixed

Expand Down
6 changes: 2 additions & 4 deletions cluster_tools/cluster_tools/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Any, Literal, Union, overload
from typing import Any, Literal, overload

from cluster_tools.executor_protocol import Executor
from cluster_tools.executors.dask import DaskExecutor
from cluster_tools.executors.debug_sequential import DebugSequentialExecutor
from cluster_tools.executors.multiprocessing_ import MultiprocessingExecutor
Expand Down Expand Up @@ -128,6 +129,3 @@ def get_executor(environment: str, **kwargs: Any) -> "Executor":
elif environment == "test_pickling":
return PickleExecutor(**kwargs)
raise Exception("Unknown executor: {}".format(environment))


Executor = Union[ClusterExecutor, MultiprocessingExecutor, DaskExecutor]
50 changes: 50 additions & 0 deletions cluster_tools/cluster_tools/executor_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from concurrent.futures import Future
from os import PathLike
from typing import Callable, Iterable, Iterator, List, Optional, Protocol, TypeVar

from typing_extensions import ParamSpec

_T = TypeVar("_T")
_P = ParamSpec("_P")
_S = TypeVar("_S")


class Executor(Protocol):
@classmethod
def as_completed(cls, futures: List["Future[_T]"]) -> Iterator["Future[_T]"]:
...

def submit(
self,
__fn: Callable[_P, _T],
/,
*args: _P.args,
**kwargs: _P.kwargs,
) -> "Future[_T]":
...

def map_unordered(self, fn: Callable[[_S], _T], args: Iterable[_S]) -> Iterator[_T]:
...

def map_to_futures(
self,
fn: Callable[[_S], _T],
args: Iterable[_S],
output_pickle_path_getter: Optional[Callable[[_S], PathLike]],
) -> List["Future[_T]"]:
...

def map(
self,
fn: Callable[[_S], _T],
iterables: Iterable[_S],
timeout: Optional[float],
chunksize: int,
) -> Iterator[_T]:
...

def forward_log(self, fut: "Future[_T]") -> _T:
...

def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None:
...
6 changes: 3 additions & 3 deletions cluster_tools/cluster_tools/executors/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,15 @@ def map_to_futures(

return futs

def map(
def map( # type: ignore[override]
self,
fn: Callable[[_S], _T],
*iterables: Iterable[Any],
iterables: Iterable[Any],
timeout: Optional[float] = None,
chunksize: int = 1,
) -> Iterator[_T]:
return iter(
list(super().map(fn, *iterables, timeout=timeout, chunksize=chunksize))
list(super().map(fn, [iterables], timeout=timeout, chunksize=chunksize))
)

def forward_log(self, fut: "Future[_T]") -> _T:
Expand Down
9 changes: 9 additions & 0 deletions cluster_tools/cluster_tools/executors/multiprocessing_.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,15 @@ def submit( # type: ignore[override]
enrich_future_with_uncaught_warning(fut)
return fut

def map( # type: ignore[override]
self,
fn: Callable[[_S], _T],
iterables: Iterable[Any],
timeout: Optional[float] = None,
chunksize: int = 1,
) -> Iterator[_T]:
return super().map(fn, [iterables], timeout=timeout, chunksize=chunksize)

def _submit_via_io(
self,
__fn: Callable[_P, _T],
Expand Down
26 changes: 19 additions & 7 deletions cluster_tools/cluster_tools/executors/pickle_.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
from concurrent.futures import Future
from typing import Any, Callable, TypeVar
from functools import partial
from typing import Callable, TypeVar

from typing_extensions import ParamSpec

from cluster_tools._utils import pickling
from cluster_tools.executors.multiprocessing_ import MultiprocessingExecutor

# The module name includes a _-suffix to avoid name clashes with the standard library pickle module.

_T = TypeVar("_T")
_P = ParamSpec("_P")
_S = TypeVar("_S")


def _pickle_identity(obj: _T) -> _T:
def _pickle_identity(obj: _S) -> _S:
return pickling.loads(pickling.dumps(obj))


def _pickle_identity_executor(fn: Callable[..., _T], *args: Any, **kwargs: Any) -> _T:
def _pickle_identity_executor(
fn: Callable[_P, _T],
*args: _P.args,
**kwargs: _P.kwargs,
) -> _T:
result = fn(*args, **kwargs)
return _pickle_identity(result)

Expand All @@ -27,13 +36,16 @@ class PickleExecutor(MultiprocessingExecutor):

def submit( # type: ignore[override]
self,
fn: Callable[..., _T],
*args: Any,
**kwargs: Any,
fn: Callable[_P, _T],
/,
*args: _P.args,
**kwargs: _P.kwargs,
) -> "Future[_T]":
(fn_pickled, args_pickled, kwargs_pickled) = _pickle_identity(
(fn, args, kwargs)
)
return super().submit(
_pickle_identity_executor, fn_pickled, *args_pickled, **kwargs_pickled
partial(_pickle_identity_executor, fn_pickled),
*args_pickled,
**kwargs_pickled,
)
3 changes: 1 addition & 2 deletions cluster_tools/cluster_tools/schedulers/cluster_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,10 +598,9 @@ def shutdown(self, wait: bool = True, cancel_futures: bool = True) -> None:
pass
self.files_to_clean_up = []

# TODO: args should be *iterables, this would be a breaking change!
def map( # type: ignore[override]
self,
fn: Callable[_P, _T],
fn: Callable[[_S], _T],
args: Iterable[Any],
timeout: Optional[float] = None,
chunksize: Optional[int] = None,
Expand Down
1 change: 1 addition & 0 deletions webknossos/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ For upgrade instructions, please check the respective _Breaking Changes_ section
[Commits](https://github.com/scalableminds/webknossos-libs/compare/v0.13.6...HEAD)

### Breaking Changes
- `wait_and_ensure_success` from `webknossos.utils` now requires a `executor` argument. [#943](https://github.com/scalableminds/webknossos-libs/pull/943)

### Added

Expand Down
2 changes: 1 addition & 1 deletion webknossos/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit efc1e67

Please sign in to comment.