From 55440c214f16658d2d1aacd30f1196284c7bc5e0 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Wed, 16 Mar 2022 13:07:05 +0100 Subject: [PATCH 01/14] fix poll panda jobs --- .../lib/idds/doma/workflowv2/domapandawork.py | 33 +++++++++++++++---- main/lib/idds/tests/panda_test.py | 21 +++++++++++- monitor/conf.js | 12 +++---- 3 files changed, 53 insertions(+), 13 deletions(-) diff --git a/doma/lib/idds/doma/workflowv2/domapandawork.py b/doma/lib/idds/doma/workflowv2/domapandawork.py index 44242dc2..c62b668c 100644 --- a/doma/lib/idds/doma/workflowv2/domapandawork.py +++ b/doma/lib/idds/doma/workflowv2/domapandawork.py @@ -679,16 +679,37 @@ def get_update_contents_from_map_id(self, map_id, input_output_maps, job_info): update_contents.append(content) return update_contents + def get_panda_job_status(self, jobids): + self.logger.debug("get_panda_job_status, jobids[:10]: %s" % str(jobids[:10])) + from pandaclient import Client + ret = Client.getJobStatus(jobids, verbose=0) + if ret[0] == 0: + left_jobids = [] + ret_jobs = [] + jobs_list = ret[1] + for jobid, jobinfo in zip(jobids, jobs_list): + if jobinfo is None: + left_jobids.append(jobid) + else: + ret_jobs.append(jobinfo) + if left_jobids: + ret1 = Client.getFullJobStatus(ids=left_jobids, verbose=False) + if ret1[0] == 0: + left_jobs_list = ret1[1] + ret_jobs = ret_jobs + left_jobs_list + return ret_jobs + return [] + def map_panda_ids(self, unregistered_job_ids, input_output_maps): self.logger.debug("map_panda_ids, unregistered_job_ids[:10]: %s" % str(unregistered_job_ids[:10])) - from pandaclient import Client # updated_map_ids = [] full_update_contents = [] chunksize = 2000 chunks = [unregistered_job_ids[i:i + chunksize] for i in range(0, len(unregistered_job_ids), chunksize)] for chunk in chunks: - jobs_list = Client.getJobStatus(chunk, verbose=0)[1] + # jobs_list = Client.getJobStatus(chunk, verbose=0)[1] + jobs_list = self.get_panda_job_status(chunk) for job_info in jobs_list: if job_info and job_info.Files and len(job_info.Files) > 0: for job_file in job_info.Files: @@ -709,13 +730,13 @@ def map_panda_ids(self, unregistered_job_ids, input_output_maps): def get_status_changed_contents(self, unterminated_job_ids, input_output_maps, panda_id_to_map_ids): self.logger.debug("get_status_changed_contents, unterminated_job_ids[:10]: %s" % str(unterminated_job_ids[:10])) - from pandaclient import Client full_update_contents = [] chunksize = 2000 chunks = [unterminated_job_ids[i:i + chunksize] for i in range(0, len(unterminated_job_ids), chunksize)] for chunk in chunks: - jobs_list = Client.getJobStatus(chunk, verbose=0)[1] + # jobs_list = Client.getJobStatus(chunk, verbose=0)[1] + jobs_list = self.get_panda_job_status(chunk) for job_info in jobs_list: panda_id = job_info.PandaID map_id = panda_id_to_map_ids[panda_id] @@ -809,14 +830,14 @@ def poll_panda_task_old(self, processing=None, input_output_maps=None): def poll_panda_jobs(self, job_ids): job_ids = list(job_ids) self.logger.debug("poll_panda_jobs, poll_panda_jobs_chunk_size: %s, job_ids[:10]: %s" % (self.poll_panda_jobs_chunk_size, str(job_ids[:10]))) - from pandaclient import Client # updated_map_ids = [] inputname_jobid_map = {} chunksize = self.poll_panda_jobs_chunk_size chunks = [job_ids[i:i + chunksize] for i in range(0, len(job_ids), chunksize)] for chunk in chunks: - jobs_list = Client.getJobStatus(chunk, verbose=0)[1] + # jobs_list = Client.getJobStatus(chunk, verbose=0)[1] + jobs_list = self.get_panda_job_status(chunk) if jobs_list: self.logger.debug("poll_panda_jobs, input jobs: %s, output_jobs: %s" % (len(chunk), len(jobs_list))) for job_info in jobs_list: diff --git a/main/lib/idds/tests/panda_test.py b/main/lib/idds/tests/panda_test.py index 7732c68c..67413795 100644 --- a/main/lib/idds/tests/panda_test.py +++ b/main/lib/idds/tests/panda_test.py @@ -29,7 +29,8 @@ print(f.type) """ -jediTaskID = 10517 # 10607 +jediTaskID = 10517 # 10607 +jediTaskID = 10607 ret = Client.getJediTaskDetails({'jediTaskID': jediTaskID}, True, True, verbose=False) print(ret) @@ -41,6 +42,24 @@ ret = Client.getJobStatus(ids=jobids, verbose=False) print(ret) +if ret[0] == 0: + jobs = ret[1] + left_jobids = [] + ret_jobs = [] + print(len(jobs)) + for jobid, jobinfo in zip(jobids, jobs): + if jobinfo is None: + left_jobids.append(jobid) + else: + ret_jobs.append(jobinfo) + if left_jobids: + print(len(left_jobids)) + ret = Client.getFullJobStatus(ids=left_jobids, verbose=False) + print(ret) + print(len(ret[1])) + ret_jobs = ret_jobs + ret[1] + print(len(ret_jobs)) + sys.exit(0) """ diff --git a/monitor/conf.js b/monitor/conf.js index ef50b781..2f61fb7e 100644 --- a/monitor/conf.js +++ b/monitor/conf.js @@ -1,9 +1,9 @@ var appConfig = { - 'iddsAPI_request': "https://lxplus708.cern.ch:443/idds/monitor_request/null/null", - 'iddsAPI_transform': "https://lxplus708.cern.ch:443/idds/monitor_transform/null/null", - 'iddsAPI_processing': "https://lxplus708.cern.ch:443/idds/monitor_processing/null/null", - 'iddsAPI_request_detail': "https://lxplus708.cern.ch:443/idds/monitor/null/null/true/false/false", - 'iddsAPI_transform_detail': "https://lxplus708.cern.ch:443/idds/monitor/null/null/false/true/false", - 'iddsAPI_processing_detail': "https://lxplus708.cern.ch:443/idds/monitor/null/null/false/false/true" + 'iddsAPI_request': "https://lxplus733.cern.ch:443/idds/monitor_request/null/null", + 'iddsAPI_transform': "https://lxplus733.cern.ch:443/idds/monitor_transform/null/null", + 'iddsAPI_processing': "https://lxplus733.cern.ch:443/idds/monitor_processing/null/null", + 'iddsAPI_request_detail': "https://lxplus733.cern.ch:443/idds/monitor/null/null/true/false/false", + 'iddsAPI_transform_detail': "https://lxplus733.cern.ch:443/idds/monitor/null/null/false/true/false", + 'iddsAPI_processing_detail': "https://lxplus733.cern.ch:443/idds/monitor/null/null/false/false/true" } From 59bf823410c3633947db94ed45a599c2cab82e65 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Tue, 22 Mar 2022 12:15:39 +0100 Subject: [PATCH 02/14] fix to truncate too long error msg --- main/lib/idds/agents/clerk/clerk.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/main/lib/idds/agents/clerk/clerk.py b/main/lib/idds/agents/clerk/clerk.py index e2e1f208..83349aa2 100644 --- a/main/lib/idds/agents/clerk/clerk.py +++ b/main/lib/idds/agents/clerk/clerk.py @@ -167,7 +167,7 @@ def process_new_request(self, req): ret_req = {'request_id': req['request_id'], 'parameters': {'status': RequestStatus.Failed, 'locking': RequestLocking.Idle, - 'errors': {'msg': truncate_string('%s: %s' % (ex, traceback.format_exc()), length=900)}}} + 'errors': {'msg': truncate_string('%s: %s' % (ex, traceback.format_exc()), length=800)}}} return ret_req def process_new_requests(self): @@ -425,7 +425,7 @@ def process_running_request_real(self, req): 'locking': RequestLocking.Idle, 'next_poll_at': next_poll_at, 'request_metadata': req['request_metadata'], - 'errors': {'msg': truncate_string(req_msg, 900)}} + 'errors': {'msg': truncate_string(req_msg, 800)}} new_messages = [] if req_status == RequestStatus.ToExpire: @@ -479,7 +479,7 @@ def process_running_request_message(self, req, messages): ret_req = {'request_id': req['request_id'], 'parameters': {'status': RequestStatus.Failed, 'locking': RequestLocking.Idle, - 'errors': {'msg': '%s: %s' % (ex, traceback.format_exc())}}} + 'errors': {'msg': truncate_string('%s: %s' % (ex, traceback.format_exc()), length=800)}}} return ret_req def release_inputs(self, request_id): @@ -509,7 +509,7 @@ def process_running_request(self, req): ret_req = {'request_id': req['request_id'], 'parameters': {'status': RequestStatus.Failed, 'locking': RequestLocking.Idle, - 'errors': {'msg': '%s: %s' % (ex, traceback.format_exc())}}} + 'errors': {'msg': truncate_string('%s: %s' % (ex, traceback.format_exc()), length=800)}}} return ret_req def process_operating_request_real(self, req): @@ -580,7 +580,7 @@ def process_operating_request(self, req): ret_req = {'request_id': req['request_id'], 'parameters': {'status': RequestStatus.Failed, 'locking': RequestLocking.Idle, - 'errors': {'msg': '%s: %s' % (ex, traceback.format_exc())}}} + 'errors': {'msg': truncate_string('%s: %s' % (ex, traceback.format_exc()), length=800)}}} return ret_req def process_running_requests(self): From 789fd5ddaaafc731cbf0a8bb20dcb15ff7e12875 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Wed, 23 Mar 2022 23:03:36 +0100 Subject: [PATCH 03/14] add argcomplete for client --- client/tools/env/environment.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/client/tools/env/environment.yml b/client/tools/env/environment.yml index a72d102b..fcea8065 100644 --- a/client/tools/env/environment.yml +++ b/client/tools/env/environment.yml @@ -14,5 +14,6 @@ dependencies: - pytest # python testing tool - nose # nose test tools - tabulate + - argcomplete - idds-common==0.10.4 - - idds-workflow==0.10.4 \ No newline at end of file + - idds-workflow==0.10.4 From c6ab976cda748a7ab00e6054335282eee84990c5 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Thu, 24 Mar 2022 14:10:14 +0100 Subject: [PATCH 04/14] fix to avoid import errors when no idds cfg is defined --- common/lib/idds/common/config.py | 69 ++++++++++++++++++-------------- 1 file changed, 39 insertions(+), 30 deletions(-) diff --git a/common/lib/idds/common/config.py b/common/lib/idds/common/config.py index 77192e9a..8d4853c1 100644 --- a/common/lib/idds/common/config.py +++ b/common/lib/idds/common/config.py @@ -35,6 +35,7 @@ def config_has_section(section): . :returns: True/False. """ + __CONFIG = get_config() return __CONFIG.has_section(section) @@ -47,6 +48,7 @@ def config_has_option(section, option): . :returns: True/False. """ + __CONFIG = get_config() return __CONFIG.has_option(section, option) @@ -58,6 +60,7 @@ def config_list_options(section): . :returns: list of (name, value). """ + __CONFIG = get_config() return __CONFIG.items(section) @@ -69,6 +72,7 @@ def config_get(section, option): . :returns: the configuration value. """ + __CONFIG = get_config() return __CONFIG.get(section, option) @@ -80,6 +84,7 @@ def config_get_int(section, option): . :returns: the integer configuration value. """ + __CONFIG = get_config() return __CONFIG.getint(section, option) @@ -91,6 +96,7 @@ def config_get_float(section, option): . :returns: the float configuration value. """ + __CONFIG = get_config() return __CONFIG.getfloat(section, option) @@ -102,6 +108,7 @@ def config_get_bool(section, option): . :returns: the boolean configuration value. """ + __CONFIG = get_config() return __CONFIG.getboolean(section, option) @@ -153,36 +160,38 @@ def get_local_config_value(configuration, section, name, current, default): return value -__CONFIG = ConfigParser.SafeConfigParser() +def get_config(): + __CONFIG = ConfigParser.SafeConfigParser() -__HAS_CONFIG = False -if os.environ.get('IDDS_CONFIG', None): - configfile = os.environ['IDDS_CONFIG'] - if not __CONFIG.read(configfile) == [configfile]: - raise Exception('IDDS_CONFIG is defined as %s, ' % configfile, - 'but could not load configurations from it.') - __HAS_CONFIG = True -else: - configfiles = ['%s/etc/idds/idds.cfg' % os.environ.get('IDDS_HOME', ''), - '/etc/idds/idds.cfg', - '%s/etc/idds/idds.cfg' % os.environ.get('VIRTUAL_ENV', '')] - - for configfile in configfiles: - if __CONFIG.read(configfile) == [configfile]: - __HAS_CONFIG = True - # print("Configuration file %s is used" % configfile) - break - -if not __HAS_CONFIG: - local_cfg = get_local_cfg_file() - if os.path.exists(local_cfg): - __CONFIG.read(local_cfg) + __HAS_CONFIG = False + if os.environ.get('IDDS_CONFIG', None): + configfile = os.environ['IDDS_CONFIG'] + if not __CONFIG.read(configfile) == [configfile]: + raise Exception('IDDS_CONFIG is defined as %s, ' % configfile, + 'but could not load configurations from it.') __HAS_CONFIG = True else: - raise Exception("Could not load configuration file." - "For iDDS client, please run 'idds setup' to create local config file." - "For an iDDS server, IDDS looks for a configuration file, in order:" - "\n\t${IDDS_CONFIG}" - "\n\t${IDDS_HOME}/etc/idds/idds.cfg" - "\n\t/etc/idds/idds.cfg" - "\n\t${VIRTUAL_ENV}/etc/idds/idds.cfg") + configfiles = ['%s/etc/idds/idds.cfg' % os.environ.get('IDDS_HOME', ''), + '/etc/idds/idds.cfg', + '%s/etc/idds/idds.cfg' % os.environ.get('VIRTUAL_ENV', '')] + + for configfile in configfiles: + if __CONFIG.read(configfile) == [configfile]: + __HAS_CONFIG = True + # print("Configuration file %s is used" % configfile) + break + + if not __HAS_CONFIG: + local_cfg = get_local_cfg_file() + if os.path.exists(local_cfg): + __CONFIG.read(local_cfg) + __HAS_CONFIG = True + else: + raise Exception("Could not load configuration file." + "For iDDS client, please run 'idds setup' to create local config file." + "For an iDDS server, IDDS looks for a configuration file, in order:" + "\n\t${IDDS_CONFIG}" + "\n\t${IDDS_HOME}/etc/idds/idds.cfg" + "\n\t/etc/idds/idds.cfg" + "\n\t${VIRTUAL_ENV}/etc/idds/idds.cfg") + return __CONFIG From cecba18230f37c7c0cf6537def263166426a37e7 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Thu, 24 Mar 2022 18:21:50 +0100 Subject: [PATCH 05/14] fix to finish atlas panda tasks --- atlas/lib/idds/atlas/workflowv2/atlaspandawork.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/atlas/lib/idds/atlas/workflowv2/atlaspandawork.py b/atlas/lib/idds/atlas/workflowv2/atlaspandawork.py index 31418348..f00325de 100644 --- a/atlas/lib/idds/atlas/workflowv2/atlaspandawork.py +++ b/atlas/lib/idds/atlas/workflowv2/atlaspandawork.py @@ -774,7 +774,8 @@ def syn_work_status(self, registered_input_output_maps, all_updates_flushed=True self.logger.debug("syn_work_status(%s): has_to_release_inputs: %s" % (str(self.get_processing_ids()), str(self.has_to_release_inputs()))) self.logger.debug("syn_work_status(%s): to_release_input_contents: %s" % (str(self.get_processing_ids()), str(to_release_input_contents))) - if self.is_processings_terminated() and self.is_input_collections_closed() and not self.has_new_inputs and not self.has_to_release_inputs() and not to_release_input_contents: + # if self.is_processings_terminated() and self.is_input_collections_closed() and not self.has_new_inputs and not self.has_to_release_inputs() and not to_release_input_contents: + if self.is_processings_terminated(): # if not self.is_all_outputs_flushed(registered_input_output_maps): if not all_updates_flushed: self.logger.warn("The work processings %s is terminated. but not all outputs are flushed. Wait to flush the outputs then finish the transform" % str(self.get_processing_ids())) From e0ab3fe719db32844b056fc2c50ca3b8338d79f9 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Thu, 24 Mar 2022 18:28:40 +0100 Subject: [PATCH 06/14] fix load workflow metadata --- workflow/lib/idds/workflowv2/workflow.py | 28 +++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/workflow/lib/idds/workflowv2/workflow.py b/workflow/lib/idds/workflowv2/workflow.py index cb6bed5d..24067196 100644 --- a/workflow/lib/idds/workflowv2/workflow.py +++ b/workflow/lib/idds/workflowv2/workflow.py @@ -227,7 +227,8 @@ def to_dict(self): elif isinstance(w, CompositeCondition): new_w = w.to_dict() elif isinstance(w, Workflow): - new_w = w.to_dict() + # new_w = w.to_dict() + new_w = w.get_internal_id() else: new_w = w new_value.append(new_w) @@ -610,6 +611,8 @@ def __init__(self, name=None, workload_id=None, lifetime=None, pending_time=None self.works = {} self.work_sequence = {} # order list + self.next_works = [] + self.terminated_works = [] self.initial_works = [] # if the primary initial_work is not set, it's the first initial work. @@ -796,6 +799,14 @@ def load_works(self): if work.last_updated_at and (not self.last_updated_at or work.last_updated_at > self.last_updated_at): self.last_updated_at = work.last_updated_at + @property + def next_works(self): + return self.get_metadata_item('next_works', []) + + @next_works.setter + def next_works(self, value): + self.add_metadata_item('next_works', value) + @property def conditions(self): return self._conditions @@ -1262,6 +1273,11 @@ def load_parameter_links(self): if p_id in p_metadata: self.parameter_links[p_id].metadata = p_metadata[p_id] + def add_next_work(self, work_id): + next_works = self.next_works + next_works.append(work_id) + self.next_works = next_works + def enable_next_works(self, work, cond): self.log_debug("Checking Work %s condition: %s" % (work.get_internal_id(), json_dumps(cond, sort_keys=True, indent=4))) @@ -1808,6 +1824,9 @@ def __deepcopy__(self, memo): result.logger = logger return result + def get_template_id(self): + return self.template.get_template_id() + @property def metadata(self): run_metadata = {'parent_num_run': self.parent_num_run, @@ -1821,6 +1840,7 @@ def metadata(self): @metadata.setter def metadata(self, value): + self.template.load_metadata() run_metadata = value self.parent_num_run = run_metadata['parent_num_run'] self._num_run = run_metadata['num_run'] @@ -1845,6 +1865,12 @@ def independent_works(self, value): self.runs[str(self.num_run)].independent_works = value self.template.independent_works = value + def add_next_work(self, work_id): + if self.runs: + self.runs[str(self.num_run)].add_next_work(work_id) + else: + raise Exception("There are no runs. It should not have next work") + @property def last_updated_at(self): if self.runs: From 5a82e3b5ed749e5df7890111e5e03fe5c02d511a Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Thu, 24 Mar 2022 18:35:31 +0100 Subject: [PATCH 07/14] fix workflow load conditions --- workflow/lib/idds/workflowv2/workflow.py | 29 ++++++++++++++++++------ 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/workflow/lib/idds/workflowv2/workflow.py b/workflow/lib/idds/workflowv2/workflow.py index 24067196..6cea63f5 100644 --- a/workflow/lib/idds/workflowv2/workflow.py +++ b/workflow/lib/idds/workflowv2/workflow.py @@ -248,12 +248,18 @@ def load_conditions(self, works): new_conditions.append(cond) else: if 'idds_method' in cond and 'idds_method_internal_id' in cond: + self.logger.debug("idds_method_internal_id: %s" % cond['idds_method_internal_id']) + self.logger.debug("idds_method: %s" % cond['idds_method']) + internal_id = cond['idds_method_internal_id'] work = self.get_work_from_id(internal_id, works) + + self.logger.debug("get_work_from_id: %s: %s" % (internal_id, str(work))) + if work is not None: new_cond = getattr(work, cond['idds_method']) else: - self.logger.error("Work cannot be found for %s" % (internal_id)) + self.logger.error("Condition method work cannot be found for %s" % (internal_id)) new_cond = cond elif 'idds_attribute' in cond and 'idds_method_internal_id' in cond: internal_id = cond['idds_method_internal_id'] @@ -261,7 +267,7 @@ def load_conditions(self, works): if work is not None: new_cond = getattr(work, cond['idds_attribute']) else: - self.logger.error("Work cannot be found for %s" % (internal_id)) + self.logger.error("Condition attribute work cannot be found for %s" % (internal_id)) new_cond = cond elif 'idds_method' in cond and 'idds_method_condition' in cond: new_cond = cond['idds_method_condition'] @@ -272,37 +278,46 @@ def load_conditions(self, works): self.conditions = new_conditions new_true_works = [] + self.logger.debug("true_works: %s" % str(self.true_works)) + for w in self.true_works: + self.logger.debug("true_work: %s" % str(w)) if isinstance(w, CompositeCondition): # work = w.load_conditions(works, works_template) w.load_conditions(works) work = w elif isinstance(w, Workflow): work = w + elif isinstance(w, Work): + work = w elif type(w) in [str]: work = self.get_work_from_id(w, works) if work is None: - self.logger.error("Work cannot be found for %s" % str(w)) + self.logger.error("True work cannot be found for %s" % str(w)) work = w else: - self.logger.error("Work cannot be found for %s" % str(w)) + self.logger.error("True work cannot be found for type(%s): %s" % (type(w), str(w))) work = w new_true_works.append(work) self.true_works = new_true_works new_false_works = [] for w in self.false_works: - if isinstance(w, CompositeCondition) or isinstance(w, Workflow): + if isinstance(w, CompositeCondition): # work = w.load_condtions(works, works_template) w.load_conditions(works) work = w + elif isinstance(w, Workflow): + work = w + elif isinstance(w, Work): + work = w elif type(w) in [str]: work = self.get_work_from_id(w, works) if work is None: - self.logger.error("Work cannot be found for %s" % str(w)) + self.logger.error("False work cannot be found for type(%s): %s" % (type(w), str(w))) work = w else: - self.logger.error("Work cannot be found for %s" % str(w)) + self.logger.error("False work cannot be found for %s" % str(w)) work = w new_false_works.append(work) self.false_works = new_false_works From 93d85a607ab7d8c3f4aca55ba89586359fbc68bb Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Mon, 28 Mar 2022 12:23:09 +0200 Subject: [PATCH 08/14] fix no output collection --- atlas/lib/idds/atlas/workflowv2/atlaspandawork.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/atlas/lib/idds/atlas/workflowv2/atlaspandawork.py b/atlas/lib/idds/atlas/workflowv2/atlaspandawork.py index f00325de..bcd505bb 100644 --- a/atlas/lib/idds/atlas/workflowv2/atlaspandawork.py +++ b/atlas/lib/idds/atlas/workflowv2/atlaspandawork.py @@ -173,6 +173,17 @@ def parse_task_parameters(self, task_parameters): log_col = {'scope': scope, 'name': name} self.add_log_collections(log_col) + if not self.get_primary_output_collection(): + all_colls = self.get_collections() + if all_colls: + one_coll = all_colls[0] + output_coll_scope = one_coll.scope + else: + output_coll_scope = 'pseudo.scope' + name = 'pseudo_output.' + datetime.datetime.utcnow().strftime("%Y_%m_%d_%H_%M_%S_%f") + str(random.randint(1, 1000)) + output_coll = {'scope': output_coll_scope, 'name': name, 'type': CollectionType.PseudoDataset} + self.set_primary_output_collection(output_coll) + if not self.get_primary_input_collection(): output_colls = self.get_output_collections() output_coll = output_colls[0] From 98b3c0e04468beb95d0485b62d86eaee77bbe65a Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Mon, 28 Mar 2022 12:24:27 +0200 Subject: [PATCH 09/14] add get_collections --- workflow/lib/idds/workflowv2/work.py | 3 +++ workflow/lib/idds/workflowv2/workflow.py | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/workflow/lib/idds/workflowv2/work.py b/workflow/lib/idds/workflowv2/work.py index 6614dd98..42de2537 100644 --- a/workflow/lib/idds/workflowv2/work.py +++ b/workflow/lib/idds/workflowv2/work.py @@ -1524,6 +1524,9 @@ def get_output_collections(self): keys = self._other_output_collections return [self.collections[k] for k in keys] + def get_collections(self): + return [self.collections[k] for k in self.collections[k].keys()] + def is_input_collections_closed(self): colls = self.get_input_collections() for coll in colls: diff --git a/workflow/lib/idds/workflowv2/workflow.py b/workflow/lib/idds/workflowv2/workflow.py index 6cea63f5..61731976 100644 --- a/workflow/lib/idds/workflowv2/workflow.py +++ b/workflow/lib/idds/workflowv2/workflow.py @@ -254,7 +254,7 @@ def load_conditions(self, works): internal_id = cond['idds_method_internal_id'] work = self.get_work_from_id(internal_id, works) - self.logger.debug("get_work_from_id: %s: %s" % (internal_id, str(work))) + self.logger.debug("get_work_from_id: %s: [%s]" % (internal_id, [work])) if work is not None: new_cond = getattr(work, cond['idds_method']) @@ -281,7 +281,7 @@ def load_conditions(self, works): self.logger.debug("true_works: %s" % str(self.true_works)) for w in self.true_works: - self.logger.debug("true_work: %s" % str(w)) + # self.logger.debug("true_work: %s" % str(w)) if isinstance(w, CompositeCondition): # work = w.load_conditions(works, works_template) w.load_conditions(works) From 5db389553a93d3be365cb79afd4d347e3483f050 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Mon, 28 Mar 2022 12:25:14 +0200 Subject: [PATCH 10/14] fix dependency lib --- atlas/tools/env/environment.yml | 3 ++- main/lib/idds/tests/core_tests.py | 15 ++++++++++++--- main/lib/idds/tests/test_migrate_requests.py | 5 +++-- monitor/conf.js | 12 ++++++------ 4 files changed, 23 insertions(+), 12 deletions(-) diff --git a/atlas/tools/env/environment.yml b/atlas/tools/env/environment.yml index dac6c626..6c242d33 100644 --- a/atlas/tools/env/environment.yml +++ b/atlas/tools/env/environment.yml @@ -9,7 +9,8 @@ dependencies: - flake8 # Wrapper around PyFlakes&pep8 - pytest # python testing tool - nose # nose test tools + - stomp.py - rucio-clients - rucio-clients-atlas - idds-common==0.10.4 - - idds-workflow==0.10.4 \ No newline at end of file + - idds-workflow==0.10.4 diff --git a/main/lib/idds/tests/core_tests.py b/main/lib/idds/tests/core_tests.py index 1b116ad6..0ab3e32c 100644 --- a/main/lib/idds/tests/core_tests.py +++ b/main/lib/idds/tests/core_tests.py @@ -104,7 +104,12 @@ def show_works(req): # 283511, 283517 # reqs = get_requests(request_id=599, with_detail=True, with_metadata=True) -reqs = get_requests(request_id=283511, with_request=True, with_detail=False, with_metadata=True) +# reqs = get_requests(request_id=283511, with_request=True, with_detail=False, with_metadata=True) +# reqs = get_requests(request_id=298163, with_request=True, with_detail=False, with_metadata=True) +# reqs = get_requests(request_id=298557, with_request=True, with_detail=False, with_metadata=True) +# reqs = get_requests(request_id=299111, with_request=True, with_detail=False, with_metadata=True) +# reqs = get_requests(request_id=299235, with_request=True, with_detail=False, with_metadata=True) +reqs = get_requests(request_id=965, with_request=True, with_detail=False, with_metadata=True) for req in reqs: # print(req['request_id']) # print(rets) @@ -132,7 +137,9 @@ def show_works(req): """ -tfs = get_transforms(request_id=241) +""" +# tfs = get_transforms(request_id=241) +tfs = get_transforms(transform_id=176320) for tf in tfs: # print(tf) # print(tf['transform_metadata']['work'].to_dict()) @@ -140,6 +147,7 @@ def show_works(req): pass sys.exit(0) +""" """ msgs = retrieve_messages(workload_id=25972557) @@ -160,7 +168,8 @@ def show_works(req): sys.exit(0) """ -prs = get_processings(request_id=219) +# prs = get_processings(request_id=219) +prs = get_processings(transform_id=176320) i = 0 for pr in prs: # if pr['request_id'] == 91: diff --git a/main/lib/idds/tests/test_migrate_requests.py b/main/lib/idds/tests/test_migrate_requests.py index 0f0181df..c89c7bad 100644 --- a/main/lib/idds/tests/test_migrate_requests.py +++ b/main/lib/idds/tests/test_migrate_requests.py @@ -29,9 +29,9 @@ def migrate(): # atlas atlas_host = 'https://aipanda181.cern.ch:443/idds' # noqa F841 - cm1 = ClientManager(host=dev_host) + cm1 = ClientManager(host=atlas_host) # reqs = cm1.get_requests(request_id=290) - old_request_id = 241 + old_request_id = 298163 # for old_request_id in [152]: # for old_request_id in [60]: # noqa E115 # for old_request_id in [200]: # noqa E115 @@ -43,6 +43,7 @@ def migrate(): print("num requests: %s" % len(reqs)) for req in reqs[:1]: + # print(req) req = convert_old_req_2_workflow_req(req) workflow = req['request_metadata']['workflow'] workflow.clean_works() diff --git a/monitor/conf.js b/monitor/conf.js index 2f61fb7e..153ed377 100644 --- a/monitor/conf.js +++ b/monitor/conf.js @@ -1,9 +1,9 @@ var appConfig = { - 'iddsAPI_request': "https://lxplus733.cern.ch:443/idds/monitor_request/null/null", - 'iddsAPI_transform': "https://lxplus733.cern.ch:443/idds/monitor_transform/null/null", - 'iddsAPI_processing': "https://lxplus733.cern.ch:443/idds/monitor_processing/null/null", - 'iddsAPI_request_detail': "https://lxplus733.cern.ch:443/idds/monitor/null/null/true/false/false", - 'iddsAPI_transform_detail': "https://lxplus733.cern.ch:443/idds/monitor/null/null/false/true/false", - 'iddsAPI_processing_detail': "https://lxplus733.cern.ch:443/idds/monitor/null/null/false/false/true" + 'iddsAPI_request': "https://lxplus713.cern.ch:443/idds/monitor_request/null/null", + 'iddsAPI_transform': "https://lxplus713.cern.ch:443/idds/monitor_transform/null/null", + 'iddsAPI_processing': "https://lxplus713.cern.ch:443/idds/monitor_processing/null/null", + 'iddsAPI_request_detail': "https://lxplus713.cern.ch:443/idds/monitor/null/null/true/false/false", + 'iddsAPI_transform_detail': "https://lxplus713.cern.ch:443/idds/monitor/null/null/false/true/false", + 'iddsAPI_processing_detail': "https://lxplus713.cern.ch:443/idds/monitor/null/null/false/false/true" } From a3aa3c5676b70b4d2ad2728c4b026c59cc523fd3 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Mon, 28 Mar 2022 12:28:34 +0200 Subject: [PATCH 11/14] fix flake8 error --- workflow/lib/idds/workflowv2/work.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workflow/lib/idds/workflowv2/work.py b/workflow/lib/idds/workflowv2/work.py index 42de2537..1a3095ff 100644 --- a/workflow/lib/idds/workflowv2/work.py +++ b/workflow/lib/idds/workflowv2/work.py @@ -1525,7 +1525,7 @@ def get_output_collections(self): return [self.collections[k] for k in keys] def get_collections(self): - return [self.collections[k] for k in self.collections[k].keys()] + return [self.collections[k] for k in self.collections.keys()] def is_input_collections_closed(self): colls = self.get_input_collections() From 729bac87d7958abe8205d8a8613a8ff309735fd9 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Mon, 28 Mar 2022 13:24:47 +0200 Subject: [PATCH 12/14] fix auth not support error --- common/lib/idds/common/exceptions.py | 10 ++++++++++ main/lib/idds/rest/v1/auth.py | 2 +- monitor/conf.js | 12 ++++++------ 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/common/lib/idds/common/exceptions.py b/common/lib/idds/common/exceptions.py index 9fab3d8a..4c54a629 100644 --- a/common/lib/idds/common/exceptions.py +++ b/common/lib/idds/common/exceptions.py @@ -252,3 +252,13 @@ def __init__(self, *args, **kwargs): super(AuthenticationPending, self).__init__(*args, **kwargs) self._message = "Authentication pending." self.error_code = 601 + + +class AuthenticationNotSupported(IDDSException): + """ + Authentication not supported + """ + def __init__(self, *args, **kwargs): + super(AuthenticationNotSupported, self).__init__(*args, **kwargs) + self._message = "Authentication not supported." + self.error_code = 602 diff --git a/main/lib/idds/rest/v1/auth.py b/main/lib/idds/rest/v1/auth.py index 88f9a7d6..098b5401 100644 --- a/main/lib/idds/rest/v1/auth.py +++ b/main/lib/idds/rest/v1/auth.py @@ -42,7 +42,7 @@ def get(self, vo, auth_type='oidc'): else: raise exceptions.IDDSException("Failed to get oidc sign url: %s" % str(sign_url)) else: - raise exceptions.NotSupportedAuthentication("auth_type %s is not supported." % str(auth_type)) + raise exceptions.AuthenticationNotSupported("auth_type %s is not supported to call this function." % str(auth_type)) except exceptions.NoObject as error: return self.generate_http_response(HTTP_STATUS_CODE.NotFound, exc_cls=error.__class__.__name__, exc_msg=error) except exceptions.IDDSException as error: diff --git a/monitor/conf.js b/monitor/conf.js index 153ed377..74571ae7 100644 --- a/monitor/conf.js +++ b/monitor/conf.js @@ -1,9 +1,9 @@ var appConfig = { - 'iddsAPI_request': "https://lxplus713.cern.ch:443/idds/monitor_request/null/null", - 'iddsAPI_transform': "https://lxplus713.cern.ch:443/idds/monitor_transform/null/null", - 'iddsAPI_processing': "https://lxplus713.cern.ch:443/idds/monitor_processing/null/null", - 'iddsAPI_request_detail': "https://lxplus713.cern.ch:443/idds/monitor/null/null/true/false/false", - 'iddsAPI_transform_detail': "https://lxplus713.cern.ch:443/idds/monitor/null/null/false/true/false", - 'iddsAPI_processing_detail': "https://lxplus713.cern.ch:443/idds/monitor/null/null/false/false/true" + 'iddsAPI_request': "https://lxplus785.cern.ch:443/idds/monitor_request/null/null", + 'iddsAPI_transform': "https://lxplus785.cern.ch:443/idds/monitor_transform/null/null", + 'iddsAPI_processing': "https://lxplus785.cern.ch:443/idds/monitor_processing/null/null", + 'iddsAPI_request_detail': "https://lxplus785.cern.ch:443/idds/monitor/null/null/true/false/false", + 'iddsAPI_transform_detail': "https://lxplus785.cern.ch:443/idds/monitor/null/null/false/true/false", + 'iddsAPI_processing_detail': "https://lxplus785.cern.ch:443/idds/monitor/null/null/false/false/true" } From 825f2b989c017a652bce729153d05524d0b7dc78 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Mon, 28 Mar 2022 13:25:52 +0200 Subject: [PATCH 13/14] new version 0.10.5 --- atlas/lib/idds/atlas/version.py | 2 +- atlas/tools/env/environment.yml | 4 ++-- client/lib/idds/client/version.py | 2 +- client/tools/env/environment.yml | 4 ++-- common/lib/idds/common/version.py | 2 +- doma/lib/idds/doma/version.py | 2 +- doma/tools/env/environment.yml | 4 ++-- main/lib/idds/version.py | 2 +- main/tools/env/environment.yml | 6 +++--- monitor/version.py | 2 +- website/version.py | 2 +- workflow/lib/idds/workflow/version.py | 2 +- workflow/tools/env/environment.yml | 2 +- 13 files changed, 18 insertions(+), 18 deletions(-) diff --git a/atlas/lib/idds/atlas/version.py b/atlas/lib/idds/atlas/version.py index 04cda44c..613d7ae0 100644 --- a/atlas/lib/idds/atlas/version.py +++ b/atlas/lib/idds/atlas/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2019 - 2021 -release_version = "0.10.4" +release_version = "0.10.5" diff --git a/atlas/tools/env/environment.yml b/atlas/tools/env/environment.yml index 6c242d33..9f741990 100644 --- a/atlas/tools/env/environment.yml +++ b/atlas/tools/env/environment.yml @@ -12,5 +12,5 @@ dependencies: - stomp.py - rucio-clients - rucio-clients-atlas - - idds-common==0.10.4 - - idds-workflow==0.10.4 + - idds-common==0.10.5 + - idds-workflow==0.10.5 \ No newline at end of file diff --git a/client/lib/idds/client/version.py b/client/lib/idds/client/version.py index 04cda44c..613d7ae0 100644 --- a/client/lib/idds/client/version.py +++ b/client/lib/idds/client/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2019 - 2021 -release_version = "0.10.4" +release_version = "0.10.5" diff --git a/client/tools/env/environment.yml b/client/tools/env/environment.yml index fcea8065..fdcb0c92 100644 --- a/client/tools/env/environment.yml +++ b/client/tools/env/environment.yml @@ -15,5 +15,5 @@ dependencies: - nose # nose test tools - tabulate - argcomplete - - idds-common==0.10.4 - - idds-workflow==0.10.4 + - idds-common==0.10.5 + - idds-workflow==0.10.5 \ No newline at end of file diff --git a/common/lib/idds/common/version.py b/common/lib/idds/common/version.py index 04cda44c..613d7ae0 100644 --- a/common/lib/idds/common/version.py +++ b/common/lib/idds/common/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2019 - 2021 -release_version = "0.10.4" +release_version = "0.10.5" diff --git a/doma/lib/idds/doma/version.py b/doma/lib/idds/doma/version.py index bd6b26c6..ee9fe789 100644 --- a/doma/lib/idds/doma/version.py +++ b/doma/lib/idds/doma/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2020 - 2021 -release_version = "0.10.4" +release_version = "0.10.5" diff --git a/doma/tools/env/environment.yml b/doma/tools/env/environment.yml index 70198623..a831698b 100644 --- a/doma/tools/env/environment.yml +++ b/doma/tools/env/environment.yml @@ -10,5 +10,5 @@ dependencies: - pytest # python testing tool - nose # nose test tools - panda-client # panda client - - idds-common==0.10.4 - - idds-workflow==0.10.4 \ No newline at end of file + - idds-common==0.10.5 + - idds-workflow==0.10.5 \ No newline at end of file diff --git a/main/lib/idds/version.py b/main/lib/idds/version.py index 04cda44c..613d7ae0 100644 --- a/main/lib/idds/version.py +++ b/main/lib/idds/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2019 - 2021 -release_version = "0.10.4" +release_version = "0.10.5" diff --git a/main/tools/env/environment.yml b/main/tools/env/environment.yml index 9762da66..29b8c900 100644 --- a/main/tools/env/environment.yml +++ b/main/tools/env/environment.yml @@ -24,6 +24,6 @@ dependencies: - sphinx-rtd-theme # sphinx readthedoc theme - nevergrad # nevergrad hyper parameter optimization - psycopg2-binary - - idds-common==0.10.4 - - idds-workflow==0.10.4 - - idds-client==0.10.4 \ No newline at end of file + - idds-common==0.10.5 + - idds-workflow==0.10.5 + - idds-client==0.10.5 \ No newline at end of file diff --git a/monitor/version.py b/monitor/version.py index 04cda44c..613d7ae0 100644 --- a/monitor/version.py +++ b/monitor/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2019 - 2021 -release_version = "0.10.4" +release_version = "0.10.5" diff --git a/website/version.py b/website/version.py index 04cda44c..613d7ae0 100644 --- a/website/version.py +++ b/website/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2019 - 2021 -release_version = "0.10.4" +release_version = "0.10.5" diff --git a/workflow/lib/idds/workflow/version.py b/workflow/lib/idds/workflow/version.py index 04cda44c..613d7ae0 100644 --- a/workflow/lib/idds/workflow/version.py +++ b/workflow/lib/idds/workflow/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2019 - 2021 -release_version = "0.10.4" +release_version = "0.10.5" diff --git a/workflow/tools/env/environment.yml b/workflow/tools/env/environment.yml index 661a6392..b5f5172b 100644 --- a/workflow/tools/env/environment.yml +++ b/workflow/tools/env/environment.yml @@ -8,4 +8,4 @@ dependencies: - flake8 # Wrapper around PyFlakes&pep8 - pytest # python testing tool - nose # nose test tools - - idds-common==0.10.4 \ No newline at end of file + - idds-common==0.10.5 \ No newline at end of file From be667f1d3f3737c749830d9e5e340d1c2b06cedf Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Mon, 28 Mar 2022 13:27:20 +0200 Subject: [PATCH 14/14] new version 0.10.5 --- main/etc/idds/auth/auth.cfg.template | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/main/etc/idds/auth/auth.cfg.template b/main/etc/idds/auth/auth.cfg.template index f1a3b246..d66d6fdd 100644 --- a/main/etc/idds/auth/auth.cfg.template +++ b/main/etc/idds/auth/auth.cfg.template @@ -1,5 +1,12 @@ [common] -allow_vos = panda_dev,Rubin,Rubin:production +allow_vos = atlas,panda_dev,Rubin,Rubin:production + +[atlas] +client_secret = <> +audience = https://pandaserver-doma.cern.ch +client_id = <> +oidc_config_url = https://panda-iam-doma.cern.ch/.well-known/openid-configuration +vo = atlas [panda_dev] client_secret = <>