Skip to content

Commit

Permalink
implement limited WorkerPool size
Browse files Browse the repository at this point in the history
  • Loading branch information
graingert committed Jan 25, 2021
1 parent 116637a commit 165b60e
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 4 deletions.
28 changes: 25 additions & 3 deletions execnet/gateway_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,29 @@ def reraise(cls, val, tb):
# def log_extra(*msg):
# f.write(" ".join([str(x) for x in msg]) + "\n")

if sys.version_info >= (3, 7):
from contextlib import nullcontext
else:
class nullcontext(object):
"""Context manager that does no additional processing.
Used as a stand-in for a normal context manager, when a particular
block of code is only sometimes used with a normal context manager:
cm = optional_cm if condition else nullcontext()
with cm:
# Perform operation, using optional_cm if condition is True
"""

def __init__(self, enter_result=None):
self.enter_result = enter_result

def __enter__(self):
return self.enter_result

class EmptySemaphore:
def __exit__(self, *excinfo):
pass


class EmptySemaphore(nullcontext):
acquire = release = lambda self: None


Expand Down Expand Up @@ -238,13 +259,14 @@ class WorkerPool(object):
when the pool received a trigger_shutdown().
"""

def __init__(self, execmodel, hasprimary=False):
def __init__(self, execmodel, hasprimary=False, size=None):
""" by default allow unlimited number of spawns. """
self.execmodel = execmodel
self._running_lock = self.execmodel.Lock()
self._running = set()
self._shuttingdown = False
self._waitall_events = []
self._semaphore = self.execmodel.Semaphore(size)
if hasprimary:
if self.execmodel.backend != "thread":
raise ValueError("hasprimary=True requires thread model")
Expand Down Expand Up @@ -307,7 +329,7 @@ def spawn(self, func, *args, **kwargs):
of the given func(*args, **kwargs).
"""
reply = Reply((func, args, kwargs), self.execmodel)
with self._running_lock:
with self._semaphore, self._running_lock:
if self._shuttingdown:
raise ValueError("pool is shutting down")
self._running.add(reply)
Expand Down
1 change: 0 additions & 1 deletion testing/test_threadpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ def test_waitfinish_on_reply(pool):
pytest.raises(ZeroDivisionError, reply.get)


@pytest.mark.xfail(reason="WorkerPool does not implement limited size")
def test_limited_size(execmodel):
pool = WorkerPool(execmodel, size=1)
q = execmodel.queue.Queue()
Expand Down

0 comments on commit 165b60e

Please sign in to comment.