Remove dump cluster from gen_cluster #8823

Merged: 3 commits, Aug 23, 2024
4 changes: 1 addition & 3 deletions distributed/scheduler.py
@@ -670,9 +670,7 @@ def clean(self) -> WorkerState:
        )
        ws._occupancy_cache = self.occupancy

-        ws.executing = {
-            ts.key: duration for ts, duration in self.executing.items()  # type: ignore
-        }
+        ws.executing = {ts.key: duration for ts, duration in self.executing.items()}  # type: ignore
        return ws

    def __repr__(self) -> str:
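
A plausible reading of the hunk above (an assumption; the PR itself gives no rationale): collapsing the dict comprehension onto one line lets a single `# type: ignore` comment cover the whole assignment, since mypy scopes these comments to the physical line they sit on. A minimal standalone sketch of that behavior, with hypothetical names unrelated to the scheduler code:

# Hypothetical snippet, not from the PR, illustrating line-scoped "# type: ignore".
durations: dict[object, float] = {"x": 1.0}

# Spread across lines, the trailing comment silences only errors that mypy
# attributes to the middle line.
executing = {
    key: duration for key, duration in durations.items()  # type: ignore
}

# Collapsed onto one line, the same comment applies to the whole statement.
executing = {key: duration for key, duration in durations.items()}  # type: ignore
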
95 changes: 0 additions & 95 deletions distributed/tests/test_utils_test.py
@@ -2,7 +2,6 @@

import asyncio
import logging
import os
import pathlib
import signal
import socket
@@ -16,7 +15,6 @@
from unittest import mock

import pytest
import yaml
from tornado import gen

import dask.config
@@ -41,7 +39,6 @@
check_process_leak,
check_thread_leak,
cluster,
dump_cluster_state,
ensure_no_new_clients,
freeze_batched_send,
gen_cluster,
@@ -439,40 +436,6 @@ async def ping_pong():
assert await fut == "pong"


@pytest.mark.slow()
def test_dump_cluster_state_timeout(tmp_path):
    sleep_time = 30

    async def inner_test(c, s, a, b):
        await asyncio.sleep(sleep_time)

    # This timeout includes cluster startup and teardown which sometimes can
    # take a significant amount of time. For this particular test we would like
    # to keep the _test timeout_ small because we intend to trigger it but the
    # overall timeout large.
    test = gen_cluster(client=True, timeout=5, cluster_dump_directory=tmp_path)(
        inner_test
    )
    try:
        with pytest.raises(asyncio.TimeoutError) as exc:
            test()
        assert "inner_test" in str(exc)
        assert "await asyncio.sleep(sleep_time)" in str(exc)
    except gen.TimeoutError:
        pytest.xfail("Cluster startup or teardown took too long")

    _, dirs, files = next(os.walk(tmp_path))
    assert not dirs
    assert files == [inner_test.__name__ + ".yaml"]
    import yaml

    with open(tmp_path / files[0], "rb") as fd:
        state = yaml.load(fd, Loader=yaml.Loader)

    assert "scheduler" in state
    assert "workers" in state


def test_assert_story():
now = time()
story = [
@@ -558,64 +521,6 @@ async def test_assert_story_identity(c, s, a, strict):
assert_story(worker_story, scheduler_story, strict=strict)


@gen_cluster()
async def test_dump_cluster_state(s, a, b, tmp_path):
    await dump_cluster_state(s, [a, b], str(tmp_path), "dump")
    with open(f"{tmp_path}/dump.yaml") as fh:
        out = yaml.safe_load(fh)

    assert out.keys() == {"scheduler", "workers", "versions"}
    assert out["workers"].keys() == {a.address, b.address}


@gen_cluster(nthreads=[])
async def test_dump_cluster_state_no_workers(s, tmp_path):
    await dump_cluster_state(s, [], str(tmp_path), "dump")
    with open(f"{tmp_path}/dump.yaml") as fh:
        out = yaml.safe_load(fh)

    assert out.keys() == {"scheduler", "workers", "versions"}
    assert out["workers"] == {}


@gen_cluster(Worker=Nanny)
async def test_dump_cluster_state_nannies(s, a, b, tmp_path):
    await dump_cluster_state(s, [a, b], str(tmp_path), "dump")
    with open(f"{tmp_path}/dump.yaml") as fh:
        out = yaml.safe_load(fh)

    assert out.keys() == {"scheduler", "workers", "versions"}
    assert out["workers"].keys() == s.workers.keys()


@gen_cluster()
async def test_dump_cluster_state_unresponsive_local_worker(s, a, b, tmp_path):
    a.stop()
    await dump_cluster_state(s, [a, b], str(tmp_path), "dump")
    with open(f"{tmp_path}/dump.yaml") as fh:
        out = yaml.safe_load(fh)

    assert out.keys() == {"scheduler", "workers", "versions"}
    assert isinstance(out["workers"][a.address], dict)
    assert isinstance(out["workers"][b.address], dict)


@pytest.mark.slow
@gen_cluster(client=True, Worker=Nanny)
async def test_dump_cluster_unresponsive_remote_worker(c, s, a, b, tmp_path):
    await c.run(lambda dask_worker: dask_worker.stop(), workers=[a.worker_address])

    await dump_cluster_state(s, [a, b], str(tmp_path), "dump")
    with open(f"{tmp_path}/dump.yaml") as fh:
        out = yaml.safe_load(fh)

    assert out.keys() == {"scheduler", "workers", "versions"}
    assert isinstance(out["workers"][b.worker_address], dict)
    assert out["workers"][a.worker_address].startswith(
        "OSError('Timed out trying to connect to"
    )


# Note: WINDOWS constant doesn't work with `mypy --platform win32`
if sys.platform == "win32":
TERM_SIGNALS = (signal.SIGTERM, signal.SIGINT)
65 changes: 6 additions & 59 deletions distributed/utils_test.py
@@ -38,7 +38,6 @@
from dask.typing import Key

from distributed import Event, Scheduler, system
from distributed import versions as version_module
from distributed.batched import BatchedSend
from distributed.client import Client, _global_clients, default_client
from distributed.comm import Comm
@@ -878,7 +877,7 @@ def gen_cluster(
    clean_kwargs: dict[str, Any] | None = None,
    # FIXME: distributed#8054
    allow_unclosed: bool = True,
-    cluster_dump_directory: str | Literal[False] = "test_cluster_dump",
+    cluster_dump_directory: str | Literal[False] = False,
) -> Callable[[Callable], Callable]:
    from distributed import Client

@@ -901,6 +900,11 @@ async def test_foo(scheduler, worker1, worker2, pytest_fixture_a, pytest_fixture
        start
        end
    """
+    if cluster_dump_directory:
+        warnings.warn(
+            "The `cluster_dump_directory` argument is being ignored and will be removed in a future version.",
+            DeprecationWarning,
+        )
    if nthreads is None:
        nthreads = [
            ("127.0.0.1", 1),
@@ -1019,14 +1023,6 @@ async def async_fn():
# This stack indicates where the coro/test is suspended
task.print_stack(file=buffer)

if cluster_dump_directory:
    await dump_cluster_state(
        s=s,
        ws=workers,
        output_dir=cluster_dump_directory,
        func_name=func.__name__,
    )

task.cancel()
while not task.cancelled():
await asyncio.sleep(0.01)
@@ -1048,18 +1044,6 @@ async def async_fn():
except pytest.xfail.Exception:
raise

except Exception:
    if cluster_dump_directory and not has_pytestmark(
        test_func, "xfail"
    ):
        await dump_cluster_state(
            s=s,
            ws=workers,
            output_dir=cluster_dump_directory,
            func_name=func.__name__,
        )
    raise

try:
c = default_client()
except ValueError:
@@ -1122,41 +1106,6 @@ async def async_fn_outer():
return _


async def dump_cluster_state(
Review comment (Member):
Another option is to leave this functionality around but still disable it in gen_cluster. I'm not sure how much value there is in having dump_cluster_state around for diagnosing tricky deadlock situations.

Reply (Member Author):
The functionality itself is part of Client. I'm not entirely sure what this utility function offers on top of that, so I'd rather not maintain it. (See the usage sketch after the removed function below.)

    s: Scheduler, ws: list[ServerNode], output_dir: str, func_name: str
) -> None:
    """A variant of Client.dump_cluster_state, which does not rely on any of the below
    to work:
    - Having a client at all
    - Client->Scheduler comms
    - Scheduler->Worker comms (unless using Nannies)
    """
    scheduler_info = s._to_dict()
    workers_info: dict[str, Any]
    versions_info = version_module.get_versions()

    if not ws or isinstance(ws[0], Worker):
        workers_info = {w.address: w._to_dict() for w in ws}
    else:
        workers_info = await s.broadcast(msg={"op": "dump_state"}, on_error="return")
        workers_info = {
            k: repr(v) if isinstance(v, Exception) else v
            for k, v in workers_info.items()
        }

    state = {
        "scheduler": scheduler_info,
        "workers": workers_info,
        "versions": versions_info,
    }
    os.makedirs(output_dir, exist_ok=True)
    fname = os.path.join(output_dir, func_name) + ".yaml"
    with open(fname, "w") as fh:
        yaml.safe_dump(state, fh)  # Automatically convert tuples to lists
    print(f"Dumped cluster state to {fname}")
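
Since, as noted in the review thread, this functionality lives on Client, here is a rough sketch of how a test could capture cluster state explicitly instead of relying on the removed helper. This is an illustration under assumptions, not code from the PR: the test name and dump filename are made up, and it assumes Client.dump_cluster_state accepts a filename stem and a format argument ("yaml" or "msgpack").

# Sketch only: a gen_cluster test that writes a cluster dump through the Client
# API rather than through the removed dump_cluster_state helper.
from distributed.utils_test import gen_cluster


@gen_cluster(client=True)
async def test_example(c, s, a, b):
    # Capture scheduler and worker state explicitly, e.g. while debugging a deadlock.
    await c.dump_cluster_state("test_example_dump", format="yaml")
    assert len(s.workers) == 2

Unlike the removed helper, this path requires a live client and working scheduler comms, which is exactly the trade-off the removed docstring calls out.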


def validate_state(*servers: Scheduler | Worker | Nanny) -> None:
"""Run validate_state() on the Scheduler and all the Workers of the cluster.
Excludes workers wrapped by Nannies and workers manually started by the test.
@@ -1505,8 +1454,6 @@ def new_config_file(c: dict[str, Any]) -> Iterator[None]:
"""
Temporarily change configuration file to match dictionary *c*.
"""
import yaml

old_file = os.environ.get("DASK_CONFIG")
fd, path = tempfile.mkstemp(prefix="dask-config")
with os.fdopen(fd, "w") as f: