From 4c34980940bc6271a43db2ce5e49f2b2b88297ef Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Tue, 21 Mar 2023 13:50:16 +0400 Subject: [PATCH 1/2] =?UTF-8?q?Refa=D1=81tored=20process=20manager.=20(#64?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .flake8 | 2 + taskiq/cli/watcher.py | 18 +- taskiq/cli/worker/async_task_runner.py | 4 +- taskiq/cli/worker/process_manager.py | 226 +++++++++++++++++++++++++ taskiq/cli/worker/run.py | 173 ++++--------------- 5 files changed, 270 insertions(+), 153 deletions(-) create mode 100644 taskiq/cli/worker/process_manager.py diff --git a/.flake8 b/.flake8 index 1590469..f06163b 100644 --- a/.flake8 +++ b/.flake8 @@ -8,6 +8,8 @@ docstring_style=sphinx ignore = ; Found a line that starts with a dot WPS348, + ; Found overly complex type annotation + WPS234, ; `noqa` comments overuse )))) WPS402, ; Found `%` string formatting diff --git a/taskiq/cli/watcher.py b/taskiq/cli/watcher.py index 30bafe3..3b94b13 100644 --- a/taskiq/cli/watcher.py +++ b/taskiq/cli/watcher.py @@ -1,23 +1,28 @@ +from logging import getLogger from pathlib import Path -from typing import Callable +from typing import Any, Callable from gitignore_parser import parse_gitignore from watchdog.events import FileSystemEvent +logger = getLogger("taskiq.worker") + class FileWatcher: # pragma: no cover """Filewatcher that watchs for filesystem changes.""" def __init__( self, - callback: Callable[[], None], + callback: Callable[..., None], use_gitignore: bool = True, + **callback_kwargs: Any, ) -> None: self.callback = callback self.gitignore = None gpath = Path("./.gitignore") if use_gitignore and gpath.exists(): self.gitignore = parse_gitignore(gpath) + self.callback_kwargs = callback_kwargs def dispatch(self, event: FileSystemEvent) -> None: """ @@ -30,12 +35,11 @@ def dispatch(self, event: FileSystemEvent) -> None: """ if event.is_directory: return - if event.event_type == "closed": - return - if ".pytest_cache" in event.src_path: + if event.event_type in {"opened", "closed"}: return - if "__pycache__" in event.src_path: + if ".git" in event.src_path: return if self.gitignore and self.gitignore(event.src_path): return - self.callback() + logger.debug(f"File changed. Event: {event}") + self.callback(**self.callback_kwargs) diff --git a/taskiq/cli/worker/async_task_runner.py b/taskiq/cli/worker/async_task_runner.py index 80246e1..d30cf55 100644 --- a/taskiq/cli/worker/async_task_runner.py +++ b/taskiq/cli/worker/async_task_runner.py @@ -21,9 +21,9 @@ async def async_listen_messages( :param broker: broker to listen to. :param cli_args: CLI arguments for worker. """ - logger.info("Runing startup event.") + logger.debug("Runing startup event.") await broker.startup() - logger.info("Inicialized receiver.") + logger.debug("Initialized receiver.") receiver = Receiver(broker, cli_args) logger.info("Listening started.") tasks = set() diff --git a/taskiq/cli/worker/process_manager.py b/taskiq/cli/worker/process_manager.py new file mode 100644 index 0000000..ce612d1 --- /dev/null +++ b/taskiq/cli/worker/process_manager.py @@ -0,0 +1,226 @@ +import logging +import signal +from dataclasses import dataclass +from multiprocessing import Process, Queue +from time import sleep +from typing import Any, Callable, List + +from watchdog.observers import Observer + +from taskiq.cli.watcher import FileWatcher +from taskiq.cli.worker.args import WorkerArgs + +logger = logging.getLogger("taskiq.process-manager") + + +class ProcessActionBase: + """Base for all process actions. Used for types.""" + + +@dataclass +class ReloadAllAction(ProcessActionBase): + """This action triggers reload of all workers.""" + + def handle( + self, + workers_num: int, + action_queue: "Queue[ProcessActionBase]", + ) -> None: + """ + Handle reload all action. + + This action sends N reloadOne actions in a queue, + where N is a number of worker processes. + + :param workers_num: number of currently active workers. + :param action_queue: queue to send events to. + """ + for worker_id in range(workers_num): + action_queue.put(ReloadOneAction(worker_num=worker_id)) + + +@dataclass +class ReloadOneAction(ProcessActionBase): + """This action reloads single worker with particular id.""" + + worker_num: int + + def handle( + self, + workers: List[Process], + args: WorkerArgs, + worker_func: Callable[[WorkerArgs], None], + ) -> None: + """ + This action reloads a single process. + + :param workers: known children processes. + :param args: args for new process. + :param worker_func: function that is used to start worker processes. + """ + if self.worker_num < 0 or self.worker_num >= len(workers): + logger.warning("Unknown worker id.") + return + worker = workers[self.worker_num] + try: + worker.terminate() + except ValueError: + logger.debug(f"Process {worker.name} is already terminated.") + # Waiting worker shutdown. + worker.join() + new_process = Process( + target=worker_func, + kwargs={"args": args}, + name=f"worker-{self.worker_num}", + daemon=True, + ) + new_process.start() + workers[self.worker_num] = new_process + + +@dataclass +class ShutdownAction(ProcessActionBase): + """This action shuts down process manager loop.""" + + +def schedule_workers_reload( + action_queue: "Queue[ProcessActionBase]", +) -> None: + """ + Function to schedule workers to restart. + + It simply send FULL_RELOAD event, which is handled + in the mainloop. + + :param action_queue: queue to send events to. + """ + action_queue.put(ReloadAllAction()) + logger.info("Scheduled workers reload.") + + +def get_signal_handler( + action_queue: "Queue[ProcessActionBase]", +) -> Callable[[int, Any], None]: + """ + Generate singnal handler for main process. + + The signal handler will just put the SHUTDOWN event in + the action queue. + + :param action_queue: event queue. + :returns: actual signal handler. + """ + + def _signal_handler(signum: int, _frame: Any) -> None: + logger.debug(f"Got signal {signum}.") + action_queue.put(ShutdownAction()) + logger.warn("Workers are scheduled for shutdown.") + + return _signal_handler + + +class ProcessManager: + """ + Process manager for taskiq. + + This class spawns multiple processes, + and maintains their states. If process + is down, it tries to restart it. + """ + + def __init__( + self, + args: WorkerArgs, + observer: Observer, + worker_function: Callable[[WorkerArgs], None], + ) -> None: + self.worker_function = worker_function + self.action_queue: "Queue[ProcessActionBase]" = Queue(-1) + self.args = args + if args.reload: + observer.schedule( + FileWatcher( + callback=schedule_workers_reload, + use_gitignore=not args.no_gitignore, + action_queue=self.action_queue, + ), + path=".", + recursive=True, + ) + + signal_handler = get_signal_handler(self.action_queue) + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + self.workers: List[Process] = [] + + def prepare_workers(self) -> None: + """Spawn multiple processes.""" + for process in range(self.args.workers): + work_proc = Process( + target=self.worker_function, + kwargs={"args": self.args}, + name=f"worker-{process}", + daemon=True, + ) + logger.info( + "Started process worker-%d with pid %s ", + process, + work_proc.pid, + ) + work_proc.start() + self.workers.append(work_proc) + + def start(self) -> None: # noqa: C901, WPS213 + """ + Start managing child processes. + + This function is an endless loop, + which listens to new events from different sources. + + Every second it checks for new events and + current states of child processes. + + If there are new events it handles them. + Manager can handle 3 types of events: + + 1. `ReloadAllAction` - when we want to restart all child processes. + It checks for running processes and generates RELOAD_ONE event for + any process. + + 2. `ReloadOneAction` - this event restarts one single child process. + + 3. `ShutdownAction` - exits the loop. Since all child processes are + daemons, they will be automatically terminated using signals. + + After all events are handled, it iterates over all child processes and + checks that all processes are healthy. If process was terminated for + some reason, it schedules a restart for dead process. + """ + self.prepare_workers() + while True: + sleep(1) + reloaded_workers = set() + # We bulk_process all pending events. + while not self.action_queue.empty(): + action = self.action_queue.get() + logging.debug(f"Got event: {action}") + if isinstance(action, ReloadAllAction): + action.handle( + workers_num=len(self.workers), + action_queue=self.action_queue, + ) + elif isinstance(action, ReloadOneAction): + # If we just reloaded this worker, skip handling. + if action.worker_num in reloaded_workers: + continue + action.handle(self.workers, self.args, self.worker_function) + reloaded_workers.add(action.worker_num) + elif isinstance(action, ShutdownAction): + logger.debug("Process manager closed.") + return + + for worker_num, worker in enumerate(self.workers): + if not worker.is_alive(): + logger.info(f"{worker.name} is dead. Scheduling reload.") + self.action_queue.put(ReloadOneAction(worker_num=worker_num)) diff --git a/taskiq/cli/worker/run.py b/taskiq/cli/worker/run.py index 3eeb4d2..80e7d52 100644 --- a/taskiq/cli/worker/run.py +++ b/taskiq/cli/worker/run.py @@ -1,20 +1,18 @@ import asyncio -import os +import logging import signal import sys -from logging import StreamHandler, basicConfig, getLevelName, getLogger from logging.handlers import QueueHandler, QueueListener -from multiprocessing import Process, Queue -from time import sleep -from typing import Any, List +from multiprocessing import Queue +from typing import Any from watchdog.observers import Observer from taskiq.abc.broker import AsyncBroker from taskiq.cli.utils import import_object, import_tasks -from taskiq.cli.watcher import FileWatcher from taskiq.cli.worker.args import WorkerArgs from taskiq.cli.worker.async_task_runner import async_listen_messages +from taskiq.cli.worker.process_manager import ProcessManager try: import uvloop # noqa: WPS433 @@ -22,58 +20,7 @@ uvloop = None # type: ignore -logger = getLogger("taskiq.worker") - - -restart_workers = True -worker_processes: List[Process] = [] -observer = Observer() -reload_queue: "Queue[bool]" = Queue(-1) - - -def signal_handler(_signal: int, _frame: Any) -> None: - """ - This handler is used only by main process. - - If the OS sent you SIGINT or SIGTERM, - we should kill all spawned processes. - - :param _signal: incoming signal. - :param _frame: current execution frame. - """ - global restart_workers # noqa: WPS420 - global worker_processes # noqa: WPS420 - - restart_workers = False # noqa: WPS442 - for process in worker_processes: - # This is how we kill children, - # by sending SIGINT to child processes. - if process.pid is None: - continue - try: - os.kill(process.pid, signal.SIGINT) - except ProcessLookupError: - continue - process.join() - if observer.is_alive(): - observer.stop() - observer.join() - - -def schedule_workers_reload() -> None: - """ - Function to schedule workers to restart. - - This function adds worker ids to the queue. - - This queue is later read in watcher loop. - """ - global worker_processes # noqa: WPS420 - global reload_queue # noqa: WPS420 - - reload_queue.put(True) - logger.info("Scheduled workers reload.") - reload_queue.join() +logger = logging.getLogger("taskiq.worker") async def shutdown_broker(broker: AsyncBroker, timeout: float) -> None: @@ -103,7 +50,7 @@ async def shutdown_broker(broker: AsyncBroker, timeout: float) -> None: ) -def start_listen(args: WorkerArgs) -> None: # noqa: C901 +def start_listen(args: WorkerArgs) -> None: # noqa: C901, WPS213 """ This function starts actual listening process. @@ -135,17 +82,18 @@ def start_listen(args: WorkerArgs) -> None: # noqa: C901 # times. And it may interrupt the broker's shutdown process. shutting_down = False - def interrupt_handler(_signum: int, _frame: Any) -> None: + def interrupt_handler(signum: int, _frame: Any) -> None: """ Signal handler. This handler checks if process is already terminating and if it's true, it does nothing. - :param _signum: received signal number. + :param signum: received signal number. :param _frame: current execution frame. :raises KeyboardInterrupt: if termiation hasn't begun. """ + logger.debug(f"Got signal {signum}.") nonlocal shutting_down # noqa: WPS420 if shutting_down: return @@ -153,6 +101,7 @@ def interrupt_handler(_signum: int, _frame: Any) -> None: raise KeyboardInterrupt signal.signal(signal.SIGINT, interrupt_handler) + signal.signal(signal.SIGTERM, interrupt_handler) loop = asyncio.get_event_loop() try: @@ -162,59 +111,6 @@ def interrupt_handler(_signum: int, _frame: Any) -> None: loop.run_until_complete(shutdown_broker(broker, args.shutdown_timeout)) -def watcher_loop(args: WorkerArgs) -> None: # noqa: C901, WPS213 - """ - Infinate loop for main process. - - This loop restarts worker processes - if they exit with error returncodes. - - Also it reads process ids from reload_queue - and reloads workers if they were scheduled to reload. - - :param args: cli arguements. - """ - global worker_processes # noqa: WPS420 - global restart_workers # noqa: WPS420 - - while worker_processes and restart_workers: - # List of processes to remove. - sleep(1) - process_to_remove = [] - if not reload_queue.empty(): - while not reload_queue.empty(): - reload_queue.get() - reload_queue.task_done() - - for worker_id, worker in enumerate(worker_processes): - worker.terminate() - worker.join() - worker_processes[worker_id] = Process( - target=start_listen, - kwargs={"args": args}, - name=f"worker-{worker_id}", - ) - worker_processes[worker_id].start() - - for worker_id, worker in enumerate(worker_processes): - if worker.is_alive(): - continue - if worker.exitcode is not None and worker.exitcode > 0 and restart_workers: - logger.info("Trying to restart the worker-%s", worker_id) - worker_processes[worker_id] = Process( - target=start_listen, - kwargs={"args": args}, - name=f"worker-{worker_id}", - ) - worker_processes[worker_id].start() - else: - logger.info("Worker-%s terminated.", worker_id) - process_to_remove.append(worker) - - for dead_process in process_to_remove: - worker_processes.remove(dead_process) - - def run_worker(args: WorkerArgs) -> None: # noqa: WPS213 """ This function starts worker processes. @@ -225,43 +121,32 @@ def run_worker(args: WorkerArgs) -> None: # noqa: WPS213 :param args: CLI arguments. """ logging_queue = Queue(-1) # type: ignore - listener = QueueListener(logging_queue, StreamHandler(sys.stdout)) - basicConfig( - level=getLevelName(args.log_level), - format="[%(asctime)s][%(levelname)-7s][%(processName)s] %(message)s", + listener = QueueListener(logging_queue, logging.StreamHandler(sys.stdout)) + logging.basicConfig( + level=logging.getLevelName(args.log_level), + format="[%(asctime)s][%(name)s][%(levelname)-7s][%(processName)s] %(message)s", handlers=[QueueHandler(logging_queue)], ) + logging.getLogger("watchdog.observers.inotify_buffer").setLevel(level=logging.INFO) listener.start() logger.info("Starting %s worker processes.", args.workers) - global worker_processes # noqa: WPS420 - - for process in range(args.workers): - work_proc = Process( - target=start_listen, - kwargs={"args": args}, - name=f"worker-{process}", - ) - work_proc.start() - logger.debug( - "Started process worker-%d with pid %s ", - process, - work_proc.pid, - ) - worker_processes.append(work_proc) + observer = Observer() if args.reload: - observer.schedule( - FileWatcher( - callback=schedule_workers_reload, - use_gitignore=not args.no_gitignore, - ), - path=".", - recursive=True, - ) observer.start() - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) + args.workers = 1 + logging.warning( + "Reload on chage enabled. Number of worker processes set to 1.", + ) - watcher_loop(args=args) + manager = ProcessManager(args=args, observer=observer, worker_function=start_listen) + + manager.start() + + if observer.is_alive(): + if args.reload: + logger.info("Stopping watching files.") + observer.stop() + logger.info("Stopping logging thread.") listener.stop() From c09ab949e6b3cae4d618a400d35778fc5bb6e21c Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Tue, 21 Mar 2023 13:55:39 +0400 Subject: [PATCH 2/2] Version bumped to 0.2.1. Signed-off-by: Pavel Kirilin --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 33ce55c..bb4332d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "taskiq" -version = "0.2.0" +version = "0.2.1" description = "Distributed task queue with full async support" authors = ["Pavel Kirilin "] maintainers = ["Pavel Kirilin "]