From 9eb77e31878487f2f5b8a0d3a96911b7d08431d3 Mon Sep 17 00:00:00 2001 From: Ahmed Ezzat Date: Mon, 18 Sep 2023 17:57:49 +0300 Subject: [PATCH] Retire pending workers --- .../operator/controller/controller.py | 45 +++++++++++-------- 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/dask_kubernetes/operator/controller/controller.py b/dask_kubernetes/operator/controller/controller.py index 5cc38c484..4f3896a0f 100644 --- a/dask_kubernetes/operator/controller/controller.py +++ b/dask_kubernetes/operator/controller/controller.py @@ -549,15 +549,13 @@ async def daskworkergroup_replica_update( # Replica updates can come in quick succession and the changes must be applied atomically to ensure # the number of workers ends in the correct state async with worker_group_scale_locks[f"{namespace}/{name}"]: - current_workers = len( - await kr8s.asyncio.get( - "deployments", - namespace=namespace, - label_selector={"dask.org/workergroup-name": name}, - ) + current_workers = await kr8s.asyncio.get( + "deployments", + namespace=namespace, + label_selector={"dask.org/workergroup-name": name}, ) desired_workers = new - workers_needed = desired_workers - current_workers + workers_needed = desired_workers - len(current_workers) labels = _get_labels(meta) annotations = _get_annotations(meta) worker_spec = spec["worker"] @@ -597,17 +595,28 @@ async def daskworkergroup_replica_update( ) logger.info(f"Scaled worker group {name} up to {desired_workers} workers.") if workers_needed < 0: - worker_ids = await retire_workers( - n_workers=-workers_needed, - scheduler_service_name=f"{cluster_name}-scheduler", - worker_group_name=name, - namespace=namespace, - logger=logger, - ) - logger.info(f"Workers to close: {worker_ids}") - for wid in worker_ids: - worker_deployment = await Deployment(wid, namespace=namespace) - await worker_deployment.delete() + pending_workers = [] + for worker in current_workers: + if not worker.ready() and len(pending_workers) <= -workers_needed: + pending_workers.append(worker) + + for worker in pending_workers: + await worker.delete() + + n_workers = -workers_needed - len(pending_workers) + if n_workers > 0: + worker_ids = await retire_workers( + n_workers=n_workers, + scheduler_service_name=f"{cluster_name}-scheduler", + worker_group_name=name, + namespace=namespace, + logger=logger, + ) + logger.info(f"Workers to close: {worker_ids}") + for wid in worker_ids: + worker_deployment = await Deployment(wid, namespace=namespace) + await worker_deployment.delete() + logger.info( f"Scaled worker group {name} down to {desired_workers} workers." )