Skip to content

Commit

Permalink
Fix issue of duplicated environment variables (#869)
Browse files Browse the repository at this point in the history
* Fix issue of replicated environment variables

* Test for non-replicated environment variables

---------

Co-authored-by: Jonas Dedden <[email protected]>
  • Loading branch information
jonded94 and Jonas Dedden authored Mar 1, 2024
1 parent 39038be commit a7cbd74
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 27 deletions.
55 changes: 28 additions & 27 deletions dask_kubernetes/operator/controller/controller.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import copy
import time
from collections import defaultdict
from contextlib import suppress
Expand Down Expand Up @@ -79,14 +80,15 @@ def build_scheduler_deployment_spec(
"labels": labels,
"annotations": annotations,
}
spec = {}
spec["replicas"] = 1
spec["selector"] = {
"matchLabels": labels,
}
spec["template"] = {
"metadata": metadata,
"spec": pod_spec,
spec = {
"replicas": 1,
"selector": {
"matchLabels": labels,
},
"template": {
"metadata": metadata,
"spec": pod_spec,
},
}
return {
"apiVersion": "apps/v1",
Expand Down Expand Up @@ -132,14 +134,15 @@ def build_worker_deployment_spec(
"labels": labels,
"annotations": annotations,
}
spec = {}
spec["replicas"] = 1 # make_worker_spec returns dict with a replicas key?
spec["selector"] = {
"matchLabels": labels,
}
spec["template"] = {
"metadata": metadata,
"spec": pod_spec,
spec = {
"replicas": 1,
"selector": {
"matchLabels": labels,
},
"template": {
"metadata": metadata,
"spec": copy.deepcopy(pod_spec),
},
}
deployment_spec = {
"apiVersion": "apps/v1",
Expand All @@ -157,13 +160,11 @@ def build_worker_deployment_spec(
"value": f"tcp://{cluster_name}-scheduler.{namespace}.svc.cluster.local:8786",
},
]
for i in range(len(deployment_spec["spec"]["template"]["spec"]["containers"])):
if "env" in deployment_spec["spec"]["template"]["spec"]["containers"][i]:
deployment_spec["spec"]["template"]["spec"]["containers"][i]["env"].extend(
env
)
for container in deployment_spec["spec"]["template"]["spec"]["containers"]:
if "env" in container:
container["env"].extend(env)
else:
deployment_spec["spec"]["template"]["spec"]["containers"][i]["env"] = env
container["env"] = env
return deployment_spec


Expand All @@ -187,19 +188,19 @@ def build_job_pod_spec(job_name, cluster_name, namespace, spec, annotations, lab
"labels": labels,
"annotations": annotations,
},
"spec": spec,
"spec": copy.deepcopy(spec),
}
env = [
{
"name": "DASK_SCHEDULER_ADDRESS",
"value": f"tcp://{cluster_name}-scheduler.{namespace}.svc.cluster.local:8786",
},
]
for i in range(len(pod_spec["spec"]["containers"])):
if "env" in pod_spec["spec"]["containers"][i]:
pod_spec["spec"]["containers"][i]["env"].extend(env)
for container in pod_spec["spec"]["containers"]:
if "env" in container:
container["env"].extend(env)
else:
pod_spec["spec"]["containers"][i]["env"] = env
container["env"] = env
return pod_spec


Expand Down
8 changes: 8 additions & 0 deletions dask_kubernetes/operator/controller/tests/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,14 @@ async def test_object_dask_cluster(k8s_cluster, kopf_runner, gen_cluster):
wg = worker_groups[0]
assert isinstance(wg, DaskWorkerGroup)

# Test for non-replicated environment variables; Fix for https://github.com/dask/dask-kubernetes/issues/841
for deployment in await wg.deployments():
env_vars = deployment.spec["template"]["spec"]["containers"]["env"]
env_var_names = [env_var["name"] for env_var in env_vars]
assert len(env_var_names) == len(set(env_var_names))
assert "DASK_WORKER_NAME" in env_var_names
assert "DASK_SCHEDULER_ADDRESS" in env_var_names

scheduler_pod = await cluster.scheduler_pod()
assert isinstance(scheduler_pod, Pod)

Expand Down

0 comments on commit a7cbd74

Please sign in to comment.