Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #1064 by adding try/except #1065

Merged
merged 4 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 72 additions & 6 deletions sarracenia/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,16 @@ def loadCallbacks(self, plugins_to_load=None):

def _runCallbacksWorklist(self, entry_point):

if hasattr(self, entry_point):
if self.o.logLevel.lower() == 'debug' :
eval( f"self.{entry_point}(self.worklist)")
else:
try:
eval( f"self.{entry_point}(self.worklist)")
except Exception as ex:
logger.error( f'flow {entry_point} crashed: {ex}' )
logger.debug( "details:", exc_info=True )

if hasattr(self, 'plugins') and (entry_point in self.plugins):
for p in self.plugins[entry_point]:
if self.o.logLevel.lower() == 'debug' :
Expand All @@ -286,6 +296,18 @@ def _runCallbacksWorklist(self, entry_point):
logger.debug( "details:", exc_info=True )

def runCallbacksTime(self, entry_point):

if hasattr(self, entry_point):
if self.o.logLevel.lower() == 'debug' :
eval( f"self.{entry_point}()")
else:
try:
logger.info( f'normal run of self.{entry_point}' )
eval( f"self.{entry_point}()")
except Exception as ex:
logger.error( f'flow {entry_point} crashed: {ex}' )
logger.debug( "details:", exc_info=True )

for p in self.plugins[entry_point]:
if self.o.logLevel.lower() == 'debug' :
p()
Expand All @@ -302,6 +324,16 @@ def _runCallbackMetrics(self):
Expects the plugin to return a dictionary containing metrics, which is saved to ``self.metrics[plugin_name]``.
"""

if hasattr(self, "metricsReport"):
if self.o.logLevel.lower() == 'debug' :
self.metricsReport()
else:
try:
self.metricsReport()
except Exception as ex:
logger.error( f'flow metricsReport() crashed: {ex}' )
logger.debug( "details:", exc_info=True )

if 'transferConnected' in self.metrics['flow'] and self.metrics['flow']['transferConnected']:
now=nowflt()
self.metrics['flow']['transferConnectTime'] += now - self.metrics['flow']['transferConnectStart']
Expand All @@ -328,11 +360,50 @@ def _runCallbackMetrics(self):
logger.error( f'flowCallback plugin {p}/metricsReport crashed: {ex}' )
logger.debug( "details:", exc_info=True )

def _runCallbackPoll(self):
if hasattr(self, "Poll"):
if self.o.logLevel.lower() == 'debug' :
self.Poll()
else:
try:
self.Poll()
except Exception as ex:
logger.error( f'flow Poll crashed: {ex}' )
logger.debug( "details:", exc_info=True )

for plugin in self.plugins['poll']:
if self.o.logLevel.lower() == 'debug' :
new_incoming = plugin()
if len(new_incoming) > 0:
self.worklist.incoming.extend(new_incoming)
else:
try:
new_incoming = plugin()
if len(new_incoming) > 0:
self.worklist.incoming.extend(new_incoming)
except Exception as ex:
try:
logger.error(f'flowCallback plugin {plugin.__module__}.{plugin.__qualname__} crashed: {ex}' )
except:
# just in case
logger.error(f'flowCallback plugin {plugin} crashed: {ex}' )
logger.debug("details:", exc_info=True )

def _runHousekeeping(self, now) -> float:
""" Run housekeeping callbacks
Return the time when housekeeping should be run next
"""
logger.info(f'on_housekeeping pid: {os.getpid()} {self.o.component}/{self.o.config} instance: {self.o.no}')
if hasattr(self, "on_housekeeping"):
if self.o.logLevel.lower() == 'debug' :
self.on_housekeeping()
else:
try:
self.on_housekeeping()
except Exception as ex:
logger.error( f'flow on_housekeeping crashed: {ex}' )
logger.debug( "details:", exc_info=True )

self.runCallbacksTime('on_housekeeping')
self.metricsFlowReset()
self.metrics['flow']['last_housekeeping'] = now
Expand Down Expand Up @@ -1037,12 +1108,7 @@ def gather(self) -> None:
self.worklist.poll_catching_up = False

if self.have_vip:
for plugin in self.plugins['poll']:
new_incoming = plugin()
if len(new_incoming) > 0:
self.worklist.incoming.extend(new_incoming)


self._runCallbackPoll()

def do(self) -> None:

Expand Down
15 changes: 11 additions & 4 deletions sarracenia/flow/poll.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,6 @@ def __init__(self, options):
if not 'scheduled' in ','.join(self.plugins['load']):
self.plugins['load'].append('sarracenia.flowcb.scheduled.poll.Poll')

if not 'flowcb.poll.Poll' in ','.join(self.plugins['load']):
logger.info( f"adding poll plugin, because missing from: {self.plugins['load']}" )
self.plugins['load'].append('sarracenia.flowcb.poll.Poll')

if options.vip:
self.plugins['load'].insert( 0, 'sarracenia.flowcb.gather.message.Message')

Expand All @@ -80,3 +76,14 @@ def __init__(self, options):
if not features['ftppoll']['present']:
if hasattr( self.o, 'pollUrl' ) and ( self.o.pollUrl.startswith('ftp') ):
logger.critical( f"attempting to configure an FTP poll pollUrl={self.o.pollUrl}, but missing python modules: {' '.join(features['ftppoll']['modules_needed'])}" )

def on_start(self):

if 'poll' not in self.plugins or not self.plugins['poll']:
logger.info( f"adding built-in poll plugin, because no other poll provided from: {self.plugins['load']}" )

self.plugins['load'].append('sarracenia.flowcb.poll.Poll')
plugin = sarracenia.flowcb.load_library("sarracenia.flowcb.poll.Poll", self.o)
self.plugins['poll'] = [ getattr(plugin, 'poll') ]
else:
logger.info( f"not adding built-in poll, because already present: {self.plugins['poll']} " )
Loading