diff --git a/distributed/deploy/tests/test_adaptive.py b/distributed/deploy/tests/test_adaptive.py
index fcc15ee0e9..3aae49f346 100644
--- a/distributed/deploy/tests/test_adaptive.py
+++ b/distributed/deploy/tests/test_adaptive.py
@@ -19,7 +19,7 @@
 )
 from distributed.compatibility import LINUX, MACOS, WINDOWS
 from distributed.metrics import time
-from distributed.utils_test import async_poll_for, gen_test, slowinc
+from distributed.utils_test import async_poll_for, gen_cluster, gen_test, slowinc
 
 
 def test_adaptive_local_cluster(loop):
@@ -484,3 +484,26 @@ async def test_adaptive_stopped():
         pc = instance.periodic_callback
 
     await async_poll_for(lambda: not pc.is_running(), timeout=5)
+
+
+@pytest.mark.parametrize("saturation", [1, float("inf")])
+@gen_cluster(
+    client=True,
+    nthreads=[],
+    config={
+        "distributed.scheduler.default-task-durations": {"slowinc": 1000},
+    },
+)
+async def test_scale_up_large_tasks(c, s, saturation):
+    s.WORKER_SATURATION = saturation
+    futures = c.map(slowinc, range(10))
+    while not s.tasks:
+        await asyncio.sleep(0.001)
+
+    assert s.adaptive_target() == 10
+
+    more_futures = c.map(slowinc, range(200))
+    while len(s.tasks) != 200:
+        await asyncio.sleep(0.001)
+
+    assert s.adaptive_target() == 200
diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index ca612cd9ba..4649dc205d 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -37,6 +37,7 @@
 import psutil
 from sortedcontainers import SortedDict, SortedSet
 from tlz import (
+    concat,
     first,
     groupby,
     merge,
@@ -45,6 +46,7 @@
     partition,
     pluck,
     second,
+    take,
     valmap,
 )
 from tornado.ioloop import IOLoop
@@ -8061,15 +8063,23 @@ def adaptive_target(self, target_duration=None):
         target_duration = parse_timedelta(target_duration)
 
         # CPU
+        queued = take(100, concat([self.queued, self.unrunnable]))
+        queued_occupancy = 0
+        for ts in queued:
+            if ts.prefix.duration_average == -1:
+                queued_occupancy += self.UNKNOWN_TASK_DURATION
+            else:
+                queued_occupancy += ts.prefix.duration_average
+
+        if len(self.queued) + len(self.unrunnable) > 100:
+            queued_occupancy *= (len(self.queued) + len(self.unrunnable)) / 100
 
-        # TODO consider any user-specified default task durations for queued tasks
-        queued_occupancy = len(self.queued) * self.UNKNOWN_TASK_DURATION
         cpu = math.ceil(
             (self.total_occupancy + queued_occupancy) / target_duration
         )  # TODO: threads per worker
 
         # Avoid a few long tasks from asking for many cores
-        tasks_ready = len(self.queued)
+        tasks_ready = len(self.queued) + len(self.unrunnable)
         for ws in self.workers.values():
             tasks_ready += len(ws.processing)
 