diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
new file mode 100644
index 0000000..458b09c
--- /dev/null
+++ b/.github/workflows/build.yaml
@@ -0,0 +1,47 @@
+name: Build
+on:
+  push:
+    branches: [ main, dev ]
+  pull_request:
+    branches: [ main, dev ]
+jobs:
+  build:
+    name: Test
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        os: [ ubuntu-latest, macos-latest, windows-latest ]
+        python-version: [ 3.6, 3.7, 3.8, 3.9 ]
+    steps:
+      - name: Start Kafka docker
+        run: docker run -d -p 9092:9092 -e ADV_HOST=127.0.0.1 lensesio/fast-data-dev
+      - uses: actions/checkout@v2
+      - name: Fetch complete history for all tags and branches
+        run: git fetch --prune --unshallow
+      - name: Set up Python ${{ matrix.python-version }}
+        uses: actions/setup-python@v2
+        with:
+          python-version: ${{ matrix.python-version }}
+      - name: Setup pip
+        run: python -m pip install --upgrade pip setuptools wheel
+      - name: Install package
+        run: pip install .[dev]
+      - name: Run black
+        run: black --check .
+      - name: Run flake8
+        run: flake8 .
+      - name: Run isort
+        run: isort --check --profile=black .
+      - name: Run mypy
+        run: mypy .
+      - name: Run pytest
+        run: py.test --cov=./ --cov-report=xml
+      - name: Run Sphinx doctest
+        run: python -m sphinx -b doctest docs docs/_build
+      - name: Run Sphinx HTML
+        run: python -m sphinx -b html -W docs docs/_build
+      - name: Upload coverage to Codecov
+        uses: codecov/codecov-action@v1
+        if: matrix.os == 'ubuntu-latest' && matrix.python-version == '3.8'
+        with:
+          token: ${{ secrets.CODECOV_TOKEN }}
diff --git a/.github/workflows/codeql-analysis.yaml b/.github/workflows/codeql-analysis.yaml
new file mode 100644
index 0000000..bafb67f
--- /dev/null
+++ b/.github/workflows/codeql-analysis.yaml
@@ -0,0 +1,22 @@
+name: CodeQL
+on:
+  push:
+    branches: [ main, dev ]
+  pull_request:
+    branches: [ main, dev ]
+  schedule:
+    - cron: '21 2 * * 3'
+
+jobs:
+  analyze:
+    name: Analyze
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout repository
+        uses: actions/checkout@v2
+      - name: Initialize CodeQL
+        uses: github/codeql-action/init@v1
+        with:
+          languages: 'python'
+      - name: Perform CodeQL Analysis
+        uses: github/codeql-action/analyze@v1
diff --git a/.github/workflows/pypi-upload.yaml b/.github/workflows/pypi-upload.yaml
new file mode 100644
index 0000000..8c50645
--- /dev/null
+++ b/.github/workflows/pypi-upload.yaml
@@ -0,0 +1,31 @@
+name: Upload to PyPI
+on:
+  release:
+    types: [created]
+jobs:
+  upload:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v2
+      - name: Fetch complete history for all tags and branches
+        run: git fetch --prune --unshallow
+      - name: Set up Python
+        uses: actions/setup-python@v2
+        with:
+          python-version: '3.x'
+      - name: Install dependencies
+        run: |
+          python -m pip install --upgrade pip
+          pip install setuptools wheel twine setuptools-scm[toml]
+      - name: Build distribution
+        run: python setup.py sdist bdist_wheel
+      - name: Publish to PyPI Test
+        env:
+          TWINE_USERNAME: __token__
+          TWINE_PASSWORD: ${{ secrets.PYPI_TEST_TOKEN }}
+        run: twine upload --repository testpypi dist/*
+      - name: Publish to PyPI
+        env:
+          TWINE_USERNAME: __token__
+          TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN }}
+        run: twine upload --repository pypi dist/*
diff --git a/.gitignore b/.gitignore
index 305c50e..cebe266 100644
--- a/.gitignore
+++ b/.gitignore
@@ -111,3 +111,6 @@ venv.bak/
 
 # KQ specific
 output.log
+
+# setuptools-scm
+kq/version.py
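The workflows above fetch the full git history (`git fetch --prune --unshallow`, so tags are available) and install `setuptools-scm`, while `.gitignore` now ignores a generated `kq/version.py`. The packaging side that ties these together is not part of this diff; a minimal, hypothetical `setup.py` sketch of how setuptools-scm would typically be wired up:

```python
# Hypothetical setup.py sketch -- the project's actual packaging
# changes are not shown in this diff.
from setuptools import setup

setup(
    name="kq",
    # Resolve the version from git tags at build time and write it to
    # kq/version.py (the file git-ignored above). Needing the tags is
    # also why the workflows unshallow the clone first.
    use_scm_version={"write_to": "kq/version.py"},
    setup_requires=["setuptools-scm"],
)
```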
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
new file mode 100644
index 0000000..fc7f652
--- /dev/null
+++ b/.pre-commit-config.yaml
@@ -0,0 +1,31 @@
+repos:
+  - repo: https://github.com/pre-commit/pre-commit-hooks
+    rev: v3.4.0
+    hooks:
+      - id: check-case-conflict
+      - id: check-executables-have-shebangs
+      - id: check-json
+      - id: check-merge-conflict
+      - id: check-symlinks
+      - id: check-toml
+      - id: check-yaml
+      - id: end-of-file-fixer
+      - id: mixed-line-ending
+  - repo: https://github.com/psf/black
+    rev: 20.8b1
+    hooks:
+      - id: black
+  - repo: https://github.com/timothycrosley/isort
+    rev: 5.7.0
+    hooks:
+      - id: isort
+        args: [ --profile, black ]
+  - repo: https://github.com/pre-commit/mirrors-mypy
+    rev: v0.790
+    hooks:
+      - id: mypy
+        args: [ --ignore-missing-imports ]
+  - repo: https://gitlab.com/pycqa/flake8
+    rev: 3.8.4
+    hooks:
+      - id: flake8
diff --git a/.travis.yml b/.travis.yml
deleted file mode 100644
index fe2a417..0000000
--- a/.travis.yml
+++ /dev/null
@@ -1,26 +0,0 @@
-sudo: false
-language: python
-python:
-  - "3.5"
-  - "3.6"
-  - "3.7"
-  - "3.8"
-services:
-  - docker
-before_install:
-  - docker run --name kafka -d -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=127.0.0.1 --env ADVERTISED_PORT=9092 spotify/kafka
-install:
-  - pip install pytest==4.6.11
-  - pip install pytest-cov==2.10.0
-  - pip install coveralls==2.0.0
-  - pip install mock
-  - pip install flake8
-  - pip install sphinx sphinx_rtd_theme
-  - pip install .
-script:
-  - python -m flake8
-  - python -m sphinx -b doctest docs docs/_build
-  - python -m sphinx -b html -W docs docs/_build
-  - py.test -s -v --cov=kq
-after_success:
-  - coveralls
diff --git a/LICENSE b/LICENSE
index 6ac37cb..65368b7 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,6 +1,6 @@
 MIT License
 
-Copyright (c) 2016 Joohwan Oh
+Copyright (c) 2016,2017,2018,2019,2020,2021 Joohwan Oh
 
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
diff --git a/MANIFEST.in b/MANIFEST.in
index 95dea49..8815dcc 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -1,2 +1,2 @@
-include README.rst LICENSE
+include README.md LICENSE
 prune tests
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..a61eefd
--- /dev/null
+++ b/README.md
@@ -0,0 +1,121 @@
+## KQ: Kafka Job Queue for Python
+
+![Build](https://github.com/joowani/kq/workflows/Build/badge.svg)
+![CodeQL](https://github.com/joowani/kq/workflows/CodeQL/badge.svg)
+[![codecov](https://codecov.io/gh/joowani/kq/branch/master/graph/badge.svg?token=d6ooyuUCl6)](https://codecov.io/gh/joowani/kq)
+
+**KQ (Kafka Queue)** is a lightweight Python library which lets you queue and
+execute jobs asynchronously using [Apache Kafka](https://kafka.apache.org/). It is
+built on top of [kafka-python](https://github.com/dpkp/kafka-python).
+
+### Announcements
+
+* Python 3.5 is no longer supported as of kq v3.0.0.
+* See [releases](https://github.com/joowani/kq/releases) for latest updates.
+
+### Requirements
+
+* [Apache Kafka](https://kafka.apache.org) 0.9+
+* Python 3.6+
+
+### Installation
+
+Install using [pip](https://pip.pypa.io):
+
+```shell
+pip install kq
+```
+
+### Usage
+
+Start your Kafka instance.
+Example using [Kafka Docker](https://github.com/lensesio/fast-data-dev):
+
+```shell
+docker run -p 9092:9092 -e ADV_HOST=127.0.0.1 lensesio/fast-data-dev
+```
+
+Define your KQ `my_worker.py` module:
+
+```python
+import logging
+
+from kafka import KafkaConsumer
+from kq import Worker
+
+# Set up logging.
+formatter = logging.Formatter('[%(levelname)s] %(message)s')
+stream_handler = logging.StreamHandler()
+stream_handler.setFormatter(formatter)
+logger = logging.getLogger('kq.worker')
+logger.setLevel(logging.DEBUG)
+logger.addHandler(stream_handler)
+
+# Set up a Kafka consumer.
+consumer = KafkaConsumer(
+    bootstrap_servers='127.0.0.1:9092',
+    group_id='group',
+    auto_offset_reset='latest'
+)
+
+# Set up a worker.
+worker = Worker(topic='topic', consumer=consumer)
+worker.start()
+```
+
+Start your worker:
+
+```shell
+python my_worker.py
+[INFO] Starting Worker(hosts=127.0.0.1:9092, topic=topic, group=group) ...
+```
+
+Enqueue a function call:
+
+```python
+import requests
+
+from kafka import KafkaProducer
+from kq import Queue
+
+# Set up a Kafka producer.
+producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
+
+# Set up a queue.
+queue = Queue(topic='topic', producer=producer)
+
+# Enqueue a function call.
+job = queue.enqueue(requests.get, 'https://google.com')
+
+# You can also specify the job timeout, Kafka message key and partition.
+job = queue.using(timeout=5, key=b'foo', partition=0).enqueue(requests.get, 'https://google.com')
+```
+
+The worker executes the job in the background:
+
+```shell
+python my_worker.py
+[INFO] Starting Worker(hosts=127.0.0.1:9092, topic=topic, group=group) ...
+[INFO] Processing Message(topic=topic, partition=0, offset=0) ...
+[INFO] Executing job c7bf2359: requests.api.get('https://www.google.com')
+[INFO] Job c7bf2359 returned: <Response [200]>
+```
+
+See [documentation](https://kq.readthedocs.io) for more information.
+
+### Contributing
+
+Set up dev environment:
+
+```shell
+cd ~/your/kq/clone     # Activate venv if you have one
+pip install -e .[dev]  # Install dev dependencies (black, mypy, pre-commit etc.)
+pre-commit install     # Install git pre-commit hooks
+py.test                # Run unit tests
+```
+
+Run unit tests:
+```shell
+docker run -p 9092:9092 -e ADV_HOST=127.0.0.1 lensesio/fast-data-dev  # Start Kafka docker
+py.test
+```
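The README enqueues `requests.get`, which every worker can already import. The same constraint applies to your own functions: jobs reference the function to execute, so it must live somewhere both the enqueuing process and the workers can import it from. A small sketch (the `tasks` module name is illustrative):

```python
from kafka import KafkaProducer
from kq import Queue

# `tasks` is a hypothetical module of your own containing, e.g.:
#     def add(a, b):
#         return a + b
# It must be importable by the worker processes as well.
from tasks import add

producer = KafkaProducer(bootstrap_servers="127.0.0.1:9092")
queue = Queue(topic="topic", producer=producer)
job = queue.enqueue(add, 1, 2)
```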
diff --git a/README.rst b/README.rst
deleted file mode 100644
index 729c416..0000000
--- a/README.rst
+++ /dev/null
@@ -1,167 +0,0 @@
-KQ: Kafka-based Job Queue for Python
-------------------------------------
-
-.. image:: https://travis-ci.org/joowani/kq.svg?branch=master
-   :target: https://travis-ci.org/joowani/kq
-   :alt: Build Status
-
-.. image:: https://readthedocs.org/projects/kq/badge/?version=latest
-   :target: http://kq.readthedocs.io/en/latest/?badge=latest
-   :alt: Documentation Status
-
-.. image:: https://badge.fury.io/py/kq.svg
-   :target: https://badge.fury.io/py/kq
-   :alt: Package Version
-
-.. image:: https://img.shields.io/badge/python-3.5%2C%203.6%2C%203.7%2C%203.8-blue.svg
-   :target: https://github.com/joowani/kq
-   :alt: Python Versions
-
-.. image:: https://coveralls.io/repos/github/joowani/kq/badge.svg?branch=master
-   :target: https://coveralls.io/github/joowani/kq?branch=master
-   :alt: Test Coverage
-
-.. image:: https://img.shields.io/github/issues/joowani/kq.svg
-   :target: https://github.com/joowani/kq/issues
-   :alt: Issues Open
-
-.. image:: https://img.shields.io/badge/license-MIT-blue.svg
-   :target: https://raw.githubusercontent.com/joowani/kq/master/LICENSE
-   :alt: MIT License
-
-|
-
-**KQ (Kafka Queue)** is a lightweight Python library which lets you queue and
-execute jobs asynchronously using `Apache Kafka`_. It uses kafka-python_ under
-the hood.
-
-Announcements
-=============
-
-* Please see the releases_ page for latest updates.
-
-Requirements
-============
-
-* `Apache Kafka`_ 0.9+
-* Python 3.5+
-
-Installation
-============
-
-To install a stable version from PyPI_ (recommended):
-
-.. code-block:: bash
-
-    ~$ pip install kq
-
-To install the latest version directly from GitHub_:
-
-.. code-block:: bash
-
-    ~$ pip install -e git+git@github.com:joowani/kq.git@master#egg=kq
-
-You may need to use ``sudo`` depending on your environment.
-
-Getting Started
-===============
-
-First, ensure that your Kafka instance is up and running:
-
-.. code-block:: bash
-
-    ~$ ./kafka-server-start.sh -daemon server.properties
-
-Define your KQ worker module:
-
-.. code-block:: python
-
-    # my_worker.py
-
-    import logging
-
-    from kafka import KafkaConsumer
-    from kq import Worker
-
-    # Set up logging.
-    formatter = logging.Formatter('[%(levelname)s] %(message)s')
-    stream_handler = logging.StreamHandler()
-    stream_handler.setFormatter(formatter)
-    logger = logging.getLogger('kq.worker')
-    logger.setLevel(logging.DEBUG)
-    logger.addHandler(stream_handler)
-
-    # Set up a Kafka consumer.
-    consumer = KafkaConsumer(
-        bootstrap_servers='127.0.0.1:9092',
-        group_id='group',
-        auto_offset_reset='latest'
-    )
-
-    # Set up a worker.
-    worker = Worker(topic='topic', consumer=consumer)
-    worker.start()
-
-Start your worker:
-
-.. code-block:: bash
-
-    ~$ python my_worker.py
-    [INFO] Starting Worker(hosts=127.0.0.1:9092 topic=topic, group=group) ...
-
-Enqueue a function call:
-
-.. code-block:: python
-
-    import requests
-
-    from kafka import KafkaProducer
-    from kq import Queue
-
-    # Set up a Kafka producer.
-    producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
-
-    # Set up a queue.
-    queue = Queue(topic='topic', producer=producer)
-
-    # Enqueue a function call.
-    job = queue.enqueue(requests.get, 'https://www.google.com')
-
-Sit back and watch the worker process it in the background:
-
-.. code-block:: bash
-
-    ~$ python my_worker.py
-    [INFO] Starting Worker(hosts=127.0.0.1:9092, topic=topic, group=group) ...
-    [INFO] Processing Message(topic=topic, partition=0, offset=0) ...
-    [INFO] Executing job c7bf2359: requests.api.get('https://www.google.com')
-    [INFO] Job c7bf2359 returned: <Response [200]>
-
-**NEW in 2.0.0**: You can now specify the job timeout, message key and partition:
-
-.. code-block:: python
-
-    job = queue.using(timeout=5, key=b'foo', partition=0).enqueue(requests.get, 'https://www.google.com')
-
-Check out the documentation_ for more information.
-
-Contributing
-============
-
-Please have a look at this page_ before submitting a pull request. Thanks!
-
-
-Credits
-=======
-
-This project was inspired by RQ_.
-
-.. _Apache Kafka: https://kafka.apache.org
-.. _kafka-python: https://github.com/dpkp/kafka-python
-.. _2.0.0: https://github.com/joowani/kq/releases/tag/2.0.0
-.. _releases: https://github.com/joowani/kq/releases
-.. _PyPI: https://pypi.python.org/pypi/kq
-.. _GitHub: https://github.com/joowani/kq
-.. _documentation: http://kq.readthedocs.io
-.. _page: http://kq.readthedocs.io/en/master/contributing.html
-.. _RQ: https://github.com/rq/rq
diff --git a/docs/Makefile b/docs/Makefile
index bef2c94..d4bb2cb 100644
--- a/docs/Makefile
+++ b/docs/Makefile
@@ -1,225 +1,20 @@
-# Makefile for Sphinx documentation
+# Minimal makefile for Sphinx documentation
 #
-# You can set these variables from the command line.
-SPHINXOPTS    =
-SPHINXBUILD   = sphinx-build
-PAPER         =
+# You can set these variables from the command line, and also
+# from the environment for the first two.
+SPHINXOPTS    ?=
+SPHINXBUILD   ?= sphinx-build
+SOURCEDIR     = .
 BUILDDIR      = _build
 
-# Internal variables.
-PAPEROPT_a4     = -D latex_paper_size=a4
-PAPEROPT_letter = -D latex_paper_size=letter
-ALLSPHINXOPTS   = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .
-# the i18n builder cannot share the environment and doctrees with the others
-I18NSPHINXOPTS  = $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .
-
-.PHONY: help
+# Put it first so that "make" without argument is like "make help".
 help:
-	@echo "Please use \`make <target>' where <target> is one of"
-	@echo "  html       to make standalone HTML files"
-	@echo "  dirhtml    to make HTML files named index.html in directories"
-	@echo "  singlehtml to make a single large HTML file"
-	@echo "  pickle     to make pickle files"
-	@echo "  json       to make JSON files"
-	@echo "  htmlhelp   to make HTML files and a HTML help project"
-	@echo "  qthelp     to make HTML files and a qthelp project"
-	@echo "  applehelp  to make an Apple Help Book"
-	@echo "  devhelp    to make HTML files and a Devhelp project"
-	@echo "  epub       to make an epub"
-	@echo "  epub3      to make an epub3"
-	@echo "  latex      to make LaTeX files, you can set PAPER=a4 or PAPER=letter"
-	@echo "  latexpdf   to make LaTeX files and run them through pdflatex"
-	@echo "  latexpdfja to make LaTeX files and run them through platex/dvipdfmx"
-	@echo "  text       to make text files"
-	@echo "  man        to make manual pages"
-	@echo "  texinfo    to make Texinfo files"
-	@echo "  info       to make Texinfo files and run them through makeinfo"
-	@echo "  gettext    to make PO message catalogs"
-	@echo "  changes    to make an overview of all changed/added/deprecated items"
-	@echo "  xml        to make Docutils-native XML files"
-	@echo "  pseudoxml  to make pseudoxml-XML files for display purposes"
-	@echo "  linkcheck  to check all external links for integrity"
-	@echo "  doctest    to run all doctests embedded in the documentation (if enabled)"
-	@echo "  coverage   to run coverage check of the documentation (if enabled)"
-	@echo "  dummy      to check syntax errors of document sources"
-
-.PHONY: clean
-clean:
-	rm -rf $(BUILDDIR)/*
-
-.PHONY: html
-html:
-	$(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html
-	@echo
-	@echo "Build finished. The HTML pages are in $(BUILDDIR)/html."
-
-.PHONY: dirhtml
-dirhtml:
-	$(SPHINXBUILD) -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml
-	@echo
-	@echo "Build finished. The HTML pages are in $(BUILDDIR)/dirhtml."
-
-.PHONY: singlehtml
-singlehtml:
-	$(SPHINXBUILD) -b singlehtml $(ALLSPHINXOPTS) $(BUILDDIR)/singlehtml
-	@echo
-	@echo "Build finished. The HTML page is in $(BUILDDIR)/singlehtml."
-
-.PHONY: pickle
-pickle:
-	$(SPHINXBUILD) -b pickle $(ALLSPHINXOPTS) $(BUILDDIR)/pickle
-	@echo
-	@echo "Build finished; now you can process the pickle files."
-
-.PHONY: json
-json:
-	$(SPHINXBUILD) -b json $(ALLSPHINXOPTS) $(BUILDDIR)/json
-	@echo
-	@echo "Build finished; now you can process the JSON files."
-
-.PHONY: htmlhelp
-htmlhelp:
-	$(SPHINXBUILD) -b htmlhelp $(ALLSPHINXOPTS) $(BUILDDIR)/htmlhelp
-	@echo
-	@echo "Build finished; now you can run HTML Help Workshop with the" \
-	      ".hhp project file in $(BUILDDIR)/htmlhelp."
-
-.PHONY: qthelp
-qthelp:
-	$(SPHINXBUILD) -b qthelp $(ALLSPHINXOPTS) $(BUILDDIR)/qthelp
-	@echo
-	@echo "Build finished; now you can run "qcollectiongenerator" with the" \
-	      ".qhcp project file in $(BUILDDIR)/qthelp, like this:"
-	@echo "# qcollectiongenerator $(BUILDDIR)/qthelp/KQ.qhcp"
-	@echo "To view the help file:"
-	@echo "# assistant -collectionFile $(BUILDDIR)/qthelp/KQ.qhc"
-
-.PHONY: applehelp
-applehelp:
-	$(SPHINXBUILD) -b applehelp $(ALLSPHINXOPTS) $(BUILDDIR)/applehelp
-	@echo
-	@echo "Build finished. The help book is in $(BUILDDIR)/applehelp."
-	@echo "N.B. You won't be able to view it unless you put it in" \
-	      "~/Library/Documentation/Help or install it in your application" \
-	      "bundle."
-
-.PHONY: devhelp
-devhelp:
-	$(SPHINXBUILD) -b devhelp $(ALLSPHINXOPTS) $(BUILDDIR)/devhelp
-	@echo
-	@echo "Build finished."
-	@echo "To view the help file:"
-	@echo "# mkdir -p $$HOME/.local/share/devhelp/KQ"
-	@echo "# ln -s $(BUILDDIR)/devhelp $$HOME/.local/share/devhelp/KQ"
-	@echo "# devhelp"
-
-.PHONY: epub
-epub:
-	$(SPHINXBUILD) -b epub $(ALLSPHINXOPTS) $(BUILDDIR)/epub
-	@echo
-	@echo "Build finished. The epub file is in $(BUILDDIR)/epub."
-
-.PHONY: epub3
-epub3:
-	$(SPHINXBUILD) -b epub3 $(ALLSPHINXOPTS) $(BUILDDIR)/epub3
-	@echo
-	@echo "Build finished. The epub3 file is in $(BUILDDIR)/epub3."
-
-.PHONY: latex
-latex:
-	$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
-	@echo
-	@echo "Build finished; the LaTeX files are in $(BUILDDIR)/latex."
-	@echo "Run \`make' in that directory to run these through (pdf)latex" \
-	      "(use \`make latexpdf' here to do that automatically)."
-
-.PHONY: latexpdf
-latexpdf:
-	$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
-	@echo "Running LaTeX files through pdflatex..."
-	$(MAKE) -C $(BUILDDIR)/latex all-pdf
-	@echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex."
-
-.PHONY: latexpdfja
-latexpdfja:
-	$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
-	@echo "Running LaTeX files through platex and dvipdfmx..."
-	$(MAKE) -C $(BUILDDIR)/latex all-pdf-ja
-	@echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex."
-
-.PHONY: text
-text:
-	$(SPHINXBUILD) -b text $(ALLSPHINXOPTS) $(BUILDDIR)/text
-	@echo
-	@echo "Build finished. The text files are in $(BUILDDIR)/text."
-
-.PHONY: man
-man:
-	$(SPHINXBUILD) -b man $(ALLSPHINXOPTS) $(BUILDDIR)/man
-	@echo
-	@echo "Build finished. The manual pages are in $(BUILDDIR)/man."
-
-.PHONY: texinfo
-texinfo:
-	$(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo
-	@echo
-	@echo "Build finished. The Texinfo files are in $(BUILDDIR)/texinfo."
-	@echo "Run \`make' in that directory to run these through makeinfo" \
-	      "(use \`make info' here to do that automatically)."
-
-.PHONY: info
-info:
-	$(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo
-	@echo "Running Texinfo files through makeinfo..."
-	make -C $(BUILDDIR)/texinfo info
-	@echo "makeinfo finished; the Info files are in $(BUILDDIR)/texinfo."
-
-.PHONY: gettext
-gettext:
-	$(SPHINXBUILD) -b gettext $(I18NSPHINXOPTS) $(BUILDDIR)/locale
-	@echo
-	@echo "Build finished. The message catalogs are in $(BUILDDIR)/locale."
-
-.PHONY: changes
-changes:
-	$(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) $(BUILDDIR)/changes
-	@echo
-	@echo "The overview file is in $(BUILDDIR)/changes."
-
-.PHONY: linkcheck
-linkcheck:
-	$(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) $(BUILDDIR)/linkcheck
-	@echo
-	@echo "Link check complete; look for any errors in the above output " \
-	      "or in $(BUILDDIR)/linkcheck/output.txt."
-
-.PHONY: doctest
-doctest:
-	$(SPHINXBUILD) -b doctest $(ALLSPHINXOPTS) $(BUILDDIR)/doctest
-	@echo "Testing of doctests in the sources finished, look at the " \
-	      "results in $(BUILDDIR)/doctest/output.txt."
-
-.PHONY: coverage
-coverage:
-	$(SPHINXBUILD) -b coverage $(ALLSPHINXOPTS) $(BUILDDIR)/coverage
-	@echo "Testing of coverage in the sources finished, look at the " \
-	      "results in $(BUILDDIR)/coverage/python.txt."
-
-.PHONY: xml
-xml:
-	$(SPHINXBUILD) -b xml $(ALLSPHINXOPTS) $(BUILDDIR)/xml
-	@echo
-	@echo "Build finished. The XML files are in $(BUILDDIR)/xml."
+	@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
 
-.PHONY: pseudoxml
-pseudoxml:
-	$(SPHINXBUILD) -b pseudoxml $(ALLSPHINXOPTS) $(BUILDDIR)/pseudoxml
-	@echo
-	@echo "Build finished. The pseudo-XML files are in $(BUILDDIR)/pseudoxml."
+.PHONY: help Makefile
 
-.PHONY: dummy
-dummy:
-	$(SPHINXBUILD) -b dummy $(ALLSPHINXOPTS) $(BUILDDIR)/dummy
-	@echo
-	@echo "Build finished. Dummy builder generates no files."
+# Catch-all target: route all unknown targets to Sphinx using the new
+# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
+%: Makefile
+	@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
diff --git a/docs/callback.rst b/docs/callback.rst
index df6abd3..0a4e7d4 100644
--- a/docs/callback.rst
+++ b/docs/callback.rst
@@ -1,9 +1,8 @@
 Callback
 --------
 
-KQ allows you to assign a callback function to workers. The callback function
-is invoked every time a message is processed. It must take the following
-positional arguments:
+KQ lets you assign a callback function to workers. The callback function is invoked
+each time a message is processed. It must accept the following positional arguments:
 
 * **status** (str): Job status. Possible values are:
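For reference, a minimal callback sketch matching the signature documented here and demonstrated in `example/worker.py` later in this diff:

```python
import logging

logger = logging.getLogger("kq.worker")


def callback(status, message, job, result, exception, stacktrace):
    # status is one of "invalid", "success", "timeout" or "failure";
    # message is a kq.Message, and job is a kq.Job (None if "invalid").
    if status == "failure":
        logger.error("Job %s failed: %s", job.id, exception)


# Attach it at worker construction time:
# worker = Worker(topic="topic", consumer=consumer, callback=callback)
```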
diff --git a/docs/conf.py b/docs/conf.py
index ddb54bc..c5b67d9 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -1,16 +1,10 @@
-# -*- coding: utf-8 -*-
+# Configuration file for the Sphinx documentation builder.
 #
-# KQ documentation build configuration file, created by
-# sphinx-quickstart on Fri Oct 28 22:45:13 2016.
-#
-# This file is execfile()d with the current directory set to its
-# containing dir.
-#
-# Note that not all possible configuration values are present in this
-# autogenerated file.
-#
-# All configuration values have a default; values that are commented out
-# serve to show the default.
+# This file only contains a selection of the most common options. For a full
+# list see the documentation:
+# https://www.sphinx-doc.org/en/master/usage/configuration.html
+
+# -- Path setup --------------------------------------------------------------
 
 # If extensions (or modules to document with autodoc) are in another directory,
 # add these directories to sys.path here. If the directory is relative to the
@@ -19,339 +13,51 @@
 import os
 import sys
 
-sys.path.insert(0, os.path.abspath('..'))
-sys.path.append(os.path.dirname(__file__))
+sys.path.insert(0, os.path.abspath(".."))
 
-_version = {}
-with open("../kq/version.py") as fp:
-    exec(fp.read(), _version)
+import kq
 
-# -- General configuration ------------------------------------------------
+# -- Project information -----------------------------------------------------
+
+project = "KQ"
+copyright = "2021, Joohwan Oh"
+author = "Joohwan Oh"
 
-# If your documentation needs a minimal Sphinx version, state it here.
-#
-# needs_sphinx = '1.0'
+
+# -- General configuration ---------------------------------------------------
 
 # Add any Sphinx extension module names here, as strings. They can be
 # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
 # ones.
 extensions = [
-    'sphinx.ext.autodoc',
-    'sphinx.ext.doctest',
-    'sphinx.ext.coverage',
-    'sphinx.ext.viewcode',
-    'sphinx.ext.githubpages',
+    "sphinx_rtd_theme",
+    "sphinx.ext.autodoc",
+    "sphinx.ext.doctest",
+    "sphinx.ext.viewcode",
 ]
 
-# Add any paths that contain templates here, relative to this directory.
-templates_path = ['templates']
-
-# The suffix(es) of source filenames.
-# You can specify multiple suffix as a list of string:
-#
-# source_suffix = ['.rst', '.md']
-source_suffix = '.rst'
-
-# The encoding of source files.
-#
-# source_encoding = 'utf-8-sig'
-
-# The master toctree document.
-master_doc = 'index'
-
-# General information about the project.
-project = u'KQ'
-copyright = u'2016, Joohwan Oh'
-author = u'Joohwan Oh'
+# List of patterns, relative to source directory, that match files and
+# directories to ignore when looking for source files.
+# This pattern also affects html_static_path and html_extra_path.
+exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"]
 
 # The version info for the project you're documenting, acts as replacement for
 # |version| and |release|, also used in various other places throughout the
 # built documents.
 #
 # The short X.Y version.
-version = _version['__version__']
-# The full version, including alpha/beta/rc tags.
-release = _version['__version__']
-
-# The language for content autogenerated by Sphinx. Refer to documentation
-# for a list of supported languages.
-#
-# This is also used if you do content translation via gettext catalogs.
-# Usually you set "language" from the command line for these cases.
-language = None
-
-# There are two options for replacing |today|: either, you set today to some
-# non-false value, then it is used:
-#
-# today = ''
-#
-# Else, today_fmt is used as the format for a strftime call.
-#
-# today_fmt = '%B %d, %Y'
-
-# List of patterns, relative to source directory, that match files and
-# directories to ignore when looking for source files.
-# This patterns also effect to html_static_path and html_extra_path
-exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
-
-# The reST default role (used for this markup: `text`) to use for all
-# documents.
-#
-# default_role = None
-
-# If true, '()' will be appended to :func: etc. cross-reference text.
-#
-# add_function_parentheses = True
-
-# If true, the current module name will be prepended to all description
-# unit titles (such as .. function::).
-#
-# add_module_names = True
-
-# If true, sectionauthor and moduleauthor directives will be shown in the
-# output. They are ignored by default.
-#
-# show_authors = False
-
-# The name of the Pygments (syntax highlighting) style to use.
-pygments_style = 'sphinx'
-
-# A list of ignored prefixes for module index sorting.
-# modindex_common_prefix = []
-
-# If true, keep warnings as "system message" paragraphs in the built documents.
-# keep_warnings = False
-
-# If true, `todo` and `todoList` produce output, else they produce nothing.
-todo_include_todos = False
+version = kq.__version__
+# The full version, including alpha/beta/rc tags.
+release = kq.__version__
 
+# The master toctree document.
+master_doc = "index"
 
-# -- Options for HTML output ----------------------------------------------
+# -- Options for HTML output -------------------------------------------------
 
 # The theme to use for HTML and HTML Help pages.  See the documentation for
 # a list of builtin themes.
 #
-html_theme = 'sphinx_rtd_theme'
-
-# Theme options are theme-specific and customize the look and feel of a theme
-# further. For a list of options available for each theme, see the
-# documentation.
-#
-# html_theme_options = {}
-
-# Add any paths that contain custom themes here, relative to this directory.
-# html_theme_path = []
-
-# The name for this set of Sphinx documents.
-# "<project> v<release> documentation" by default.
-#
-# html_title = u'KQ v1.0'
-
-# A shorter title for the navigation bar. Default is the same as html_title.
-#
-# html_short_title = None
-
-# The name of an image file (relative to this directory) to place at the top
-# of the sidebar.
-#
-# html_logo = None
-
-# The name of an image file (relative to this directory) to use as a favicon of
-# the docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32
-# pixels large.
-#
-# html_favicon = None
-
-# Add any paths that contain custom static files (such as style sheets) here,
-# relative to this directory. They are copied after the builtin static files,
-# so a file named "default.css" will overwrite the builtin "default.css".
-html_static_path = []
-
-# Add any extra paths that contain custom files (such as robots.txt or
-# .htaccess) here, relative to this directory. These files are copied
-# directly to the root of the documentation.
-#
-# html_extra_path = []
-
-# If not None, a 'Last updated on:' timestamp is inserted at every page
-# bottom, using the given strftime format.
-# The empty string is equivalent to '%b %d, %Y'.
-#
-# html_last_updated_fmt = None
-
-# If true, SmartyPants will be used to convert quotes and dashes to
-# typographically correct entities.
-#
-# html_use_smartypants = True
-
-# Custom sidebar templates, maps document names to template names.
-#
-# html_sidebars = {}
-
-# Additional templates that should be rendered to pages, maps page names to
-# template names.
-#
-# html_additional_pages = {}
-
-# If false, no module index is generated.
-#
-# html_domain_indices = True
-
-# If false, no index is generated.
-#
-# html_use_index = True
-
-# If true, the index is split into individual pages for each letter.
-#
-# html_split_index = False
-
-# If true, links to the reST sources are added to the pages.
-#
-# html_show_sourcelink = True
-
-# If true, "Created using Sphinx" is shown in the HTML footer. Default is True.
-#
-# html_show_sphinx = True
-
-# If true, "(C) Copyright ..." is shown in the HTML footer. Default is True.
-#
-# html_show_copyright = True
-
-# If true, an OpenSearch description file will be output, and all pages will
-# contain a <link> tag referring to it. The value of this option must be the
-# base URL from which the finished HTML is served.
-#
-# html_use_opensearch = ''
-
-# This is the file name suffix for HTML files (e.g. ".xhtml").
-# html_file_suffix = None
-
-# Language to be used for generating the HTML full-text search index.
-# Sphinx supports the following languages:
-#   'da', 'de', 'en', 'es', 'fi', 'fr', 'hu', 'it', 'ja'
-#   'nl', 'no', 'pt', 'ro', 'ru', 'sv', 'tr', 'zh'
-#
-# html_search_language = 'en'
-
-# A dictionary with options for the search language support, empty by default.
-# 'ja' uses this config value.
-# 'zh' user can custom change `jieba` dictionary path.
-#
-# html_search_options = {'type': 'default'}
-
-# The name of a javascript file (relative to the configuration directory) that
-# implements a search results scorer. If empty, the default will be used.
-#
-# html_search_scorer = 'scorer.js'
-
-# Output file base name for HTML help builder.
-htmlhelp_basename = 'kqdoc'
-
-# -- Options for LaTeX output ---------------------------------------------
-
-latex_elements = {
-    # The paper size ('letterpaper' or 'a4paper').
-    #
-    # 'papersize': 'letterpaper',
-
-    # The font size ('10pt', '11pt' or '12pt').
-    #
-    # 'pointsize': '10pt',
-
-    # Additional stuff for the LaTeX preamble.
-    #
-    # 'preamble': '',
-
-    # Latex figure (float) alignment
-    #
-    # 'figure_align': 'htbp',
-}
-
-# Grouping the document tree into LaTeX files. List of tuples
-# (source start file, target name, title,
-#  author, documentclass [howto, manual, or own class]).
-latex_documents = [
-    (master_doc, 'kq.tex', u'KQ Documentation',
-     u'Joohwan Oh', 'manual'),
-]
-
-# The name of an image file (relative to this directory) to place at the top of
-# the title page.
-#
-# latex_logo = None
-
-# For "manual" documents, if this is true, then toplevel headings are parts,
-# not chapters.
-#
-# latex_use_parts = False
-
-# If true, show page references after internal links.
-#
-# latex_show_pagerefs = False
-
-# If true, show URL addresses after external links.
-#
-# latex_show_urls = False
-
-# Documents to append as an appendix to all manuals.
-#
-# latex_appendices = []
-
-# It false, will not define \strong, \code, \titleref, \crossref ... but only
-# \sphinxstrong, ..., \sphinxtitleref, ... To help avoid clash with user added
-# packages.
-#
-# latex_keep_old_macro_names = True
-
-# If false, no module index is generated.
-#
-# latex_domain_indices = True
-
-
-# -- Options for manual page output ---------------------------------------
-
-# One entry per manual page. List of tuples
-# (source start file, name, description, authors, manual section).
-man_pages = [
-    (master_doc, 'kq', u'KQ Documentation',
-     [author], 1)
-]
-
-# If true, show URL addresses after external links.
-#
-# man_show_urls = False
-
-
-# -- Options for Texinfo output -------------------------------------------
-
-# Grouping the document tree into Texinfo files. List of tuples
-# (source start file, target name, title, author,
-#  dir menu entry, description, category)
-texinfo_documents = [
-    (master_doc, 'kq', u'KQ Documentation',
-     author, 'kq', 'One line description of project.',
-     'Miscellaneous'),
-]
-
-# Documents to append as an appendix to all manuals.
-#
-# texinfo_appendices = []
-
-# If false, no module index is generated.
-#
-# texinfo_domain_indices = True
-
-# How to display URL addresses: 'footnote', 'no', or 'inline'.
-#
-# texinfo_show_urls = 'footnote'
-
-# If true, do not generate a @detailmenu in the "Top" node's menu.
-#
-# texinfo_no_detailmenu = False
-
-autodoc_member_order = 'bysource'
+html_theme = "sphinx_rtd_theme"
 
-doctest_global_setup = """
-from unittest import mock
-mock.patch('math.inf', 0).start()
-"""
+htmlhelp_basename = "kqdoc"
diff --git a/docs/contributing.rst b/docs/contributing.rst
deleted file mode 100644
index 5dd92aa..0000000
--- a/docs/contributing.rst
+++ /dev/null
@@ -1,101 +0,0 @@
-Contributing
-------------
-
-Requirements
-============
-
-Before submitting a pull request on GitHub_, please make sure you meet the
-following requirements:
-
-* The pull request points to dev_ branch.
-* Changes are squashed into a single commit. I like to use git rebase for this.
-* Commit message is in present tense. For example, "Fix bug" is good while
-  "Fixed bug" is not.
-* Sphinx_-compatible docstrings.
-* PEP8_ compliance.
-* No missing docstrings or commented-out lines.
-* Test coverage_ remains at %100. If a piece of code is trivial and does not
-  need unit tests, use this_ to exclude it from coverage.
-* No build failures on `Travis CI`_. Builds automatically trigger on pull
-  request submissions.
-* Documentation is kept up-to-date with the new changes (see below).
-
-.. warning::
-    The dev branch is occasionally rebased, and its commit history may be
-    overwritten in the process. Before you begin your feature work, git fetch
-    or pull to ensure that your local branch has not diverged. If you see git
-    conflicts and want to start with a clean slate, run the following commands:
-
-    .. code-block:: bash
-
-        ~$ git checkout dev
-        ~$ git fetch origin
-        ~$ git reset --hard origin/dev  # THIS WILL WIPE ALL LOCAL CHANGES
-
-Style
-=====
-
-To ensure PEP8_ compliance, run flake8_:
-
-.. code-block:: bash
-
-    ~$ pip install flake8
-    ~$ git clone https://github.com/joowani/kq.git
-    ~$ cd kq
-    ~$ flake8
-
-If there is a good reason to ignore a warning, see here_ on how to exclude it.
-
-Testing
-=======
-
-To test your changes, you can run the integration test suite that comes with
-**kq**. It uses pytest_ and requires an actual Kafka instance.
-
-To run the test suite (use your own Kafka broker host and port):
-
-.. code-block:: bash
-
-    ~$ pip install pytest
-    ~$ git clone https://github.com/joowani/kq.git
-    ~$ cd kq
-    ~$ py.test -v -s --host=127.0.0.1 --port=9092
-
-To run the test suite with coverage report:
-
-.. code-block:: bash
-
-    ~$ pip install coverage pytest pytest-cov
-    ~$ git clone https://github.com/joowani/kq.git
-    ~$ cd kq
-    ~$ py.test -v -s --host=127.0.0.1 --port=9092 --cov=kq
-
-As the test suite creates real topics and messages, it should only be run in
-development environments.
-
-Documentation
-=============
-
-The documentation including the README is written in reStructuredText_ and uses
-Sphinx_. To build an HTML version on your local machine:
-
-.. code-block:: bash
-
-    ~$ pip install sphinx sphinx_rtd_theme
-    ~$ git clone https://github.com/joowani/kq.git
-    ~$ cd kq/docs
-    ~$ sphinx-build . build  # Open build/index.html in a browser
-
-As always, thank you for your contribution!
-
-.. _dev: https://github.com/joowani/kq/tree/dev
-.. _GitHub: https://github.com/joowani/kq
-.. _PEP8: https://www.python.org/dev/peps/pep-0008/
-.. _coverage: https://coveralls.io/github/joowani/kq
-.. _this: http://coverage.readthedocs.io/en/latest/excluding.html
-.. _Travis CI: https://travis-ci.org/joowani/kq
-.. _Sphinx: https://github.com/sphinx-doc/sphinx
-.. _flake8: http://flake8.pycqa.org
-.. _here: http://flake8.pycqa.org/en/latest/user/violations.html#in-line-ignoring-errors
-.. _pytest: https://github.com/pytest-dev/pytest
-.. _reStructuredText: https://en.wikipedia.org/wiki/ReStructuredText
diff --git a/docs/index.rst b/docs/index.rst
index 1652269..133af37 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -1,32 +1,25 @@
 KQ: Kafka-based Job Queue for Python
 ------------------------------------
 
-Welcome to the documentation for **KQ (Kafka Queue)**, a lightweight Python
-library which lets you queue and execute jobs asynchronously using `Apache Kafka`_.
-It uses kafka-python_ under the hood.
+Welcome to the documentation for **KQ (Kafka Queue)**, a Python library that lets you
+enqueue and execute jobs asynchronously using `Apache Kafka`_.
+
+KQ is built on top of kafka-python_.
 
 Requirements
 ============
 
 - `Apache Kafka`_ 0.9+
-- Python 3.5+
+- Python 3.6+
 
 Installation
 ============
 
-To install a stable version from PyPI_ (recommended):
-
-.. code-block:: bash
-
-    ~$ pip install kq
-
-To install the latest version directly from GitHub_:
+Install using pip:
 
 .. code-block:: bash
 
-    ~$ pip install -e git+git@github.com:joowani/kq.git@master#egg=kq
-
-You may need to use ``sudo`` depending on your environment.
+    pip install kq
 
 Contents
 ========
@@ -42,9 +35,7 @@ Contents
    callback
    serializer
    logging
-   contributing
 
 .. _Apache Kafka: https://kafka.apache.org
 .. _kafka-python: https://github.com/dpkp/kafka-python
-.. _PyPI: https://pypi.python.org/pypi/kq
 .. _GitHub: https://github.com/joowani/kq
diff --git a/docs/job.rst b/docs/job.rst
index e042239..35de6bc 100644
--- a/docs/job.rst
+++ b/docs/job.rst
@@ -1,41 +1,45 @@
 Job
 ---
 
-KQ encapsulates jobs using ``kq.Job`` namedtuples, which have the following
-fields:
-
-* **id** (str): Job ID.
-* **timestamp** (int): Unix timestamp indicating the time of enqueue.
-* **topic** (str): Name of the Kafka topic.
-* **func** (callable): Function to execute.
-* **args** (list | tuple): Positional arguments for the function.
-* **kwargs** (dict): Keyword arguments for the function.
-* **timeout** (int | float): Job timeout threshold in seconds.
-* **key** (bytes | None): Kafka message key. Jobs with the same keys are sent
-  to the same topic partition and executed sequentially. Applies only if the
-  **partition** field is not set, and the producer's partitioner configuration
-  is left as default.
-* **partition** (int | None): Kafka topic partition. If set, the **key** field
-  is ignored.
+KQ encapsulates jobs using the ``kq.Job`` dataclass:
 
 .. testcode::
 
-    from collections import namedtuple
-
-    Job = namedtuple(
-        typename='Job',
-        field_names=(
-            'id',
-            'timestamp',
-            'topic',
-            'func',
-            'args',
-            'kwargs',
-            'timeout',
-            'key',
-            'partition'
-        )
-    )
-
-When a function call is enqueued, an instance of this namedtuple is created to
-store the metadata. It is then serialized into a byte string and sent to Kafka.
+    from dataclasses import dataclass
+    from typing import Callable, Dict, List, Optional, Union
+
+
+    @dataclass(frozen=True)
+    class Job:
+
+        # KQ job UUID.
+        id: Optional[str] = None
+
+        # Unix timestamp indicating when the job was queued.
+        timestamp: Optional[int] = None
+
+        # Name of the Kafka topic.
+        topic: Optional[str] = None
+
+        # Function to execute.
+        func: Optional[Callable] = None
+
+        # Positional arguments for the function.
+        args: Optional[List] = None
+
+        # Keyword arguments for the function.
+        kwargs: Optional[Dict] = None
+
+        # Job timeout threshold in seconds.
+        timeout: Optional[Union[float, int]] = None
+
+        # Kafka message key. Jobs with the same keys are sent
+        # to the same topic partition and executed sequentially.
+        # Applies only when the "partition" field is not set.
+        key: Optional[bytes] = None
+
+        # Kafka topic partition. If set, the "key" field is ignored.
+        partition: Optional[int] = None
+
+When a function call is enqueued, an instance of this dataclass is created to store the
+message and the metadata. It is then serialized into a byte string and sent to Kafka.
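Per the ``Queue.enqueue`` docstring later in this diff, a pre-built ``Job`` can also be enqueued directly in place of a function. A minimal sketch (exact defaulting of unset fields is handled inside ``kq/queue.py``):

```python
import requests

from kafka import KafkaProducer
from kq import Job, Queue

producer = KafkaProducer(bootstrap_servers="127.0.0.1:9092")
queue = Queue(topic="topic", producer=producer)

# A Job instance can be passed to enqueue() instead of a function;
# fields left as None are filled in at enqueue time.
job = Job(func=requests.get, args=["https://google.com"], timeout=5)
queue.enqueue(job)
```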
diff --git a/docs/logging.rst b/docs/logging.rst
index d0596d1..5db079f 100644
--- a/docs/logging.rst
+++ b/docs/logging.rst
@@ -3,7 +3,7 @@ Logging
 
 By default, :doc:`queues <queue>` log messages via ``kq.queue`` logger, and
 :doc:`workers <worker>` log messages via ``kq.worker`` logger. You can either
-use these loggers or inject your own during queue/worker initialization.
+use these default loggers or set your own during queue/worker initialization.
 
 **Example:**
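A minimal sketch of the injection path this page describes, passing a custom logger to a queue (the logger name ``my_app.queue`` is illustrative):

```python
import logging

from kafka import KafkaProducer
from kq import Queue

# Replace the default "kq.queue" logger with your own.
logger = logging.getLogger("my_app.queue")
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

producer = KafkaProducer(bootstrap_servers="127.0.0.1:9092")
queue = Queue(topic="topic", producer=producer, logger=logger)
```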
diff --git a/docs/make.bat b/docs/make.bat
index 57a8333..922152e 100644
--- a/docs/make.bat
+++ b/docs/make.bat
@@ -1,281 +1,35 @@
-@ECHO OFF
-
-REM Command file for Sphinx documentation
-
-if "%SPHINXBUILD%" == "" (
-	set SPHINXBUILD=sphinx-build
-)
-set BUILDDIR=_build
-set ALLSPHINXOPTS=-d %BUILDDIR%/doctrees %SPHINXOPTS% .
-set I18NSPHINXOPTS=%SPHINXOPTS% .
-if NOT "%PAPER%" == "" (
-	set ALLSPHINXOPTS=-D latex_paper_size=%PAPER% %ALLSPHINXOPTS%
-	set I18NSPHINXOPTS=-D latex_paper_size=%PAPER% %I18NSPHINXOPTS%
-)
-
-if "%1" == "" goto help
-
-if "%1" == "help" (
-	:help
-	echo.Please use `make ^<target^>` where ^<target^> is one of
-	echo.  html       to make standalone HTML files
-	echo.  dirhtml    to make HTML files named index.html in directories
-	echo.  singlehtml to make a single large HTML file
-	echo.  pickle     to make pickle files
-	echo.  json       to make JSON files
-	echo.  htmlhelp   to make HTML files and a HTML help project
-	echo.  qthelp     to make HTML files and a qthelp project
-	echo.  devhelp    to make HTML files and a Devhelp project
-	echo.  epub       to make an epub
-	echo.  epub3      to make an epub3
-	echo.  latex      to make LaTeX files, you can set PAPER=a4 or PAPER=letter
-	echo.  text       to make text files
-	echo.  man        to make manual pages
-	echo.  texinfo    to make Texinfo files
-	echo.  gettext    to make PO message catalogs
-	echo.  changes    to make an overview over all changed/added/deprecated items
-	echo.  xml        to make Docutils-native XML files
-	echo.  pseudoxml  to make pseudoxml-XML files for display purposes
-	echo.  linkcheck  to check all external links for integrity
-	echo.  doctest    to run all doctests embedded in the documentation if enabled
-	echo.  coverage   to run coverage check of the documentation if enabled
-	echo.  dummy      to check syntax errors of document sources
-	goto end
-)
-
-if "%1" == "clean" (
-	for /d %%i in (%BUILDDIR%\*) do rmdir /q /s %%i
-	del /q /s %BUILDDIR%\*
-	goto end
-)
-
-
-REM Check if sphinx-build is available and fallback to Python version if any
-%SPHINXBUILD% 1>NUL 2>NUL
-if errorlevel 9009 goto sphinx_python
-goto sphinx_ok
-
-:sphinx_python
-
-set SPHINXBUILD=python -m sphinx.__init__
-%SPHINXBUILD% 2> nul
-if errorlevel 9009 (
-	echo.
-	echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
-	echo.installed, then set the SPHINXBUILD environment variable to point
-	echo.to the full path of the 'sphinx-build' executable. Alternatively you
-	echo.may add the Sphinx directory to PATH.
-	echo.
-	echo.If you don't have Sphinx installed, grab it from
-	echo.http://sphinx-doc.org/
-	exit /b 1
-)
-
-:sphinx_ok
-
-
-if "%1" == "html" (
-	%SPHINXBUILD% -b html %ALLSPHINXOPTS% %BUILDDIR%/html
-	if errorlevel 1 exit /b 1
-	echo.
-	echo.Build finished. The HTML pages are in %BUILDDIR%/html.
-	goto end
-)
-
-if "%1" == "dirhtml" (
-	%SPHINXBUILD% -b dirhtml %ALLSPHINXOPTS% %BUILDDIR%/dirhtml
-	if errorlevel 1 exit /b 1
-	echo.
-	echo.Build finished. The HTML pages are in %BUILDDIR%/dirhtml.
-	goto end
-)
-
-if "%1" == "singlehtml" (
-	%SPHINXBUILD% -b singlehtml %ALLSPHINXOPTS% %BUILDDIR%/singlehtml
-	if errorlevel 1 exit /b 1
-	echo.
-	echo.Build finished. The HTML pages are in %BUILDDIR%/singlehtml.
-	goto end
-)
-
-if "%1" == "pickle" (
-	%SPHINXBUILD% -b pickle %ALLSPHINXOPTS% %BUILDDIR%/pickle
-	if errorlevel 1 exit /b 1
-	echo.
-	echo.Build finished; now you can process the pickle files.
-	goto end
-)
-
-if "%1" == "json" (
-	%SPHINXBUILD% -b json %ALLSPHINXOPTS% %BUILDDIR%/json
-	if errorlevel 1 exit /b 1
-	echo.
-	echo.Build finished; now you can process the JSON files.
-	goto end
-)
-
-if "%1" == "htmlhelp" (
-	%SPHINXBUILD% -b htmlhelp %ALLSPHINXOPTS% %BUILDDIR%/htmlhelp
-	if errorlevel 1 exit /b 1
-	echo.
-	echo.Build finished; now you can run HTML Help Workshop with the ^
-.hhp project file in %BUILDDIR%/htmlhelp.
-	goto end
-)
-
-if "%1" == "qthelp" (
-	%SPHINXBUILD% -b qthelp %ALLSPHINXOPTS% %BUILDDIR%/qthelp
-	if errorlevel 1 exit /b 1
-	echo.
-	echo.Build finished; now you can run "qcollectiongenerator" with the ^
-.qhcp project file in %BUILDDIR%/qthelp, like this:
-	echo.^> qcollectiongenerator %BUILDDIR%\qthelp\KQ.qhcp
-	echo.To view the help file:
-	echo.^> assistant -collectionFile %BUILDDIR%\qthelp\KQ.ghc
-	goto end
-)
-
-if "%1" == "devhelp" (
-	%SPHINXBUILD% -b devhelp %ALLSPHINXOPTS% %BUILDDIR%/devhelp
-	if errorlevel 1 exit /b 1
-	echo.
-	echo.Build finished.
-	goto end
-)
-
-if "%1" == "epub" (
-	%SPHINXBUILD% -b epub %ALLSPHINXOPTS% %BUILDDIR%/epub
-	if errorlevel 1 exit /b 1
-	echo.
-	echo.Build finished. The epub file is in %BUILDDIR%/epub.
-	goto end
-)
-
-if "%1" == "epub3" (
-	%SPHINXBUILD% -b epub3 %ALLSPHINXOPTS% %BUILDDIR%/epub3
-	if errorlevel 1 exit /b 1
-	echo.
-	echo.Build finished. The epub3 file is in %BUILDDIR%/epub3.
-	goto end
-)
-
-if "%1" == "latex" (
-	%SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex
-	if errorlevel 1 exit /b 1
-	echo.
-	echo.Build finished; the LaTeX files are in %BUILDDIR%/latex.
-	goto end
-)
-
-if "%1" == "latexpdf" (
-	%SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex
-	cd %BUILDDIR%/latex
-	make all-pdf
-	cd %~dp0
-	echo.
-	echo.Build finished; the PDF files are in %BUILDDIR%/latex.
-	goto end
-)
-
-if "%1" == "latexpdfja" (
-	%SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex
-	cd %BUILDDIR%/latex
-	make all-pdf-ja
-	cd %~dp0
-	echo.
-	echo.Build finished; the PDF files are in %BUILDDIR%/latex.
-	goto end
-)
-
-if "%1" == "text" (
-	%SPHINXBUILD% -b text %ALLSPHINXOPTS% %BUILDDIR%/text
-	if errorlevel 1 exit /b 1
-	echo.
-	echo.Build finished. The text files are in %BUILDDIR%/text.
-	goto end
-)
-
-if "%1" == "man" (
-	%SPHINXBUILD% -b man %ALLSPHINXOPTS% %BUILDDIR%/man
-	if errorlevel 1 exit /b 1
-	echo.
-	echo.Build finished. The manual pages are in %BUILDDIR%/man.
-	goto end
-)
-
-if "%1" == "texinfo" (
-	%SPHINXBUILD% -b texinfo %ALLSPHINXOPTS% %BUILDDIR%/texinfo
-	if errorlevel 1 exit /b 1
-	echo.
-	echo.Build finished. The Texinfo files are in %BUILDDIR%/texinfo.
-	goto end
-)
-
-if "%1" == "gettext" (
-	%SPHINXBUILD% -b gettext %I18NSPHINXOPTS% %BUILDDIR%/locale
-	if errorlevel 1 exit /b 1
-	echo.
-	echo.Build finished. The message catalogs are in %BUILDDIR%/locale.
-	goto end
-)
-
-if "%1" == "changes" (
-	%SPHINXBUILD% -b changes %ALLSPHINXOPTS% %BUILDDIR%/changes
-	if errorlevel 1 exit /b 1
-	echo.
-	echo.The overview file is in %BUILDDIR%/changes.
-	goto end
-)
-
-if "%1" == "linkcheck" (
-	%SPHINXBUILD% -b linkcheck %ALLSPHINXOPTS% %BUILDDIR%/linkcheck
-	if errorlevel 1 exit /b 1
-	echo.
-	echo.Link check complete; look for any errors in the above output ^
-or in %BUILDDIR%/linkcheck/output.txt.
-	goto end
-)
-
-if "%1" == "doctest" (
-	%SPHINXBUILD% -b doctest %ALLSPHINXOPTS% %BUILDDIR%/doctest
-	if errorlevel 1 exit /b 1
-	echo.
-	echo.Testing of doctests in the sources finished, look at the ^
-results in %BUILDDIR%/doctest/output.txt.
-	goto end
-)
-
-if "%1" == "coverage" (
-	%SPHINXBUILD% -b coverage %ALLSPHINXOPTS% %BUILDDIR%/coverage
-	if errorlevel 1 exit /b 1
-	echo.
-	echo.Testing of coverage in the sources finished, look at the ^
-results in %BUILDDIR%/coverage/python.txt.
-	goto end
-)
-
-if "%1" == "xml" (
-	%SPHINXBUILD% -b xml %ALLSPHINXOPTS% %BUILDDIR%/xml
-	if errorlevel 1 exit /b 1
-	echo.
-	echo.Build finished. The XML files are in %BUILDDIR%/xml.
-	goto end
-)
-
-if "%1" == "pseudoxml" (
-	%SPHINXBUILD% -b pseudoxml %ALLSPHINXOPTS% %BUILDDIR%/pseudoxml
-	if errorlevel 1 exit /b 1
-	echo.
-	echo.Build finished. The pseudo-XML files are in %BUILDDIR%/pseudoxml.
-	goto end
-)
-
-if "%1" == "dummy" (
-	%SPHINXBUILD% -b dummy %ALLSPHINXOPTS% %BUILDDIR%/dummy
-	if errorlevel 1 exit /b 1
-	echo.
-	echo.Build finished. Dummy builder generates no files.
-	goto end
-)
-
-:end
+@ECHO OFF
+
+pushd %~dp0
+
+REM Command file for Sphinx documentation
+
+if "%SPHINXBUILD%" == "" (
+	set SPHINXBUILD=sphinx-build
+)
+set SOURCEDIR=.
+set BUILDDIR=_build
+
+if "%1" == "" goto help
+
+%SPHINXBUILD% >NUL 2>NUL
+if errorlevel 9009 (
+	echo.
+	echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
+	echo.installed, then set the SPHINXBUILD environment variable to point
+	echo.to the full path of the 'sphinx-build' executable. Alternatively you
+	echo.may add the Sphinx directory to PATH.
+	echo.
+	echo.If you don't have Sphinx installed, grab it from
+	echo.http://sphinx-doc.org/
+	exit /b 1
+)
+
+%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
+goto end
+
+:help
+%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
+
+:end
+popd
diff --git a/docs/message.rst b/docs/message.rst
index c351801..ed29f5a 100644
--- a/docs/message.rst
+++ b/docs/message.rst
@@ -1,30 +1,31 @@
 Message
 -------
 
-KQ encapsulates Kafka messages using ``kq.Message`` namedtuples, which have
-the following fields:
-
-* **topic** (str): Name of the Kafka topic.
-* **partition** (int): Kafka topic partition.
-* **offset** (int): Partition offset.
-* **key** (bytes | None): Kafka message key.
-* **value** (bytes): Kafka message payload.
+KQ encapsulates Kafka messages using the ``kq.Message`` dataclass:
 
 .. testcode::
 
-    from collections import namedtuple
+    from dataclasses import dataclass
+    from typing import Optional
+
+
+    @dataclass(frozen=True)
+    class Message:
+        # Name of the Kafka topic.
+        topic: str
+
+        # Kafka topic partition.
+        partition: int
+
+        # Partition offset.
+        offset: int
+
+        # Kafka message key.
+        key: Optional[bytes]
 
-    Message = namedtuple(
-        typename='Message',
-        field_names=(
-            'topic',
-            'partition',
-            'offset',
-            'key',
-            'value'
-        )
-    )
+        # Kafka message payload.
+        value: bytes
 
-Raw Kafka messages are converted into instances of this dataclass, which are then sent
+Raw Kafka messages are converted into instances of this dataclass, which are then sent
 to :doc:`workers <worker>` (and also to :doc:`callback functions <callback>` if
 defined).
diff --git a/docs/overview.rst b/docs/overview.rst
index 751e06c..3aa1755 100644
--- a/docs/overview.rst
+++ b/docs/overview.rst
@@ -1,23 +1,16 @@
 Getting Started
 ---------------
 
-First, ensure that your Kafka instance is up and running:
+Start your Kafka instance. Example using Docker:
 
 .. code-block:: bash
 
-    ~$ ./kafka-server-start.sh -daemon server.properties
+    docker run -p 9092:9092 -e ADV_HOST=127.0.0.1 lensesio/fast-data-dev
 
-Define your KQ worker module:
-
-.. testsetup::
-
-    from unittest import mock
-    mock.patch('math.inf', 0).start()
+Define your KQ ``my_worker.py`` module:
 
 .. code-block:: python
 
-    # my_worker.py
-
     import logging
 
     from kafka import KafkaConsumer
@@ -46,7 +39,7 @@ Start the worker:
 
 .. code-block:: bash
 
-    ~$ python my_worker.py
+    python my_worker.py
     [INFO] Starting Worker(hosts=127.0.0.1:9092 topic=topic, group=group) ...
 
 Enqueue a function call:
@@ -65,20 +58,17 @@ Enqueue a function call:
     queue = Queue(topic='topic', producer=producer)
 
     # Enqueue a function call.
-    job = queue.enqueue(requests.get, 'https://www.google.com')
+    job = queue.enqueue(requests.get, 'https://google.com')
+
+    # You can also specify the job timeout, Kafka message key and partition.
+    job = queue.using(timeout=5, key=b'foo', partition=0).enqueue(requests.get, 'https://google.com')
 
-Sit back and watch the worker process it in the background:
+Let the worker process it in the background:
 
 .. code-block:: bash
 
-    ~$ python my_worker.py
+    python my_worker.py
     [INFO] Starting Worker(hosts=127.0.0.1:9092, topic=topic, group=group) ...
     [INFO] Processing Message(topic=topic, partition=0, offset=0) ...
     [INFO] Executing job c7bf2359: requests.api.get('https://www.google.com')
     [INFO] Job c7bf2359 returned: <Response [200]>
-
-You can also specify the job timeout, message key and partition:
-
-.. code-block:: python
-
-    job = queue.using(timeout=5, key=b'foo', partition=0).enqueue(requests.get, 'https://www.google.com')
diff --git a/docs/worker.rst b/docs/worker.rst
index 93916ad..18f92e8 100644
--- a/docs/worker.rst
+++ b/docs/worker.rst
@@ -3,4 +3,4 @@ Worker
 
 .. autoclass:: kq.worker.Worker
     :members:
-    :member-order: bysource
\ No newline at end of file
+    :member-order: bysource
diff --git a/example/consumer.py b/example/consumer.py
index 6290275..e956e5d 100644
--- a/example/consumer.py
+++ b/example/consumer.py
@@ -1,6 +1,7 @@
 from kafka import KafkaConsumer
 
-consumer = KafkaConsumer('my_topic')
-print('Starting consumer...')
+consumer = KafkaConsumer("my_topic")
+
+print("Starting consumer...")
 for msg in consumer:
     print(msg)
diff --git a/example/producer.py b/example/producer.py
index 874b75d..659e22a 100644
--- a/example/producer.py
+++ b/example/producer.py
@@ -1,6 +1,7 @@
 from kafka import KafkaProducer
 
-producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
+
+producer = KafkaProducer(bootstrap_servers="127.0.0.1:9092")
 for _ in range(10000):
-    producer.send('my_topic', b'message')
+    producer.send("my_topic", b"message")
     # producer.flush()
diff --git a/example/queue.py b/example/queue.py
index 6278fe6..68f3a14 100644
--- a/example/queue.py
+++ b/example/queue.py
@@ -1,4 +1,5 @@
 from kafka import KafkaProducer
+
 from kq import Queue
 
 
@@ -7,10 +8,10 @@ def add(a, b):
 
 
 # Set up a Kafka producer.
-producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
+producer = KafkaProducer(bootstrap_servers="127.0.0.1:9092")
 
 # Set up a queue.
-queue = Queue(topic='topic', producer=producer)
+queue = Queue(topic="topic", producer=producer)
 
 # Enqueue a function call.
 job = queue.enqueue(add, 1, 2)
diff --git a/example/worker.py b/example/worker.py
index 1356c6f..a577d4e 100644
--- a/example/worker.py
+++ b/example/worker.py
@@ -2,16 +2,16 @@ import logging
 
 import dill
 from kafka import KafkaConsumer
+
 from kq import Job, Message, Worker
 
 # Set up logging
 formatter = logging.Formatter(
-    fmt='[%(asctime)s][%(levelname)s] %(message)s',
-    datefmt='%Y-%m-%d %H:%M:%S'
+    fmt="[%(asctime)s][%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
 )
 stream_handler = logging.StreamHandler()
 stream_handler.setFormatter(formatter)
-logger = logging.getLogger('kq.worker')
+logger = logging.getLogger("kq.worker")
 logger.setLevel(logging.DEBUG)
 logger.addHandler(stream_handler)
 
@@ -35,27 +35,27 @@ def callback(status, message, job, result, exception, stacktrace):
     :param stacktrace: Exception traceback, or None if there was none.
     :type stacktrace: str | None
     """
-    assert status in ['invalid', 'success', 'timeout', 'failure']
+    assert status in ["invalid", "success", "timeout", "failure"]
     assert isinstance(message, Message)
 
-    if status == 'invalid':
+    if status == "invalid":
         assert job is None
         assert result is None
         assert exception is None
         assert stacktrace is None
 
-    if status == 'success':
+    if status == "success":
         assert isinstance(job, Job)
         assert exception is None
         assert stacktrace is None
 
-    elif status == 'timeout':
+    elif status == "timeout":
         assert isinstance(job, Job)
         assert result is None
         assert exception is None
         assert stacktrace is None
 
-    elif status == 'failure':
+    elif status == "failure":
         assert isinstance(job, Job)
         assert result is None
         assert exception is not None
@@ -70,21 +70,18 @@ def deserializer(serialized):
     :return: Deserialized job object.
     :rtype: kq.Job
     """
-    assert isinstance(serialized, bytes), 'Expecting a bytes'
+    assert isinstance(serialized, bytes), "Expecting a bytes"
     return dill.loads(serialized)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     consumer = KafkaConsumer(
-        bootstrap_servers='127.0.0.1:9092',
-        group_id='group',
+        bootstrap_servers="127.0.0.1:9092",
+        group_id="group",
         enable_auto_commit=True,
-        auto_offset_reset='latest'
+        auto_offset_reset="latest",
     )
     worker = Worker(
-        topic='topic',
-        consumer=consumer,
-        callback=callback,
-        deserializer=deserializer
+        topic="topic", consumer=consumer, callback=callback, deserializer=deserializer
     )
     worker.start()
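`example/worker.py` above installs a custom deserializer on the worker; the queue side accepts the mirror-image hook. A sketch assuming the default dill wire format is kept (any callable that turns a `kq.Job` into bytes would work, as long as workers get the matching deserializer):

```python
import dill

from kafka import KafkaProducer
from kq import Queue


def serializer(job):
    # dill.dumps is kq's default serializer, shown here explicitly
    # for symmetry with the deserializer in example/worker.py.
    return dill.dumps(job)


producer = KafkaProducer(bootstrap_servers="127.0.0.1:9092")
queue = Queue(topic="topic", producer=producer, serializer=serializer)
```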
diff --git a/kq/__init__.py b/kq/__init__.py
index e65ac80..f549422 100644
--- a/kq/__init__.py
+++ b/kq/__init__.py
@@ -1,7 +1,8 @@
-__all__ = ['Job', 'Message', 'Queue', 'Worker', '__version__']
+from pkg_resources import get_distribution
 
 from kq.job import Job
 from kq.message import Message
 from kq.queue import Queue
 from kq.worker import Worker
-from kq.version import __version__
+
+__version__ = get_distribution("kq").version
diff --git a/kq/job.py b/kq/job.py
index 9bd6b2f..e222d9d 100644
--- a/kq/job.py
+++ b/kq/job.py
@@ -1,22 +1,35 @@
-__all__ = ['Job']
-
-from collections import namedtuple
-
-# Namedtuple which encapsulates a KQ job.
-Job = namedtuple(
-    typename='Job',
-    field_names=(
-        'id',         # Job ID (str)
-        'timestamp',  # Unix timestamp indicating when job was enqueued (int)
-        'topic',      # Name of the Kafka topic (str)
-        'func',       # Function to execute (callable)
-        'args',       # Positional arguments (list)
-        'kwargs',     # Keyword arguments (dict)
-        'timeout',    # Job timeout threshold in seconds (int | float)
-        'key',        # Kafka message key if any (str | None)
-        'partition'   # Kafka topic partition if any (str | None)
-    )
-)
-
-# noinspection PyUnresolvedReferences,PyProtectedMember
-Job.__new__.__defaults__ = (None,) * len(Job._fields)
+from dataclasses import dataclass
+from typing import Callable, Dict, Optional, Sequence, Union
+
+
+@dataclass(frozen=True)
+class Job:
+
+    # KQ job UUID.
+    id: Optional[str] = None
+
+    # Unix timestamp indicating when the job was queued.
+    timestamp: Optional[int] = None
+
+    # Name of the Kafka topic.
+    topic: Optional[str] = None
+
+    # Function to execute.
+    func: Optional[Callable] = None
+
+    # Positional arguments for the function.
+    args: Optional[Sequence] = None
+
+    # Keyword arguments for the function.
+    kwargs: Optional[Dict] = None
+
+    # Job timeout threshold in seconds.
+    timeout: Optional[Union[float, int]] = None
+
+    # Kafka message key. Jobs with the same keys are sent
+    # to the same topic partition and executed sequentially.
+    # Applies only when the "partition" field is not set.
+    key: Optional[bytes] = None
+
+    # Kafka topic partition. If set, the "key" field is ignored.
+    partition: Optional[int] = None
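Since `Job` is declared with `frozen=True`, instances stay immutable like the namedtuples they replace; a copy with different field values can be derived with the standard-library `dataclasses.replace`. An illustrative sketch:

```python
import dataclasses

from kq import Job

job = Job(id="c7bf2359", timeout=5)

# job.timeout = 10  # would raise dataclasses.FrozenInstanceError

# Derive a modified copy instead:
longer_job = dataclasses.replace(job, timeout=10)
```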
diff --git a/kq/message.py b/kq/message.py
index 223132d..0198ae8 100644
--- a/kq/message.py
+++ b/kq/message.py
@@ -1,15 +1,20 @@
-__all__ = ['Message']
-
-from collections import namedtuple
-
-# Namedtuple which encapsulates a Kafka message.
-Message = namedtuple(
-    typename='Message',
-    field_names=(
-        'topic',      # Name of the Kafka topic (str)
-        'partition',  # Topic partition (int)
-        'offset',     # Offset (int)
-        'key',        # Message key (bytes | None)
-        'value'       # Message value (bytes)
-    )
-)
+from dataclasses import dataclass
+from typing import Optional
+
+
+@dataclass(frozen=True)
+class Message:
+    # Name of the Kafka topic.
+    topic: str
+
+    # Kafka topic partition.
+    partition: int
+
+    # Partition offset.
+    offset: int
+
+    # Kafka message key.
+    key: Optional[bytes]
+
+    # Kafka message payload.
+    value: bytes
+ partition: int + + # Partition offset. + offset: int + + # Kafka message key. + key: Optional[bytes] + + # Kafka message payload. + value: bytes diff --git a/kq/queue.py b/kq/queue.py index bd16292..e4d006c 100644 --- a/kq/queue.py +++ b/kq/queue.py @@ -1,8 +1,7 @@ -__all__ = ['Queue'] - import logging import time import uuid +from typing import Any, Callable, Optional, Union import dill from kafka import KafkaProducer @@ -10,17 +9,17 @@ from kq.job import Job from kq.utils import ( is_dict, - is_iter, - is_number, - is_str, is_none_or_bytes, is_none_or_func, is_none_or_int, is_none_or_logger, + is_number, + is_seq, + is_str, ) -class Queue(object): +class Queue: """Enqueues function calls in Kafka topics as :doc:`jobs `. :param topic: Name of the Kafka topic. @@ -65,26 +64,27 @@ class Queue(object): http://kafka-python.rtfd.io/en/master/apidoc/KafkaProducer.html """ - def __init__(self, - topic, - producer, - serializer=None, - timeout=0, - logger=None): - - assert is_str(topic), 'topic must be a str' - assert isinstance(producer, KafkaProducer), 'bad producer instance' - assert is_none_or_func(serializer), 'serializer must be a callable' - assert is_number(timeout), 'timeout must be an int or float' - assert timeout >= 0, 'timeout must be 0 or greater' - assert is_none_or_logger(logger), 'bad logger instance' + def __init__( + self, + topic: str, + producer: KafkaProducer, + serializer: Optional[Callable] = None, + timeout: Union[float, int] = 0, + logger: Optional[logging.Logger] = None, + ): + assert is_str(topic), "topic must be a str" + assert isinstance(producer, KafkaProducer), "bad producer instance" + assert is_none_or_func(serializer), "serializer must be a callable" + assert is_number(timeout), "timeout must be an int or float" + assert timeout >= 0, "timeout must be 0 or greater" + assert is_none_or_logger(logger), "bad logger instance" self._topic = topic - self._hosts = producer.config['bootstrap_servers'] + self._hosts = producer.config["bootstrap_servers"] self._producer = producer self._serializer = serializer or dill.dumps self._timeout = timeout - self._logger = logger or logging.getLogger('kq.queue') + self._logger = logger or logging.getLogger("kq.queue") self._default_enqueue_spec = EnqueueSpec( topic=self._topic, producer=self._producer, @@ -92,7 +92,7 @@ def __init__(self, logger=self._logger, timeout=self._timeout, key=None, - partition=None + partition=None, ) def __repr__(self): @@ -101,7 +101,7 @@ def __repr__(self): :return: String representation of the queue. :rtype: str """ - return 'Queue(hosts={}, topic={})'.format(self._hosts, self._topic) + return "Queue(hosts={}, topic={})".format(self._hosts, self._topic) def __del__(self): # pragma: no cover # noinspection PyBroadException @@ -155,7 +155,7 @@ def timeout(self): """ return self._timeout - def enqueue(self, func, *args, **kwargs): + def enqueue(self, func: Callable, *args: Any, **kwargs: Any): """Enqueue a function call or a :doc:`job `. :param func: Function or a :doc:`job ` object. Must be @@ -204,7 +204,12 @@ def enqueue(self, func, *args, **kwargs): """ return self._default_enqueue_spec.enqueue(func, *args, **kwargs) - def using(self, timeout=None, key=None, partition=None): + def using( + self, + timeout: Optional[Union[float, int]] = None, + key: Optional[bytes] = None, + partition: Optional[int] = None, + ): """Set enqueue specifications such as timeout, key and partition. :param timeout: Job timeout threshold in seconds.
If not set, default @@ -259,34 +264,35 @@ def using(self, timeout=None, key=None, partition=None): logger=self._logger, timeout=timeout or self._timeout, key=key, - partition=partition + partition=partition, ) class EnqueueSpec(object): __slots__ = [ - '_topic', - '_producer', - '_serializer', - '_logger', - '_timeout', - '_key', - '_partition', - 'delay' + "_topic", + "_producer", + "_serializer", + "_logger", + "_timeout", + "_key", + "_partition", + "delay", ] - def __init__(self, - topic, - producer, - serializer, - logger, - timeout, - key, - partition): - assert is_number(timeout), 'timeout must be an int or float' - assert is_none_or_bytes(key), 'key must be a bytes' - assert is_none_or_int(partition), 'partition must be an int' + def __init__( + self, + topic: str, + producer: KafkaProducer, + serializer: Callable, + logger: logging.Logger, + timeout: Union[float, int], + key: Optional[bytes], + partition: Optional[int], + ): + assert is_number(timeout), "timeout must be an int or float" + assert is_none_or_bytes(key), "key must be a bytes" + assert is_none_or_int(partition), "partition must be an int" self._topic = topic self._producer = producer @@ -296,12 +302,12 @@ def __init__(self, self._key = key self._partition = partition - def enqueue(self, obj, *args, **kwargs): + def enqueue(self, obj: Union[Callable, Job], *args: Any, **kwargs: Any): """Enqueue a function call or :doc:`job` instance. - :param func: Function or :doc:`job `. Must be serializable and + :param obj: Function or :doc:`job `. Must be serializable and importable by :doc:`worker ` processes. - :type func: callable | :doc:`kq.Job ` + :type obj: callable | :doc:`kq.Job ` :param args: Positional arguments for the function. Ignored if **func** is a :doc:`job ` object. :param kwargs: Keyword arguments for the function.
Ignored if **func** @@ -312,23 +318,32 @@ def enqueue(self, obj, *args, **kwargs): timestamp = int(time.time() * 1000) if isinstance(obj, Job): - job_id = uuid.uuid4().hex if obj.id is None else obj.id + if obj.id is None: + job_id = uuid.uuid4().hex + else: + assert is_str(obj.id), "Job.id must be a str" + job_id = obj.id + + if obj.args is None: + args = tuple() + else: + assert is_seq(obj.args), "Job.args must be a list or tuple" + args = tuple(obj.args) + + assert callable(obj.func), "Job.func must be a callable" + func = obj.func - args = tuple() if obj.args is None else obj.args kwargs = {} if obj.kwargs is None else obj.kwargs timeout = self._timeout if obj.timeout is None else obj.timeout key = self._key if obj.key is None else obj.key part = self._partition if obj.partition is None else obj.partition - assert is_str(job_id), 'Job.id must be a str' - assert callable(func), 'Job.func must be a callable' - assert is_iter(args), 'Job.args must be a list or tuple' - assert is_dict(kwargs), 'Job.kwargs must be a dict' - assert is_number(timeout), 'Job.timeout must be an int or float' - assert is_none_or_bytes(key), 'Job.key must be a bytes' - assert is_none_or_int(part), 'Job.partition must be an int' + assert is_dict(kwargs), "Job.kwargs must be a dict" + assert is_number(timeout), "Job.timeout must be an int or float" + assert is_none_or_bytes(key), "Job.key must be a bytes" + assert is_none_or_int(part), "Job.partition must be an int" else: - assert callable(obj), 'first argument must be a callable' + assert callable(obj), "first argument must be a callable" job_id = uuid.uuid4().hex func = obj args = args @@ -346,15 +361,15 @@ def enqueue(self, obj, *args, **kwargs): kwargs=kwargs, timeout=timeout, key=key, - partition=part + partition=part, ) - self._logger.info('Enqueueing {} ...'.format(job)) + self._logger.info("Enqueueing {} ...".format(job)) self._producer.send( self._topic, value=self._serializer(job), key=self._serializer(key) if key else None, partition=part, - timestamp_ms=timestamp + timestamp_ms=timestamp, ) self._producer.flush() return job diff --git a/kq/utils.py b/kq/utils.py index 98ce8fc..0473e82 100644 --- a/kq/utils.py +++ b/kq/utils.py @@ -1,8 +1,9 @@ import logging -from inspect import ismethod, isfunction, isbuiltin, isclass +from inspect import isbuiltin, isclass, isfunction, ismethod +from typing import Any -def get_call_repr(func, *args, **kwargs): +def get_call_repr(func: Any, *args: Any, **kwargs: Any): """Return the string representation of the function call. :param func: A callable (e.g. function, method). 
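For reference, a minimal sketch of how the renamed `get_call_repr` helper formats a call once this patch is applied. The expected strings follow from the implementation in the next hunk and mirror the assertions in tests/test_utils.py further down; the `add` function is only a placeholder:

```python
from kq.utils import get_call_repr


def add(a, b):
    return a + b


# Callables render as "<module>.<qualname>(args, kwargs)"; when this
# snippet is run as a script, add's module is __main__.
assert get_call_repr(add, 1, b=2) == "__main__.add(1, b=2)"

# Keyword arguments are sorted by name, so the repr is deterministic.
assert get_call_repr(add, b=2, a=1) == "__main__.add(a=1, b=2)"

# Builtins resolve to their module; None falls back to repr().
assert get_call_repr(isinstance) == "builtins.isinstance()"
assert get_call_repr(None) == "None()"
```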
@@ -14,16 +15,16 @@ def get_call_repr(func, *args, **kwargs): """ # Functions, builtins and methods if ismethod(func) or isfunction(func) or isbuiltin(func): - func_repr = '{}.{}'.format(func.__module__, func.__qualname__) + func_repr = "{}.{}".format(func.__module__, func.__qualname__) # A callable class instance - elif not isclass(func) and hasattr(func, '__call__'): - func_repr = '{}.{}'.format(func.__module__, func.__class__.__name__) + elif not isclass(func) and hasattr(func, "__call__"): + func_repr = "{}.{}".format(func.__module__, func.__class__.__name__) else: func_repr = repr(func) args_reprs = [repr(arg) for arg in args] - kwargs_reprs = [k + '=' + repr(v) for k, v in sorted(kwargs.items())] - return '{}({})'.format(func_repr, ', '.join(args_reprs + kwargs_reprs)) + kwargs_reprs = [k + "=" + repr(v) for k, v in sorted(kwargs.items())] + return "{}({})".format(func_repr, ", ".join(args_reprs + kwargs_reprs)) def is_none_or_logger(obj): @@ -54,5 +55,5 @@ def is_dict(obj): return isinstance(obj, dict) -def is_iter(obj): +def is_seq(obj): return isinstance(obj, (list, tuple)) diff --git a/kq/version.py b/kq/version.py deleted file mode 100644 index 3f39079..0000000 --- a/kq/version.py +++ /dev/null @@ -1 +0,0 @@ -__version__ = '2.0.1' diff --git a/kq/worker.py b/kq/worker.py index e39155d..354e8c8 100644 --- a/kq/worker.py +++ b/kq/worker.py @@ -1,24 +1,19 @@ -__all__ = ['Worker'] - import _thread import logging import math import threading import traceback +from typing import Any, Callable, Optional import dill from kafka import KafkaConsumer +from kq.job import Job from kq.message import Message -from kq.utils import ( - get_call_repr, - is_str, - is_none_or_func, - is_none_or_logger -) +from kq.utils import get_call_repr, is_none_or_func, is_none_or_logger, is_str -class Worker(object): +class Worker: """Fetches :doc:`jobs ` from Kafka topics and processes them. :param topic: Name of the Kafka topic.
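The worker constructor below asserts, among other things, that the consumer was created with a `group_id`. For reference, a minimal sketch of a consumer/worker pair that passes those checks, condensed from example/worker.py in this patch (host, topic, and group names are placeholders):

```python
from kafka import KafkaConsumer
from kq import Worker

consumer = KafkaConsumer(
    bootstrap_servers="127.0.0.1:9092",
    group_id="group",  # required: Worker asserts a non-empty group_id
    auto_offset_reset="latest",
)

# Callback and deserializer are optional; dill.loads is the default.
worker = Worker(topic="topic", consumer=consumer)
worker.start(max_messages=1)  # process one message, then return
```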
@@ -64,27 +59,28 @@ class Worker(object): http://kafka-python.rtfd.io/en/master/apidoc/KafkaConsumer.html """ - def __init__(self, - topic, - consumer, - callback=None, - deserializer=None, - logger=None): - - assert is_str(topic), 'topic must be a str' - assert isinstance(consumer, KafkaConsumer), 'bad consumer instance' - assert consumer.config['group_id'], 'consumer must have group_id' - assert is_none_or_func(callback), 'callback must be a callable' - assert is_none_or_func(deserializer), 'deserializer must be a callable' - assert is_none_or_logger(logger), 'bad logger instance' + def __init__( + self, + topic: str, + consumer: KafkaConsumer, + callback: Optional[Callable] = None, + deserializer: Optional[Callable] = None, + logger: Optional[logging.Logger] = None, + ): + assert is_str(topic), "topic must be a str" + assert isinstance(consumer, KafkaConsumer), "bad consumer instance" + assert consumer.config["group_id"], "consumer must have group_id" + assert is_none_or_func(callback), "callback must be a callable" + assert is_none_or_func(deserializer), "deserializer must be a callable" + assert is_none_or_logger(logger), "bad logger instance" self._topic = topic - self._hosts = consumer.config['bootstrap_servers'] - self._group = consumer.config['group_id'] + self._hosts = consumer.config["bootstrap_servers"] + self._group = consumer.config["group_id"] self._consumer = consumer self._callback = callback self._deserializer = deserializer or dill.loads - self._logger = logger or logging.getLogger('kq.worker') + self._logger = logger or logging.getLogger("kq.worker") def __repr__(self): """Return the string representation of the worker. @@ -92,7 +88,7 @@ def __repr__(self): :return: String representation of the worker. :rtype: str """ - return 'Worker(hosts={}, topic={}, group={})'.format( + return "Worker(hosts={}, topic={}, group={})".format( self._hosts, self._topic, self._group ) @@ -103,7 +99,15 @@ def __del__(self): # pragma: no cover except Exception: pass - def _execute_callback(self, status, message, job, res, err, stacktrace): + def _execute_callback( + self, + status: str, + message: Message, + job: Optional[Job], + res: Any, + err: Optional[Exception], + stacktrace: Optional[str], + ): """Execute the callback. :param status: Job status. Possible values are "invalid" (job could not @@ -124,11 +128,10 @@ def _execute_callback(self, status, message, job, res, err, stacktrace): """ if self._callback is not None: try: - self._logger.info('Executing callback ...') + self._logger.info("Executing callback ...") self._callback(status, message, job, res, err, stacktrace) except Exception as e: - self._logger.exception( - 'Callback raised an exception: {}'.format(e)) + self._logger.exception("Callback raised an exception: {}".format(e)) def _process_message(self, msg): """De-serialize the message and execute the job. @@ -137,17 +140,19 @@ def _process_message(self, msg): :type msg: :doc:`kq.Message ` """ self._logger.info( - 'Processing Message(topic={}, partition={}, offset={}) ...' 
- .format(msg.topic, msg.partition, msg.offset)) + "Processing Message(topic={}, partition={}, offset={}) ...".format( + msg.topic, msg.partition, msg.offset + ) + ) try: job = self._deserializer(msg.value) job_repr = get_call_repr(job.func, *job.args, **job.kwargs) except Exception as err: - self._logger.exception('Job was invalid: {}'.format(err)) - self._execute_callback('invalid', msg, None, None, None, None) + self._logger.exception("Job was invalid: {}".format(err)) + self._execute_callback("invalid", msg, None, None, None, None) else: - self._logger.info('Executing job {}: {}'.format(job.id, job_repr)) + self._logger.info("Executing job {}: {}".format(job.id, job_repr)) if job.timeout: timer = threading.Timer(job.timeout, _thread.interrupt_main) @@ -157,17 +162,15 @@ def _process_message(self, msg): try: res = job.func(*job.args, **job.kwargs) except KeyboardInterrupt: - self._logger.error( - 'Job {} timed out or was interrupted'.format(job.id)) - self._execute_callback('timeout', msg, job, None, None, None) + self._logger.error("Job {} timed out or was interrupted".format(job.id)) + self._execute_callback("timeout", msg, job, None, None, None) except Exception as err: - self._logger.exception( - 'Job {} raised an exception:'.format(job.id)) + self._logger.exception("Job {} raised an exception:".format(job.id)) tb = traceback.format_exc() - self._execute_callback('failure', msg, job, None, err, tb) + self._execute_callback("failure", msg, job, None, err, tb) else: - self._logger.info('Job {} returned: {}'.format(job.id, res)) - self._execute_callback('success', msg, job, res, None, None) + self._logger.info("Job {} returned: {}".format(job.id, res)) + self._execute_callback("success", msg, job, res, None, None) finally: if timer is not None: timer.cancel() @@ -238,7 +241,7 @@ def start(self, max_messages=math.inf, commit_offsets=True): :return: Total number of messages processed. 
:rtype: int """ - self._logger.info('Starting {} ...'.format(self)) + self._logger.info("Starting {} ...".format(self)) self._consumer.unsubscribe() self._consumer.subscribe([self.topic]) @@ -252,7 +255,7 @@ def start(self, max_messages=math.inf, commit_offsets=True): partition=record.partition, offset=record.offset, key=record.key, - value=record.value + value=record.value, ) self._process_message(message) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..5dabbe6 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,21 @@ +[build-system] +requires = [ + "setuptools>=42", + "wheel", + "setuptools_scm[toml]>=3.4" +] +build-backend = "setuptools.build_meta" + +[tool.coverage.run] +omit = ["kq/version.py"] + +[tool.isort] +profile = "black" + +[tool.pytest.ini_options] +addopts = "-s -vv -p no:warnings" +minversion = "6.0" +testpaths = ["tests"] + +[tool.setuptools_scm] +write_to = "kq/version.py" diff --git a/pytest.ini b/pytest.ini deleted file mode 100644 index c7b487d..0000000 --- a/pytest.ini +++ /dev/null @@ -1,4 +0,0 @@ -[pytest] -python_files = tests.py test_*.py *_tests.py -addopts = -s -vv -p no:warnings -norecursedirs = venv htmlcov build dist .idea .git kq.egg-info diff --git a/setup.cfg b/setup.cfg index 2a9acf1..32a3ba6 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,2 +1,8 @@ -[bdist_wheel] -universal = 1 +[flake8] +max-line-length = 88 +extend-ignore = E203, E741, W503 +exclude = .git .idea .*_cache dist htmlcov venv +per-file-ignores = __init__.py:F401 conf.py:E402 + +[mypy] +ignore_missing_imports = True diff --git a/setup.py b/setup.py index 4afe4d4..9a93945 100644 --- a/setup.py +++ b/setup.py @@ -1,44 +1,58 @@ -from setuptools import setup, find_packages +from setuptools import find_packages, setup -version = {} -with open('./kq/version.py') as fp: - exec(fp.read(), version) - -with open('./README.rst') as fp: +with open("./README.md") as fp: description = fp.read() setup( - name='kq', - description='Kafka Job Queue for Python', - version=version['__version__'], + name="kq", + description="Kafka Job Queue for Python", long_description=description, - author='Joohwan Oh', - author_email='joohwan.oh@outlook.com', - url='https://github.com/joowani/kq', - packages=find_packages(exclude=['tests']), + long_description_content_type="text/markdown", + author="Joohwan Oh", + author_email="joohwan.oh@outlook.com", + url="https://github.com/joowani/kq", + packages=find_packages(exclude=["tests"]), include_package_data=True, - license='MIT', + python_requires=">=3.6", + license="MIT", + use_scm_version=True, + setup_requires=["setuptools_scm"], install_requires=[ - 'dill>=0.3.2', - 'kafka-python>=2.0.0', + "dataclasses; python_version < '3.7'", + "dill>=0.3.2", + "kafka-python>=2.0.0", + "setuptools>=42", + "setuptools_scm[toml]>=3.4", ], - tests_require=['pytest', 'mock', 'flake8'], + extras_require={ + "dev": [ + "black", + "flake8", + "isort>=5.0.0", + "mypy", + "pytest>=6.0.0", + "pytest-cov>=2.0.0", + "recommonmark", + "sphinx", + "sphinx_rtd_theme", + ], + }, classifiers=[ - 'Intended Audience :: Developers', - 'Intended Audience :: End Users/Desktop', - 'Intended Audience :: Information Technology', - 'Intended Audience :: Science/Research', - 'Intended Audience :: System Administrators', - 'License :: OSI Approved :: MIT License', - 'Operating System :: POSIX', - 'Operating System :: MacOS', - 'Operating System :: Unix', - 'Programming Language :: Python', - 'Programming Language :: Python :: 3', - 'Topic :: Internet', - 'Topic :: Scientific/Engineering', - 'Topic :: System ::
Distributed Computing', - 'Topic :: System :: Systems Administration', - 'Topic :: System :: Monitoring', - ] + "Intended Audience :: Developers", + "Intended Audience :: End Users/Desktop", + "Intended Audience :: Information Technology", + "Intended Audience :: Science/Research", + "Intended Audience :: System Administrators", + "License :: OSI Approved :: MIT License", + "Operating System :: POSIX", + "Operating System :: MacOS", + "Operating System :: Unix", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Topic :: Internet", + "Topic :: Scientific/Engineering", + "Topic :: System :: Distributed Computing", + "Topic :: System :: Systems Administration", + "Topic :: System :: Monitoring", + ], ) diff --git a/tests/conftest.py b/tests/conftest.py index 2e512f7..4a7d354 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,7 +1,7 @@ import logging import os -import uuid import time +import uuid import dill import pytest @@ -10,18 +10,18 @@ from kq import Job, Message, Queue, Worker test_dir = os.getcwd() -if not test_dir.endswith('tests'): - test_dir = os.path.join(test_dir, 'tests') -log_file = os.path.join(test_dir, 'output.log') +if not test_dir.endswith("tests"): + test_dir = os.path.join(test_dir, "tests") +log_file = os.path.join(test_dir, "output.log") -handler = logging.FileHandler(log_file, mode='w') -handler.setFormatter(logging.Formatter('[%(levelname)s] %(message)s')) +handler = logging.FileHandler(log_file, mode="w") +handler.setFormatter(logging.Formatter("[%(levelname)s] %(message)s")) -queue_logger = logging.getLogger('kq.queue') +queue_logger = logging.getLogger("kq.queue") queue_logger.setLevel(logging.DEBUG) queue_logger.addHandler(handler) -worker_logger = logging.getLogger('kq.worker') +worker_logger = logging.getLogger("kq.worker") worker_logger.setLevel(logging.DEBUG) worker_logger.addHandler(handler) @@ -62,34 +62,34 @@ class Callback(object): """Callback which can be set to succeed or fail.""" def __init__(self): - self.logger = logging.getLogger('kq.worker') + self.logger = logging.getLogger("kq.worker") self.succeed = True def __call__(self, status, message, job, result, exception, stacktrace): if not self.succeed: raise RuntimeError - assert status in ['invalid', 'success', 'timeout', 'failure'] + assert status in ["invalid", "success", "timeout", "failure"] assert isinstance(message, Message) - if status == 'invalid': + if status == "invalid": assert job is None assert result is None assert exception is None assert stacktrace is None - if status == 'success': + if status == "success": assert isinstance(job, Job) assert exception is None assert stacktrace is None - elif status == 'timeout': + elif status == "timeout": assert isinstance(job, Job) assert result is None assert exception is None assert stacktrace is None - elif status == 'failure': + elif status == "failure": assert isinstance(job, Job) assert result is None assert exception is not None @@ -116,9 +116,9 @@ class LogAccessor(object): @property def lines(self): time.sleep(0.5) - with open(log_file, 'r') as fp: + with open(log_file, "r") as fp: lines = fp.read().splitlines() - return [line for line in lines if line.startswith('[')] + return [line for line in lines if line.startswith("[")] @property def last_line(self): @@ -129,97 +129,97 @@ def last_lines(self, line_count=1): def pytest_addoption(parser): - parser.addoption('--host', action='store', default='127.0.0.1') - parser.addoption('--port', action='store', default='9092') + parser.addoption("--host", 
action="store", default="127.0.0.1") + parser.addoption("--port", action="store", default="9092") -@pytest.fixture(scope='session', autouse=False) +@pytest.fixture(scope="session", autouse=False) def func(): return success_function -@pytest.fixture(scope='session', autouse=False) +@pytest.fixture(scope="session", autouse=False) def success_func(): return success_function -@pytest.fixture(scope='session', autouse=False) +@pytest.fixture(scope="session", autouse=False) def failure_func(): return failure_function -@pytest.fixture(scope='session', autouse=False) +@pytest.fixture(scope="session", autouse=False) def timeout_func(): return timeout_function -@pytest.fixture(scope='session', autouse=False) +@pytest.fixture(scope="session", autouse=False) def callable_cls(): return Callable -@pytest.fixture(scope='session', autouse=False) +@pytest.fixture(scope="session", autouse=False) def log(): return LogAccessor() -@pytest.fixture(scope='session', autouse=False) +@pytest.fixture(scope="session", autouse=False) def callback(): return Callback() -@pytest.fixture(scope='session', autouse=False) +@pytest.fixture(scope="session", autouse=False) def deserializer(): return Deserializer() -@pytest.fixture(scope='session', autouse=False) +@pytest.fixture(scope="session", autouse=False) def hosts(pytestconfig): - host = pytestconfig.getoption('host') - port = pytestconfig.getoption('port') - return host + ':' + port + host = pytestconfig.getoption("host") + port = pytestconfig.getoption("port") + return host + ":" + port -@pytest.fixture(scope='module', autouse=False) +@pytest.fixture(scope="module", autouse=False) def topic(): return uuid.uuid4().hex -@pytest.fixture(scope='module', autouse=False) +@pytest.fixture(scope="module", autouse=False) def group(): return uuid.uuid4().hex # noinspection PyShadowingNames -@pytest.fixture(scope='module', autouse=False) +@pytest.fixture(scope="module", autouse=False) def producer(hosts): return KafkaProducer(bootstrap_servers=hosts) # noinspection PyShadowingNames -@pytest.fixture(scope='module', autouse=False) +@pytest.fixture(scope="module", autouse=False) def consumer(hosts, group): return KafkaConsumer( bootstrap_servers=hosts, group_id=group, - auto_offset_reset='earliest', + auto_offset_reset="earliest", ) # noinspection PyShadowingNames -@pytest.fixture(scope='module', autouse=False) +@pytest.fixture(scope="module", autouse=False) def queue(topic, producer): return Queue(topic, producer) # noinspection PyShadowingNames -@pytest.fixture(scope='module', autouse=False) +@pytest.fixture(scope="module", autouse=False) def worker(topic, consumer, callback, deserializer): return Worker(topic, consumer, callback, deserializer) # noinspection PyShadowingNames -@pytest.fixture(scope='function', autouse=True) +@pytest.fixture(scope="function", autouse=True) def before(callback, deserializer): callback.succeed = True deserializer.succeed = True diff --git a/tests/test_job.py b/tests/test_job.py deleted file mode 100644 index 13c10d4..0000000 --- a/tests/test_job.py +++ /dev/null @@ -1,37 +0,0 @@ -from kq import Job - - -def test_job_init_with_args(): - job = Job(1, 2, 3, 4, 5, 6, 7, 8, 9) - assert job.id == 1 - assert job.timestamp == 2 - assert job.topic == 3 - assert job.func == 4 - assert job.args == 5 - assert job.kwargs == 6 - assert job.timeout == 7 - assert job.key == 8 - assert job.partition == 9 - - -def test_job_init_with_kwargs(): - job = Job( - id=1, - timestamp=2, - topic=3, - func=4, - args=5, - kwargs=6, - timeout=7, - key=8, - partition=9 - ) - assert 
job.id == 1 - assert job.timestamp == 2 - assert job.topic == 3 - assert job.func == 4 - assert job.args == 5 - assert job.kwargs == 6 - assert job.timeout == 7 - assert job.key == 8 - assert job.partition == 9 diff --git a/tests/test_message.py b/tests/test_message.py deleted file mode 100644 index 651eadc..0000000 --- a/tests/test_message.py +++ /dev/null @@ -1,25 +0,0 @@ -from kq import Message - - -def test_message_init_with_args(): - job = Message(1, 2, 3, 4, 5) - assert job.topic == 1 - assert job.partition == 2 - assert job.offset == 3 - assert job.key == 4 - assert job.value == 5 - - -def test_message_init_with_kwargs(): - job = Message( - topic=1, - partition=2, - offset=3, - key=4, - value=5, - ) - assert job.topic == 1 - assert job.partition == 2 - assert job.offset == 3 - assert job.key == 4 - assert job.value == 5 diff --git a/tests/test_queue.py b/tests/test_queue.py index 62a6ea0..1bcdda8 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -10,7 +10,7 @@ def test_queue_properties(queue, hosts, topic): assert hosts in repr(queue) assert topic in repr(queue) - assert queue.producer.config['bootstrap_servers'] == hosts + assert queue.producer.config["bootstrap_servers"] == hosts assert isinstance(queue.hosts, str) and queue.hosts == hosts assert isinstance(queue.topic, str) and queue.topic == topic assert isinstance(queue.producer, KafkaProducer) @@ -22,27 +22,27 @@ def test_queue_properties(queue, hosts, topic): def test_queue_initialization_with_bad_args(producer): with pytest.raises(AssertionError) as e: Queue(topic=True, producer=producer) - assert str(e.value) == 'topic must be a str' + assert str(e.value) == "topic must be a str" with pytest.raises(AssertionError) as e: - Queue(topic='topic', producer='bar') - assert str(e.value) == 'bad producer instance' + Queue(topic="topic", producer="bar") + assert str(e.value) == "bad producer instance" with pytest.raises(AssertionError) as e: - Queue(topic='topic', producer=producer, serializer='baz') - assert str(e.value) == 'serializer must be a callable' + Queue(topic="topic", producer=producer, serializer="baz") + assert str(e.value) == "serializer must be a callable" with pytest.raises(AssertionError) as e: - Queue(topic='topic', producer=producer, timeout='bar') - assert str(e.value) == 'timeout must be an int or float' + Queue(topic="topic", producer=producer, timeout="bar") + assert str(e.value) == "timeout must be an int or float" with pytest.raises(AssertionError) as e: - Queue(topic='topic', producer=producer, timeout=-1) - assert str(e.value) == 'timeout must be 0 or greater' + Queue(topic="topic", producer=producer, timeout=-1) + assert str(e.value) == "timeout must be 0 or greater" with pytest.raises(AssertionError) as e: - Queue(topic='topic', producer=producer, logger=1) - assert str(e.value) == 'bad logger instance' + Queue(topic="topic", producer=producer, logger=1) + assert str(e.value) == "bad logger instance" def test_queue_enqueue_function(queue, func, topic, log): @@ -57,11 +57,11 @@ def test_queue_enqueue_function(queue, func, topic, log): assert job.timeout == 0 assert job.key is None assert job.partition is None - assert log.last_line == '[INFO] Enqueueing {} ...'.format(job) + assert log.last_line == "[INFO] Enqueueing {} ...".format(job) def test_queue_enqueue_function_with_spec(func, queue, topic, log): - job = queue.using(key=b'foo', partition=0).enqueue(func, 3, 4) + job = queue.using(key=b"foo", partition=0).enqueue(func, 3, 4) assert isinstance(job, Job) assert job.id is not None assert 
job.timestamp is not None @@ -70,28 +70,28 @@ def test_queue_enqueue_function_with_spec(func, queue, topic, log): assert job.args == (3, 4) assert job.kwargs == {} assert job.timeout == 0 - assert job.key == b'foo' + assert job.key == b"foo" assert job.partition == 0 - assert log.last_line == '[INFO] Enqueueing {} ...'.format(job) + assert log.last_line == "[INFO] Enqueueing {} ...".format(job) # noinspection PyTypeChecker def test_queue_enqueue_function_with_bad_args(func, queue): with pytest.raises(AssertionError) as e: queue.enqueue(1) - assert str(e.value) == 'first argument must be a callable' + assert str(e.value) == "first argument must be a callable" with pytest.raises(AssertionError) as e: - queue.using(timeout='foo').enqueue(func) - assert str(e.value) == 'timeout must be an int or float' + queue.using(timeout="foo").enqueue(func) + assert str(e.value) == "timeout must be an int or float" with pytest.raises(AssertionError) as e: - queue.using(key='foo').enqueue(func) - assert str(e.value) == 'key must be a bytes' + queue.using(key="foo").enqueue(func) + assert str(e.value) == "key must be a bytes" with pytest.raises(AssertionError) as e: - queue.using(partition='foo').enqueue(func) - assert str(e.value) == 'partition must be an int' + queue.using(partition="foo").enqueue(func) + assert str(e.value) == "partition must be an int" def test_queue_enqueue_job_fully_populated(func, queue, topic, log): @@ -101,13 +101,13 @@ def test_queue_enqueue_job_fully_populated(func, queue, topic, log): job = Job( id=job_id, timestamp=timestamp, - topic='topic', + topic="topic", func=func, args=[0], - kwargs={'b': 1}, + kwargs={"b": 1}, timeout=10, - key=b'bar', - partition=0 + key=b"bar", + partition=0, ) job = queue.enqueue(job) assert isinstance(job, Job) @@ -115,16 +115,16 @@ def test_queue_enqueue_job_fully_populated(func, queue, topic, log): assert job.timestamp == timestamp assert job.topic == topic assert job.func == func - assert job.args == [0] - assert job.kwargs == {'b': 1} + assert job.args == (0,) + assert job.kwargs == {"b": 1} assert job.timeout == 10 - assert job.key == b'bar' + assert job.key == b"bar" assert job.partition == 0 - assert log.last_line.startswith('[INFO] Enqueueing {} ...'.format(job)) + assert log.last_line.startswith("[INFO] Enqueueing {} ...".format(job)) def test_queue_enqueue_job_partially_populated(func, queue, topic, log): - job = Job(func=func, args=[1], kwargs={'b': 1}) + job = Job(func=func, args=[1], kwargs={"b": 1}) job = queue.enqueue(job) assert isinstance(job, Job) @@ -132,12 +132,12 @@ def test_queue_enqueue_job_partially_populated(func, queue, topic, log): assert isinstance(job.timestamp, int) assert job.topic == topic assert job.func == func - assert job.args == [1] - assert job.kwargs == {'b': 1} + assert job.args == (1,) + assert job.kwargs == {"b": 1} assert job.timeout == 0 assert job.key is None assert job.partition is None - assert log.last_line.startswith('[INFO] Enqueueing {} ...'.format(job)) + assert log.last_line.startswith("[INFO] Enqueueing {} ...".format(job)) def test_queue_enqueue_job_with_spec(func, queue, topic, log): @@ -147,41 +147,41 @@ def test_queue_enqueue_job_with_spec(func, queue, topic, log): job = Job( id=job_id, timestamp=timestamp, - topic='topic', + topic="topic", func=func, args=[0], - kwargs={'b': 1}, + kwargs={"b": 1}, timeout=10, - key=b'bar', - partition=0 + key=b"bar", + partition=0, ) # Job should override the spec. 
- job = queue.using(key=b'foo', timeout=5, partition=5).enqueue(job) + job = queue.using(key=b"foo", timeout=5, partition=5).enqueue(job) assert isinstance(job, Job) assert job.id == job_id - assert job.timestamp == timestamp + assert job.timestamp >= timestamp assert job.topic == topic assert job.func == func - assert job.args == [0] - assert job.kwargs == {'b': 1} + assert job.args == (0,) + assert job.kwargs == {"b": 1} assert job.timeout == 10 - assert job.key == b'bar' + assert job.key == b"bar" assert job.partition == 0 - assert log.last_line.startswith('[INFO] Enqueueing {} ...'.format(job)) + assert log.last_line.startswith("[INFO] Enqueueing {} ...".format(job)) def test_queue_enqueue_job_with_bad_args(func, queue, topic): valid_job_kwargs = { - 'id': uuid.uuid4().hex, - 'timestamp': int(time.time() * 1000), - 'topic': topic, - 'func': func, - 'args': [0], - 'kwargs': {'b': 1}, - 'timeout': 10, - 'key': b'foo', - 'partition': 0 + "id": uuid.uuid4().hex, + "timestamp": int(time.time() * 1000), + "topic": topic, + "func": func, + "args": [0], + "kwargs": {"b": 1}, + "timeout": 10, + "key": b"foo", + "partition": 0, } def build_job(**kwargs): @@ -191,28 +191,28 @@ def build_job(**kwargs): with pytest.raises(AssertionError) as e: queue.enqueue(build_job(id=1)) - assert str(e.value) == 'Job.id must be a str' + assert str(e.value) == "Job.id must be a str" with pytest.raises(AssertionError) as e: queue.enqueue(build_job(func=1)) - assert str(e.value) == 'Job.func must be a callable' + assert str(e.value) == "Job.func must be a callable" with pytest.raises(AssertionError) as e: queue.enqueue(build_job(args=1)) - assert str(e.value) == 'Job.args must be a list or tuple' + assert str(e.value) == "Job.args must be a list or tuple" with pytest.raises(AssertionError) as e: queue.enqueue(build_job(kwargs=1)) - assert str(e.value) == 'Job.kwargs must be a dict' + assert str(e.value) == "Job.kwargs must be a dict" with pytest.raises(AssertionError) as e: - queue.enqueue(build_job(timeout='foo')) - assert str(e.value) == 'Job.timeout must be an int or float' + queue.enqueue(build_job(timeout="foo")) + assert str(e.value) == "Job.timeout must be an int or float" with pytest.raises(AssertionError) as e: - queue.enqueue(build_job(key='foo')) - assert str(e.value) == 'Job.key must be a bytes' + queue.enqueue(build_job(key="foo")) + assert str(e.value) == "Job.key must be a bytes" with pytest.raises(AssertionError) as e: - queue.enqueue(build_job(partition='foo')) - assert str(e.value) == 'Job.partition must be an int' + queue.enqueue(build_job(partition="foo")) + assert str(e.value) == "Job.partition must be an int" diff --git a/tests/test_utils.py b/tests/test_utils.py index a7a7c40..5dbb586 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -2,69 +2,69 @@ def test_call_repr_callable_types(success_func, callable_cls): - expected = 'None()' + expected = "None()" assert expected == get_call_repr(None) - expected = 'builtins.isinstance()' + expected = "builtins.isinstance()" assert expected == get_call_repr(isinstance) - expected = 'tests.conftest.success_function()' + expected = "tests.conftest.success_function()" assert expected == get_call_repr(success_func) - expected = 'tests.conftest.success_function()' + expected = "tests.conftest.success_function()" assert expected == get_call_repr(callable_cls.unbound_method) - expected = 'tests.conftest.Callable.static_method()' + expected = "tests.conftest.Callable.static_method()" assert expected == get_call_repr(callable_cls.static_method) - 
expected = 'tests.conftest.Callable.instance_method()' + expected = "tests.conftest.Callable.instance_method()" assert expected == get_call_repr(callable_cls().instance_method) - expected = 'tests.conftest.Callable()' + expected = "tests.conftest.Callable()" assert expected == get_call_repr(callable_cls()) def test_call_repr_simple_args(failure_func): - expected = 'tests.conftest.failure_function(1)' + expected = "tests.conftest.failure_function(1)" assert expected == get_call_repr(failure_func, 1) - expected = 'tests.conftest.failure_function(1, 2)' + expected = "tests.conftest.failure_function(1, 2)" assert expected == get_call_repr(failure_func, 1, 2) - expected = 'tests.conftest.failure_function(1, b=2)' + expected = "tests.conftest.failure_function(1, b=2)" assert expected == get_call_repr(failure_func, 1, b=2) - expected = 'tests.conftest.failure_function(a=1)' + expected = "tests.conftest.failure_function(a=1)" assert expected == get_call_repr(failure_func, a=1) - expected = 'tests.conftest.failure_function(b=1)' + expected = "tests.conftest.failure_function(b=1)" assert expected == get_call_repr(failure_func, b=1) - expected = 'tests.conftest.failure_function(a=1, b=2)' + expected = "tests.conftest.failure_function(a=1, b=2)" assert expected == get_call_repr(failure_func, a=1, b=2) - expected = 'tests.conftest.failure_function(a=1, b=2)' + expected = "tests.conftest.failure_function(a=1, b=2)" assert expected == get_call_repr(failure_func, b=2, a=1) def test_call_repr_complex_args(timeout_func): - expected = 'tests.conftest.timeout_function([1])' + expected = "tests.conftest.timeout_function([1])" assert expected == get_call_repr(timeout_func, [1]) - expected = 'tests.conftest.timeout_function([1], [2])' + expected = "tests.conftest.timeout_function([1], [2])" assert expected == get_call_repr(timeout_func, [1], [2]) - expected = 'tests.conftest.timeout_function([1], b=[2])' + expected = "tests.conftest.timeout_function([1], b=[2])" assert expected == get_call_repr(timeout_func, [1], b=[2]) - expected = 'tests.conftest.timeout_function(a=[1])' + expected = "tests.conftest.timeout_function(a=[1])" assert expected == get_call_repr(timeout_func, a=[1]) - expected = 'tests.conftest.timeout_function(b=[1])' + expected = "tests.conftest.timeout_function(b=[1])" assert expected == get_call_repr(timeout_func, b=[1]) - expected = 'tests.conftest.timeout_function(a=[1], b=[1, 2])' + expected = "tests.conftest.timeout_function(a=[1], b=[1, 2])" assert expected == get_call_repr(timeout_func, a=[1], b=[1, 2]) - expected = 'tests.conftest.timeout_function(a=[1], b=[1, 2])' + expected = "tests.conftest.timeout_function(a=[1], b=[1, 2])" assert expected == get_call_repr(timeout_func, b=[1, 2], a=[1]) diff --git a/tests/test_version.py b/tests/test_version.py deleted file mode 100644 index 5cdcdff..0000000 --- a/tests/test_version.py +++ /dev/null @@ -1,9 +0,0 @@ -from kq.version import __version__ - - -def test_version(): - assert isinstance(__version__, str) - - version_parts = __version__.split('.') - assert len(version_parts) == 3 - assert all(part.isdigit() for part in version_parts) diff --git a/tests/test_worker.py b/tests/test_worker.py index 27f7917..4dd2714 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1,5 +1,4 @@ import pytest - from kafka import KafkaConsumer from kq import Worker @@ -10,8 +9,8 @@ def test_worker_properties(worker, hosts, topic, group): assert topic in repr(worker) assert group in repr(worker) - assert worker.consumer.config['bootstrap_servers'] == 
hosts - assert worker.consumer.config['group_id'] == group + assert worker.consumer.config["bootstrap_servers"] == hosts + assert worker.consumer.config["group_id"] == group assert isinstance(worker.hosts, str) and worker.hosts == hosts assert isinstance(worker.topic, str) and worker.topic == topic @@ -25,28 +24,28 @@ def test_worker_properties(worker, hosts, topic, group): def test_worker_initialization_with_bad_args(hosts, consumer): with pytest.raises(AssertionError) as e: Worker(topic=True, consumer=consumer) - assert str(e.value) == 'topic must be a str' + assert str(e.value) == "topic must be a str" with pytest.raises(AssertionError) as e: - Worker(topic='topic', consumer='bar') - assert str(e.value) == 'bad consumer instance' + Worker(topic="topic", consumer="bar") + assert str(e.value) == "bad consumer instance" with pytest.raises(AssertionError) as e: bad_consumer = KafkaConsumer(bootstrap_servers=hosts) - Worker(topic='topic', consumer=bad_consumer) - assert str(e.value) == 'consumer must have group_id' + Worker(topic="topic", consumer=bad_consumer) + assert str(e.value) == "consumer must have group_id" with pytest.raises(AssertionError) as e: - Worker(topic='topic', consumer=consumer, callback=1) - assert str(e.value) == 'callback must be a callable' + Worker(topic="topic", consumer=consumer, callback=1) + assert str(e.value) == "callback must be a callable" with pytest.raises(AssertionError) as e: - Worker(topic='topic', consumer=consumer, deserializer=1) - assert str(e.value) == 'deserializer must be a callable' + Worker(topic="topic", consumer=consumer, deserializer=1) + assert str(e.value) == "deserializer must be a callable" with pytest.raises(AssertionError) as e: - Worker(topic='topic', consumer=consumer, logger=1) - assert str(e.value) == 'bad logger instance' + Worker(topic="topic", consumer=consumer, logger=1) + assert str(e.value) == "bad logger instance" def test_worker_run_success_function(queue, worker, success_func, log): @@ -54,12 +53,12 @@ def test_worker_run_success_function(queue, worker, success_func, log): worker.start(max_messages=1) out = log.last_lines(7) - assert next(out).startswith('[INFO] Enqueueing {}'.format(job)) - assert next(out).startswith('[INFO] Starting {}'.format(worker)) - assert next(out).startswith('[INFO] Processing Message') - assert next(out).startswith('[INFO] Executing job {}'.format(job.id)) - assert next(out).startswith('[INFO] Job {} returned: 2'.format(job.id)) - assert next(out).startswith('[INFO] Executing callback') + assert next(out).startswith("[INFO] Enqueueing {}".format(job)) + assert next(out).startswith("[INFO] Starting {}".format(worker)) + assert next(out).startswith("[INFO] Processing Message") + assert next(out).startswith("[INFO] Executing job {}".format(job.id)) + assert next(out).startswith("[INFO] Job {} returned: 2".format(job.id)) + assert next(out).startswith("[INFO] Executing callback") assert next(out).startswith('[INFO] Callback got job status "success"') @@ -68,12 +67,12 @@ def test_worker_run_failure_function(queue, worker, failure_func, log): worker.start(max_messages=1) out = log.last_lines(7) - assert next(out).startswith('[INFO] Enqueueing {}'.format(job)) - assert next(out).startswith('[INFO] Starting {}'.format(worker)) - assert next(out).startswith('[INFO] Processing Message') - assert next(out).startswith('[INFO] Executing job {}'.format(job.id)) - assert next(out).startswith('[ERROR] Job {} raised'.format(job.id)) - assert next(out).startswith('[INFO] Executing callback') + assert 
next(out).startswith("[INFO] Enqueueing {}".format(job)) + assert next(out).startswith("[INFO] Starting {}".format(worker)) + assert next(out).startswith("[INFO] Processing Message") + assert next(out).startswith("[INFO] Executing job {}".format(job.id)) + assert next(out).startswith("[ERROR] Job {} raised".format(job.id)) + assert next(out).startswith("[INFO] Executing callback") assert next(out).startswith('[INFO] Callback got job status "failure"') @@ -82,12 +81,12 @@ def test_worker_run_timeout_function(queue, worker, timeout_func, log): worker.start(max_messages=1) out = log.last_lines(7) - assert next(out).startswith('[INFO] Enqueueing {}'.format(job)) - assert next(out).startswith('[INFO] Starting {}'.format(worker)) - assert next(out).startswith('[INFO] Processing Message') - assert next(out).startswith('[INFO] Executing job {}'.format(job.id)) - assert next(out).startswith('[ERROR] Job {} timed out'.format(job.id)) - assert next(out).startswith('[INFO] Executing callback') + assert next(out).startswith("[INFO] Enqueueing {}".format(job)) + assert next(out).startswith("[INFO] Starting {}".format(worker)) + assert next(out).startswith("[INFO] Processing Message") + assert next(out).startswith("[INFO] Executing job {}".format(job.id)) + assert next(out).startswith("[ERROR] Job {} timed out".format(job.id)) + assert next(out).startswith("[INFO] Executing callback") assert next(out).startswith('[INFO] Callback got job status "timeout"') @@ -97,13 +96,13 @@ def test_worker_run_bad_callback(queue, worker, success_func, callback, log): worker.start(max_messages=1) out = log.last_lines(7) - assert next(out).startswith('[INFO] Enqueueing {}'.format(job)) - assert next(out).startswith('[INFO] Starting {}'.format(worker)) - assert next(out).startswith('[INFO] Processing Message') - assert next(out).startswith('[INFO] Executing job {}'.format(job.id)) - assert next(out).startswith('[INFO] Job {} returned: 20'.format(job.id)) - assert next(out).startswith('[INFO] Executing callback') - assert next(out).startswith('[ERROR] Callback raised an exception') + assert next(out).startswith("[INFO] Enqueueing {}".format(job)) + assert next(out).startswith("[INFO] Starting {}".format(worker)) + assert next(out).startswith("[INFO] Processing Message") + assert next(out).startswith("[INFO] Executing job {}".format(job.id)) + assert next(out).startswith("[INFO] Job {} returned: 20".format(job.id)) + assert next(out).startswith("[INFO] Executing callback") + assert next(out).startswith("[ERROR] Callback raised an exception") def test_worker_run_bad_job(queue, worker, success_func, deserializer, log): @@ -112,9 +111,9 @@ def test_worker_run_bad_job(queue, worker, success_func, deserializer, log): worker.start(max_messages=1) out = log.last_lines(6) - assert next(out).startswith('[INFO] Enqueueing {}'.format(job)) - assert next(out).startswith('[INFO] Starting {}'.format(worker)) - assert next(out).startswith('[INFO] Processing Message') - assert next(out).startswith('[ERROR] Job was invalid') - assert next(out).startswith('[INFO] Executing callback') + assert next(out).startswith("[INFO] Enqueueing {}".format(job)) + assert next(out).startswith("[INFO] Starting {}".format(worker)) + assert next(out).startswith("[INFO] Processing Message") + assert next(out).startswith("[ERROR] Job was invalid") + assert next(out).startswith("[INFO] Executing callback") assert next(out).startswith('[INFO] Callback got job status "invalid"')
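For reference, a closing sketch of the enqueue-spec semantics that the tests above pin down: fields set on a `Job` override values given via `Queue.using()`, `args` is normalized to a tuple, and the timestamp is regenerated at enqueue time. The host, topic, and `add` function are placeholders:

```python
from kafka import KafkaProducer
from kq import Job, Queue


def add(a, b):
    return a + b


producer = KafkaProducer(bootstrap_servers="127.0.0.1:9092")
queue = Queue(topic="topic", producer=producer)

# Spec values apply when the enqueued object does not set them.
job = queue.using(timeout=5, key=b"foo").enqueue(add, 1, 2)
assert job.timeout == 5 and job.key == b"foo"

# A Job's own fields win over the spec, and args becomes a tuple.
job = queue.using(timeout=5, key=b"foo").enqueue(
    Job(func=add, args=[1], kwargs={"b": 2}, timeout=10, key=b"bar")
)
assert job.timeout == 10 and job.key == b"bar" and job.args == (1,)
```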