Skip to content

Commit

Permalink
upgrades to mypy 1.6 (#956)
Browse files Browse the repository at this point in the history
* upgrades to mypy 1.6

* pr feedback

* changelog
  • Loading branch information
normanrz authored Oct 24, 2023
1 parent d44a484 commit 3b484c8
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 101 deletions.
2 changes: 2 additions & 0 deletions cluster_tools/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ For upgrade instructions, please check the respective *Breaking Changes* section
### Added

### Changed
- Upgrades mypy to 1.6. [#956](https://github.com/scalableminds/webknossos-libs/pull/956)


### Fixed

Expand Down
11 changes: 7 additions & 4 deletions cluster_tools/cluster_tools/executors/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,13 @@ def submit( # type: ignore[override]
]
del kwargs["__cfut_options"]

__fn = partial(
MultiprocessingExecutor._execute_and_persist_function,
output_pickle_path,
__fn,
__fn = cast(
Callable[_P, _T],
partial(
MultiprocessingExecutor._execute_and_persist_function,
output_pickle_path,
__fn,
),
)
fut = self.client.submit(partial(__fn, *args, **kwargs))

Expand Down
23 changes: 7 additions & 16 deletions cluster_tools/cluster_tools/executors/debug_sequential.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from concurrent.futures import Future
from pathlib import Path
from typing import Callable, TypeVar, cast

from typing_extensions import ParamSpec
Expand All @@ -23,31 +24,21 @@ def submit( # type: ignore[override]
*args: _P.args,
**kwargs: _P.kwargs,
) -> "Future[_T]":
fut: "Future[_T]" = Future()
if "__cfut_options" in kwargs:
output_pickle_path = cast(CFutDict, kwargs["__cfut_options"])[
"output_pickle_path"
]
del kwargs["__cfut_options"]
fut = self._blocking_submit(
MultiprocessingExecutor._execute_and_persist_function, # type: ignore[arg-type]
output_pickle_path, # type: ignore[arg-type]
__fn, # type: ignore[arg-type]
result = MultiprocessingExecutor._execute_and_persist_function(
Path(output_pickle_path),
__fn,
*args,
**kwargs,
)
else:
fut = self._blocking_submit(__fn, *args, **kwargs)

enrich_future_with_uncaught_warning(fut)
return fut
result = __fn(*args, **kwargs)

def _blocking_submit(
self,
__fn: Callable[_P, _T],
*args: _P.args,
**kwargs: _P.kwargs,
) -> "Future[_T]":
fut: "Future[_T]" = Future()
result = __fn(*args, **kwargs)
fut.set_result(result)
enrich_future_with_uncaught_warning(fut)
return fut
41 changes: 21 additions & 20 deletions cluster_tools/cluster_tools/executors/multiprocessing_.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,37 +113,38 @@ def submit( # type: ignore[override]
else:
submit_fn = super().submit # type: ignore[assignment]

# Depending on the start_method and output_pickle_path, wrapper functions may need to be
# Depending on the start_method and output_pickle_path, setup functions may need to be
# executed in the new process context, before the actual code is ran.
# These wrapper functions consume their arguments from *args, **kwargs and assume
# that the next argument will be another function that is then called.
# The call_stack holds all of these wrapper functions and their arguments in the correct order.
# For example, call_stack = [wrapper_fn_1, wrapper_fn_1_arg_1, wrapper_fn_2, actual_fn, actual_fn_arg_1]
# where wrapper_fn_1 is called, which eventually calls wrapper_fn_2, which eventually calls actual_fn.
call_stack: List[Callable] = []
# that the next and last argument will be another function that is then called.
# Eventually, the actually submitted function will be called.

if output_pickle_path is not None:
__fn = cast(
Callable[_P, _T],
partial(
MultiprocessingExecutor._execute_and_persist_function,
Path(output_pickle_path),
__fn,
),
)

if self._mp_logging_handler_pool is not None:
# If a start_method other than the default "fork" is used, logging needs to be re-setup,
# because the programming context is not inherited in those cases.
multiprocessing_logging_setup_fn = (
self._mp_logging_handler_pool.get_multiprocessing_logging_setup_fn()
)
call_stack.append(
__fn = cast(
Callable[_P, _T],
partial(
MultiprocessingExecutor._setup_logging_and_execute,
multiprocessing_logging_setup_fn,
)
)

if output_pickle_path is not None:
call_stack.append(
partial(
MultiprocessingExecutor._execute_and_persist_function,
output_pickle_path,
)
__fn,
),
)

fut = submit_fn(*call_stack, __fn, *args, **kwargs)
fut = submit_fn(__fn, *args, **kwargs)

enrich_future_with_uncaught_warning(fut)
return fut
Expand Down Expand Up @@ -173,7 +174,7 @@ def _submit_via_io(

output_pickle_path = Path(dirpath) / "jobdescription.pickle"

with open(output_pickle_path, "wb") as file:
with output_pickle_path.open("wb") as file:
pickling.dump((__fn, args, kwargs), file)

future = super().submit(
Expand Down Expand Up @@ -208,7 +209,7 @@ def _execute_via_io(serialized_function_info_path: os.PathLike) -> Any:

@staticmethod
def _execute_and_persist_function(
output_pickle_path: os.PathLike,
output_pickle_path: Path,
fn: Callable[_P, _T],
*args: _P.args,
**kwargs: _P.kwargs,
Expand All @@ -226,7 +227,7 @@ def _execute_and_persist_function(
# disk. However, the output will have a .preliminary prefix at first
# which is only removed in the success case so that a checkpoint at
# the desired target only exists if the job was successful.
with open(output_pickle_path, "wb") as file:
with output_pickle_path.open("wb") as file:
pickling.dump((True, result), file)
return result

Expand Down
58 changes: 29 additions & 29 deletions cluster_tools/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions webknossos/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ For upgrade instructions, please check the respective _Breaking Changes_ section
### Added

### Changed
- Upgrades mypy to 1.6. [#956](https://github.com/scalableminds/webknossos-libs/pull/956)


### Fixed

Expand Down
Loading

0 comments on commit 3b484c8

Please sign in to comment.