From 4d348fac6521f656e0fd0658e63792adfc6dc709 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Wed, 26 Apr 2023 16:45:06 -0500 Subject: [PATCH] Use ProcessPool workers --- tests/conftest.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/conftest.py b/tests/conftest.py index b5f2c47c4c..7a5fee69fc 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -11,6 +11,7 @@ import threading import time import uuid +from concurrent.futures import ProcessPoolExecutor from functools import lru_cache import dask @@ -93,6 +94,7 @@ def get_coiled_runtime_version(): { "coiled.account": "dask-benchmarks", "distributed.admin.system-monitor.gil.enabled": True, + "distributed.worker.daemon": False, } ) @@ -514,6 +516,15 @@ def small_client( small_cluster.scale(n_workers) client.wait_for_workers(n_workers) + # Swap out the default threadpool for a processpool on workers + class ProcessPoolWorkers(WorkerPlugin): + def setup(self, worker): + worker.executors["default"] = ProcessPoolExecutor( + max_workers=worker.state.nthreads + ) + + client.register_worker_plugin(ProcessPoolWorkers()) + with upload_cluster_dump(client): log_on_scheduler(client, "Finished client setup of %s", test_label)