Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into delayed-partition
Browse files Browse the repository at this point in the history
  • Loading branch information
rjzamora committed Dec 1, 2023
2 parents a6cbdcd + 6d1e133 commit 9627f27
Showing 1 changed file with 19 additions and 15 deletions.
34 changes: 19 additions & 15 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2248,32 +2248,36 @@ async def test_closing_scheduler_closes_workers(s, a, b):
assert time() < start + 2


@gen_cluster(
client=True, nthreads=[("127.0.0.1", 1)], worker_kwargs={"resources": {"A": 1}}
)
async def test_resources_reset_after_cancelled_task(c, s, w):
@gen_cluster(client=True, nthreads=[("", 1)], worker_kwargs={"resources": {"A": 1}})
async def test_resources_reset_after_cancelled_task(c, s, a):
lock = Lock()
await lock.acquire()

def block(lock):
with lock:
return

await lock.acquire()
future = c.submit(block, lock, resources={"A": 1})
assert s.workers[a.address].used_resources == {"A": 0}
assert a.state.available_resources == {"A": 1}

while not w.state.executing_count:
await asyncio.sleep(0.01)
future = c.submit(block, lock, key="x", resources={"A": 1})
await wait_for_state("x", "executing", a)
assert s.workers[a.address].used_resources == {"A": 1}
assert a.state.available_resources == {"A": 0}

await future.cancel()
await lock.release()

while w.state.executing_count:
await asyncio.sleep(0.01)
await wait_for_state("x", "cancelled", a)
assert s.workers[a.address].used_resources == {"A": 0}
assert a.state.available_resources == {"A": 0}

assert not s.workers[w.address].used_resources["A"]
assert w.state.available_resources == {"A": 1}
await lock.release()
await async_poll_for(lambda: not a.state.tasks, timeout=5)
assert s.workers[a.address].used_resources == {"A": 0}
assert a.state.available_resources == {"A": 1}

await c.submit(inc, 1, resources={"A": 1})
assert await c.submit(inc, 1, resources={"A": 1}) == 2
assert s.workers[a.address].used_resources == {"A": 0}
assert a.state.available_resources == {"A": 1}


@gen_cluster(client=True)
Expand Down

0 comments on commit 9627f27

Please sign in to comment.