diff --git a/.gitignore b/.gitignore index 47ab9b2..305c50e 100644 --- a/.gitignore +++ b/.gitignore @@ -8,7 +8,6 @@ __pycache__/ # Distribution / packaging .Python -env/ build/ develop-eggs/ dist/ @@ -20,9 +19,11 @@ lib64/ parts/ sdist/ var/ +wheels/ *.egg-info/ .installed.cfg *.egg +MANIFEST # PyInstaller # Usually these files are written by a python script from a template @@ -42,8 +43,9 @@ htmlcov/ .cache nosetests.xml coverage.xml -*,cover +*.cover .hypothesis/ +.pytest_cache/ # Translations *.mo @@ -52,6 +54,7 @@ coverage.xml # Django stuff: *.log local_settings.py +db.sqlite3 # Flask stuff: instance/ @@ -66,7 +69,7 @@ docs/_build/ # PyBuilder target/ -# IPython Notebook +# Jupyter Notebook .ipynb_checkpoints # pyenv @@ -75,17 +78,36 @@ target/ # celery beat schedule file celerybeat-schedule -# dotenv -.env +# SageMath parsed files +*.sage.py -# virtualenv +# Environments +.env +.venv +env/ venv/ ENV/ +env.bak/ +venv.bak/ # Spyder project settings .spyderproject +.spyproject # Rope project settings .ropeproject -.idea* +# mkdocs documentation +/site + +# mypy +.mypy_cache/ + +# MacOS +.DS_Store + +# PyCharm +.idea/ + +# KQ specific +output.log diff --git a/.travis.yml b/.travis.yml index 66bcaaa..f8a470d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,17 +1,19 @@ -language: python sudo: false +language: python python: - - 2.7 - - 3.4 - 3.5 - 3.6 +services: + - docker +before_install: + - docker run --name kafka -d -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=127.0.0.1 --env ADVERTISED_PORT=9092 spotify/kafka install: - - pip install coverage - - pip install pytest - - pip install pytest-cov - - pip install python-coveralls - - python setup.py install + - pip install flake8 mock pytest pytest-cov python-coveralls sphinx sphinx_rtd_theme + - pip install . script: - - py.test --cov=kq + - python -m flake8 + - python -m sphinx -b doctest docs docs/_build + - python -m sphinx -b html -W docs docs/_build + - py.test -s -v --cov=kq after_success: - - coveralls \ No newline at end of file + - coveralls diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..95dea49 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,2 @@ +include README.rst LICENSE +prune tests diff --git a/README.rst b/README.rst index cbc82c5..024b91a 100644 --- a/README.rst +++ b/README.rst @@ -13,7 +13,7 @@ KQ: Kafka-based Job Queue for Python :target: https://badge.fury.io/py/kq :alt: Package Version -.. image:: https://img.shields.io/badge/python-2.7%2C%203.4%2C%203.5%2C%203.6-blue.svg +.. image:: https://img.shields.io/badge/python-3.5%2C%203.6-blue.svg :target: https://github.com/joowani/kq :alt: Python Versions @@ -31,117 +31,138 @@ KQ: Kafka-based Job Queue for Python | -KQ (Kafka Queue) is a lightweight Python library which provides a simple API -to process jobs asynchronously in the background. It uses `Apache Kafka`_ and -is designed primarily for ease of use. +**KQ (Kafka Queue)** is a lightweight Python library which lets you queue and +execute jobs asynchronously using `Apache Kafka`_. It uses kafka-python_ under +the hood. -.. _Apache Kafka: https://kafka.apache.org +Announcements +============= +* KQ version `2.0.0`_ is now out! +* Please see the releases_ page for latest updates. Requirements ============ -- Apache Kafka 0.9+ -- Python 2.7, 3.4, 3.5 or 3.6 - +* `Apache Kafka`_ 0.9+ +* Python 3.5+ -Getting Started -=============== +Installation +============ -First, ensure that your Kafka instance is up and running: +To install a stable version from PyPI_ (recommended): .. code-block:: bash - # This command is just an example - ~$ ./kafka-server-start.sh -daemon server.properties - + ~$ pip install kq -Let's say you want to run the following function asynchronously: +To install the latest version directly from GitHub_: -.. code-block:: python +.. code-block:: bash - import time + ~$ pip install -e git+git@github.com:joowani/kq.git@master#egg=kq - def my_func(foo, bar, baz=None): - """This is a blocking function.""" - time.sleep(10) - return foo, bar, baz +You may need to use ``sudo`` depending on your environment. +Getting Started +=============== -Start a KQ worker: +First, ensure that your Kafka instance is up and running: .. code-block:: bash - ~$ kq worker --verbose - [INFO] Starting Worker(topic=default) ... - + ~$ ./kafka-server-start.sh -daemon server.properties -Enqueue the function call as a job: +Define your KQ worker module: .. code-block:: python - # Import the blocking function - from my_module import my_func + # my_worker.py - # Initialize a queue - from kq import Queue - q = Queue() + import logging - # Enqueue the function call - q.enqueue(my_func, 1, 2, baz=3) + from kafka import KafkaConsumer + from kq import Worker + # Set up logging. + formatter = logging.Formatter('[%(levelname)s] %(message)s') + stream_handler = logging.StreamHandler() + stream_handler.setFormatter(formatter) + logger = logging.getLogger('kq.worker') + logger.setLevel(logging.DEBUG) + logger.addHandler(stream_handler) -Sit back and watch the worker process it in the background: + # Set up a Kafka consumer. + consumer = KafkaConsumer( + bootstrap_servers='127.0.0.1:9092', + group_id='group', + auto_offset_reset='latest' + ) -.. code-block:: bash + # Set up a worker. + worker = Worker(topic='topic', consumer=consumer) + worker.start() - ~$ kq worker --verbose - [INFO] Starting Worker(topic=default) ... - [INFO] Processing Record(topic=default, partition=5, offset=3) ... - [INFO] Running Job 1b92xle0: my_module.my_func(1, 2, baz=3) ... - [INFO] Job 1b92xle0 returned: (1, 2, 3) +Start the worker: +.. code-block:: bash -Check out the full documentation_ for more details! + ~$ python my_worker.py + [INFO] Starting Worker(hosts=127.0.0.1:9092 topic=topic, group=group) ... -.. _documentation: http://kq.readthedocs.io/en/master/ +Enqueue a function call: +.. code-block:: python -Installation -============ + import requests -To install a stable version from PyPI_ (recommended): + from kafka import KafkaProducer + from kq import Queue -.. code-block:: bash + # Set up a Kafka producer. + producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092') - ~$ pip install kq + # Set up a queue. + queue = Queue(topic='topic', producer=producer) + # Enqueue a function call. + job = queue.enqueue(requests.get, 'https://www.google.com') -To install the latest version directly from GitHub_: +Sit back and watch the worker process it in the background: .. code-block:: bash - ~$ pip install -e git+git@github.com:joowani/kq.git@master#egg=kq + ~$ python my_worker.py + [INFO] Starting Worker(hosts=127.0.0.1:9092, topic=topic, group=group) ... + [INFO] Processing Message(topic=topic, partition=0, offset=0) ... + [INFO] Executing job c7bf2359: requests.api.get('https://www.google.com') + [INFO] Job c7bf2359 returned: -You may need to use ``sudo`` depending on your environment setup. +**NEW in 2.0.0**: You can now specify the job timeout, message key and partition: -.. _PyPI: https://pypi.python.org/pypi/kq -.. _GitHub: https://github.com/joowani/kq +.. code-block:: python + job = queue.using(timeout=5, key=b'foo', partition=0).enqueue(requests.get, 'https://www.google.com') + +Check out the full documentation_ for more information. Contributing ============ Please have a look at this page_ before submitting a pull request. Thanks! -.. _page: - http://kq.readthedocs.io/en/master/contributing.html - Credits ======= -This project was inspired by RQ_ and built on top of kafka-python_. +This project was inspired by RQ_. -.. _RQ: https://github.com/nvie/rq +.. _Apache Kafka: https://kafka.apache.org .. _kafka-python: https://github.com/dpkp/kafka-python +.. _2.0.0: https://github.com/joowani/kq/releases/tag/2.0.0 +.. _releases: https://github.com/joowani/kq/releases +.. _PyPI: https://pypi.python.org/pypi/kq +.. _GitHub: https://github.com/joowani/kq +.. _documentation: http://kq.readthedocs.io +.. _page: http://kq.readthedocs.io/en/master/contributing.html +.. _RQ: https://github.com/rq/rq diff --git a/docs/callback.rst b/docs/callback.rst index e11933d..df6abd3 100644 --- a/docs/callback.rst +++ b/docs/callback.rst @@ -1,95 +1,70 @@ -Callbacks ---------- +Callback +-------- -A callback can be passed to a KQ worker to extend the latter's functionality. -Whenever a KQ worker processes a job, the callback function (if given) is -automatically invoked. The callback must take exactly the following positional -arguments: +KQ allows you to assign a callback function to workers. The callback function +is invoked every time a message is processed. It must take the following +positional arguments: -- status (str | unicode): - The status of the job execution, which can be ``"timeout"``, ``"failure"`` - or ``"success"``. +* **status** (str): Job status. Possible values are: -- job (kq.job.Job): - The namedtuple object representing the job executed. + * ``invalid`` : Job could not be deserialized, or was malformed. + * ``failure`` : Job raised an exception. + * ``timeout`` : Job took too long and timed out. + * ``success`` : Job successfully finished and returned a result. -- result (object): - The result of the job execution. +* **message** (:doc:`kq.Message `): Kafka message. +* **job** (:doc:`kq.Job ` | None): Job object, or None if Kafka message + was invalid or malformed. +* **result** (object | None): Job result, or None if an exception was raised. +* **exception** (Exception | None): Exception raised, or None if job finished + successfully. +* **stacktrace** (str | None): Exception stacktrace, or None if job finished + successfully. -- exception (Exception | None) - The exception raised on job execution, or ``None`` if there were no errors. +You can inject your callback function during :doc:`worker ` +initialization. -- traceback (str | unicode | None) - The traceback of the exception raised, or ``None`` if there were no errors. +**Example:** +.. testcode:: -Here is a trivial example: - -.. code-block:: python - - import logging + from kafka import KafkaConsumer + from kq import Worker - from kq import Job - logger = logging.getLogger(__name__) + def callback(status, message, job, result, exception, stacktrace): + """This is an example callback showing what arguments to expect.""" - def callback(status, job, result, exception, traceback): + assert status in ['invalid', 'success', 'timeout', 'failure'] + assert isinstance(message, kq.Message) - assert isinstance(job, Job) - logger.info('Job UUID: {}'.format(job.id)) - logger.info('Enqueued at {}'.format(job.timestamp)) - logger.info('In topic: {}'.format(job.topic)) - logger.info('Function: {}'.format(job.func)) - logger.info('Arguments {}'.format(job.args)) - logger.info('Keyword arguments: {}'.format(job.kwargs)) - logger.info('Timeout threshold: {}'.format(job.timeout)) - logger.info('Job message key: {}'.format(job.key)) + if status == 'invalid': + assert job is None + assert result is None + assert exception is None + assert stacktrace is None if status == 'success': - logger.info('The job returned: {}'.format(result)) + assert isinstance(job, kq.Job) assert exception is None - assert traceback is None + assert stacktrace is None elif status == 'timeout': - logger.info('The job took too long and timed out') + assert isinstance(job, kq.Job) assert result is None assert exception is None - assert traceback is None + assert stacktrace is None elif status == 'failure': - logger.info('The job raised an exception on runtime') + assert isinstance(job, kq.Job) assert result is None assert exception is not None - assert traceback is not None - - -Once the callback is defined, it can be passed in during initialization: - -.. code-block:: python + assert stacktrace is not None - from kq import Worker - from my_module import callback - - worker = Worker( - hosts='host:7000,host:8000', - callback=callback, # pass in the callback + consumer = KafkaConsumer( + bootstrap_servers='127.0.0.1:9092', + group_id='group' ) - # The worker will automatically invoke the callback - worker.start() - - -When using the :ref:`KQ command line tool `, the path to -Python module with the callback function definition can be provided instead: - -.. code-block:: bash - - ~$ kq worker --verbose --callback=/my/module/with/callback/function - - [INFO] Starting Worker(topic=default) ... - [INFO] Processing Record(topic=default, partition=5, offset=3) ... - [INFO] Running Job 1b92xle0: my_module.my_func(1, 2, baz=3) ... - [INFO] Job 1b92xle0 returned: (1, 2, 3) - [INFO] Executing callback ... -The module must contain a function named ``callback`` with the correct -signature. + # Inject your callback function during worker initialization. + worker = Worker('topic', consumer, callback=callback) diff --git a/docs/cli.rst b/docs/cli.rst deleted file mode 100644 index 5fb73de..0000000 --- a/docs/cli.rst +++ /dev/null @@ -1,45 +0,0 @@ -.. _command-line-tool: - -Command Line Tool ------------------ - -KQ comes with a command line tool ``kq`` that let's you quickly spawn workers -or view the offsets on topic partitions: - -.. code-block:: bash - - ~$ kq --help - - KQ (Kafka Queue) command line tool - - Usage: - kq (worker|info) [--hosts=] - [--topic=] - [--timeout=] - [--callback=] - [--job-size=] - [--cafile=] - [--certfile=] - [--keyfile=] - [--crlfile=] - [--proc-ttl=] - [--offset=] - [--verbose] - kq --help - kq --version - - Options: - --hosts= Comma-separated Kafka hosts [default: 127.0.0.1] - --topic= Name of the Kafka topic [default: default] - --job-size= Maximum job size in bytes [default: 1048576] - --timeout= Job timeout threshold in seconds - --callback= Python module containing the callback function - --cafile= Full path to SSL trusted CA certificate - --certfile= Full path to SSL client certificate - --keyfile= Full path to SSL private key - --crlfile= Full path to SSL crlfile for verifying expiry - --proc-ttl= Records read before re-spawning process [default: 5000] - --offset= Kafka consumer offset reset policy [default: latest] - --verbose Turn on debug logging output - --help Display this help menu - --version Display the version of KQ diff --git a/docs/conf.py b/docs/conf.py index b52095c..ddb54bc 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -19,9 +19,12 @@ import os import sys -from kq import VERSION +sys.path.insert(0, os.path.abspath('..')) +sys.path.append(os.path.dirname(__file__)) -sys.path.insert(0, os.path.abspath('../kq')) +_version = {} +with open("../kq/version.py") as fp: + exec(fp.read(), _version) # -- General configuration ------------------------------------------------ @@ -34,6 +37,10 @@ # ones. extensions = [ 'sphinx.ext.autodoc', + 'sphinx.ext.doctest', + 'sphinx.ext.coverage', + 'sphinx.ext.viewcode', + 'sphinx.ext.githubpages', ] # Add any paths that contain templates here, relative to this directory. @@ -62,9 +69,9 @@ # built documents. # # The short X.Y version. -version = VERSION +version = _version['__version__'] # The full version, including alpha/beta/rc tags. -release = VERSION +release = _version['__version__'] # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. @@ -343,3 +350,8 @@ # texinfo_no_detailmenu = False autodoc_member_order = 'bysource' + +doctest_global_setup = """ +from unittest import mock +mock.patch('math.inf', 0).start() +""" diff --git a/docs/contributing.rst b/docs/contributing.rst index 1aa4172..032d6df 100644 --- a/docs/contributing.rst +++ b/docs/contributing.rst @@ -1,43 +1,36 @@ -.. _contributing-page: - Contributing ------------ -Instructions +Requirements ============ Before submitting a pull request on GitHub_, please make sure you meet the -**requirements**: - -* The pull request points to the dev_ (development) branch. -* All changes are squashed into a single commit (I like to use git rebase -i - to do this). -* The commit message is in present tense (good: "Add feature", bad: - "Added feature"). -* Correct and consistent style: Sphinx_-compatible docstrings, using snake - vs. camel casing properly_ and PEP8_ compliance. Use flake8_ (see below). -* No classes/methods/functions with missing docstrings or commented-out lines. - You can refer to existing docstrings for examples. -* The test coverage_ remains at %100. Sometimes you may find yourself having to - write superfluous unit tests to keep this number up. If a piece of code is - trivial and has no need for unittests, use this_ to exclude it from coverage. -* No build failures on TravisCI_. The builds automatically trigger on PR - submissions. -* Does not break backward-compatibility (unless there is a really good reason). -* Compatibility with all supported Python versions (2.7, 3.4, 3.5, 3.6). +following requirements: + +* The pull request points to dev_ branch. +* Changes are squashed into a single commit. I like to use git rebase for this. +* Commit message is in present tense. For example, "Fix bug" is good while + "Fixed bug" is not. +* Sphinx_-compatible docstrings. +* PEP8_ compliance. +* No missing docstrings or commented-out lines. +* Test coverage_ remains at %100. If a piece of code is trivial and does not + need unit tests, use this_ to exclude it from coverage. +* No build failures on `Travis CI`_. Builds automatically trigger on pull + request submissions. +* Documentation is kept up-to-date with the new changes (see below). .. warning:: - The dev branch is occasionally rebased_, and its commit history may be - overwritten in the process (I try very hard never to do this). So before - you begin feature work, git fetch/pull to ensure that branches have not - diverged. If you see git conflicts and just want to start from scratch, - run this command: + The dev branch is occasionally rebased, and its commit history may be + overwritten in the process. Before you begin your feature work, git fetch + or pull to ensure that your local branch has not diverged. If you see git + conflicts and want to start with a clean slate, run the following commands: .. code-block:: bash ~$ git checkout dev ~$ git fetch origin - ~$ git reset --hard origin/dev # THIS WILL WIPE ALL CHANGES + ~$ git reset --hard origin/dev # THIS WILL WIPE ALL LOCAL CHANGES Style ===== @@ -51,24 +44,22 @@ To ensure PEP8_ compliance, run flake8_: ~$ cd kq ~$ flake8 -You should try to resolve all issues reported. If there is a good reason to -ignore errors from a specific piece of code, however, visit here_ to see how -to exclude the lines from the check. +If there is a good reason to ignore a warning, see here_ on how to exclude it. Testing ======= -To test your changes, run the unit tests that come with **kq** on your -local machine. The tests use pytest_. +To test your changes, run the integration test suite that comes with **kq**. It +uses pytest_ and requires an actual Kafka instance. -To run the unit tests: +To run the integration test suite: .. code-block:: bash ~$ pip install pytest ~$ git clone https://github.com/joowani/kq.git ~$ cd kq - ~$ py.test --verbose + ~$ py.test -v -s --host=127.0.0.1 --port=9092 # Enter your Kafka host and port To run the unit tests with coverage report: @@ -77,16 +68,18 @@ To run the unit tests with coverage report: ~$ pip install coverage pytest pytest-cov ~$ git clone https://github.com/joowani/kq.git ~$ cd kq - ~$ py.test --verbose --cov-report=html --cov=kq - ~$ # Open the generated file htmlcov/index.html in a browser + ~$ py.test -v -s --host=127.0.0.1 --port=9092 --cov=kq --cov-report=html + + # Open the generated file htmlcov/index.html in a browser +As the test suite creates real topics and messages, it should only be run in +development environments. Documentation ============= -The documentation (including the README) is written in reStructuredText_ and -uses Sphinx_. To build the HTML version of the documentation on your local -machine: +The documentation including the README is written in reStructuredText_ and uses +Sphinx_. To build an HTML version on your local machine: .. code-block:: bash @@ -94,19 +87,17 @@ machine: ~$ git clone https://github.com/joowani/kq.git ~$ cd kq/docs ~$ sphinx-build . build - ~$ # Open the generated file build/index.html in a browser + # Open the generated file build/index.html in a browser -As always, thanks for your contribution! +As always, thank you for your contribution! -.. _rebased: https://git-scm.com/book/en/v2/Git-Branching-Rebasing .. _dev: https://github.com/joowani/kq/tree/dev .. _GitHub: https://github.com/joowani/kq -.. _properly: https://stackoverflow.com/questions/159720 .. _PEP8: https://www.python.org/dev/peps/pep-0008/ .. _coverage: https://coveralls.io/github/joowani/kq .. _this: http://coverage.readthedocs.io/en/latest/excluding.html -.. _TravisCI: https://travis-ci.org/joowani/kq +.. _Travis CI: https://travis-ci.org/joowani/kq .. _Sphinx: https://github.com/sphinx-doc/sphinx .. _flake8: http://flake8.pycqa.org .. _here: http://flake8.pycqa.org/en/latest/user/violations.html#in-line-ignoring-errors diff --git a/docs/decorator.rst b/docs/decorator.rst deleted file mode 100644 index 19b4120..0000000 --- a/docs/decorator.rst +++ /dev/null @@ -1,60 +0,0 @@ -Decorator ---------- - -KQ provides a decorator which adds to the wrapped function a new method called -**delay**. The **delay** method takes the same set of arguments as the wrapped -function itself. When the method is called, instead of executing the function, -it enqueues it as a job. Here is an example: - -Decorate a function in a module that workers can import: - - -.. code-block:: python - - from kq import Queue - - # Initialize the queue - queue = Queue(topic='foobar') - - # Decorate a function to mark it as a job - @queue.job - def calculate_sum(a, b, c): - return a + b + c - - -Start a KQ worker: - -.. code-block:: bash - - ~$ kq worker --verbose - [INFO] Starting Worker(topic=foobar) ... - - -Import the decorated function and call the new **delay** method: - -.. code-block:: python - - from my_module import calculate_sum - from kq import Job - - # The function can still be executed normally - result = calculate_sum(1, 2, 3) - assert result == 6 - - # Call the delay method to enqueue the function call as a job - result = calculate_sum.delay(1, 2, 3) - assert isinstance(result, Job) - - -The worker processes the job in the background: - -.. code-block:: bash - - ~$ kq worker --verbose - [INFO] Starting Worker(topic=foobar) ... - [INFO] Processing Record(topic=foobar, partition=1, offset=2) ... - [INFO] Running Job 1b92xle0: my_module.calculate_sum(1, 2, 3) ... - [INFO] Job 1b92xle0 returned: 6 - - -The decorator is simply an alternative to the ``enqueue`` method. \ No newline at end of file diff --git a/docs/index.rst b/docs/index.rst index 1192b5c..1652269 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -1,20 +1,15 @@ KQ: Kafka-based Job Queue for Python ------------------------------------ -Welcome to the documentation for **KQ (Kafka Queue)**, a light-weight Python -library which provides a simple API to queue and process jobs asynchronously -in the background. It is backed by `Apache Kafka`_ and designed primarily for -ease of use. - -.. _Apache Kafka: https://kafka.apache.org - +Welcome to the documentation for **KQ (Kafka Queue)**, a lightweight Python +library which lets you queue and execute jobs asynchronously using `Apache Kafka`_. +It uses kafka-python_ under the hood. Requirements ============ -- Apache Kafka 0.9+ -- Python 2.7, 3.4, 3.5 or 3.6 - +- `Apache Kafka`_ 0.9+ +- Python 3.5+ Installation ============ @@ -25,18 +20,13 @@ To install a stable version from PyPI_ (recommended): ~$ pip install kq - To install the latest version directly from GitHub_: .. code-block:: bash ~$ pip install -e git+git@github.com:joowani/kq.git@master#egg=kq -You may need to use ``sudo`` depending on your environment setup. - -.. _PyPI: https://pypi.python.org/pypi/kq -.. _GitHub: https://github.com/joowani/kq - +You may need to use ``sudo`` depending on your environment. Contents ======== @@ -48,18 +38,13 @@ Contents queue worker job - manager - decorator + message callback - cli + serializer logging contributing - -Credits -======= - -This project was inspired by RQ_ and built on top of kafka-python_. - -.. _RQ: https://github.com/nvie/rq -.. _kafka-python: https://github.com/dpkp/kafka-python \ No newline at end of file +.. _Apache Kafka: https://kafka.apache.org +.. _kafka-python: https://github.com/dpkp/kafka-python +.. _PyPI: https://pypi.python.org/pypi/kq +.. _GitHub: https://github.com/joowani/kq diff --git a/docs/job.rst b/docs/job.rst index ae3ac8c..e042239 100644 --- a/docs/job.rst +++ b/docs/job.rst @@ -1,30 +1,41 @@ Job ---- -KQ encapsulates jobs using namedtuples_. The definition is as follows: +KQ encapsulates jobs using ``kq.Job`` namedtuples, which have the following +fields: -.. code-block:: python +* **id** (str): Job ID. +* **timestamp** (int): Unix timestamp indicating the time of enqueue. +* **topic** (str): Name of the Kafka topic. +* **func** (callable): Function to execute. +* **args** (list | tuple): Positional arguments for the function. +* **kwargs** (dict): Keyword arguments for the function. +* **timeout** (int | float): Job timeout threshold in seconds. +* **key** (bytes | None): Kafka message key. Jobs with the same keys are sent + to the same topic partition and executed sequentially. Applies only if the + **partition** field is not set, and the producer's partitioner configuration + is left as default. +* **partition** (int | None): Kafka topic partition. If set, the **key** field + is ignored. + +.. testcode:: from collections import namedtuple Job = namedtuple( typename='Job', - field_names=[ - 'id', # UUID of the job - 'timestamp', # Unix timestamp indicating when the job was queued - 'topic', # Name of the Kafka topic the job was enqueued in - 'func', # Job function/callable - 'args', # Job function arguments - 'kwargs', # Job function keyword arguments - 'timeout', # Job timeout threshold in seconds - 'key' # Jobs w/ the same keys end up in the same partition - ] + field_names=( + 'id', + 'timestamp', + 'topic', + 'func', + 'args', + 'kwargs', + 'timeout', + 'key', + 'partition' + ) ) -When a function call is enqueued, an instance of the job namedtuple is created. -The instance is then serialized using the dill_ Python library and placed in a -Kafka topic. A worker then fetches, de-serializes and executes the job. - -.. _dill: https://github.com/uqfoundation/dill -.. _namedtuples: - https://docs.python.org/2/library/collections.html#collections.namedtuple \ No newline at end of file +When a function call is enqueued, an instance of this namedtuple is created to +store the metadata. It is then serialized into a byte string and sent to Kafka. diff --git a/docs/logging.rst b/docs/logging.rst index 5a1748a..d0596d1 100644 --- a/docs/logging.rst +++ b/docs/logging.rst @@ -1,38 +1,39 @@ Logging ------- -By default KQ logs messages using the ``kq`` logger. +By default, :doc:`queues ` log messages via ``kq.queue`` logger, and +:doc:`workers ` log messages via ``kq.worker`` logger. You can either +use these loggers or inject your own during queue/worker initialization. -Here is an example showing how the logger can be enabled and customized: +**Example:** -.. code-block:: python +.. testcode:: import logging - from kq import Queue + from kafka import KafkaConsumer, KafkaProducer + from kq import Queue, Worker - logger = logging.getLogger('kq') - - # Set the logging level - logger.setLevel(logging.DEBUG) - - # Attach a handler - handler = logging.StreamHandler() formatter = logging.Formatter('[%(levelname)s] %(message)s') - handler.setFormatter(formatter) - logger.addHandler(handler) + stream_handler = logging.StreamHandler() + stream_handler.setFormatter(formatter) - # Enqueue function calls - q = Queue() - q.enqueue(int, 1) - q.enqueue(str, 1) - q.enqueue(bool, 1) + # Set up "kq.queue" logger. + queue_logger = logging.getLogger('kq.queue') + queue_logger.setLevel(logging.INFO) + queue_logger.addHandler(stream_handler) + # Set up "kq.worker" logger. + worker_logger = logging.getLogger('kq.worker') + worker_logger.setLevel(logging.DEBUG) + worker_logger.addHandler(stream_handler) -The logging output for above would look something like this: + # Alternatively, you can inject your own loggers. + queue_logger = logging.getLogger('your_worker_logger') + worker_logger = logging.getLogger('your_worker_logger') -.. code-block:: bash + producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092') + consumer = KafkaConsumer(bootstrap_servers='127.0.0.1:9092', group_id='group') - [INFO] Enqueued: Job(id='64ee47d', topic='default', func= ...) - [INFO] Enqueued: Job(id='4578f57', topic='default', func= ...) - [INFO] Enqueued: Job(id='792643c', topic='default', func= ...) + queue = Queue('topic', producer, logger=queue_logger) + worker = Worker('topic', consumer, logger=worker_logger) diff --git a/docs/manager.rst b/docs/manager.rst deleted file mode 100644 index bfbee88..0000000 --- a/docs/manager.rst +++ /dev/null @@ -1,6 +0,0 @@ -Manager -------- - -.. autoclass:: kq.manager.Manager - :members: - :member-order: bysource diff --git a/docs/message.rst b/docs/message.rst new file mode 100644 index 0000000..c351801 --- /dev/null +++ b/docs/message.rst @@ -0,0 +1,30 @@ +Message +------- + +KQ encapsulates Kafka messages using ``kq.Message`` namedtuples, which have +the following fields: + +* **topic** (str): Name of the Kafka topic. +* **partition** (int): Kafka topic partition. +* **offset** (int): Partition offset. +* **key** (bytes | None): Kafka message key. +* **value** (bytes): Kafka message payload. + +.. testcode:: + + from collections import namedtuple + + Message = namedtuple( + typename='Message', + field_names=( + 'topic', + 'partition', + 'offset', + 'key', + 'value' + ) + ) + +Raw Kafka messages are converted into these namedtuples, which are then sent +to :doc:`workers ` (and also to :doc:`callback functions ` +if defined). diff --git a/docs/overview.rst b/docs/overview.rst index b9ade57..751e06c 100644 --- a/docs/overview.rst +++ b/docs/overview.rst @@ -5,54 +5,80 @@ First, ensure that your Kafka instance is up and running: .. code-block:: bash - # This command is just an example ~$ ./kafka-server-start.sh -daemon server.properties +Define your KQ worker module: -Let's say you want to run the following function asynchronously: +.. testsetup:: + + from unittest import mock + mock.patch('math.inf', 0).start() .. code-block:: python - import time + # my_worker.py - def my_func(foo, bar, baz=None): - """This is a blocking function.""" - time.sleep(10) - return foo, bar, baz + import logging + from kafka import KafkaConsumer + from kq import Worker -Start a KQ worker: + # Set up logging. + formatter = logging.Formatter('[%(levelname)s] %(message)s') + stream_handler = logging.StreamHandler() + stream_handler.setFormatter(formatter) + logger = logging.getLogger('kq.worker') + logger.setLevel(logging.DEBUG) + logger.addHandler(stream_handler) -.. code-block:: bash + # Set up a Kafka consumer. + consumer = KafkaConsumer( + bootstrap_servers='127.0.0.1:9092', + group_id='group', + auto_offset_reset='latest' + ) - ~$ kq worker --verbose - [INFO] Starting Worker(topic=default) ... + # Set up a worker. + worker = Worker(topic='topic', consumer=consumer) + worker.start() +Start the worker: -Enqueue the function call as a job: +.. code-block:: bash -.. code-block:: python + ~$ python my_worker.py + [INFO] Starting Worker(hosts=127.0.0.1:9092 topic=topic, group=group) ... + +Enqueue a function call: - # Import the blocking function - from my_module import my_func +.. testcode:: - # Initialize a queue + import requests + + from kafka import KafkaProducer from kq import Queue - q = Queue() - # Enqueue the function call - q.enqueue(my_func, 1, 2, baz=3) + # Set up a Kafka producer. + producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092') + + # Set up a queue. + queue = Queue(topic='topic', producer=producer) + # Enqueue a function call. + job = queue.enqueue(requests.get, 'https://www.google.com') Sit back and watch the worker process it in the background: .. code-block:: bash - ~$ kq worker --verbose - [INFO] Starting Worker(topic=default) ... - [INFO] Processing Record(topic=default, partition=5, offset=3) ... - [INFO] Running Job 1b92xle0: my_module.my_func(1, 2, baz=3) ... - [INFO] Job 1b92xle0 returned: (1, 2, 3) + ~$ python my_worker.py + [INFO] Starting Worker(hosts=127.0.0.1:9092, topic=topic, group=group) ... + [INFO] Processing Message(topic=topic, partition=0, offset=0) ... + [INFO] Executing job c7bf2359: requests.api.get('https://www.google.com') + [INFO] Job c7bf2359 returned: +You can also specify the job timeout, message key and partition: + +.. code-block:: python -And that's essentially all there is to KQ! \ No newline at end of file + job = queue.using(timeout=5, key=b'foo', partition=0).enqueue(requests.get, 'https://www.google.com') diff --git a/docs/serializer.rst b/docs/serializer.rst new file mode 100644 index 0000000..d8c6bff --- /dev/null +++ b/docs/serializer.rst @@ -0,0 +1,46 @@ +Serializer +---------- + +You can inject your own functions for serializing (pickling) jobs. By default, +KQ uses the dill_ library. + +.. _dill: https://github.com/uqfoundation/dill + +The serializer function must take a :doc:`job ` namedtuple and return a +byte string. You can inject it during queue initialization. + +**Example:** + +.. testcode:: + + # Let's use pickle instead of dill + import pickle + + from kafka import KafkaProducer + from kq import Queue + + producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092') + + # Inject your serializer function during queue initialization. + queue = Queue('topic', producer, serializer=pickle.dumps) + +The deserializer function must take a byte string and returns a :doc:`job ` +namedtuple. You can inject it during worker initialization. + +**Example:** + +.. testcode:: + + # Let's use pickle instead of dill + import pickle + + from kafka import KafkaConsumer + from kq import Worker + + consumer = KafkaConsumer( + bootstrap_servers='127.0.0.1:9092', + group_id='group' + ) + + # Inject your deserializer function during worker initialization. + worker = Worker('topic', consumer, deserializer=pickle.loads) diff --git a/example/callback b/example/callback deleted file mode 100644 index f5c0a1a..0000000 --- a/example/callback +++ /dev/null @@ -1,52 +0,0 @@ -#!/usr/bin/env python - -import logging - -logger = logging.getLogger(__name__) - - -def callback(status, job, result, exception, traceback): - """Function executed by a worker after a job is fetched and consumed. - - This is an example callback function. - - :param status: The status of the job consumption. Possible values are - ``timeout', ``failure`` and ``success``. - :type status: str | unicode - :param job: The job consumed by the worker - :type job: kq.job.Job - :param result: The result of the job execution. - :type result: object - :param exception: Exception raised while the job was running (i.e. status - was ``failure``), or ``None`` if there were no errors (i.e. status was - ``success``). - :type exception: Exception - :param traceback: The traceback of the exception (i.e. status was - ``failure``) was running or ``None`` if there were no errors (i.e. - status was ``success``). - :type traceback: str | unicode - """ - logger.info('Job UUID: {}'.format(job.id)) - logger.info('Enqueued at {}'.format(job.timestamp)) - logger.info('In topic: {}'.format(job.topic)) - logger.info('Function: {}'.format(job.func)) - logger.info('Arguments {}'.format(job.args)) - logger.info('Keyword arguments {}'.format(job.kwargs)) - logger.info('Timeout threshold {}'.format(job.timeout)) - - if status == 'success': - logger.info('The job returned: {}'.format(result)) - assert exception is None - assert traceback is None - - elif status == 'timeout': - logger.info('The job took too long and timed out') - assert result is None - assert exception is None - assert traceback is None - - elif status == 'failure': - logger.info('The job raised an exception on runtime') - assert result is None - assert exception is not None - assert traceback is not None diff --git a/example/worker-cli.py b/example/worker-cli.py new file mode 100644 index 0000000..078db65 --- /dev/null +++ b/example/worker-cli.py @@ -0,0 +1,90 @@ +import logging + +import dill +from kafka import KafkaConsumer +from kq import Job, Message, Worker + +# Set up logging +formatter = logging.Formatter( + fmt='[%(asctime)s][%(levelname)s] %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' +) +stream_handler = logging.StreamHandler() +stream_handler.setFormatter(formatter) +logger = logging.getLogger('kq.worker') +logger.setLevel(logging.DEBUG) +logger.addHandler(stream_handler) + + +def callback(status, message, job, result, exception, stacktrace): + """Example callback function. + + :param status: Job status. Possible values are "invalid" (job could not be + deserialized or was malformed), "failure" (job raised an exception), + "timeout" (job timed out), or "success" (job finished successfully and + returned a result). + :type status: str + :param message: Kafka message. + :type message: kq.Message + :param job: Job object, or None if **status** was "invalid". + :type job: kq.Job + :param result: Job result, or None if an exception was raised. + :type result: object | None + :param exception: Exception raised by job, or None if there was none. + :type exception: Exception | None + :param stacktrace: Exception traceback, or None if there was none. + :type stacktrace: str | None + """ + assert status in ['invalid', 'success', 'timeout', 'failure'] + assert isinstance(message, Message) + + if status == 'invalid': + assert job is None + assert result is None + assert exception is None + assert stacktrace is None + + if status == 'success': + assert isinstance(job, Job) + assert exception is None + assert stacktrace is None + + elif status == 'timeout': + assert isinstance(job, Job) + assert result is None + assert exception is None + assert stacktrace is None + + elif status == 'failure': + assert isinstance(job, Job) + assert result is None + assert exception is not None + assert stacktrace is not None + + +def deserializer(serialized): + """Example deserializer function with extra sanity checking. + + :param serialized: Serialized byte string. + :type serialized: bytes + :return: Deserialized job object. + :rtype: kq.Job + """ + assert isinstance(serialized, bytes), 'Expecting a bytes' + return dill.loads(serialized) + + +if __name__ == '__main__': + consumer = KafkaConsumer( + bootstrap_servers='127.0.0.1:9092', + group_id='group', + enable_auto_commit=False, + auto_offset_reset='latest' + ) + worker = Worker( + topic='topic', + consumer=consumer, + callback=callback, + deserializer=deserializer + ) + worker.start() diff --git a/kq/__init__.py b/kq/__init__.py index 5afffed..e65ac80 100644 --- a/kq/__init__.py +++ b/kq/__init__.py @@ -1,16 +1,7 @@ -from __future__ import absolute_import, print_function, unicode_literals - -import logging +__all__ = ['Job', 'Message', 'Queue', 'Worker', '__version__'] from kq.job import Job -from kq.manager import Manager -from kq.worker import Worker +from kq.message import Message from kq.queue import Queue -from kq.version import VERSION - -__all__ = ['Job', 'Manager', 'Worker', 'Queue', 'VERSION'] - -# Reduce logging noise from PyKafka -for name, logger in logging.Logger.manager.loggerDict.items(): - if name.startswith('kafka') and isinstance(logger, logging.Logger): - logger.setLevel(logging.CRITICAL) +from kq.worker import Worker +from kq.version import __version__ diff --git a/kq/cli.py b/kq/cli.py deleted file mode 100644 index 25425d9..0000000 --- a/kq/cli.py +++ /dev/null @@ -1,110 +0,0 @@ -""" -KQ (Kafka Queue) command line tool - -Usage: - kq (worker|info) [--hosts=] - [--topic=] - [--timeout=] - [--callback=] - [--job-size=] - [--cafile=] - [--certfile=] - [--keyfile=] - [--crlfile=] - [--proc-ttl=] - [--offset=] - [--verbose] - kq --help - kq --version - -Options: - --hosts= Comma-separated Kafka hosts [default: 127.0.0.1] - --topic= Name of the Kafka topic [default: default] - --job-size= Maximum job size in bytes [default: 1048576] - --timeout= Job timeout threshold in seconds - --callback= Python module containing the callback function - --cafile= Full path to SSL trusted CA certificate - --certfile= Full path to SSL client certificate - --keyfile= Full path to SSL private key - --crlfile= Full path to SSL crlfile for verifying expiry - --proc-ttl= Records read before respawning process [default: 5000] - --offset= Kafka consumer offset reset policy [default: latest] - --verbose Turn on debug logging output - --help Display this help menu - --version Display the version of KQ - -""" - -import os -import sys -import logging - -import docopt - -import kq - - -def entry_point(): - args = docopt.docopt(__doc__, version=kq.VERSION) - - # Set up the logger - logger = logging.getLogger('kq') - if args['--verbose']: - logger.setLevel(logging.DEBUG) - else: - logger.setLevel(logging.WARNING) - - stream_handler = logging.StreamHandler() - stream_handler.setLevel(logging.DEBUG) - stream_handler.setFormatter(logging.Formatter( - '[%(asctime)s] [%(levelname)s] %(message)s', - )) - logger.addHandler(stream_handler) - - # Import the callback function if given - callback = None - if args['--callback']: # pragma: no cover - try: - path = os.path.abspath( - os.path.expanduser(args['--callback']) - ) - if sys.version_info.major == 2: - import imp - module = imp.load_source('kqconfig', path) - elif sys.version_info.minor < 5: - from importlib.machinery import SourceFileLoader as Loader - module = Loader('kqconfig', path).load_module() - else: - from importlib import util - spec = util.spec_from_file_location("kqconfig", path) - module = util.module_from_spec(spec) - spec.loader.exec_module(module) - callback = getattr(module, 'callback') - except Exception as e: - logger.exception('Failed to load callback: {}'.format(e)) - - if args['info']: - kq.Manager( - hosts=args['--hosts'], - cafile=args['--cafile'], - certfile=args['--certfile'], - keyfile=args['--keyfile'], - crlfile=args['--crlfile'], - ).info() - - elif args['worker']: - timeout = args['--timeout'] - print(args['--offset']) - kq.Worker( - hosts=args['--hosts'], - topic=args['--topic'], - timeout=int(timeout) if timeout else None, - callback=callback, - job_size=int(args['--job-size']), - cafile=args['--cafile'], - certfile=args['--certfile'], - keyfile=args['--keyfile'], - crlfile=args['--crlfile'], - proc_ttl=int(args['--proc-ttl']), - offset_policy=args['--offset'] - ).start() diff --git a/kq/job.py b/kq/job.py index ff35fb0..9bd6b2f 100644 --- a/kq/job.py +++ b/kq/job.py @@ -1,20 +1,22 @@ -from __future__ import absolute_import, print_function, unicode_literals +__all__ = ['Job'] from collections import namedtuple - -# Namedtuple which encapsulates a KQ job +# Namedtuple which encapsulates a KQ job. Job = namedtuple( typename='Job', - field_names=[ - 'id', # UUID of the job - 'timestamp', # Unix timestamp indicating when the job was queued - 'topic', # Name of the Kafka topic the job was enqueued in - 'func', # Job function/callable - 'args', # Job function arguments - 'kwargs', # Job function keyword arguments - 'timeout', # Job timeout threshold in seconds - 'key' # Jobs w/ the same keys end up in the same partition - ] + field_names=( + 'id', # Job ID (str) + 'timestamp', # Unix timestamp indicating when job was enqueued (int) + 'topic', # Name of the Kafka topic (str) + 'func', # Function to execute (callable) + 'args', # Positional arguments (list) + 'kwargs', # Keyword arguments (dict) + 'timeout', # Job timeout threshold in seconds (int | float) + 'key', # Kafka message key if any (str | None) + 'partition' # Kafka topic partition if any (str | None) + ) ) -Job.__new__.__defaults__ = (None,) + +# noinspection PyUnresolvedReferences,PyProtectedMember +Job.__new__.__defaults__ = (None,) * len(Job._fields) diff --git a/kq/manager.py b/kq/manager.py deleted file mode 100644 index 0a3f00d..0000000 --- a/kq/manager.py +++ /dev/null @@ -1,102 +0,0 @@ -from __future__ import absolute_import, print_function, unicode_literals - -import kafka - - -# TODO need to add more functionality -class Manager(object): - """KQ manager. - - Here is an example of initializing and using a manager: - - .. code-block:: python - - from kq import Manager - - manager = Manager( - hosts='host:7000,host:8000', - cafile='/my/files/cafile', - certfile='/my/files/certfile', - keyfile='/my/files/keyfile', - crlfile='/my/files/crlfile' - ) - manager.info() - - .. note:: - - There is not much happening with this class right now. It's still - a work in progress. - - :param hosts: Comma-separated Kafka hostnames and ports. For example, - ``"localhost:9000,localhost:8000,192.168.1.1:7000"`` is a valid - input string. Default: ``"127.0.0.1:9092"``. - :type hosts: str | unicode - :param cafile: Full path to the trusted CA certificate file. - :type cafile: str | unicode - :param certfile: Full path to the client certificate file. - :type certfile: str | unicode - :param keyfile: Full path to the client private key file. - :type keyfile: str | unicode - :param crlfile: Full path to the CRL file for validating certification - expiry. This option is only available with Python 3.4+ or 2.7.9+. - :type crlfile: str | unicode - """ - - def __init__(self, - hosts='127.0.0.1:9092', - cafile=None, - certfile=None, - keyfile=None, - crlfile=None): - self._hosts = hosts - self._consumer = kafka.KafkaConsumer( - bootstrap_servers=self._hosts, - ssl_cafile=cafile, - ssl_certfile=certfile, - ssl_keyfile=keyfile, - ssl_crlfile=crlfile, - consumer_timeout_ms=-1, - enable_auto_commit=True, - auto_offset_reset='latest', - ) - - def __repr__(self): - """Return a string representation of the queue. - - :return: string representation of the queue - :rtype: str | unicode - """ - return 'Manager(hosts={})'.format(self._hosts) - - @property - def consumer(self): - """Return the Kafka consumer object. - - :return: Kafka consumer object. - :rtype: kafka.consumer.KafkaConsumer - """ - return self._consumer - - @property - def hosts(self): - """Return the list of Kafka host names and ports. - - :return: list of Kafka host names and ports - :rtype: [str] - """ - return self._hosts.split(',') - - def info(self): - """Print the offset information for all topics and partitions.""" - print('Offsets per Topic:') - for topic in self._consumer.topics(): - print('\nTopic {}:\n'.format(topic)) - partitions = self._consumer.partitions_for_topic(topic) - if partitions is None: # pragma: no cover - print(' Polling failed (please try again)') - continue - for partition in self._consumer.partitions_for_topic(topic): - topic_partition = kafka.TopicPartition(topic, partition) - self._consumer.assign([topic_partition]) - offset = self._consumer.position(topic_partition) - print(' Partition {:<3}: {}'.format(partition, offset)) diff --git a/kq/message.py b/kq/message.py new file mode 100644 index 0000000..223132d --- /dev/null +++ b/kq/message.py @@ -0,0 +1,15 @@ +__all__ = ['Message'] + +from collections import namedtuple + +# Namedtuple which encapsulates a Kafka message. +Message = namedtuple( + typename='Message', + field_names=( + 'topic', # Name of the Kafka topic (str) + 'partition', # Topic partition (int) + 'offset', # Offset (int) + 'key', # Message key (bytes | None) + 'value' # Message value (bytes) + ) +) diff --git a/kq/queue.py b/kq/queue.py index 2bd6b1b..33bcca1 100644 --- a/kq/queue.py +++ b/kq/queue.py @@ -1,322 +1,359 @@ -from __future__ import absolute_import, print_function, unicode_literals +__all__ = ['Queue'] -import functools import logging import time import uuid import dill -import kafka +from kafka import KafkaProducer from kq.job import Job +from kq.utils import ( + is_dict, + is_iter, + is_number, + is_str, + is_none_or_bytes, + is_none_or_func, + is_none_or_int, + is_none_or_logger, +) class Queue(object): - """KQ queue. - - A queue serializes incoming function calls and places them into a Kafka - topic as *jobs*. Workers fetch these jobs and execute them asynchronously - in the background. Here is an example of initializing and using a queue: - - .. code-block:: python - - from kq import Queue, Job - - queue = Queue( - hosts='host:7000,host:8000', - topic='foo', - timeout=3600, - compression='gzip', - acks=0, - retries=5, - job_size=10000000, - cafile='/my/files/cafile', - certfile='/my/files/certfile', - keyfile='/my/files/keyfile', - crlfile='/my/files/crlfile' - ) - job = queue.enqueue(my_func, *args, **kwargs) - assert isinstance(job, Job) - - .. note:: - - The number of partitions in a Kafka topic limits how many workers - can read from the queue in parallel. For example, maximum of 10 - workers can work off a queue with 10 partitions. - - - :param hosts: Comma-separated Kafka hostnames and ports. For example, - ``"localhost:9000,localhost:8000,192.168.1.1:7000"`` is a valid input - string. Default: ``"127.0.0.1:9092"``. - :type hosts: str | unicode - :param topic: Name of the Kafka topic. Default: ``"default"``. - :type topic: str | unicode - :param timeout: Default job timeout threshold in seconds. If not set, the - enqueued jobs are left to run until they finish. This means a hanging - job can potentially block the workers. Default: ``None`` (no timeout). - :type timeout: int - :param compression: The algorithm used for compressing job data. Allowed - values are: ``"gzip"``, ``"snappy"`` and ``"lz4"``. Default: ``None`` - (no compression). - :type compression: str | unicode - :param acks: The number of acknowledgements required from the broker(s) - before considering a job successfully enqueued. Allowed values are: - - .. code-block:: none - - 0: Do not wait for any acknowledgment from the broker leader - and consider the job enqueued as soon as it is added to - the socket buffer. Persistence is not guaranteed on broker - failures. - - 1: Wait for the job to be saved on the broker leader but not - for it be replicated across other brokers. If the leader - broker fails before the replication finishes the job may - not be persisted. - - -1: Wait for the job to be replicated across all brokers. As - long as one of the brokers is functional job persistence - is guaranteed. - - Default: ``1``. - - :type acks: int - :param retries: The maximum number of attempts to re-enqueue a job when - the job fails to reach the broker. Retries may alter the sequence of - the enqueued jobs. Default: ``0``. - :type retries: int - :param job_size: The max size of each job in bytes. Default: ``1048576``. - :type job_size: int - :param cafile: Full path to the trusted CA certificate file. - :type cafile: str | unicode - :param certfile: Full path to the client certificate file. - :type certfile: str | unicode - :param keyfile: Full path to the client private key file. - :type keyfile: str | unicode - :param crlfile: Full path to the CRL file for validating certification - expiry. This option is only available with Python 3.4+ or 2.7.9+. - :type crlfile: str | unicode + """Enqueues function calls in Kafka topics as :doc:`jobs `. + + :param topic: Name of the Kafka topic. + :type topic: str + :param producer: Kafka producer instance. For more details on producers, + refer to kafka-python's `documentation + `_. + :type producer: kafka.KafkaProducer_ + :param serializer: Callable which takes a :doc:`job ` namedtuple and + returns a serialized byte string. If not set, ``dill.dumps`` is used + by default. See :doc:`here ` for more details. + :type serializer: callable + :param timeout: Default job timeout threshold in seconds. If left at 0 + (default), jobs run until completion. This value can be overridden + when enqueueing jobs. + :type timeout: int | float + :param logger: Logger for recording queue activities. If not set, logger + named ``kq.queue`` is used with default settings (you need to define + your own formatters and handlers). See :doc:`here ` for more + details. + :type logger: logging.Logger + + **Example:** + + .. testcode:: + + import requests + + from kafka import KafkaProducer + from kq import Queue + + # Set up a Kafka producer. + producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092') + + # Set up a queue. + queue = Queue(topic='topic', producer=producer, timeout=3600) + + # Enqueue a function call. + job = queue.enqueue(requests.get, 'https://www.google.com/') + + .. _kafka.KafkaProducer: + http://kafka-python.rtfd.io/en/master/apidoc/KafkaProducer.html """ def __init__(self, - hosts='127.0.0.1:9092', - topic='default', - timeout=None, - compression=None, - acks=1, - retries=0, - job_size=1048576, - cafile=None, - certfile=None, - keyfile=None, - crlfile=None): - self._hosts = hosts + topic, + producer, + serializer=None, + timeout=0, + logger=None): + + assert is_str(topic), 'topic must be a str' + assert isinstance(producer, KafkaProducer), 'bad producer instance' + assert is_none_or_func(serializer), 'serializer must be a callable' + assert is_number(timeout), 'timeout must be an int or float' + assert timeout >= 0, 'timeout must be 0 or greater' + assert is_none_or_logger(logger), 'bad logger instance' + self._topic = topic + self._hosts = producer.config['bootstrap_servers'] + self._producer = producer + self._serializer = serializer or dill.dumps self._timeout = timeout - self._logger = logging.getLogger('kq') - self._producer = kafka.KafkaProducer( - bootstrap_servers=self._hosts, - compression_type=compression, - acks=acks, - retries=retries, - max_request_size=job_size, - buffer_memory=max(job_size, 33554432), - ssl_cafile=cafile, - ssl_certfile=certfile, - ssl_keyfile=keyfile, - ssl_crlfile=crlfile + self._logger = logger or logging.getLogger('kq.queue') + self._default_enqueue_spec = EnqueueSpec( + topic=self._topic, + producer=self._producer, + serializer=self._serializer, + logger=self._logger, + timeout=self._timeout, + key=None, + partition=None ) def __repr__(self): - """Return a string representation of the queue. + """Return the string representation of the queue. :return: String representation of the queue. - :rtype: str | unicode + :rtype: str """ - return 'Queue(topic={})'.format(self._topic) + return 'Queue(hosts={}, topic={})'.format(self._hosts, self._topic) - @property - def producer(self): - """Return the Kafka producer object. - - :return: Kafka producer object. - :rtype: kafka.producer.KafkaProducer - """ - return self._producer + def __del__(self): # pragma: no covers + # noinspection PyBroadException + try: + self._producer.close() + except Exception: + pass @property def hosts(self): - """Return the list of Kafka host names and ports. + """Return comma-separated Kafka hosts and ports string. - :return: List of Kafka host names and ports. - :rtype: [str] + :return: Comma-separated Kafka hosts and ports. + :rtype: str """ - return self._hosts.split(',') + return self._hosts @property def topic(self): - """Return the name of the Kafka topic in use. + """Return the name of the Kafka topic. - :return: Name of the Kafka topic in use. - :rtype: str | unicode + :return: Name of the Kafka topic. + :rtype: str """ return self._topic + @property + def producer(self): + """Return the Kafka producer instance. + + :return: Kafka producer instance. + :rtype: kafka.KafkaProducer + """ + return self._producer + + @property + def serializer(self): + """Return the serializer function. + + :return: Serializer function. + :rtype: callable + """ + return self._serializer + @property def timeout(self): - """Return the job timeout threshold in seconds. + """Return the default job timeout threshold in seconds. - :return: Job timeout threshold in seconds. - :rtype: int + :return: Default job timeout threshold in seconds. + :rtype: int | float """ return self._timeout - def enqueue(self, obj, *args, **kwargs): - """Place the function call (or the job) in the Kafka topic. + def enqueue(self, func, *args, **kwargs): + """Enqueue a function call or a :doc:`job `. - For example: + :param func: Function or a :doc:`job ` object. Must be + serializable and available to :doc:`workers `. + :type func: callable | :doc:`kq.Job ` + :param args: Positional arguments for the function. Ignored if **func** + is a :doc:`job ` object. + :param kwargs: Keyword arguments for the function. Ignored if **func** + is a :doc:`job ` object. + :return: Enqueued job. + :rtype: :doc:`kq.Job ` - .. code-block:: python + **Example:** + + .. testcode:: import requests - from kq import Queue - - q = Queue() - - # You can queue the function call with its arguments - job = q.enqueue(requests.get, 'https://www.google.com') - - # Or you can queue a kq.job.Job instance directly - q.enqueue(job) - - :param obj: Function or the job object to enqueue. If a function is - given, the function *must* be pickle-able. - :type obj: callable | kq.job.Job - :param args: Arguments for the function. Ignored if a KQ job object - is given for the first argument instead. - :type args: list - :param kwargs: Keyword arguments for the function. Ignored if a KQ - job instance is given as the first argument instead. - :type kwargs: dict - :return: The job that was enqueued - :rtype: kq.job.Job + + from kafka import KafkaProducer + from kq import Job, Queue + + # Set up a Kafka producer. + producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092') + + # Set up a queue. + queue = Queue(topic='topic', producer=producer) + + # Enqueue a function call. + queue.enqueue(requests.get, 'https://www.google.com/') + + # Enqueue a job object. + job = Job(func=requests.get, args=['https://www.google.com/']) + queue.enqueue(job) + + .. note:: + + The following rules apply when enqueueing a :doc:`job `: + + * If ``Job.id`` is not set, a random one is generated. + * If ``Job.timestamp`` is set, it is replaced with current time. + * If ``Job.topic`` is set, it is replaced with current topic. + * If ``Job.timeout`` is set, its value overrides others. + * If ``Job.key`` is set, its value overrides others. + * If ``Job.partition`` is set, its value overrides others. + """ - if isinstance(obj, Job): - func = obj.func - args = obj.args - kwargs = obj.kwargs - key = obj.key - else: - func = obj - key = None + return self._default_enqueue_spec.enqueue(func, *args, **kwargs) + + def using(self, timeout=None, key=None, partition=None): + """Set enqueue specifications such as timeout, key and partition. + + :param timeout: Job timeout threshold in seconds. If not set, default + timeout (specified during queue initialization) is used instead. + :type timeout: int | float + :param key: Kafka message key. Jobs with the same keys are sent to the + same topic partition and executed sequentially. Applies only if the + **partition** parameter is not set, and the producer’s partitioner + configuration is left as default. For more details on producers, + refer to kafka-python's documentation_. + :type key: bytes + :param partition: Topic partition the message is sent to. If not set, + the producer's partitioner selects the partition. For more details + on producers, refer to kafka-python's documentation_. + :type partition: int + :return: Enqueue specification object which has an ``enqueue`` method + with the same signature as :func:`kq.queue.Queue.enqueue`. + + **Example:** + + .. testcode:: - if not callable(func): - raise ValueError('{} is not a callable'.format(func)) + import requests - job = Job( - id=str(uuid.uuid4()), - timestamp=int(time.time()), + from kafka import KafkaProducer + from kq import Job, Queue + + # Set up a Kafka producer. + producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092') + + # Set up a queue. + queue = Queue(topic='topic', producer=producer) + + url = 'https://www.google.com/' + + # Enqueue a function call in partition 0 with message key 'foo'. + queue.using(partition=0, key=b'foo').enqueue(requests.get, url) + + # Enqueue a function call with a timeout of 10 seconds. + queue.using(timeout=10).enqueue(requests.get, url) + + # Job values are preferred over values set with "using" method. + job = Job(func=requests.get, args=[url], timeout=5) + queue.using(timeout=10).enqueue(job) # timeout is still 5 + + .. _documentation: http://kafka-python.rtfd.io/en/master/#kafkaproducer + """ + return EnqueueSpec( topic=self._topic, - func=func, - args=args, - kwargs=kwargs, - timeout=self._timeout, - key=key + producer=self._producer, + serializer=self._serializer, + logger=self._logger, + timeout=timeout or self._timeout, + key=key, + partition=partition ) - self._producer.send(self._topic, dill.dumps(job), key=key) - self._logger.info('Enqueued: {}'.format(job)) - return job - def enqueue_with_key(self, key, obj, *args, **kwargs): - """Place the function call (or the job) in the Kafka topic with key. - For example: +class EnqueueSpec(object): - .. code-block:: python + __slots__ = [ + '_topic', + '_producer', + '_serializer', + '_logger', + '_timeout', + '_key', + '_part', + 'delay' + ] - import requests - from kq import Queue - - q = Queue() - - url = 'https://www.google.com' - - # You can queue the function call with its arguments - job = q.enqueue_with_key('my_key', requests.get, url) - - # Or you can queue a kq.job.Job instance directly - q.enqueue_with_key('my_key', job) - - :param key: The key for the Kafka message. Jobs with the same key are - guaranteed to be placed in the same Kafka partition and processed - sequentially. If a job object is enqueued, its key is overwritten. - :type key: str - :param obj: Function or the job object to enqueue. If a function is - given, the function *must* be pickle-able. - :type obj: callable | kq.job.Job - :param args: Arguments for the function. Ignored if a KQ job object - is given for the first argument instead. - :type args: list - :param kwargs: Keyword arguments for the function. Ignored if a KQ - job instance is given as the first argument instead. - :type kwargs: dict - :return: The job that was enqueued - :rtype: kq.job.Job + def __init__(self, + topic, + producer, + serializer, + logger, + timeout, + key, + partition): + assert is_number(timeout), 'timeout must be an int or float' + assert is_none_or_bytes(key), 'key must be a bytes' + assert is_none_or_int(partition), 'partition must be an int' + + self._topic = topic + self._producer = producer + self._serializer = serializer + self._logger = logger + self._timeout = timeout + self._key = key + self._part = partition + + def enqueue(self, obj, *args, **kwargs): + """Enqueue a function call or :doc:`job` instance. + + :param func: Function or :doc:`job `. Must be serializable and + importable by :doc:`worker ` processes. + :type func: callable | :doc:`kq.Job ` + :param args: Positional arguments for the function. Ignored if **func** + is a :doc:`job ` object. + :param kwargs: Keyword arguments for the function. Ignored if **func** + is a :doc:`job ` object. + :return: Enqueued job. + :rtype: :doc:`kq.Job ` """ + timestamp = int(time.time() * 1000) + if isinstance(obj, Job): + job_id = uuid.uuid4().hex if obj.id is None else obj.id func = obj.func - args = obj.args - kwargs = obj.kwargs + args = tuple() if obj.args is None else obj.args + kwargs = {} if obj.kwargs is None else obj.kwargs + timeout = self._timeout if obj.timeout is None else obj.timeout + key = self._key if obj.key is None else obj.key + partition = self._part if obj.partition is None else obj.partition + + assert is_str(job_id), 'Job.id must be a str' + assert callable(func), 'Job.func must be a callable' + assert is_iter(args), 'Job.args must be a list or tuple' + assert is_dict(kwargs), 'Job.kwargs must be a dict' + assert is_number(timeout), 'Job.timeout must be an int or float' + assert is_none_or_bytes(key), 'Job.key must be a bytes' + assert is_none_or_int(partition), 'Job.partition must be an int' else: + assert callable(obj), 'first argument must be a callable' + job_id = uuid.uuid4().hex func = obj - - if not callable(func): - raise ValueError('{} is not a callable'.format(func)) + args = args + kwargs = kwargs + timeout = self._timeout + key = self._key + partition = self._part job = Job( - id=str(uuid.uuid4()), - timestamp=int(time.time()), + id=job_id, + timestamp=timestamp, topic=self._topic, func=func, args=args, kwargs=kwargs, - timeout=self._timeout, - key=key + timeout=timeout, + key=key, + partition=partition + ) + self._logger.info('Enqueueing {} ...'.format(job)) + self._producer.send( + self._topic, + value=self._serializer(job), + key=self._serializer(key) if key else None, + partition=partition, + timestamp_ms=timestamp ) - self._producer.send(self._topic, dill.dumps(job), key=key) - self._logger.info('Enqueued: {}'.format(job)) return job - - def job(self, func): - """Decorator which add a **delay** method to a function. - - When the **delay** method is called, the function is queued as a job. - For example: - - .. code-block:: python - - from kq import Queue - queue = Queue() - - @queue.job - def calculate_sum(a, b, c): - return a + b + c - - # Enqueue the function as a job - calculate_sum.delay(1, 2, 3) - - :param func: The function to decorate. - :type func: callable - :return: The decorated function with new method **delay** - :rtype: callable - """ - @functools.wraps(func) - def delay(*args, **kwargs): # pragma: no cover - return self.enqueue(func, *args, **kwargs) - func.delay = delay - return func - - def flush(self): - """Force-flush all buffered records to the broker.""" - self._logger.info('Flushing {} ...'.format(self)) - self._producer.flush() diff --git a/kq/utils.py b/kq/utils.py index 40925e6..98ce8fc 100644 --- a/kq/utils.py +++ b/kq/utils.py @@ -1,35 +1,58 @@ -from __future__ import absolute_import, print_function, unicode_literals +import logging +from inspect import ismethod, isfunction, isbuiltin, isclass -def rec_repr(record): - """Return the string representation of the consumer record. - - :param record: Record fetched from the Kafka topic. - :type record: kafka.consumer.fetcher.ConsumerRecord - :return: String representation of the consumer record. - :rtype: str | unicode - """ - return 'Record(topic={}, partition={}, offset={})'.format( - record.topic, record.partition, record.offset - ) - - -def func_repr(func, args, kwargs): +def get_call_repr(func, *args, **kwargs): """Return the string representation of the function call. - :param func: Function to represent. + :param func: A callable (e.g. function, method). :type func: callable - :param args: Function arguments. - :type args: list | collections.Iterable - :param kwargs: Function keyword arguments. - :type kwargs: dict | collections.Mapping + :param args: Positional arguments for the callable. + :param kwargs: Keyword arguments for the callable. :return: String representation of the function call. - :rtype: str | unicode + :rtype: str """ - params = list(map(repr, args)) - params.extend(k + '=' + repr(v) for k, v in sorted(kwargs.items())) - return '{}.{}({})'.format( - getattr(func, '__module__', '?'), - getattr(func, '__name__', '?'), - ', '.join(params) - ) + # Functions, builtins and methods + if ismethod(func) or isfunction(func) or isbuiltin(func): + func_repr = '{}.{}'.format(func.__module__, func.__qualname__) + # A callable class instance + elif not isclass(func) and hasattr(func, '__call__'): + func_repr = '{}.{}'.format(func.__module__, func.__class__.__name__) + else: + func_repr = repr(func) + + args_reprs = [repr(arg) for arg in args] + kwargs_reprs = [k + '=' + repr(v) for k, v in sorted(kwargs.items())] + return '{}({})'.format(func_repr, ', '.join(args_reprs + kwargs_reprs)) + + +def is_none_or_logger(obj): + return obj is None or isinstance(obj, logging.Logger) + + +def is_none_or_int(obj): + return obj is None or isinstance(obj, int) + + +def is_none_or_bytes(obj): + return obj is None or isinstance(obj, bytes) + + +def is_none_or_func(obj): + return obj is None or callable(obj) + + +def is_str(obj): + return isinstance(obj, str) + + +def is_number(obj): + return isinstance(obj, (int, float)) + + +def is_dict(obj): + return isinstance(obj, dict) + + +def is_iter(obj): + return isinstance(obj, (list, tuple)) diff --git a/kq/version.py b/kq/version.py index 88f9db1..afced14 100644 --- a/kq/version.py +++ b/kq/version.py @@ -1 +1 @@ -VERSION = '1.3.2' +__version__ = '2.0.0' diff --git a/kq/worker.py b/kq/worker.py index 3dd5334..e39155d 100644 --- a/kq/worker.py +++ b/kq/worker.py @@ -1,291 +1,264 @@ -from __future__ import absolute_import, print_function, unicode_literals +__all__ = ['Worker'] -import collections +import _thread import logging -import multiprocessing as mp -import traceback as tb +import math +import threading +import traceback import dill -import kafka +from kafka import KafkaConsumer -from kq.job import Job -from kq.utils import func_repr, rec_repr +from kq.message import Message +from kq.utils import ( + get_call_repr, + is_str, + is_none_or_func, + is_none_or_logger +) class Worker(object): - """KQ worker. - - A worker fetches jobs from a Kafka broker, de-serializes them and - executes them asynchronously in the background. Here is an example - of initializing and starting a worker: + """Fetches :doc:`jobs ` from Kafka topics and processes them. + + :param topic: Name of the Kafka topic. + :type topic: str + :param consumer: Kafka consumer instance with a group ID (required). For + more details on consumers, refer to kafka-python's documentation_. + :type consumer: kafka.KafkaConsumer_ + :param callback: Callback function which is executed every time a job is + processed. See :doc:`here ` for more details. + :type callback: callable + :param deserializer: Callable which takes a byte string and returns a + deserialized :doc:`job ` namedtuple. If not set, ``dill.loads`` + is used by default. See :doc:`here ` for more details. + :type deserializer: callable + :param logger: Logger for recording worker activities. If not set, logger + named ``kq.worker`` is used with default settings (you need to define + your own formatters and handlers). See :doc:`here ` for more + details. + :type logger: logging.Logger + + **Example:** .. code-block:: python + from kafka import KafkaConsumer from kq import Worker - worker = Worker( - hosts='host:7000,host:8000', - topic='foo', - timeout=3600, - callback=None, - job_size=10000000, - cafile='/my/files/cafile', - certfile='/my/files/certfile', - keyfile='/my/files/keyfile', - crlfile='/my/files/crlfile', - proc_ttl=2000, - offset_policy='earliest' + # Set up a Kafka consumer. Group ID is required. + consumer = KafkaConsumer( + bootstrap_servers='127.0.0.1:9092', + group_id='group' ) - worker.start() - - .. note:: - - The number of partitions in a Kafka topic limits how many workers - can read from the queue in parallel. For example, maximum of 10 - workers can work off a queue with 10 partitions. - - :param hosts: Comma-separated Kafka hostnames and ports. For example, - ``"localhost:9000,localhost:8000,192.168.1.1:7000"`` is a valid - input string. Default: ``"127.0.0.1:9092"``. - :type hosts: str | unicode - :param topic: Name of the Kafka topic. Default: ``"default"``. - :type topic: str | unicode - :param timeout: Default job timeout threshold in seconds. If not set, the - enqueued jobs are left to run until they finish. This means a hanging - job can potentially block the workers. Default: ``None`` (no timeout). - If set, overrides timeouts set when the jobs were first enqueued. - :type timeout: int - :param callback: Function executed after a job is fetched and processed. - Its signature must be ``callback(status, job, exception, traceback)`` - where: - - .. code-block:: none - status (str | unicode) - The status of the job execution, which can be "timeout", - "failure" or "success". + # Set up a worker. + worker = Worker(topic='topic', consumer=consumer) - job (kq.job.Job) - The job namedtuple object consumed by the worker. - - result (object) - The result of the job execution. - - exception (Exception | None) - The exception raised while the job was running, or None - if there were no errors. - - traceback (str | unicode | None) - The traceback of the exception raised while the job was - running, or None if there were no errors. + # Start the worker to process jobs. + worker.start() - :type callback: callable - :param job_size: The max size of each job in bytes. Default: ``1048576``. - :type job_size: int - :param cafile: Full path to the trusted CA certificate file. - :type cafile: str | unicode - :param certfile: Full path to the client certificate file. - :type certfile: str | unicode - :param keyfile: Full path to the client private key file. - :type keyfile: str | unicode - :param crlfile: Full path to the CRL file for validating certification - expiry. This option is only available with Python 3.4+ or 2.7.9+. - :type crlfile: str | unicode - :param proc_ttl: The number of records read before the worker's process - (multiprocessing pool of 1 process) is re-spawned. If set to ``0`` - or ``None``, the re-spawning is disabled. Default: ``5000``. - :type proc_ttl: int - :param offset_policy: Policy for resetting offsets on the Kafka consumer. - Value ``"earliest"`` moves the offset to the oldest available message - and ``"latest"`` to the most recent. Default: ``"latest"``. - :type offset_policy: str | unicode + .. _documentation: + http://kafka-python.rtfd.io/en/master/#kafkaconsumer + .. _kafka.KafkaConsumer: + http://kafka-python.rtfd.io/en/master/apidoc/KafkaConsumer.html """ def __init__(self, - hosts='127.0.0.1:9092', - topic='default', - timeout=None, + topic, + consumer, callback=None, - job_size=1048576, - cafile=None, - certfile=None, - keyfile=None, - crlfile=None, - proc_ttl=5000, - offset_policy='latest'): - self._hosts = hosts + deserializer=None, + logger=None): + + assert is_str(topic), 'topic must be a str' + assert isinstance(consumer, KafkaConsumer), 'bad consumer instance' + assert consumer.config['group_id'], 'consumer must have group_id' + assert is_none_or_func(callback), 'callback must be a callable' + assert is_none_or_func(deserializer), 'deserializer must be a callable' + assert is_none_or_logger(logger), 'bad logger instance' + self._topic = topic - self._timeout = timeout + self._hosts = consumer.config['bootstrap_servers'] + self._group = consumer.config['group_id'] + self._consumer = consumer self._callback = callback - self._pool = None - self._proc_ttl = proc_ttl - self._logger = logging.getLogger('kq') - self._consumer = kafka.KafkaConsumer( - self._topic, - group_id=self._topic, - bootstrap_servers=self._hosts, - max_partition_fetch_bytes=job_size * 2, - ssl_cafile=cafile, - ssl_certfile=certfile, - ssl_keyfile=keyfile, - ssl_crlfile=crlfile, - consumer_timeout_ms=-1, - enable_auto_commit=False, - auto_offset_reset=offset_policy, - ) - - def __del__(self): - """Commit the Kafka consumer offsets and close the consumer.""" - if hasattr(self, '_consumer'): - try: - self._logger.info('Closing consumer ...') - self._consumer.close() - except Exception as e: # pragma: no cover - self._logger.warning('Failed to close consumer: {}'.format(e)) + self._deserializer = deserializer or dill.loads + self._logger = logger or logging.getLogger('kq.worker') def __repr__(self): - """Return a string representation of the worker. + """Return the string representation of the worker. - :return: string representation of the worker - :rtype: str | unicode + :return: String representation of the worker. + :rtype: str """ - return 'Worker(topic={})'.format(self._topic) - - def _exec_callback(self, status, job, result, exception, traceback): - """Execute the callback in a try-except block. - - :param status: The status of the job consumption. Possible values are - ``timeout', ``failure`` and ``success``. - :type status: str | unicode - :param job: The job consumed by the worker - :type job: kq.job.Job - :param result: The result of the job execution. - :type result: object - :param exception: Exception raised while the job was running (i.e. - status was ``failure``), or ``None`` if there were no errors - (i.e. status was ``success``). - :type exception: Exception | None - :param traceback: The stacktrace of the exception (i.e. status was - ``failure``) was running or ``None`` if there were no errors. - :type traceback: str | unicode | None + return 'Worker(hosts={}, topic={}, group={})'.format( + self._hosts, self._topic, self._group + ) + + def __del__(self): # pragma: no cover + # noinspection PyBroadException + try: + self._consumer.close() + except Exception: + pass + + def _execute_callback(self, status, message, job, res, err, stacktrace): + """Execute the callback. + + :param status: Job status. Possible values are "invalid" (job could not + be deserialized or was malformed), "failure" (job raised an error), + "timeout" (job timed out), or "success" (job finished successfully + and returned a result). + :type status: str + :param message: Kafka message. + :type message: :doc:`kq.Message ` + :param job: Job object, or None if **status** was "invalid". + :type job: kq.Job + :param res: Job result, or None if an exception was raised. + :type res: object | None + :param err: Exception raised by job, or None if there was none. + :type err: Exception | None + :param stacktrace: Exception traceback, or None if there was none. + :type stacktrace: str | None """ if self._callback is not None: try: self._logger.info('Executing callback ...') - self._callback(status, job, result, exception, traceback) + self._callback(status, message, job, res, err, stacktrace) except Exception as e: - self._logger.exception('Callback failed: {}'.format(e)) + self._logger.exception( + 'Callback raised an exception: {}'.format(e)) - def _consume_record(self, record): - """De-serialize the message and execute the incoming job. + def _process_message(self, msg): + """De-serialize the message and execute the job. - :param record: Record fetched from the Kafka topic. - :type record: kafka.consumer.fetcher.ConsumerRecord + :param msg: Kafka message. + :type msg: :doc:`kq.Message ` """ - rec = rec_repr(record) - self._logger.info('Processing {} ...'.format(rec)) - # noinspection PyBroadException + self._logger.info( + 'Processing Message(topic={}, partition={}, offset={}) ...' + .format(msg.topic, msg.partition, msg.offset)) try: - job = dill.loads(record.value) - except Exception: - self._logger.warning('{} unloadable. Skipping ...'.format(rec)) + job = self._deserializer(msg.value) + job_repr = get_call_repr(job.func, *job.args, **job.kwargs) + + except Exception as err: + self._logger.exception('Job was invalid: {}'.format(err)) + self._execute_callback('invalid', msg, None, None, None, None) else: - # Simple check for job validity - if not (isinstance(job, Job) - and isinstance(job.args, collections.Iterable) - and isinstance(job.kwargs, collections.Mapping) - and callable(job.func)): - self._logger.warning('{} malformed. Skipping ...'.format(rec)) - return - func, args, kwargs = job.func, job.args, job.kwargs - self._logger.info('Running Job {}: {} ...'.format( - job.id, func_repr(func, args, kwargs) - )) + self._logger.info('Executing job {}: {}'.format(job.id, job_repr)) + + if job.timeout: + timer = threading.Timer(job.timeout, _thread.interrupt_main) + timer.start() + else: + timer = None try: - timeout = self._timeout or job.timeout - if timeout is None: - res = func(*args, **kwargs) - else: - run = self._pool.apply_async(func, args, kwargs) - res = run.get(timeout) - except mp.TimeoutError: - self._logger.error('Job {} timed out after {} seconds.' - .format(job.id, job.timeout)) - self._exec_callback('timeout', job, None, None, None) - except Exception as e: - self._logger.exception('Job {} failed: {}'.format(job.id, e)) - self._exec_callback('failure', job, None, e, tb.format_exc()) + res = job.func(*job.args, **job.kwargs) + except KeyboardInterrupt: + self._logger.error( + 'Job {} timed out or was interrupted'.format(job.id)) + self._execute_callback('timeout', msg, job, None, None, None) + except Exception as err: + self._logger.exception( + 'Job {} raised an exception:'.format(job.id)) + tb = traceback.format_exc() + self._execute_callback('failure', msg, job, None, err, tb) else: self._logger.info('Job {} returned: {}'.format(job.id, res)) - self._exec_callback('success', job, res, None, None) + self._execute_callback('success', msg, job, res, None, None) + finally: + if timer is not None: + timer.cancel() - def _refresh_pool(self): - """Terminate the previous process pool and initialize a new one.""" - self._logger.info('Refreshing process pool ...') - try: - self._pool.terminate() - except Exception as e: # pragma: no cover - self._logger.exception('Failed to terminate pool: {}'.format(e)) - finally: - self._pool = mp.Pool(processes=1) + @property + def hosts(self): + """Return comma-separated Kafka hosts and ports string. + + :return: Comma-separated Kafka hosts and ports. + :rtype: str + """ + return self._hosts @property - def consumer(self): - """Return the Kafka consumer object. + def topic(self): + """Return the name of the Kafka topic. - :return: Kafka consumer object. - :rtype: kafka.consumer.KafkaConsumer + :return: Name of the Kafka topic. + :rtype: str """ - return self._consumer + return self._topic @property - def hosts(self): - """Return the list of Kafka host names and ports. + def group(self): + """Return the Kafka consumer group ID. - :return: list of Kafka host names and ports - :rtype: [str] + :return: Kafka consumer group ID. + :rtype: str """ - return self._hosts.split(',') + return self._group @property - def topic(self): - """Return the name of the Kafka topic in use. + def consumer(self): + """Return the Kafka consumer instance. - :return: Name of the Kafka topic in use. - :rtype: str | unicode + :return: Kafka consumer instance. + :rtype: kafka.KafkaConsumer """ - return self._topic + return self._consumer @property - def timeout(self): - """Return the job timeout threshold in seconds. + def deserializer(self): + """Return the deserializer function. - :return: Job timeout threshold in seconds. - :rtype: int + :return: Deserializer function. + :rtype: callable """ - return self._timeout + return self._deserializer - def start(self): - """Start fetching and processing enqueued jobs in the topic. + @property + def callback(self): + """Return the callback function. - Once started, the worker will continuously poll the Kafka broker in - a loop until jobs are available in the topic partitions. The loop is - only stopped via external triggers (e.g. keyboard interrupts). + :return: Callback function, or None if not set. + :rtype: callable | None + """ + return self._callback + + def start(self, max_messages=math.inf, commit_offsets=True): + """Start processing Kafka messages and executing jobs. + + :param max_messages: Maximum number of Kafka messages to process before + stopping. If not set, worker runs until interrupted. + :type max_messages: int + :param commit_offsets: If set to True, consumer offsets are committed + every time a message is processed (default: True). + :type commit_offsets: bool + :return: Total number of messages processed. + :rtype: int """ self._logger.info('Starting {} ...'.format(self)) - self._pool = mp.Pool(processes=1) - records_read = 0 - try: - for record in self._consumer: - self._consume_record(record) + self._consumer.unsubscribe() + self._consumer.subscribe([self.topic]) + + messages_processed = 0 + while messages_processed < max_messages: + record = next(self._consumer) + + message = Message( + topic=record.topic, + partition=record.partition, + offset=record.offset, + key=record.key, + value=record.value + ) + self._process_message(message) + + if commit_offsets: self._consumer.commit() - if self._proc_ttl and records_read >= self._proc_ttl: - self._refresh_pool() - records_read = 0 - else: - records_read += 1 - - except KeyboardInterrupt: # pragma: no cover - self._logger.info('Stopping {} ...'.format(self)) - self._pool.terminate() # TODO not sure if necessary + + messages_processed += 1 + + return messages_processed diff --git a/setup.py b/setup.py index 6470631..8f02379 100644 --- a/setup.py +++ b/setup.py @@ -4,27 +4,25 @@ with open('./kq/version.py') as fp: exec(fp.read(), version) +with open('./README.rst') as fp: + description = fp.read() + setup( name='kq', description='Kafka Job Queue for Python', - version=version['VERSION'], + version=version['__version__'], + long_description=description, author='Joohwan Oh', author_email='joohwan.oh@outlook.com', url='https://github.com/joowani/kq', - packages=find_packages(), + packages=find_packages(exclude=['tests']), include_package_data=True, license='MIT', - entry_points={ - 'console_scripts': [ - 'kq = kq.cli:entry_point', - ], - }, install_requires=[ 'dill>=0.2.5', - 'docopt>=0.6.2', - 'kafka-python>=1.3.1' + 'kafka-python>=1.3.1', ], - tests_require=['pytest'], + tests_require=['pytest', 'mock', 'flake8', 'tinydb'], classifiers=[ 'Intended Audience :: Developers', 'Intended Audience :: End Users/Desktop', @@ -36,7 +34,6 @@ 'Operating System :: MacOS', 'Operating System :: Unix', 'Programming Language :: Python', - 'Programming Language :: Python :: 2', 'Programming Language :: Python :: 3', 'Topic :: Internet', 'Topic :: Scientific/Engineering', diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..2e966ef --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,225 @@ +import logging +import os +import uuid +import time + +import dill +import pytest +from kafka import KafkaConsumer, KafkaProducer + +from kq import Job, Message, Queue, Worker + +test_dir = os.getcwd() +if not test_dir.endswith('tests'): + test_dir = os.path.join(test_dir, 'tests') +log_file = os.path.join(test_dir, 'output.log') + +handler = logging.FileHandler(log_file, mode='w') +handler.setFormatter(logging.Formatter('[%(levelname)s] %(message)s')) + +queue_logger = logging.getLogger('kq.queue') +queue_logger.setLevel(logging.DEBUG) +queue_logger.addHandler(handler) + +worker_logger = logging.getLogger('kq.worker') +worker_logger.setLevel(logging.DEBUG) +worker_logger.addHandler(handler) + + +def success_function(a, b): + """Function which completes successfully.""" + return a * b + + +def failure_function(a, b): + """Function which raises an exception.""" + raise ValueError(a + b) + + +def timeout_function(a, b): + """Function which runs forever and times out.""" + while True: + assert a + b > 0 + + +# noinspection PyMethodMayBeStatic +class Callable(object): # pragma: no covers + + unbound_method = success_function + + def __call__(self, a, b): + return a * b + + @staticmethod + def static_method(a, b): + return a * b + + def instance_method(self, a, b): + return a * b + + +class Callback(object): + """Callback which can be set to succeed or fail.""" + + def __init__(self): + self.logger = logging.getLogger('kq.worker') + self.succeed = True + + def __call__(self, status, message, job, result, exception, stacktrace): + if not self.succeed: + raise RuntimeError + + assert status in ['invalid', 'success', 'timeout', 'failure'] + assert isinstance(message, Message) + + if status == 'invalid': + assert job is None + assert result is None + assert exception is None + assert stacktrace is None + + if status == 'success': + assert isinstance(job, Job) + assert exception is None + assert stacktrace is None + + elif status == 'timeout': + assert isinstance(job, Job) + assert result is None + assert exception is None + assert stacktrace is None + + elif status == 'failure': + assert isinstance(job, Job) + assert result is None + assert exception is not None + assert stacktrace is not None + + self.logger.info('Callback got job status "{}"'.format(status)) + + +class Deserializer(object): + """Deserializer which can be set to succeed or fail.""" + + def __init__(self): + self.succeed = True + + def __call__(self, job): + if not self.succeed: + raise RuntimeError + return dill.loads(job) + + +class LogAccessor(object): + """KQ log accessor.""" + + @property + def lines(self): + time.sleep(0.5) + with open(log_file, 'r') as fp: + lines = fp.read().splitlines() + return [l for l in lines if l.startswith('[')] + + @property + def last_line(self): + return self.lines[-1] + + def last_lines(self, line_count=1): + return iter(self.lines[-line_count:]) + + +def pytest_addoption(parser): + parser.addoption('--host', action='store', default='127.0.0.1') + parser.addoption('--port', action='store', default='9092') + + +@pytest.fixture(scope='session', autouse=False) +def func(): + return success_function + + +@pytest.fixture(scope='session', autouse=False) +def success_func(): + return success_function + + +@pytest.fixture(scope='session', autouse=False) +def failure_func(): + return failure_function + + +@pytest.fixture(scope='session', autouse=False) +def timeout_func(): + return timeout_function + + +@pytest.fixture(scope='session', autouse=False) +def callable_cls(): + return Callable + + +@pytest.fixture(scope='session', autouse=False) +def log(): + return LogAccessor() + + +@pytest.fixture(scope='session', autouse=False) +def callback(): + return Callback() + + +@pytest.fixture(scope='session', autouse=False) +def deserializer(): + return Deserializer() + + +@pytest.fixture(scope='session', autouse=False) +def hosts(pytestconfig): + host = pytestconfig.getoption('host') + port = pytestconfig.getoption('port') + return host + ':' + port + + +@pytest.fixture(scope='module', autouse=False) +def topic(): + return uuid.uuid4().hex + + +@pytest.fixture(scope='module', autouse=False) +def group(): + return uuid.uuid4().hex + + +# noinspection PyShadowingNames +@pytest.fixture(scope='module', autouse=False) +def producer(hosts): + return KafkaProducer(bootstrap_servers=hosts) + + +# noinspection PyShadowingNames +@pytest.fixture(scope='module', autouse=False) +def consumer(hosts, group): + return KafkaConsumer( + bootstrap_servers=hosts, + group_id=group, + auto_offset_reset='earliest', + ) + + +# noinspection PyShadowingNames +@pytest.fixture(scope='module', autouse=False) +def queue(topic, producer): + return Queue(topic, producer) + + +# noinspection PyShadowingNames +@pytest.fixture(scope='module', autouse=False) +def worker(topic, consumer, callback, deserializer): + return Worker(topic, consumer, callback, deserializer) + + +# noinspection PyShadowingNames +@pytest.fixture(scope='function', autouse=True) +def before(callback, deserializer): + callback.succeed = True + deserializer.succeed = True diff --git a/tests/test_cli.py b/tests/test_cli.py deleted file mode 100644 index 08378df..0000000 --- a/tests/test_cli.py +++ /dev/null @@ -1,173 +0,0 @@ -from __future__ import absolute_import, print_function, unicode_literals - -import logging -import sys - -import mock -import pytest - -from kq import cli, VERSION - -from .utils import CaptureOutput - -patch_object = getattr(mock.patch, 'object') - - -@pytest.fixture(autouse=True) -def manager(monkeypatch): - mock_manager_inst = mock.MagicMock() - mock_manager_cls = mock.MagicMock(return_value=mock_manager_inst) - monkeypatch.setattr('kq.Manager', mock_manager_cls) - return mock_manager_cls, mock_manager_inst - - -@pytest.fixture(autouse=True) -def worker(monkeypatch): - mock_worker_inst = mock.MagicMock() - mock_worker_cls = mock.MagicMock(return_value=mock_worker_inst) - monkeypatch.setattr('kq.Worker', mock_worker_cls) - return mock_worker_cls, mock_worker_inst - - -@pytest.fixture(autouse=True) -def logger(monkeypatch): - mock_logger_inst = mock.MagicMock() - mock_get_logger = mock.MagicMock() - mock_get_logger.return_value = mock_logger_inst - monkeypatch.setattr('logging.getLogger', mock_get_logger) - return mock_logger_inst - - -def test_help_menu(): - with patch_object(sys, 'argv', ['kq', '--help']): - with CaptureOutput() as output, pytest.raises(SystemExit): - cli.entry_point() - assert len(output) == 1 - assert 'Usage' in output[0] - assert 'Options' in output[0] - - -def test_version(): - with patch_object(sys, 'argv', ['kq', '--version']): - with CaptureOutput() as output, pytest.raises(SystemExit): - cli.entry_point() - assert len(output) == 1 - assert output[0] == VERSION + '\n' - - -def test_info(manager, logger): - manager_cls, manager_inst = manager - - test_arguments = [ - 'kq', - 'info', - '--hosts=host:6000,host:7000', - '--cafile=/test/files/cafile', - '--certfile=/test/files/certfile', - '--keyfile=/test/files/keyfile', - '--crlfile=/test/files/crlfile' - ] - with patch_object(sys, 'argv', test_arguments): - cli.entry_point() - - logger.setLevel.assert_called_once_with(logging.WARNING) - manager_cls.assert_called_once_with( - hosts='host:6000,host:7000', - cafile='/test/files/cafile', - certfile='/test/files/certfile', - keyfile='/test/files/keyfile', - crlfile='/test/files/crlfile' - ) - manager_inst.info.assert_called_once() - - -def test_worker(worker, logger): - worker_cls, worker_inst = worker - - test_arguments = [ - 'kq', - 'worker', - '--hosts=host:6000,host:7000', - '--topic=foo', - '--timeout=4000', - '--job-size=3000000', - '--cafile=/test/files/cafile', - '--certfile=/test/files/certfile', - '--keyfile=/test/files/keyfile', - '--crlfile=/test/files/crlfile', - '--proc-ttl=1000', - '--offset=earliest' - ] - with patch_object(sys, 'argv', test_arguments): - cli.entry_point() - - logger.setLevel.assert_called_once_with(logging.WARNING) - worker_cls.assert_called_once_with( - hosts='host:6000,host:7000', - topic='foo', - timeout=4000, - callback=None, - job_size=3000000, - cafile='/test/files/cafile', - certfile='/test/files/certfile', - keyfile='/test/files/keyfile', - crlfile='/test/files/crlfile', - proc_ttl=1000, - offset_policy='earliest' - ) - worker_inst.start.assert_called_once() - - -def test_callback(worker, logger): - worker_cls, worker_inst = worker - - test_arguments = [ - 'kq', - 'worker', - '--hosts=host:6000,host:7000', - '--topic=foo', - '--timeout=4000', - '--callback=/invalid/path' - ] - with patch_object(sys, 'argv', test_arguments): - cli.entry_point() - - logger.exception.assert_called_once() - logger.setLevel.assert_called_once_with(logging.WARNING) - worker_cls.assert_called_with( - hosts='host:6000,host:7000', - topic='foo', - timeout=4000, - callback=None, - job_size=1048576, - cafile=None, - certfile=None, - keyfile=None, - crlfile=None, - proc_ttl=5000, - offset_policy='latest' - ) - worker_inst.start.assert_called_once() - - -def test_verbose(worker, logger): - worker_cls, worker_inst = worker - - with patch_object(sys, 'argv', ['kq', 'worker', '--verbose']): - cli.entry_point() - - logger.setLevel.assert_called_once_with(logging.DEBUG) - worker_cls.assert_called_with( - hosts='127.0.0.1', - topic='default', - timeout=None, - callback=None, - job_size=1048576, - cafile=None, - certfile=None, - keyfile=None, - crlfile=None, - proc_ttl=5000, - offset_policy='latest' - ) - worker_inst.start.assert_called_once() diff --git a/tests/test_job.py b/tests/test_job.py new file mode 100644 index 0000000..13c10d4 --- /dev/null +++ b/tests/test_job.py @@ -0,0 +1,37 @@ +from kq import Job + + +def test_job_init_with_args(): + job = Job(1, 2, 3, 4, 5, 6, 7, 8, 9) + assert job.id == 1 + assert job.timestamp == 2 + assert job.topic == 3 + assert job.func == 4 + assert job.args == 5 + assert job.kwargs == 6 + assert job.timeout == 7 + assert job.key == 8 + assert job.partition == 9 + + +def test_job_init_with_kwargs(): + job = Job( + id=1, + timestamp=2, + topic=3, + func=4, + args=5, + kwargs=6, + timeout=7, + key=8, + partition=9 + ) + assert job.id == 1 + assert job.timestamp == 2 + assert job.topic == 3 + assert job.func == 4 + assert job.args == 5 + assert job.kwargs == 6 + assert job.timeout == 7 + assert job.key == 8 + assert job.partition == 9 diff --git a/tests/test_manager.py b/tests/test_manager.py deleted file mode 100644 index 25eb5e6..0000000 --- a/tests/test_manager.py +++ /dev/null @@ -1,81 +0,0 @@ -from __future__ import absolute_import, print_function, unicode_literals - -import mock -import pytest - -from kq import Manager - -from .utils import CaptureOutput - -expected_info_output = """ -Offsets per Topic: - -Topic foo: - - Partition 1 : 100 - Partition 2 : 100 - Partition 3 : 100 - -Topic bar: - - Partition 1 : 100 - Partition 2 : 100 - Partition 3 : 100 - -Topic baz: - - Partition 1 : 100 - Partition 2 : 100 - Partition 3 : 100 -""" - - -@pytest.fixture(autouse=True) -def consumer(monkeypatch): - mock_consumer_inst = mock.MagicMock() - mock_consumer_inst.topics.return_value = ['foo', 'bar', 'baz'] - mock_consumer_inst.partitions_for_topic.return_value = [1, 2, 3] - mock_consumer_inst.position.return_value = 100 - mock_consumer_cls = mock.MagicMock() - mock_consumer_cls.return_value = mock_consumer_inst - monkeypatch.setattr('kafka.KafkaConsumer', mock_consumer_cls) - return mock_consumer_cls, mock_consumer_inst - - -def test_init(consumer): - consumer_cls, consumer_inst = consumer - - manager = Manager( - hosts='host:7000,host:8000', - cafile='/test/files/cafile', - certfile='/test/files/certfile', - keyfile='/test/files/keyfile', - crlfile='/test/files/crlfile' - ) - consumer_cls.assert_called_with( - bootstrap_servers='host:7000,host:8000', - ssl_cafile='/test/files/cafile', - ssl_certfile='/test/files/certfile', - ssl_keyfile='/test/files/keyfile', - ssl_crlfile='/test/files/crlfile', - consumer_timeout_ms=-1, - enable_auto_commit=True, - auto_offset_reset='latest', - ) - assert repr(manager) == 'Manager(hosts=host:7000,host:8000)' - assert manager.hosts == ['host:7000', 'host:8000'] - assert manager.consumer == consumer_inst - - -def test_info(): - manager = Manager( - hosts='host:7000,host:8000', - cafile='/test/files/cafile', - certfile='/test/files/certfile', - keyfile='/test/files/keyfile', - crlfile='/test/files/crlfile' - ) - with CaptureOutput() as output: - manager.info() - assert len(output) == 1 - assert '\n' + output[0] == expected_info_output diff --git a/tests/test_message.py b/tests/test_message.py new file mode 100644 index 0000000..651eadc --- /dev/null +++ b/tests/test_message.py @@ -0,0 +1,25 @@ +from kq import Message + + +def test_message_init_with_args(): + job = Message(1, 2, 3, 4, 5) + assert job.topic == 1 + assert job.partition == 2 + assert job.offset == 3 + assert job.key == 4 + assert job.value == 5 + + +def test_message_init_with_kwargs(): + job = Message( + topic=1, + partition=2, + offset=3, + key=4, + value=5, + ) + assert job.topic == 1 + assert job.partition == 2 + assert job.offset == 3 + assert job.key == 4 + assert job.value == 5 diff --git a/tests/test_misc.py b/tests/test_misc.py deleted file mode 100644 index a8ca47c..0000000 --- a/tests/test_misc.py +++ /dev/null @@ -1,53 +0,0 @@ -from __future__ import absolute_import, print_function, unicode_literals - -from kq.job import Job -from kq.utils import func_repr -from kq.version import VERSION - - -def test_version(): - assert isinstance(VERSION, str) - assert all(n.isdigit() for n in VERSION.split('.')) - - -def test_job(): - job = Job(1, 2, 3, 4, 5, 6, 7, 8) - - assert job.id == 1 - assert job.timestamp == 2 - assert job.topic == 3 - assert job.func == 4 - assert job.args == 5 - assert job.kwargs == 6 - assert job.timeout == 7 - assert job.key == 8 - - -def test_func_repr(): - - def f(a, b): - return a + b - - expected = 'tests.test_misc.f()' - assert func_repr(f, [], {}) == expected - - expected = 'tests.test_misc.f(1)' - assert func_repr(f, [1], {}) == expected - - expected = 'tests.test_misc.f(1, 2)' - assert func_repr(f, [1, 2], {}) == expected - - expected = 'tests.test_misc.f(a=1)' - assert func_repr(f, [], {'a': 1}) == expected - - expected = 'tests.test_misc.f(a=1, b=2)' - assert func_repr(f, [], {'a': 1, 'b': 2}) == expected - - expected = 'tests.test_misc.f(1, a=1)' - assert func_repr(f, [1], {'a': 1}) == expected - - expected = 'tests.test_misc.f(1, 2, a=1)' - assert func_repr(f, [1, 2], {'a': 1}) == expected - - expected = 'tests.test_misc.f(1, 2, a=1, b=2)' - assert func_repr(f, [1, 2], {'a': 1, 'b': 2}) == expected diff --git a/tests/test_queue.py b/tests/test_queue.py index 9789680..62a6ea0 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -1,225 +1,218 @@ -from __future__ import absolute_import, print_function, unicode_literals - import time +import uuid -import dill -import mock import pytest +from kafka import KafkaProducer from kq import Job, Queue -from .utils import ( - success_func, - failure_func -) - - -@pytest.fixture(autouse=True) -def producer(monkeypatch): - mock_producer_inst = mock.MagicMock() - mock_producer_cls = mock.MagicMock() - mock_producer_cls.return_value = mock_producer_inst - monkeypatch.setattr('kafka.KafkaProducer', mock_producer_cls) - return mock_producer_cls, mock_producer_inst - - -@pytest.fixture(autouse=True) -def logger(monkeypatch): - mock_logger_inst = mock.MagicMock() - mock_get_logger = mock.MagicMock() - mock_get_logger.return_value = mock_logger_inst - monkeypatch.setattr('logging.getLogger', mock_get_logger) - return mock_logger_inst - - -def test_init(producer, logger): - producer_cls, producer_inst = producer - - queue = Queue( - hosts='host:7000,host:8000', - topic='foo', - timeout=1000, - compression='gzip', - acks=0, - retries=5, - job_size=10000000, - cafile='/test/files/cafile', - certfile='/test/files/certfile', - keyfile='/test/files/keyfile', - crlfile='/test/files/crlfile' - ) - producer_cls.assert_called_with( - bootstrap_servers='host:7000,host:8000', - compression_type='gzip', - acks=0, - retries=5, - max_request_size=10000000, - buffer_memory=33554432, - ssl_cafile='/test/files/cafile', - ssl_certfile='/test/files/certfile', - ssl_keyfile='/test/files/keyfile', - ssl_crlfile='/test/files/crlfile', - ) - assert repr(queue) == 'Queue(topic=foo)' - assert queue.hosts == ['host:7000', 'host:8000'] - assert queue.timeout == 1000 - assert queue.topic == 'foo' - assert queue.producer == producer_inst - assert not logger.info.called +def test_queue_properties(queue, hosts, topic): + assert hosts in repr(queue) + assert topic in repr(queue) + assert queue.producer.config['bootstrap_servers'] == hosts + assert isinstance(queue.hosts, str) and queue.hosts == hosts + assert isinstance(queue.topic, str) and queue.topic == topic + assert isinstance(queue.producer, KafkaProducer) + assert isinstance(queue.timeout, (int, float)) + assert callable(queue.serializer) or queue.serializer is None -def test_enqueue_call(producer, logger): - producer_cls, producer_inst = producer - queue = Queue(hosts='host:7000', topic='foo', timeout=300) - job = queue.enqueue(success_func, 1, 2, c=[3, 4, 5]) +# noinspection PyTypeChecker +def test_queue_initialization_with_bad_args(producer): + with pytest.raises(AssertionError) as e: + Queue(topic=True, producer=producer) + assert str(e.value) == 'topic must be a str' - assert isinstance(job, Job) - assert isinstance(job.id, str) - assert isinstance(job.timestamp, int) - assert job.topic == 'foo' - assert job.func == success_func - assert job.args == (1, 2) - assert job.kwargs == {'c': [3, 4, 5]} - assert job.timeout == 300 + with pytest.raises(AssertionError) as e: + Queue(topic='topic', producer='bar') + assert str(e.value) == 'bad producer instance' + + with pytest.raises(AssertionError) as e: + Queue(topic='topic', producer=producer, serializer='baz') + assert str(e.value) == 'serializer must be a callable' - producer_inst.send.assert_called_with('foo', dill.dumps(job), key=None) - logger.info.assert_called_once_with('Enqueued: {}'.format(job)) + with pytest.raises(AssertionError) as e: + Queue(topic='topic', producer=producer, timeout='bar') + assert str(e.value) == 'timeout must be an int or float' + with pytest.raises(AssertionError) as e: + Queue(topic='topic', producer=producer, timeout=-1) + assert str(e.value) == 'timeout must be 0 or greater' -def test_enqueue_call_with_key(producer, logger): - producer_cls, producer_inst = producer + with pytest.raises(AssertionError) as e: + Queue(topic='topic', producer=producer, logger=1) + assert str(e.value) == 'bad logger instance' - queue = Queue(hosts='host:7000', topic='foo', timeout=300) - job = queue.enqueue_with_key('bar', success_func, 1, 2, c=[3, 4, 5]) +def test_queue_enqueue_function(queue, func, topic, log): + job = queue.enqueue(func, 1, 2) assert isinstance(job, Job) - assert isinstance(job.id, str) - assert isinstance(job.timestamp, int) - assert job.topic == 'foo' - assert job.func == success_func + assert job.id is not None + assert job.timestamp is not None + assert job.topic == topic + assert job.func == func assert job.args == (1, 2) - assert job.kwargs == {'c': [3, 4, 5]} - assert job.timeout == 300 - assert job.key == 'bar' - - producer_inst.send.assert_called_with('foo', dill.dumps(job), key='bar') - logger.info.assert_called_once_with('Enqueued: {}'.format(job)) - - -def test_invalid_call(producer, logger): - producer_cls, producer_inst = producer - - queue = Queue(hosts='host:7000', topic='foo', timeout=300) - - for bad_func in [None, 1, {1, 2}, [1, 2, 3]]: - with pytest.raises(ValueError) as e: - queue.enqueue(bad_func, 1, 2, a=3) - assert str(e.value) == '{} is not a callable'.format(bad_func) - - assert not producer_inst.send.called - assert not logger.info.called - - -def test_invalid_call_with_key(producer, logger): - producer_cls, producer_inst = producer - - queue = Queue(hosts='host:7000', topic='foo', timeout=300) - - for bad_func in [None, 1, {1, 2}, [1, 2, 3]]: - with pytest.raises(ValueError) as e: - queue.enqueue_with_key('foo', bad_func, 1, 2, a=3) - assert str(e.value) == '{} is not a callable'.format(bad_func) - - assert not producer_inst.send.called - assert not logger.info.called + assert job.kwargs == {} + assert job.timeout == 0 + assert job.key is None + assert job.partition is None + assert log.last_line == '[INFO] Enqueueing {} ...'.format(job) -def test_enqueue_job(producer, logger): - producer_cls, producer_inst = producer - - queue = Queue(hosts='host:7000', topic='foo', timeout=300) - - old_job = Job( - id='2938401', - timestamp=int(time.time()), - topic='bar', - func=failure_func, - args=[1, 2], - kwargs={'a': 3}, - timeout=100, - ) - new_job = queue.enqueue(old_job) - - assert isinstance(new_job, Job) - assert isinstance(new_job.id, str) - assert isinstance(new_job.timestamp, int) - assert old_job.id != new_job.id - assert old_job.timestamp <= new_job.timestamp - assert new_job.topic == 'foo' - assert new_job.func == failure_func - assert new_job.args == [1, 2] - assert new_job.kwargs == {'a': 3} - assert new_job.timeout == 300 - assert new_job.key is None - - producer_inst.send.assert_called_with( - 'foo', dill.dumps(new_job), key=None - ) - logger.info.assert_called_once_with('Enqueued: {}'.format(new_job)) - - -def test_enqueue_job_with_key(producer, logger): - producer_cls, producer_inst = producer - - queue = Queue(hosts='host:7000', topic='foo', timeout=300) - - old_job = Job( - id='2938401', - timestamp=int(time.time()), - topic='bar', - func=failure_func, - args=[1, 2], - kwargs={'a': 3}, - timeout=100, - key='bar', +def test_queue_enqueue_function_with_spec(func, queue, topic, log): + job = queue.using(key=b'foo', partition=0).enqueue(func, 3, 4) + assert isinstance(job, Job) + assert job.id is not None + assert job.timestamp is not None + assert job.topic == topic + assert job.func == func + assert job.args == (3, 4) + assert job.kwargs == {} + assert job.timeout == 0 + assert job.key == b'foo' + assert job.partition == 0 + assert log.last_line == '[INFO] Enqueueing {} ...'.format(job) + + +# noinspection PyTypeChecker +def test_queue_enqueue_function_with_bad_args(func, queue): + with pytest.raises(AssertionError) as e: + queue.enqueue(1) + assert str(e.value) == 'first argument must be a callable' + + with pytest.raises(AssertionError) as e: + queue.using(timeout='foo').enqueue(func) + assert str(e.value) == 'timeout must be an int or float' + + with pytest.raises(AssertionError) as e: + queue.using(key='foo').enqueue(func) + assert str(e.value) == 'key must be a bytes' + + with pytest.raises(AssertionError) as e: + queue.using(partition='foo').enqueue(func) + assert str(e.value) == 'partition must be an int' + + +def test_queue_enqueue_job_fully_populated(func, queue, topic, log): + job_id = uuid.uuid4().hex + timestamp = int(time.time() * 1000) + + job = Job( + id=job_id, + timestamp=timestamp, + topic='topic', + func=func, + args=[0], + kwargs={'b': 1}, + timeout=10, + key=b'bar', + partition=0 ) - new_job = queue.enqueue_with_key('baz', old_job) - - assert isinstance(new_job, Job) - assert isinstance(new_job.id, str) - assert isinstance(new_job.timestamp, int) - assert old_job.id != new_job.id - assert old_job.timestamp <= new_job.timestamp - assert new_job.topic == 'foo' - assert new_job.func == failure_func - assert new_job.args == [1, 2] - assert new_job.kwargs == {'a': 3} - assert new_job.timeout == 300 - assert new_job.key == 'baz' - - producer_inst.send.assert_called_with( - 'foo', dill.dumps(new_job), key='baz' + job = queue.enqueue(job) + assert isinstance(job, Job) + assert job.id == job_id + assert job.timestamp == timestamp + assert job.topic == topic + assert job.func == func + assert job.args == [0] + assert job.kwargs == {'b': 1} + assert job.timeout == 10 + assert job.key == b'bar' + assert job.partition == 0 + assert log.last_line.startswith('[INFO] Enqueueing {} ...'.format(job)) + + +def test_queue_enqueue_job_partially_populated(func, queue, topic, log): + job = Job(func=func, args=[1], kwargs={'b': 1}) + + job = queue.enqueue(job) + assert isinstance(job, Job) + assert isinstance(job.id, str) + assert isinstance(job.timestamp, int) + assert job.topic == topic + assert job.func == func + assert job.args == [1] + assert job.kwargs == {'b': 1} + assert job.timeout == 0 + assert job.key is None + assert job.partition is None + assert log.last_line.startswith('[INFO] Enqueueing {} ...'.format(job)) + + +def test_queue_enqueue_job_with_spec(func, queue, topic, log): + job_id = uuid.uuid4().hex + timestamp = int(time.time() * 1000) + + job = Job( + id=job_id, + timestamp=timestamp, + topic='topic', + func=func, + args=[0], + kwargs={'b': 1}, + timeout=10, + key=b'bar', + partition=0 ) - logger.info.assert_called_once_with('Enqueued: {}'.format(new_job)) - - -def test_job_decorator(): - queue = Queue(hosts='host:7000', topic='foo') - @queue.job - def test_function(a, b, c=None): - return a, b, c - assert hasattr(test_function, 'delay') - - with pytest.raises(Exception) as e: - test_function.delay(1, 2, 3, 4) - assert "Can't pickle" in str(e.value) - - -def test_flush(producer): - producer_cls, producer_inst = producer - - queue = Queue(hosts='host:7000', topic='foo') - queue.flush() - producer_inst.flush.assert_called_once() + # Job should override the spec. + job = queue.using(key=b'foo', timeout=5, partition=5).enqueue(job) + assert isinstance(job, Job) + assert job.id == job_id + assert job.timestamp == timestamp + assert job.topic == topic + assert job.func == func + assert job.args == [0] + assert job.kwargs == {'b': 1} + assert job.timeout == 10 + assert job.key == b'bar' + assert job.partition == 0 + assert log.last_line.startswith('[INFO] Enqueueing {} ...'.format(job)) + + +def test_queue_enqueue_job_with_bad_args(func, queue, topic): + valid_job_kwargs = { + 'id': uuid.uuid4().hex, + 'timestamp': int(time.time() * 1000), + 'topic': topic, + 'func': func, + 'args': [0], + 'kwargs': {'b': 1}, + 'timeout': 10, + 'key': b'foo', + 'partition': 0 + } + + def build_job(**kwargs): + job_kwargs = valid_job_kwargs.copy() + job_kwargs.update(kwargs) + return Job(**job_kwargs) + + with pytest.raises(AssertionError) as e: + queue.enqueue(build_job(id=1)) + assert str(e.value) == 'Job.id must be a str' + + with pytest.raises(AssertionError) as e: + queue.enqueue(build_job(func=1)) + assert str(e.value) == 'Job.func must be a callable' + + with pytest.raises(AssertionError) as e: + queue.enqueue(build_job(args=1)) + assert str(e.value) == 'Job.args must be a list or tuple' + + with pytest.raises(AssertionError) as e: + queue.enqueue(build_job(kwargs=1)) + assert str(e.value) == 'Job.kwargs must be a dict' + + with pytest.raises(AssertionError) as e: + queue.enqueue(build_job(timeout='foo')) + assert str(e.value) == 'Job.timeout must be an int or float' + + with pytest.raises(AssertionError) as e: + queue.enqueue(build_job(key='foo')) + assert str(e.value) == 'Job.key must be a bytes' + + with pytest.raises(AssertionError) as e: + queue.enqueue(build_job(partition='foo')) + assert str(e.value) == 'Job.partition must be an int' diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..a7a7c40 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,70 @@ +from kq.utils import get_call_repr + + +def test_call_repr_callable_types(success_func, callable_cls): + expected = 'None()' + assert expected == get_call_repr(None) + + expected = 'builtins.isinstance()' + assert expected == get_call_repr(isinstance) + + expected = 'tests.conftest.success_function()' + assert expected == get_call_repr(success_func) + + expected = 'tests.conftest.success_function()' + assert expected == get_call_repr(callable_cls.unbound_method) + + expected = 'tests.conftest.Callable.static_method()' + assert expected == get_call_repr(callable_cls.static_method) + + expected = 'tests.conftest.Callable.instance_method()' + assert expected == get_call_repr(callable_cls().instance_method) + + expected = 'tests.conftest.Callable()' + assert expected == get_call_repr(callable_cls()) + + +def test_call_repr_simple_args(failure_func): + expected = 'tests.conftest.failure_function(1)' + assert expected == get_call_repr(failure_func, 1) + + expected = 'tests.conftest.failure_function(1, 2)' + assert expected == get_call_repr(failure_func, 1, 2) + + expected = 'tests.conftest.failure_function(1, b=2)' + assert expected == get_call_repr(failure_func, 1, b=2) + + expected = 'tests.conftest.failure_function(a=1)' + assert expected == get_call_repr(failure_func, a=1) + + expected = 'tests.conftest.failure_function(b=1)' + assert expected == get_call_repr(failure_func, b=1) + + expected = 'tests.conftest.failure_function(a=1, b=2)' + assert expected == get_call_repr(failure_func, a=1, b=2) + + expected = 'tests.conftest.failure_function(a=1, b=2)' + assert expected == get_call_repr(failure_func, b=2, a=1) + + +def test_call_repr_complex_args(timeout_func): + expected = 'tests.conftest.timeout_function([1])' + assert expected == get_call_repr(timeout_func, [1]) + + expected = 'tests.conftest.timeout_function([1], [2])' + assert expected == get_call_repr(timeout_func, [1], [2]) + + expected = 'tests.conftest.timeout_function([1], b=[2])' + assert expected == get_call_repr(timeout_func, [1], b=[2]) + + expected = 'tests.conftest.timeout_function(a=[1])' + assert expected == get_call_repr(timeout_func, a=[1]) + + expected = 'tests.conftest.timeout_function(b=[1])' + assert expected == get_call_repr(timeout_func, b=[1]) + + expected = 'tests.conftest.timeout_function(a=[1], b=[1, 2])' + assert expected == get_call_repr(timeout_func, a=[1], b=[1, 2]) + + expected = 'tests.conftest.timeout_function(a=[1], b=[1, 2])' + assert expected == get_call_repr(timeout_func, b=[1, 2], a=[1]) diff --git a/tests/test_version.py b/tests/test_version.py new file mode 100644 index 0000000..5cdcdff --- /dev/null +++ b/tests/test_version.py @@ -0,0 +1,9 @@ +from kq.version import __version__ + + +def test_version(): + assert isinstance(__version__, str) + + version_parts = __version__.split('.') + assert len(version_parts) == 3 + assert all(part.isdigit() for part in version_parts) diff --git a/tests/test_worker.py b/tests/test_worker.py index 3c8a1d0..27f7917 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1,282 +1,120 @@ -from __future__ import absolute_import, print_function, unicode_literals - -from collections import namedtuple - -import dill -import mock import pytest -from kq import Job, Worker - -from .utils import ( - success_func, - failure_func, - timeout_func -) - -MockRecord = namedtuple( - 'MockRecord', - ['topic', 'partition', 'offset', 'value'] -) -# Mocks for patching KafkaConsumer -mock_consumer = mock.MagicMock() -mock_consumer.topics.return_value = ['foo', 'bar', 'baz'] -mock_consumer.partitions_for_topic.return_value = [1, 2, 3] -mock_consumer.position.return_value = 100 -mock_consumer_cls = mock.MagicMock() -mock_consumer_cls.return_value = mock_consumer - -# Mocks for patching the logging module -success_job = Job( - id='100', - timestamp=1, - topic='foo', - func=success_func, - args=[1, 2], - kwargs={'c': 3}, - timeout=None, -) -failure_job = Job( - id='200', - timestamp=2, - topic='foo', - func=failure_func, - args=[1, 2, 3], - kwargs={}, - timeout=100, -) -timeout_job = Job( - id='300', - timestamp=3, - topic='foo', - func=timeout_func, - args=[2, 3, 4], - kwargs={}, - timeout=100, -) -value1 = dill.dumps(success_job) -value2 = dill.dumps(failure_job) -value3 = dill.dumps(timeout_job) -value4 = 'This is an unpicklable value' -value5 = dill.dumps(['This is not a job!']) - -# Mocks for consumer records -rec11 = MockRecord(topic='foo', partition=1, offset=1, value=value1) -rec11_repr = 'Record(topic=foo, partition=1, offset=1)' -rec12 = MockRecord(topic='foo', partition=1, offset=2, value=value2) -rec12_repr = 'Record(topic=foo, partition=1, offset=2)' -rec21 = MockRecord(topic='foo', partition=2, offset=1, value=value3) -rec21_repr = 'Record(topic=foo, partition=2, offset=1)' -rec22 = MockRecord(topic='foo', partition=2, offset=2, value=value4) -rec22_repr = 'Record(topic=foo, partition=2, offset=2)' -rec34 = MockRecord(topic='foo', partition=3, offset=4, value=value5) -rec34_repr = 'Record(topic=foo, partition=3, offset=4)' - - -@pytest.fixture(autouse=True) -def consumer(monkeypatch): - monkeypatch.setattr('kafka.KafkaConsumer', mock_consumer_cls) - mock_consumer.reset_mock() - - -@pytest.fixture(autouse=True) -def logger(monkeypatch): - mock_logger = mock.MagicMock() - mock_get_logger = mock.MagicMock() - mock_get_logger.return_value = mock_logger - monkeypatch.setattr('logging.getLogger', mock_get_logger) - return mock_logger - - -@pytest.fixture(autouse=True) -def callback(): - return mock.MagicMock() - - -def test_init(logger, callback): - worker = Worker( - hosts='host:7000,host:8000', - topic='foo', - timeout=1000, - callback=callback, - job_size=10000000, - cafile='/test/files/cafile', - certfile='/test/files/certfile', - keyfile='/test/files/keyfile', - crlfile='/test/files/crlfile' - ) - mock_consumer_cls.assert_called_once_with( - 'foo', - group_id='foo', - bootstrap_servers='host:7000,host:8000', - max_partition_fetch_bytes=20000000, - ssl_cafile='/test/files/cafile', - ssl_certfile='/test/files/certfile', - ssl_keyfile='/test/files/keyfile', - ssl_crlfile='/test/files/crlfile', - consumer_timeout_ms=-1, - enable_auto_commit=False, - auto_offset_reset='latest', - ) - assert repr(worker) == 'Worker(topic=foo)' - assert worker.hosts == ['host:7000', 'host:8000'] - assert worker.timeout == 1000 - assert worker.topic == 'foo' - assert worker.consumer == mock_consumer - assert not callback.called - assert not logger.info.called - - -def test_start_job_success(logger, callback): - mock_consumer.__iter__ = lambda x: iter([rec11]) - worker = Worker( - hosts='localhost', - topic='foo', - callback=callback, - ) - worker.start() - logger.info.assert_has_calls([ - mock.call('Starting Worker(topic=foo) ...'), - mock.call('Processing {} ...'.format(rec11_repr)), - mock.call('Running Job 100: tests.utils.success_func(1, 2, c=3) ...'), - mock.call('Job 100 returned: (1, 2, 3)'), - mock.call('Executing callback ...') - ]) - callback.assert_called_once_with( - 'success', success_job, (1, 2, 3), None, None - ) - - -def test_start_job_failure(logger, callback): - mock_consumer.__iter__ = lambda x: iter([rec12]) - worker = Worker( - hosts='localhost', - topic='foo', - timeout=1000, - callback=callback, - ) - worker.start() - logger.info.assert_has_calls([ - mock.call('Starting Worker(topic=foo) ...'), - mock.call('Processing {} ...'.format(rec12_repr)), - mock.call('Running Job 200: tests.utils.failure_func(1, 2, 3) ...'), - mock.call('Executing callback ...') - ]) - logger.exception.assert_called_with('Job 200 failed: failed!') - assert len(callback.call_args_list) == 1 - - callback_args = callback.call_args_list[0][0] - assert callback_args[0] == 'failure' - assert callback_args[1] == failure_job - assert callback_args[2] is None - assert isinstance(callback_args[3], ValueError) - assert isinstance(callback_args[4], str) - - -def test_start_job_timeout(logger, callback): - mock_consumer.__iter__ = lambda x: iter([rec21]) - worker = Worker( - hosts='localhost', - topic='foo', - timeout=1000, - callback=callback, - ) - worker.start() - logger.info.assert_has_calls([ - mock.call('Starting Worker(topic=foo) ...'), - mock.call('Processing {} ...'.format(rec21_repr)), - mock.call('Running Job 300: tests.utils.timeout_func(2, 3, 4) ...'), - mock.call('Executing callback ...') - ]) - logger.error.assert_called_once_with( - 'Job 300 timed out after 100 seconds.' - ) - callback.assert_called_once_with( - 'timeout', timeout_job, None, None, None - ) - - -def test_start_job_unloadable(logger, callback): - mock_consumer.__iter__ = lambda x: iter([rec22]) - worker = Worker( - hosts='localhost', - topic='foo', - timeout=1000, - callback=callback, - ) - worker.start() - logger.info.assert_has_calls([ - mock.call('Starting Worker(topic=foo) ...'), - mock.call('Processing {} ...'.format(rec22_repr)), - ]) - logger.warning.assert_called_once_with( - '{} unloadable. Skipping ...'.format(rec22_repr) - ) - assert not callback.called - - -def test_start_job_malformed(logger, callback): - mock_consumer.__iter__ = lambda x: iter([rec34]) - worker = Worker( - hosts='localhost', - topic='foo', - timeout=1000, - callback=callback, - ) - worker.start() - logger.info.assert_has_calls([ - mock.call('Starting Worker(topic=foo) ...'), - mock.call('Processing {} ...'.format(rec34_repr)), - ]) - logger.warning.assert_called_once_with( - '{} malformed. Skipping ...'.format(rec34_repr) - ) - assert not callback.called +from kafka import KafkaConsumer + +from kq import Worker -def test_start_job_callback_fail(logger, callback): - mock_consumer.__iter__ = lambda x: iter([rec11]) - expected_error = KeyError('foo') - callback.side_effect = expected_error - worker = Worker( - hosts='localhost', - topic='foo', - callback=callback, - ) - worker.start() - logger.info.assert_has_calls([ - mock.call('Starting Worker(topic=foo) ...'), - mock.call('Processing {} ...'.format(rec11_repr)), - mock.call('Running Job 100: tests.utils.success_func(1, 2, c=3) ...'), - mock.call('Job 100 returned: (1, 2, 3)'), - mock.call('Executing callback ...') - ]) - logger.exception.assert_called_once_with( - 'Callback failed: {}'.format(expected_error) - ) +def test_worker_properties(worker, hosts, topic, group): + assert hosts in repr(worker) + assert topic in repr(worker) + assert group in repr(worker) + assert worker.consumer.config['bootstrap_servers'] == hosts + assert worker.consumer.config['group_id'] == group -def test_start_proc_ttl_reached(logger, callback): - mock_consumer.__iter__ = lambda x: iter([rec11, rec11]) - worker = Worker( - hosts='localhost', - topic='foo', - callback=callback, - proc_ttl=1, - ) - worker.start() - logger.info.assert_has_calls([ - mock.call('Starting Worker(topic=foo) ...'), - mock.call('Processing {} ...'.format(rec11_repr)), - mock.call('Running Job 100: tests.utils.success_func(1, 2, c=3) ...'), - mock.call('Job 100 returned: (1, 2, 3)'), - mock.call('Executing callback ...'), - mock.call('Processing {} ...'.format(rec11_repr)), - mock.call('Running Job 100: tests.utils.success_func(1, 2, c=3) ...'), - mock.call('Job 100 returned: (1, 2, 3)'), - mock.call('Executing callback ...'), - mock.call('Refreshing process pool ...'), - ]) - callback.assert_called_with( - 'success', success_job, (1, 2, 3), None, None - ) + assert isinstance(worker.hosts, str) and worker.hosts == hosts + assert isinstance(worker.topic, str) and worker.topic == topic + assert isinstance(worker.group, str) and worker.group == group + assert isinstance(worker.consumer, KafkaConsumer) + assert callable(worker.deserializer) + assert callable(worker.callback) or worker.callback is None + + +# noinspection PyTypeChecker +def test_worker_initialization_with_bad_args(hosts, consumer): + with pytest.raises(AssertionError) as e: + Worker(topic=True, consumer=consumer) + assert str(e.value) == 'topic must be a str' + + with pytest.raises(AssertionError) as e: + Worker(topic='topic', consumer='bar') + assert str(e.value) == 'bad consumer instance' + + with pytest.raises(AssertionError) as e: + bad_consumer = KafkaConsumer(bootstrap_servers=hosts) + Worker(topic='topic', consumer=bad_consumer) + assert str(e.value) == 'consumer must have group_id' + + with pytest.raises(AssertionError) as e: + Worker(topic='topic', consumer=consumer, callback=1) + assert str(e.value) == 'callback must be a callable' + + with pytest.raises(AssertionError) as e: + Worker(topic='topic', consumer=consumer, deserializer=1) + assert str(e.value) == 'deserializer must be a callable' + + with pytest.raises(AssertionError) as e: + Worker(topic='topic', consumer=consumer, logger=1) + assert str(e.value) == 'bad logger instance' + + +def test_worker_run_success_function(queue, worker, success_func, log): + job = queue.enqueue(success_func, 1, 2) + worker.start(max_messages=1) + + out = log.last_lines(7) + assert next(out).startswith('[INFO] Enqueueing {}'.format(job)) + assert next(out).startswith('[INFO] Starting {}'.format(worker)) + assert next(out).startswith('[INFO] Processing Message') + assert next(out).startswith('[INFO] Executing job {}'.format(job.id)) + assert next(out).startswith('[INFO] Job {} returned: 2'.format(job.id)) + assert next(out).startswith('[INFO] Executing callback') + assert next(out).startswith('[INFO] Callback got job status "success"') + + +def test_worker_run_failure_function(queue, worker, failure_func, log): + job = queue.enqueue(failure_func, 2, 3) + worker.start(max_messages=1) + + out = log.last_lines(7) + assert next(out).startswith('[INFO] Enqueueing {}'.format(job)) + assert next(out).startswith('[INFO] Starting {}'.format(worker)) + assert next(out).startswith('[INFO] Processing Message') + assert next(out).startswith('[INFO] Executing job {}'.format(job.id)) + assert next(out).startswith('[ERROR] Job {} raised'.format(job.id)) + assert next(out).startswith('[INFO] Executing callback') + assert next(out).startswith('[INFO] Callback got job status "failure"') + + +def test_worker_run_timeout_function(queue, worker, timeout_func, log): + job = queue.using(timeout=0.5).enqueue(timeout_func, 3, 4) + worker.start(max_messages=1) + + out = log.last_lines(7) + assert next(out).startswith('[INFO] Enqueueing {}'.format(job)) + assert next(out).startswith('[INFO] Starting {}'.format(worker)) + assert next(out).startswith('[INFO] Processing Message') + assert next(out).startswith('[INFO] Executing job {}'.format(job.id)) + assert next(out).startswith('[ERROR] Job {} timed out'.format(job.id)) + assert next(out).startswith('[INFO] Executing callback') + assert next(out).startswith('[INFO] Callback got job status "timeout"') + + +def test_worker_run_bad_callback(queue, worker, success_func, callback, log): + job = queue.enqueue(success_func, 4, 5) + callback.succeed = False + worker.start(max_messages=1) + + out = log.last_lines(7) + assert next(out).startswith('[INFO] Enqueueing {}'.format(job)) + assert next(out).startswith('[INFO] Starting {}'.format(worker)) + assert next(out).startswith('[INFO] Processing Message') + assert next(out).startswith('[INFO] Executing job {}'.format(job.id)) + assert next(out).startswith('[INFO] Job {} returned: 20'.format(job.id)) + assert next(out).startswith('[INFO] Executing callback') + assert next(out).startswith('[ERROR] Callback raised an exception') + + +def test_worker_run_bad_job(queue, worker, success_func, deserializer, log): + job = queue.enqueue(success_func, 5, 6) + deserializer.succeed = False + worker.start(max_messages=1) + + out = log.last_lines(6) + assert next(out).startswith('[INFO] Enqueueing {}'.format(job)) + assert next(out).startswith('[INFO] Starting {}'.format(worker)) + assert next(out).startswith('[INFO] Processing Message') + assert next(out).startswith('[ERROR] Job was invalid') + assert next(out).startswith('[INFO] Executing callback') + assert next(out).startswith('[INFO] Callback got job status "invalid"') diff --git a/tests/utils.py b/tests/utils.py deleted file mode 100644 index a627d16..0000000 --- a/tests/utils.py +++ /dev/null @@ -1,33 +0,0 @@ -from __future__ import absolute_import, print_function, unicode_literals - -import multiprocessing -try: - from StringIO import StringIO -except ImportError: - from io import StringIO -import sys - - -class CaptureOutput(list): - - def __enter__(self): - self._orig_stdout = sys.stdout - self._temp_stdout = StringIO() - sys.stdout = self._temp_stdout - return self - - def __exit__(self, *args): - self.append(self._temp_stdout.getvalue()) - sys.stdout = self._orig_stdout - - -def success_func(a, b, c=None): - return a, b, c - - -def failure_func(*_): - raise ValueError('failed!') - - -def timeout_func(*_): - raise multiprocessing.TimeoutError