Skip to content

Commit

Permalink
Improve scaling
Browse files Browse the repository at this point in the history
  • Loading branch information
BitTheByte committed Sep 18, 2023
1 parent 156e9e4 commit f0bf687
Showing 1 changed file with 30 additions and 8 deletions.
38 changes: 30 additions & 8 deletions dask_kubernetes/operator/controller/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,11 +560,29 @@ async def daskworkergroup_replica_update(
# Replica updates can come in quick succession and the changes must be applied atomically to ensure
# the number of workers ends in the correct state
async with worker_group_scale_locks[f"{namespace}/{name}"]:
current_workers = await kr8s.asyncio.get(
current_workers = []

for deployment in await kr8s.asyncio.get(
"deployments",
namespace=namespace,
label_selector={"dask.org/workergroup-name": name},
)
):
deployment_pods = deployment.pods()
assert (
len(deployment_pods) <= 1
), "Deployment cannot have more than 1 replica pod"
if not deployment_pods:
continue

if deployment_pods[0].status.phase in [
"Pending",
"Running",
"Terminating",
"Initializing",
"ContainerCreating",
]:
current_workers.append(deployment)

desired_workers = new
workers_needed = desired_workers - len(current_workers)
labels = _get_labels(meta)
Expand Down Expand Up @@ -611,14 +629,18 @@ async def daskworkergroup_replica_update(
for idx in range(-workers_needed):
if idx > len_current_workers:
break

worker = current_workers[idx]
kube_pods = await worker.pods()
assert (
len(kube_pods) <= 1
), "Deployment cannot have more than 1 replica pod"
worker_pod = await worker.pods()

if not worker_pod:
continue

worker_pod = kube_pods[0]
if worker_pod.status.phase.lower() == "pending":
worker_pod = worker_pod[0]
if (
worker_pod.status.phase not in ["Running", "ContainerCreating"]
and not worker.ready()
):
await worker.delete()
deleted_workers.append(worker_pod.name)

Expand Down

0 comments on commit f0bf687

Please sign in to comment.