Skip to content

Commit

Permalink
Adapt executor to zimscraperlib
Browse files Browse the repository at this point in the history
  • Loading branch information
benoit74 committed Nov 5, 2024
1 parent 711deaf commit ae8edb7
Show file tree
Hide file tree
Showing 2 changed files with 329 additions and 33 deletions.
76 changes: 43 additions & 33 deletions src/zimscraperlib/executor.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,25 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# vim: ai ts=4 sts=4 et sw=4 nu

import datetime
import queue
import threading
from typing import Callable
from collections.abc import Callable

from .shared import logger
from zimscraperlib import logger

_shutdown = False
# Lock that ensures that new workers are not created while the interpreter is
# shutting down. Must be held while mutating _threads_queues and _shutdown.
_global_shutdown_lock = threading.Lock()
thread_deadline_sec = 60


def excepthook(args):
def excepthook(args): # pragma: no cover
logger.error(f"UNHANDLED Exception in {args.thread.name}: {args.exc_type}")
logger.exception(args.exc_value)


threading.excepthook = excepthook


class SotokiExecutor(queue.Queue):
class ScraperExecutor(queue.Queue):
"""Custom FIFO queue based Executor that's less generic than ThreadPoolExec one
Providing more flexibility for the use cases we're interested in:
Expand All @@ -34,12 +29,19 @@ class SotokiExecutor(queue.Queue):
See: https://github.com/python/cpython/blob/3.8/Lib/concurrent/futures/thread.py
"""

def __init__(self, queue_size: int = 10, nb_workers: int = 1, prefix: str = "T-"):
def __init__(
self,
queue_size: int = 10,
nb_workers: int = 1,
executor_name: str = "executor",
thread_deadline_sec: int = 60,
):
super().__init__(queue_size)
self.prefix = prefix
self.executor_name = executor_name
self._shutdown_lock = threading.Lock()
self.nb_workers = nb_workers
self.exceptions = []
self.thread_deadline_sec = thread_deadline_sec

@property
def exception(self):
Expand All @@ -59,30 +61,38 @@ def submit(self, task: Callable, **kwargs):
with self._shutdown_lock, _global_shutdown_lock:
if not self.alive:
raise RuntimeError("cannot submit task to dead executor")
if self.no_more:
raise RuntimeError(
"cannot submit task to a joined executor, restart it first"
)
if _shutdown:
raise RuntimeError("cannot submit task after " "interpreter shutdown")
raise RuntimeError( # pragma: no cover
"cannot submit task after interpreter shutdown"
)

while True:
try:
self.put((task, kwargs), block=True, timeout=3.0)
except queue.Full:
if self.no_more:
break
# rarely happens except if submit and join are done in different
# threads, but we need this to escape the while loop
break # pragma: no cover
else:
break

def start(self):
"""Enable executor, starting requested amount of workers
Workers are started always, not provisioned dynamicaly"""
Workers are started always, not provisioned dynamically"""
self.drain()
self.release_halt()
self._workers = set()
self._workers: set[threading.Thread] = set()
self.no_more = False
self._shutdown = False
self.exceptions[:] = []

for n in range(self.nb_workers):
t = threading.Thread(target=self.worker, name=f"{self.prefix}{n}")
t = threading.Thread(target=self.worker, name=f"{self.executor_name}-{n}")
t.daemon = True
t.start()
self._workers.add(t)
Expand All @@ -95,7 +105,7 @@ def worker(self):
if self.no_more:
break
continue
except TypeError:
except TypeError: # pragma: no cover
# received None from the queue, most likely shutting down
return

Expand All @@ -108,7 +118,7 @@ def worker(self):
except Exception as exc:
logger.error(f"Error processing {func} with {kwargs=}")
logger.exception(exc)
if raises:
if raises: # to cover when raises = False
self.exceptions.append(exc)
self.shutdown()
finally:
Expand All @@ -129,30 +139,30 @@ def drain(self):

def join(self):
"""Await completion of workers, requesting them to stop taking new task"""
logger.debug(f"joining all threads for {self.prefix}")
logger.debug(f"joining all threads for {self.executor_name}")
self.no_more = True
for num, t in enumerate(self._workers):
deadline = datetime.datetime.now() + datetime.timedelta(
seconds=thread_deadline_sec
deadline = datetime.datetime.now(tz=datetime.UTC) + datetime.timedelta(
seconds=self.thread_deadline_sec
)
logger.debug(
f"Giving {self.executor_name}-{num} {self.thread_deadline_sec}s to join"
)
logger.debug(f"Giving {self.prefix}{num} {thread_deadline_sec}s to join")
e = threading.Event()
while t.is_alive() and datetime.datetime.now() < deadline:
while t.is_alive() and datetime.datetime.now(tz=datetime.UTC) < deadline:
t.join(1)
e.wait(timeout=2)
if t.is_alive():
logger.debug(f"Thread {self.prefix}{num} is not joining. Skipping…")
logger.debug(
f"Thread {self.executor_name}-{num} is not joining. Skipping…"
)
else:
logger.debug(f"Thread {self.prefix}{num} joined")
logger.debug(f"all threads joined for {self.prefix}")

def release_halt(self):
"""release the `no_more` flag preventing workers from taking up tasks"""
self.no_more = False
logger.debug(f"Thread {self.executor_name}-{num} joined")
logger.debug(f"all threads joined for {self.executor_name}")

def shutdown(self, wait=True):
def shutdown(self, *, wait=True):
"""stop the executor, either somewhat immediately or awaiting completion"""
logger.debug(f"shutting down executor {self.prefix} with {wait=}")
logger.debug(f"shutting down {self.executor_name} with {wait=}")
with self._shutdown_lock:
self._shutdown = True

Expand Down
Loading

0 comments on commit ae8edb7

Please sign in to comment.