diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 811f699c9ab..0a668f0a0ef 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -2248,32 +2248,36 @@ async def test_closing_scheduler_closes_workers(s, a, b): assert time() < start + 2 -@gen_cluster( - client=True, nthreads=[("127.0.0.1", 1)], worker_kwargs={"resources": {"A": 1}} -) -async def test_resources_reset_after_cancelled_task(c, s, w): +@gen_cluster(client=True, nthreads=[("", 1)], worker_kwargs={"resources": {"A": 1}}) +async def test_resources_reset_after_cancelled_task(c, s, a): lock = Lock() + await lock.acquire() def block(lock): with lock: return - await lock.acquire() - future = c.submit(block, lock, resources={"A": 1}) + assert s.workers[a.address].used_resources == {"A": 0} + assert a.state.available_resources == {"A": 1} - while not w.state.executing_count: - await asyncio.sleep(0.01) + future = c.submit(block, lock, key="x", resources={"A": 1}) + await wait_for_state("x", "executing", a) + assert s.workers[a.address].used_resources == {"A": 1} + assert a.state.available_resources == {"A": 0} await future.cancel() - await lock.release() - - while w.state.executing_count: - await asyncio.sleep(0.01) + await wait_for_state("x", "cancelled", a) + assert s.workers[a.address].used_resources == {"A": 0} + assert a.state.available_resources == {"A": 0} - assert not s.workers[w.address].used_resources["A"] - assert w.state.available_resources == {"A": 1} + await lock.release() + await async_poll_for(lambda: not a.state.tasks, timeout=5) + assert s.workers[a.address].used_resources == {"A": 0} + assert a.state.available_resources == {"A": 1} - await c.submit(inc, 1, resources={"A": 1}) + assert await c.submit(inc, 1, resources={"A": 1}) == 2 + assert s.workers[a.address].used_resources == {"A": 0} + assert a.state.available_resources == {"A": 1} @gen_cluster(client=True)