diff --git a/README.rst b/README.rst index bdbb444..5b353a4 100644 --- a/README.rst +++ b/README.rst @@ -102,31 +102,6 @@ Sit back and watch the worker process it in the background: [INFO] Job 1b92xle0 returned: (1, 2, 3) -Enqueue a job to be processed in-order with other jobs with a particular key: - -.. code-block:: python - - # Import the blocking function - from my_module import my_func - - # Initialize a queue - from kq import Queue - q = Queue() - - # Enqueue the function call as Job - import uuid, time - from kq import Job - job = Job( - str(uuid.uuid4()), - timestamp=int(time.time()), - func=my_func, - args=(1, 2), - kwargs={'baz': 3}, - key="task_category_1" - ) - q.enqueue(job) - - Check out the full documentation_ for more details! .. _documentation: http://kq.readthedocs.io/en/master/ diff --git a/docs/callback.rst b/docs/callback.rst index 0e59c7d..e11933d 100644 --- a/docs/callback.rst +++ b/docs/callback.rst @@ -41,8 +41,9 @@ Here is a trivial example: logger.info('In topic: {}'.format(job.topic)) logger.info('Function: {}'.format(job.func)) logger.info('Arguments {}'.format(job.args)) - logger.info('Keyword arguments {}'.format(job.kwargs)) - logger.info('Timeout threshold {}'.format(job.timeout)) + logger.info('Keyword arguments: {}'.format(job.kwargs)) + logger.info('Timeout threshold: {}'.format(job.timeout)) + logger.info('Job message key: {}'.format(job.key)) if status == 'success': logger.info('The job returned: {}'.format(result)) diff --git a/docs/job.rst b/docs/job.rst index 71e3e23..ae3ac8c 100644 --- a/docs/job.rst +++ b/docs/job.rst @@ -16,7 +16,8 @@ KQ encapsulates jobs using namedtuples_. 
The definition is as follows: 'func', # Job function/callable 'args', # Job function arguments 'kwargs', # Job function keyword arguments - 'timeout' # Job timeout threshold in seconds + 'timeout', # Job timeout threshold in seconds + 'key' # Jobs w/ the same keys end up in the same partition ] ) diff --git a/kq/job.py b/kq/job.py index 97ab0e1..ff35fb0 100644 --- a/kq/job.py +++ b/kq/job.py @@ -3,7 +3,7 @@ from collections import namedtuple -# Named tuple which encapsulates a KQ job +# Namedtuple which encapsulates a KQ job Job = namedtuple( typename='Job', field_names=[ @@ -14,8 +14,7 @@ 'args', # Job function arguments 'kwargs', # Job function keyword arguments 'timeout', # Job timeout threshold in seconds - 'key' # Jobs of the same key end up in same partition + 'key' # Jobs w/ the same keys end up in the same partition ] ) -# Make 'key' None by defauly to support older Jobs Job.__new__.__defaults__ = (None,) diff --git a/kq/queue.py b/kq/queue.py index 3e919c5..2bd6b1b 100644 --- a/kq/queue.py +++ b/kq/queue.py @@ -172,7 +172,7 @@ def timeout(self): return self._timeout def enqueue(self, obj, *args, **kwargs): - """Serialize the function call and place it in the Kafka topic. + """Place the function call (or the job) in the Kafka topic. For example: @@ -198,14 +198,9 @@ def enqueue(self, obj, *args, **kwargs): :param kwargs: Keyword arguments for the function. Ignored if a KQ job instance is given as the first argument instead. :type kwargs: dict - :param key: Queue the job with a key. Jobs queued with a specific key - are processed in order they were queued. Setting it to None (default) - disables this behaviour. 
- :type key: str | unicode :return: The job that was enqueued :rtype: kq.job.Job """ - key = None if isinstance(obj, Job): func = obj.func args = obj.args @@ -213,11 +208,71 @@ def enqueue(self, obj, *args, **kwargs): key = obj.key else: func = obj + key = None if not callable(func): - raise ValueError( - '{} is not a callable'.format(func) - ) + raise ValueError('{} is not a callable'.format(func)) + + job = Job( + id=str(uuid.uuid4()), + timestamp=int(time.time()), + topic=self._topic, + func=func, + args=args, + kwargs=kwargs, + timeout=self._timeout, + key=key + ) + self._producer.send(self._topic, dill.dumps(job), key=key) + self._logger.info('Enqueued: {}'.format(job)) + return job + + def enqueue_with_key(self, key, obj, *args, **kwargs): + """Place the function call (or the job) in the Kafka topic with key. + + For example: + + .. code-block:: python + + import requests + from kq import Queue + + q = Queue() + + url = 'https://www.google.com' + + # You can queue the function call with its arguments + job = q.enqueue_with_key('my_key', requests.get, url) + + # Or you can queue a kq.job.Job instance directly + q.enqueue_with_key('my_key', job) + + :param key: The key for the Kafka message. Jobs with the same key are + guaranteed to be placed in the same Kafka partition and processed + sequentially. If a job object is enqueued, its key is overwritten. + :type key: str + :param obj: Function or the job object to enqueue. If a function is + given, the function *must* be pickle-able. + :type obj: callable | kq.job.Job + :param args: Arguments for the function. Ignored if a KQ job object + is given as ``obj`` instead. + :type args: list + :param kwargs: Keyword arguments for the function. Ignored if a KQ + job instance is given as ``obj`` instead.
+ :type kwargs: dict + :return: The job that was enqueued + :rtype: kq.job.Job + """ + if isinstance(obj, Job): + func = obj.func + args = obj.args + kwargs = obj.kwargs + else: + func = obj + + if not callable(func): + raise ValueError('{} is not a callable'.format(func)) + job = Job( id=str(uuid.uuid4()), timestamp=int(time.time()), diff --git a/kq/version.py b/kq/version.py index 1dea037..9cff1bc 100644 --- a/kq/version.py +++ b/kq/version.py @@ -1 +1 @@ -VERSION = '1.0.1' +VERSION = '1.1.0' diff --git a/kq/worker.py b/kq/worker.py index 4743f92..e54a094 100644 --- a/kq/worker.py +++ b/kq/worker.py @@ -124,11 +124,6 @@ def __init__(self, def __del__(self): """Commit the Kafka consumer offsets and close the consumer.""" if hasattr(self, '_consumer'): - try: - self._logger.info('Committing offsets ...') - self._consumer.commit() - except Exception as e: # pragma: no cover - self._logger.warning('Failed to commit offsets: {}'.format(e)) try: self._logger.info('Closing consumer ...') self._consumer.close() diff --git a/tests/test_misc.py b/tests/test_misc.py index bde0eee..a8ca47c 100644 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -11,7 +11,7 @@ def test_version(): def test_job(): - job = Job(1, 2, 3, 4, 5, 6, 7) + job = Job(1, 2, 3, 4, 5, 6, 7, 8) assert job.id == 1 assert job.timestamp == 2 @@ -20,6 +20,7 @@ def test_job(): assert job.args == 5 assert job.kwargs == 6 assert job.timeout == 7 + assert job.key == 8 def test_func_repr(): diff --git a/tests/test_queue.py b/tests/test_queue.py index bca5714..92af221 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -87,6 +87,26 @@ def test_enqueue_call(producer, logger): logger.info.assert_called_once_with('Enqueued: {}'.format(job)) +def test_enqueue_call_with_key(producer, logger): + producer_cls, producer_inst = producer + + queue = Queue(hosts='host:7000', topic='foo', timeout=300) + job = queue.enqueue_with_key('bar', success_func, 1, 2, c=[3, 4, 5]) + + assert isinstance(job, Job) + assert 
isinstance(job.id, str) + assert isinstance(job.timestamp, int) + assert job.topic == 'foo' + assert job.func == success_func + assert job.args == (1, 2) + assert job.kwargs == {'c': [3, 4, 5]} + assert job.timeout == 300 + assert job.key == 'bar' + + producer_inst.send.assert_called_with('foo', dill.dumps(job), key='bar') + logger.info.assert_called_once_with('Enqueued: {}'.format(job)) + + def test_invalid_call(producer, logger): producer_cls, producer_inst = producer @@ -101,6 +121,20 @@ def test_invalid_call(producer, logger): assert not logger.info.called +def test_invalid_call_with_key(producer, logger): + producer_cls, producer_inst = producer + + queue = Queue(hosts='host:7000', topic='foo', timeout=300) + + for bad_func in [None, 1, {1, 2}, [1, 2, 3]]: + with pytest.raises(ValueError) as e: + queue.enqueue_with_key('foo', bad_func, 1, 2, a=3) + assert str(e.value) == '{} is not a callable'.format(bad_func) + + assert not producer_inst.send.called + assert not logger.info.called + + def test_enqueue_job(producer, logger): producer_cls, producer_inst = producer @@ -127,8 +161,46 @@ def test_enqueue_job(producer, logger): assert new_job.args == [1, 2] assert new_job.kwargs == {'a': 3} assert new_job.timeout == 300 + assert new_job.key == None + + producer_inst.send.assert_called_with( + 'foo', dill.dumps(new_job), key=None + ) + logger.info.assert_called_once_with('Enqueued: {}'.format(new_job)) + + +def test_enqueue_job_with_key(producer, logger): + producer_cls, producer_inst = producer + + queue = Queue(hosts='host:7000', topic='foo', timeout=300) + + old_job = Job( + id='2938401', + timestamp=int(time.time()), + topic='bar', + func=failure_func, + args=[1, 2], + kwargs={'a': 3}, + timeout=100, + key='bar', + ) + new_job = queue.enqueue_with_key('baz', old_job) + + assert isinstance(new_job, Job) + assert isinstance(new_job.id, str) + assert isinstance(new_job.timestamp, int) + assert old_job.id != new_job.id + assert old_job.timestamp <= new_job.timestamp 
+ assert new_job.topic == 'foo' + assert new_job.func == failure_func + assert new_job.args == [1, 2] + assert new_job.kwargs == {'a': 3} + assert new_job.timeout == 300 + assert new_job.key == 'baz' - producer_inst.send.assert_called_with('foo', dill.dumps(new_job), key=None) + producer_inst.send.assert_called_with( + 'foo', dill.dumps(new_job), key='baz' + ) logger.info.assert_called_once_with('Enqueued: {}'.format(new_job))