From 5cc18b804f28922f670d8a601f3c25d58eb8f14f Mon Sep 17 00:00:00 2001 From: Joohwan Oh Date: Wed, 10 May 2017 01:16:04 -0700 Subject: [PATCH] Add offset_policy parameter to Worker class and add documentation for logging --- docs/cli.rst | 2 ++ docs/index.rst | 1 + docs/logging.rst | 38 ++++++++++++++++++++++++++++++++++++++ kq/cli.py | 6 +++++- kq/version.py | 2 +- kq/worker.py | 9 +++++++-- tests/test_cli.py | 12 ++++++++---- 7 files changed, 62 insertions(+), 8 deletions(-) create mode 100644 docs/logging.rst diff --git a/docs/cli.rst b/docs/cli.rst index 352f34e..5fb73de 100644 --- a/docs/cli.rst +++ b/docs/cli.rst @@ -23,6 +23,7 @@ or view the offsets on topic partitions: [--keyfile=] [--crlfile=] [--proc-ttl=] + [--offset=] [--verbose] kq --help kq --version @@ -38,6 +39,7 @@ or view the offsets on topic partitions: --keyfile= Full path to SSL private key --crlfile= Full path to SSL crlfile for verifying expiry --proc-ttl= Records read before re-spawning process [default: 5000] + --offset= Kafka consumer offset reset policy [default: latest] --verbose Turn on debug logging output --help Display this help menu --version Display the version of KQ diff --git a/docs/index.rst b/docs/index.rst index eafeaac..2942f91 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -52,6 +52,7 @@ Contents decorator callback cli + logging Credits diff --git a/docs/logging.rst b/docs/logging.rst new file mode 100644 index 0000000..5a1748a --- /dev/null +++ b/docs/logging.rst @@ -0,0 +1,38 @@ +Logging +------- + +By default KQ logs messages using the ``kq`` logger. + +Here is an example showing how the logger can be enabled and customized: + +.. code-block:: python + + import logging + + from kq import Queue + + logger = logging.getLogger('kq') + + # Set the logging level + logger.setLevel(logging.DEBUG) + + # Attach a handler + handler = logging.StreamHandler() + formatter = logging.Formatter('[%(levelname)s] %(message)s') + handler.setFormatter(formatter) + logger.addHandler(handler) + + # Enqueue function calls + q = Queue() + q.enqueue(int, 1) + q.enqueue(str, 1) + q.enqueue(bool, 1) + + +The logging output for above would look something like this: + +.. code-block:: bash + + [INFO] Enqueued: Job(id='64ee47d', topic='default', func= ...) + [INFO] Enqueued: Job(id='4578f57', topic='default', func= ...) + [INFO] Enqueued: Job(id='792643c', topic='default', func= ...) diff --git a/kq/cli.py b/kq/cli.py index 69ccf10..0604102 100644 --- a/kq/cli.py +++ b/kq/cli.py @@ -12,6 +12,7 @@ [--keyfile=] [--crlfile=] [--proc-ttl=] + [--offset=] [--verbose] kq --help kq --version @@ -27,6 +28,7 @@ --keyfile= Full path to SSL private key --crlfile= Full path to SSL crlfile for verifying expiry --proc-ttl= Records read before re-spawning process [default: 5000] + --offset= Kafka consumer offset reset policy [default: latest] --verbose Turn on debug logging output --help Display this help menu --version Display the version of KQ @@ -92,6 +94,7 @@ def entry_point(): elif args['worker']: timeout = args['--timeout'] + print(args['--offset']) kq.Worker( hosts=args['--hosts'], topic=args['--topic'], @@ -102,5 +105,6 @@ def entry_point(): certfile=args['--certfile'], keyfile=args['--keyfile'], crlfile=args['--crlfile'], - proc_ttl=int(args['--proc-ttl']) + proc_ttl=int(args['--proc-ttl']), + offset_policy=args['--offset'] ).start() diff --git a/kq/version.py b/kq/version.py index ee65984..9e23359 100644 --- a/kq/version.py +++ b/kq/version.py @@ -1 +1 @@ -VERSION = '1.2.0' +VERSION = '1.3.0' diff --git a/kq/worker.py b/kq/worker.py index a911a20..119a6e1 100644 --- a/kq/worker.py +++ b/kq/worker.py @@ -94,6 +94,10 @@ class Worker(object): (multiprocessing pool of 1 process) is re-spawned. If set to ``0`` or ``None``, the re-spawning is disabled. Default: ``5000``. :type proc_ttl: int + :param offset_policy: Policy for resetting offsets on the Kafka consumer. + Value ``"earliest"`` moves the offset to the oldest available message + and ``"latest"`` to the most recent. Default: 'latest'. + :type offset_policy: str | unicode """ def __init__(self, @@ -106,7 +110,8 @@ def __init__(self, certfile=None, keyfile=None, crlfile=None, - proc_ttl=5000): + proc_ttl=5000, + offset_policy='latest'): self._hosts = hosts self._topic = topic self._timeout = timeout @@ -125,7 +130,7 @@ def __init__(self, ssl_crlfile=crlfile, consumer_timeout_ms=-1, enable_auto_commit=False, - auto_offset_reset='latest', + auto_offset_reset=offset_policy, ) def __del__(self): diff --git a/tests/test_cli.py b/tests/test_cli.py index fd27873..08378df 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -95,7 +95,8 @@ def test_worker(worker, logger): '--certfile=/test/files/certfile', '--keyfile=/test/files/keyfile', '--crlfile=/test/files/crlfile', - '--proc-ttl=1000' + '--proc-ttl=1000', + '--offset=earliest' ] with patch_object(sys, 'argv', test_arguments): cli.entry_point() @@ -111,7 +112,8 @@ def test_worker(worker, logger): certfile='/test/files/certfile', keyfile='/test/files/keyfile', crlfile='/test/files/crlfile', - proc_ttl=1000 + proc_ttl=1000, + offset_policy='earliest' ) worker_inst.start.assert_called_once() @@ -142,7 +144,8 @@ def test_callback(worker, logger): certfile=None, keyfile=None, crlfile=None, - proc_ttl=5000 + proc_ttl=5000, + offset_policy='latest' ) worker_inst.start.assert_called_once() @@ -164,6 +167,7 @@ def test_verbose(worker, logger): certfile=None, keyfile=None, crlfile=None, - proc_ttl=5000 + proc_ttl=5000, + offset_policy='latest' ) worker_inst.start.assert_called_once()