More dask features #959

Merged (15 commits) on Nov 16, 2023
.github/workflows/ci.yml (8 additions, 2 deletions)

```diff
@@ -37,7 +37,7 @@ jobs:
     strategy:
       max-parallel: 4
       matrix:
-        executors: [multiprocessing, slurm, kubernetes]
+        executors: [multiprocessing, slurm, kubernetes, dask]
         python-version: ["3.11", "3.10", "3.9", "3.8"]
     defaults:
       run:
@@ -88,7 +88,7 @@ jobs:
           ./kind load docker-image scalableminds/cluster-tools:latest

       - name: Install dependencies (without docker)
-        if: ${{ matrix.executors == 'multiprocessing' || matrix.executors == 'kubernetes' }}
+        if: ${{ matrix.executors != 'slurm' }}
        run: |
          pip install -r ../requirements.txt
          poetry install
@@ -130,6 +130,12 @@ jobs:
          cd tests
          PYTEST_EXECUTORS=kubernetes poetry run python -m pytest -sv test_all.py test_kubernetes.py

+      - name: Run dask tests
+        if: ${{ matrix.executors == 'dask' && matrix.python-version != '3.8' }}
+        run: |
+          cd tests
+          PYTEST_EXECUTORS=dask poetry run python -m pytest -sv test_all.py
+
   webknossos_linux:
     needs: changes
     if: |
```
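The new CI step selects which executor backends to exercise through the `PYTEST_EXECUTORS` environment variable (see the `Run dask tests` step above). Purely as an illustration, here is a minimal sketch of how a test suite might turn that variable into a list of executor keys; the helper name and the default value are assumptions and not code from this repository:

```python
import os
from typing import List


def get_executor_keys_from_env() -> List[str]:
    # PYTEST_EXECUTORS holds a comma-separated list such as "multiprocessing,dask".
    # The default used when the variable is unset is an assumption for this sketch.
    raw = os.environ.get("PYTEST_EXECUTORS", "multiprocessing,sequential")
    return [key.strip() for key in raw.split(",") if key.strip()]


if __name__ == "__main__":
    # With PYTEST_EXECUTORS=dask this prints ['dask'].
    print(get_executor_keys_from_env())
```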
cluster_tools/Changelog.md (0 additions, 1 deletion)

```diff
@@ -17,7 +17,6 @@ For upgrade instructions, please check the respective *Breaking Changes* section
 - The cluster address for the `DaskExecutor` can be configured via the `DASK_ADDRESS` env var. [#959](https://github.com/scalableminds/webknossos-libs/pull/959)

 ### Changed
-- Upgrades mypy to 1.6. [#956](https://github.com/scalableminds/webknossos-libs/pull/956)
 - Tasks using the `DaskExecutor` are run in their own process. This is required to not block the GIL for the dask worker to communicate with the scheduler. Env variables are propagated to the task processes. [#959](https://github.com/scalableminds/webknossos-libs/pull/959)

 ### Fixed
```
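The `DASK_ADDRESS` entry above describes configuring the cluster address through the environment. A hedged usage sketch, assuming an already running dask scheduler; the address shown is a placeholder, and the exact precedence between the env var and explicitly passed job resources is not specified here:

```python
import os

import cluster_tools

# Point the DaskExecutor at an existing dask scheduler.
# "tcp://127.0.0.1:8786" is only a placeholder address for illustration.
os.environ["DASK_ADDRESS"] = "tcp://127.0.0.1:8786"

with cluster_tools.get_executor("dask") as executor:
    # Standard concurrent.futures semantics: submit returns a future.
    future = executor.submit(sum, [1, 2, 3])
    print(future.result())  # 6
```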
cluster_tools/cluster_tools/executors/dask.py (24 additions, 4 deletions)

```diff
@@ -19,6 +19,7 @@
     TypeVar,
     cast,
 )
+from weakref import ReferenceType, ref

 from typing_extensions import ParamSpec

@@ -66,12 +67,25 @@ def _run_with_nanny(

 def _parse_mem(size: str) -> int:
     units = {"": 1, "K": 2**10, "M": 2**20, "G": 2**30, "T": 2**40}
-    m = re.match(r"^([\d\.]+)\s*([a-zA-Z]{0,3})$", str(size).strip())
-    assert m is not None
+    m = re.match(r"^([\d\.]+)\s*([kmgtKMGT]{0,1})$", str(size).strip())
+    assert m is not None, f"Could not parse {size}"
     number, unit = float(m.group(1)), m.group(2).upper()
     assert unit in units
     return int(number * units[unit])


+def _handle_kill_through_weakref(
+    executor_ref: "ReferenceType[DaskExecutor]",
+    existing_sigint_handler: Any,
+    signum: Optional[int],
+    frame: Any,
+) -> None:
+    executor = executor_ref()
+    if executor is None:
+        return
+    executor.handle_kill(existing_sigint_handler, signum, frame)
+
+
 class DaskExecutor(futures.Executor):
     """
     The `DaskExecutor` allows to run workloads on a dask cluster.
@@ -113,7 +127,10 @@ def __init__(
         # shutdown of the main process which sends SIGTERM signals to terminate all
         # child processes.
         existing_sigint_handler = signal.getsignal(signal.SIGINT)
-        signal.signal(signal.SIGINT, partial(self.handle_kill, existing_sigint_handler))
+        signal.signal(
+            signal.SIGINT,
+            partial(_handle_kill_through_weakref, ref(self), existing_sigint_handler),
+        )

     @classmethod
     def from_config(
@@ -224,7 +241,10 @@ def forward_log(self, fut: "Future[_T]") -> _T:
         return fut.result()

     def handle_kill(
-        self, existing_sigint_handler: Any, signum: Optional[int], frame: Any
+        self,
+        existing_sigint_handler: Any,
+        signum: Optional[int],
+        frame: Any,
    ) -> None:
        if self.is_shutting_down:
            return
```
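The tightened `_parse_mem` regex now accepts at most one unit letter (k/m/g/t, case-insensitive) and reports the offending value when parsing fails. A few illustrative assertions that follow directly from the code above; the actual call sites are outside this diff:

```python
from cluster_tools.executors.dask import _parse_mem  # private helper, imported only for illustration

assert _parse_mem("512") == 512                # no unit suffix: plain bytes
assert _parse_mem("100K") == 100 * 2**10
assert _parse_mem("1.5G") == int(1.5 * 2**30)
assert _parse_mem("2 t") == 2 * 2**40          # whitespace and lower-case units are accepted
```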
cluster_tools/cluster_tools/schedulers/cluster_executor.py (17 additions, 1 deletion)

```diff
@@ -23,6 +23,7 @@
     Union,
     cast,
 )
+from weakref import ReferenceType, ref

 from typing_extensions import ParamSpec

@@ -45,6 +46,18 @@
 _S = TypeVar("_S")


+def _handle_kill_through_weakref(
+    executor_ref: "ReferenceType[ClusterExecutor]",
+    existing_sigint_handler: Any,
+    signum: Optional[int],
+    frame: Any,
+) -> None:
+    executor = executor_ref()
+    if executor is None:
+        return
+    executor.handle_kill(existing_sigint_handler, signum, frame)
+
+
 def join_messages(strings: List[str]) -> str:
     return " ".join(x.strip() for x in strings if x.strip())

@@ -130,7 +143,10 @@ def __init__(
         # shutdown of the main process which sends SIGTERM signals to terminate all
         # child processes.
         existing_sigint_handler = signal.getsignal(signal.SIGINT)
-        signal.signal(signal.SIGINT, partial(self.handle_kill, existing_sigint_handler))
+        signal.signal(
+            signal.SIGINT,
+            partial(_handle_kill_through_weakref, ref(self), existing_sigint_handler),
+        )

        self.meta_data = {}
        assert not (
```
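Both executors now register the SIGINT handler through a module-level function that holds only a weak reference to the executor, presumably so that the handler installed via `signal.signal` does not keep the executor object alive after user code has dropped it. A self-contained sketch of that pattern, with the `Worker` class and its `handle_kill` method invented purely for illustration:

```python
import signal
from functools import partial
from weakref import ReferenceType, ref


class Worker:
    def handle_kill(self, signum, frame):
        print("cleaning up before exit")


def _handle_kill_through_weakref(
    worker_ref: "ReferenceType[Worker]", signum, frame
) -> None:
    worker = worker_ref()  # resolves to None once the Worker has been garbage-collected
    if worker is None:
        return
    worker.handle_kill(signum, frame)


worker = Worker()
# signal.signal keeps the handler callable alive for the rest of the process.
# Binding only ref(worker) means the handler does not additionally pin the
# Worker instance in memory, so it can still be garbage-collected.
signal.signal(signal.SIGINT, partial(_handle_kill_through_weakref, ref(worker)))
```

In the PR itself, the helper additionally forwards the previously installed SIGINT handler so that `handle_kill` can chain to it.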
cluster_tools/tests/test_all.py (9 additions, 5 deletions)

```diff
@@ -14,7 +14,6 @@
 from distributed import LocalCluster

 import cluster_tools
-from cluster_tools.executors.dask import DaskExecutor


 # "Worker" functions.
@@ -79,10 +78,14 @@ def get_executors(with_debug_sequential: bool = False) -> List[cluster_tools.Exe
         executors.append(cluster_tools.get_executor("sequential"))
     if "dask" in executor_keys:
         if not _dask_cluster:
-            from distributed import LocalCluster
+            from distributed import LocalCluster, Worker

-            _dask_cluster = LocalCluster()
-            executors.append(cluster_tools.get_executor("dask", address=_dask_cluster))
+            _dask_cluster = LocalCluster(
+                worker_class=Worker, resources={"mem": 20e9, "cpus": 4}, nthreads=6
+            )
+            executors.append(
+                cluster_tools.get_executor("dask", job_resources={"address": _dask_cluster})
+            )
     if "test_pickling" in executor_keys:
         executors.append(cluster_tools.get_executor("test_pickling"))
     if "pbs" in executor_keys:
@@ -328,7 +331,8 @@ def run_map(executor: cluster_tools.Executor) -> None:
     assert list(result) == [4, 9, 16]

     for exc in get_executors():
-        run_map(exc)
+        if not isinstance(exc, cluster_tools.DaskExecutor):
```
A review thread is attached to the `if not isinstance(exc, cluster_tools.DaskExecutor):` line:

Member: Why is this specific test excluded for the DaskExecutor?

Member (author): Futures of the DaskExecutor become invalid when the executor is closed, which makes this test invalid. I was thinking about removing this test or making it fail for all executors (probably a bit of effort). cc @philippotto

Member: My thoughts:

- Yes, we should strive for similar behavior between dask and slurm. Either (A) the futures should become invalid in the slurm context too, or (B) (maybe I'd prefer this?) we wrap/copy the results into different future objects that survive the context termination. Or is there a benefit in letting the futures die? The copying could be done upon context exit.
- If we do (A), this would be a breaking change (and likely needs fixing in vx etc.), so I'd tackle it in a separate PR.
- Either way, the test itself should not be removed without replacement. What the test intends to assert is that the iterator returned by `map` contains futures that were kicked off before the iterator is consumed (read the comment there). Essentially this covers an implementation detail, but the overall expected behavior is that the `map` call eagerly submits all futures while lazily awaiting their results (so that they don't all need to be in RAM at once). The test exploits the use-futures-after-context-was-shut-down behavior to verify the eager submit: if submission were not eager, the test would fail because the submit would happen after context exit. If that behavior is removed, the eager submits should still be checked for, in my opinion.

I hope this is somewhat comprehensible. If not, let's have a call 🤙

Member (author): I believe dask transfers the data lazily from the workers or scheduler. That doesn't work anymore once the client closes. We could wrap the futures to eagerly collect the data.

Member: > We could wrap the futures to eagerly collect the data.

Yes, either this, or try to hook into the closing client (so that collection is done at the last moment where it is still possible).

Member (author): I would leave that for a followup and merge this as is. OK?

Member: sure 👍
The diff continues:

```diff
+            run_map(exc)


 def test_executor_args() -> None:
```
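The thread above floats the idea of wrapping dask futures so their results are collected eagerly and remain usable after the client shuts down, and explicitly leaves it for a follow-up. Purely as an illustration of that idea, a hedged sketch; the function name is invented, and real `distributed.Future` objects may need additional handling:

```python
from concurrent.futures import Future


def materialize_future(remote_future) -> Future:
    """Copy the result (or exception) of a remote future into a local
    concurrent.futures.Future so it stays usable after the client closes."""
    local: Future = Future()
    try:
        local.set_result(remote_future.result())
    except Exception as exc:
        local.set_exception(exc)
    return local


# Usage sketch: wrap futures before the executor/client is shut down, e.g.
# eager_futures = [materialize_future(f) for f in remote_futures]
```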