
Client.map() keys as a list breaks Dask queue system #8671

Open
arneyjfs opened this issue Jun 5, 2024 · 6 comments

arneyjfs commented Jun 5, 2024

Describe the issue:
This may not be a bug, but if it is not, then it is unclear to me from the docs why Dask behaves like this.
When passing a list of values to client.map() to use as keys, the queuing system seems to break.

In the reproducible example:

  • If key is left unset or is a constant string and I submit 1000 simple jobs, 282 move to processing (I have 282 cores) and the rest are queued.
  • If key is a list (with length matching the input iterable), they all instantly move to processing.

Minimal Complete Verifiable Example:

from distributed import Client, progress


def simple_task_wait_60s(id: int, logger=None):
    from time import sleep
    sleep(10)
    return 0


if __name__ == '__main__':
    n = 15000
    iterabs = [str(i) for i in range(1000)]

    c = Client(address='tcp://xxx.xxx.xxx.xxx:8786')

    data = c.map(simple_task_wait_60s,
                 iterabs,
                 pure=False,
                 # key=iterabs  # uncommenting this reproduces the issue
                 )
    progress(data)
    results = c.gather(data)

The above produces the following output on the dashboard. Notice the number of jobs queued vs. processing:
[dashboard screenshot: most tasks queued, 282 processing]

Uncommenting the line key=iterabs produces the following instead. Notice queued=0:
[dashboard screenshot: all tasks processing, queued=0]

Anything else we need to know?:

Environment:

  • Dask version: dask=2024.5.2, distributed=2024.5.2
  • Python version: 3.10
  • Operating System: jobs submitted from macOS 14.5, scheduler and worker running on Ubuntu 22.04.3 LTS
  • Install method (conda, pip, source): pip (poetry)

fjetter (Member) commented Jun 5, 2024

Well, I think this is neither a bug nor a feature; it's a bit of both, I would say. Dask internally relies on the structure of the keys, and if you change this structure by providing your own keys, some assumptions are broken.
You can see this in the dashboard, for instance, not only because the tasks are not queued but also because there is now a progress bar for each task individually. Dask essentially no longer notices that all of those tasks are similar and instead treats them as individual groups. Internally these are TaskPrefixes; in a nutshell, tasks are grouped based on the string prefixes of their keys (keys can also be tuples, which is why this definition is a little more complex in reality).

If you do something like iterabs = ["foo-" + str(i) for i in range(1000)] instead, it will just work.

The task queuing unfortunately relies on task groups, which is why this isn't working.
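
For illustration, a minimal sketch of that workaround, reusing the placeholder scheduler address from the example above (the task function and variable names here are illustrative, not from the original report). Custom keys that share a string prefix fall into one TaskPrefix, so the queuing heuristics keep working:

from time import sleep

from distributed import Client


def simple_task(i: str) -> int:
    sleep(10)
    return 0


if __name__ == '__main__':
    client = Client(address='tcp://xxx.xxx.xxx.xxx:8786')

    inputs = [str(i) for i in range(1000)]
    # All keys share the "foo-" prefix, so the scheduler groups them under a
    # single TaskPrefix and queues them instead of dispatching everything at once.
    keys = ["foo-" + str(i) for i in range(1000)]

    futures = client.map(simple_task, inputs, key=keys, pure=False)
    results = client.gather(futures)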

arneyjfs (Author) commented Jun 5, 2024

This is a great bit of information, thank you! I like your solution to use iterabs = ["foo-" + str(i) for i in range(1000)], but doesn't this also mean that if users are submitting jobs separately, then unless they all choose the same prefix, their tasks will be queued separately?

I.e. if I have 1 worker with 200 cores:

  • Bob queues ["foo-" + str(i) for i in range(1000)]
  • Alice queues ["bar-" + str(i) for i in range(1000)]

Then the worker will still be trying to process 200 from each queue and will be overloaded?

What I was hoping for was for the two sets of tasks to have different priorities and be queued together, so that all the highest-priority tasks would be done first (from whichever set).

fjetter (Member) commented Jun 5, 2024

Queuing is not exactly what you are imagining, I think. The queuing refers to when we submit a task to a worker, not to when that work is actually executed. To influence the order of execution, there is the priority keyword.
If no priority is provided, we process tasks in the order in which they were submitted. We consider tasks that are submitted within a short time window to be of equal priority; see fifo_timeout. This would also happen if two clients submitted things simultaneously.
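
As an illustration of the priority keyword, a sketch reusing the placeholder scheduler address (the work function is made up for the example):

from distributed import Client


def work(i: int) -> int:
    return i * 2


if __name__ == '__main__':
    client = Client(address='tcp://xxx.xxx.xxx.xxx:8786')

    # Larger priority values are executed earlier. Without an explicit
    # priority, tasks submitted within fifo_timeout of each other are treated
    # as equal and run roughly in submission order.
    low = client.map(work, range(100), priority=0)
    high = client.map(work, range(100, 200), priority=10)

    results = client.gather(high + low)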

arneyjfs (Author) commented Jun 5, 2024

Ah, I see, I think I have misunderstood that then.
This part of the docs makes it sound like the job will stay on the scheduler until there is space on a worker and will then be assigned to that worker. In which case, having one queue and making sure a worker doesn't pick up more work than it can handle would be optimal.

From what you are saying though, it sounds like the work is immediately scheduled to the least busy worker, even if there is technically no available resource on that worker. Is that right? If so, is there any workaround to make it behave like the above? Essentially I need a prioritised queue, where in/out order doesn't really matter but priority does, and crucially, a higher priority task should be picked up regardless of the time of submission or if it has a different TaskPrefix.

If there is no way to do that, would ensuring all tasks have the same prefix achieve this behaviour?

fjetter (Member) commented Jun 5, 2024

From what you are saying though, it sounds like the work is immediately scheduled to the least busy worker, even if there is technically no available resource on that worker. Is that right?

That is the default behaviour, yes. For most applications this should not concern you. This entire queuing thing was implemented to manage memory pressure for some very specific array workloads. In most situations a user will not care about this. There is also some logic that rebalances tasks between workers if some have too many tasks assigned while others are idle.

Essentially I need a prioritised queue, where in/out order doesn't really matter but priority does, and crucially, a higher priority task should be picked up regardless of the time of submission or if it has a different TaskPrefix.

Forget all about "task queuing". "Task queuing" is an internal mechanism that users should rarely bother with, and it is not what you are looking for.
You are looking for the priority keyword in Client.submit, Client.map, etc., and you can even use a context manager to set priorities using annotations; see https://docs.dask.org/en/stable/api.html#dask.annotate
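
For example, a sketch of the annotation approach with dask.delayed (illustrative names; assuming annotations attached inside the context propagate to the scheduler as described in the linked docs, and disabling low-level graph optimization so they are not dropped):

import dask
from dask import delayed
from distributed import Client


@delayed
def work(i: int) -> int:
    return i * 2


if __name__ == '__main__':
    client = Client(address='tcp://xxx.xxx.xxx.xxx:8786')

    # Tasks built inside this context carry priority=10, so the scheduler
    # should run them ahead of tasks with the default priority.
    with dask.annotate(priority=10):
        urgent = [work(i) for i in range(100)]

    # optimize_graph=False avoids graph optimizations that may drop annotations.
    futures = client.compute(urgent, optimize_graph=False)
    results = client.gather(futures)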

arneyjfs (Author) commented Jun 5, 2024

There is also some logic that rebalances tasks between workers if some have too many tasks assigned while others are idle.

I think this was a key piece of information, thank you. So even if a worker has 'picked up' a task, that essentially does not mean it will be the one to run it (it can still be rebalanced). It also does not mean it will be run imminently (it can still be affected by other, higher-priority tasks).

This is a very useful clarification; I appreciate you taking the time to explain it.
