Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a separate thread to watch for dead processes #81

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 41 additions & 27 deletions taskiq/cli/worker/process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
import signal
from dataclasses import dataclass
from multiprocessing import Process, Queue
from threading import Thread
from time import sleep
from typing import Any, Callable, List
from typing import Any, Callable, List, NoReturn

from watchdog.observers import Observer

Expand Down Expand Up @@ -172,7 +173,32 @@ def prepare_workers(self) -> None:
)
self.workers.append(work_proc)

def start(self) -> None: # noqa: C901, WPS213
def start_worker_watcher(self) -> None:
    """
    Start worker watcher thread.

    The watcher is a daemon thread that wakes up once per second,
    walks over ``self.workers`` and puts a ``ReloadOneAction`` on
    ``self.action_queue`` for every process that is no longer alive.

    Each dead process object is reported only once, so the queue
    is not flooded with duplicate reload requests while the manager
    is busy handling other actions.

    The thread object is stored in ``self._watcher``.
    """
    # Local import: only the nested watcher needs it.
    from typing import Dict  # noqa: WPS433

    def watcher(
        workers: List[Process],
        # Quoted on purpose: this annotation is evaluated when the
        # nested function is defined, and multiprocessing.Queue
        # is not subscriptable at runtime.
        action_queue: "Queue[ProcessActionBase]",
    ) -> NoReturn:
        # Dead process already reported for each worker slot.
        # NOTE(review): assumes the reload handler puts a *new*
        # Process object into the slot (a Process can be started
        # only once) -- confirm against ReloadOneAction.handle.
        reported: Dict[int, Process] = {}
        logger.info("Started the worker watcher thread.")
        while True:  # noqa: WPS457
            sleep(1.0)
            for worker_num, worker in enumerate(workers):
                if worker.is_alive():
                    continue
                if reported.get(worker_num) is worker:
                    # Reload for this exact process is already queued.
                    continue
                reported[worker_num] = worker
                logger.info("%s is dead. Scheduling reload.", worker.name)
                action_queue.put(ReloadOneAction(worker_num=worker_num))

    # Daemon thread: it must never keep the manager process alive.
    self._watcher = Thread(
        target=watcher,
        args=(self.workers, self.action_queue),
        daemon=True,
    )
    self._watcher.start()

def start(self) -> None:
"""
Start managing child processes.

Expand All @@ -199,29 +225,17 @@ def start(self) -> None: # noqa: C901, WPS213
some reason, it schedules a restart for dead process.
"""
self.prepare_workers()
self.start_worker_watcher()
while True:
sleep(1)
reloaded_workers = set()
# We bulk_process all pending events.
while not self.action_queue.empty():
action = self.action_queue.get()
logging.debug(f"Got event: {action}")
if isinstance(action, ReloadAllAction):
action.handle(
workers_num=len(self.workers),
action_queue=self.action_queue,
)
elif isinstance(action, ReloadOneAction):
# If we just reloaded this worker, skip handling.
if action.worker_num in reloaded_workers:
continue
action.handle(self.workers, self.args, self.worker_function)
reloaded_workers.add(action.worker_num)
elif isinstance(action, ShutdownAction):
logger.debug("Process manager closed.")
return

for worker_num, worker in enumerate(self.workers):
if not worker.is_alive():
logger.info(f"{worker.name} is dead. Scheduling reload.")
self.action_queue.put(ReloadOneAction(worker_num=worker_num))
action = self.action_queue.get()
logging.debug(f"Got event: {action}")
if isinstance(action, ReloadAllAction):
action.handle(
workers_num=len(self.workers),
action_queue=self.action_queue,
)
elif isinstance(action, ReloadOneAction):
action.handle(self.workers, self.args, self.worker_function)
elif isinstance(action, ShutdownAction):
logger.debug("Process manager closed.")
return