
Allow retire_workers to run concurrently #8056

Merged · 6 commits into dask:main · Oct 2, 2023

Conversation

@fjetter (Member) commented on Jul 31, 2023

Closes #8053

The added test is rather superficial, but it is simple and conveys what we want to work. There is also a somewhat heavier test already available that does a similar thing:

```python
@pytest.mark.slow
@pytest.mark.parametrize("use_ReduceReplicas", [False, True])
@gen_cluster(
    client=True,
    nthreads=[("", 1)] * 10,
    config={
        "distributed.scheduler.active-memory-manager.start": True,
        "distributed.scheduler.active-memory-manager.interval": 0.1,
        "distributed.scheduler.active-memory-manager.measure": "managed",
        "distributed.scheduler.active-memory-manager.policies": [
            {"class": "distributed.active_memory_manager.ReduceReplicas"},
        ],
    },
    scheduler_kwargs={"transition_counter_max": 500_000},
    worker_kwargs={"transition_counter_max": 500_000},
)
async def test_RetireWorker_stress(c, s, *workers, use_ReduceReplicas):
    """It is safe to retire the best part of a cluster in the middle of a computation"""
    if not use_ReduceReplicas:
        s.extensions["amm"].policies.clear()
    addrs = list(s.workers)
    random.shuffle(addrs)
    print(f"Removing all workers except {addrs[-1]}")
    # Note: Scheduler._lock effectively prevents multiple calls to retire_workers from
    # running at the same time. However, the lock only exists for the benefit of legacy
    # (non-AMM) rebalance() and replicate() methods. Once the lock is removed, these
    # calls will become parallel and the test *should* continue working.
    tasks = [asyncio.create_task(tensordot_stress(c))]
    await asyncio.sleep(1)
    tasks.append(asyncio.create_task(c.retire_workers(addrs[0:2])))
    await asyncio.sleep(1)
    tasks.append(asyncio.create_task(c.retire_workers(addrs[2:5])))
    await asyncio.sleep(1)
    tasks.append(asyncio.create_task(c.retire_workers(addrs[5:9])))
    await asyncio.gather(*tasks)
    assert set(s.workers) == {addrs[9]}
```

Running those locally didn't reveal any problems. If CI is not sadder than usual, I'm fine with this.

Regarding the conditions: this will prohibit running replicate/rebalance while retire_workers is running, and will prohibit running anything else while replicate/rebalance is running.
The alternative is to nuke these APIs entirely, but the way I see it this complexity is not too bad. I considered factoring this out, but that abstraction would look awful since the semantics we need here are hard to generalize. This just felt unnecessary. Happy to discuss either option.
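The semantics described above behave like a shared/exclusive (readers-writer) lock: any number of `retire_workers` calls may proceed concurrently, while `replicate`/`rebalance` require exclusive access. The following is only an illustrative sketch of that semantics using stdlib `asyncio`; the name `SharedExclusiveLock` and its methods are hypothetical and are not the primitive that this PR actually adds to `distributed/asyncio.py`.

```python
import asyncio
from contextlib import asynccontextmanager


class SharedExclusiveLock:
    """Sketch: many 'shared' holders (e.g. retire_workers) may run at once,
    but an 'exclusive' holder (e.g. rebalance/replicate) runs alone."""

    def __init__(self):
        self._cond = asyncio.Condition()
        self._shared = 0          # number of active shared holders
        self._exclusive = False   # True while an exclusive holder is active

    @asynccontextmanager
    async def shared(self):
        async with self._cond:
            # Wait until no exclusive operation is running
            await self._cond.wait_for(lambda: not self._exclusive)
            self._shared += 1
        try:
            yield
        finally:
            async with self._cond:
                self._shared -= 1
                self._cond.notify_all()

    @asynccontextmanager
    async def exclusive(self):
        async with self._cond:
            # Wait until nothing else (shared or exclusive) is running
            await self._cond.wait_for(
                lambda: not self._exclusive and self._shared == 0
            )
            self._exclusive = True
        try:
            yield
        finally:
            async with self._cond:
                self._exclusive = False
                self._cond.notify_all()
```

With this shape, two `retire_workers` calls overlap freely, while a `rebalance` queues behind them and then runs alone.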

@fjetter (Member, Author) commented on Jul 31, 2023

FWIW I believe we also have pretty good consensus to nuke replicate/rebalance. Happy either way (from my POV this was actually less work, but I'm happy if somebody comes around and deletes code).

@github-actions bot (Contributor) commented on Jul 31, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

| | files | suites | duration |
|---|---|---|---|
| totals | 20 (−1) | 20 (−1) | 9h 36m 3s ⏱️ (−1h 16m 47s) |

| | total | passed ✔️ | skipped 💤 | failed |
|---|---|---|---|---|
| tests | 3 821 (+6) | 3 707 (+3) | 107 (±0) | 7 (+3) |
| runs | 35 347 (−1 516) | 33 624 (−1 427) | 1 712 (−92) | 11 (+3) |

For more details on these failures, see this check.

Results for commit 9b63ce6. ± Comparison against base commit b14f6ae.

♻️ This comment has been updated with latest results.

@crusaderky (Collaborator) commented

I like this, but I think the synchronisation primitive is complex and hard enough to read that it warrants a bit of encapsulation. I'm refactoring it now.

@crusaderky crusaderky self-assigned this Sep 22, 2023
@crusaderky crusaderky marked this pull request as ready for review September 22, 2023 14:48
@crusaderky (Collaborator) commented

ready for review (assuming green tests)

Review threads:
- distributed/asyncio.py (outdated, resolved)
- distributed/asyncio.py (outdated, resolved)
- distributed/tests/test_scheduler.py (resolved)
@crusaderky (Collaborator) commented

@fjetter all review comments have been addressed; ready for review again

@fjetter (Member, Author) commented on Oct 2, 2023

LGTM (I cannot approve since I'm the original author)

@crusaderky crusaderky merged commit dd0f5b7 into dask:main Oct 2, 2023
18 of 28 checks passed
Development

Successfully merging this pull request may close these issues.

Concurrent Scheduler.retire_workers