Create worker pods through Deployments #730
Conversation
Force-pushed from 35f4392 to 69423d7.
Can we merge this after #711, pending review?
If this gets merged and a new release of dask-k8s is out, this solves 90% of our problems 😍 Thanks for all the hard work!
CI failures are related to #738. Merging from main.
This looks great! I pulled it locally and tested it out and it seems to work great. I'm keen to switch out kubernetes_asyncio for kr8s in daskworkergroup_replica_update, but we can do that in a follow-up to avoid creeping the scope of this PR. I have suggested a change to the test that uses kr8s, but again we could just do this later.
with kopf_runner as runner:
    async with gen_cluster() as (cluster_name, ns):
        scheduler_deployment_name = "simple-scheduler"
        worker_deployment_name = "simple-default-worker"
        service_name = "simple-scheduler"
        # Wait for the scheduler Pod, the scheduler Service and the worker Pods to appear
        while scheduler_deployment_name not in k8s_cluster.kubectl(
            "get", "pods", "-n", ns
        ):
            await asyncio.sleep(0.1)
        while service_name not in k8s_cluster.kubectl("get", "svc", "-n", ns):
            await asyncio.sleep(0.1)
        while worker_deployment_name not in k8s_cluster.kubectl(
            "get", "pods", "-n", ns
        ):
            await asyncio.sleep(0.1)
        # Delete the worker Pods, then wait for their replacements to become Ready
        k8s_cluster.kubectl(
            "delete",
            "pods",
            "-l",
            "dask.org/cluster-name=simple,dask.org/component=worker",
            "-n",
            ns,
        )
        k8s_cluster.kubectl(
            "wait",
            "--for=condition=Ready",
            "-l",
            "dask.org/cluster-name=simple,dask.org/component=worker",
            "pod",
            "-n",
            ns,
            "--timeout=60s",
        )
        assert worker_deployment_name in k8s_cluster.kubectl(
            "get", "pods", "-n", ns
        )
I spent some time tinkering with this test. I wrote this implementation, which uses kr8s and tests a little more thoroughly that the Pods get created, become Ready, get deleted, get recreated and get back to Ready. This has also inspired a few tweaks to kr8s, so I may modify this a little further after it is merged.
api = await kr8s.asyncio.api()
with kopf_runner as runner:
    async with gen_cluster() as (cluster_name, ns):
        # Wait for worker Pods to be created
        while True:
            pods = await api.get(
                "pods",
                namespace=ns,
                label_selector=f"dask.org/cluster-name={cluster_name},dask.org/component=worker",
            )
            if not pods:
                await asyncio.sleep(0.1)
                continue
            break
        # Store number of workers
        n_pods = len(pods)
        # Wait for worker Pods to be ready
        await asyncio.gather(
            *[pod.wait(conditions="condition=Ready", timeout=60) for pod in pods]
        )
        # Delete a worker Pod
        await pods[0].delete()
        # Wait for Pods to be recreated
        while True:
            pods = await api.get(
                "pods",
                namespace=ns,
                label_selector=f"dask.org/cluster-name={cluster_name},dask.org/component=worker",
            )
            if len(pods) < n_pods:
                await asyncio.sleep(0.1)
                continue
            break
        # Wait for worker Pods to be ready
        await asyncio.gather(
            *[pod.wait(conditions="condition=Ready", timeout=60) for pod in pods]
        )
I think I'm going to merge this and then update things in a follow-up.
Follow-up is #743.
@jacobtomlinson Would it be possible to create a release now that this is in? This would also really help us in case of spot node preemption.
@bstadlbauer sure!
@briceruzand please don't comment on closed PRs. Open new issues instead.
Closes #603. Currently, if a worker group loses a Pod, it isn't recreated by the operator. This PR solves that problem by creating a Deployment for each worker. The Deployment then creates the worker Pod. The advantage of a Deployment is that we can define the number of replicas (1 replica per Deployment in this case) and Kubernetes will recreate the worker Pod if it is lost.
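As a rough illustration of the idea only (not the operator's actual template: the helper name, the image, the args and the dask.org/worker-name label are assumptions here; only the dask.org/cluster-name and dask.org/component=worker labels appear in this PR), each worker would end up owned by a single-replica Deployment along these lines:

def build_worker_deployment_sketch(cluster_name: str, worker_name: str, namespace: str) -> dict:
    # Hypothetical helper: a minimal single-replica Deployment for one Dask worker.
    labels = {
        "dask.org/cluster-name": cluster_name,
        "dask.org/component": "worker",
        "dask.org/worker-name": worker_name,  # illustrative label to keep each selector unique
    }
    return {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": {"name": worker_name, "namespace": namespace, "labels": labels},
        "spec": {
            # One replica per worker: if the Pod is lost (eviction, node failure,
            # spot preemption), the Deployment controller recreates it.
            "replicas": 1,
            "selector": {"matchLabels": labels},
            "template": {
                "metadata": {"labels": labels},
                "spec": {
                    "containers": [
                        {
                            "name": "worker",
                            "image": "ghcr.io/dask/dask:latest",  # illustrative image
                            # Illustrative command; the real spec comes from the DaskCluster resource.
                            "args": [
                                "dask-worker",
                                f"tcp://{cluster_name}-scheduler.{namespace}:8786",
                            ],
                        }
                    ]
                },
            },
        },
    }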