Queue up non-rootish tasks if they break priority ordering #7526
Conversation
I think it's possible here that tasks that are queued but have restrictions could block up the queue.
Say 1 task slot opens up on worker A, so in stimulus_queue_slots_maybe_opened we peek the first task on the queue and transition it to processing. That task is restricted to only run on worker B, so we don't schedule it. But next in the queue is a task without restrictions. We should have run that immediately, but instead, worker A's thread will just remain unused. The restricted task remains at the front of the queue.
The next time a task slot opens (say on worker C), we'll peek 2 tasks, so the second, unrestricted task on the queue does get scheduled. But the first, restricted task stays at the front of the queue.
In general, we'll only schedule task_slots_available - num_unrunnable_queued_tasks tasks each time, when we should be scheduling task_slots_available tasks. The restricted tasks will slowly pile up at the front of the queue.
We'd have to switch from a peekn here to a linear search through the queue, like I mentioned in #7496 (comment). In that case, a HeapSet is probably the wrong data structure and we'd want a SortedSet instead. But also, when there are a lot of unrunnable tasks on the queue (which I think there would be in the shuffling case), an operation that's currently O(1) in the typical case would get more expensive. It could be worth implementing this and trying it out (maybe in reality the performance cost isn't so bad), but theoretically it does seem slower.
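To make the alternative concrete, a rough sketch of that linear search might look like the following. This is illustrative pseudocode under my own assumptions, not the scheduler's actual API; worker_satisfies_restrictions is a hypothetical stand-in for the real restriction checks.

def worker_satisfies_restrictions(ts, worker):
    # Hypothetical helper standing in for the scheduler's real checks
    # (worker/host restrictions, resource restrictions, ...).
    return not ts.worker_restrictions or worker.address in ts.worker_restrictions

def next_runnable_tasks(queue, worker, n_slots):
    """Pick up to `n_slots` queued tasks that `worker` is allowed to run."""
    picked = []
    for ts in queue:  # assumes iteration in priority order, e.g. a SortedSet
        if len(picked) == n_slots:
            break
        if worker_satisfies_restrictions(ts, worker):
            picked.append(ts)
    return picked

The cost concern above is visible here: each call walks past every unrunnable task at the front of the queue, so with many restricted-but-unrunnable tasks this scan is no longer O(1) per opened slot.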
distributed/scheduler.py
Outdated
self.queued.discard(ts)
worker_msgs = self._add_to_processing(ts, ws)
# If no worker, task just stays `queued`
if self.is_rootish(ts):
This change does not enable queuing for restricted tasks directly, but only for lower-priority tasks, i.e. the peek still works. I think this argument is still valid, but I suspect you'd need to write a pretty custom graph to actually trigger something like this.
Unit Test Results
See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.
24 files ±0  24 suites ±0  10h 31m 58s ⏱️ +5m 58s
For more details on these failures, see this check.
Results for commit 7f6e80c. ± Comparison against base commit 63ae1db.
♻️ This comment has been updated with latest results.
Yes, the change does guarantee that there are some tasks ahead of the restricted task when it's inserted. But at some point, all of those will have completed and it'll end up at the front of the queue. At that point, the restricted task will take up space at the front of the queue for a while until a slot opens on the right worker. Unless I'm missing some mechanism that would pull the restricted tasks out from the middle of the queue and schedule them?
I imagine this works okay in the context of P2P. I'm more worried about other cases where people are already using restrictions. For example, if you have some huge 30min tasks that use resource restrictions to only run on a couple of workers, then once those workers are full, other tasks with those restrictions could clog up the front of the queue for 30min until a viable slot opens again.
The problem I'm describing is almost a scheduler corollary of #6136, which is kind of interesting.
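For illustration, the kind of workload described above might look roughly like this. The resource name, task function, worker count, and scheduler address are all made-up placeholders, not something from this PR; the only real API used is the resources= keyword of Client.submit together with workers started with a matching --resources flag.

from distributed import Client

# Assumes a cluster where only two workers were started with
# `dask worker ... --resources "GPU=1"`.
client = Client("tcp://scheduler:8786")

def train_model(shard):
    # Placeholder for a long-running (~30min) task.
    ...

futures = [
    client.submit(train_model, shard, resources={"GPU": 1}, pure=False)
    for shard in range(100)
]

Once both GPU workers are busy, the remaining restricted tasks would be exactly the kind of unrunnable entries that can sit at the front of the queue in the scenario above.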
There is no explicit mechanism preventing the queue from blocking up. However, if a task is popped that cannot run, it should be moved back to waiting/constrained and would not block an actual slot. Whenever the next task finishes, we'd then schedule two queued tasks instead of one. Considering how unlikely this is, I'm not worried about it yet.
Before I engage in any further theoretical back and forth, I'll check what kind of tests are failing.
For some reason our CI runs froze during
Interestingly, I found a case where queuing breaks depth-first ordering even without any restrictions. Currently investigating.
In one of our test cases (test_dataframe_set_index_sync) I found that tasks were queued up because they would break priority ordering, but the queued-up tasks didn't have any restrictions. I haven't entirely understood what's happening here yet.
What's even more interesting is that the behavior is very easy to trigger with client.compute but not with dask.compute.
Code to reproduce (these are the inner workings of set_index calculating quantiles, taken from the test case; I haven't tried reproducing this with a simpler example):
import dask
from dask.utils import M
from distributed import Client

with Client() as client:
    df = dask.datasets.timeseries(
        start="2000",
        end="2001",
        dtypes={"value": float, "name": str, "id": int},
        seed=1,
    )
    partition_col = df["name"]
    divisions = partition_col._repartition_quantiles(df.npartitions)
    mins = partition_col.map_partitions(M.min)
    maxes = partition_col.map_partitions(M.max)
    # client.gather(client.compute([mins, maxes, divisions]))
    dask.compute([mins, maxes, divisions])
Rendering a smaller version of this graph shows one anomaly (left-hand side, node number 28).
I can see how this node is identified as being of lower priority than other root tasks (which in this specific case is not true, but possibly would be for larger graphs), although I would argue this is wrong, since its dependency has to be held in memory much longer. I'm not sure whether dask.order considers fan-out dependents to be smaller/more important than their dependencies.
Indeed, we can see that with these modifications the quantile computation has a higher memory footprint, because we'd push task 28 back into the queue instead of running it eagerly as soon as possible.
I have no idea why I don't see this problem with dask.compute.
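As a side note, here is a rough sketch (assuming the reproducer above is still in scope, and noting that graph-extraction details may differ between dask versions) of how one could render the graph and inspect the static priorities that dask.order assigns:

import dask
from dask.base import collections_to_dsk
from dask.order import order

# Low-level graph of the three collections from the reproducer above.
dsk = dict(collections_to_dsk([mins, maxes, divisions]))

# Static priorities: smaller numbers are scheduled earlier.
priorities = order(dsk)

# Render the optimized graph to an image; requires graphviz to be installed.
dask.visualize(mins, maxes, divisions, filename="quantile-graph.svg")

Comparing the priority assigned to the anomalous node against its siblings is one way to check whether dask.order really does rank the fan-out dependent ahead of its dependency.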
if self._is_rootish_no_restrictions(ts):
    assert not self.idle_task_count, (ts, self.idle_task_count)
self._validate_ready(ts)
In an earlier version I asserted that if a task is not rootish, it would have restrictions. This caused CI to fail very hard.
It turns out that running non-rootish tasks with lower priority than queued tasks is not impossible.
I have a couple of very interesting results from benchmarking, comparing this branch to #7531, i.e. the benchmarking results include exclusively the changes corresponding to queuing up tasks that would otherwise execute too early given their priority: https://github.com/coiled/coiled-runtime/actions/runs/4172251035
There are a couple of minor/mediocre speedups in wall time for some tests, but that's not very exciting. However, the memory footprint goes through the roof! These are rather early results, but this makes me think that a major contributing factor to the success of queuing is in fact that we are breaking priority ordering in a very specific way (this is not the only contributor).
This is interesting. I did some research on queuing and priority ordering a couple of months ago, when trying to add co-assignment. This has motivated me to finally post that writeup (in unedited, too-long form). @fjetter you might find the videos and discussion in the middle interesting: #7555.
Thanks, I'll have a look as soon as I find the time. I'm closing this PR. Not following priorities strictly is apparently an unintended(?) perk of task queuing, so this PR obviously goes in the wrong direction. However, I believe this motivates a couple of interesting experiments.
First of all, I hate this, but it's a rather pragmatic solution that should only have any real effect if something bad happens.
Closes #7496
Queuing does not support any worker restrictions and can therefore break priority task ordering by assigning non-queued, lower-priority tasks to workers before it gets a chance to de-queue the queued-up tasks.
This is more or less a relic of how queuing is enabled, since it prefers scheduling non-queued tasks first. Basically, we're calling stimulus_queue_slots_maybe_opened after transitions, which causes the non-queued tasks to be transitioned first before checking on queued tasks. From this perspective, what I'm proposing here is a bit of an ugly workaround, but it's pretty straightforward. So far, I've only verified it on the actual P2P problem presented in #7496 and it works as expected. Will need to look into a test now.
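To summarize the idea in code, here is a heavily simplified sketch of the decision this PR adds. This is illustrative pseudocode rather than the actual patch; the names used (scheduler.queued, HeapSet.peek(), tuple priorities where smaller means "run earlier") are assumptions about the scheduler's internals.

def should_queue_non_rootish(scheduler, ts):
    # Normally a non-rootish task with a free slot goes straight to processing.
    # If a queued task has a better (smaller) priority tuple, scheduling `ts`
    # now would break priority ordering, so send it to the queue instead.
    if scheduler.queued and scheduler.queued.peek().priority < ts.priority:
        return True
    return False

The queued task would then be picked up later by stimulus_queue_slots_maybe_opened in priority order, instead of being overtaken by the newly ready, lower-priority task.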