Skip to content

Commit

Permalink
Add support for specifying message keys for jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
Shashank Mehra authored and joowani committed Dec 13, 2016
1 parent 5f89dd3 commit 3275434
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 5 deletions.
25 changes: 25 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,31 @@ Sit back and watch the worker process it in the background:
[INFO] Job 1b92xle0 returned: (1, 2, 3)
Enqueue a job to be processed in-order with other jobs with a particular key:

.. code-block:: python
# Import the blocking function
from my_module import my_func
# Initialize a queue
from kq import Queue
q = Queue()
# Enqueue the function call as Job
import uuid, time
from kq import Job
job = Job(
str(uuid.uuid4()),
timestamp=int(time.time()),
func=my_func,
args=(1, 2),
kwargs={'baz': 3},
key="task_category_1"
)
q.enqueue(job)
Check out the full documentation_ for more details!

.. _documentation: http://kq.readthedocs.io/en/master/
Expand Down
5 changes: 4 additions & 1 deletion kq/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
'func', # Job function/callable
'args', # Job function arguments
'kwargs', # Job function keyword arguments
'timeout' # Job timeout threshold in seconds
'timeout', # Job timeout threshold in seconds
'key' # Jobs of the same key end up in same partition
]
)
# Make 'key' None by defauly to support older Jobs
Job.__new__.__defaults__ = (None,)
11 changes: 9 additions & 2 deletions kq/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,19 @@ def enqueue(self, obj, *args, **kwargs):
:param kwargs: Keyword arguments for the function. Ignored if a KQ
job instance is given as the first argument instead.
:type kwargs: dict
:param key: Queue the job with a key. Jobs queued with a specific key
are processed in order they were queued. Setting it to None (default)
disables this behaviour.
:type key: str | unicode
:return: The job that was enqueued
:rtype: kq.job.Job
"""
key = None
if isinstance(obj, Job):
func = obj.func
args = obj.args
kwargs = obj.kwargs
key = obj.key
else:
func = obj

Expand All @@ -219,9 +225,10 @@ def enqueue(self, obj, *args, **kwargs):
func=func,
args=args,
kwargs=kwargs,
timeout=self._timeout
timeout=self._timeout,
key=key
)
self._producer.send(self._topic, dill.dumps(job))
self._producer.send(self._topic, dill.dumps(job), key=key)
self._logger.info('Enqueued: {}'.format(job))
return job

Expand Down
4 changes: 2 additions & 2 deletions tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def test_enqueue_call(producer, logger):
assert job.kwargs == {'c': [3, 4, 5]}
assert job.timeout == 300

producer_inst.send.assert_called_with('foo', dill.dumps(job))
producer_inst.send.assert_called_with('foo', dill.dumps(job), key=None)
logger.info.assert_called_once_with('Enqueued: {}'.format(job))


Expand Down Expand Up @@ -128,7 +128,7 @@ def test_enqueue_job(producer, logger):
assert new_job.kwargs == {'a': 3}
assert new_job.timeout == 300

producer_inst.send.assert_called_with('foo', dill.dumps(new_job))
producer_inst.send.assert_called_with('foo', dill.dumps(new_job), key=None)
logger.info.assert_called_once_with('Enqueued: {}'.format(new_job))


Expand Down

0 comments on commit 3275434

Please sign in to comment.