diff --git a/atlas/lib/idds/atlas/version.py b/atlas/lib/idds/atlas/version.py index 31e441df..613d7ae0 100644 --- a/atlas/lib/idds/atlas/version.py +++ b/atlas/lib/idds/atlas/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2019 - 2021 -release_version = "0.10.3" +release_version = "0.10.5" diff --git a/atlas/lib/idds/atlas/workflow/atlaspandawork.py b/atlas/lib/idds/atlas/workflow/atlaspandawork.py index 621a8a76..d79f1352 100644 --- a/atlas/lib/idds/atlas/workflow/atlaspandawork.py +++ b/atlas/lib/idds/atlas/workflow/atlaspandawork.py @@ -392,7 +392,7 @@ def create_processing(self, input_output_maps=[]): def submit_panda_task(self, processing): try: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] task_param = proc.processing_metadata['task_param'] @@ -436,7 +436,7 @@ def submit_processing(self, processing): def poll_panda_task_status(self, processing): if 'processing' in processing['processing_metadata']: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] status, task_status = Client.getTaskStatus(proc.workload_id) @@ -468,7 +468,7 @@ def get_processing_status_from_panda_status(self, task_status): return processing_status def get_panda_task_id(self, processing): - from pandatools import Client + from pandaclient import Client start_time = datetime.datetime.utcnow() - datetime.timedelta(hours=10) start_time = start_time.strftime('%Y-%m-%d %H:%M:%S') @@ -494,7 +494,7 @@ def get_panda_task_id(self, processing): def poll_panda_task(self, processing=None, input_output_maps=None): task_id = None try: - from pandatools import Client + from pandaclient import Client if processing: proc = processing['processing_metadata']['processing'] @@ -534,7 +534,7 @@ def poll_panda_task(self, processing=None, input_output_maps=None): def kill_processing(self, processing): try: if processing: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] task_id = proc.workload_id # task_id = processing['processing_metadata']['task_id'] @@ -547,7 +547,7 @@ def kill_processing(self, processing): def kill_processing_force(self, processing): try: if processing: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] task_id = proc.workload_id # task_id = processing['processing_metadata']['task_id'] @@ -560,7 +560,7 @@ def kill_processing_force(self, processing): def reactivate_processing(self, processing): try: if processing: - from pandatools import Client + from pandaclient import Client # task_id = processing['processing_metadata']['task_id'] proc = processing['processing_metadata']['processing'] task_id = proc.workload_id diff --git a/atlas/lib/idds/atlas/workflowv2/atlasdagwork.py b/atlas/lib/idds/atlas/workflowv2/atlasdagwork.py index a1b2c764..ce36e153 100644 --- a/atlas/lib/idds/atlas/workflowv2/atlasdagwork.py +++ b/atlas/lib/idds/atlas/workflowv2/atlasdagwork.py @@ -347,7 +347,7 @@ def create_processing(self, input_output_maps): def submit_panda_task(self, processing): try: - from pandatools import Client + from pandaclient import Client task_param = processing['processing_metadata']['task_param'] return_code = Client.insertTaskParams(task_param, verbose=True) diff --git a/atlas/lib/idds/atlas/workflowv2/atlaslocalpandawork.py b/atlas/lib/idds/atlas/workflowv2/atlaslocalpandawork.py index 2bec4ca0..0ef14b6c 100644 --- a/atlas/lib/idds/atlas/workflowv2/atlaslocalpandawork.py +++ b/atlas/lib/idds/atlas/workflowv2/atlaslocalpandawork.py @@ -296,7 +296,7 @@ def get_rucio_download_client(self): def poll_panda_task_output(self, processing=None, input_output_maps=None): task_id = None try: - from pandatools import Client + from pandaclient import Client if processing: output_metadata = {} diff --git a/atlas/lib/idds/atlas/workflowv2/atlaspandawork.py b/atlas/lib/idds/atlas/workflowv2/atlaspandawork.py index 05a566f4..bcd505bb 100644 --- a/atlas/lib/idds/atlas/workflowv2/atlaspandawork.py +++ b/atlas/lib/idds/atlas/workflowv2/atlaspandawork.py @@ -173,6 +173,17 @@ def parse_task_parameters(self, task_parameters): log_col = {'scope': scope, 'name': name} self.add_log_collections(log_col) + if not self.get_primary_output_collection(): + all_colls = self.get_collections() + if all_colls: + one_coll = all_colls[0] + output_coll_scope = one_coll.scope + else: + output_coll_scope = 'pseudo.scope' + name = 'pseudo_output.' + datetime.datetime.utcnow().strftime("%Y_%m_%d_%H_%M_%S_%f") + str(random.randint(1, 1000)) + output_coll = {'scope': output_coll_scope, 'name': name, 'type': CollectionType.PseudoDataset} + self.set_primary_output_collection(output_coll) + if not self.get_primary_input_collection(): output_colls = self.get_output_collections() output_coll = output_colls[0] @@ -454,7 +465,7 @@ def create_processing(self, input_output_maps=[]): def submit_panda_task(self, processing): try: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] task_param = proc.processing_metadata['task_param'] @@ -498,7 +509,7 @@ def submit_processing(self, processing): def poll_panda_task_status(self, processing): if 'processing' in processing['processing_metadata']: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] status, task_status = Client.getTaskStatus(proc.workload_id) @@ -530,7 +541,7 @@ def get_processing_status_from_panda_status(self, task_status): return processing_status def get_panda_task_id(self, processing): - from pandatools import Client + from pandaclient import Client start_time = datetime.datetime.utcnow() - datetime.timedelta(hours=10) start_time = start_time.strftime('%Y-%m-%d %H:%M:%S') @@ -556,7 +567,7 @@ def get_panda_task_id(self, processing): def poll_panda_task(self, processing=None, input_output_maps=None): task_id = None try: - from pandatools import Client + from pandaclient import Client if processing: proc = processing['processing_metadata']['processing'] @@ -596,7 +607,7 @@ def poll_panda_task(self, processing=None, input_output_maps=None): def kill_processing(self, processing): try: if processing: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] task_id = proc.workload_id # task_id = processing['processing_metadata']['task_id'] @@ -609,7 +620,7 @@ def kill_processing(self, processing): def kill_processing_force(self, processing): try: if processing: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] task_id = proc.workload_id # task_id = processing['processing_metadata']['task_id'] @@ -622,7 +633,7 @@ def kill_processing_force(self, processing): def reactivate_processing(self, processing): try: if processing: - from pandatools import Client + from pandaclient import Client # task_id = processing['processing_metadata']['task_id'] proc = processing['processing_metadata']['processing'] task_id = proc.workload_id @@ -774,7 +785,8 @@ def syn_work_status(self, registered_input_output_maps, all_updates_flushed=True self.logger.debug("syn_work_status(%s): has_to_release_inputs: %s" % (str(self.get_processing_ids()), str(self.has_to_release_inputs()))) self.logger.debug("syn_work_status(%s): to_release_input_contents: %s" % (str(self.get_processing_ids()), str(to_release_input_contents))) - if self.is_processings_terminated() and self.is_input_collections_closed() and not self.has_new_inputs and not self.has_to_release_inputs() and not to_release_input_contents: + # if self.is_processings_terminated() and self.is_input_collections_closed() and not self.has_new_inputs and not self.has_to_release_inputs() and not to_release_input_contents: + if self.is_processings_terminated(): # if not self.is_all_outputs_flushed(registered_input_output_maps): if not all_updates_flushed: self.logger.warn("The work processings %s is terminated. but not all outputs are flushed. Wait to flush the outputs then finish the transform" % str(self.get_processing_ids())) diff --git a/atlas/tools/env/environment.yml b/atlas/tools/env/environment.yml index 995a61d0..9f741990 100644 --- a/atlas/tools/env/environment.yml +++ b/atlas/tools/env/environment.yml @@ -9,7 +9,8 @@ dependencies: - flake8 # Wrapper around PyFlakes&pep8 - pytest # python testing tool - nose # nose test tools + - stomp.py - rucio-clients - rucio-clients-atlas - - idds-common==0.10.3 - - idds-workflow==0.10.3 \ No newline at end of file + - idds-common==0.10.5 + - idds-workflow==0.10.5 \ No newline at end of file diff --git a/client/lib/idds/client/version.py b/client/lib/idds/client/version.py index 31e441df..613d7ae0 100644 --- a/client/lib/idds/client/version.py +++ b/client/lib/idds/client/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2019 - 2021 -release_version = "0.10.3" +release_version = "0.10.5" diff --git a/client/tools/env/environment.yml b/client/tools/env/environment.yml index d161a7e2..fdcb0c92 100644 --- a/client/tools/env/environment.yml +++ b/client/tools/env/environment.yml @@ -14,5 +14,6 @@ dependencies: - pytest # python testing tool - nose # nose test tools - tabulate - - idds-common==0.10.3 - - idds-workflow==0.10.3 \ No newline at end of file + - argcomplete + - idds-common==0.10.5 + - idds-workflow==0.10.5 \ No newline at end of file diff --git a/common/lib/idds/common/config.py b/common/lib/idds/common/config.py index 77192e9a..8d4853c1 100644 --- a/common/lib/idds/common/config.py +++ b/common/lib/idds/common/config.py @@ -35,6 +35,7 @@ def config_has_section(section): . :returns: True/False. """ + __CONFIG = get_config() return __CONFIG.has_section(section) @@ -47,6 +48,7 @@ def config_has_option(section, option): . :returns: True/False. """ + __CONFIG = get_config() return __CONFIG.has_option(section, option) @@ -58,6 +60,7 @@ def config_list_options(section): . :returns: list of (name, value). """ + __CONFIG = get_config() return __CONFIG.items(section) @@ -69,6 +72,7 @@ def config_get(section, option): . :returns: the configuration value. """ + __CONFIG = get_config() return __CONFIG.get(section, option) @@ -80,6 +84,7 @@ def config_get_int(section, option): . :returns: the integer configuration value. """ + __CONFIG = get_config() return __CONFIG.getint(section, option) @@ -91,6 +96,7 @@ def config_get_float(section, option): . :returns: the float configuration value. """ + __CONFIG = get_config() return __CONFIG.getfloat(section, option) @@ -102,6 +108,7 @@ def config_get_bool(section, option): . :returns: the boolean configuration value. """ + __CONFIG = get_config() return __CONFIG.getboolean(section, option) @@ -153,36 +160,38 @@ def get_local_config_value(configuration, section, name, current, default): return value -__CONFIG = ConfigParser.SafeConfigParser() +def get_config(): + __CONFIG = ConfigParser.SafeConfigParser() -__HAS_CONFIG = False -if os.environ.get('IDDS_CONFIG', None): - configfile = os.environ['IDDS_CONFIG'] - if not __CONFIG.read(configfile) == [configfile]: - raise Exception('IDDS_CONFIG is defined as %s, ' % configfile, - 'but could not load configurations from it.') - __HAS_CONFIG = True -else: - configfiles = ['%s/etc/idds/idds.cfg' % os.environ.get('IDDS_HOME', ''), - '/etc/idds/idds.cfg', - '%s/etc/idds/idds.cfg' % os.environ.get('VIRTUAL_ENV', '')] - - for configfile in configfiles: - if __CONFIG.read(configfile) == [configfile]: - __HAS_CONFIG = True - # print("Configuration file %s is used" % configfile) - break - -if not __HAS_CONFIG: - local_cfg = get_local_cfg_file() - if os.path.exists(local_cfg): - __CONFIG.read(local_cfg) + __HAS_CONFIG = False + if os.environ.get('IDDS_CONFIG', None): + configfile = os.environ['IDDS_CONFIG'] + if not __CONFIG.read(configfile) == [configfile]: + raise Exception('IDDS_CONFIG is defined as %s, ' % configfile, + 'but could not load configurations from it.') __HAS_CONFIG = True else: - raise Exception("Could not load configuration file." - "For iDDS client, please run 'idds setup' to create local config file." - "For an iDDS server, IDDS looks for a configuration file, in order:" - "\n\t${IDDS_CONFIG}" - "\n\t${IDDS_HOME}/etc/idds/idds.cfg" - "\n\t/etc/idds/idds.cfg" - "\n\t${VIRTUAL_ENV}/etc/idds/idds.cfg") + configfiles = ['%s/etc/idds/idds.cfg' % os.environ.get('IDDS_HOME', ''), + '/etc/idds/idds.cfg', + '%s/etc/idds/idds.cfg' % os.environ.get('VIRTUAL_ENV', '')] + + for configfile in configfiles: + if __CONFIG.read(configfile) == [configfile]: + __HAS_CONFIG = True + # print("Configuration file %s is used" % configfile) + break + + if not __HAS_CONFIG: + local_cfg = get_local_cfg_file() + if os.path.exists(local_cfg): + __CONFIG.read(local_cfg) + __HAS_CONFIG = True + else: + raise Exception("Could not load configuration file." + "For iDDS client, please run 'idds setup' to create local config file." + "For an iDDS server, IDDS looks for a configuration file, in order:" + "\n\t${IDDS_CONFIG}" + "\n\t${IDDS_HOME}/etc/idds/idds.cfg" + "\n\t/etc/idds/idds.cfg" + "\n\t${VIRTUAL_ENV}/etc/idds/idds.cfg") + return __CONFIG diff --git a/common/lib/idds/common/constants.py b/common/lib/idds/common/constants.py index 4b8a6a98..7cfd4972 100644 --- a/common/lib/idds/common/constants.py +++ b/common/lib/idds/common/constants.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2020 +# - Wen Guan, , 2019 - 2022 """ Constants. @@ -314,6 +314,7 @@ class ProcessingStatus(IDDSEnum): TimeOut = 23 ToFinish = 24 ToForceFinish = 25 + Broken = 26 class ProcessingLocking(IDDSEnum): diff --git a/common/lib/idds/common/exceptions.py b/common/lib/idds/common/exceptions.py index 9fab3d8a..4c54a629 100644 --- a/common/lib/idds/common/exceptions.py +++ b/common/lib/idds/common/exceptions.py @@ -252,3 +252,13 @@ def __init__(self, *args, **kwargs): super(AuthenticationPending, self).__init__(*args, **kwargs) self._message = "Authentication pending." self.error_code = 601 + + +class AuthenticationNotSupported(IDDSException): + """ + Authentication not supported + """ + def __init__(self, *args, **kwargs): + super(AuthenticationNotSupported, self).__init__(*args, **kwargs) + self._message = "Authentication not supported." + self.error_code = 602 diff --git a/common/lib/idds/common/version.py b/common/lib/idds/common/version.py index 31e441df..613d7ae0 100644 --- a/common/lib/idds/common/version.py +++ b/common/lib/idds/common/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2019 - 2021 -release_version = "0.10.3" +release_version = "0.10.5" diff --git a/doma/lib/idds/doma/version.py b/doma/lib/idds/doma/version.py index c62c8f4f..ee9fe789 100644 --- a/doma/lib/idds/doma/version.py +++ b/doma/lib/idds/doma/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2020 - 2021 -release_version = "0.10.3" +release_version = "0.10.5" diff --git a/doma/lib/idds/doma/workflow/domapandawork.py b/doma/lib/idds/doma/workflow/domapandawork.py index 344298fb..0cf013f9 100644 --- a/doma/lib/idds/doma/workflow/domapandawork.py +++ b/doma/lib/idds/doma/workflow/domapandawork.py @@ -430,7 +430,7 @@ def create_processing(self, input_output_maps=[]): def submit_panda_task(self, processing): try: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] task_param = proc.processing_metadata['task_param'] @@ -462,7 +462,7 @@ def submit_processing(self, processing): proc.submitted_at = datetime.datetime.utcnow() def get_panda_task_id(self, processing): - from pandatools import Client + from pandaclient import Client start_time = datetime.datetime.utcnow() - datetime.timedelta(hours=10) start_time = start_time.strftime('%Y-%m-%d %H:%M:%S') @@ -487,7 +487,7 @@ def get_panda_task_id(self, processing): def poll_panda_task_status(self, processing): if 'processing' in processing['processing_metadata']: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] status, task_status = Client.getTaskStatus(proc.workload_id) @@ -641,7 +641,7 @@ def get_update_contents_from_map_id(self, map_id, input_output_maps, job_info): def map_panda_ids(self, unregistered_job_ids, input_output_maps): self.logger.debug("map_panda_ids, unregistered_job_ids[:10]: %s" % str(unregistered_job_ids[:10])) - from pandatools import Client + from pandaclient import Client # updated_map_ids = [] full_update_contents = [] @@ -669,7 +669,7 @@ def map_panda_ids(self, unregistered_job_ids, input_output_maps): def get_status_changed_contents(self, unterminated_job_ids, input_output_maps, panda_id_to_map_ids): self.logger.debug("get_status_changed_contents, unterminated_job_ids[:10]: %s" % str(unterminated_job_ids[:10])) - from pandatools import Client + from pandaclient import Client full_update_contents = [] chunksize = 2000 @@ -698,7 +698,7 @@ def get_final_update_contents(self, input_output_maps): def poll_panda_task_old(self, processing=None, input_output_maps=None): task_id = None try: - from pandatools import Client + from pandaclient import Client jobs_ids = None if processing: @@ -769,7 +769,7 @@ def poll_panda_task_old(self, processing=None, input_output_maps=None): def poll_panda_jobs(self, job_ids): job_ids = list(job_ids) self.logger.debug("poll_panda_jobs, poll_panda_jobs_chunk_size: %s, job_ids[:10]: %s" % (self.poll_panda_jobs_chunk_size, str(job_ids[:10]))) - from pandatools import Client + from pandaclient import Client # updated_map_ids = [] inputname_jobid_map = {} @@ -874,7 +874,7 @@ def get_update_contents(self, inputnames, inputname_mapid_map, inputname_jobid_m def poll_panda_task(self, processing=None, input_output_maps=None): task_id = None try: - from pandatools import Client + from pandaclient import Client if processing: proc = processing['processing_metadata']['processing'] @@ -971,7 +971,7 @@ def poll_panda_task(self, processing=None, input_output_maps=None): def kill_processing(self, processing): try: if processing: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] task_id = proc.workload_id # task_id = processing['processing_metadata']['task_id'] @@ -984,7 +984,7 @@ def kill_processing(self, processing): def kill_processing_force(self, processing): try: if processing: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] task_id = proc.workload_id # task_id = processing['processing_metadata']['task_id'] @@ -997,7 +997,7 @@ def kill_processing_force(self, processing): def reactivate_processing(self, processing): try: if processing: - from pandatools import Client + from pandaclient import Client # task_id = processing['processing_metadata']['task_id'] proc = processing['processing_metadata']['processing'] task_id = proc.workload_id diff --git a/doma/lib/idds/doma/workflowv2/domapandawork.py b/doma/lib/idds/doma/workflowv2/domapandawork.py index 108734a4..c62b668c 100644 --- a/doma/lib/idds/doma/workflowv2/domapandawork.py +++ b/doma/lib/idds/doma/workflowv2/domapandawork.py @@ -76,6 +76,7 @@ def __init__(self, executable=None, arguments=None, parameters=None, setup=None, # self.logger.setLevel(logging.DEBUG) self.task_name = task_name + self.real_task_name = None self.set_work_name(task_name) self.queue = task_queue self.dep_tasks_id_names_map = {} @@ -452,7 +453,7 @@ def create_processing(self, input_output_maps=[]): def submit_panda_task(self, processing): try: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] task_param = proc.processing_metadata['task_param'] @@ -483,8 +484,20 @@ def submit_processing(self, processing): if task_id: proc.submitted_at = datetime.datetime.utcnow() + def resubmit_processing(self, processing): + proc = processing['processing_metadata']['processing'] + proc.workload_id = None + task_param = proc.processing_metadata['task_param'] + if self.retry_number > 0: + proc.task_name = self.task_name + "_" + str(self.retry_number) + task_param['taskName'] = proc.task_name + task_id = self.submit_panda_task(processing) + proc.workload_id = task_id + if task_id: + proc.submitted_at = datetime.datetime.utcnow() + def get_panda_task_id(self, processing): - from pandatools import Client + from pandaclient import Client start_time = datetime.datetime.utcnow() - datetime.timedelta(hours=10) start_time = start_time.strftime('%Y-%m-%d %H:%M:%S') @@ -497,7 +510,10 @@ def get_panda_task_id(self, processing): task_id = None for req_id in results: task_name = results[req_id]['taskName'] - if proc.workload_id is None and task_name == self.task_name: + local_task_name = proc.task_name + if not local_task_name: + local_task_name = self.task_name + if proc.workload_id is None and task_name == local_task_name: task_id = results[req_id]['jediTaskID'] # processing['processing_metadata']['task_id'] = task_id # processing['processing_metadata']['workload_id'] = task_id @@ -509,7 +525,7 @@ def get_panda_task_id(self, processing): def poll_panda_task_status(self, processing): if 'processing' in processing['processing_metadata']: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] status, task_status = Client.getTaskStatus(proc.workload_id) @@ -531,9 +547,11 @@ def get_processing_status_from_panda_status(self, task_status): elif task_status in ['finished', 'paused']: # finished, finishing, waiting it to be done processing_status = ProcessingStatus.SubFinished - elif task_status in ['failed', 'aborted', 'broken', 'exhausted']: + elif task_status in ['failed', 'aborted', 'exhausted']: # aborting, tobroken processing_status = ProcessingStatus.Failed + elif task_status in ['broken']: + processing_status = ProcessingStatus.Broken else: # finished, finishing, aborting, topreprocess, preprocessing, tobroken # toretry, toincexec, rerefine, paused, throttled, passed @@ -661,16 +679,37 @@ def get_update_contents_from_map_id(self, map_id, input_output_maps, job_info): update_contents.append(content) return update_contents + def get_panda_job_status(self, jobids): + self.logger.debug("get_panda_job_status, jobids[:10]: %s" % str(jobids[:10])) + from pandaclient import Client + ret = Client.getJobStatus(jobids, verbose=0) + if ret[0] == 0: + left_jobids = [] + ret_jobs = [] + jobs_list = ret[1] + for jobid, jobinfo in zip(jobids, jobs_list): + if jobinfo is None: + left_jobids.append(jobid) + else: + ret_jobs.append(jobinfo) + if left_jobids: + ret1 = Client.getFullJobStatus(ids=left_jobids, verbose=False) + if ret1[0] == 0: + left_jobs_list = ret1[1] + ret_jobs = ret_jobs + left_jobs_list + return ret_jobs + return [] + def map_panda_ids(self, unregistered_job_ids, input_output_maps): self.logger.debug("map_panda_ids, unregistered_job_ids[:10]: %s" % str(unregistered_job_ids[:10])) - from pandatools import Client # updated_map_ids = [] full_update_contents = [] chunksize = 2000 chunks = [unregistered_job_ids[i:i + chunksize] for i in range(0, len(unregistered_job_ids), chunksize)] for chunk in chunks: - jobs_list = Client.getJobStatus(chunk, verbose=0)[1] + # jobs_list = Client.getJobStatus(chunk, verbose=0)[1] + jobs_list = self.get_panda_job_status(chunk) for job_info in jobs_list: if job_info and job_info.Files and len(job_info.Files) > 0: for job_file in job_info.Files: @@ -691,13 +730,13 @@ def map_panda_ids(self, unregistered_job_ids, input_output_maps): def get_status_changed_contents(self, unterminated_job_ids, input_output_maps, panda_id_to_map_ids): self.logger.debug("get_status_changed_contents, unterminated_job_ids[:10]: %s" % str(unterminated_job_ids[:10])) - from pandatools import Client full_update_contents = [] chunksize = 2000 chunks = [unterminated_job_ids[i:i + chunksize] for i in range(0, len(unterminated_job_ids), chunksize)] for chunk in chunks: - jobs_list = Client.getJobStatus(chunk, verbose=0)[1] + # jobs_list = Client.getJobStatus(chunk, verbose=0)[1] + jobs_list = self.get_panda_job_status(chunk) for job_info in jobs_list: panda_id = job_info.PandaID map_id = panda_id_to_map_ids[panda_id] @@ -720,7 +759,7 @@ def get_final_update_contents(self, input_output_maps): def poll_panda_task_old(self, processing=None, input_output_maps=None): task_id = None try: - from pandatools import Client + from pandaclient import Client jobs_ids = None if processing: @@ -791,14 +830,14 @@ def poll_panda_task_old(self, processing=None, input_output_maps=None): def poll_panda_jobs(self, job_ids): job_ids = list(job_ids) self.logger.debug("poll_panda_jobs, poll_panda_jobs_chunk_size: %s, job_ids[:10]: %s" % (self.poll_panda_jobs_chunk_size, str(job_ids[:10]))) - from pandatools import Client # updated_map_ids = [] inputname_jobid_map = {} chunksize = self.poll_panda_jobs_chunk_size chunks = [job_ids[i:i + chunksize] for i in range(0, len(job_ids), chunksize)] for chunk in chunks: - jobs_list = Client.getJobStatus(chunk, verbose=0)[1] + # jobs_list = Client.getJobStatus(chunk, verbose=0)[1] + jobs_list = self.get_panda_job_status(chunk) if jobs_list: self.logger.debug("poll_panda_jobs, input jobs: %s, output_jobs: %s" % (len(chunk), len(jobs_list))) for job_info in jobs_list: @@ -896,7 +935,7 @@ def get_update_contents(self, inputnames, inputname_mapid_map, inputname_jobid_m def poll_panda_task(self, processing=None, input_output_maps=None): task_id = None try: - from pandatools import Client + from pandaclient import Client if processing: proc = processing['processing_metadata']['processing'] @@ -922,7 +961,15 @@ def poll_panda_task(self, processing=None, input_output_maps=None): self.reactivate_processing(processing) processing_status = ProcessingStatus.Submitted self.retry_number += 1 - + if processing_status in [ProcessingStatus.Broken]: + self.logger.error("poll_panda_task, task_id: %s is broken. retry_number: %s, num_retries: %s" % (str(task_id), self.retry_number, self.num_retries)) + if self.num_retries == 0: + self.num_retries = 1 + if self.retry_number < self.num_retries: + self.retry_number += 1 + self.logger.error("poll_panda_task, task_id: %s is broken. resubmit the task. retry_number: %s, num_retries: %s" % (str(task_id), self.retry_number, self.num_retries)) + self.resubmit_processing(processing) + return ProcessingStatus.Submitting, [] all_jobs_ids = task_info['PandaID'] terminated_jobs, inputname_mapid_map = self.get_job_maps(input_output_maps) @@ -993,7 +1040,7 @@ def poll_panda_task(self, processing=None, input_output_maps=None): def kill_processing(self, processing): try: if processing: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] task_id = proc.workload_id # task_id = processing['processing_metadata']['task_id'] @@ -1006,7 +1053,7 @@ def kill_processing(self, processing): def kill_processing_force(self, processing): try: if processing: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] task_id = proc.workload_id # task_id = processing['processing_metadata']['task_id'] @@ -1019,7 +1066,7 @@ def kill_processing_force(self, processing): def reactivate_processing(self, processing): try: if processing: - from pandatools import Client + from pandaclient import Client # task_id = processing['processing_metadata']['task_id'] proc = processing['processing_metadata']['processing'] task_id = proc.workload_id @@ -1053,8 +1100,8 @@ def poll_processing_updates(self, processing, input_output_maps): proc.polling_retries = 0 elif proc.tosuspend: self.logger.info("Suspending processing (processing id: %s, jediTaskId: %s)" % (processing['processing_id'], proc.workload_id)) - self.kill_processing_force(processing) - # self.kill_processing(processing) + # self.kill_processing_force(processing) + self.kill_processing(processing) proc.tosuspend = False proc.polling_retries = 0 elif proc.toresume: diff --git a/doma/tools/env/environment.yml b/doma/tools/env/environment.yml index e8cd026f..a831698b 100644 --- a/doma/tools/env/environment.yml +++ b/doma/tools/env/environment.yml @@ -10,5 +10,5 @@ dependencies: - pytest # python testing tool - nose # nose test tools - panda-client # panda client - - idds-common==0.10.3 - - idds-workflow==0.10.3 \ No newline at end of file + - idds-common==0.10.5 + - idds-workflow==0.10.5 \ No newline at end of file diff --git a/main/etc/idds/auth/auth.cfg.template b/main/etc/idds/auth/auth.cfg.template index f1a3b246..d66d6fdd 100644 --- a/main/etc/idds/auth/auth.cfg.template +++ b/main/etc/idds/auth/auth.cfg.template @@ -1,5 +1,12 @@ [common] -allow_vos = panda_dev,Rubin,Rubin:production +allow_vos = atlas,panda_dev,Rubin,Rubin:production + +[atlas] +client_secret = <> +audience = https://pandaserver-doma.cern.ch +client_id = <> +oidc_config_url = https://panda-iam-doma.cern.ch/.well-known/openid-configuration +vo = atlas [panda_dev] client_secret = <> diff --git a/main/lib/idds/agents/carrier/carrier.py b/main/lib/idds/agents/carrier/carrier.py index a8604c2a..e868497b 100644 --- a/main/lib/idds/agents/carrier/carrier.py +++ b/main/lib/idds/agents/carrier/carrier.py @@ -404,6 +404,9 @@ def process_running_processing(self, processing): if not processing['submitted_at'] or processing['submitted_at'] < proc.submitted_at: processing_update['parameters']['submitted_at'] = proc.submitted_at + if proc.workload_id: + processing_update['parameters']['workload_id'] = proc.workload_id + processing_update['parameters']['next_poll_at'] = next_poll_at # processing_update['parameters']['expired_at'] = work.get_expired_at(processing) processing_update['parameters']['processing_metadata'] = processing['processing_metadata'] diff --git a/main/lib/idds/agents/clerk/clerk.py b/main/lib/idds/agents/clerk/clerk.py index e2e1f208..83349aa2 100644 --- a/main/lib/idds/agents/clerk/clerk.py +++ b/main/lib/idds/agents/clerk/clerk.py @@ -167,7 +167,7 @@ def process_new_request(self, req): ret_req = {'request_id': req['request_id'], 'parameters': {'status': RequestStatus.Failed, 'locking': RequestLocking.Idle, - 'errors': {'msg': truncate_string('%s: %s' % (ex, traceback.format_exc()), length=900)}}} + 'errors': {'msg': truncate_string('%s: %s' % (ex, traceback.format_exc()), length=800)}}} return ret_req def process_new_requests(self): @@ -425,7 +425,7 @@ def process_running_request_real(self, req): 'locking': RequestLocking.Idle, 'next_poll_at': next_poll_at, 'request_metadata': req['request_metadata'], - 'errors': {'msg': truncate_string(req_msg, 900)}} + 'errors': {'msg': truncate_string(req_msg, 800)}} new_messages = [] if req_status == RequestStatus.ToExpire: @@ -479,7 +479,7 @@ def process_running_request_message(self, req, messages): ret_req = {'request_id': req['request_id'], 'parameters': {'status': RequestStatus.Failed, 'locking': RequestLocking.Idle, - 'errors': {'msg': '%s: %s' % (ex, traceback.format_exc())}}} + 'errors': {'msg': truncate_string('%s: %s' % (ex, traceback.format_exc()), length=800)}}} return ret_req def release_inputs(self, request_id): @@ -509,7 +509,7 @@ def process_running_request(self, req): ret_req = {'request_id': req['request_id'], 'parameters': {'status': RequestStatus.Failed, 'locking': RequestLocking.Idle, - 'errors': {'msg': '%s: %s' % (ex, traceback.format_exc())}}} + 'errors': {'msg': truncate_string('%s: %s' % (ex, traceback.format_exc()), length=800)}}} return ret_req def process_operating_request_real(self, req): @@ -580,7 +580,7 @@ def process_operating_request(self, req): ret_req = {'request_id': req['request_id'], 'parameters': {'status': RequestStatus.Failed, 'locking': RequestLocking.Idle, - 'errors': {'msg': '%s: %s' % (ex, traceback.format_exc())}}} + 'errors': {'msg': truncate_string('%s: %s' % (ex, traceback.format_exc()), length=800)}}} return ret_req def process_running_requests(self): diff --git a/main/lib/idds/rest/v1/auth.py b/main/lib/idds/rest/v1/auth.py index 88f9a7d6..098b5401 100644 --- a/main/lib/idds/rest/v1/auth.py +++ b/main/lib/idds/rest/v1/auth.py @@ -42,7 +42,7 @@ def get(self, vo, auth_type='oidc'): else: raise exceptions.IDDSException("Failed to get oidc sign url: %s" % str(sign_url)) else: - raise exceptions.NotSupportedAuthentication("auth_type %s is not supported." % str(auth_type)) + raise exceptions.AuthenticationNotSupported("auth_type %s is not supported to call this function." % str(auth_type)) except exceptions.NoObject as error: return self.generate_http_response(HTTP_STATUS_CODE.NotFound, exc_cls=error.__class__.__name__, exc_msg=error) except exceptions.IDDSException as error: diff --git a/main/lib/idds/tests/core_tests.py b/main/lib/idds/tests/core_tests.py index 1b116ad6..0ab3e32c 100644 --- a/main/lib/idds/tests/core_tests.py +++ b/main/lib/idds/tests/core_tests.py @@ -104,7 +104,12 @@ def show_works(req): # 283511, 283517 # reqs = get_requests(request_id=599, with_detail=True, with_metadata=True) -reqs = get_requests(request_id=283511, with_request=True, with_detail=False, with_metadata=True) +# reqs = get_requests(request_id=283511, with_request=True, with_detail=False, with_metadata=True) +# reqs = get_requests(request_id=298163, with_request=True, with_detail=False, with_metadata=True) +# reqs = get_requests(request_id=298557, with_request=True, with_detail=False, with_metadata=True) +# reqs = get_requests(request_id=299111, with_request=True, with_detail=False, with_metadata=True) +# reqs = get_requests(request_id=299235, with_request=True, with_detail=False, with_metadata=True) +reqs = get_requests(request_id=965, with_request=True, with_detail=False, with_metadata=True) for req in reqs: # print(req['request_id']) # print(rets) @@ -132,7 +137,9 @@ def show_works(req): """ -tfs = get_transforms(request_id=241) +""" +# tfs = get_transforms(request_id=241) +tfs = get_transforms(transform_id=176320) for tf in tfs: # print(tf) # print(tf['transform_metadata']['work'].to_dict()) @@ -140,6 +147,7 @@ def show_works(req): pass sys.exit(0) +""" """ msgs = retrieve_messages(workload_id=25972557) @@ -160,7 +168,8 @@ def show_works(req): sys.exit(0) """ -prs = get_processings(request_id=219) +# prs = get_processings(request_id=219) +prs = get_processings(transform_id=176320) i = 0 for pr in prs: # if pr['request_id'] == 91: diff --git a/main/lib/idds/tests/panda_test.py b/main/lib/idds/tests/panda_test.py index c1e20d4e..67413795 100644 --- a/main/lib/idds/tests/panda_test.py +++ b/main/lib/idds/tests/panda_test.py @@ -6,7 +6,7 @@ os.environ['PANDA_URL'] = 'http://pandaserver-doma.cern.ch:25080/server/panda' os.environ['PANDA_URL_SSL'] = 'https://pandaserver-doma.cern.ch:25443/server/panda' -from pandatools import Client # noqa E402 +from pandaclient import Client # noqa E402 """ jobids = [1408118] @@ -29,13 +29,37 @@ print(f.type) """ -jediTaskID = 8378 +jediTaskID = 10517 # 10607 +jediTaskID = 10607 ret = Client.getJediTaskDetails({'jediTaskID': jediTaskID}, True, True, verbose=False) print(ret) -ret = Client.getTaskStatus(jediTaskID, verbose=False) +# ret = Client.getTaskStatus(jediTaskID, verbose=False) +# print(ret) + +task_info = ret[1] +jobids = task_info['PandaID'] +ret = Client.getJobStatus(ids=jobids, verbose=False) print(ret) +if ret[0] == 0: + jobs = ret[1] + left_jobids = [] + ret_jobs = [] + print(len(jobs)) + for jobid, jobinfo in zip(jobids, jobs): + if jobinfo is None: + left_jobids.append(jobid) + else: + ret_jobs.append(jobinfo) + if left_jobids: + print(len(left_jobids)) + ret = Client.getFullJobStatus(ids=left_jobids, verbose=False) + print(ret) + print(len(ret[1])) + ret_jobs = ret_jobs + ret[1] + print(len(ret_jobs)) + sys.exit(0) """ @@ -104,7 +128,7 @@ # site = newOpts.get('site', None) # excludedSite = newOpts.get('excludedSite', None) # for JEDI -taskIDs = [7056, 7057] +taskIDs = [10624] for taskID in taskIDs: status, out = Client.retryTask(taskID, verbose=True, properErrorCode=True, newParams=newOpts) print(status) diff --git a/main/lib/idds/tests/test_activelearning.py b/main/lib/idds/tests/test_activelearning.py index 8ef1b0af..7303eafc 100644 --- a/main/lib/idds/tests/test_activelearning.py +++ b/main/lib/idds/tests/test_activelearning.py @@ -22,7 +22,7 @@ except ImportError: from urllib.parse import quote -from pandatools import Client +from pandaclient import Client # from idds.client.client import Client from idds.client.clientmanager import ClientManager @@ -120,7 +120,7 @@ def test_panda_work(): # print('output_collections: %s' % str(test_work.get_output_collections())) # print(json_dumps(test_work, sort_keys=True, indent=4)) - # from pandatools import Client + # from pandaclient import Client # Client.getJediTaskDetails(taskDict,fullFlag,withTaskInfo,verbose=False) # ret = Client.getJediTaskDetails({'jediTaskID': panda_task_id},False,True) # print(ret) diff --git a/main/lib/idds/tests/test_domapanda.py b/main/lib/idds/tests/test_domapanda.py index 04a90bf1..bc3c778e 100644 --- a/main/lib/idds/tests/test_domapanda.py +++ b/main/lib/idds/tests/test_domapanda.py @@ -140,6 +140,7 @@ def setup_workflow(): output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#2'}], log_collections=[], dependency_map=taskN2.dependencies, task_name=taskN2.name, task_queue=task_queue, + encode_command_line=True, task_log={"dataset": "PandaJob_#{pandaid}/", "destination": "local", "param_type": "log", @@ -152,6 +153,7 @@ def setup_workflow(): output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#3'}], log_collections=[], dependency_map=taskN3.dependencies, task_name=taskN3.name, task_queue=task_queue, + encode_command_line=True, task_log={"dataset": "PandaJob_#{pandaid}/", "destination": "local", "param_type": "log", diff --git a/main/lib/idds/tests/test_domapanda_workflow.py b/main/lib/idds/tests/test_domapanda_workflow.py index 5c78a3f3..be0b7347 100644 --- a/main/lib/idds/tests/test_domapanda_workflow.py +++ b/main/lib/idds/tests/test_domapanda_workflow.py @@ -136,6 +136,7 @@ def setup_workflow(): output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#2'}], log_collections=[], dependency_map=taskN2.dependencies, task_name=taskN2.name, task_queue=task_queue, + encode_command_line=True, task_log={"dataset": "PandaJob_#{pandaid}/", "destination": "local", "param_type": "log", @@ -148,6 +149,7 @@ def setup_workflow(): output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#3'}], log_collections=[], dependency_map=taskN3.dependencies, task_name=taskN3.name, task_queue=task_queue, + encode_command_line=True, task_log={"dataset": "PandaJob_#{pandaid}/", "destination": "local", "param_type": "log", diff --git a/main/lib/idds/tests/test_migrate_requests.py b/main/lib/idds/tests/test_migrate_requests.py index 0f0181df..c89c7bad 100644 --- a/main/lib/idds/tests/test_migrate_requests.py +++ b/main/lib/idds/tests/test_migrate_requests.py @@ -29,9 +29,9 @@ def migrate(): # atlas atlas_host = 'https://aipanda181.cern.ch:443/idds' # noqa F841 - cm1 = ClientManager(host=dev_host) + cm1 = ClientManager(host=atlas_host) # reqs = cm1.get_requests(request_id=290) - old_request_id = 241 + old_request_id = 298163 # for old_request_id in [152]: # for old_request_id in [60]: # noqa E115 # for old_request_id in [200]: # noqa E115 @@ -43,6 +43,7 @@ def migrate(): print("num requests: %s" % len(reqs)) for req in reqs[:1]: + # print(req) req = convert_old_req_2_workflow_req(req) workflow = req['request_metadata']['workflow'] workflow.clean_works() diff --git a/main/lib/idds/tests/trigger_release.py b/main/lib/idds/tests/trigger_release.py index b5d74c34..368c6bfd 100644 --- a/main/lib/idds/tests/trigger_release.py +++ b/main/lib/idds/tests/trigger_release.py @@ -12,7 +12,7 @@ request_ids = [368, 369, 370, 371, 372, 373, 374, 375, 376] -request_ids = [475] +request_ids = [902] for request_id in request_ids: contents = get_contents(request_id=request_id, status=ContentStatus.Available) ret_contents = {} @@ -23,12 +23,12 @@ ret_contents[content['coll_id']].append(content) for ret_content in ret_contents: - print(ret_content) - break + print("coll_id: %s, num_contents: %s" % (ret_content, len(ret_contents[ret_content]))) + # break updated_contents = core_transforms.release_inputs_by_collection(ret_contents) for update_content in updated_contents: print(update_content) # break - update_contents(updated_contents) + # update_contents(updated_contents) diff --git a/main/lib/idds/version.py b/main/lib/idds/version.py index 31e441df..613d7ae0 100644 --- a/main/lib/idds/version.py +++ b/main/lib/idds/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2019 - 2021 -release_version = "0.10.3" +release_version = "0.10.5" diff --git a/main/tools/env/environment.yml b/main/tools/env/environment.yml index 6b376940..29b8c900 100644 --- a/main/tools/env/environment.yml +++ b/main/tools/env/environment.yml @@ -24,6 +24,6 @@ dependencies: - sphinx-rtd-theme # sphinx readthedoc theme - nevergrad # nevergrad hyper parameter optimization - psycopg2-binary - - idds-common==0.10.3 - - idds-workflow==0.10.3 - - idds-client==0.10.3 \ No newline at end of file + - idds-common==0.10.5 + - idds-workflow==0.10.5 + - idds-client==0.10.5 \ No newline at end of file diff --git a/monitor/conf.js b/monitor/conf.js index ef50b781..74571ae7 100644 --- a/monitor/conf.js +++ b/monitor/conf.js @@ -1,9 +1,9 @@ var appConfig = { - 'iddsAPI_request': "https://lxplus708.cern.ch:443/idds/monitor_request/null/null", - 'iddsAPI_transform': "https://lxplus708.cern.ch:443/idds/monitor_transform/null/null", - 'iddsAPI_processing': "https://lxplus708.cern.ch:443/idds/monitor_processing/null/null", - 'iddsAPI_request_detail': "https://lxplus708.cern.ch:443/idds/monitor/null/null/true/false/false", - 'iddsAPI_transform_detail': "https://lxplus708.cern.ch:443/idds/monitor/null/null/false/true/false", - 'iddsAPI_processing_detail': "https://lxplus708.cern.ch:443/idds/monitor/null/null/false/false/true" + 'iddsAPI_request': "https://lxplus785.cern.ch:443/idds/monitor_request/null/null", + 'iddsAPI_transform': "https://lxplus785.cern.ch:443/idds/monitor_transform/null/null", + 'iddsAPI_processing': "https://lxplus785.cern.ch:443/idds/monitor_processing/null/null", + 'iddsAPI_request_detail': "https://lxplus785.cern.ch:443/idds/monitor/null/null/true/false/false", + 'iddsAPI_transform_detail': "https://lxplus785.cern.ch:443/idds/monitor/null/null/false/true/false", + 'iddsAPI_processing_detail': "https://lxplus785.cern.ch:443/idds/monitor/null/null/false/false/true" } diff --git a/monitor/version.py b/monitor/version.py index 31e441df..613d7ae0 100644 --- a/monitor/version.py +++ b/monitor/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2019 - 2021 -release_version = "0.10.3" +release_version = "0.10.5" diff --git a/website/version.py b/website/version.py index 31e441df..613d7ae0 100644 --- a/website/version.py +++ b/website/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2019 - 2021 -release_version = "0.10.3" +release_version = "0.10.5" diff --git a/workflow/lib/idds/workflow/version.py b/workflow/lib/idds/workflow/version.py index 31e441df..613d7ae0 100644 --- a/workflow/lib/idds/workflow/version.py +++ b/workflow/lib/idds/workflow/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2019 - 2021 -release_version = "0.10.3" +release_version = "0.10.5" diff --git a/workflow/lib/idds/workflowv2/work.py b/workflow/lib/idds/workflowv2/work.py index b90c6f24..1a3095ff 100644 --- a/workflow/lib/idds/workflowv2/work.py +++ b/workflow/lib/idds/workflowv2/work.py @@ -152,6 +152,7 @@ def __init__(self, processing_metadata={}): self.processing = None self.internal_id = str(uuid.uuid4())[:8] + self.task_name = None self.processing_id = None self.workload_id = None self.status = ProcessingStatus.New @@ -368,6 +369,14 @@ def external_id(self): def external_id(self, value): self.add_metadata_item('external_id', value) + @property + def task_name(self): + return self.get_metadata_item('task_name', None) + + @task_name.setter + def task_name(self, value): + self.add_metadata_item('task_name', value) + @property def processing(self): return self._processing @@ -1515,6 +1524,9 @@ def get_output_collections(self): keys = self._other_output_collections return [self.collections[k] for k in keys] + def get_collections(self): + return [self.collections[k] for k in self.collections.keys()] + def is_input_collections_closed(self): colls = self.get_input_collections() for coll in colls: diff --git a/workflow/lib/idds/workflowv2/workflow.py b/workflow/lib/idds/workflowv2/workflow.py index cb6bed5d..61731976 100644 --- a/workflow/lib/idds/workflowv2/workflow.py +++ b/workflow/lib/idds/workflowv2/workflow.py @@ -227,7 +227,8 @@ def to_dict(self): elif isinstance(w, CompositeCondition): new_w = w.to_dict() elif isinstance(w, Workflow): - new_w = w.to_dict() + # new_w = w.to_dict() + new_w = w.get_internal_id() else: new_w = w new_value.append(new_w) @@ -247,12 +248,18 @@ def load_conditions(self, works): new_conditions.append(cond) else: if 'idds_method' in cond and 'idds_method_internal_id' in cond: + self.logger.debug("idds_method_internal_id: %s" % cond['idds_method_internal_id']) + self.logger.debug("idds_method: %s" % cond['idds_method']) + internal_id = cond['idds_method_internal_id'] work = self.get_work_from_id(internal_id, works) + + self.logger.debug("get_work_from_id: %s: [%s]" % (internal_id, [work])) + if work is not None: new_cond = getattr(work, cond['idds_method']) else: - self.logger.error("Work cannot be found for %s" % (internal_id)) + self.logger.error("Condition method work cannot be found for %s" % (internal_id)) new_cond = cond elif 'idds_attribute' in cond and 'idds_method_internal_id' in cond: internal_id = cond['idds_method_internal_id'] @@ -260,7 +267,7 @@ def load_conditions(self, works): if work is not None: new_cond = getattr(work, cond['idds_attribute']) else: - self.logger.error("Work cannot be found for %s" % (internal_id)) + self.logger.error("Condition attribute work cannot be found for %s" % (internal_id)) new_cond = cond elif 'idds_method' in cond and 'idds_method_condition' in cond: new_cond = cond['idds_method_condition'] @@ -271,37 +278,46 @@ def load_conditions(self, works): self.conditions = new_conditions new_true_works = [] + self.logger.debug("true_works: %s" % str(self.true_works)) + for w in self.true_works: + # self.logger.debug("true_work: %s" % str(w)) if isinstance(w, CompositeCondition): # work = w.load_conditions(works, works_template) w.load_conditions(works) work = w elif isinstance(w, Workflow): work = w + elif isinstance(w, Work): + work = w elif type(w) in [str]: work = self.get_work_from_id(w, works) if work is None: - self.logger.error("Work cannot be found for %s" % str(w)) + self.logger.error("True work cannot be found for %s" % str(w)) work = w else: - self.logger.error("Work cannot be found for %s" % str(w)) + self.logger.error("True work cannot be found for type(%s): %s" % (type(w), str(w))) work = w new_true_works.append(work) self.true_works = new_true_works new_false_works = [] for w in self.false_works: - if isinstance(w, CompositeCondition) or isinstance(w, Workflow): + if isinstance(w, CompositeCondition): # work = w.load_condtions(works, works_template) w.load_conditions(works) work = w + elif isinstance(w, Workflow): + work = w + elif isinstance(w, Work): + work = w elif type(w) in [str]: work = self.get_work_from_id(w, works) if work is None: - self.logger.error("Work cannot be found for %s" % str(w)) + self.logger.error("False work cannot be found for type(%s): %s" % (type(w), str(w))) work = w else: - self.logger.error("Work cannot be found for %s" % str(w)) + self.logger.error("False work cannot be found for %s" % str(w)) work = w new_false_works.append(work) self.false_works = new_false_works @@ -610,6 +626,8 @@ def __init__(self, name=None, workload_id=None, lifetime=None, pending_time=None self.works = {} self.work_sequence = {} # order list + self.next_works = [] + self.terminated_works = [] self.initial_works = [] # if the primary initial_work is not set, it's the first initial work. @@ -796,6 +814,14 @@ def load_works(self): if work.last_updated_at and (not self.last_updated_at or work.last_updated_at > self.last_updated_at): self.last_updated_at = work.last_updated_at + @property + def next_works(self): + return self.get_metadata_item('next_works', []) + + @next_works.setter + def next_works(self, value): + self.add_metadata_item('next_works', value) + @property def conditions(self): return self._conditions @@ -1262,6 +1288,11 @@ def load_parameter_links(self): if p_id in p_metadata: self.parameter_links[p_id].metadata = p_metadata[p_id] + def add_next_work(self, work_id): + next_works = self.next_works + next_works.append(work_id) + self.next_works = next_works + def enable_next_works(self, work, cond): self.log_debug("Checking Work %s condition: %s" % (work.get_internal_id(), json_dumps(cond, sort_keys=True, indent=4))) @@ -1808,6 +1839,9 @@ def __deepcopy__(self, memo): result.logger = logger return result + def get_template_id(self): + return self.template.get_template_id() + @property def metadata(self): run_metadata = {'parent_num_run': self.parent_num_run, @@ -1821,6 +1855,7 @@ def metadata(self): @metadata.setter def metadata(self, value): + self.template.load_metadata() run_metadata = value self.parent_num_run = run_metadata['parent_num_run'] self._num_run = run_metadata['num_run'] @@ -1845,6 +1880,12 @@ def independent_works(self, value): self.runs[str(self.num_run)].independent_works = value self.template.independent_works = value + def add_next_work(self, work_id): + if self.runs: + self.runs[str(self.num_run)].add_next_work(work_id) + else: + raise Exception("There are no runs. It should not have next work") + @property def last_updated_at(self): if self.runs: diff --git a/workflow/tools/env/environment.yml b/workflow/tools/env/environment.yml index a4792d34..b5f5172b 100644 --- a/workflow/tools/env/environment.yml +++ b/workflow/tools/env/environment.yml @@ -8,4 +8,4 @@ dependencies: - flake8 # Wrapper around PyFlakes&pep8 - pytest # python testing tool - nose # nose test tools - - idds-common==0.10.3 \ No newline at end of file + - idds-common==0.10.5 \ No newline at end of file