Some workloads get bogged down because they generate data faster than they clean it up. This is commonly caused by very fast I/O combined with relatively slower inter-worker communication, as occurs in large reductions.
This has come up several times, notably in #2602 and ocean-transport/coiled_collaboration#3. There are a few different things that we can do to solve these issues. This issue covers one part of a solution: tracking memory generation/cleanup for tasks, and slowing down workers when necessary.
Identifying this situation, and deciding which tasks are good to run and which should be slowed down, is hard. It's hard because it requires data tracking and decision making that crosses the scheduler and the workers. I'll try to lay out a plan below.
Data collection
We should track how much data tasks create. Actually, we already do this at the TaskState level (TaskState.nbytes), but we need to generalize it to the TaskGroup level (already there as TaskGroup.nbytes_in_memory) and the TaskPrefix level (not there currently). TaskPrefixes are generally used for scheduling decisions, so maybe we should add it there. We'd like to be able to answer how much data each task is likely to create/consume.
We would also like to know how much data these tasks tend to clean up. I recommend that we also add new attributes like nbytes_freed onto TaskGroup/TaskPrefix, and update that value in transition_processing_memory (this logic has since been pushed into the _add_to_memory function).
This way we can track both how much memory we expect the outputs of various tasks to consume, and also how optimistic we can be about how much memory they might let us free. For example, read_parquet or from_zarr are probably bad tasks in this regard, while sum or save are probably good.
This work can be done in isolation, and mostly involves modifying the TaskGroup/TaskPrefix classes, with suitable changes in the transition methods (mostly just transition_processing_memory, I think). There are still some tricky questions about how to measure averages/totals. For a while I was getting clever and trying to add time in there too, treating byte-seconds as the thing to minimize, but decided against it.
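To make the bookkeeping concrete, here is a minimal sketch of what per-group/per-prefix counters could look like. The class and method names below (TaskGroupStats, nbytes_produced, observe_completion) are placeholders invented for illustration; only nbytes_freed and the transition hooks come from the proposal above, and the real change would live on TaskGroup/TaskPrefix inside the scheduler.

```python
# Illustrative sketch only -- not the actual TaskGroup/TaskPrefix classes.
class TaskGroupStats:
    """Running totals of memory produced and freed by tasks in a group/prefix."""

    def __init__(self):
        self.count = 0            # completed tasks observed
        self.nbytes_produced = 0  # total bytes of outputs kept in memory
        self.nbytes_freed = 0     # total bytes of dependencies released afterwards

    def observe_completion(self, output_nbytes, freed_nbytes):
        # Would be called from something like transition_processing_memory /
        # _add_to_memory when a result lands in memory and its no-longer-needed
        # dependencies are released.
        self.count += 1
        self.nbytes_produced += output_nbytes
        self.nbytes_freed += freed_nbytes

    @property
    def expected_delta(self):
        # Average net memory change per task: positive means the group tends
        # to grow memory (e.g. read_parquet), negative means it tends to
        # shrink it (e.g. sum).
        if not self.count:
            return 0
        return (self.nbytes_produced - self.nbytes_freed) / self.count
```

Keeping the produced and freed totals as separate counters preserves the raw data; the per-task delta then falls out as a derived quantity, which ties into the point below about tracking both sides rather than only the difference.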
@TomAugspurger had an early attempt here: #2847. I personally prefer tracking both the positive and the negative rather than the delta (as was done in that PR), in the spirit of collecting general-purpose data rather than data for specific applications (I hope that we find other uses for tracking the average output size of each task, for example).
Passing information to the worker
The Scheduler has the information about dependencies, and so is much better placed to learn which tasks release data. But who makes the decision on which task to prioritize (maybe we have a sum and a read_parquet on the same worker)? And who makes the decision to stop computation on a worker? We have two choices:
Option 1: The scheduler (who is separated from the action but has all of the memory information) makes this decision and sends a signal down to the worker. This requires us to set up new signals around pausing/unpausing. It also requires the scheduler to know a bit more about how the worker is prioritizing.
Option 2: The worker (who is on the ground, but has less high-level information) makes this decision. This requires us to send averages about memory creation/release to the worker, possibly when we send the task itself, or alternatively in some periodic update in the heartbeat.
Option 2 currently feels slightly nicer to me, though I have low certainty here. I think that we could accomplish this by sending the expected memory delta to the worker along with the new task. When the worker identified that a certain set of conditions held, it would choose to pause. A first pass at those conditions follows (a rough sketch of how they might compose comes after the list):
Memory is above a certain amount
We have ongoing communications, either in or out (we want to avoid the whole cluster seizing up)
???
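To illustrate how these conditions might compose on the worker under option 2, here is a rough sketch. Every name here (should_pause, the 80% threshold, ncomms_in_flight) is a hypothetical stand-in, not the worker's actual API; the real logic would hook into the worker's existing memory monitoring and pause machinery.

```python
# Hypothetical worker-side check -- illustrative only.
def should_pause(worker_memory, memory_limit, expected_delta_next_task,
                 ncomms_in_flight, pause_fraction=0.8):
    """Return True if the worker should stop picking up new tasks."""
    # Condition 1: memory is above a certain fraction of the limit.
    if worker_memory < pause_fraction * memory_limit:
        return False
    # Condition 2: only hold back memory-producing work; tasks expected to
    # free more than they allocate should keep running.
    if expected_delta_next_task <= 0:
        return False
    # Condition 3: there are ongoing communications (in or out), so data can
    # still drain elsewhere and the whole cluster won't seize up.
    if ncomms_in_flight == 0:
        return False
    return True
```

Whether to look at the delta of the next ready task or at some aggregate over the ready queue is one of the decisions that would need experimentation.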
I'm also curious if the expected memory delta should override the dask.order prioritization in the worker. I suspect that they're already pretty similar.
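As a small thought experiment on that question, the delta could act as a coarse prefix on the existing priority only when the worker is under memory pressure, rather than replacing dask.order outright. The tuple layout below is purely illustrative and does not reflect the worker's real priority format.

```python
import heapq

# Illustrative only: let the expected memory delta influence ordering only
# when the worker is under memory pressure.
def priority_key(dask_order_priority, expected_delta, under_pressure):
    # Normally the dask.order priority is untouched; under pressure, tasks
    # expected to free memory (negative delta) sort ahead of tasks expected
    # to allocate it.
    memory_term = int(expected_delta > 0) if under_pressure else 0
    return (memory_term, dask_order_priority)

# Two ready tasks on a memory-pressured worker:
ready = []
heapq.heappush(ready, (priority_key(5, +200_000_000, True), "read_parquet-abc"))
heapq.heappush(ready, (priority_key(7, -150_000_000, True), "sum-xyz"))
assert heapq.heappop(ready)[1] == "sum-xyz"  # the memory-freeing task runs first
```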
Next steps
There are many possible options in the decision making process that we could consider. I expect this to take some experimentation.
However I do think that tracking some more data around memory consumption and freeing would be a good start. It's also an easy-ish starter issue that gets folks into this part of the codebase.
mrocklin changed the title from "Deprioritize/pause tasks that generate memory" to "Deprioritize/pause tasks that consume memory" on Jun 8, 2021.
I'm inclined to wait on this until after we observe the impact of neighbor/ordering/sibling scheduling on critical workloads. If this isn't necessary for those workloads then I would be happy to deprioritize this.