From affe018f73ad1634bdd101c9b4176a0b5e4c5db0 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Wed, 26 Jul 2023 20:49:15 -0400 Subject: [PATCH] fix for #643, adding functional on_cleanup, on_declare, and on_sanity events. also fixes a problem introduced by previous branch where cleanup was broken because xxxSetup routines were no longer called by init --- sarracenia/flow/__init__.py | 10 +++--- sarracenia/flowcb/__init__.py | 12 +++++-- sarracenia/flowcb/log.py | 6 ++++ sarracenia/flowcb/retry.py | 10 +++--- sarracenia/redisqueue.py | 2 +- sarracenia/sr.py | 67 ++++++++++++++++++++++++++++++++++- 6 files changed, 93 insertions(+), 14 deletions(-) diff --git a/sarracenia/flow/__init__.py b/sarracenia/flow/__init__.py index a6708d6c0..c1769e3e6 100644 --- a/sarracenia/flow/__init__.py +++ b/sarracenia/flow/__init__.py @@ -262,7 +262,7 @@ def _runCallbacksWorklist(self, entry_point): logger.error( f'flowCallback plugin {p}/{entry_point} crashed: {ex}' ) logger.debug( "details:", exc_info=True ) - def _runCallbacksTime(self, entry_point): + def runCallbacksTime(self, entry_point): for p in self.plugins[entry_point]: if self.o.logLevel.lower() == 'debug' : p() @@ -341,14 +341,14 @@ def please_stop(self) -> None: logger.info( f'ok, telling {len(self.plugins["please_stop"])} callbacks about it.' ) - self._runCallbacksTime('please_stop') + self.runCallbacksTime('please_stop') self._stop_requested = True self.metrics["flow"]['stop_requested'] = True def close(self) -> None: - self._runCallbacksTime('on_stop') + self.runCallbacksTime('on_stop') if os.path.exists( self.o.novipFilename ): os.unlink( self.o.novipFilename ) logger.info( @@ -405,7 +405,7 @@ def run(self): if os.path.exists( self.o.novipFilename ): os.unlink( self.o.novipFilename ) - self._runCallbacksTime(f'on_start') + self.runCallbacksTime(f'on_start') spamming = True last_gather_len = 0 @@ -587,7 +587,7 @@ def run(self): logger.info( f'on_housekeeping pid: {os.getpid()} {self.o.component}/{self.o.config} instance: {self.o.no}' ) - self._runCallbacksTime('on_housekeeping') + self.runCallbacksTime('on_housekeeping') self.metricsFlowReset() self.metrics['flow']['last_housekeeping'] = now diff --git a/sarracenia/flowcb/__init__.py b/sarracenia/flowcb/__init__.py index 5374e051f..893cff29d 100755 --- a/sarracenia/flowcb/__init__.py +++ b/sarracenia/flowcb/__init__.py @@ -13,8 +13,8 @@ entry_points = [ 'ack', 'after_accept', 'after_post', 'after_work', 'destfn', 'do_poll', - 'download', 'gather', 'on_housekeeping', 'on_report', 'on_start', 'on_stop', - 'poll', 'post', 'send', 'please_stop', 'metricsReport', + 'download', 'gather', 'metricsReport', 'on_cleanup', 'on_declare', 'on_housekeeping', 'on_report', + 'on_sanity', 'on_start', 'on_stop', 'please_stop', 'poll', 'post', 'send', ] @@ -129,6 +129,14 @@ def metricsReport(self) -> dict: Return a dictionary of metrics. Example: number of messages remaining in retry queues. + def on_cleanup(self) -> None:: + allow plugins to perform additional work after broker resources are eliminated. + local state files are still present when this runs. + + def on_declare(self) -> None:: + local state files are still already present when this runs. + allow plugins to perform additional work besides broker resource setup. + def on_housekeeping(self) -> None:: do periodic processing. diff --git a/sarracenia/flowcb/log.py b/sarracenia/flowcb/log.py index 725fa5403..0d88d3d59 100755 --- a/sarracenia/flowcb/log.py +++ b/sarracenia/flowcb/log.py @@ -165,6 +165,12 @@ def stats(self): logger.info("lag: average: %.2f, maximum: %.2f " % (self.lagTotal / self.msgCount, self.lagMax)) + def on_cleanup(self): + logger.info("hello") + + def on_declare(self): + logger.info("hello") + def on_stop(self): if set(['on_stop']) & self.o.logEvents: self.stats() diff --git a/sarracenia/flowcb/retry.py b/sarracenia/flowcb/retry.py index 9e598af37..d106bb15e 100755 --- a/sarracenia/flowcb/retry.py +++ b/sarracenia/flowcb/retry.py @@ -64,11 +64,6 @@ def __init__(self, options) -> None: logger.debug('logLevel=%s' % self.o.logLevel) - def cleanup(self) -> None: - logger.debug('starting retry cleanup') - self.download_retry.cleanup() - self.post_retry.cleanup() - def after_accept(self, worklist) -> None: """ If there are only a few new messages, get some from the download retry queue and put them into @@ -121,6 +116,11 @@ def metricsReport(self) -> dict: """ return {'msgs_in_download_retry': len(self.download_retry), 'msgs_in_post_retry': len(self.post_retry)} + def on_cleanup(self) -> None: + logger.debug('starting retry cleanup') + self.download_retry.cleanup() + self.post_retry.cleanup() + def on_housekeeping(self) -> None: logger.info("on_housekeeping") diff --git a/sarracenia/redisqueue.py b/sarracenia/redisqueue.py index 761ed36ca..d06b7602f 100755 --- a/sarracenia/redisqueue.py +++ b/sarracenia/redisqueue.py @@ -228,7 +228,7 @@ def put(self, message_list): logger.debug("rpush to list %s %s" % (self.key_name_new, message)) self.redis.rpush(self.key_name_new, self._msgToJSON(message)) - def cleanup(self): + def on_cleanup(self): """ remove statefiles. """ diff --git a/sarracenia/sr.py b/sarracenia/sr.py index 83762b4d9..2c7dc52f0 100755 --- a/sarracenia/sr.py +++ b/sarracenia/sr.py @@ -1300,6 +1300,28 @@ def declare(self): qdc.getSetup() qdc.close() + # run on_declare plugins. + for f in self.filtered_configurations: + if f == 'audit': continue + if self.please_stop: + break + + (c, cfg) = f.split(os.sep) + + if not 'options' in self.configs[c][cfg]: + continue + + o = self.configs[c][cfg]['options'] + o.no=0 + o.finalize() + if c not in [ 'cpost', 'cpump' ]: + flow = sarracenia.flow.Flow.factory(o) + flow.loadCallbacks() + flow.runCallbacksTime('on_declare') + del flow + flow=None + + def disable(self): if len(self.filtered_configurations) == 0: logging.error('%s configuration not found', self.leftovers) @@ -1469,6 +1491,7 @@ def cleanup(self): 'queueName': o.resolved_qname, 'message_strategy': { 'stubborn':True } }) + qdc.getSetup() qdc.getCleanUp() qdc.close() queues_to_delete.append((o.broker, o.resolved_qname)) @@ -1500,10 +1523,31 @@ def cleanup(self): 'message_strategy': { 'stubborn':True } }) if qdc: + qdc.putSetup() qdc.putCleanUp() qdc.close() - self.user_cache_dir + # run on_cleanup plugins. + for f in self.filtered_configurations: + if f == 'audit': continue + if self.please_stop: + break + + (c, cfg) = f.split(os.sep) + + if not 'options' in self.configs[c][cfg]: + continue + + o = self.configs[c][cfg]['options'] + o.no=0 + o.finalize() + if c not in [ 'cpost', 'cpump' ]: + flow = sarracenia.flow.Flow.factory(o) + flow.loadCallbacks() + flow.runCallbacksTime('on_cleanup') + del flow + flow=None + for f in self.filtered_configurations: if self.please_stop: break @@ -1806,6 +1850,27 @@ def sanity(self): if not sarracenia.extras[l]['present']: print( f"notice: python module {l} is missing: {sarracenia.extras[l]['lament']}" ) + # run on_sanity plugins. + for f in self.filtered_configurations: + if f == 'audit': continue + if self.please_stop: + break + + (c, cfg) = f.split(os.sep) + + if not 'options' in self.configs[c][cfg]: + continue + + o = self.configs[c][cfg]['options'] + o.no=0 + o.finalize() + if c not in [ 'cpost', 'cpump' ]: + flow = sarracenia.flow.Flow.factory(o) + flow.loadCallbacks() + flow.runCallbacksTime('on_sanity') + del flow + flow=None + def start(self): """ Starting all components