From e76558c6e7df98e91760c83fd0904f93339dcce7 Mon Sep 17 00:00:00 2001 From: Ahmed Ezzat Date: Mon, 8 Jan 2024 22:32:53 +0200 Subject: [PATCH] Prioritize pending deployment removal on scaling down --- .../operator/controller/controller.py | 36 ++++++++----------- 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/dask_kubernetes/operator/controller/controller.py b/dask_kubernetes/operator/controller/controller.py index 3bef88219..4c3679395 100644 --- a/dask_kubernetes/operator/controller/controller.py +++ b/dask_kubernetes/operator/controller/controller.py @@ -6,6 +6,7 @@ from uuid import uuid4 import aiohttp +import anyio import dask.config import kopf import kr8s @@ -606,27 +607,20 @@ async def daskworkergroup_replica_update( ) logger.info(f"Scaled worker group {name} up to {desired_workers} workers.") if workers_needed < 0: - len_current_workers = len(current_workers) - deleted_workers = [] - for idx in range(-workers_needed): - if idx > len_current_workers: - break - worker = current_workers[idx] - - is_running = False - for pod in await worker.pods(): - if pod.status.phase == "Running": - is_running = True - break - - if not is_running: - deleted_workers.append(worker) - await worker.delete() - - if deleted_workers: - logger.info(f"Deleted pending {len(deleted_workers)} workers") - - n_workers = -workers_needed - len(deleted_workers) + unready_deployments = [ + current_workers[idx] + for idx in range(-workers_needed) + if idx < len(current_workers) and not current_workers[idx].ready() + ] + + async with anyio.create_task_group() as tg: + for deployment in unready_deployments: + tg.start_soon(deployment.delete) + + if unready_deployments: + logger.info(f"Deleted unready {len(unready_deployments)} workers.") + + n_workers = -workers_needed - len(unready_deployments) if n_workers > 0: worker_ids = await retire_workers( n_workers=n_workers,