diff --git a/README.rst b/README.rst index 5b353a4..bdbb444 100644 --- a/README.rst +++ b/README.rst @@ -102,6 +102,31 @@ Sit back and watch the worker process it in the background: [INFO] Job 1b92xle0 returned: (1, 2, 3) +Enqueue a job to be processed in-order with other jobs with a particular key: + +.. code-block:: python + + # Import the blocking function + from my_module import my_func + + # Initialize a queue + from kq import Queue + q = Queue() + + # Enqueue the function call as Job + import uuid, time + from kq import Job + job = Job( + str(uuid.uuid4()), + timestamp=int(time.time()), + func=my_func, + args=(1, 2), + kwargs={'baz': 3}, + key="task_category_1" + ) + q.enqueue(job) + + Check out the full documentation_ for more details! .. _documentation: http://kq.readthedocs.io/en/master/ diff --git a/kq/job.py b/kq/job.py index 85ad55e..97ab0e1 100644 --- a/kq/job.py +++ b/kq/job.py @@ -13,6 +13,9 @@ 'func', # Job function/callable 'args', # Job function arguments 'kwargs', # Job function keyword arguments - 'timeout' # Job timeout threshold in seconds + 'timeout', # Job timeout threshold in seconds + 'key' # Jobs of the same key end up in same partition ] ) +# Make 'key' None by defauly to support older Jobs +Job.__new__.__defaults__ = (None,) diff --git a/kq/queue.py b/kq/queue.py index 4317bc5..3e919c5 100644 --- a/kq/queue.py +++ b/kq/queue.py @@ -198,13 +198,19 @@ def enqueue(self, obj, *args, **kwargs): :param kwargs: Keyword arguments for the function. Ignored if a KQ job instance is given as the first argument instead. :type kwargs: dict + :param key: Queue the job with a key. Jobs queued with a specific key + are processed in order they were queued. Setting it to None (default) + disables this behaviour. + :type key: str | unicode :return: The job that was enqueued :rtype: kq.job.Job """ + key = None if isinstance(obj, Job): func = obj.func args = obj.args kwargs = obj.kwargs + key = obj.key else: func = obj @@ -219,9 +225,10 @@ def enqueue(self, obj, *args, **kwargs): func=func, args=args, kwargs=kwargs, - timeout=self._timeout + timeout=self._timeout, + key=key ) - self._producer.send(self._topic, dill.dumps(job)) + self._producer.send(self._topic, dill.dumps(job), key=key) self._logger.info('Enqueued: {}'.format(job)) return job diff --git a/tests/test_queue.py b/tests/test_queue.py index 304e3e6..bca5714 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -83,7 +83,7 @@ def test_enqueue_call(producer, logger): assert job.kwargs == {'c': [3, 4, 5]} assert job.timeout == 300 - producer_inst.send.assert_called_with('foo', dill.dumps(job)) + producer_inst.send.assert_called_with('foo', dill.dumps(job), key=None) logger.info.assert_called_once_with('Enqueued: {}'.format(job)) @@ -128,7 +128,7 @@ def test_enqueue_job(producer, logger): assert new_job.kwargs == {'a': 3} assert new_job.timeout == 300 - producer_inst.send.assert_called_with('foo', dill.dumps(new_job)) + producer_inst.send.assert_called_with('foo', dill.dumps(new_job), key=None) logger.info.assert_called_once_with('Enqueued: {}'.format(new_job))