More dask features #959

Merged: 15 commits from dask-next into master on Nov 16, 2023

Conversation

@normanrz (Member) commented on Oct 25, 2023

Description:

  • Adds SIGINT handling to the DaskExecutor
  • Adds support for resources (e.g. mem, cpus) to the DaskExecutor
  • Runs each task in its own process so that it does not block the GIL, which the dask workers need internally to communicate with the scheduler; env variables are also propagated to the task processes
  • The address for the dask Client can be configured via the DASK_ADDRESS env var (should we rename that?); see the usage sketch below
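
As a rough usage sketch of how these pieces fit together (plain dask.distributed calls, not the DaskExecutor code from this PR; the resource names mem/cpus and the DASK_ADDRESS variable are the ones mentioned above):

```python
# Sketch only: plain dask.distributed usage, not the DaskExecutor implementation.
import os

from distributed import Client


def square(x: int) -> int:
    return x * x


# The scheduler address comes from the DASK_ADDRESS env var;
# if it is unset, dask starts a local cluster instead.
client = Client(address=os.environ.get("DASK_ADDRESS"))

# Resource requirements are forwarded via dask's `resources=` keyword to submit().
# They only take effect if the workers declare matching resources, e.g. when
# started with: dask worker <scheduler-address> --resources "mem=100e9"
future = client.submit(square, 4, resources={"mem": 2e9, "cpus": 1})
print(future.result())  # 16
```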

Todos:

  • Updated Changelog

@normanrz self-assigned this on Oct 25, 2023
@normanrz marked this pull request as ready for review on October 25, 2023, 15:54
@daniel-wer (Member) left a comment:

Code LGTM 👍

Is the DaskExecutor tested in the CI or did you test it manually?

cluster_tools/cluster_tools/executors/dask.py (outdated review thread, resolved)
@normanrz (Member, Author):

> Is the DaskExecutor tested in the CI or did you test it manually?

Well, there were tests, but the CI didn't run them. Now it does.

@normanrz requested a review from daniel-wer on November 10, 2023, 15:21
@daniel-wer (Member) left a comment:

Nice, LGTM 👍

@@ -328,7 +331,8 @@ def run_map(executor: cluster_tools.Executor) -> None:
     assert list(result) == [4, 9, 16]

 for exc in get_executors():
-    run_map(exc)
+    if not isinstance(exc, cluster_tools.DaskExecutor):
Member:

Why is this specific test excluded for the DaskExecutor?

Member (Author):

Futures of the DaskExecutor become invalid when the executor is closed, which makes this test invalid. I was thinking about removing the test or making it fail for all executors (probably a bit of effort).
cc @philippotto
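
For context, a rough illustration of the problem (behavior as I understand it, not code from the test suite): once the dask client is closed, its futures can no longer fetch their results from the cluster.

```python
# Rough illustration, not taken from the test suite.
from distributed import Client

with Client() as client:  # local cluster, just for illustration
    future = client.submit(lambda x: x * x, 3)

# The client (and with it the connection to the workers) is now closed;
# calling future.result() here fails instead of returning 9.
```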

Member:

My thoughts:

  • Yes, we should strive for similar behavior between dask and slurm. --> Either (A) the futures should become invalid for the slurm context, too, OR (B) (maybe I'd prefer this?) we wrap/copy the results into different future objects that survive the context termination. Or is there a benefit in letting the futures die? The copying could be done upon context exit.
  • If we do (A), this would be a breaking change (and likely needs fixing in vx etc.). Therefore, I'd tackle it in a separate PR.
  • Either way, the test itself should not be removed without replacement. I think what the test intends to assert is that the iterator returned by map contains futures that were kicked off before the iterator is consumed (read the comment here). Essentially, this covers an implementation detail, but the overall expected behavior is that the map call eagerly submits all futures while lazily awaiting their results (so that they don't all need to be in RAM at once); see the sketch below. The test exploits the use-futures-after-context-was-shut-down behavior to check the eager submit (if it were not eager, the test would fail because the submit would happen after the context exit). If that behavior is removed, the eager submits should still be checked for, in my opinion.
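
To make the intended behavior concrete, here is a small illustration (my own sketch, not cluster_tools code) of the eager-submit / lazy-await pattern the test is supposed to cover:

```python
# Illustration only: all futures are submitted up front (eager),
# but results are awaited only while the returned iterator is consumed (lazy).
from concurrent.futures import Executor
from typing import Callable, Iterable, Iterator, TypeVar

S = TypeVar("S")
T = TypeVar("T")


def eager_map(executor: Executor, fn: Callable[[S], T], args: Iterable[S]) -> Iterator[T]:
    futures = [executor.submit(fn, arg) for arg in args]  # submit everything now

    def iter_results() -> Iterator[T]:
        for future in futures:
            yield future.result()  # block only when the consumer asks for the next item

    return iter_results()
```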

I hope this is somewhat comprehensible. If not, let's have a call 🤙

Member (Author):

I believe dask transfers the data lazily from the workers or the scheduler, and that doesn't work anymore once the client is closed. We could wrap the futures to eagerly collect the data.
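
A hypothetical sketch of what such a wrapper could look like (the helper name is made up; this is not part of the PR): on shutdown, each dask future's result would be copied into a plain concurrent.futures.Future that stays valid after the client is gone.

```python
# Hypothetical sketch, not part of this PR.
from concurrent.futures import Future


def detach_future(dask_future) -> Future:
    detached: Future = Future()
    try:
        # Eagerly fetch the result while the client is still alive.
        detached.set_result(dask_future.result())
    except Exception as exc:
        detached.set_exception(exc)
    return detached
```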

Member:

> We could wrap the futures to eagerly collect the data.

Yes, either this, or try to hook into the closing client (so that the collection happens at the last moment where it is still possible).

Member (Author):

I would leave that for a follow-up and merge this as-is. OK?

Member:

sure 👍

@normanrz merged commit 336e2b6 into master on Nov 16, 2023
33 checks passed
@normanrz deleted the dask-next branch on November 16, 2023, 13:27