Skip to content

Commit

Permalink
Use queued tasks in adaptive target (#8037)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrocklin authored Jul 28, 2023
1 parent d6758bd commit 9d9702e
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 4 deletions.
25 changes: 24 additions & 1 deletion distributed/deploy/tests/test_adaptive.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
)
from distributed.compatibility import LINUX, MACOS, WINDOWS
from distributed.metrics import time
from distributed.utils_test import async_poll_for, gen_test, slowinc
from distributed.utils_test import async_poll_for, gen_cluster, gen_test, slowinc


def test_adaptive_local_cluster(loop):
Expand Down Expand Up @@ -484,3 +484,26 @@ async def test_adaptive_stopped():
pc = instance.periodic_callback

await async_poll_for(lambda: not pc.is_running(), timeout=5)


@pytest.mark.parametrize("saturation", [1, float("inf")])
@gen_cluster(
    client=True,
    nthreads=[],
    config={
        "distributed.scheduler.default-task-durations": {"slowinc": 1000},
    },
)
async def test_scale_up_large_tasks(c, s, saturation):
    """Check ``adaptive_target()`` against the number of submitted tasks.

    The cluster is started with ``nthreads=[]`` and a configured default
    duration of 1000 for ``slowinc``.  The test runs once per value of
    ``saturation``, which is assigned to ``s.WORKER_SATURATION``.  It
    expects the target to be 10 after mapping ``slowinc`` over ``range(10)``
    and 200 once the scheduler tracks 200 tasks.
    """
    s.WORKER_SATURATION = saturation

    # NOTE(review): the futures are bound to a name but never read;
    # presumably this keeps them referenced for the duration of the test.
    small_batch = c.map(slowinc, range(10))
    # Wait until the scheduler has registered at least one task.
    while len(s.tasks) == 0:
        await asyncio.sleep(0.001)
    assert s.adaptive_target() == 10

    large_batch = c.map(slowinc, range(200))
    # Wait until the scheduler tracks exactly 200 tasks in total.
    while not len(s.tasks) == 200:
        await asyncio.sleep(0.001)
    assert s.adaptive_target() == 200
16 changes: 13 additions & 3 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import psutil
from sortedcontainers import SortedDict, SortedSet
from tlz import (
concat,
first,
groupby,
merge,
Expand All @@ -45,6 +46,7 @@
partition,
pluck,
second,
take,
valmap,
)
from tornado.ioloop import IOLoop
Expand Down Expand Up @@ -8061,15 +8063,23 @@ def adaptive_target(self, target_duration=None):
target_duration = parse_timedelta(target_duration)

# CPU
queued = take(100, concat([self.queued, self.unrunnable]))
queued_occupancy = 0
for ts in queued:
if ts.prefix.duration_average == -1:
queued_occupancy += self.UNKNOWN_TASK_DURATION
else:
queued_occupancy += ts.prefix.duration_average

if len(self.queued) + len(self.unrunnable) > 100:
queued_occupancy *= (len(self.queued) + len(self.unrunnable)) / 100

# TODO consider any user-specified default task durations for queued tasks
queued_occupancy = len(self.queued) * self.UNKNOWN_TASK_DURATION
cpu = math.ceil(
(self.total_occupancy + queued_occupancy) / target_duration
) # TODO: threads per worker

# Avoid a few long tasks from asking for many cores
tasks_ready = len(self.queued)
tasks_ready = len(self.queued) + len(self.unrunnable)
for ws in self.workers.values():
tasks_ready += len(ws.processing)

Expand Down

0 comments on commit 9d9702e

Please sign in to comment.