-
-
Notifications
You must be signed in to change notification settings - Fork 719
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Dask Futures performance as the number of tasks increases #5715
Comments
Thanks Marcin! 😄 I've simplified the script a bit in the process of playing with it locally just on my laptop. Also picked an Script:import time
from distributed import Client, wait
# params
ntasks = 10_000
nreps = 10
# initialize data
item = None
data = [item] * ntasks
labels = list(map(str, range(ntasks)))
# run workload
with Client() as c:
for irep in range(nreps):
tic = time.time()
futures = c.map(lambda e: e, data, key=labels)
wait(futures, return_when="ALL_COMPLETED")
print("time in seconds", time.time() - tic)
del futures As an interesting note when I run this locally (on my MacBook Pro (15-inch, 2018)), I see the following Results:
Interestingly it looks like the runtime here is variable. Admittedly I also have a whole bunch of browser tabs open and other applications running in the background. So this might not be the cleanest result, but interesting to notice the variability of the runtimes. Looks like you are not seeing this though assuming the black lines on each bar represent the range. |
Generally over the past year or so, we have been doing work to optimize the scheduler. See this blogpost for an overview. One of the key observations is Dask graphs track a lot of small details as users build them up. These then add overhead throughout in handling all of the tasks in this detailed way. To address this a proposal was made to use High Level Graphs (or HLGs for short), which would effectively aggregating many nodes into one. So data loading or creation could be a single node independent of how many operations this contains. Similarly other operations performed could be represented with HLGs. The result is an HLG could be built on the client, shipped to the scheduler, and converted to tasks for workers on-demand. Going down this path has involved a fair bit of engineering to transform existing operations to HLG based ones. Here's a good blogpost though work is ongoing. As moving to HLGs would increase the workload on the Scheduler, we spent time benchmarking the Scheduler and looking for slow points. So we spent time profiling workloads and optimizing the Scheduler in particular by using Cython annotations ( for example #4302 ). Once sufficiently optimized on that front, we determined communication was the primary bottleneck ( #4443 ). Work on this has continued on a few fronts. First aggregating communication into fewer, larger messages ( #4526 ). Second optimizations in serialization to traverse complex structures more efficiently ( #4699 , ongoing). Third optimizations in communication itself by moving to As noted some of this work is more recent or ongoing, a good recap is in this presentation. At this point think we are in a good place to benchmark this work a bit more. Think a benchmark that leverages HLGs in one of the builtin collections like Arrays or DataFrames would be a good starting place. |
Black lines are 95% confidence intervals. Looking at the raw timings, I see similar behavior. It is rare for 2048 tasks (between 0.07s and 0.47s) but quite common for 4096 tasks (0.16s - 1.08s). I have the scheduler running on a dedicated node of a Cray XC-40 here. Results for 4096 tasks:
|
Something else worth exploring here may be just using a custom |
Anecdotally, this high variability seems pretty familiar to me. See some similarly noisy results in #5036 (comment). Have you tried profiling these? I'd recommend https://github.com/gjoseph92/scheduler-profilers using py-spy with from scheduler_profilers import pyspy_on_scheduler, pyspy
with pyspy_on_scheduler("scheduler.json", native=True), pyspy("workers", native=True):
futures = client.map(dummy, data, key=labels)
wait(futures,return_when='ALL_COMPLETED') |
Curious if you have had a chance to try any of these things or if you have other questions @mrogowski 🙂 No worries if not. Just wanted to check-in |
Thanks, @jakirkham and @gjoseph92 and sorry for the late reply! I profiled as @gjoseph92 suggested, and it seems like the variability originates from |
@mrogowski if you used pyspy, can you share the full profile? |
Sure, here it is: pyspy.json.zip |
I have been running a performance comparison of Dask Futures and other solutions and found unexpected behavior when it comes to the throughput as I increase the number of tasks.
In this test, I am running on 6 nodes (client, scheduler and 4 workers, 1 thread per worker); however, the behavior is consistent for 2-2048 workers. Each task is a dummy function returning its argument. I am timing this:
where
data
is a list ofNone
,labels
is a unique key, and thedummy
function simply returns its argument. Complete code is attached.Based on 30 repetitions for each configuration and after discarding the first timing to account for any lazy initialization, I am getting the following results:
Is this characteristic something you would expect given such a trivial task? Is the scheduler not able to keep up with this many tasks?
Thank you for any insights!
Environment:
Complete code:
latency.py.txt
The text was updated successfully, but these errors were encountered: