retire workers with http, add comments
sil-lnagel committed Mar 28, 2024
1 parent 55b59af commit b5381e0
Showing 1 changed file with 68 additions and 49 deletions.
dask_kubernetes/operator/controller/controller.py (117 changes: 68 additions & 49 deletions)
@@ -29,9 +29,6 @@
)
from dask_kubernetes.operator.networking import get_scheduler_address

logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("aiohttp").setLevel(logging.WARNING)

_ANNOTATION_NAMESPACES_TO_IGNORE = (
"kopf.zalando.org",
"kubectl.kubernetes.io",
@@ -290,16 +287,25 @@ async def startup(settings: kopf.OperatorSettings, logger, **kwargs):
# https://kopf.readthedocs.io/en/latest/configuration/#networking-timeouts
settings.networking.request_timeout = 10

show_config = lambda config: logger.info(
f"{config}: {dask.config.get(config, None)}"
controller_parameters = (
# "default" / "careful" careful refuses to cancel workers that are still
# processing any tasks, this causes fewer reshuffles especially with p2p
"kubernetes.controller.autoscaler.method",
# cooldown period between autoscaler activations in seconds
"kubernetes.controller.autoscaler.cooldown",
# delay until autoscaling is attempted again in seconds
"kubernetes.controller.autoscaler.retry-delay",
# port to contact the dashboard
"kubernetes.controller.worker.dashboard-port",
# batch size of workers to scale up at once
"kubernetes.controller.worker-allocation.batch-size",
# delay between worker scale up cycles
"kubernetes.controller.worker-allocation.delay",
)

logger.info("- configuration -")
show_config("kubernetes.controller.autoscaler.method")
show_config("kubernetes.controller.autoscaler.cooldown")
show_config("kubernetes.controller.autoscaler.retry-delay")
show_config("kubernetes.controller.worker.dashboard-port")
logger.info("---")
logger.info("controller related configuration:")
for parameter in controller_parameters:
logger.info(f"{parameter}: {dask.config.get(parameter, None)}")


# There may be useful things for us to expose via the liveness probe
@@ -415,7 +421,10 @@ async def daskworkergroup_create(body, namespace, logger, **kwargs):
)


async def get_workers_to_close(n_workers, scheduler_service_name, namespace, logger):
async def get_workers_to_close(n_workers, scheduler_service_name, namespace):
"""Get the names of workers the scheduler recommends to close"""
# This endpoint does not exist in the distributed HTTP API - RPC it is.

comm_address = await get_scheduler_address(
scheduler_service_name,
namespace,
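
For context, the RPC round trip this helper performs looks roughly like the sketch below, assuming distributed's `rpc` helper and the scheduler's `workers_to_close` handler (whose `attribute` argument selects the returned field); the function name is hypothetical:

```python
from distributed.core import rpc

async def workers_to_close_via_rpc(comm_address: str, n_workers: int) -> list[str]:
    # Ask the scheduler which n_workers it would prefer to close and
    # return their names rather than their addresses.
    async with rpc(comm_address) as scheduler_comm:
        return await scheduler_comm.workers_to_close(
            n=n_workers,
            attribute="name",
        )
```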
@@ -432,31 +441,31 @@ async def retire_workers(
async def retire_workers(
workers_to_close, scheduler_service_name, worker_group_name, namespace, logger
):
# # Try gracefully retiring via the HTTP API
# dashboard_address = await get_scheduler_address(
# scheduler_service_name,
# namespace,
# port_name="http-dashboard",
# allow_external=False,
# )
# async with aiohttp.ClientSession() as session:
# url = f"{dashboard_address}/api/v1/retire_workers"
# params = {"n": n_workers}
# async with session.post(url, json=params) as resp:
# if resp.status <= 300:
# retired_workers = await resp.json()
# logger.info("Retired workers %s", retired_workers)
# return [retired_workers[w]["name"] for w in retired_workers.keys()]
# logger.debug(
# "Received %d response from scheduler API with body %s",
# resp.status,
# await resp.text(),
# )

# # Otherwise try gracefully retiring via the RPC
# logger.debug(
# f"Scaling {worker_group_name} failed via the HTTP API, falling back to the Dask RPC"
# )
# Try gracefully retiring via the HTTP API
dashboard_address = await get_scheduler_address(
scheduler_service_name,
namespace,
port_name="http-dashboard",
allow_external=False,
)
async with aiohttp.ClientSession() as session:
url = f"{dashboard_address}/api/v1/retire_workers"
params = {"workers": workers_to_close}
async with session.post(url, json=params) as resp:
if resp.status <= 300:
retired_workers = await resp.json()
logger.info("Retired workers %s", retired_workers)
return [retired_workers[w]["name"] for w in retired_workers.keys()]
logger.debug(
"Received %d response from scheduler API with body %s",
resp.status,
await resp.text(),
)

# Otherwise try gracefully retiring via the RPC
logger.debug(
f"Scaling {worker_group_name} failed via the HTTP API, falling back to the Dask RPC"
)
# Dask version mismatches between the operator and scheduler may cause this to fail in any number of unexpected ways
with suppress(Exception):
comm_address = await get_scheduler_address(
@@ -473,6 +482,15 @@ async def retire_workers(
await scheduler_comm.retire_workers(names=workers_to_close, remove=True)
return workers_to_close

if (
dask.config.get("kubernetes.controller.autoscaler.method", "default")
== "careful"
):
logger.error(
"Could not retire workers gracefully, refusing to scale down arbitrary workers."
)
raise kopf.PermanentError("Failed to gracefully retire workers.")

# Finally fall back to last-in-first-out scaling
logger.warning(
f"Scaling {worker_group_name} failed via the HTTP API and the Dask RPC, falling back to LIFO scaling. "
@@ -556,7 +574,7 @@ def idle_since(dask_scheduler=None):


async def get_desired_workers(scheduler_service_name, namespace, logger):
# Try gracefully retiring via the HTTP API
# Ask for the desired worker count via the HTTP API
dashboard_address = await get_scheduler_address(
scheduler_service_name,
namespace,
@@ -570,7 +588,7 @@ async def get_desired_workers(scheduler_service_name, namespace, logger):
desired_workers = await resp.json()
return desired_workers["workers"]

# Otherwise try gracefully retiring via the RPC
# Otherwise ask for the worker count via the RPC
# Dask version mismatches between the operator and scheduler may cause this to fail in any number of unexpected ways
try:
comm_address = await get_scheduler_address(
@@ -678,7 +696,6 @@ async def determine_worker_state(
async def daskworkergroup_replica_update(
name, namespace, meta, spec, new, body, logger, **kwargs
):
logger.info("replica update")
cluster_name = spec["cluster"]
wg = await DaskWorkerGroup(body, namespace=namespace)
try:
@@ -745,39 +762,39 @@ async def daskworkergroup_replica_update(
logger.info(
f"Attempting to downscale {name} by -{workers_not_needed} workers."
)
worker_ids = await get_workers_to_close(
n_workers=-workers_needed,
worker_ids_to_retire = await get_workers_to_close(
n_workers=workers_not_needed,
scheduler_service_name=SCHEDULER_NAME_TEMPLATE.format(
cluster_name=cluster_name
),
namespace=namespace,
logger=logger,
)

logger.info(f"Workers to close: {worker_ids}")
logger.info(f"Workers to close: {worker_ids_to_retire}")

if (
dask.config.get("kubernetes.controller.autoscaler.method", "default")
!= "careful"
):
await retire_workers(
workers_to_close=worker_ids,
workers_to_close=worker_ids_to_retire,
scheduler_service_name=SCHEDULER_NAME_TEMPLATE.format(
cluster_name=cluster_name
),
worker_group_name=name,
namespace=namespace,
logger=logger,
)
for wid in worker_ids:
for wid in worker_ids_to_retire:
worker_deployment = await Deployment(wid, namespace=namespace)
await worker_deployment.delete()
logger.info(
f"Scaled worker group {name} down to {desired_workers} workers."
)
else:
deployments = [
await Deployment(wid, namespace=namespace) for wid in worker_ids
await Deployment(wid, namespace=namespace)
for wid in worker_ids_to_retire
]

# if we don't wait for ready deployment.pods() will fail with:
@@ -786,7 +803,9 @@ async def daskworkergroup_replica_update(
readiness = [await d.ready() for d in deployments]

ready_deployments, not_ready_deployments = [], []
for deployment, ready, wid in zip(deployments, readiness, worker_ids):
for deployment, ready, wid in zip(
deployments, readiness, worker_ids_to_retire
):
(ready_deployments if ready else not_ready_deployments).append(
(deployment, wid)
)
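
The ready/not-ready split above chooses the destination list with a conditional expression before appending; a tiny self-contained illustration of the idiom:

```python
ready, not_ready = [], []
for name, ok in [("a", True), ("b", False), ("c", True)]:
    # The conditional expression picks the target list; append then adds to it.
    (ready if ok else not_ready).append(name)

print(ready)      # ['a', 'c']
print(not_ready)  # ['b']
```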
@@ -1044,7 +1063,7 @@ async def daskautoscaler_adapt(spec, name, namespace, logger, **kwargs):
logger.debug("Autoscaler for %s is in cooldown", spec["cluster"])
return

# Ask the scheduler for the desired number of worker
# Ask the scheduler for the desired number of workers
try:
desired_workers = await get_desired_workers(
scheduler_service_name=f"{spec['cluster']}-scheduler",
