Prioritize pending deployment removal on scaling down
BitTheByte committed Jan 8, 2024
1 parent 807ea0c commit e76558c
Showing 1 changed file with 15 additions and 21 deletions.
dask_kubernetes/operator/controller/controller.py (36 changes: 15 additions & 21 deletions)
@@ -6,6 +6,7 @@
 from uuid import uuid4
 
 import aiohttp
+import anyio
 import dask.config
 import kopf
 import kr8s
@@ -606,27 +607,20 @@ async def daskworkergroup_replica_update(
             )
             logger.info(f"Scaled worker group {name} up to {desired_workers} workers.")
         if workers_needed < 0:
-            len_current_workers = len(current_workers)
-            deleted_workers = []
-            for idx in range(-workers_needed):
-                if idx > len_current_workers:
-                    break
-                worker = current_workers[idx]
-
-                is_running = False
-                for pod in await worker.pods():
-                    if pod.status.phase == "Running":
-                        is_running = True
-                        break
-
-                if not is_running:
-                    deleted_workers.append(worker)
-                    await worker.delete()
-
-            if deleted_workers:
-                logger.info(f"Deleted pending {len(deleted_workers)} workers")
-
-            n_workers = -workers_needed - len(deleted_workers)
+            unready_deployments = [
+                current_workers[idx]
+                for idx in range(-workers_needed)
+                if idx < len(current_workers) and not current_workers[idx].ready()
+            ]
+
+            async with anyio.create_task_group() as tg:
+                for deployment in unready_deployments:
+                    tg.start_soon(deployment.delete)
+
+            if unready_deployments:
+                logger.info(f"Deleted unready {len(unready_deployments)} workers.")
+
+            n_workers = -workers_needed - len(unready_deployments)
             if n_workers > 0:
                 worker_ids = await retire_workers(
                     n_workers=n_workers,
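The pattern the commit introduces is: on scale-down, first pick out the worker deployments that are not yet ready, delete them concurrently, and only retire running workers through the scheduler for whatever remains. A minimal, self-contained sketch of that pattern is below; the remove_unready_first helper and its signature are hypothetical, the sketch awaits Deployment.ready() in a plain loop rather than using the commit's list comprehension, and only ready(), delete(), and the anyio task group mirror calls that appear in the diff.

import anyio
from kr8s.asyncio.objects import Deployment


async def remove_unready_first(workers: list[Deployment], n_to_remove: int) -> int:
    """Hypothetical helper: delete unready worker deployments first, concurrently.

    Returns how many workers still need to be retired gracefully through the
    scheduler (the controller does that with retire_workers).
    """
    # Prefer deployments that are not yet ready; removing them avoids
    # interrupting workers that are already running.
    unready = []
    for worker in workers[:n_to_remove]:
        if not await worker.ready():
            unready.append(worker)

    # Delete the unready deployments concurrently; the task group waits for
    # every deletion to finish (or raise) before continuing.
    async with anyio.create_task_group() as tg:
        for worker in unready:
            tg.start_soon(worker.delete)

    # Whatever is left over should be shut down gracefully via the scheduler.
    return n_to_remove - len(unready)

Issuing the deletions inside a task group keeps scale-down latency from growing with the number of pending deployments, and the scheduler is only asked to retire workers that are actually running.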

0 comments on commit e76558c
