From a7cbd740e74f501e148f51616872486c27902b92 Mon Sep 17 00:00:00 2001
From: jonded94
Date: Fri, 1 Mar 2024 16:50:11 +0100
Subject: [PATCH 1/6] Fix issue of duplicated environment variables (#869)

* Fix issue of replicated environment variables

* Test for non-replicated environment variables

---------

Co-authored-by: Jonas Dedden
---
 .../operator/controller/controller.py        | 55 ++++++++++---------
 .../controller/tests/test_controller.py      |  8 +++
 2 files changed, 36 insertions(+), 27 deletions(-)

diff --git a/dask_kubernetes/operator/controller/controller.py b/dask_kubernetes/operator/controller/controller.py
index 00c47aeed..f8a300085 100644
--- a/dask_kubernetes/operator/controller/controller.py
+++ b/dask_kubernetes/operator/controller/controller.py
@@ -1,4 +1,5 @@
 import asyncio
+import copy
 import time
 from collections import defaultdict
 from contextlib import suppress
@@ -79,14 +80,15 @@ def build_scheduler_deployment_spec(
         "labels": labels,
         "annotations": annotations,
     }
-    spec = {}
-    spec["replicas"] = 1
-    spec["selector"] = {
-        "matchLabels": labels,
-    }
-    spec["template"] = {
-        "metadata": metadata,
-        "spec": pod_spec,
+    spec = {
+        "replicas": 1,
+        "selector": {
+            "matchLabels": labels,
+        },
+        "template": {
+            "metadata": metadata,
+            "spec": pod_spec,
+        },
     }
     return {
         "apiVersion": "apps/v1",
@@ -132,14 +134,15 @@ def build_worker_deployment_spec(
         "labels": labels,
         "annotations": annotations,
     }
-    spec = {}
-    spec["replicas"] = 1  # make_worker_spec returns dict with a replicas key?
-    spec["selector"] = {
-        "matchLabels": labels,
-    }
-    spec["template"] = {
-        "metadata": metadata,
-        "spec": pod_spec,
+    spec = {
+        "replicas": 1,
+        "selector": {
+            "matchLabels": labels,
+        },
+        "template": {
+            "metadata": metadata,
+            "spec": copy.deepcopy(pod_spec),
+        },
     }
     deployment_spec = {
         "apiVersion": "apps/v1",
@@ -157,13 +160,11 @@ def build_worker_deployment_spec(
             "value": f"tcp://{cluster_name}-scheduler.{namespace}.svc.cluster.local:8786",
         },
     ]
-    for i in range(len(deployment_spec["spec"]["template"]["spec"]["containers"])):
-        if "env" in deployment_spec["spec"]["template"]["spec"]["containers"][i]:
-            deployment_spec["spec"]["template"]["spec"]["containers"][i]["env"].extend(
-                env
-            )
+    for container in deployment_spec["spec"]["template"]["spec"]["containers"]:
+        if "env" in container:
+            container["env"].extend(env)
         else:
-            deployment_spec["spec"]["template"]["spec"]["containers"][i]["env"] = env
+            container["env"] = env
     return deployment_spec
 
 
@@ -187,7 +188,7 @@ def build_job_pod_spec(job_name, cluster_name, namespace, spec, annotations, lab
             "labels": labels,
             "annotations": annotations,
         },
-        "spec": spec,
+        "spec": copy.deepcopy(spec),
     }
     env = [
         {
            "name": "DASK_SCHEDULER_ADDRESS",
            "value": f"tcp://{cluster_name}-scheduler.{namespace}.svc.cluster.local:8786",
        },
    ]
-    for i in range(len(pod_spec["spec"]["containers"])):
-        if "env" in pod_spec["spec"]["containers"][i]:
-            pod_spec["spec"]["containers"][i]["env"].extend(env)
+    for container in pod_spec["spec"]["containers"]:
+        if "env" in container:
+            container["env"].extend(env)
         else:
-            pod_spec["spec"]["containers"][i]["env"] = env
+            container["env"] = env
     return pod_spec

diff --git a/dask_kubernetes/operator/controller/tests/test_controller.py b/dask_kubernetes/operator/controller/tests/test_controller.py
index 0bc3b0997..e4f3d5112 100644
--- a/dask_kubernetes/operator/controller/tests/test_controller.py
+++ b/dask_kubernetes/operator/controller/tests/test_controller.py
@@ -636,6 +636,14 @@ async def test_object_dask_cluster(k8s_cluster, kopf_runner, gen_cluster):
         wg = worker_groups[0]
         assert isinstance(wg, DaskWorkerGroup)
 
+        # Test for non-replicated environment variables; Fix for https://github.com/dask/dask-kubernetes/issues/841
+        for deployment in await wg.deployments():
+            env_vars = deployment.spec["template"]["spec"]["containers"][0]["env"]
+            env_var_names = [env_var["name"] for env_var in env_vars]
+            assert len(env_var_names) == len(set(env_var_names))
+            assert "DASK_WORKER_NAME" in env_var_names
+            assert "DASK_SCHEDULER_ADDRESS" in env_var_names
+
         scheduler_pod = await cluster.scheduler_pod()
         assert isinstance(scheduler_pod, Pod)

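Note: the duplication fixed by this patch comes from mutating a shared spec
dict. Before the `copy.deepcopy` calls above, every build of a worker
deployment extended the *same* underlying `env` list, so default variables
accumulated across reconciliations (https://github.com/dask/dask-kubernetes/issues/841).
A minimal standalone sketch of the failure mode, not part of the patch
(`build_spec` is a hypothetical stand-in for `build_worker_deployment_spec`):

    import copy

    def build_spec(pod_spec, isolate):
        # Hypothetical stand-in: appends a default env var to each container.
        spec = copy.deepcopy(pod_spec) if isolate else pod_spec
        for container in spec["containers"]:
            container.setdefault("env", []).append(
                {"name": "DASK_SCHEDULER_ADDRESS", "value": "tcp://scheduler:8786"}
            )
        return spec

    shared = {"containers": [{"name": "worker"}]}
    build_spec(shared, isolate=False)
    build_spec(shared, isolate=False)
    print(len(shared["containers"][0]["env"]))  # 2 -> env entries duplicated

    fresh = {"containers": [{"name": "worker"}]}
    build_spec(fresh, isolate=True)
    build_spec(fresh, isolate=True)
    print("env" in fresh["containers"][0])  # False -> input spec stays clean
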
From 1efdc519a7615edec9fc0b54110ab30d5514e3d6 Mon Sep 17 00:00:00 2001
From: Johanna
Date: Fri, 1 Mar 2024 16:55:31 +0100
Subject: [PATCH 2/6] Validate Dask Cluster Names (#871)

* fix(validation): Validate Dask Cluster Names

This commit introduces cluster name validation in order to avoid the
invalid state in which a `DaskCluster` resource with a too-long or
RFC-1123-noncompliant name is created but cannot be deleted while the
operator retries infinitely to create a scheduler service (see
https://github.com/dask/dask-kubernetes/issues/826 for more details on
this bug).

Issues fixed:
https://github.com/dask/dask-kubernetes/issues/870
https://github.com/dask/dask-kubernetes/issues/826

* Actually, stop removing the dask cluster automatically. It can be
  manually deleted.

* Move the cluster name validation into a common module, add it to
  KubeCluster init, and add tests

---------

Co-authored-by: Johanna Goergen
---
 dask_kubernetes/common/objects.py            | 21 ++++++-
 dask_kubernetes/common/tests/test_objects.py | 26 +++++++-
 dask_kubernetes/constants.py                 | 10 +++
 dask_kubernetes/exceptions.py                |  7 +++
 .../operator/controller/controller.py        | 15 ++++-
 .../controller/tests/test_controller.py      | 63 +++++++++++++++++--
 .../operator/kubecluster/kubecluster.py      |  2 +
 .../kubecluster/tests/test_kubecluster.py    | 20 +++++-
 8 files changed, 153 insertions(+), 11 deletions(-)

diff --git a/dask_kubernetes/common/objects.py b/dask_kubernetes/common/objects.py
index 44c90fd56..1eedeebd7 100644
--- a/dask_kubernetes/common/objects.py
+++ b/dask_kubernetes/common/objects.py
@@ -1,6 +1,7 @@
 """
 Convenience functions for creating pod templates.
 """
+
 import copy
 import json
 from collections import namedtuple
@@ -8,7 +9,12 @@
 from kubernetes import client
 from kubernetes.client.configuration import Configuration
 
-from dask_kubernetes.constants import KUBECLUSTER_CONTAINER_NAME
+from dask_kubernetes.constants import (
+    KUBECLUSTER_CONTAINER_NAME,
+    MAX_CLUSTER_NAME_LEN,
+    VALID_CLUSTER_NAME,
+)
+from dask_kubernetes.exceptions import ValidationError
 
 _FakeResponse = namedtuple("_FakeResponse", ["data"])
 
@@ -365,3 +371,16 @@ def clean_pdb_template(pdb_template):
         pdb_template.spec.selector = client.V1LabelSelector()
 
     return pdb_template
+
+
+def validate_cluster_name(cluster_name: str) -> None:
+    """Raise exception if cluster name is too long and/or has invalid characters"""
+    if not VALID_CLUSTER_NAME.match(cluster_name):
+        raise ValidationError(
+            message=(
+                f"The DaskCluster {cluster_name} is invalid: a lowercase RFC 1123 subdomain must "
+                "consist of lower case alphanumeric characters, '-' or '.', and must start "
+                "and end with an alphanumeric character. DaskCluster name must also be under "
+                f"{MAX_CLUSTER_NAME_LEN} characters."
+            )
+        )
diff --git a/dask_kubernetes/common/tests/test_objects.py b/dask_kubernetes/common/tests/test_objects.py
index 61ef99b3f..23e318961 100644
--- a/dask_kubernetes/common/tests/test_objects.py
+++ b/dask_kubernetes/common/tests/test_objects.py
@@ -1,5 +1,8 @@
-from dask_kubernetes.common.objects import make_pod_from_dict
-from dask_kubernetes.constants import KUBECLUSTER_CONTAINER_NAME
+import pytest
+
+from dask_kubernetes.common.objects import make_pod_from_dict, validate_cluster_name
+from dask_kubernetes.constants import KUBECLUSTER_CONTAINER_NAME, MAX_CLUSTER_NAME_LEN
+from dask_kubernetes.exceptions import ValidationError
 
 
 def test_make_pod_from_dict():
@@ -64,3 +67,22 @@ def test_make_pod_from_dict_default_container_name():
     assert pod.spec.containers[0].name == "dask-0"
     assert pod.spec.containers[1].name == "sidecar"
     assert pod.spec.containers[2].name == "dask-2"
+
+
+@pytest.mark.parametrize(
+    "cluster_name",
+    [
+        (MAX_CLUSTER_NAME_LEN + 1) * "a",
+        "invalid.chars.in.name",
+    ],
+)
+def test_validate_cluster_name_raises_on_invalid_name(
+    cluster_name,
+):
+
+    with pytest.raises(ValidationError):
+        validate_cluster_name(cluster_name)
+
+
+def test_validate_cluster_name_success_on_valid_name():
+    assert validate_cluster_name("valid-cluster-name-123") is None
diff --git a/dask_kubernetes/constants.py b/dask_kubernetes/constants.py
index f22c804a5..5133e2de7 100644
--- a/dask_kubernetes/constants.py
+++ b/dask_kubernetes/constants.py
@@ -1 +1,11 @@
+import re
+
 KUBECLUSTER_CONTAINER_NAME = "dask-container"
+KUBERNETES_MAX_RESOURCE_NAME_LENGTH = 63
+SCHEDULER_NAME_TEMPLATE = "{cluster_name}-scheduler"
+MAX_CLUSTER_NAME_LEN = KUBERNETES_MAX_RESOURCE_NAME_LENGTH - len(
+    SCHEDULER_NAME_TEMPLATE.format(cluster_name="")
+)
+VALID_CLUSTER_NAME = re.compile(
+    rf"^(?=.{{,{MAX_CLUSTER_NAME_LEN}}}$)[a-z0-9]([-a-z0-9]*[a-z0-9])?$"
+)
diff --git a/dask_kubernetes/exceptions.py b/dask_kubernetes/exceptions.py
index d501aab48..dc107c8c3 100644
--- a/dask_kubernetes/exceptions.py
+++ b/dask_kubernetes/exceptions.py
@@ -4,3 +4,10 @@ class CrashLoopBackOffError(Exception):
 
 class SchedulerStartupError(Exception):
     """Scheduler failed to start."""
+
+
+class ValidationError(Exception):
+    """Manifest validation exception"""
+
+    def __init__(self, message: str) -> None:
+        self.message = message
diff --git a/dask_kubernetes/operator/controller/controller.py b/dask_kubernetes/operator/controller/controller.py
index f8a300085..7bb70a09c 100644
--- a/dask_kubernetes/operator/controller/controller.py
+++ b/dask_kubernetes/operator/controller/controller.py
@@ -15,6 +15,9 @@
 from importlib_metadata import entry_points
 from kr8s.asyncio.objects import Deployment, Pod, Service
 
+from dask_kubernetes.common.objects import validate_cluster_name
+from dask_kubernetes.constants import SCHEDULER_NAME_TEMPLATE
+from dask_kubernetes.exceptions import ValidationError
 from dask_kubernetes.operator._objects import (
     DaskAutoscaler,
     DaskCluster,
@@ -76,7 +79,7 @@ def build_scheduler_deployment_spec(
         }
     )
     metadata = {
-        "name": f"{cluster_name}-scheduler",
+        "name": SCHEDULER_NAME_TEMPLATE.format(cluster_name=cluster_name),
         "labels": labels,
         "annotations": annotations,
     }
@@ -109,7 +112,7 @@ def build_scheduler_service_spec(cluster_name, spec, annotations, labels):
         "apiVersion": "v1",
         "kind": "Service",
         "metadata": {
-            "name": f"{cluster_name}-scheduler",
+            "name": SCHEDULER_NAME_TEMPLATE.format(cluster_name=cluster_name),
             "labels": labels,
             "annotations": annotations,
         },
@@ -274,6 +277,12 @@ async def
daskcluster_create(name, namespace, logger, patch, **kwargs):
     This allows us to track that the operator is running.
     """
     logger.info(f"DaskCluster {name} created in {namespace}.")
+    try:
+        validate_cluster_name(name)
+    except ValidationError as validation_exc:
+        patch.status["phase"] = "Error"
+        raise kopf.PermanentError(validation_exc.message)
+
     patch.status["phase"] = "Created"
 
 
@@ -600,7 +609,7 @@ async def daskworkergroup_replica_update(
         if workers_needed < 0:
             worker_ids = await retire_workers(
                 n_workers=-workers_needed,
-                scheduler_service_name=f"{cluster_name}-scheduler",
+                scheduler_service_name=SCHEDULER_NAME_TEMPLATE.format(cluster_name),
                 worker_group_name=name,
                 namespace=namespace,
                 logger=logger,
diff --git a/dask_kubernetes/operator/controller/tests/test_controller.py b/dask_kubernetes/operator/controller/tests/test_controller.py
index e4f3d5112..144865955 100644
--- a/dask_kubernetes/operator/controller/tests/test_controller.py
+++ b/dask_kubernetes/operator/controller/tests/test_controller.py
@@ -11,6 +11,7 @@
 from dask.distributed import Client
 from kr8s.asyncio.objects import Deployment, Pod, Service
 
+from dask_kubernetes.constants import MAX_CLUSTER_NAME_LEN
 from dask_kubernetes.operator._objects import DaskCluster, DaskJob, DaskWorkerGroup
 from dask_kubernetes.operator.controller import (
     KUBERNETES_DATETIME_FORMAT,
@@ -22,17 +23,32 @@
 
 _EXPECTED_ANNOTATIONS = {"test-annotation": "annotation-value"}
 _EXPECTED_LABELS = {"test-label": "label-value"}
+DEFAULT_CLUSTER_NAME = "simple"
 
 
 @pytest.fixture()
-def gen_cluster(k8s_cluster, ns):
+def gen_cluster_manifest(tmp_path):
+    def factory(cluster_name=DEFAULT_CLUSTER_NAME):
+        original_manifest_path = os.path.join(DIR, "resources", "simplecluster.yaml")
+        with open(original_manifest_path, "r") as original_manifest_file:
+            manifest = yaml.safe_load(original_manifest_file)
+
+        manifest["metadata"]["name"] = cluster_name
+        new_manifest_path = tmp_path / "cluster.yaml"
+        new_manifest_path.write_text(yaml.safe_dump(manifest))
+        return tmp_path
+
+    return factory
+
+
+@pytest.fixture()
+def gen_cluster(k8s_cluster, ns, gen_cluster_manifest):
     """Yields an instantiated context manager for creating/deleting a simple cluster."""
 
     @asynccontextmanager
-    async def cm():
-        cluster_path = os.path.join(DIR, "resources", "simplecluster.yaml")
-        cluster_name = "simple"
+    async def cm(cluster_name=DEFAULT_CLUSTER_NAME):
+        cluster_path = gen_cluster_manifest(cluster_name)
 
         # Create cluster resource
         k8s_cluster.kubectl("apply", "-n", ns, "-f", cluster_path)
         while cluster_name not in k8s_cluster.kubectl(
@@ -695,3 +711,42 @@ async def test_object_dask_job(k8s_cluster, kopf_runner, gen_job):
 
         cluster = await job.cluster()
         assert isinstance(cluster, DaskCluster)
+
+
+async def _get_cluster_status(k8s_cluster, ns, cluster_name):
+    """
+    Will loop infinitely in search of non-falsey cluster status.
+    Make sure there is a timeout on any test which calls this.
+    """
+    while True:
+        cluster_status = k8s_cluster.kubectl(
+            "get",
+            "-n",
+            ns,
+            "daskcluster.kubernetes.dask.org",
+            cluster_name,
+            "-o",
+            "jsonpath='{.status.phase}'",
+        ).strip("'")
+        if cluster_status:
+            return cluster_status
+        await asyncio.sleep(0.1)
+
+
+@pytest.mark.timeout(180)
+@pytest.mark.anyio
+@pytest.mark.parametrize(
+    "cluster_name,expected_status",
+    [
+        ("valid-name", "Created"),
+        ((MAX_CLUSTER_NAME_LEN + 1) * "a", "Error"),
+        ("invalid.chars.in.name", "Error"),
+    ],
+)
+async def test_create_cluster_validates_name(
+    cluster_name, expected_status, k8s_cluster, kopf_runner, gen_cluster
+):
+    with kopf_runner:
+        async with gen_cluster(cluster_name=cluster_name) as (_, ns):
+            actual_status = await _get_cluster_status(k8s_cluster, ns, cluster_name)
+            assert expected_status == actual_status
diff --git a/dask_kubernetes/operator/kubecluster/kubecluster.py b/dask_kubernetes/operator/kubecluster/kubecluster.py
index d5891ab4a..5b2b540d0 100644
--- a/dask_kubernetes/operator/kubecluster/kubecluster.py
+++ b/dask_kubernetes/operator/kubecluster/kubecluster.py
@@ -29,6 +29,7 @@
 from rich.table import Table
 from tornado.ioloop import IOLoop
 
+from dask_kubernetes.common.objects import validate_cluster_name
 from dask_kubernetes.exceptions import CrashLoopBackOffError, SchedulerStartupError
 from dask_kubernetes.operator._objects import (
     DaskAutoscaler,
@@ -258,6 +259,7 @@ def __init__(
         name = name.format(
             user=getpass.getuser(), uuid=str(uuid.uuid4())[:10], **os.environ
         )
+        validate_cluster_name(name)
         self._instances.add(self)
         self._rich_spinner = Spinner("dots", speed=0.5)
         self._startup_component_status: dict = {}
diff --git a/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py b/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py
index 931f24b90..662e711fa 100644
--- a/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py
+++ b/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py
@@ -2,7 +2,8 @@
 from dask.distributed import Client
 from distributed.utils import TimeoutError
 
-from dask_kubernetes.exceptions import SchedulerStartupError
+from dask_kubernetes.constants import MAX_CLUSTER_NAME_LEN
+from dask_kubernetes.exceptions import SchedulerStartupError, ValidationError
 from dask_kubernetes.operator import KubeCluster, make_cluster_spec
 
 
@@ -202,3 +203,20 @@ def test_typo_resource_limits(ns):
         },
         namespace=ns,
     )
+
+
+@pytest.mark.parametrize(
+    "cluster_name",
+    [
+        (MAX_CLUSTER_NAME_LEN + 1) * "a",
+        "invalid.chars.in.name",
+    ],
+)
+def test_invalid_cluster_name_fails(cluster_name, kopf_runner, docker_image, ns):
+    with kopf_runner:
+        with pytest.raises(ValidationError):
+            KubeCluster(
+                name=cluster_name,
+                namespace=ns,
+                image=docker_image,
+            )

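Note: the validation added by this patch packs both constraints into one
regex: the lookahead `(?=.{,53}$)` enforces the length budget (63 characters
for a Kubernetes resource name, minus the 10-character "-scheduler" suffix),
and the remainder enforces the RFC 1123 character rules. A standalone sketch
using the same constants as the diff above:

    import re

    KUBERNETES_MAX_RESOURCE_NAME_LENGTH = 63
    SCHEDULER_NAME_TEMPLATE = "{cluster_name}-scheduler"
    # 63 - len("-scheduler") == 53 characters left for the cluster name.
    MAX_CLUSTER_NAME_LEN = KUBERNETES_MAX_RESOURCE_NAME_LENGTH - len(
        SCHEDULER_NAME_TEMPLATE.format(cluster_name="")
    )
    VALID_CLUSTER_NAME = re.compile(
        rf"^(?=.{{,{MAX_CLUSTER_NAME_LEN}}}$)[a-z0-9]([-a-z0-9]*[a-z0-9])?$"
    )

    for name in ("valid-cluster-name-123", "invalid.chars.in.name", "a" * 54):
        print(f"{name[:25]!r}: {bool(VALID_CLUSTER_NAME.match(name))}")
    # 'valid-cluster-name-123': True
    # 'invalid.chars.in.name': False  (dots and length both checked)
    # 'aaaaaaaaaaaaaaaaaaaaaaaaa': False  (54 chars exceeds the budget)
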
+ """ + while True: + cluster_status = k8s_cluster.kubectl( + "get", + "-n", + ns, + "daskcluster.kubernetes.dask.org", + cluster_name, + "-o", + "jsonpath='{.status.phase}'", + ).strip("'") + if cluster_status: + return cluster_status + await asyncio.sleep(0.1) + + +@pytest.mark.timeout(180) +@pytest.mark.anyio +@pytest.mark.parametrize( + "cluster_name,expected_status", + [ + ("valid-name", "Created"), + ((MAX_CLUSTER_NAME_LEN + 1) * "a", "Error"), + ("invalid.chars.in.name", "Error"), + ], +) +async def test_create_cluster_validates_name( + cluster_name, expected_status, k8s_cluster, kopf_runner, gen_cluster +): + with kopf_runner: + async with gen_cluster(cluster_name=cluster_name) as (_, ns): + actual_status = await _get_cluster_status(k8s_cluster, ns, cluster_name) + assert expected_status == actual_status diff --git a/dask_kubernetes/operator/kubecluster/kubecluster.py b/dask_kubernetes/operator/kubecluster/kubecluster.py index d5891ab4a..5b2b540d0 100644 --- a/dask_kubernetes/operator/kubecluster/kubecluster.py +++ b/dask_kubernetes/operator/kubecluster/kubecluster.py @@ -29,6 +29,7 @@ from rich.table import Table from tornado.ioloop import IOLoop +from dask_kubernetes.common.objects import validate_cluster_name from dask_kubernetes.exceptions import CrashLoopBackOffError, SchedulerStartupError from dask_kubernetes.operator._objects import ( DaskAutoscaler, @@ -258,6 +259,7 @@ def __init__( name = name.format( user=getpass.getuser(), uuid=str(uuid.uuid4())[:10], **os.environ ) + validate_cluster_name(name) self._instances.add(self) self._rich_spinner = Spinner("dots", speed=0.5) self._startup_component_status: dict = {} diff --git a/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py b/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py index 931f24b90..662e711fa 100644 --- a/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py +++ b/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py @@ -2,7 +2,8 @@ from dask.distributed import Client from distributed.utils import TimeoutError -from dask_kubernetes.exceptions import SchedulerStartupError +from dask_kubernetes.constants import MAX_CLUSTER_NAME_LEN +from dask_kubernetes.exceptions import SchedulerStartupError, ValidationError from dask_kubernetes.operator import KubeCluster, make_cluster_spec @@ -202,3 +203,20 @@ def test_typo_resource_limits(ns): }, namespace=ns, ) + + +@pytest.mark.parametrize( + "cluster_name", + [ + (MAX_CLUSTER_NAME_LEN + 1) * "a", + "invalid.chars.in.name", + ], +) +def test_invalid_cluster_name_fails(cluster_name, kopf_runner, docker_image, ns): + with kopf_runner: + with pytest.raises(ValidationError): + KubeCluster( + name=cluster_name, + namespace=ns, + image=docker_image, + ) From b668cc62944dc3eac225dc4c4ff01f2b3743c9da Mon Sep 17 00:00:00 2001 From: Johanna Date: Mon, 4 Mar 2024 18:32:27 +0100 Subject: [PATCH 3/6] Fix: KeyError on Building Scheduler Name in Replica Count Decrease (#872) --- dask_kubernetes/operator/controller/controller.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dask_kubernetes/operator/controller/controller.py b/dask_kubernetes/operator/controller/controller.py index 7bb70a09c..efbd3887b 100644 --- a/dask_kubernetes/operator/controller/controller.py +++ b/dask_kubernetes/operator/controller/controller.py @@ -609,7 +609,9 @@ async def daskworkergroup_replica_update( if workers_needed < 0: worker_ids = await retire_workers( n_workers=-workers_needed, - 
                scheduler_service_name=SCHEDULER_NAME_TEMPLATE.format(cluster_name),
+                scheduler_service_name=SCHEDULER_NAME_TEMPLATE.format(
+                    cluster_name=cluster_name
+                ),
                 worker_group_name=name,
                 namespace=namespace,
                 logger=logger,

From 93fe171b8daf7c6b0eaf875a48d1f9454a623620 Mon Sep 17 00:00:00 2001
From: jonded94
Date: Tue, 2 Apr 2024 14:57:59 +0200
Subject: [PATCH 4/6] Enable overwrites of default environment variables (#874)

* Enable overwrites of default environment variables

* Black formatting

* Include test for additional worker group; test overriding of environment variables

* Black

---------

Co-authored-by: Jonas Dedden
---
 .../operator/controller/controller.py        | 32 ++++++----
 .../tests/resources/simpleworkergroup.yaml   |  4 +-
 .../controller/tests/test_controller.py      | 89 +++++++++++++++----
 3 files changed, 93 insertions(+), 32 deletions(-)

diff --git a/dask_kubernetes/operator/controller/controller.py b/dask_kubernetes/operator/controller/controller.py
index efbd3887b..13702d97e 100644
--- a/dask_kubernetes/operator/controller/controller.py
+++ b/dask_kubernetes/operator/controller/controller.py
@@ -153,21 +153,25 @@ def build_worker_deployment_spec(
         "metadata": metadata,
         "spec": spec,
     }
-    env = [
-        {
-            "name": "DASK_WORKER_NAME",
-            "value": worker_name,
-        },
-        {
-            "name": "DASK_SCHEDULER_ADDRESS",
-            "value": f"tcp://{cluster_name}-scheduler.{namespace}.svc.cluster.local:8786",
-        },
-    ]
+    worker_env = {
+        "name": "DASK_WORKER_NAME",
+        "value": worker_name,
+    }
+    scheduler_env = {
+        "name": "DASK_SCHEDULER_ADDRESS",
+        "value": f"tcp://{cluster_name}-scheduler.{namespace}.svc.cluster.local:8786",
+    }
     for container in deployment_spec["spec"]["template"]["spec"]["containers"]:
-        if "env" in container:
-            container["env"].extend(env)
-        else:
-            container["env"] = env
+        if "env" not in container:
+            container["env"] = [worker_env, scheduler_env]
+            continue
+
+        container_env_names = [env_item["name"] for env_item in container["env"]]
+
+        if "DASK_WORKER_NAME" not in container_env_names:
+            container["env"].append(worker_env)
+        if "DASK_SCHEDULER_ADDRESS" not in container_env_names:
+            container["env"].append(scheduler_env)
     return deployment_spec
diff --git a/dask_kubernetes/operator/controller/tests/resources/simpleworkergroup.yaml b/dask_kubernetes/operator/controller/tests/resources/simpleworkergroup.yaml
index cd7da0e92..e99ebe608 100644
--- a/dask_kubernetes/operator/controller/tests/resources/simpleworkergroup.yaml
+++ b/dask_kubernetes/operator/controller/tests/resources/simpleworkergroup.yaml
@@ -5,7 +5,7 @@ metadata:
 spec:
   cluster: simple
   worker:
-    replicas: 2
+    replicas: 1
     spec:
       containers:
         - name: worker
@@ -23,3 +23,5 @@ spec:
           env:
             - name: WORKER_ENV
               value: hello-world # We dont test the value, just the name
+            - name: DASK_WORKER_NAME
+              value: test-worker
diff --git a/dask_kubernetes/operator/controller/tests/test_controller.py b/dask_kubernetes/operator/controller/tests/test_controller.py
index 144865955..33abf9c4f 100644
--- a/dask_kubernetes/operator/controller/tests/test_controller.py
+++ b/dask_kubernetes/operator/controller/tests/test_controller.py
@@ -20,7 +20,6 @@
 
 DIR = pathlib.Path(__file__).parent.absolute()
 
-
 _EXPECTED_ANNOTATIONS = {"test-annotation": "annotation-value"}
 _EXPECTED_LABELS = {"test-label": "label-value"}
 DEFAULT_CLUSTER_NAME = "simple"
@@ -47,7 +46,6 @@ def gen_cluster(k8s_cluster, ns, gen_cluster_manifest):
 
     @asynccontextmanager
     async def cm(cluster_name=DEFAULT_CLUSTER_NAME):
-
         cluster_path = gen_cluster_manifest(cluster_name)
 
         # Create cluster resource
         k8s_cluster.kubectl("apply", "-n", ns, "-f", cluster_path)
@@ -95,6 +93,36 @@ async def cm(job_file):
     yield cm
 
 
+@pytest.fixture()
+def gen_worker_group(k8s_cluster, ns):
+    """Yields an instantiated context manager for creating/deleting a worker group."""
+
+    @asynccontextmanager
+    async def cm(worker_group_file):
+        worker_group_path = os.path.join(DIR, "resources", worker_group_file)
+        with open(worker_group_path) as f:
+            worker_group_name = yaml.load(f, yaml.Loader)["metadata"]["name"]
+
+        # Create cluster resource
+        k8s_cluster.kubectl("apply", "-n", ns, "-f", worker_group_path)
+        while worker_group_name not in k8s_cluster.kubectl(
+            "get", "daskworkergroups.kubernetes.dask.org", "-n", ns
+        ):
+            await asyncio.sleep(0.1)
+
+        try:
+            yield worker_group_name, ns
+        finally:
+            # Test: remove the wait=True, because I think this is blocking the operator
+            k8s_cluster.kubectl("delete", "-n", ns, "-f", worker_group_path)
+            while worker_group_name in k8s_cluster.kubectl(
+                "get", "daskworkergroups.kubernetes.dask.org", "-n", ns
+            ):
+                await asyncio.sleep(0.1)
+
+    yield cm
+
+
 def test_customresources(k8s_cluster):
     assert "daskclusters.kubernetes.dask.org" in k8s_cluster.kubectl("get", "crd")
     assert "daskworkergroups.kubernetes.dask.org" in k8s_cluster.kubectl("get", "crd")
@@ -671,32 +699,59 @@ async def test_object_dask_cluster(k8s_cluster, kopf_runner, gen_cluster):
 
 
 @pytest.mark.anyio
-async def test_object_dask_worker_group(k8s_cluster, kopf_runner, gen_cluster):
+async def test_object_dask_worker_group(
+    k8s_cluster, kopf_runner, gen_cluster, gen_worker_group
+):
     with kopf_runner:
-        async with gen_cluster() as (cluster_name, ns):
+        async with (
+            gen_cluster() as (cluster_name, ns),
+            gen_worker_group("simpleworkergroup.yaml") as (
+                additional_workergroup_name,
+                _,
+            ),
+        ):
             cluster = await DaskCluster.get(cluster_name, namespace=ns)
+            additional_workergroup = await DaskWorkerGroup.get(
+                additional_workergroup_name, namespace=ns
+            )
 
             worker_groups = []
             while not worker_groups:
                 worker_groups = await cluster.worker_groups()
                 await asyncio.sleep(0.1)
             assert len(worker_groups) == 1  # Just the default worker group
-            wg = worker_groups[0]
-            assert isinstance(wg, DaskWorkerGroup)
+            worker_groups = worker_groups + [additional_workergroup]
 
-            pods = []
-            while not pods:
-                pods = await wg.pods()
-                await asyncio.sleep(0.1)
-            assert all([isinstance(p, Pod) for p in pods])
+            for wg in worker_groups:
+                assert isinstance(wg, DaskWorkerGroup)
 
-            deployments = []
-            while not deployments:
-                deployments = await wg.deployments()
-                await asyncio.sleep(0.1)
-            assert all([isinstance(d, Deployment) for d in deployments])
+                deployments = []
+                while not deployments:
+                    deployments = await wg.deployments()
+                    await asyncio.sleep(0.1)
+                assert all([isinstance(d, Deployment) for d in deployments])
 
-            assert (await wg.cluster()).name == cluster.name
+                pods = []
+                while not pods:
+                    pods = await wg.pods()
+                    await asyncio.sleep(0.1)
+                assert all([isinstance(p, Pod) for p in pods])
+
+                assert (await wg.cluster()).name == cluster.name
+
+                for deployment in deployments:
+                    assert deployment.labels["dask.org/cluster-name"] == cluster.name
+                    for env in deployment.spec["template"]["spec"]["containers"][0][
+                        "env"
+                    ]:
+                        if env["name"] == "DASK_WORKER_NAME":
+                            if wg.name == additional_workergroup_name:
+                                assert env["value"] == "test-worker"
+                            else:
+                                assert env["value"] == deployment.name
+                        if env["name"] == "DASK_SCHEDULER_ADDRESS":
+                            scheduler_service = await cluster.scheduler_service()
+                            assert f"{scheduler_service.name}.{ns}" in env["value"]
 
 
 @pytest.mark.anyio
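Note: the merge rule implemented by this patch is that operator defaults are
appended per container only when no user-supplied variable of the same name
already exists, so user values such as `DASK_WORKER_NAME: test-worker` now
win instead of being shadowed. A minimal standalone sketch of that rule
(`merge_default_env` is a hypothetical helper; the controller inlines the
same logic in `build_worker_deployment_spec`):

    def merge_default_env(container, defaults):
        # Hypothetical helper mirroring the loop in the diff above.
        if "env" not in container:
            container["env"] = list(defaults)
            return container
        existing = {env_item["name"] for env_item in container["env"]}
        container["env"].extend(d for d in defaults if d["name"] not in existing)
        return container

    defaults = [
        {"name": "DASK_WORKER_NAME", "value": "generated-worker-name"},
        {"name": "DASK_SCHEDULER_ADDRESS", "value": "tcp://scheduler:8786"},
    ]
    container = {"env": [{"name": "DASK_WORKER_NAME", "value": "test-worker"}]}
    merge_default_env(container, defaults)
    for env_item in container["env"]:
        print(env_item["name"], "=", env_item["value"])
    # DASK_WORKER_NAME = test-worker                 (user override preserved)
    # DASK_SCHEDULER_ADDRESS = tcp://scheduler:8786  (missing default added)
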
env["value"] @pytest.mark.anyio From d3f0ef85f26077f88165d4cd134a84926f23c594 Mon Sep 17 00:00:00 2001 From: Jonas Dedden Date: Wed, 3 Apr 2024 11:41:05 +0200 Subject: [PATCH 5/6] Implement new container env mechanism also for Dask Jobs (#879) * Implement new container env mechanism also for Dask Jobs * Black fixes --------- Co-authored-by: Jonas Dedden --- .../operator/controller/controller.py | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/dask_kubernetes/operator/controller/controller.py b/dask_kubernetes/operator/controller/controller.py index 13702d97e..1254ee528 100644 --- a/dask_kubernetes/operator/controller/controller.py +++ b/dask_kubernetes/operator/controller/controller.py @@ -197,17 +197,19 @@ def build_job_pod_spec(job_name, cluster_name, namespace, spec, annotations, lab }, "spec": copy.deepcopy(spec), } - env = [ - { - "name": "DASK_SCHEDULER_ADDRESS", - "value": f"tcp://{cluster_name}-scheduler.{namespace}.svc.cluster.local:8786", - }, - ] + scheduler_env = { + "name": "DASK_SCHEDULER_ADDRESS", + "value": f"tcp://{cluster_name}-scheduler.{namespace}.svc.cluster.local:8786", + } for container in pod_spec["spec"]["containers"]: - if "env" in container: - container["env"].extend(env) - else: - container["env"] = env + if "env" not in container: + container["env"] = [scheduler_env] + continue + + container_env_names = [env_item["name"] for env_item in container["env"]] + + if "DASK_SCHEDULER_ADDRESS" not in container_env_names: + container["env"].append(scheduler_env) return pod_spec From 258d5562f8c958afda0d3b172bc8053e658f3b2d Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 8 Apr 2024 11:53:48 +0100 Subject: [PATCH 6/6] Bump google.golang.org/protobuf from 1.28.1 to 1.33.0 (#875) Bumps google.golang.org/protobuf from 1.28.1 to 1.33.0. --- updated-dependencies: - dependency-name: google.golang.org/protobuf dependency-type: indirect ... 
Signed-off-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
---
 go.mod | 2 +-
 go.sum | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/go.mod b/go.mod
index d65b3e48f..708cdb787 100644
--- a/go.mod
+++ b/go.mod
@@ -37,7 +37,7 @@ require (
 	golang.org/x/text v0.13.0 // indirect
 	golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
 	google.golang.org/appengine v1.6.7 // indirect
-	google.golang.org/protobuf v1.28.1 // indirect
+	google.golang.org/protobuf v1.33.0 // indirect
 	gopkg.in/inf.v0 v0.9.1 // indirect
 	gopkg.in/yaml.v2 v2.4.0 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
diff --git a/go.sum b/go.sum
index 5715c2565..4a58475fa 100644
--- a/go.sum
+++ b/go.sum
@@ -596,8 +596,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
 google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
 google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
-google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
-google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
+google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
+google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=