
Offload CPU intensive sections of update graph to unblock event loop #8049

Merged · fjetter merged 10 commits into dask:main from update_graph_split_static_part on Aug 4, 2023

Conversation

@fjetter fjetter (Member) commented Jul 28, 2023

Closes #7980

Depends on #8047

The changes newly introduced in this PR are rather minimal beyond moving things around. I'll prepare the PR as two separate commits, possibly breaking it out into two PRs:

  1. The actual refactoring where things are logically regrouped
  2. The actual offloading (very minimal once the code is rearranged)

@fjetter fjetter changed the title from "Update graph split static part" to "Offload CPU intensive sections of update graph to unblock event loop" on Jul 28, 2023
@github-actions github-actions bot (Contributor) commented Jul 28, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

20 files ±0 · 20 suites ±0 · 11h 18m 26s ⏱️ +51m 34s

|       | total  | Δ      | passed ✔️ | Δ      | skipped 💤 | Δ   | failed | Δ  |
|-------|--------|--------|-----------|--------|------------|-----|--------|----|
| tests | 3 752  | -4     | 3 643     | +1     | 104        | -4  | 5      | ±0 |
| runs  | 36 477 | +1 410 | 34 805    | +1 458 | 1 664      | -48 | 8      | +1 |

For more details on these failures, see this check.

Results for commit 8e7c690. ± Comparison against base commit 4633e6e.

This pull request removes 4 tests.
distributed.cli.tests.test_dask_worker.test_listen_address_ipv6[tcp:..[ ‑ 1]:---nanny]
distributed.cli.tests.test_dask_worker.test_listen_address_ipv6[tcp:..[ ‑ 1]:---no-nanny]
distributed.protocol.tests.test_numpy
distributed.shuffle.tests.test_rechunk

♻️ This comment has been updated with latest results.

@fjetter fjetter force-pushed the update_graph_split_static_part branch from 0762797 to a2f1094 on August 1, 2023 12:45
@fjetter

This comment was marked as outdated.

distributed/_graph.py (outdated, resolved)
if isinstance(annotations, ToPickle):
    annotations = annotations.data  # type: ignore[unreachable]
start = time()
async with self._update_graph_lock:
fjetter (Member, Author):

TODO: add a test for this

Maybe one of:

  • Multiple clients submitting work where ordering matters
  • The same client submitting two graphs where one deserialization is slow

fjetter (Member, Author):

This lock is there to ensure ordering between subsequent update_graph calls. It is not strictly required, since the fact that the offload ThreadPoolExecutor only has a single worker should already ensure ordering, but I wouldn't want to rely on that alone. The lock is cheap, and this is the only place we're using it.
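As a rough sketch of the intent (simplified stand-in code, not the actual scheduler implementation), the lock serializes the offloaded sections so results are applied in call order:

```python
# Sketch: update_graph calls may offload CPU-heavy work, but the asyncio.Lock
# guarantees call N+1 cannot start (or apply its results) before call N finished.
import asyncio
from concurrent.futures import ThreadPoolExecutor

_executor = ThreadPoolExecutor(max_workers=1)
_update_graph_lock = asyncio.Lock()
applied = []


async def update_graph(i, cpu_heavy):
    async with _update_graph_lock:
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(_executor, cpu_heavy)
        applied.append((i, result))  # state mutation stays on the event loop


async def main():
    await asyncio.gather(*(update_graph(i, lambda i=i: i * i) for i in range(5)))
    assert [i for i, _ in applied] == [0, 1, 2, 3, 4]


asyncio.run(main())
```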

fjetter (Member, Author):

After some more review, I'm pretty confident that this lock is not required.

  • Ensure stream messages are always ordered (#8059) ensures that messages from the same client are always ordered, i.e. entire update_graph calls will be ordered regardless of what kind of concurrency is applied within.
  • As for multiple clients, I was trying to come up with ways in which clients can share tasks. With the exception of somebody literally hard-coding future keys in their graphs, this is only possible via Variables or publish_dataset. To publish a future and retrieve it from another client, we also need two operations from the first client (update_graph + set_var), which are guaranteed to be ordered properly. Therefore, as soon as another client is able to interact with the task/future, it is already guaranteed to be registered with the scheduler (its update_graph has completed). See the sketch below.

If I missed anything, please let me know.
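For reference, a minimal sketch of the two cross-client sharing paths mentioned above (the scheduler address is a placeholder); in both cases the second client can only obtain the future after the first client's update_graph and the subsequent publish/set have been processed, in order:

```python
from distributed import Client, Variable

c1 = Client("tcp://scheduler:8786")  # placeholder address
c2 = Client("tcp://scheduler:8786")

fut = c1.submit(lambda x: x + 1, 1)  # update_graph from client 1

# Path 1: published datasets
c1.publish_dataset(shared=fut)       # second op from client 1, ordered after update_graph
fut_again = c2.get_dataset("shared")

# Path 2: Variables
Variable("shared-var", client=c1).set(fut)
fut_from_var = Variable("shared-var", client=c2).get()
```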

@fjetter fjetter force-pushed the update_graph_split_static_part branch from 8444397 to 802f67f on August 3, 2023 08:58
@fjetter fjetter mentioned this pull request Aug 3, 2023
distributed/scheduler.py (resolved)
Comment on lines 4546 to 4547
graph = deserialize(graph_header, graph_frames).data
del graph_header, graph_frames
fjetter (Member, Author):

I haven't understood this well, but apparently something in deserialize is offloading as well, causing a deadlock when this section is offloaded.
I don't truly understand where this is happening yet.

Member:

Maybe the offload thread pool has only a single thread?

fjetter (Member, Author):

Yes, it does. I haven't understood where we're even offloading anything when walking down this path. In particular, the test where I encountered this basically just submitted an (inc, 1), which should not trigger our ordinary offloading threshold.
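For illustration, a self-contained sketch (independent of the distributed code base) of the failure mode being discussed: if the offloaded function itself submits to the same single-threaded executor and waits on the result, the nested job can never start:

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FuturesTimeoutError

offload = ThreadPoolExecutor(max_workers=1)


def inner():
    return "deserialized"


def outer():
    # Runs on the only offload thread and then waits for another job on the
    # same executor; with max_workers=1 that job cannot start until outer
    # returns, so without the timeout this would block forever.
    return offload.submit(inner).result(timeout=2)


try:
    offload.submit(outer).result()
except FuturesTimeoutError:
    print("nested offload deadlocked (timed out after 2s)")
```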

@fjetter fjetter force-pushed the update_graph_split_static_part branch from 7dfe5cf to c002bd9 on August 3, 2023 17:35
@fjetter fjetter (Member, Author) commented Aug 3, 2023

That update_graph now allows ticks to go through opens the door to interesting timing changes. Nothing critical, but some tests may be sensitive to this. For instance, test_steal_more_attractive_tasks was getting more flaky because it essentially relied on all tasks being registered in the same tick, which the offloading no longer guarantees.

We may need to only offload graphs above a certain size. I'll run a couple of benchmarks next to see whether I can measure any difference for small graphs and will base this decision on that data.
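As an illustrative sketch of such a gate (the helper name and threshold value are hypothetical; the real cut-off would come out of the benchmarks):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

OFFLOAD_THRESHOLD = 10_000_000  # bytes; hypothetical value, to be tuned
_executor = ThreadPoolExecutor(max_workers=1)


async def maybe_offload(func, *args, nbytes: int):
    # Small graphs: run inline so they keep the current same-tick behaviour.
    if nbytes < OFFLOAD_THRESHOLD:
        return func(*args)
    # Large graphs: run in the offload thread so the event loop keeps ticking.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_executor, func, *args)
```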

@mrocklin mrocklin (Member) commented Aug 3, 2023

That update_graph now allows ticks to go through opens the door to interesting timing changes. Nothing critical, but some tests may be sensitive to this.

I'll say some things that you already know I'm sure, but just to be pedantic.

The scheduler state is not threadsafe. If we're modifying state in the offload thread, then I would expect many subtle bugs. When offloading, I would expect the offload thread to do lots of processing, but then to deliver the results to the main event loop thread, which would update the scheduler state all at once (necessarily blocking a bit).
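A rough sketch of that pattern (prepare_graph and apply_prepared_graph are hypothetical stand-ins, not the actual scheduler API):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

_executor = ThreadPoolExecutor(max_workers=1)


def prepare_graph(graph_header, graph_frames):
    # Hypothetical stand-in for the CPU-heavy, state-free part:
    # deserialization, materialization, ordering.
    return {"tasks": dict(graph_frames)}


class Scheduler:
    def __init__(self):
        self.tasks = {}

    def apply_prepared_graph(self, prepared):
        # Hypothetical stand-in for the non-thread-safe state mutation,
        # applied synchronously on the event loop thread in one go.
        self.tasks.update(prepared["tasks"])


async def update_graph(scheduler, graph_header, graph_frames):
    loop = asyncio.get_running_loop()
    # Heavy work happens off the event loop...
    prepared = await loop.run_in_executor(
        _executor, prepare_graph, graph_header, graph_frames
    )
    # ...state is only touched back on the event loop thread.
    scheduler.apply_prepared_graph(prepared)
```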

@fjetter fjetter (Member, Author) commented Aug 4, 2023

The scheduler state is not threadsafe. If we're modifying state in the offload thread, then I would expect many subtle bugs. When offloading, I would expect the offload thread to do lots of processing, but then to deliver the results to the main event loop thread, which would update the scheduler state all at once (necessarily blocking a bit).

Yes, I am aware of the thread-safety issues. This is why I led with a refactoring that rearranged the code. The issue I am talking about is much more subtle and is about asyncio concurrency (and ordering, to a certain extent). I expanded the comment in the test test_steal_more_attractive_tasks to explain the situation more thoroughly, in case you are interested.

# The submits below are all individual update_graph calls which are very
# likely submitted in the same batch.
# Prior to https://github.com/dask/distributed/pull/8049, the entire batch
# would be processed by the scheduler in the same event loop tick.
# Therefore, the first PC `stealing.balance` call would be guaranteed to see
# all the tasks and make the correct decision.
# After the PR, the batch is processed in multiple event loop ticks, so the
# first PC `stealing.balance` call would potentially only see the first
# tasks and would try to rebalance them instead of the slow and heavy one.
# To guarantee that the stealing extension sees all tasks, we're stopping
# the callback and are calling balance ourselves once we are certain the
# tasks are all on the scheduler.
# Related https://github.com/dask/distributed/pull/5443
await ext.stop()

We likely stumbled over the same problem in the past already: #5443

@fjetter fjetter marked this pull request as ready for review August 4, 2023 07:14
@fjetter fjetter (Member, Author) commented Aug 4, 2023

I haven't touched any actual logic of update_graph in this PR. Apologies for the messy diff.

@hendrikmakait hendrikmakait self-requested a review August 4, 2023 11:45
@hendrikmakait hendrikmakait (Member) left a comment:

LGTM, some nits around readability. I haven't checked every last detail of the reordering but CI doesn't complain.

distributed/scheduler.py (outdated, resolved)
Comment on lines +656 to +668
# The submits below are all individual update_graph calls which are very
# likely submitted in the same batch.
# Prior to https://github.com/dask/distributed/pull/8049, the entire batch
# would be processed by the scheduler in the same event loop tick.
# Therefore, the first PC `stealing.balance` call would be guaranteed to see
# all the tasks and make the correct decision.
# After the PR, the batch is processed in multiple event loop ticks, so the
# first PC `stealing.balance` call would potentially only see the first
# tasks and would try to rebalance them instead of the slow and heavy one.
# To guarantee that the stealing extension sees all tasks, we're stopping
# the callback and are calling balance ourselves once we are certain the
# tasks are all on the scheduler.
# Related https://github.com/dask/distributed/pull/5443
Member:

Thanks for adding the context!

distributed/scheduler.py (outdated, resolved)
distributed/scheduler.py (outdated, resolved)
@hendrikmakait hendrikmakait (Member) commented:

I don't remember seeing test_decide_worker_rootish_while_last_worker_is_retiring fail; we should investigate that before merging.

@hendrikmakait hendrikmakait (Member) left a comment:

test_decide_worker_rootish_while_last_worker_is_retiring timed out 8/400 times on your PR but never on main. Please investigate. The test might very well be timing-sensitive, but I would like to confirm that.

@hendrikmakait hendrikmakait (Member) left a comment:

Thanks for investigating; I can confirm that the fix works locally on my machine (2000 runs).

Ready to merge, assuming CI doesn't bring up anything new.

try:
    deps = dependencies[key]
except KeyError:
    deps = {ts.key for ts in self.tasks[key].dependencies}
fjetter (Member, Author):

This is a bit scary. I added type annotations and this was flagged as having incompatible types, which is true! self.tasks[key].dependencies contains TaskState objects, which is quite naturally incompatible with basically everything that uses this value below. All `in` checks will therefore yield false positives.

No idea what this will do...

However, this may explain why the line below is actually not covered on main

[screenshot: coverage report showing the uncovered line on main]
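A toy illustration of the mismatch (with a stand-in TaskState class): membership checks against a set of TaskState objects and against the set of their string keys do not agree, so the downstream `in` checks silently go wrong:

```python
class TaskState:  # minimal stand-in for distributed's TaskState
    def __init__(self, key):
        self.key = key


deps_as_objects = {TaskState("x"), TaskState("y")}
deps_as_keys = {ts.key for ts in deps_as_objects}

print("x" in deps_as_objects)  # False -- a string key never matches an object
print("x" in deps_as_keys)     # True  -- the converted form behaves as intended
```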

fjetter (Member, Author):

I.e. the earlier version was just self.tasks[key].dependencies, but I converted it to the str-key version.

fjetter (Member, Author):

This wasn't flagged on main because I had only annotated dependencies as dict instead of dict[str, set[str]].

Member:

Type hints FTW!

if dep in dependents:
    child_deps = dependents[dep]
elif dep in self.tasks:
    child_deps = {ts.key for ts in self.tasks[key].dependencies}
fjetter (Member, Author):

same here

@fjetter fjetter (Member, Author) commented Aug 4, 2023

For the sake of clarity: this now includes an unintentional logic change, see #8049 (comment).

@fjetter fjetter (Member, Author) commented Aug 4, 2023

CI seems to be happy (considering the circumstances). There are shuffle errors (I didn't merge in Hendrik's PR yet), and there is one failure, test_gather_dep_one_worker_always_busy, that goes back to the gilknocker timeout. All other failures are known offenders.

I wouldn't be surprised to see this introduce a couple of flakes, but those are typically easy to handle.

@fjetter fjetter merged commit b0d38a1 into dask:main Aug 4, 2023
18 of 25 checks passed
@fjetter fjetter deleted the update_graph_split_static_part branch August 4, 2023 15:16
Development

Successfully merging this pull request may close these issues.

Offload CPU intensive sections of update_graph
3 participants