diff --git a/dask_kubernetes/operator/controller/controller.py b/dask_kubernetes/operator/controller/controller.py index e4c57181..0229ddfd 100644 --- a/dask_kubernetes/operator/controller/controller.py +++ b/dask_kubernetes/operator/controller/controller.py @@ -29,9 +29,6 @@ ) from dask_kubernetes.operator.networking import get_scheduler_address -logging.getLogger("httpx").setLevel(logging.WARNING) -logging.getLogger("aiohttp").setLevel(logging.WARNING) - _ANNOTATION_NAMESPACES_TO_IGNORE = ( "kopf.zalando.org", "kubectl.kubernetes.io", @@ -290,16 +287,25 @@ async def startup(settings: kopf.OperatorSettings, logger, **kwargs): # https://kopf.readthedocs.io/en/latest/configuration/#networking-timeouts settings.networking.request_timeout = 10 - show_config = lambda config: logger.info( - f"{config}: {dask.config.get(config, None)}" + controller_parameters = ( + # "default" / "careful" careful refuses to cancel workers that are still + # processing any tasks, this causes fewer reshuffles especially with p2p + "kubernetes.controller.autoscaler.method", + # cooldown period between autoscaler activations in seconds + "kubernetes.controller.autoscaler.cooldown", + # delay until autoscaling is attempted again in seconds + "kubernetes.controller.autoscaler.retry-delay", + # port to contact the dashboard + "kubernetes.controller.worker.dashboard-port", + # batch size of workers to scale up at once + "kubernetes.controller.worker-allocation.batch-size", + # delay between worker scale up cycles + "kubernetes.controller.worker-allocation.delay", ) - logger.info("- configuration -") - show_config("kubernetes.controller.autoscaler.method") - show_config("kubernetes.controller.autoscaler.cooldown") - show_config("kubernetes.controller.autoscaler.retry-delay") - show_config("kubernetes.controller.worker.dashboard-port") - logger.info("---") + logger.info("controller related configuration:") + for parameter in controller_parameters: + logger.info(f"{parameter}: {dask.config.get(parameter, None)}") # There may be useful things for us to expose via the liveness probe @@ -415,7 +421,10 @@ async def daskworkergroup_create(body, namespace, logger, **kwargs): ) -async def get_workers_to_close(n_workers, scheduler_service_name, namespace, logger): +async def get_workers_to_close(n_workers, scheduler_service_name, namespace): + """Get the names of workers the scheduler recommends to close""" + # This endpoint does not exist in the distributed HTTP API - RPC it is. + comm_address = await get_scheduler_address( scheduler_service_name, namespace, @@ -432,31 +441,31 @@ async def get_workers_to_close(n_workers, scheduler_service_name, namespace, log async def retire_workers( workers_to_close, scheduler_service_name, worker_group_name, namespace, logger ): - # # Try gracefully retiring via the HTTP API - # dashboard_address = await get_scheduler_address( - # scheduler_service_name, - # namespace, - # port_name="http-dashboard", - # allow_external=False, - # ) - # async with aiohttp.ClientSession() as session: - # url = f"{dashboard_address}/api/v1/retire_workers" - # params = {"n": n_workers} - # async with session.post(url, json=params) as resp: - # if resp.status <= 300: - # retired_workers = await resp.json() - # logger.info("Retired workers %s", retired_workers) - # return [retired_workers[w]["name"] for w in retired_workers.keys()] - # logger.debug( - # "Received %d response from scheduler API with body %s", - # resp.status, - # await resp.text(), - # ) - - # # Otherwise try gracefully retiring via the RPC - # logger.debug( - # f"Scaling {worker_group_name} failed via the HTTP API, falling back to the Dask RPC" - # ) + # Try gracefully retiring via the HTTP API + dashboard_address = await get_scheduler_address( + scheduler_service_name, + namespace, + port_name="http-dashboard", + allow_external=False, + ) + async with aiohttp.ClientSession() as session: + url = f"{dashboard_address}/api/v1/retire_workers" + params = {"workers": workers_to_close} + async with session.post(url, json=params) as resp: + if resp.status <= 300: + retired_workers = await resp.json() + logger.info("Retired workers %s", retired_workers) + return [retired_workers[w]["name"] for w in retired_workers.keys()] + logger.debug( + "Received %d response from scheduler API with body %s", + resp.status, + await resp.text(), + ) + + # Otherwise try gracefully retiring via the RPC + logger.debug( + f"Scaling {worker_group_name} failed via the HTTP API, falling back to the Dask RPC" + ) # Dask version mismatches between the operator and scheduler may cause this to fail in any number of unexpected ways with suppress(Exception): comm_address = await get_scheduler_address( @@ -473,6 +482,15 @@ async def retire_workers( await scheduler_comm.retire_workers(names=workers_to_close, remove=True) return workers_to_close + if ( + dask.config.get("kubernetes.controller.autoscaler.method", "default") + == "careful" + ): + logger.error( + "Could not retire workers gracefully, refusing to scale down arbitrary workers." + ) + raise kopf.PermanentError("Failed to gracefully retire workers.") + # Finally fall back to last-in-first-out scaling logger.warning( f"Scaling {worker_group_name} failed via the HTTP API and the Dask RPC, falling back to LIFO scaling. " @@ -556,7 +574,7 @@ def idle_since(dask_scheduler=None): async def get_desired_workers(scheduler_service_name, namespace, logger): - # Try gracefully retiring via the HTTP API + # ask for desired worker count via the HTTP API dashboard_address = await get_scheduler_address( scheduler_service_name, namespace, @@ -570,7 +588,7 @@ async def get_desired_workers(scheduler_service_name, namespace, logger): desired_workers = await resp.json() return desired_workers["workers"] - # Otherwise try gracefully retiring via the RPC + # Otherwise ask worker count using RPC # Dask version mismatches between the operator and scheduler may cause this to fail in any number of unexpected ways try: comm_address = await get_scheduler_address( @@ -678,7 +696,6 @@ async def determine_worker_state( async def daskworkergroup_replica_update( name, namespace, meta, spec, new, body, logger, **kwargs ): - logger.info("replica update") cluster_name = spec["cluster"] wg = await DaskWorkerGroup(body, namespace=namespace) try: @@ -745,23 +762,22 @@ async def daskworkergroup_replica_update( logger.info( f"Attempting to downscale {name} by -{workers_not_needed} workers." ) - worker_ids = await get_workers_to_close( - n_workers=-workers_needed, + worker_ids_to_retire = await get_workers_to_close( + n_workers=workers_not_needed, scheduler_service_name=SCHEDULER_NAME_TEMPLATE.format( cluster_name=cluster_name ), namespace=namespace, - logger=logger, ) - logger.info(f"Workers to close: {worker_ids}") + logger.info(f"Workers to close: {worker_ids_to_retire}") if ( dask.config.get("kubernetes.controller.autoscaler.method", "default") != "careful" ): await retire_workers( - workers_to_close=worker_ids, + workers_to_close=worker_ids_to_retire, scheduler_service_name=SCHEDULER_NAME_TEMPLATE.format( cluster_name=cluster_name ), @@ -769,7 +785,7 @@ async def daskworkergroup_replica_update( namespace=namespace, logger=logger, ) - for wid in worker_ids: + for wid in worker_ids_to_retire: worker_deployment = await Deployment(wid, namespace=namespace) await worker_deployment.delete() logger.info( @@ -777,7 +793,8 @@ async def daskworkergroup_replica_update( ) else: deployments = [ - await Deployment(wid, namespace=namespace) for wid in worker_ids + await Deployment(wid, namespace=namespace) + for wid in worker_ids_to_retire ] # if we don't wait for ready deployment.pods() will fail with: @@ -786,7 +803,9 @@ async def daskworkergroup_replica_update( readiness = [await d.ready() for d in deployments] ready_deployments, not_ready_deployments = [], [] - for deployment, ready, wid in zip(deployments, readiness, worker_ids): + for deployment, ready, wid in zip( + deployments, readiness, worker_ids_to_retire + ): (ready_deployments if ready else not_ready_deployments).append( (deployment, wid) ) @@ -1044,7 +1063,7 @@ async def daskautoscaler_adapt(spec, name, namespace, logger, **kwargs): logger.debug("Autoscaler for %s is in cooldown", spec["cluster"]) return - # Ask the scheduler for the desired number of worker + # Ask the scheduler for the desired number of workers try: desired_workers = await get_desired_workers( scheduler_service_name=f"{spec['cluster']}-scheduler",