Merge pull request #62 from HSF/dev
Dev
wguanicedew authored Dec 6, 2021
2 parents 9149082 + a7ea16c commit 39144a0
Showing 134 changed files with 19,927 additions and 801 deletions.
7 changes: 7 additions & 0 deletions atlas/lib/idds/atlas/notifier/messaging.py
@@ -21,7 +21,9 @@
from idds.common.plugin.plugin_base import PluginBase
from idds.common.utils import setup_logging


setup_logging(__name__)
+logging.getLogger("stomp").setLevel(logging.CRITICAL)


class MessagingListener(stomp.ConnectionListener):
@@ -54,6 +56,7 @@ def __init__(self, **kwargs):
self.setup_logger()
self.graceful_stop = threading.Event()
self.request_queue = None
+        self.output_queue = None

if not hasattr(self, 'brokers'):
raise Exception('brokers is required but not defined.')
@@ -78,6 +81,9 @@ def stop(self):
def set_request_queue(self, request_queue):
self.request_queue = request_queue

+    def set_output_queue(self, output_queue):
+        self.output_queue = output_queue
+
def connect_to_messaging_brokers(self):
broker_addresses = []
for b in self.brokers:
@@ -124,6 +130,7 @@ def run(self):
msg = self.request_queue.get(False)
if msg:
self.send_message(msg)
+                    self.output_queue.put(msg)
else:
time.sleep(1)
except Exception as error:
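The new output_queue mirrors every message that run() successfully hands to the broker, so a downstream consumer can track what was sent. A minimal sketch of that forward-and-mirror pattern, with a stand-in send_message in place of the plugin's stomp connection (everything except the request_queue/output_queue names is illustrative):

    import queue
    import time

    class SketchSender:
        """Toy model of the loop this hunk extends."""

        def __init__(self):
            self.request_queue = queue.Queue()
            self.output_queue = queue.Queue()   # new: holds copies of sent messages

        def send_message(self, msg):
            print("would send via stomp:", msg)  # stand-in for the broker send

        def run_once(self):
            try:
                msg = self.request_queue.get(False)   # non-blocking poll, as in the diff
            except queue.Empty:
                time.sleep(1)                         # idle backoff, as in run()
                return
            if msg:
                self.send_message(msg)
                self.output_queue.put(msg)            # mirror for downstream bookkeeping

Queue.get(False) raises queue.Empty when nothing is pending, which is why the original loop wraps the poll in a try/except.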
2 changes: 1 addition & 1 deletion atlas/lib/idds/atlas/version.py
@@ -9,4 +9,4 @@
# - Wen Guan, <[email protected]>, 2019 - 2021


release_version = "0.5.0"
release_version = "0.9.1"
4 changes: 3 additions & 1 deletion atlas/lib/idds/atlas/workflow/atlasactuatorwork.py
@@ -304,6 +304,8 @@ def syn_work_status(self, registered_input_output_maps):
self.status = WorkStatus.Failed
elif self.is_processings_subfinished():
self.status = WorkStatus.SubFinished
+        else:
+            self.status = WorkStatus.Transforming

####### functions for carrier ######## # noqa E266
###################################### # noqa E266
@@ -451,4 +453,4 @@ def poll_processing_updates(self, processing, input_output_maps):
'output_metadata': processing_outputs}}

updated_contents = []
-        return update_processing, updated_contents
+        return update_processing, updated_contents, {}
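The new else branch closes a gap where a work object with processings still in flight silently kept its previous status; it is now reported explicitly as Transforming. A hedged sketch of the resolution order (the boolean parameters stand in for the is_processings_* checks on the work object; WorkStatus mirrors the constants used above):

    from enum import Enum

    class WorkStatus(Enum):
        Finished = 1
        SubFinished = 2
        Failed = 3
        Transforming = 4

    def resolve_status(finished, failed, subfinished):
        if finished:
            return WorkStatus.Finished
        if failed:
            return WorkStatus.Failed
        if subfinished:
            return WorkStatus.SubFinished
        return WorkStatus.Transforming   # new fallback: still being worked on

The same file also widens poll_processing_updates from a two-tuple to a three-tuple; see the note at the end of the atlashpowork.py diff below.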
4 changes: 4 additions & 0 deletions atlas/lib/idds/atlas/workflow/atlascondorwork.py
@@ -101,6 +101,10 @@ def generate_processing_submit_file(self, processing):

# self.logger.info("tf_inputs: %s, tf_outputs: %s" % (str(tf_inputs), str(tf_outputs)))

+        # if 'X509_USER_PROXY' in os.environ and os.environ['X509_USER_PROXY']:
+        #     proxy_filename = os.path.basename(os.environ['X509_USER_PROXY'])
+        #     tf_inputs = tf_inputs + [os.environ['X509_USER_PROXY']]
+
if tf_inputs:
jdl += "transfer_input_files = %s\n" % (str(','.join(tf_inputs)))
if tf_outputs:
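The commented-out block keeps open the option of shipping the X509 proxy alongside the other input files. For orientation, a sketch of how the submit-file fragment around it is assembled; the transfer_output_files line is an assumption, since the diff is cut off before the output handling:

    import os

    def transfer_lines(tf_inputs, tf_outputs, ship_proxy=False):
        """Build the file-transfer lines of an HTCondor JDL (illustrative)."""
        if ship_proxy and os.environ.get('X509_USER_PROXY'):
            tf_inputs = tf_inputs + [os.environ['X509_USER_PROXY']]
        jdl = ""
        if tf_inputs:
            jdl += "transfer_input_files = %s\n" % ','.join(tf_inputs)
        if tf_outputs:
            jdl += "transfer_output_files = %s\n" % ','.join(tf_outputs)  # assumed symmetrical
        return jdl

    # transfer_lines(['in.json'], ['out.json']) ->
    # 'transfer_input_files = in.json\ntransfer_output_files = out.json\n'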
115 changes: 71 additions & 44 deletions atlas/lib/idds/atlas/workflow/atlashpowork.py
@@ -424,6 +424,8 @@ def syn_work_status(self, registered_input_output_maps, all_updates_flushed=True
self.status = WorkStatus.Failed
else:
self.status = WorkStatus.SubFinished
+        else:
+            self.status = WorkStatus.Transforming

####### functions for carrier ######## # noqa E266
###################################### # noqa E266
@@ -436,6 +438,11 @@ def generate_processing_script_nevergrad(self, processing):
'NUM_POINTS': self.points_to_generate,
'IN': self.input_json,
'OUT': self.output_json}
+        if 'X509_USER_PROXY' in os.environ and os.environ['X509_USER_PROXY']:
+            proxy_filename = os.path.basename(os.environ['X509_USER_PROXY'])
+            param_values['X509_USER_PROXY_FULLNAME'] = os.environ['X509_USER_PROXY']
+            param_values['X509_USER_PROXY_BASENAME'] = proxy_filename
+
arguments = replace_parameters_with_values(arguments, param_values)

script = "#!/bin/bash\n\n"
@@ -471,6 +478,12 @@ def generate_processing_script_container(self, processing):
'NUM_POINTS': self.points_to_generate,
'IN': self.input_json,
'OUT': self.output_json}
+        proxy_filename = 'x509up'
+        if 'X509_USER_PROXY' in os.environ and os.environ['X509_USER_PROXY']:
+            proxy_filename = os.path.basename(os.environ['X509_USER_PROXY'])
+            param_values['X509_USER_PROXY_FULLNAME'] = os.environ['X509_USER_PROXY']
+            param_values['X509_USER_PROXY_BASENAME'] = proxy_filename
+
executable = replace_parameters_with_values(self.executable, param_values)
arguments = replace_parameters_with_values(self.arguments, param_values)

@@ -490,7 +503,7 @@
script += "\n"

if self.sandbox and 'docker' in executable:
-            arguments = 'run --rm -v $(pwd):%s %s ' % (self.container_workdir, self.sandbox) + arguments
+            arguments = 'run --rm -v $(pwd):%s -v /cvmfs:/cvmfs -e X509_USER_PROXY=%s/%s %s ' % (self.container_workdir, self.container_workdir, proxy_filename, self.sandbox) + arguments

script += "echo '%s' '%s'\n" % (str(executable), str(arguments))
script += '%s %s\n' % (str(executable), str(arguments))
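The reworked docker invocation mounts /cvmfs and points X509_USER_PROXY at the proxy copied into the container working directory. Rendering the same format string outside the class makes the result easier to see (the values passed in are illustrative):

    def docker_arguments(container_workdir, sandbox, proxy_filename, payload_args):
        # Same format string as the diff, with the self.* attributes as parameters.
        prefix = 'run --rm -v $(pwd):%s -v /cvmfs:/cvmfs -e X509_USER_PROXY=%s/%s %s ' % (
            container_workdir, container_workdir, proxy_filename, sandbox)
        return prefix + payload_args

    print(docker_arguments('/workdir', 'my/image:latest', 'x509up', '--in in.json'))
    # run --rm -v $(pwd):/workdir -v /cvmfs:/cvmfs -e X509_USER_PROXY=/workdir/x509up my/image:latest --in in.json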
@@ -513,6 +526,11 @@ def generate_processing_script_sandbox(self, processing):
'NUM_POINTS': self.points_to_generate,
'IN': self.input_json,
'OUT': self.output_json}
+        if 'X509_USER_PROXY' in os.environ and os.environ['X509_USER_PROXY']:
+            proxy_filename = os.path.basename(os.environ['X509_USER_PROXY'])
+            param_values['X509_USER_PROXY_FULLNAME'] = os.environ['X509_USER_PROXY']
+            param_values['X509_USER_PROXY_BASENAME'] = proxy_filename
+
executable = replace_parameters_with_values(self.executable, param_values)
arguments = replace_parameters_with_values(self.arguments, param_values)
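The same four proxy lines are now repeated in each generate_processing_script_* variant. A sketch of the shared helper they amount to (extracting it is a suggestion here, not something the commit does):

    import os

    def add_proxy_params(param_values):
        """Inject X509 proxy parameters when a proxy is present in the environment."""
        proxy = os.environ.get('X509_USER_PROXY')
        if proxy:
            param_values['X509_USER_PROXY_FULLNAME'] = proxy
            param_values['X509_USER_PROXY_BASENAME'] = os.path.basename(proxy)
        return param_values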

@@ -656,51 +674,60 @@ def parse_processing_outputs(self, processing):
return None, 'Failed to load the content of %s: %s' % (str(full_output_json), str(ex))

    def poll_processing(self, processing):
-        proc = processing['processing_metadata']['processing']
-        job_status, job_err_msg = self.poll_condor_job_status(processing, proc.external_id)
-        processing_outputs = None
-        reset_expired_at = False
-        if job_status in [ProcessingStatus.Finished]:
-            job_outputs, parser_errors = self.parse_processing_outputs(processing)
-            if job_outputs:
-                processing_status = ProcessingStatus.Finished
-                processing_err = None
-                processing_outputs = job_outputs
-            else:
-                processing_status = ProcessingStatus.Failed
-                processing_err = parser_errors
-        elif job_status in [ProcessingStatus.Failed]:
-            processing_status = job_status
-            processing_err = job_err_msg
-        elif self.toexpire:
-            processing_status = ProcessingStatus.Expired
-            processing_err = "The processing is expired"
-        elif job_status in [ProcessingStatus.Cancelled]:
-            processing_status = job_status
-            processing_err = job_err_msg
-        elif self.tocancel:
-            self.cancelled_processings.append(proc.internal_id)
-            processing_status = ProcessingStatus.Cancelled
-            processing_outputs = None
-            processing_err = 'Cancelled'
-        elif self.tosuspend:
-            self.suspended_processings.append(proc.internal_id)
-            processing_status = ProcessingStatus.Suspended
-            processing_outputs = None
-            processing_err = 'Suspend'
-        elif self.toresume:
-            # self.old_processings.append(processing['processing_metadata']['internal_id'])
-            # self.active_processings.clear()
-            # self.active_processings.remove(processing['processing_metadata']['internal_id'])
-            processing['processing_metadata']['resuming_at'] = datetime.datetime.utcnow()
-            processing_status = ProcessingStatus.Running
-            reset_expired_at = True
-            processing_outputs = None
-            processing_err = None
-        else:
-            processing_status = job_status
-            processing_err = job_err_msg
-        return processing_status, processing_outputs, processing_err, reset_expired_at
+        try:
+            proc = processing['processing_metadata']['processing']
+            job_status, job_err_msg = self.poll_condor_job_status(processing, proc.external_id)
+            processing_outputs = None
+            reset_expired_at = False
+            if job_status in [ProcessingStatus.Finished]:
+                job_outputs, parser_errors = self.parse_processing_outputs(processing)
+                if job_outputs:
+                    processing_status = ProcessingStatus.Finished
+                    processing_err = None
+                    processing_outputs = job_outputs
+                else:
+                    processing_status = ProcessingStatus.Failed
+                    processing_err = parser_errors
+            elif job_status in [ProcessingStatus.Failed]:
+                processing_status = job_status
+                processing_err = job_err_msg
+            elif self.toexpire:
+                processing_status = ProcessingStatus.Expired
+                processing_err = "The processing is expired"
+            elif job_status in [ProcessingStatus.Cancelled]:
+                processing_status = job_status
+                processing_err = job_err_msg
+            elif self.tocancel:
+                self.cancelled_processings.append(proc.internal_id)
+                processing_status = ProcessingStatus.Cancelled
+                processing_outputs = None
+                processing_err = 'Cancelled'
+            elif self.tosuspend:
+                self.suspended_processings.append(proc.internal_id)
+                processing_status = ProcessingStatus.Suspended
+                processing_outputs = None
+                processing_err = 'Suspend'
+            elif self.toresume:
+                # self.old_processings.append(processing['processing_metadata']['internal_id'])
+                # self.active_processings.clear()
+                # self.active_processings.remove(processing['processing_metadata']['internal_id'])
+                processing['processing_metadata']['resuming_at'] = datetime.datetime.utcnow()
+                processing_status = ProcessingStatus.Running
+                reset_expired_at = True
+                processing_outputs = None
+                processing_err = None
+            else:
+                processing_status = job_status
+                processing_err = job_err_msg
+            return processing_status, processing_outputs, processing_err, reset_expired_at
+        except Exception as ex:
+            self.logger.error("processing_id %s exception: %s, %s" % (processing['processing_id'], str(ex), traceback.format_exc()))
+            proc.retries += 1
+            if proc.retries > 10:
+                processing_status = ProcessingStatus.Failed
+            else:
+                processing_status = ProcessingStatus.Running
+            return processing_status, None, None, False
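The whole poll body is now guarded: a transient polling failure no longer raises out of the carrier, it just counts against a retry budget. A minimal model of that guard (poll and proc are stand-ins; the budget of 10 matches the diff):

    def guarded_poll(poll, proc):
        try:
            return poll(proc)              # normal path: full status result
        except Exception:
            proc.retries += 1
            if proc.retries > 10:
                return 'Failed'            # give up once the retry budget is spent
            return 'Running'               # otherwise keep the processing alive

    class Proc:
        retries = 0

    p = Proc()
    print(guarded_poll(lambda proc: 1 / 0, p))   # -> 'Running' (first failure)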

def poll_processing_updates(self, processing, input_output_maps):
processing_status, processing_outputs, processing_err, reset_expired_at = self.poll_processing(processing)
@@ -721,4 +748,4 @@ def poll_processing_updates(self, processing, input_output_maps):
update_processing['parameters']['expired_at'] = None
processing['expired_at'] = None
updated_contents = []
-        return update_processing, updated_contents
+        return update_processing, updated_contents, {}
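Both workflow classes now return a third value from poll_processing_updates, so every implementation matches a widened carrier-side contract. In miniature, what that means for call sites (the third slot's eventual payload is not shown in this diff, so it is left as an empty dict here too):

    def poll_processing_updates_v1():
        return {'status': 'running'}, []        # old two-value contract

    def poll_processing_updates_v2():
        return {'status': 'running'}, [], {}    # new: trailing dict, empty for now

    update_processing, updated_contents, extra = poll_processing_updates_v2()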
