Skip to content

Commit

Permalink
Add offset_policy parameter to Worker class and add documentation for…
Browse files Browse the repository at this point in the history
… logging
  • Loading branch information
joowani committed May 10, 2017
1 parent 56efffd commit 5cc18b8
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 8 deletions.
2 changes: 2 additions & 0 deletions docs/cli.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ or view the offsets on topic partitions:
[--keyfile=<keyfile>]
[--crlfile=<crlfile>]
[--proc-ttl=<proc-ttl>]
[--offset=<offset>]
[--verbose]
kq --help
kq --version
Expand All @@ -38,6 +39,7 @@ or view the offsets on topic partitions:
--keyfile=<keyfile> Full path to SSL private key
--crlfile=<crlfile> Full path to SSL crlfile for verifying expiry
--proc-ttl=<proc-ttl> Records read before re-spawning process [default: 5000]
--offset=<offset> Kafka consumer offset reset policy [default: latest]
--verbose Turn on debug logging output
--help Display this help menu
--version Display the version of KQ
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ Contents
decorator
callback
cli
logging


Credits
Expand Down
38 changes: 38 additions & 0 deletions docs/logging.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
Logging
-------

By default KQ logs messages using the ``kq`` logger.

Here is an example showing how the logger can be enabled and customized:

.. code-block:: python
import logging
from kq import Queue
logger = logging.getLogger('kq')
# Set the logging level
logger.setLevel(logging.DEBUG)
# Attach a handler
handler = logging.StreamHandler()
formatter = logging.Formatter('[%(levelname)s] %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
# Enqueue function calls
q = Queue()
q.enqueue(int, 1)
q.enqueue(str, 1)
q.enqueue(bool, 1)
The logging output for above would look something like this:

.. code-block:: bash
[INFO] Enqueued: Job(id='64ee47d', topic='default', func=<class 'int'> ...)
[INFO] Enqueued: Job(id='4578f57', topic='default', func=<class 'str'> ...)
[INFO] Enqueued: Job(id='792643c', topic='default', func=<class 'bool'> ...)
6 changes: 5 additions & 1 deletion kq/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
[--keyfile=<keyfile>]
[--crlfile=<crlfile>]
[--proc-ttl=<proc-ttl>]
[--offset=<offset>]
[--verbose]
kq --help
kq --version
Expand All @@ -27,6 +28,7 @@
--keyfile=<keyfile> Full path to SSL private key
--crlfile=<crlfile> Full path to SSL crlfile for verifying expiry
--proc-ttl=<proc-ttl> Records read before re-spawning process [default: 5000]
--offset=<offset> Kafka consumer offset reset policy [default: latest]
--verbose Turn on debug logging output
--help Display this help menu
--version Display the version of KQ
Expand Down Expand Up @@ -92,6 +94,7 @@ def entry_point():

elif args['worker']:
timeout = args['--timeout']
print(args['--offset'])
kq.Worker(
hosts=args['--hosts'],
topic=args['--topic'],
Expand All @@ -102,5 +105,6 @@ def entry_point():
certfile=args['--certfile'],
keyfile=args['--keyfile'],
crlfile=args['--crlfile'],
proc_ttl=int(args['--proc-ttl'])
proc_ttl=int(args['--proc-ttl']),
offset_policy=args['--offset']
).start()
2 changes: 1 addition & 1 deletion kq/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
VERSION = '1.2.0'
VERSION = '1.3.0'
9 changes: 7 additions & 2 deletions kq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ class Worker(object):
(multiprocessing pool of 1 process) is re-spawned. If set to ``0``
or ``None``, the re-spawning is disabled. Default: ``5000``.
:type proc_ttl: int
:param offset_policy: Policy for resetting offsets on the Kafka consumer.
Value ``"earliest"`` moves the offset to the oldest available message
and ``"latest"`` to the most recent. Default: 'latest'.
:type offset_policy: str | unicode
"""

def __init__(self,
Expand All @@ -106,7 +110,8 @@ def __init__(self,
certfile=None,
keyfile=None,
crlfile=None,
proc_ttl=5000):
proc_ttl=5000,
offset_policy='latest'):
self._hosts = hosts
self._topic = topic
self._timeout = timeout
Expand All @@ -125,7 +130,7 @@ def __init__(self,
ssl_crlfile=crlfile,
consumer_timeout_ms=-1,
enable_auto_commit=False,
auto_offset_reset='latest',
auto_offset_reset=offset_policy,
)

def __del__(self):
Expand Down
12 changes: 8 additions & 4 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ def test_worker(worker, logger):
'--certfile=/test/files/certfile',
'--keyfile=/test/files/keyfile',
'--crlfile=/test/files/crlfile',
'--proc-ttl=1000'
'--proc-ttl=1000',
'--offset=earliest'
]
with patch_object(sys, 'argv', test_arguments):
cli.entry_point()
Expand All @@ -111,7 +112,8 @@ def test_worker(worker, logger):
certfile='/test/files/certfile',
keyfile='/test/files/keyfile',
crlfile='/test/files/crlfile',
proc_ttl=1000
proc_ttl=1000,
offset_policy='earliest'
)
worker_inst.start.assert_called_once()

Expand Down Expand Up @@ -142,7 +144,8 @@ def test_callback(worker, logger):
certfile=None,
keyfile=None,
crlfile=None,
proc_ttl=5000
proc_ttl=5000,
offset_policy='latest'
)
worker_inst.start.assert_called_once()

Expand All @@ -164,6 +167,7 @@ def test_verbose(worker, logger):
certfile=None,
keyfile=None,
crlfile=None,
proc_ttl=5000
proc_ttl=5000,
offset_policy='latest'
)
worker_inst.start.assert_called_once()

0 comments on commit 5cc18b8

Please sign in to comment.