From f45975438528a435f97b7b76483ac4d880983b57 Mon Sep 17 00:00:00 2001 From: Reid Sunderland Date: Sun, 21 Jul 2024 21:27:44 +0000 Subject: [PATCH 1/4] Fix #1064 by adding try/except --- sarracenia/flow/__init__.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/sarracenia/flow/__init__.py b/sarracenia/flow/__init__.py index 5142fd407..f2de027f4 100644 --- a/sarracenia/flow/__init__.py +++ b/sarracenia/flow/__init__.py @@ -1038,9 +1038,17 @@ def gather(self) -> None: if self.have_vip: for plugin in self.plugins['poll']: - new_incoming = plugin() - if len(new_incoming) > 0: - self.worklist.incoming.extend(new_incoming) + try: + new_incoming = plugin() + if len(new_incoming) > 0: + self.worklist.incoming.extend(new_incoming) + except Exception as ex: + try: + logger.error(f'flowCallback plugin {plugin.__module__}.{plugin.__qualname__} crashed: {ex}' ) + except: + # just in case + logger.error(f'flowCallback plugin {plugin} crashed: {ex}' ) + logger.debug("details:", exc_info=True ) From 9bb5764ccb00cbd9918a7bc915b32cf83f5c1cb2 Mon Sep 17 00:00:00 2001 From: Reid Sunderland Date: Sun, 21 Jul 2024 21:47:25 +0000 Subject: [PATCH 2/4] refactor into runCallbackPoll method, allow full crash when debug is enabled --- sarracenia/flow/__init__.py | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/sarracenia/flow/__init__.py b/sarracenia/flow/__init__.py index f2de027f4..9af890a10 100644 --- a/sarracenia/flow/__init__.py +++ b/sarracenia/flow/__init__.py @@ -328,6 +328,25 @@ def _runCallbackMetrics(self): logger.error( f'flowCallback plugin {p}/metricsReport crashed: {ex}' ) logger.debug( "details:", exc_info=True ) + def _runCallbackPoll(self): + for plugin in self.plugins['poll']: + if self.o.logLevel.lower() == 'debug' : + new_incoming = plugin() + if len(new_incoming) > 0: + self.worklist.incoming.extend(new_incoming) + else: + try: + new_incoming = plugin() + if len(new_incoming) > 0: + self.worklist.incoming.extend(new_incoming) + except Exception as ex: + try: + logger.error(f'flowCallback plugin {plugin.__module__}.{plugin.__qualname__} crashed: {ex}' ) + except: + # just in case + logger.error(f'flowCallback plugin {plugin} crashed: {ex}' ) + logger.debug("details:", exc_info=True ) + def _runHousekeeping(self, now) -> float: """ Run housekeeping callbacks Return the time when housekeeping should be run next @@ -1037,20 +1056,7 @@ def gather(self) -> None: self.worklist.poll_catching_up = False if self.have_vip: - for plugin in self.plugins['poll']: - try: - new_incoming = plugin() - if len(new_incoming) > 0: - self.worklist.incoming.extend(new_incoming) - except Exception as ex: - try: - logger.error(f'flowCallback plugin {plugin.__module__}.{plugin.__qualname__} crashed: {ex}' ) - except: - # just in case - logger.error(f'flowCallback plugin {plugin} crashed: {ex}' ) - logger.debug("details:", exc_info=True ) - - + self._runCallbackPoll() def do(self) -> None: From 09a28f03c472f0985d62f7d0bf45af12fda53eea Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Wed, 22 May 2024 17:44:03 -0400 Subject: [PATCH 3/4] support modular entry_points in flow itself, not just callbacks --- sarracenia/flow/__init__.py | 52 +++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/sarracenia/flow/__init__.py b/sarracenia/flow/__init__.py index 9af890a10..45c36ae36 100644 --- a/sarracenia/flow/__init__.py +++ b/sarracenia/flow/__init__.py @@ -274,6 +274,16 @@ def loadCallbacks(self, plugins_to_load=None): def _runCallbacksWorklist(self, entry_point): + if hasattr(self, entry_point): + if self.o.logLevel.lower() == 'debug' : + eval( f"self.{entry_point}(self.worklist)") + else: + try: + eval( f"self.{entry_point}(self.worklist)") + except Exception as ex: + logger.error( f'flow {entry_point} crashed: {ex}' ) + logger.debug( "details:", exc_info=True ) + if hasattr(self, 'plugins') and (entry_point in self.plugins): for p in self.plugins[entry_point]: if self.o.logLevel.lower() == 'debug' : @@ -286,6 +296,18 @@ def _runCallbacksWorklist(self, entry_point): logger.debug( "details:", exc_info=True ) def runCallbacksTime(self, entry_point): + + if hasattr(self, entry_point): + if self.o.logLevel.lower() == 'debug' : + eval( f"self.{entry_point}()") + else: + try: + logger.info( f'normal run of self.{entry_point}' ) + eval( f"self.{entry_point}()") + except Exception as ex: + logger.error( f'flow {entry_point} crashed: {ex}' ) + logger.debug( "details:", exc_info=True ) + for p in self.plugins[entry_point]: if self.o.logLevel.lower() == 'debug' : p() @@ -302,6 +324,16 @@ def _runCallbackMetrics(self): Expects the plugin to return a dictionary containing metrics, which is saved to ``self.metrics[plugin_name]``. """ + if hasattr(self, "metricsReport"): + if self.o.logLevel.lower() == 'debug' : + self.metricsReport() + else: + try: + self.metricsReport() + except Exception as ex: + logger.error( f'flow metricsReport() crashed: {ex}' ) + logger.debug( "details:", exc_info=True ) + if 'transferConnected' in self.metrics['flow'] and self.metrics['flow']['transferConnected']: now=nowflt() self.metrics['flow']['transferConnectTime'] += now - self.metrics['flow']['transferConnectStart'] @@ -329,6 +361,16 @@ def _runCallbackMetrics(self): logger.debug( "details:", exc_info=True ) def _runCallbackPoll(self): + if hasattr(self, "Poll"): + if self.o.logLevel.lower() == 'debug' : + self.Poll() + else: + try: + self.Poll() + except Exception as ex: + logger.error( f'flow Poll crashed: {ex}' ) + logger.debug( "details:", exc_info=True ) + for plugin in self.plugins['poll']: if self.o.logLevel.lower() == 'debug' : new_incoming = plugin() @@ -352,6 +394,16 @@ def _runHousekeeping(self, now) -> float: Return the time when housekeeping should be run next """ logger.info(f'on_housekeeping pid: {os.getpid()} {self.o.component}/{self.o.config} instance: {self.o.no}') + if hasattr(self, "on_housekeeping"): + if self.o.logLevel.lower() == 'debug' : + self.on_housekeeping() + else: + try: + self.on_housekeeping() + except Exception as ex: + logger.error( f'flow on_housekeeping crashed: {ex}' ) + logger.debug( "details:", exc_info=True ) + self.runCallbacksTime('on_housekeeping') self.metricsFlowReset() self.metrics['flow']['last_housekeeping'] = now From 3920fca71449122645f357681b9a96577681567b Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Wed, 22 May 2024 17:44:38 -0400 Subject: [PATCH 4/4] adding module on_start to improve how poll callbacks are loaded --- sarracenia/flow/poll.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/sarracenia/flow/poll.py b/sarracenia/flow/poll.py index f6ec080f0..628c5d108 100644 --- a/sarracenia/flow/poll.py +++ b/sarracenia/flow/poll.py @@ -65,10 +65,6 @@ def __init__(self, options): if not 'scheduled' in ','.join(self.plugins['load']): self.plugins['load'].append('sarracenia.flowcb.scheduled.poll.Poll') - if not 'flowcb.poll.Poll' in ','.join(self.plugins['load']): - logger.info( f"adding poll plugin, because missing from: {self.plugins['load']}" ) - self.plugins['load'].append('sarracenia.flowcb.poll.Poll') - if options.vip: self.plugins['load'].insert( 0, 'sarracenia.flowcb.gather.message.Message') @@ -80,3 +76,14 @@ def __init__(self, options): if not features['ftppoll']['present']: if hasattr( self.o, 'pollUrl' ) and ( self.o.pollUrl.startswith('ftp') ): logger.critical( f"attempting to configure an FTP poll pollUrl={self.o.pollUrl}, but missing python modules: {' '.join(features['ftppoll']['modules_needed'])}" ) + + def on_start(self): + + if 'poll' not in self.plugins or not self.plugins['poll']: + logger.info( f"adding built-in poll plugin, because no other poll provided from: {self.plugins['load']}" ) + + self.plugins['load'].append('sarracenia.flowcb.poll.Poll') + plugin = sarracenia.flowcb.load_library("sarracenia.flowcb.poll.Poll", self.o) + self.plugins['poll'] = [ getattr(plugin, 'poll') ] + else: + logger.info( f"not adding built-in poll, because already present: {self.plugins['poll']} " )