From dd218fddbd1c7d187bb5cc275b90b5eb7cdedcd9 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Fri, 11 Mar 2022 17:16:26 +0100 Subject: [PATCH 1/5] resubmit broken tasks --- common/lib/idds/common/constants.py | 3 +- .../lib/idds/doma/workflowv2/domapandawork.py | 32 +++++++++++++++++-- main/lib/idds/agents/carrier/carrier.py | 3 ++ workflow/lib/idds/workflowv2/work.py | 9 ++++++ 4 files changed, 43 insertions(+), 4 deletions(-) diff --git a/common/lib/idds/common/constants.py b/common/lib/idds/common/constants.py index 4b8a6a98..7cfd4972 100644 --- a/common/lib/idds/common/constants.py +++ b/common/lib/idds/common/constants.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2020 +# - Wen Guan, , 2019 - 2022 """ Constants. @@ -314,6 +314,7 @@ class ProcessingStatus(IDDSEnum): TimeOut = 23 ToFinish = 24 ToForceFinish = 25 + Broken = 26 class ProcessingLocking(IDDSEnum): diff --git a/doma/lib/idds/doma/workflowv2/domapandawork.py b/doma/lib/idds/doma/workflowv2/domapandawork.py index 108734a4..35cedc6a 100644 --- a/doma/lib/idds/doma/workflowv2/domapandawork.py +++ b/doma/lib/idds/doma/workflowv2/domapandawork.py @@ -76,6 +76,7 @@ def __init__(self, executable=None, arguments=None, parameters=None, setup=None, # self.logger.setLevel(logging.DEBUG) self.task_name = task_name + self.real_task_name = None self.set_work_name(task_name) self.queue = task_queue self.dep_tasks_id_names_map = {} @@ -483,6 +484,18 @@ def submit_processing(self, processing): if task_id: proc.submitted_at = datetime.datetime.utcnow() + def resubmit_processing(self, processing): + proc = processing['processing_metadata']['processing'] + proc.workload_id = None + task_param = proc.processing_metadata['task_param'] + if self.retry_number > 0: + proc.task_name = self.task_name + "_" + str(self.retry_number) + task_param['taskName'] = proc.task_name + task_id = self.submit_panda_task(processing) + proc.workload_id = task_id + if task_id: + proc.submitted_at = datetime.datetime.utcnow() + def get_panda_task_id(self, processing): from pandatools import Client @@ -497,7 +510,10 @@ def get_panda_task_id(self, processing): task_id = None for req_id in results: task_name = results[req_id]['taskName'] - if proc.workload_id is None and task_name == self.task_name: + local_task_name = proc.task_name + if not local_task_name: + local_task_name = self.task_name + if proc.workload_id is None and task_name == local_task_name: task_id = results[req_id]['jediTaskID'] # processing['processing_metadata']['task_id'] = task_id # processing['processing_metadata']['workload_id'] = task_id @@ -531,9 +547,11 @@ def get_processing_status_from_panda_status(self, task_status): elif task_status in ['finished', 'paused']: # finished, finishing, waiting it to be done processing_status = ProcessingStatus.SubFinished - elif task_status in ['failed', 'aborted', 'broken', 'exhausted']: + elif task_status in ['failed', 'aborted', 'exhausted']: # aborting, tobroken processing_status = ProcessingStatus.Failed + elif task_status in ['broken']: + processing_status = ProcessingStatus.Broken else: # finished, finishing, aborting, topreprocess, preprocessing, tobroken # toretry, toincexec, rerefine, paused, throttled, passed @@ -922,7 +940,15 @@ def poll_panda_task(self, processing=None, input_output_maps=None): self.reactivate_processing(processing) processing_status = ProcessingStatus.Submitted self.retry_number += 1 - + if processing_status in [ProcessingStatus.Broken]: + self.logger.error("poll_panda_task, task_id: %s is broken. retry_number: %s, num_retries: %s" % (str(task_id), self.retry_number, self.num_retries)) + if self.num_retries == 0: + self.num_retries = 1 + if self.retry_number < self.num_retries: + self.retry_number += 1 + self.logger.error("poll_panda_task, task_id: %s is broken. resubmit the task. retry_number: %s, num_retries: %s" % (str(task_id), self.retry_number, self.num_retries)) + self.resubmit_processing(processing) + return ProcessingStatus.Submitting, [] all_jobs_ids = task_info['PandaID'] terminated_jobs, inputname_mapid_map = self.get_job_maps(input_output_maps) diff --git a/main/lib/idds/agents/carrier/carrier.py b/main/lib/idds/agents/carrier/carrier.py index a8604c2a..e868497b 100644 --- a/main/lib/idds/agents/carrier/carrier.py +++ b/main/lib/idds/agents/carrier/carrier.py @@ -404,6 +404,9 @@ def process_running_processing(self, processing): if not processing['submitted_at'] or processing['submitted_at'] < proc.submitted_at: processing_update['parameters']['submitted_at'] = proc.submitted_at + if proc.workload_id: + processing_update['parameters']['workload_id'] = proc.workload_id + processing_update['parameters']['next_poll_at'] = next_poll_at # processing_update['parameters']['expired_at'] = work.get_expired_at(processing) processing_update['parameters']['processing_metadata'] = processing['processing_metadata'] diff --git a/workflow/lib/idds/workflowv2/work.py b/workflow/lib/idds/workflowv2/work.py index b90c6f24..6614dd98 100644 --- a/workflow/lib/idds/workflowv2/work.py +++ b/workflow/lib/idds/workflowv2/work.py @@ -152,6 +152,7 @@ def __init__(self, processing_metadata={}): self.processing = None self.internal_id = str(uuid.uuid4())[:8] + self.task_name = None self.processing_id = None self.workload_id = None self.status = ProcessingStatus.New @@ -368,6 +369,14 @@ def external_id(self): def external_id(self, value): self.add_metadata_item('external_id', value) + @property + def task_name(self): + return self.get_metadata_item('task_name', None) + + @task_name.setter + def task_name(self, value): + self.add_metadata_item('task_name', value) + @property def processing(self): return self._processing From 72850a9b294c7881a164b7effeb1468e0a46745e Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Fri, 11 Mar 2022 17:16:39 +0100 Subject: [PATCH 2/5] improve tests --- main/lib/idds/tests/panda_test.py | 6 +++--- main/lib/idds/tests/test_domapanda.py | 2 ++ main/lib/idds/tests/test_domapanda_workflow.py | 2 ++ main/lib/idds/tests/trigger_release.py | 8 ++++---- 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/main/lib/idds/tests/panda_test.py b/main/lib/idds/tests/panda_test.py index c1e20d4e..f5a759bd 100644 --- a/main/lib/idds/tests/panda_test.py +++ b/main/lib/idds/tests/panda_test.py @@ -6,7 +6,7 @@ os.environ['PANDA_URL'] = 'http://pandaserver-doma.cern.ch:25080/server/panda' os.environ['PANDA_URL_SSL'] = 'https://pandaserver-doma.cern.ch:25443/server/panda' -from pandatools import Client # noqa E402 +from pandaclient import Client # noqa E402 """ jobids = [1408118] @@ -36,7 +36,7 @@ ret = Client.getTaskStatus(jediTaskID, verbose=False) print(ret) -sys.exit(0) +# sys.exit(0) """ jediTaskID = 998 @@ -104,7 +104,7 @@ # site = newOpts.get('site', None) # excludedSite = newOpts.get('excludedSite', None) # for JEDI -taskIDs = [7056, 7057] +taskIDs = [10624] for taskID in taskIDs: status, out = Client.retryTask(taskID, verbose=True, properErrorCode=True, newParams=newOpts) print(status) diff --git a/main/lib/idds/tests/test_domapanda.py b/main/lib/idds/tests/test_domapanda.py index 04a90bf1..bc3c778e 100644 --- a/main/lib/idds/tests/test_domapanda.py +++ b/main/lib/idds/tests/test_domapanda.py @@ -140,6 +140,7 @@ def setup_workflow(): output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#2'}], log_collections=[], dependency_map=taskN2.dependencies, task_name=taskN2.name, task_queue=task_queue, + encode_command_line=True, task_log={"dataset": "PandaJob_#{pandaid}/", "destination": "local", "param_type": "log", @@ -152,6 +153,7 @@ def setup_workflow(): output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#3'}], log_collections=[], dependency_map=taskN3.dependencies, task_name=taskN3.name, task_queue=task_queue, + encode_command_line=True, task_log={"dataset": "PandaJob_#{pandaid}/", "destination": "local", "param_type": "log", diff --git a/main/lib/idds/tests/test_domapanda_workflow.py b/main/lib/idds/tests/test_domapanda_workflow.py index 5c78a3f3..be0b7347 100644 --- a/main/lib/idds/tests/test_domapanda_workflow.py +++ b/main/lib/idds/tests/test_domapanda_workflow.py @@ -136,6 +136,7 @@ def setup_workflow(): output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#2'}], log_collections=[], dependency_map=taskN2.dependencies, task_name=taskN2.name, task_queue=task_queue, + encode_command_line=True, task_log={"dataset": "PandaJob_#{pandaid}/", "destination": "local", "param_type": "log", @@ -148,6 +149,7 @@ def setup_workflow(): output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#3'}], log_collections=[], dependency_map=taskN3.dependencies, task_name=taskN3.name, task_queue=task_queue, + encode_command_line=True, task_log={"dataset": "PandaJob_#{pandaid}/", "destination": "local", "param_type": "log", diff --git a/main/lib/idds/tests/trigger_release.py b/main/lib/idds/tests/trigger_release.py index b5d74c34..0e900603 100644 --- a/main/lib/idds/tests/trigger_release.py +++ b/main/lib/idds/tests/trigger_release.py @@ -12,7 +12,7 @@ request_ids = [368, 369, 370, 371, 372, 373, 374, 375, 376] -request_ids = [475] +request_ids = [898] for request_id in request_ids: contents = get_contents(request_id=request_id, status=ContentStatus.Available) ret_contents = {} @@ -23,12 +23,12 @@ ret_contents[content['coll_id']].append(content) for ret_content in ret_contents: - print(ret_content) - break + print("coll_id: %s, num_contents: %s" % (ret_content, len(ret_contents[ret_content]))) + # break updated_contents = core_transforms.release_inputs_by_collection(ret_contents) for update_content in updated_contents: print(update_content) # break - update_contents(updated_contents) + # update_contents(updated_contents) From f2ba3bafb669160e8e596e1d2b06943237652b3c Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Mon, 14 Mar 2022 18:27:31 +0100 Subject: [PATCH 3/5] fix suspend tasks --- doma/lib/idds/doma/workflowv2/domapandawork.py | 4 ++-- main/lib/idds/tests/trigger_release.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/doma/lib/idds/doma/workflowv2/domapandawork.py b/doma/lib/idds/doma/workflowv2/domapandawork.py index 35cedc6a..977c89a5 100644 --- a/doma/lib/idds/doma/workflowv2/domapandawork.py +++ b/doma/lib/idds/doma/workflowv2/domapandawork.py @@ -1079,8 +1079,8 @@ def poll_processing_updates(self, processing, input_output_maps): proc.polling_retries = 0 elif proc.tosuspend: self.logger.info("Suspending processing (processing id: %s, jediTaskId: %s)" % (processing['processing_id'], proc.workload_id)) - self.kill_processing_force(processing) - # self.kill_processing(processing) + # self.kill_processing_force(processing) + self.kill_processing(processing) proc.tosuspend = False proc.polling_retries = 0 elif proc.toresume: diff --git a/main/lib/idds/tests/trigger_release.py b/main/lib/idds/tests/trigger_release.py index 0e900603..0f8a7e8a 100644 --- a/main/lib/idds/tests/trigger_release.py +++ b/main/lib/idds/tests/trigger_release.py @@ -12,7 +12,7 @@ request_ids = [368, 369, 370, 371, 372, 373, 374, 375, 376] -request_ids = [898] +request_ids = [910] for request_id in request_ids: contents = get_contents(request_id=request_id, status=ContentStatus.Available) ret_contents = {} From 79cde054059cabce1b8f0c1bf1953bf266a1f401 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Tue, 15 Mar 2022 17:51:36 +0100 Subject: [PATCH 4/5] fix to rename pandatools to pandaclient --- .../lib/idds/atlas/workflow/atlaspandawork.py | 14 ++++++------ .../lib/idds/atlas/workflowv2/atlasdagwork.py | 2 +- .../atlas/workflowv2/atlaslocalpandawork.py | 2 +- .../idds/atlas/workflowv2/atlaspandawork.py | 14 ++++++------ doma/lib/idds/doma/workflow/domapandawork.py | 22 +++++++++---------- .../lib/idds/doma/workflowv2/domapandawork.py | 22 +++++++++---------- main/lib/idds/tests/panda_test.py | 11 +++++++--- main/lib/idds/tests/test_activelearning.py | 4 ++-- main/lib/idds/tests/trigger_release.py | 2 +- 9 files changed, 49 insertions(+), 44 deletions(-) diff --git a/atlas/lib/idds/atlas/workflow/atlaspandawork.py b/atlas/lib/idds/atlas/workflow/atlaspandawork.py index 621a8a76..d79f1352 100644 --- a/atlas/lib/idds/atlas/workflow/atlaspandawork.py +++ b/atlas/lib/idds/atlas/workflow/atlaspandawork.py @@ -392,7 +392,7 @@ def create_processing(self, input_output_maps=[]): def submit_panda_task(self, processing): try: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] task_param = proc.processing_metadata['task_param'] @@ -436,7 +436,7 @@ def submit_processing(self, processing): def poll_panda_task_status(self, processing): if 'processing' in processing['processing_metadata']: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] status, task_status = Client.getTaskStatus(proc.workload_id) @@ -468,7 +468,7 @@ def get_processing_status_from_panda_status(self, task_status): return processing_status def get_panda_task_id(self, processing): - from pandatools import Client + from pandaclient import Client start_time = datetime.datetime.utcnow() - datetime.timedelta(hours=10) start_time = start_time.strftime('%Y-%m-%d %H:%M:%S') @@ -494,7 +494,7 @@ def get_panda_task_id(self, processing): def poll_panda_task(self, processing=None, input_output_maps=None): task_id = None try: - from pandatools import Client + from pandaclient import Client if processing: proc = processing['processing_metadata']['processing'] @@ -534,7 +534,7 @@ def poll_panda_task(self, processing=None, input_output_maps=None): def kill_processing(self, processing): try: if processing: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] task_id = proc.workload_id # task_id = processing['processing_metadata']['task_id'] @@ -547,7 +547,7 @@ def kill_processing(self, processing): def kill_processing_force(self, processing): try: if processing: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] task_id = proc.workload_id # task_id = processing['processing_metadata']['task_id'] @@ -560,7 +560,7 @@ def kill_processing_force(self, processing): def reactivate_processing(self, processing): try: if processing: - from pandatools import Client + from pandaclient import Client # task_id = processing['processing_metadata']['task_id'] proc = processing['processing_metadata']['processing'] task_id = proc.workload_id diff --git a/atlas/lib/idds/atlas/workflowv2/atlasdagwork.py b/atlas/lib/idds/atlas/workflowv2/atlasdagwork.py index a1b2c764..ce36e153 100644 --- a/atlas/lib/idds/atlas/workflowv2/atlasdagwork.py +++ b/atlas/lib/idds/atlas/workflowv2/atlasdagwork.py @@ -347,7 +347,7 @@ def create_processing(self, input_output_maps): def submit_panda_task(self, processing): try: - from pandatools import Client + from pandaclient import Client task_param = processing['processing_metadata']['task_param'] return_code = Client.insertTaskParams(task_param, verbose=True) diff --git a/atlas/lib/idds/atlas/workflowv2/atlaslocalpandawork.py b/atlas/lib/idds/atlas/workflowv2/atlaslocalpandawork.py index 2bec4ca0..0ef14b6c 100644 --- a/atlas/lib/idds/atlas/workflowv2/atlaslocalpandawork.py +++ b/atlas/lib/idds/atlas/workflowv2/atlaslocalpandawork.py @@ -296,7 +296,7 @@ def get_rucio_download_client(self): def poll_panda_task_output(self, processing=None, input_output_maps=None): task_id = None try: - from pandatools import Client + from pandaclient import Client if processing: output_metadata = {} diff --git a/atlas/lib/idds/atlas/workflowv2/atlaspandawork.py b/atlas/lib/idds/atlas/workflowv2/atlaspandawork.py index 05a566f4..31418348 100644 --- a/atlas/lib/idds/atlas/workflowv2/atlaspandawork.py +++ b/atlas/lib/idds/atlas/workflowv2/atlaspandawork.py @@ -454,7 +454,7 @@ def create_processing(self, input_output_maps=[]): def submit_panda_task(self, processing): try: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] task_param = proc.processing_metadata['task_param'] @@ -498,7 +498,7 @@ def submit_processing(self, processing): def poll_panda_task_status(self, processing): if 'processing' in processing['processing_metadata']: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] status, task_status = Client.getTaskStatus(proc.workload_id) @@ -530,7 +530,7 @@ def get_processing_status_from_panda_status(self, task_status): return processing_status def get_panda_task_id(self, processing): - from pandatools import Client + from pandaclient import Client start_time = datetime.datetime.utcnow() - datetime.timedelta(hours=10) start_time = start_time.strftime('%Y-%m-%d %H:%M:%S') @@ -556,7 +556,7 @@ def get_panda_task_id(self, processing): def poll_panda_task(self, processing=None, input_output_maps=None): task_id = None try: - from pandatools import Client + from pandaclient import Client if processing: proc = processing['processing_metadata']['processing'] @@ -596,7 +596,7 @@ def poll_panda_task(self, processing=None, input_output_maps=None): def kill_processing(self, processing): try: if processing: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] task_id = proc.workload_id # task_id = processing['processing_metadata']['task_id'] @@ -609,7 +609,7 @@ def kill_processing(self, processing): def kill_processing_force(self, processing): try: if processing: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] task_id = proc.workload_id # task_id = processing['processing_metadata']['task_id'] @@ -622,7 +622,7 @@ def kill_processing_force(self, processing): def reactivate_processing(self, processing): try: if processing: - from pandatools import Client + from pandaclient import Client # task_id = processing['processing_metadata']['task_id'] proc = processing['processing_metadata']['processing'] task_id = proc.workload_id diff --git a/doma/lib/idds/doma/workflow/domapandawork.py b/doma/lib/idds/doma/workflow/domapandawork.py index 344298fb..0cf013f9 100644 --- a/doma/lib/idds/doma/workflow/domapandawork.py +++ b/doma/lib/idds/doma/workflow/domapandawork.py @@ -430,7 +430,7 @@ def create_processing(self, input_output_maps=[]): def submit_panda_task(self, processing): try: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] task_param = proc.processing_metadata['task_param'] @@ -462,7 +462,7 @@ def submit_processing(self, processing): proc.submitted_at = datetime.datetime.utcnow() def get_panda_task_id(self, processing): - from pandatools import Client + from pandaclient import Client start_time = datetime.datetime.utcnow() - datetime.timedelta(hours=10) start_time = start_time.strftime('%Y-%m-%d %H:%M:%S') @@ -487,7 +487,7 @@ def get_panda_task_id(self, processing): def poll_panda_task_status(self, processing): if 'processing' in processing['processing_metadata']: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] status, task_status = Client.getTaskStatus(proc.workload_id) @@ -641,7 +641,7 @@ def get_update_contents_from_map_id(self, map_id, input_output_maps, job_info): def map_panda_ids(self, unregistered_job_ids, input_output_maps): self.logger.debug("map_panda_ids, unregistered_job_ids[:10]: %s" % str(unregistered_job_ids[:10])) - from pandatools import Client + from pandaclient import Client # updated_map_ids = [] full_update_contents = [] @@ -669,7 +669,7 @@ def map_panda_ids(self, unregistered_job_ids, input_output_maps): def get_status_changed_contents(self, unterminated_job_ids, input_output_maps, panda_id_to_map_ids): self.logger.debug("get_status_changed_contents, unterminated_job_ids[:10]: %s" % str(unterminated_job_ids[:10])) - from pandatools import Client + from pandaclient import Client full_update_contents = [] chunksize = 2000 @@ -698,7 +698,7 @@ def get_final_update_contents(self, input_output_maps): def poll_panda_task_old(self, processing=None, input_output_maps=None): task_id = None try: - from pandatools import Client + from pandaclient import Client jobs_ids = None if processing: @@ -769,7 +769,7 @@ def poll_panda_task_old(self, processing=None, input_output_maps=None): def poll_panda_jobs(self, job_ids): job_ids = list(job_ids) self.logger.debug("poll_panda_jobs, poll_panda_jobs_chunk_size: %s, job_ids[:10]: %s" % (self.poll_panda_jobs_chunk_size, str(job_ids[:10]))) - from pandatools import Client + from pandaclient import Client # updated_map_ids = [] inputname_jobid_map = {} @@ -874,7 +874,7 @@ def get_update_contents(self, inputnames, inputname_mapid_map, inputname_jobid_m def poll_panda_task(self, processing=None, input_output_maps=None): task_id = None try: - from pandatools import Client + from pandaclient import Client if processing: proc = processing['processing_metadata']['processing'] @@ -971,7 +971,7 @@ def poll_panda_task(self, processing=None, input_output_maps=None): def kill_processing(self, processing): try: if processing: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] task_id = proc.workload_id # task_id = processing['processing_metadata']['task_id'] @@ -984,7 +984,7 @@ def kill_processing(self, processing): def kill_processing_force(self, processing): try: if processing: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] task_id = proc.workload_id # task_id = processing['processing_metadata']['task_id'] @@ -997,7 +997,7 @@ def kill_processing_force(self, processing): def reactivate_processing(self, processing): try: if processing: - from pandatools import Client + from pandaclient import Client # task_id = processing['processing_metadata']['task_id'] proc = processing['processing_metadata']['processing'] task_id = proc.workload_id diff --git a/doma/lib/idds/doma/workflowv2/domapandawork.py b/doma/lib/idds/doma/workflowv2/domapandawork.py index 977c89a5..44242dc2 100644 --- a/doma/lib/idds/doma/workflowv2/domapandawork.py +++ b/doma/lib/idds/doma/workflowv2/domapandawork.py @@ -453,7 +453,7 @@ def create_processing(self, input_output_maps=[]): def submit_panda_task(self, processing): try: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] task_param = proc.processing_metadata['task_param'] @@ -497,7 +497,7 @@ def resubmit_processing(self, processing): proc.submitted_at = datetime.datetime.utcnow() def get_panda_task_id(self, processing): - from pandatools import Client + from pandaclient import Client start_time = datetime.datetime.utcnow() - datetime.timedelta(hours=10) start_time = start_time.strftime('%Y-%m-%d %H:%M:%S') @@ -525,7 +525,7 @@ def get_panda_task_id(self, processing): def poll_panda_task_status(self, processing): if 'processing' in processing['processing_metadata']: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] status, task_status = Client.getTaskStatus(proc.workload_id) @@ -681,7 +681,7 @@ def get_update_contents_from_map_id(self, map_id, input_output_maps, job_info): def map_panda_ids(self, unregistered_job_ids, input_output_maps): self.logger.debug("map_panda_ids, unregistered_job_ids[:10]: %s" % str(unregistered_job_ids[:10])) - from pandatools import Client + from pandaclient import Client # updated_map_ids = [] full_update_contents = [] @@ -709,7 +709,7 @@ def map_panda_ids(self, unregistered_job_ids, input_output_maps): def get_status_changed_contents(self, unterminated_job_ids, input_output_maps, panda_id_to_map_ids): self.logger.debug("get_status_changed_contents, unterminated_job_ids[:10]: %s" % str(unterminated_job_ids[:10])) - from pandatools import Client + from pandaclient import Client full_update_contents = [] chunksize = 2000 @@ -738,7 +738,7 @@ def get_final_update_contents(self, input_output_maps): def poll_panda_task_old(self, processing=None, input_output_maps=None): task_id = None try: - from pandatools import Client + from pandaclient import Client jobs_ids = None if processing: @@ -809,7 +809,7 @@ def poll_panda_task_old(self, processing=None, input_output_maps=None): def poll_panda_jobs(self, job_ids): job_ids = list(job_ids) self.logger.debug("poll_panda_jobs, poll_panda_jobs_chunk_size: %s, job_ids[:10]: %s" % (self.poll_panda_jobs_chunk_size, str(job_ids[:10]))) - from pandatools import Client + from pandaclient import Client # updated_map_ids = [] inputname_jobid_map = {} @@ -914,7 +914,7 @@ def get_update_contents(self, inputnames, inputname_mapid_map, inputname_jobid_m def poll_panda_task(self, processing=None, input_output_maps=None): task_id = None try: - from pandatools import Client + from pandaclient import Client if processing: proc = processing['processing_metadata']['processing'] @@ -1019,7 +1019,7 @@ def poll_panda_task(self, processing=None, input_output_maps=None): def kill_processing(self, processing): try: if processing: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] task_id = proc.workload_id # task_id = processing['processing_metadata']['task_id'] @@ -1032,7 +1032,7 @@ def kill_processing(self, processing): def kill_processing_force(self, processing): try: if processing: - from pandatools import Client + from pandaclient import Client proc = processing['processing_metadata']['processing'] task_id = proc.workload_id # task_id = processing['processing_metadata']['task_id'] @@ -1045,7 +1045,7 @@ def kill_processing_force(self, processing): def reactivate_processing(self, processing): try: if processing: - from pandatools import Client + from pandaclient import Client # task_id = processing['processing_metadata']['task_id'] proc = processing['processing_metadata']['processing'] task_id = proc.workload_id diff --git a/main/lib/idds/tests/panda_test.py b/main/lib/idds/tests/panda_test.py index f5a759bd..cb605b92 100644 --- a/main/lib/idds/tests/panda_test.py +++ b/main/lib/idds/tests/panda_test.py @@ -29,14 +29,19 @@ print(f.type) """ -jediTaskID = 8378 +jediTaskID = 10607 ret = Client.getJediTaskDetails({'jediTaskID': jediTaskID}, True, True, verbose=False) print(ret) -ret = Client.getTaskStatus(jediTaskID, verbose=False) +# ret = Client.getTaskStatus(jediTaskID, verbose=False) +# print(ret) + +task_info = ret[1] +jobids = task_info['PandaID'] +ret = Client.getJobStatus(ids=jobids, verbose=False) print(ret) -# sys.exit(0) +sys.exit(0) """ jediTaskID = 998 diff --git a/main/lib/idds/tests/test_activelearning.py b/main/lib/idds/tests/test_activelearning.py index 8ef1b0af..7303eafc 100644 --- a/main/lib/idds/tests/test_activelearning.py +++ b/main/lib/idds/tests/test_activelearning.py @@ -22,7 +22,7 @@ except ImportError: from urllib.parse import quote -from pandatools import Client +from pandaclient import Client # from idds.client.client import Client from idds.client.clientmanager import ClientManager @@ -120,7 +120,7 @@ def test_panda_work(): # print('output_collections: %s' % str(test_work.get_output_collections())) # print(json_dumps(test_work, sort_keys=True, indent=4)) - # from pandatools import Client + # from pandaclient import Client # Client.getJediTaskDetails(taskDict,fullFlag,withTaskInfo,verbose=False) # ret = Client.getJediTaskDetails({'jediTaskID': panda_task_id},False,True) # print(ret) diff --git a/main/lib/idds/tests/trigger_release.py b/main/lib/idds/tests/trigger_release.py index 0f8a7e8a..368c6bfd 100644 --- a/main/lib/idds/tests/trigger_release.py +++ b/main/lib/idds/tests/trigger_release.py @@ -12,7 +12,7 @@ request_ids = [368, 369, 370, 371, 372, 373, 374, 375, 376] -request_ids = [910] +request_ids = [902] for request_id in request_ids: contents = get_contents(request_id=request_id, status=ContentStatus.Available) ret_contents = {} From e162acf77e2f96312c4033c66282fc710a1b5dee Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Tue, 15 Mar 2022 22:21:51 +0100 Subject: [PATCH 5/5] new version 0.10.4 --- atlas/lib/idds/atlas/version.py | 2 +- atlas/tools/env/environment.yml | 4 ++-- client/lib/idds/client/version.py | 2 +- client/tools/env/environment.yml | 4 ++-- common/lib/idds/common/version.py | 2 +- doma/lib/idds/doma/version.py | 2 +- doma/tools/env/environment.yml | 4 ++-- main/lib/idds/tests/panda_test.py | 2 +- main/lib/idds/version.py | 2 +- main/tools/env/environment.yml | 6 +++--- monitor/version.py | 2 +- website/version.py | 2 +- workflow/lib/idds/workflow/version.py | 2 +- workflow/tools/env/environment.yml | 2 +- 14 files changed, 19 insertions(+), 19 deletions(-) diff --git a/atlas/lib/idds/atlas/version.py b/atlas/lib/idds/atlas/version.py index 31e441df..04cda44c 100644 --- a/atlas/lib/idds/atlas/version.py +++ b/atlas/lib/idds/atlas/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2019 - 2021 -release_version = "0.10.3" +release_version = "0.10.4" diff --git a/atlas/tools/env/environment.yml b/atlas/tools/env/environment.yml index 995a61d0..dac6c626 100644 --- a/atlas/tools/env/environment.yml +++ b/atlas/tools/env/environment.yml @@ -11,5 +11,5 @@ dependencies: - nose # nose test tools - rucio-clients - rucio-clients-atlas - - idds-common==0.10.3 - - idds-workflow==0.10.3 \ No newline at end of file + - idds-common==0.10.4 + - idds-workflow==0.10.4 \ No newline at end of file diff --git a/client/lib/idds/client/version.py b/client/lib/idds/client/version.py index 31e441df..04cda44c 100644 --- a/client/lib/idds/client/version.py +++ b/client/lib/idds/client/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2019 - 2021 -release_version = "0.10.3" +release_version = "0.10.4" diff --git a/client/tools/env/environment.yml b/client/tools/env/environment.yml index d161a7e2..a72d102b 100644 --- a/client/tools/env/environment.yml +++ b/client/tools/env/environment.yml @@ -14,5 +14,5 @@ dependencies: - pytest # python testing tool - nose # nose test tools - tabulate - - idds-common==0.10.3 - - idds-workflow==0.10.3 \ No newline at end of file + - idds-common==0.10.4 + - idds-workflow==0.10.4 \ No newline at end of file diff --git a/common/lib/idds/common/version.py b/common/lib/idds/common/version.py index 31e441df..04cda44c 100644 --- a/common/lib/idds/common/version.py +++ b/common/lib/idds/common/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2019 - 2021 -release_version = "0.10.3" +release_version = "0.10.4" diff --git a/doma/lib/idds/doma/version.py b/doma/lib/idds/doma/version.py index c62c8f4f..bd6b26c6 100644 --- a/doma/lib/idds/doma/version.py +++ b/doma/lib/idds/doma/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2020 - 2021 -release_version = "0.10.3" +release_version = "0.10.4" diff --git a/doma/tools/env/environment.yml b/doma/tools/env/environment.yml index e8cd026f..70198623 100644 --- a/doma/tools/env/environment.yml +++ b/doma/tools/env/environment.yml @@ -10,5 +10,5 @@ dependencies: - pytest # python testing tool - nose # nose test tools - panda-client # panda client - - idds-common==0.10.3 - - idds-workflow==0.10.3 \ No newline at end of file + - idds-common==0.10.4 + - idds-workflow==0.10.4 \ No newline at end of file diff --git a/main/lib/idds/tests/panda_test.py b/main/lib/idds/tests/panda_test.py index cb605b92..7732c68c 100644 --- a/main/lib/idds/tests/panda_test.py +++ b/main/lib/idds/tests/panda_test.py @@ -29,7 +29,7 @@ print(f.type) """ -jediTaskID = 10607 +jediTaskID = 10517 # 10607 ret = Client.getJediTaskDetails({'jediTaskID': jediTaskID}, True, True, verbose=False) print(ret) diff --git a/main/lib/idds/version.py b/main/lib/idds/version.py index 31e441df..04cda44c 100644 --- a/main/lib/idds/version.py +++ b/main/lib/idds/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2019 - 2021 -release_version = "0.10.3" +release_version = "0.10.4" diff --git a/main/tools/env/environment.yml b/main/tools/env/environment.yml index 6b376940..9762da66 100644 --- a/main/tools/env/environment.yml +++ b/main/tools/env/environment.yml @@ -24,6 +24,6 @@ dependencies: - sphinx-rtd-theme # sphinx readthedoc theme - nevergrad # nevergrad hyper parameter optimization - psycopg2-binary - - idds-common==0.10.3 - - idds-workflow==0.10.3 - - idds-client==0.10.3 \ No newline at end of file + - idds-common==0.10.4 + - idds-workflow==0.10.4 + - idds-client==0.10.4 \ No newline at end of file diff --git a/monitor/version.py b/monitor/version.py index 31e441df..04cda44c 100644 --- a/monitor/version.py +++ b/monitor/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2019 - 2021 -release_version = "0.10.3" +release_version = "0.10.4" diff --git a/website/version.py b/website/version.py index 31e441df..04cda44c 100644 --- a/website/version.py +++ b/website/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2019 - 2021 -release_version = "0.10.3" +release_version = "0.10.4" diff --git a/workflow/lib/idds/workflow/version.py b/workflow/lib/idds/workflow/version.py index 31e441df..04cda44c 100644 --- a/workflow/lib/idds/workflow/version.py +++ b/workflow/lib/idds/workflow/version.py @@ -9,4 +9,4 @@ # - Wen Guan, , 2019 - 2021 -release_version = "0.10.3" +release_version = "0.10.4" diff --git a/workflow/tools/env/environment.yml b/workflow/tools/env/environment.yml index a4792d34..661a6392 100644 --- a/workflow/tools/env/environment.yml +++ b/workflow/tools/env/environment.yml @@ -8,4 +8,4 @@ dependencies: - flake8 # Wrapper around PyFlakes&pep8 - pytest # python testing tool - nose # nose test tools - - idds-common==0.10.3 \ No newline at end of file + - idds-common==0.10.4 \ No newline at end of file