Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scale Dask worker group from Cluster spec #720

Merged
merged 2 commits into from
May 24, 2023
Merged

Conversation

Matt711
Copy link
Member

@Matt711 Matt711 commented May 23, 2023

Closes #636. This PR adds another handler that scales the default worker group when the worker.replicas field of the cluster spec is changed.

Copy link
Member

@jacobtomlinson jacobtomlinson left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a really nicely written PR! Thanks for picking this up.

Comment on lines +492 to +504
async with kubernetes.client.api_client.ApiClient() as api_client:
custom_objects_api = kubernetes.client.CustomObjectsApi(api_client)
custom_objects_api.api_client.set_default_header(
"content-type", "application/merge-patch+json"
)
await custom_objects_api.patch_namespaced_custom_object_scale(
group="kubernetes.dask.org",
version="v1",
plural="daskworkergroups",
namespace=namespace,
name=worker_group_name,
body={"spec": {"replicas": new}},
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm excited about the kr8s changes in #696 because this will be hugely simplified.

Suggested change
async with kubernetes.client.api_client.ApiClient() as api_client:
custom_objects_api = kubernetes.client.CustomObjectsApi(api_client)
custom_objects_api.api_client.set_default_header(
"content-type", "application/merge-patch+json"
)
await custom_objects_api.patch_namespaced_custom_object_scale(
group="kubernetes.dask.org",
version="v1",
plural="daskworkergroups",
namespace=namespace,
name=worker_group_name,
body={"spec": {"replicas": new}},
)
worker_group = await DaskWorkerGroup.get(worker_group_name)
await worker_group.scale(new)

Happy to merge this as-is though and I'll update it in a follow-up once kr8s is integrated nicely.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, wow! Definitely looking forward to that!

Comment on lines 179 to 185
k8s_cluster.kubectl(
"scale",
"--replicas=3",
"daskcluster.kubernetes.dask.org",
cluster_name,
)
await client.wait_for_workers(3)
Copy link
Member

@jacobtomlinson jacobtomlinson May 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wont actually test anything because client.wait_for_workers(3) will just pass when there are 5 workers.

I opened dask/distributed#6377 a while ago to allow you to modify the behaviour for cases like this. I should nudge that PR along.

Maybe we should have a TODO comment here so that we come back and fix it up later.

@jacobtomlinson jacobtomlinson merged commit 5313a5b into dask:main May 24, 2023
@Matt711
Copy link
Member Author

Matt711 commented May 24, 2023

Looks like CI is failing. Looking into it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

k8s Operator: DaskCluster spec updates do not propagate to pods
2 participants