Skip to content

Commit

Permalink
Rework join to wait once for all threads and not once per thread
Browse files Browse the repository at this point in the history
  • Loading branch information
benoit74 committed Nov 5, 2024
1 parent ae8edb7 commit fd5c04a
Showing 1 changed file with 24 additions and 18 deletions.
42 changes: 24 additions & 18 deletions src/zimscraperlib/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ def start(self):
"""Enable executor, starting requested amount of workers
Workers are started always, not provisioned dynamically"""
logger.debug(f"Starting {self.executor_name} with {self.nb_workers} threads")

self.drain()
self._workers: set[threading.Thread] = set()
self.no_more = False
Expand Down Expand Up @@ -139,26 +141,30 @@ def drain(self):

def join(self):
"""Await completion of workers, requesting them to stop taking new task"""
logger.debug(f"joining all threads for {self.executor_name}")
logger.debug(
f"joining all threads for {self.executor_name}; threads have "
f"{self.thread_deadline_sec}s to join before we give-up waiting for them"
)
self.no_more = True
for num, t in enumerate(self._workers):
deadline = datetime.datetime.now(tz=datetime.UTC) + datetime.timedelta(
seconds=self.thread_deadline_sec
)
logger.debug(
f"Giving {self.executor_name}-{num} {self.thread_deadline_sec}s to join"
)
deadline = datetime.datetime.now(tz=datetime.UTC) + datetime.timedelta(
seconds=self.thread_deadline_sec
)
alive_threads = list(filter(lambda t: t.is_alive(), self._workers))
for t in filter(lambda t: t not in alive_threads, self._workers):
logger.debug(f"Thread {t.name} is already dead. Skipping…")
while (
len(alive_threads) > 0 and datetime.datetime.now(tz=datetime.UTC) < deadline
):
e = threading.Event()
while t.is_alive() and datetime.datetime.now(tz=datetime.UTC) < deadline:
t.join(1)
e.wait(timeout=2)
if t.is_alive():
logger.debug(
f"Thread {self.executor_name}-{num} is not joining. Skipping…"
)
else:
logger.debug(f"Thread {self.executor_name}-{num} joined")
logger.debug(f"all threads joined for {self.executor_name}")
for t in alive_threads:
t.join(0.1) # just indicate to the thread that we want to stop
e.wait(2) # wait a bit more to let things cool down
for t in filter(lambda t: not t.is_alive(), alive_threads):
logger.debug(f"Thread {t.name} joined")
alive_threads.remove(t)
for t in alive_threads:
logger.debug(f"Thread {t.name} never joined. Skipping…")
logger.debug(f"join completed for {self.executor_name}")

def shutdown(self, *, wait=True):
"""stop the executor, either somewhat immediately or awaiting completion"""
Expand Down

0 comments on commit fd5c04a

Please sign in to comment.