Skip to content

Commit

Permalink
Add proc_ttl parameter to Worker class
Browse files Browse the repository at this point in the history
  • Loading branch information
joowani committed May 9, 2017
1 parent 38c5ca7 commit 3629496
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 7 deletions.
3 changes: 3 additions & 0 deletions kq/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
[--certfile=<certfile>]
[--keyfile=<keyfile>]
[--crlfile=<crlfile>]
[--proc-ttl=<proc-ttl>]
[--verbose]
kq --help
kq --version
Expand All @@ -25,6 +26,7 @@
--certfile=<certfile> Full path to SSL client certificate
--keyfile=<keyfile> Full path to SSL private key
--crlfile=<crlfile> Full path to SSL crlfile for verifying expiry
--proc-ttl=<proc-ttl> Records read before re-spawning process [default: 5000]
--verbose Turn on debug logging output
--help Display this help menu
--version Display the version of KQ
Expand Down Expand Up @@ -100,4 +102,5 @@ def entry_point():
certfile=args['--certfile'],
keyfile=args['--keyfile'],
crlfile=args['--crlfile'],
proc_ttl=int(args['--proc-ttl'])
).start()
2 changes: 1 addition & 1 deletion kq/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
VERSION = '1.1.0'
VERSION = '1.2.0'
29 changes: 27 additions & 2 deletions kq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ class Worker(object):
cafile='/my/files/cafile',
certfile='/my/files/certfile',
keyfile='/my/files/keyfile',
crlfile='/my/files/crlfile'
crlfile='/my/files/crlfile',
proc_ttl=2000,
)
worker.start()
Expand Down Expand Up @@ -89,6 +90,10 @@ class Worker(object):
:param crlfile: Full path to the CRL file for validating certification
expiry. This option is only available with Python 3.4+ or 2.7.9+.
:type crlfile: str | unicode
:param proc_ttl: The number of records read before the worker's process
(multiprocessing pool of 1 process) is re-spawned. If set to ``0``
or ``None``, the re-spawning is disabled. Default: ``5000``.
:type proc_ttl: int
"""

def __init__(self,
Expand All @@ -100,12 +105,14 @@ def __init__(self,
cafile=None,
certfile=None,
keyfile=None,
crlfile=None):
crlfile=None,
proc_ttl=5000):
self._hosts = hosts
self._topic = topic
self._timeout = timeout
self._callback = callback
self._pool = None
self._proc_ttl = proc_ttl
self._logger = logging.getLogger('kq')
self._consumer = kafka.KafkaConsumer(
self._topic,
Expand Down Expand Up @@ -205,6 +212,16 @@ def _consume_record(self, record):
self._logger.info('Job {} returned: {}'.format(job.id, res))
self._exec_callback('success', job, res, None, None)

def _refresh_pool(self):
"""Terminate the previous process pool and initialize a new one."""
self._logger.info('Refreshing process pool ...')
try:
self._pool.terminate()
except Exception as e: # pragma: no cover
self._logger.exception('Failed to terminate pool: {}'.format(e))
finally:
self._pool = mp.Pool(processes=1)

@property
def consumer(self):
"""Return the Kafka consumer object.
Expand Down Expand Up @@ -250,10 +267,18 @@ def start(self):
"""
self._logger.info('Starting {} ...'.format(self))
self._pool = mp.Pool(processes=1)
records_read = 0

try:
for record in self._consumer:
self._consume_record(record)
self._consumer.commit()
if self._proc_ttl and records_read >= self._proc_ttl:
self._refresh_pool()
records_read = 0
else:
records_read += 1

except KeyboardInterrupt: # pragma: no cover
self._logger.info('Stopping {} ...'.format(self))
self._pool.terminate() # TODO not sure if necessary
12 changes: 8 additions & 4 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ def test_worker(worker, logger):
'--cafile=/test/files/cafile',
'--certfile=/test/files/certfile',
'--keyfile=/test/files/keyfile',
'--crlfile=/test/files/crlfile'
'--crlfile=/test/files/crlfile',
'--proc-ttl=1000'
]
with patch_object(sys, 'argv', test_arguments):
cli.entry_point()
Expand All @@ -109,7 +110,8 @@ def test_worker(worker, logger):
cafile='/test/files/cafile',
certfile='/test/files/certfile',
keyfile='/test/files/keyfile',
crlfile='/test/files/crlfile'
crlfile='/test/files/crlfile',
proc_ttl=1000
)
worker_inst.start.assert_called_once()

Expand Down Expand Up @@ -139,7 +141,8 @@ def test_callback(worker, logger):
cafile=None,
certfile=None,
keyfile=None,
crlfile=None
crlfile=None,
proc_ttl=5000
)
worker_inst.start.assert_called_once()

Expand All @@ -160,6 +163,7 @@ def test_verbose(worker, logger):
cafile=None,
certfile=None,
keyfile=None,
crlfile=None
crlfile=None,
proc_ttl=5000
)
worker_inst.start.assert_called_once()
26 changes: 26 additions & 0 deletions tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,3 +254,29 @@ def test_start_job_callback_fail(logger, callback):
logger.exception.assert_called_once_with(
'Callback failed: {}'.format(expected_error)
)


def test_start_proc_ttl_reached(logger, callback):
mock_consumer.__iter__ = lambda x: iter([rec11, rec11])
worker = Worker(
hosts='localhost',
topic='foo',
callback=callback,
proc_ttl=1,
)
worker.start()
logger.info.assert_has_calls([
mock.call('Starting Worker(topic=foo) ...'),
mock.call('Processing {} ...'.format(rec11_repr)),
mock.call('Running Job 100: tests.utils.success_func(1, 2, c=3) ...'),
mock.call('Job 100 returned: (1, 2, 3)'),
mock.call('Executing callback ...'),
mock.call('Processing {} ...'.format(rec11_repr)),
mock.call('Running Job 100: tests.utils.success_func(1, 2, c=3) ...'),
mock.call('Job 100 returned: (1, 2, 3)'),
mock.call('Executing callback ...'),
mock.call('Refreshing process pool ...'),
])
callback.assert_called_with(
'success', success_job, (1, 2, 3), None, None
)

0 comments on commit 3629496

Please sign in to comment.