diff --git a/distributed/scheduler.py b/distributed/scheduler.py index a989aa7fe3..6d79c00e86 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -670,9 +670,7 @@ def clean(self) -> WorkerState: ) ws._occupancy_cache = self.occupancy - ws.executing = { - ts.key: duration for ts, duration in self.executing.items() # type: ignore - } + ws.executing = {ts.key: duration for ts, duration in self.executing.items()} # type: ignore return ws def __repr__(self) -> str: diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py index c750e3faa5..6718060f7c 100755 --- a/distributed/tests/test_utils_test.py +++ b/distributed/tests/test_utils_test.py @@ -2,7 +2,6 @@ import asyncio import logging -import os import pathlib import signal import socket @@ -16,7 +15,6 @@ from unittest import mock import pytest -import yaml from tornado import gen import dask.config @@ -41,7 +39,6 @@ check_process_leak, check_thread_leak, cluster, - dump_cluster_state, ensure_no_new_clients, freeze_batched_send, gen_cluster, @@ -439,40 +436,6 @@ async def ping_pong(): assert await fut == "pong" -@pytest.mark.slow() -def test_dump_cluster_state_timeout(tmp_path): - sleep_time = 30 - - async def inner_test(c, s, a, b): - await asyncio.sleep(sleep_time) - - # This timeout includes cluster startup and teardown which sometimes can - # take a significant amount of time. For this particular test we would like - # to keep the _test timeout_ small because we intend to trigger it but the - # overall timeout large. - test = gen_cluster(client=True, timeout=5, cluster_dump_directory=tmp_path)( - inner_test - ) - try: - with pytest.raises(asyncio.TimeoutError) as exc: - test() - assert "inner_test" in str(exc) - assert "await asyncio.sleep(sleep_time)" in str(exc) - except gen.TimeoutError: - pytest.xfail("Cluster startup or teardown took too long") - - _, dirs, files = next(os.walk(tmp_path)) - assert not dirs - assert files == [inner_test.__name__ + ".yaml"] - import yaml - - with open(tmp_path / files[0], "rb") as fd: - state = yaml.load(fd, Loader=yaml.Loader) - - assert "scheduler" in state - assert "workers" in state - - def test_assert_story(): now = time() story = [ @@ -558,64 +521,6 @@ async def test_assert_story_identity(c, s, a, strict): assert_story(worker_story, scheduler_story, strict=strict) -@gen_cluster() -async def test_dump_cluster_state(s, a, b, tmp_path): - await dump_cluster_state(s, [a, b], str(tmp_path), "dump") - with open(f"{tmp_path}/dump.yaml") as fh: - out = yaml.safe_load(fh) - - assert out.keys() == {"scheduler", "workers", "versions"} - assert out["workers"].keys() == {a.address, b.address} - - -@gen_cluster(nthreads=[]) -async def test_dump_cluster_state_no_workers(s, tmp_path): - await dump_cluster_state(s, [], str(tmp_path), "dump") - with open(f"{tmp_path}/dump.yaml") as fh: - out = yaml.safe_load(fh) - - assert out.keys() == {"scheduler", "workers", "versions"} - assert out["workers"] == {} - - -@gen_cluster(Worker=Nanny) -async def test_dump_cluster_state_nannies(s, a, b, tmp_path): - await dump_cluster_state(s, [a, b], str(tmp_path), "dump") - with open(f"{tmp_path}/dump.yaml") as fh: - out = yaml.safe_load(fh) - - assert out.keys() == {"scheduler", "workers", "versions"} - assert out["workers"].keys() == s.workers.keys() - - -@gen_cluster() -async def test_dump_cluster_state_unresponsive_local_worker(s, a, b, tmp_path): - a.stop() - await dump_cluster_state(s, [a, b], str(tmp_path), "dump") - with open(f"{tmp_path}/dump.yaml") as fh: - out = yaml.safe_load(fh) - - assert out.keys() == {"scheduler", "workers", "versions"} - assert isinstance(out["workers"][a.address], dict) - assert isinstance(out["workers"][b.address], dict) - - -@pytest.mark.slow -@gen_cluster(client=True, Worker=Nanny) -async def test_dump_cluster_unresponsive_remote_worker(c, s, a, b, tmp_path): - await c.run(lambda dask_worker: dask_worker.stop(), workers=[a.worker_address]) - - await dump_cluster_state(s, [a, b], str(tmp_path), "dump") - with open(f"{tmp_path}/dump.yaml") as fh: - out = yaml.safe_load(fh) - - assert out.keys() == {"scheduler", "workers", "versions"} - assert isinstance(out["workers"][b.worker_address], dict) - assert out["workers"][a.worker_address].startswith( - "OSError('Timed out trying to connect to" - ) - - # Note: WINDOWS constant doesn't work with `mypy --platform win32` if sys.platform == "win32": TERM_SIGNALS = (signal.SIGTERM, signal.SIGINT) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 1fd59b5525..05cd9382b0 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -38,7 +38,6 @@ from dask.typing import Key from distributed import Event, Scheduler, system -from distributed import versions as version_module from distributed.batched import BatchedSend from distributed.client import Client, _global_clients, default_client from distributed.comm import Comm @@ -878,7 +877,7 @@ def gen_cluster( clean_kwargs: dict[str, Any] | None = None, # FIXME: distributed#8054 allow_unclosed: bool = True, - cluster_dump_directory: str | Literal[False] = "test_cluster_dump", + cluster_dump_directory: str | Literal[False] = False, ) -> Callable[[Callable], Callable]: from distributed import Client @@ -901,6 +900,11 @@ async def test_foo(scheduler, worker1, worker2, pytest_fixture_a, pytest_fixture start end """ + if cluster_dump_directory: + warnings.warn( + "The `cluster_dump_directory` argument is being ignored and will be removed in a future version.", + DeprecationWarning, + ) if nthreads is None: nthreads = [ ("127.0.0.1", 1), @@ -1019,14 +1023,6 @@ async def async_fn(): # This stack indicates where the coro/test is suspended task.print_stack(file=buffer) - if cluster_dump_directory: - await dump_cluster_state( - s=s, - ws=workers, - output_dir=cluster_dump_directory, - func_name=func.__name__, - ) - task.cancel() while not task.cancelled(): await asyncio.sleep(0.01) @@ -1048,18 +1044,6 @@ async def async_fn(): except pytest.xfail.Exception: raise - except Exception: - if cluster_dump_directory and not has_pytestmark( - test_func, "xfail" - ): - await dump_cluster_state( - s=s, - ws=workers, - output_dir=cluster_dump_directory, - func_name=func.__name__, - ) - raise - try: c = default_client() except ValueError: @@ -1122,41 +1106,6 @@ async def async_fn_outer(): return _ -async def dump_cluster_state( - s: Scheduler, ws: list[ServerNode], output_dir: str, func_name: str -) -> None: - """A variant of Client.dump_cluster_state, which does not rely on any of the below - to work: - - - Having a client at all - - Client->Scheduler comms - - Scheduler->Worker comms (unless using Nannies) - """ - scheduler_info = s._to_dict() - workers_info: dict[str, Any] - versions_info = version_module.get_versions() - - if not ws or isinstance(ws[0], Worker): - workers_info = {w.address: w._to_dict() for w in ws} - else: - workers_info = await s.broadcast(msg={"op": "dump_state"}, on_error="return") - workers_info = { - k: repr(v) if isinstance(v, Exception) else v - for k, v in workers_info.items() - } - - state = { - "scheduler": scheduler_info, - "workers": workers_info, - "versions": versions_info, - } - os.makedirs(output_dir, exist_ok=True) - fname = os.path.join(output_dir, func_name) + ".yaml" - with open(fname, "w") as fh: - yaml.safe_dump(state, fh) # Automatically convert tuples to lists - print(f"Dumped cluster state to {fname}") - - def validate_state(*servers: Scheduler | Worker | Nanny) -> None: """Run validate_state() on the Scheduler and all the Workers of the cluster. Excludes workers wrapped by Nannies and workers manually started by the test. @@ -1505,8 +1454,6 @@ def new_config_file(c: dict[str, Any]) -> Iterator[None]: """ Temporarily change configuration file to match dictionary *c*. """ - import yaml - old_file = os.environ.get("DASK_CONFIG") fd, path = tempfile.mkstemp(prefix="dask-config") with os.fdopen(fd, "w") as f: