diff --git a/src/zimscraperlib/executor.py b/src/zimscraperlib/executor.py index a043f0a..9beb519 100644 --- a/src/zimscraperlib/executor.py +++ b/src/zimscraperlib/executor.py @@ -85,6 +85,8 @@ def start(self): """Enable executor, starting requested amount of workers Workers are started always, not provisioned dynamically""" + logger.debug(f"Starting {self.executor_name} with {self.nb_workers} threads") + self.drain() self._workers: set[threading.Thread] = set() self.no_more = False @@ -139,26 +141,30 @@ def drain(self): def join(self): """Await completion of workers, requesting them to stop taking new task""" - logger.debug(f"joining all threads for {self.executor_name}") + logger.debug( + f"joining all threads for {self.executor_name}; threads have " + f"{self.thread_deadline_sec}s to join before we give-up waiting for them" + ) self.no_more = True - for num, t in enumerate(self._workers): - deadline = datetime.datetime.now(tz=datetime.UTC) + datetime.timedelta( - seconds=self.thread_deadline_sec - ) - logger.debug( - f"Giving {self.executor_name}-{num} {self.thread_deadline_sec}s to join" - ) + deadline = datetime.datetime.now(tz=datetime.UTC) + datetime.timedelta( + seconds=self.thread_deadline_sec + ) + alive_threads = list(filter(lambda t: t.is_alive(), self._workers)) + for t in filter(lambda t: t not in alive_threads, self._workers): + logger.debug(f"Thread {t.name} is already dead. Skipping…") + while ( + len(alive_threads) > 0 and datetime.datetime.now(tz=datetime.UTC) < deadline + ): e = threading.Event() - while t.is_alive() and datetime.datetime.now(tz=datetime.UTC) < deadline: - t.join(1) - e.wait(timeout=2) - if t.is_alive(): - logger.debug( - f"Thread {self.executor_name}-{num} is not joining. Skipping…" - ) - else: - logger.debug(f"Thread {self.executor_name}-{num} joined") - logger.debug(f"all threads joined for {self.executor_name}") + for t in alive_threads: + t.join(0.1) # just indicate to the thread that we want to stop + e.wait(2) # wait a bit more to let things cool down + for t in filter(lambda t: not t.is_alive(), alive_threads): + logger.debug(f"Thread {t.name} joined") + alive_threads.remove(t) + for t in alive_threads: + logger.debug(f"Thread {t.name} never joined. Skipping…") + logger.debug(f"join completed for {self.executor_name}") def shutdown(self, *, wait=True): """stop the executor, either somewhat immediately or awaiting completion"""