Skip to content

Commit

Permalink
Removed startup event. (#266)
Browse files Browse the repository at this point in the history
  • Loading branch information
s3rius authored Jan 30, 2024
1 parent 4fe5f06 commit d708468
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 11 deletions.
9 changes: 4 additions & 5 deletions taskiq/cli/worker/process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def handle(
self,
workers: List[Process],
args: WorkerArgs,
worker_func: Callable[[WorkerArgs, EventType], None],
worker_func: Callable[[WorkerArgs], None],
) -> None:
"""
This action reloads a single process.
Expand All @@ -79,7 +79,7 @@ def handle(
event: EventType = Event()
new_process = Process(
target=worker_func,
kwargs={"args": args, "event": event},
kwargs={"args": args},
name=f"worker-{self.worker_num}",
daemon=True,
)
Expand Down Expand Up @@ -152,9 +152,8 @@ class ProcessManager:
def __init__(
self,
args: WorkerArgs,
worker_function: Callable[[WorkerArgs, EventType], None],
worker_function: Callable[[WorkerArgs], None],
observer: Optional[Observer] = None, # type: ignore[valid-type]
max_restarts: Optional[int] = None,
) -> None:
self.worker_function = worker_function
self.action_queue: "Queue[ProcessActionBase]" = Queue(-1)
Expand Down Expand Up @@ -183,7 +182,7 @@ def prepare_workers(self) -> None:
event = Event()
work_proc = Process(
target=self.worker_function,
kwargs={"args": self.args, "event": event},
kwargs={"args": self.args},
name=f"worker-{process}",
daemon=True,
)
Expand Down
8 changes: 2 additions & 6 deletions taskiq/cli/worker/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import signal
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import set_start_method
from multiprocessing.synchronize import Event
from sys import platform
from typing import Any, Optional, Type

Expand Down Expand Up @@ -68,7 +67,7 @@ def get_receiver_type(args: WorkerArgs) -> Type[Receiver]:
return receiver_type


def start_listen(args: WorkerArgs, event: Event) -> None:
def start_listen(args: WorkerArgs) -> None:
"""
This function starts actual listening process.
Expand Down Expand Up @@ -109,9 +108,6 @@ def interrupt_handler(signum: int, _frame: Any) -> None:
signal.signal(signal.SIGINT, interrupt_handler)
signal.signal(signal.SIGTERM, interrupt_handler)

# Notify parent process, worker is ready
event.set()

if uvloop is not None:
logger.debug("UVLOOP found. Using it as async runner")
loop = uvloop.new_event_loop() # type: ignore
Expand Down Expand Up @@ -165,7 +161,7 @@ def run_worker(args: WorkerArgs) -> Optional[int]:
:returns: Optional status code.
"""
if platform == "darwin":
set_start_method("fork")
set_start_method("spawn")
if args.configure_logging:
logging.basicConfig(
level=logging.getLevelName(args.log_level),
Expand Down

0 comments on commit d708468

Please sign in to comment.