Skip to content

Commit

Permalink
add ability to set priority for workers
Browse files Browse the repository at this point in the history
  • Loading branch information
muraty committed Jun 6, 2024
1 parent 1356591 commit ee5569a
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 1 deletion.
5 changes: 5 additions & 0 deletions kuyruk/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ def main() -> None:
type=int,
help='gracefully shutdown worker after this duration in seconds, '
'set to 0 for running forever')
parser_worker.add_argument(
'--priority',
type=int,
help='sets priority for the worker'
'Default is 0. Larger numbers indicate higher priority, and both positive and negative numbers can be used.')
parser_worker.add_argument(
'-l',
'--logging-level',
Expand Down
3 changes: 3 additions & 0 deletions kuyruk/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ class attributes. Additional attributes may be added by extensions.
WORKER_RECONNECT_INTERVAL = 5
"""Number of seconds to wait after a connection error."""

WORKER_PRIORITY = None
"""Sets worker priority. Larger number means higher priority."""

def from_object(self, obj: Union[str, Any]) -> None:
"""Load values from an object."""
if isinstance(obj, str):
Expand Down
14 changes: 13 additions & 1 deletion kuyruk/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ def add_host(queue: str) -> str:
if self._max_load == -1:
self._max_load == multiprocessing.cpu_count()

self._priority = app.config.WORKER_PRIORITY
if args.priority is not None:
self._priority = args.priority

self._reconnect_interval = app.config.WORKER_RECONNECT_INTERVAL

self._threads: List[threading.Thread] = []
Expand Down Expand Up @@ -183,7 +187,15 @@ def _consume_queues(self, ch: amqp.Channel) -> None:
self.consuming = True
for queue in self.queues:
logger.debug("basic_consume: %s", queue)
ch.basic_consume(queue=queue, consumer_tag=self._consumer_tag(queue), callback=self._process_message)

arguments = {}
if self._priority:
arguments['x-priority'] = self._priority

ch.basic_consume(queue=queue,
consumer_tag=self._consumer_tag(queue),
callback=self._process_message,
arguments=arguments)

def _cancel_queues(self, ch: amqp.Channel) -> None:
self.consuming = False
Expand Down

0 comments on commit ee5569a

Please sign in to comment.