Skip to content

Commit

Permalink
Retire pending workers
Browse files — browse the repository at this point in the history
  • Loading branch information
BitTheByte committed Sep 18, 2023
1 parent 92714da commit 9eb77e3
Showing 1 changed file with 27 additions and 18 deletions.
45 changes: 27 additions & 18 deletions dask_kubernetes/operator/controller/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,15 +549,13 @@ async def daskworkergroup_replica_update(
# Replica updates can come in quick succession and the changes must be applied atomically to ensure
# the number of workers ends in the correct state
async with worker_group_scale_locks[f"{namespace}/{name}"]:
current_workers = len(
await kr8s.asyncio.get(
"deployments",
namespace=namespace,
label_selector={"dask.org/workergroup-name": name},
)
current_workers = await kr8s.asyncio.get(
"deployments",
namespace=namespace,
label_selector={"dask.org/workergroup-name": name},
)
desired_workers = new
workers_needed = desired_workers - current_workers
workers_needed = desired_workers - len(current_workers)
labels = _get_labels(meta)
annotations = _get_annotations(meta)
worker_spec = spec["worker"]
Expand Down Expand Up @@ -597,17 +595,28 @@ async def daskworkergroup_replica_update(
)
logger.info(f"Scaled worker group {name} up to {desired_workers} workers.")
if workers_needed < 0:
worker_ids = await retire_workers(
n_workers=-workers_needed,
scheduler_service_name=f"{cluster_name}-scheduler",
worker_group_name=name,
namespace=namespace,
logger=logger,
)
logger.info(f"Workers to close: {worker_ids}")
for wid in worker_ids:
worker_deployment = await Deployment(wid, namespace=namespace)
await worker_deployment.delete()
pending_workers = []
for worker in current_workers:
if not worker.ready() and len(pending_workers) <= -workers_needed:
pending_workers.append(worker)

for worker in pending_workers:
await worker.delete()

n_workers = -workers_needed - len(pending_workers)
if n_workers > 0:
worker_ids = await retire_workers(
n_workers=n_workers,
scheduler_service_name=f"{cluster_name}-scheduler",
worker_group_name=name,
namespace=namespace,
logger=logger,
)
logger.info(f"Workers to close: {worker_ids}")
for wid in worker_ids:
worker_deployment = await Deployment(wid, namespace=namespace)
await worker_deployment.delete()

logger.info(
f"Scaled worker group {name} down to {desired_workers} workers."
)
Expand Down

0 comments on commit 9eb77e3

Please sign in to comment.