Skip to content

Commit

Permalink
Merge branch 'release/0.2.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
s3rius committed Mar 21, 2023
2 parents 6ef1a22 + c09ab94 commit 0f4d746
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 154 deletions.
2 changes: 2 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ docstring_style=sphinx
ignore =
; Found a line that starts with a dot
WPS348,
; Found overly complex type annotation
WPS234,
; `noqa` comments overuse ))))
WPS402,
; Found `%` string formatting
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "taskiq"
version = "0.2.0"
version = "0.2.1"
description = "Distributed task queue with full async support"
authors = ["Pavel Kirilin <[email protected]>"]
maintainers = ["Pavel Kirilin <[email protected]>"]
Expand Down
18 changes: 11 additions & 7 deletions taskiq/cli/watcher.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
from logging import getLogger
from pathlib import Path
from typing import Callable
from typing import Any, Callable

from gitignore_parser import parse_gitignore
from watchdog.events import FileSystemEvent

logger = getLogger("taskiq.worker")


class FileWatcher: # pragma: no cover
"""Filewatcher that watchs for filesystem changes."""

def __init__(
self,
callback: Callable[[], None],
callback: Callable[..., None],
use_gitignore: bool = True,
**callback_kwargs: Any,
) -> None:
self.callback = callback
self.gitignore = None
gpath = Path("./.gitignore")
if use_gitignore and gpath.exists():
self.gitignore = parse_gitignore(gpath)
self.callback_kwargs = callback_kwargs

def dispatch(self, event: FileSystemEvent) -> None:
"""
Expand All @@ -30,12 +35,11 @@ def dispatch(self, event: FileSystemEvent) -> None:
"""
if event.is_directory:
return
if event.event_type == "closed":
return
if ".pytest_cache" in event.src_path:
if event.event_type in {"opened", "closed"}:
return
if "__pycache__" in event.src_path:
if ".git" in event.src_path:
return
if self.gitignore and self.gitignore(event.src_path):
return
self.callback()
logger.debug(f"File changed. Event: {event}")
self.callback(**self.callback_kwargs)
4 changes: 2 additions & 2 deletions taskiq/cli/worker/async_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ async def async_listen_messages(
:param broker: broker to listen to.
:param cli_args: CLI arguments for worker.
"""
logger.info("Runing startup event.")
logger.debug("Runing startup event.")
await broker.startup()
logger.info("Inicialized receiver.")
logger.debug("Initialized receiver.")
receiver = Receiver(broker, cli_args)
logger.info("Listening started.")
tasks = set()
Expand Down
226 changes: 226 additions & 0 deletions taskiq/cli/worker/process_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
import logging
import signal
from dataclasses import dataclass
from multiprocessing import Process, Queue
from time import sleep
from typing import Any, Callable, List

from watchdog.observers import Observer

from taskiq.cli.watcher import FileWatcher
from taskiq.cli.worker.args import WorkerArgs

logger = logging.getLogger("taskiq.process-manager")


class ProcessActionBase:
"""Base for all process actions. Used for types."""


@dataclass
class ReloadAllAction(ProcessActionBase):
"""This action triggers reload of all workers."""

def handle(
self,
workers_num: int,
action_queue: "Queue[ProcessActionBase]",
) -> None:
"""
Handle reload all action.
This action sends N reloadOne actions in a queue,
where N is a number of worker processes.
:param workers_num: number of currently active workers.
:param action_queue: queue to send events to.
"""
for worker_id in range(workers_num):
action_queue.put(ReloadOneAction(worker_num=worker_id))


@dataclass
class ReloadOneAction(ProcessActionBase):
"""This action reloads single worker with particular id."""

worker_num: int

def handle(
self,
workers: List[Process],
args: WorkerArgs,
worker_func: Callable[[WorkerArgs], None],
) -> None:
"""
This action reloads a single process.
:param workers: known children processes.
:param args: args for new process.
:param worker_func: function that is used to start worker processes.
"""
if self.worker_num < 0 or self.worker_num >= len(workers):
logger.warning("Unknown worker id.")
return
worker = workers[self.worker_num]
try:
worker.terminate()
except ValueError:
logger.debug(f"Process {worker.name} is already terminated.")
# Waiting worker shutdown.
worker.join()
new_process = Process(
target=worker_func,
kwargs={"args": args},
name=f"worker-{self.worker_num}",
daemon=True,
)
new_process.start()
workers[self.worker_num] = new_process


@dataclass
class ShutdownAction(ProcessActionBase):
"""This action shuts down process manager loop."""


def schedule_workers_reload(
action_queue: "Queue[ProcessActionBase]",
) -> None:
"""
Function to schedule workers to restart.
It simply send FULL_RELOAD event, which is handled
in the mainloop.
:param action_queue: queue to send events to.
"""
action_queue.put(ReloadAllAction())
logger.info("Scheduled workers reload.")


def get_signal_handler(
action_queue: "Queue[ProcessActionBase]",
) -> Callable[[int, Any], None]:
"""
Generate singnal handler for main process.
The signal handler will just put the SHUTDOWN event in
the action queue.
:param action_queue: event queue.
:returns: actual signal handler.
"""

def _signal_handler(signum: int, _frame: Any) -> None:
logger.debug(f"Got signal {signum}.")
action_queue.put(ShutdownAction())
logger.warn("Workers are scheduled for shutdown.")

return _signal_handler


class ProcessManager:
"""
Process manager for taskiq.
This class spawns multiple processes,
and maintains their states. If process
is down, it tries to restart it.
"""

def __init__(
self,
args: WorkerArgs,
observer: Observer,
worker_function: Callable[[WorkerArgs], None],
) -> None:
self.worker_function = worker_function
self.action_queue: "Queue[ProcessActionBase]" = Queue(-1)
self.args = args
if args.reload:
observer.schedule(
FileWatcher(
callback=schedule_workers_reload,
use_gitignore=not args.no_gitignore,
action_queue=self.action_queue,
),
path=".",
recursive=True,
)

signal_handler = get_signal_handler(self.action_queue)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

self.workers: List[Process] = []

def prepare_workers(self) -> None:
"""Spawn multiple processes."""
for process in range(self.args.workers):
work_proc = Process(
target=self.worker_function,
kwargs={"args": self.args},
name=f"worker-{process}",
daemon=True,
)
logger.info(
"Started process worker-%d with pid %s ",
process,
work_proc.pid,
)
work_proc.start()
self.workers.append(work_proc)

def start(self) -> None: # noqa: C901, WPS213
"""
Start managing child processes.
This function is an endless loop,
which listens to new events from different sources.
Every second it checks for new events and
current states of child processes.
If there are new events it handles them.
Manager can handle 3 types of events:
1. `ReloadAllAction` - when we want to restart all child processes.
It checks for running processes and generates RELOAD_ONE event for
any process.
2. `ReloadOneAction` - this event restarts one single child process.
3. `ShutdownAction` - exits the loop. Since all child processes are
daemons, they will be automatically terminated using signals.
After all events are handled, it iterates over all child processes and
checks that all processes are healthy. If process was terminated for
some reason, it schedules a restart for dead process.
"""
self.prepare_workers()
while True:
sleep(1)
reloaded_workers = set()
# We bulk_process all pending events.
while not self.action_queue.empty():
action = self.action_queue.get()
logging.debug(f"Got event: {action}")
if isinstance(action, ReloadAllAction):
action.handle(
workers_num=len(self.workers),
action_queue=self.action_queue,
)
elif isinstance(action, ReloadOneAction):
# If we just reloaded this worker, skip handling.
if action.worker_num in reloaded_workers:
continue
action.handle(self.workers, self.args, self.worker_function)
reloaded_workers.add(action.worker_num)
elif isinstance(action, ShutdownAction):
logger.debug("Process manager closed.")
return

for worker_num, worker in enumerate(self.workers):
if not worker.is_alive():
logger.info(f"{worker.name} is dead. Scheduling reload.")
self.action_queue.put(ReloadOneAction(worker_num=worker_num))
Loading

0 comments on commit 0f4d746

Please sign in to comment.