Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate Dask Cluster Names #871

Merged
merged 3 commits into from
Mar 1, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions dask_kubernetes/operator/controller/controller.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import re
import time
from collections import defaultdict
from contextlib import suppress
Expand Down Expand Up @@ -31,6 +32,19 @@
KUBERNETES_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

DASK_AUTOSCALER_COOLDOWN_UNTIL_ANNOTATION = "kubernetes.dask.org/cooldown-until"
KUBERNETES_MAX_RESOURCE_NAME_LENGTH = 63
SCHEDULER_NAME_TEMPLATE = "{cluster_name}-scheduler"
MAX_CLUSTER_NAME_LEN = KUBERNETES_MAX_RESOURCE_NAME_LENGTH - len(
SCHEDULER_NAME_TEMPLATE.format(cluster_name="")
)
VALID_CLUSTER_NAME = re.compile(
rf"^(?=.{{,{MAX_CLUSTER_NAME_LEN}}}$)[a-z0-9]([-a-z0-9]*[a-z0-9])?$"
)


def _validate_cluster_name(cluster_name: str) -> bool:
return bool(VALID_CLUSTER_NAME.match(cluster_name))


# Load operator plugins from other packages
PLUGINS = []
Expand Down Expand Up @@ -75,7 +89,7 @@ def build_scheduler_deployment_spec(
}
)
metadata = {
"name": f"{cluster_name}-scheduler",
"name": SCHEDULER_NAME_TEMPLATE.format(cluster_name=cluster_name),
"labels": labels,
"annotations": annotations,
}
Expand Down Expand Up @@ -107,7 +121,7 @@ def build_scheduler_service_spec(cluster_name, spec, annotations, labels):
"apiVersion": "v1",
"kind": "Service",
"metadata": {
"name": f"{cluster_name}-scheduler",
"name": SCHEDULER_NAME_TEMPLATE.format(cluster_name=cluster_name),
"labels": labels,
"annotations": annotations,
},
Expand Down Expand Up @@ -273,6 +287,16 @@ async def daskcluster_create(name, namespace, logger, patch, **kwargs):
This allows us to track that the operator is running.
"""
logger.info(f"DaskCluster {name} created in {namespace}.")

if not _validate_cluster_name(name):
patch.status["phase"] = "Error"
raise kopf.PermanentError(
f"The DaskCluster {name} is invalid: a lowercase RFC 1123 subdomain must "
"consist of lower case alphanumeric characters, '-' or '.', and must start "
"and end with an alphanumeric character. DaskCluster name must also be under "
f"{MAX_CLUSTER_NAME_LEN} characters."
)

patch.status["phase"] = "Created"


Expand Down Expand Up @@ -599,7 +623,7 @@ async def daskworkergroup_replica_update(
if workers_needed < 0:
worker_ids = await retire_workers(
n_workers=-workers_needed,
scheduler_service_name=f"{cluster_name}-scheduler",
scheduler_service_name=SCHEDULER_NAME_TEMPLATE.format(cluster_name),
worker_group_name=name,
namespace=namespace,
logger=logger,
Expand Down
Loading