Skip to content

Commit

Permalink
Hack to avoid pausing
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Apr 3, 2023
1 parent b0f0ba9 commit 2114a7b
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 0 deletions.
4 changes: 4 additions & 0 deletions distributed/spill.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ class AsyncBufferProto(Protocol, Collection[str]):
This is public API.
"""

#: Currently running / queued futures handling asynchronous spill/unspill activity
#: This is a temporary part of the protocol and will be removed soon.
futures: set[asyncio.Future]

def __setitem__(self, key: str, value: object) -> None:
...

Expand Down
13 changes: 13 additions & 0 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2389,6 +2389,19 @@ async def _prepare_args_for_execution(
# This may potentially take many seconds if it involves unspilling
if isinstance(self.data, AsyncBufferProto):
with context_meter.meter("zict-offload"):
# FIXME Hack around TODO
# Avoid reaching the pause threshold by pushing through a lot of tasks
# that don't need to unspill dependencies. If you have 1
# async_evict_until_below_target future and 1 execute() / gather_dep()
# future spilling their outputs, or 2 execute() / gather_dep() spilling
# outputs, wait before you start running this task, even if all
# dependencies are in fast (or there are no dependencies).
while len(self.data.futures) > 1:
await asyncio.wait(
self.data.futures, return_when=asyncio.FIRST_COMPLETED
)
# End hack

data.update(await self.data.async_get(keys))
else:
for k in keys:
Expand Down

0 comments on commit 2114a7b

Please sign in to comment.