diff --git a/atlas/lib/idds/atlas/workflow/atlashpowork.py b/atlas/lib/idds/atlas/workflow/atlashpowork.py index 640b433c..5780b5f7 100644 --- a/atlas/lib/idds/atlas/workflow/atlashpowork.py +++ b/atlas/lib/idds/atlas/workflow/atlashpowork.py @@ -634,6 +634,7 @@ def poll_processing(self, processing): try: proc = processing['processing_metadata']['processing'] job_status, job_err_msg = self.poll_condor_job_status(processing, proc.external_id) + self.logger.info("poll_condor_job_status: (status: %s, error: %s" % (str(job_status), str(job_err_msg))) processing_outputs = None if job_status in [ProcessingStatus.Finished]: job_outputs, parser_errors = self.parse_processing_outputs(processing) diff --git a/main/lib/idds/agents/common/plugins/messaging.py b/main/lib/idds/agents/common/plugins/messaging.py index 48132de5..78c14ffd 100644 --- a/main/lib/idds/agents/common/plugins/messaging.py +++ b/main/lib/idds/agents/common/plugins/messaging.py @@ -162,7 +162,10 @@ def send_message(self, msg): 'msg_type': str(msg['msg_type']).lower()}) def execute_send(self): - self.conns = self.connect_to_messaging_brokers(sender=True) + try: + self.conns = self.connect_to_messaging_brokers(sender=True) + except Exception as error: + self.logger.error("Messaging sender throws an exception: %s, %s" % (error, traceback.format_exc())) while not self.graceful_stop.is_set(): try: @@ -207,7 +210,10 @@ def subscribe(self): conn.subscribe(destination=self.destination, id='atlas-idds-messaging', ack='auto') def execute_subscribe(self): - self.subscribe() + try: + self.subscribe() + except Exception as error: + self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc())) while not self.graceful_stop.is_set(): has_failed_connection = False @@ -223,10 +229,13 @@ def execute_subscribe(self): self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc())) has_failed_connection = True - if has_failed_connection: - # re-subscribe - self.disconnect(self.receiver_conns) - self.subscribe() + if has_failed_connection or len(self.receiver_conns) == 0: + try: + # re-subscribe + self.disconnect(self.receiver_conns) + self.subscribe() + except Exception as error: + self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc())) self.logger.info('receiver graceful stop requested') @@ -247,8 +256,11 @@ def __init__(self, name="MessagingMessager", **kwargs): super(MessagingMessager, self).__init__(name=name, **kwargs) def execute_send_subscribe(self): - self.conns = self.connect_to_messaging_brokers(sender=True) - self.subscribe() + try: + self.conns = self.connect_to_messaging_brokers(sender=True) + self.subscribe() + except Exception as error: + self.logger.error("Messaging sender_subscriber throws an exception: %s, %s" % (error, traceback.format_exc())) while not self.graceful_stop.is_set(): # send @@ -278,14 +290,17 @@ def execute_send_subscribe(self): self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc())) has_failed_connection = True - if has_failed_connection: - # re-subscribe - self.disconnect(self.receiver_conns) - self.subscribe() + if has_failed_connection or len(self.receiver_conns) == 0: + try: + # re-subscribe + self.disconnect(self.receiver_conns) + self.subscribe() + except Exception as error: + self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc())) time.sleep(0.1) - self.logger.info('receiver graceful stop requested') + self.logger.info('sender_receiver graceful stop requested') self.disconnect(self.conns) self.disconnect(self.receiver_conns) diff --git a/main/lib/idds/tests/test_domapanda.py b/main/lib/idds/tests/test_domapanda.py index eca0d6ad..44b62503 100644 --- a/main/lib/idds/tests/test_domapanda.py +++ b/main/lib/idds/tests/test_domapanda.py @@ -36,13 +36,15 @@ # from idds.atlas.workflowv2.atlasstageinwork import ATLASStageinWork from idds.doma.workflowv2.domapandawork import DomaPanDAWork +# task_cloud = 'LSST' +task_cloud = 'US' task_queue = 'DOMA_LSST_GOOGLE_TEST' # task_queue = 'DOMA_LSST_GOOGLE_MERGE' # task_queue = 'SLAC_TEST' # task_queue = 'DOMA_LSST_SLAC_TEST' task_queue = 'SLAC_Rubin' -task_queue = 'CC-IN2P3_TEST' +# task_queue = 'CC-IN2P3_TEST' def randStr(chars=string.ascii_lowercase + string.digits, N=10): @@ -137,7 +139,7 @@ def setup_workflow(): "token": "local", "type": "template", "value": "log.tgz"}, - task_cloud='LSST') + task_cloud=task_cloud) work2 = DomaPanDAWork(executable='echo', primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#2'}, output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#2'}], @@ -151,7 +153,7 @@ def setup_workflow(): "token": "local", "type": "template", "value": "log.tgz"}, - task_cloud='LSST') + task_cloud=task_cloud) work3 = DomaPanDAWork(executable='echo', primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#3'}, output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#3'}], @@ -165,7 +167,7 @@ def setup_workflow(): "token": "local", "type": "template", "value": "log.tgz"}, - task_cloud='LSST') + task_cloud=task_cloud) pending_time = 12 # pending_time = None