
Support adaptive scaling in Helm cluster #277

Closed · Jeffwan opened this issue Nov 1, 2020 · 8 comments

@Jeffwan commented Nov 1, 2020

As I understand it, the difference between KubeCluster and HelmCluster is:

  1. KubeCluster starts the scheduler where the client runs, and the worker resources come from Kubernetes.
  2. HelmCluster has a long-running scheduler pod in the Kubernetes cluster.

My requirement is a long-running scheduler in the cluster that multiple clients can connect to in order to submit tasks. The worker resources would come from the same Kubernetes cluster as the scheduler and could be scaled up and down based on load, like what KubeCluster provides.

This seems like a combination of KubeCluster and HelmCluster. Did the community consider this case when adding Kubernetes support? Are there any technical blockers? If it sounds reasonable, I can help work on this feature request.
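
For illustration, a rough sketch of the client side I have in mind (the scheduler service address is just a placeholder for however the scheduler is exposed):

from dask.distributed import Client

# Each analyst's client connects to the same long-running scheduler service.
client = Client("tcp://dask-scheduler.dask.svc.cluster.local:8786")
future = client.submit(lambda x: x + 1, 10)
print(future.result())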

@jacobtomlinson (Member)

Thanks for raising this @Jeffwan.

A couple of corrections to your understanding:

  1. KubeCluster creates a new Dask cluster when it is instantiated. The scheduler can either be local to the Client or remote inside the cluster, controlled with the deploy_mode kwarg. But the lifecycle of the cluster is tied to the script that instantiated it: if the script exits, the cluster is cleaned up during garbage collection.
  2. HelmCluster allows connecting to an existing cluster which was created via the Dask Helm Chart. The goal is to let folks access logs and manually scale the cluster in a way that is consistent with other cluster managers. HelmCluster does not create or destroy the cluster.

In Dask terminology we refer to KubeCluster as an ephemeral cluster option and HelmCluster as a fixed cluster option. You may find this blog post of interest.
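
To make the distinction concrete, here is a minimal sketch of the two usage patterns (the worker spec file and the Helm release name are placeholders):

from dask_kubernetes import KubeCluster, HelmCluster

# Ephemeral: creates a brand-new cluster whose lifecycle is tied to this script.
# deploy_mode="remote" runs the scheduler inside the Kubernetes cluster.
ephemeral = KubeCluster.from_yaml("worker-spec.yaml", deploy_mode="remote")

# Fixed: connects to a cluster that was already deployed with the Dask Helm Chart.
fixed = HelmCluster(release_name="my-dask")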

Running a single cluster with multiple clients connected is generally discouraged, as Dask has no concept of fair usage between clients. Tasks are executed on a first-come, first-served basis, which makes it very possible for a single client to hog the cluster.

Generally for these use cases we recommend that each client create its own ephemeral cluster with KubeCluster and set it to adaptive mode. One thing we have discussed (but I don't think implemented) is giving each additional worker pod created by KubeCluster a decreasing priority. This would allow k8s to effectively balance the available resources.
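
For example, each client could do something like this (a rough sketch; the worker spec file and the adaptive bounds are placeholders):

from dask.distributed import Client
from dask_kubernetes import KubeCluster

cluster = KubeCluster.from_yaml("worker-spec.yaml")
cluster.adapt(minimum=0, maximum=20)  # scale workers with this client's own load
client = Client(cluster)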

You're not the first to ask about adding adaptive functionality to HelmCluster. However, we have intentionally disabled that functionality due to a technical blocker.

def adapt(self, *args, **kwargs):
    """Turn on adaptivity (Not recommended)."""
    raise NotImplementedError(
        "It is not recommended to run ``HelmCluster`` in adaptive mode. "
        "When scaling down workers the decision on which worker to remove is left to Kubernetes, which "
        "will not necessarily remove the same worker that Dask would choose. This may result in lost futures and "
        "recalculation. It is recommended to manage scaling yourself with the ``HelmCluster.scale`` method."
    )

async def _adapt(self, *args, **kwargs):
    return super().adapt(*args, **kwargs)

The problem is that Dask workers are stateful, and the Dask scheduler manages that state. When an adaptive cluster decides to scale down, state is intentionally drained from a worker before removing it, similar to draining a k8s node before removing it. However, for the Helm Chart we use a Deployment resource for workers and can only set the desired number of replicas. So when removing a worker, Dask would drain the state from one pod, tell the worker process to exit, and then decrement the number of replicas in the Deployment. This typically causes a race condition: k8s can restart the worker before the replica count is updated, and then remove a different pod once it is, resulting in lost state.

This is not a factor in KubeCluster because it creates Pod resources directly and manages their lifecycle fully.
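
To illustrate the sequence, here is a rough sketch using the kubernetes Python client (the scheduler address, Deployment name, and namespace are placeholders rather than the Helm Chart's actual values):

from dask.distributed import Client
from kubernetes import client as k8s, config

dask_client = Client("tcp://dask-scheduler:8786")  # placeholder address

# 1. Dask drains state from one chosen worker and asks its process to exit.
victim = next(iter(dask_client.scheduler_info()["workers"]))
dask_client.retire_workers(workers=[victim])

# 2. Separately, the worker Deployment's replica count is decremented.
config.load_kube_config()
apps = k8s.AppsV1Api()
scale = apps.read_namespaced_deployment_scale("dask-worker", "dask")  # placeholders
scale.spec.replicas -= 1
apps.patch_namespaced_deployment_scale("dask-worker", "dask", scale)

# Between steps 1 and 2 the Deployment controller may restart the retired worker,
# and once the replica count drops it may delete a different pod, losing its state.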

I would be really interested to hear your thoughts on this. Every assumption here is definitely up for debate. It would also be great to hear more about your use case so we can take that into consideration.

@omarsumadi commented Jan 7, 2021

Hi @jacobtomlinson - I wanted to piggyback off this exact question to perhaps add some clarity for people who are looking at Dask as a small-business solution for scheduling workflows. By the way, thanks for everything you have done - we need more people like you.

I am at a crossroads over how my small business should deploy Dask as a way for our projected ~10 analysts to execute long-running Python computations. Here's the workflow that I run:

  • Someone submits their code through our Admin interface
  • That code is sent to our Django Webserver pod running inside of Kubernetes
  • The code is processed by either threads or processes, depending on what the user specifies and on whether the GIL is released (such as in a Dask-DF operation)
  • The number of workers is known beforehand (our analysts have to specify how many processes/threads they want)

My Attempts:
I initially considered three ways of setting up our infrastructure:

  1. Launch the Dask-Helm chart and enable horizontal autoscaling by setting a metric to scale off CPU, as shown in articles like this one: https://levelup.gitconnected.com/deploy-and-scale-your-dask-cluster-with-kubernetes-9e43f7e24b04
  2. Launch the Dask-Helm chart and use my database to keep a count of how many workers I need and how many are active (so a database push before and after each Dask process), and manually scale that way using client.cluster.scale() (see the sketch after this list). The problem is that workers are again not terminated gracefully, so a pod running a task could be terminated instead of an idle one.
  3. Use Dask-Kubernetes as you've outlined in this post, and as I explore below to see if it's right for us.
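
Roughly what option 2 looks like on the Dask side (a sketch; the release name is a placeholder and the worker count would come from our database in practice):

from dask.distributed import Client
from dask_kubernetes import HelmCluster

cluster = HelmCluster(release_name="dask")  # placeholder release name
client = Client(cluster)
cluster.scale(8)  # desired worker count, looked up from our database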

The Actual Question:
I was wondering whether this is the right way to do it, picking up from where I left off with KubeCluster:

  • Code is sent to my Django Webserver inside of a Kubernetes pod
  • Create a new KubeCluster using a worker spec for that specific task, so I can define whether I want larger workers for more threads or smaller workers for more processes, using something like this:
from dask_kubernetes import KubeCluster, make_pod_spec

pod_spec = make_pod_spec(
    image='daskdev/dask:latest',
    memory_limit='4G', memory_request='4G',
    cpu_limit=1, cpu_request=1,
    env={'EXTRA_PIP_PACKAGES': 'fastparquet git+https://github.com/dask/distributed'},
)
cluster = KubeCluster(pod_spec)
cluster.scale(10)
  • Scale the KubeCluster to the amount of resources defined by our analyst.
  • Let Google Kubernetes Engine handle scaling nodes to make room for the KubeCluster's pods.
  • Close the KubeCluster by calling cluster.close() and client.close() when the task is done (see the sketch after this list).
  • That way we don't hand scaling off to Kubernetes, but keep it all within Dask.
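
Putting those steps together, a sketch of the lifecycle for one submitted task (the computation itself is a placeholder):

from dask.distributed import Client

client = Client(cluster)  # cluster created and scaled as above
try:
    # ... run the analyst's submitted computation here ...
    result = client.submit(sum, range(100)).result()
finally:
    client.close()
    cluster.close()  # tear the ephemeral cluster down when the task is done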

I'll spread the love if this is answered and I've understood correctly that the last implementation I outlined is the way to go! If I wrote something confusing, I'll be more than happy to clarify.

@jacobtomlinson (Member) commented Jan 8, 2021

Thanks for the comment @omarsumadi (although it should probably be a new issue).

Thanks for the praise! And to answer your question, yes, that workflow sounds totally reasonable. Although I would like to point you to Prefect, which is a workflow management tool built on Dask. It sounds like you are trying to build the same thing.

cc @jcrist

@omarsumadi commented Jan 8, 2021

@jacobtomlinson Hey Jacob - ok great, that takes a lot of weight off my shoulders.

I'm new to GitHub, so I didn't want to make a new issue because it didn't fall under any of the categories:

  • Bug Report, Feature Request, or Security Vulnerability. General questions are linked to Stack Overflow, so I'm not sure which issue type I would have classified this 'question' under.

What would you suggest doing next time if something like this came up? Also, you should let people sponsor you on GitHub!

Thanks!

Oh, and about Prefect - I'll look into it. I should stress that those ~10 analysts are only projected, as in, we are looking to get some people on board but are still reaching out for funding. I'll reach out when the time comes hopefully, but nothing is in the bag right now!

@jacobtomlinson (Member)

Sure, I understand that. This is probably the kind of question that would be asked on a forum, which we have discussed creating in the past.

Also, you should let people sponsor you on Github!

That's very kind of you, but I'll leave my employer to do the sponsoring 😄.

If you want to give back to the Dask community you can donate via NumFocus.

@bnaul (Contributor) commented Mar 27, 2021

@jacobtomlinson I've been thinking about what adaptive scaling for HelmCluster could look like, and I'm curious what you think about a hybrid approach of the two current options (and also whether it overlaps with, or is even totally redundant with, what you're thinking about in #318, dask/distributed#4605, etc.).

Basically I was imagining that the scheduler would be managed externally by Helm, but would start with either no workers or a very limited static worker deployment. Then the cluster object would connect to the existing scheduler like HelmCluster but create and autoscale pods like KubeCluster. For us, the benefits compared to just using KubeCluster would be: 1) we can still use our existing simple Helm setup in cases where we don't care about autoscaling, and 2) we have some tasks that require worker resources, and getting autoscaling working for those seems extra complicated, so I was thinking this would let us autoscale "simple" non-resource-constrained work and manually add workers with resources when needed.

@jacobtomlinson (Member)

Thanks for sharing your interest here @bnaul.

I think rather than trying to build some hybrid, the best way forward would be a Dask operator for Kubernetes. We could have a DaskCluster resource which behaves much like a Deployment but with intelligence around worker state.

Then we could shuffle both the Helm Chart and KubeCluster over to using this same resource. There is already some work in dask-gateway on an operator, so I would imagine extending that. This would hugely reduce complexity in this project and allow much better code reuse than we have today.

@jacobtomlinson (Member)

The HelmCluster was removed in #890. Closing.

@jacobtomlinson closed this as not planned on Apr 30, 2024
4 participants