Skip to content

Commit

Permalink
Merge pull request #127 from wguanicedew/dev
Browse files Browse the repository at this point in the history
fix messaging to catch exceptions
  • Loading branch information
wguanicedew authored Nov 21, 2022
2 parents 7d2956a + c6dc659 commit a7f90ed
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 17 deletions.
1 change: 1 addition & 0 deletions atlas/lib/idds/atlas/workflow/atlashpowork.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,7 @@ def poll_processing(self, processing):
try:
proc = processing['processing_metadata']['processing']
job_status, job_err_msg = self.poll_condor_job_status(processing, proc.external_id)
self.logger.info("poll_condor_job_status: (status: %s, error: %s" % (str(job_status), str(job_err_msg)))
processing_outputs = None
if job_status in [ProcessingStatus.Finished]:
job_outputs, parser_errors = self.parse_processing_outputs(processing)
Expand Down
41 changes: 28 additions & 13 deletions main/lib/idds/agents/common/plugins/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,10 @@ def send_message(self, msg):
'msg_type': str(msg['msg_type']).lower()})

def execute_send(self):
self.conns = self.connect_to_messaging_brokers(sender=True)
try:
self.conns = self.connect_to_messaging_brokers(sender=True)
except Exception as error:
self.logger.error("Messaging sender throws an exception: %s, %s" % (error, traceback.format_exc()))

while not self.graceful_stop.is_set():
try:
Expand Down Expand Up @@ -207,7 +210,10 @@ def subscribe(self):
conn.subscribe(destination=self.destination, id='atlas-idds-messaging', ack='auto')

def execute_subscribe(self):
self.subscribe()
try:
self.subscribe()
except Exception as error:
self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc()))

while not self.graceful_stop.is_set():
has_failed_connection = False
Expand All @@ -223,10 +229,13 @@ def execute_subscribe(self):
self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc()))
has_failed_connection = True

if has_failed_connection:
# re-subscribe
self.disconnect(self.receiver_conns)
self.subscribe()
if has_failed_connection or len(self.receiver_conns) == 0:
try:
# re-subscribe
self.disconnect(self.receiver_conns)
self.subscribe()
except Exception as error:
self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc()))

self.logger.info('receiver graceful stop requested')

Expand All @@ -247,8 +256,11 @@ def __init__(self, name="MessagingMessager", **kwargs):
super(MessagingMessager, self).__init__(name=name, **kwargs)

def execute_send_subscribe(self):
self.conns = self.connect_to_messaging_brokers(sender=True)
self.subscribe()
try:
self.conns = self.connect_to_messaging_brokers(sender=True)
self.subscribe()
except Exception as error:
self.logger.error("Messaging sender_subscriber throws an exception: %s, %s" % (error, traceback.format_exc()))

while not self.graceful_stop.is_set():
# send
Expand Down Expand Up @@ -278,14 +290,17 @@ def execute_send_subscribe(self):
self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc()))
has_failed_connection = True

if has_failed_connection:
# re-subscribe
self.disconnect(self.receiver_conns)
self.subscribe()
if has_failed_connection or len(self.receiver_conns) == 0:
try:
# re-subscribe
self.disconnect(self.receiver_conns)
self.subscribe()
except Exception as error:
self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc()))

time.sleep(0.1)

self.logger.info('receiver graceful stop requested')
self.logger.info('sender_receiver graceful stop requested')
self.disconnect(self.conns)
self.disconnect(self.receiver_conns)

Expand Down
10 changes: 6 additions & 4 deletions main/lib/idds/tests/test_domapanda.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@
# from idds.atlas.workflowv2.atlasstageinwork import ATLASStageinWork
from idds.doma.workflowv2.domapandawork import DomaPanDAWork

# task_cloud = 'LSST'
task_cloud = 'US'

task_queue = 'DOMA_LSST_GOOGLE_TEST'
# task_queue = 'DOMA_LSST_GOOGLE_MERGE'
# task_queue = 'SLAC_TEST'
# task_queue = 'DOMA_LSST_SLAC_TEST'
task_queue = 'SLAC_Rubin'
task_queue = 'CC-IN2P3_TEST'
# task_queue = 'CC-IN2P3_TEST'


def randStr(chars=string.ascii_lowercase + string.digits, N=10):
Expand Down Expand Up @@ -137,7 +139,7 @@ def setup_workflow():
"token": "local",
"type": "template",
"value": "log.tgz"},
task_cloud='LSST')
task_cloud=task_cloud)
work2 = DomaPanDAWork(executable='echo',
primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#2'},
output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#2'}],
Expand All @@ -151,7 +153,7 @@ def setup_workflow():
"token": "local",
"type": "template",
"value": "log.tgz"},
task_cloud='LSST')
task_cloud=task_cloud)
work3 = DomaPanDAWork(executable='echo',
primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#3'},
output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#3'}],
Expand All @@ -165,7 +167,7 @@ def setup_workflow():
"token": "local",
"type": "template",
"value": "log.tgz"},
task_cloud='LSST')
task_cloud=task_cloud)

pending_time = 12
# pending_time = None
Expand Down

0 comments on commit a7f90ed

Please sign in to comment.