Skip to content

Commit

Permalink
Merge branch 'release/0.11.4'
Browse files Browse the repository at this point in the history
  • Loading branch information
s3rius committed Jun 11, 2024
2 parents d350042 + 945748b commit 3ffecae
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 5 deletions.
9 changes: 9 additions & 0 deletions docs/guide/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ To enable this option simply pass the `--reload` or `-r` option to worker taskiq
Also this option supports `.gitignore` files. If you have such file in your directory, it won't reload worker
when you modify ignored files. To disable this functionality pass `--do-not-use-gitignore` option.

### Graceful reload

To perform graceful reload, send `SIGHUP` signal to the main worker process. This action will reload all workers with new code. It's useful for deployment that requires zero downtime, but don't use orchestration tools like Kubernetes.

```bash
taskiq worker my_module:broker
kill -HUP <main pid>
```

### Other parameters

* `--no-configure-logging` - disables default logging configuration for workers.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "taskiq"
version = "0.11.3"
version = "0.11.4"
description = "Distributed task queue with full async support"
authors = ["Pavel Kirilin <[email protected]>"]
maintainers = ["Pavel Kirilin <[email protected]>"]
Expand Down
14 changes: 10 additions & 4 deletions taskiq/cli/worker/process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ def schedule_workers_reload(

def get_signal_handler(
action_queue: "Queue[ProcessActionBase]",
action_to_send: ProcessActionBase,
) -> Callable[[int, Any], None]:
"""
Generate signal handler for main process.
Expand All @@ -126,6 +127,7 @@ def get_signal_handler(
the action queue.
:param action_queue: event queue.
:param action_to_send: action that will be sent to the queue on signal.
:returns: actual signal handler.
"""

Expand All @@ -134,7 +136,7 @@ def _signal_handler(signum: int, _frame: Any) -> None:
raise KeyboardInterrupt

logger.debug(f"Got signal {signum}.")
action_queue.put(ShutdownAction())
action_queue.put(action_to_send)
logger.warning("Workers are scheduled for shutdown.")

return _signal_handler
Expand Down Expand Up @@ -169,9 +171,13 @@ def __init__(
recursive=True,
)

signal_handler = get_signal_handler(self.action_queue)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
shutdown_handler = get_signal_handler(self.action_queue, ShutdownAction())
signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)
signal.signal(
signal.SIGHUP,
get_signal_handler(self.action_queue, ReloadAllAction()),
)

self.workers: List[Process] = []

Expand Down
2 changes: 2 additions & 0 deletions taskiq/cli/worker/run.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import logging
import os
import signal
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import set_start_method
Expand Down Expand Up @@ -172,6 +173,7 @@ def run_worker(args: WorkerArgs) -> Optional[int]:
)
logging.getLogger("taskiq").setLevel(level=logging.getLevelName(args.log_level))
logging.getLogger("watchdog.observers.inotify_buffer").setLevel(level=logging.INFO)
logger.info("Pid of a main process: %s", str(os.getpid()))
logger.info("Starting %s worker processes.", args.workers)

observer = None
Expand Down

0 comments on commit 3ffecae

Please sign in to comment.