Skip to content

Commit

Permalink
TST: Use safer context for ProcessPoolExecutor
Browse files Browse the repository at this point in the history
When running `test_process_executor_kills_process` just by itself, I get
a warning that threads have been used and that the `fork`
multiprocessing context is unsafe. If I run the full test suite, then
the same test hangs.

I don't know where the threads are coming from (as I didn't use xdist),
but explicitly use the safer `spawn` multiprocess context to avoid these
issues.
  • Loading branch information
QuLogic committed Jun 21, 2024
1 parent adcb045 commit 885355b
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
7 changes: 4 additions & 3 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import importlib
import itertools
import logging
import multiprocessing
import os
import random
import sys
Expand Down Expand Up @@ -2199,7 +2200,7 @@ async def test_bad_executor_annotation(c, s, a, b):

@gen_cluster(client=True)
async def test_process_executor(c, s, a, b):
with ProcessPoolExecutor() as e:
with ProcessPoolExecutor(mp_context=multiprocessing.get_context("spawn")) as e:
a.executors["processes"] = e
b.executors["processes"] = e

Expand Down Expand Up @@ -2231,7 +2232,7 @@ def kill_process():

@gen_cluster(nthreads=[("127.0.0.1", 1)], client=True)
async def test_process_executor_kills_process(c, s, a):
with ProcessPoolExecutor() as e:
with ProcessPoolExecutor(mp_context=multiprocessing.get_context("spawn")) as e:
a.executors["processes"] = e
with dask.annotate(executor="processes", retries=1):
future = c.submit(kill_process)
Expand All @@ -2254,7 +2255,7 @@ def raise_exc():

@gen_cluster(client=True)
async def test_process_executor_raise_exception(c, s, a, b):
with ProcessPoolExecutor() as e:
with ProcessPoolExecutor(mp_context=multiprocessing.get_context("spawn")) as e:
a.executors["processes"] = e
b.executors["processes"] = e
with dask.annotate(executor="processes", retries=1):
Expand Down
3 changes: 2 additions & 1 deletion distributed/tests/test_worker_metrics.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import multiprocessing
import time
from collections.abc import Collection, Hashable
from concurrent.futures import ProcessPoolExecutor
Expand Down Expand Up @@ -141,7 +142,7 @@ async def test_custom_executor(c, s, a):
"""Don't try to acquire in-thread metrics when the executor is a ProcessPoolExecutor
or a custom, arbitrary executor.
"""
with ProcessPoolExecutor(1) as e:
with ProcessPoolExecutor(1, mp_context=multiprocessing.get_context("spawn")) as e:
# Warm up executor - this can take up to 2s in Windows and MacOSX
e.submit(inc, 1).result()

Expand Down

0 comments on commit 885355b

Please sign in to comment.