diff --git a/client/lib/idds/client/base.py b/client/lib/idds/client/base.py
index 0e563c9f..90f63268 100644
--- a/client/lib/idds/client/base.py
+++ b/client/lib/idds/client/base.py
@@ -16,7 +16,10 @@
 import logging
 import os
+import random
 import requests
+import time
+
 try:
     # Python 2
     from urllib import urlencode, quote
@@ -48,7 +51,7 @@ def __init__(self, host=None, auth=None, timeout=None, client_proxy=None):
         self.client_proxy = client_proxy
         self.timeout = timeout
         self.session = requests.session()
-        self.retries = 2
+        self.retries = 3
         self.auth_type = None
         self.oidc_token_file = None
@@ -241,6 +244,9 @@ def get_request_response(self, url, type='GET', data=None, headers=None, auth_se
                 logging.warning('ConnectionError: ' + str(error))
                 if retry >= self.retries - 1:
                     raise exceptions.ConnectionException('ConnectionError: ' + str(error))
+                else:
+                    random_sleep = random.uniform(0, 30)
+                    time.sleep(random_sleep)
 
         if result is not None:
             if return_result_directly:
diff --git a/doma/lib/idds/doma/workflowv2/domapandawork.py b/doma/lib/idds/doma/workflowv2/domapandawork.py
index 0ec89a16..6cdaf01d 100644
--- a/doma/lib/idds/doma/workflowv2/domapandawork.py
+++ b/doma/lib/idds/doma/workflowv2/domapandawork.py
@@ -142,6 +142,8 @@ def __init__(self, executable=None, arguments=None, parameters=None, setup=None,
         self.dependency_map = {}
         self.dependency_map_deleted = []
 
+        self.additional_task_parameters = {}
+
     def my_condition(self):
         if self.is_finished():
             return True
@@ -335,6 +337,15 @@ def set_agent_attributes(self, attrs, req_attributes=None):
             self.num_retries = int(self.agent_attributes['num_retries'])
         if 'poll_panda_jobs_chunk_size' in self.agent_attributes and self.agent_attributes['poll_panda_jobs_chunk_size']:
             self.poll_panda_jobs_chunk_size = int(self.agent_attributes['poll_panda_jobs_chunk_size'])
+        if 'additional_task_parameters' in self.agent_attributes and self.agent_attributes['additional_task_parameters']:
+            if not self.additional_task_parameters:
+                self.additional_task_parameters = {}
+            try:
+                for key, value in self.agent_attributes['additional_task_parameters'].items():
+                    if key not in self.additional_task_parameters:
+                        self.additional_task_parameters[key] = value
+            except Exception as ex:
+                self.logger.warn(f"Failed to set additional_task_parameters: {ex}")
 
     def depend_on(self, work):
         self.logger.debug("checking depending on")
@@ -765,6 +776,14 @@ def create_processing(self, input_output_maps=[]):
 
         task_param_map['reqID'] = self.get_request_id()
 
+        if self.additional_task_parameters:
+            try:
+                for key, value in self.additional_task_parameters.items():
+                    if key not in task_param_map:
+                        task_param_map[key] = value
+            except Exception as ex:
+                self.logger.warn(f"failed to set task parameter map with additional_task_parameters: {ex}")
+
         processing_metadata = {'task_param': task_param_map}
         proc = Processing(processing_metadata=processing_metadata)
         proc.workload_id = None
diff --git a/main/config_default/logrotate_daemon b/main/config_default/logrotate_daemon
index 668f7dd9..1d4cf4b4 100755
--- a/main/config_default/logrotate_daemon
+++ b/main/config_default/logrotate_daemon
@@ -3,4 +3,12 @@
 
 # while true; do /usr/sbin/logrotate -s /var/log/idds/logrotate.status /etc/logrotate.d/idds >> /var/log/idds/logrotate.log 2>&1; sleep 3600; done
 
-while true; do /usr/sbin/logrotate -s /var/log/idds/logrotate.status /etc/logrotate.d/idds; sleep 86400; done
+# one week
+while true; do
+    /usr/sbin/logrotate -s /var/log/idds/logrotate.status /etc/logrotate.d/idds
+    sleep 604800
+
+    # random sleep to avoid all servers restarting at the same time
+    RANDOM_SLEEP=$((RANDOM % 3600))
+    sleep $RANDOM_SLEEP
+done
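Note on the client change above: a failed connection is now retried up to `self.retries` times with a random 0-30 s pause between attempts, so many clients do not retry in lockstep. A minimal standalone sketch of the same pattern (the function name and URL handling here are illustrative, not the actual client code):

```python
import random
import time

import requests


def get_with_retries(session, url, retries=3):
    """Retry a GET on ConnectionError, sleeping a random 0-30 s between tries."""
    for attempt in range(retries):
        try:
            return session.get(url)
        except requests.exceptions.ConnectionError:
            if attempt >= retries - 1:
                raise  # out of attempts, propagate the error
            time.sleep(random.uniform(0, 30))  # jitter before the next try
```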
diff --git a/main/lib/idds/orm/base/models.py b/main/lib/idds/orm/base/models.py
index e65085df..0d13308a 100644
--- a/main/lib/idds/orm/base/models.py
+++ b/main/lib/idds/orm/base/models.py
@@ -691,14 +691,14 @@ class Content_ext(BASE, ModelBase):
     req_id = Column(BigInteger())
     jedi_task_id = Column(BigInteger())
     actual_core_count = Column(Integer())
-    max_rss = Column(Integer())
-    max_vmem = Column(Integer())
-    max_swap = Column(Integer())
-    max_pss = Column(Integer())
-    avg_rss = Column(Integer())
-    avg_vmem = Column(Integer())
-    avg_swap = Column(Integer())
-    avg_pss = Column(Integer())
+    max_rss = Column(BigInteger())
+    max_vmem = Column(BigInteger())
+    max_swap = Column(BigInteger())
+    max_pss = Column(BigInteger())
+    avg_rss = Column(BigInteger())
+    avg_vmem = Column(BigInteger())
+    avg_swap = Column(BigInteger())
+    avg_pss = Column(BigInteger())
     max_walltime = Column(Integer())
     disk_io = Column(Integer())
     failed_attempt = Column(Integer())
diff --git a/main/lib/idds/tests/panda_test.py b/main/lib/idds/tests/panda_test.py
index 1d2285a1..9944a9ed 100644
--- a/main/lib/idds/tests/panda_test.py
+++ b/main/lib/idds/tests/panda_test.py
@@ -14,11 +14,11 @@
 # os.environ['PANDA_URL_SSL'] = 'https://panda-doma-k8s-panda.cern.ch/server/panda'
 # os.environ['PANDA_URL'] = 'http://panda-doma-k8s-panda.cern.ch:25080/server/panda'
 
-# os.environ['PANDA_URL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda'
-# os.environ['PANDA_URL_SSL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda'
+os.environ['PANDA_URL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda'
+os.environ['PANDA_URL_SSL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda'
 
-os.environ['PANDA_URL_SSL'] = 'https://pandaserver01.sdcc.bnl.gov:25443/server/panda'
-os.environ['PANDA_URL'] = 'https://pandaserver01.sdcc.bnl.gov:25443/server/panda'
+# os.environ['PANDA_URL_SSL'] = 'https://pandaserver01.sdcc.bnl.gov:25443/server/panda'
+# os.environ['PANDA_URL'] = 'https://pandaserver01.sdcc.bnl.gov:25443/server/panda'
 
 from pandaclient import Client  # noqa E402
 
@@ -85,6 +85,7 @@
 task_ids = [476, 477, 478]
 task_ids = [937, 938, 940, 941]
 task_ids = [124, 619]
+task_ids = [22707, 22708, 22709, 22710, 23211, 23212, 22155, 22158]
 for task_id in task_ids:
     print("Killing %s" % task_id)
     ret = Client.killTask(task_id, verbose=True)
diff --git a/main/lib/idds/tests/test_domapanda.py b/main/lib/idds/tests/test_domapanda.py
index 370c53e0..10c357b2 100644
--- a/main/lib/idds/tests/test_domapanda.py
+++ b/main/lib/idds/tests/test_domapanda.py
@@ -90,7 +90,7 @@
     task_queue5 = 'SLAC_Rubin_IO'
     # task_queue = 'SLAC_Rubin_Extra_Himem_32Cores'
    # task_queue = 'SLAC_Rubin_Merge'
-    # task_queue = 'SLAC_TEST'
+    task_queue2 = 'SLAC_TEST'
 
 # task_queue4 = task_queue3 = task_queue2 = task_queue1 = task_queue
 # task_cloud = None
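The `Content_ext` column change above widens the memory counters because SQLAlchemy's `Integer` maps to a 32-bit signed column on common backends (max 2**31 - 1 = 2147483647), which multi-GB RSS/VMEM-scale values can overflow, while `BigInteger` maps to a 64-bit column. A small sketch of the distinction (the table name is illustrative):

```python
from sqlalchemy import BigInteger, Column, Integer, MetaData, Table

metadata = MetaData()

# Integer: typically a 32-bit signed column, max 2147483647.
# BigInteger: typically a 64-bit signed column, max 2**63 - 1.
demo = Table(
    "demo_content_ext", metadata,
    Column("max_walltime", Integer()),   # seconds; safely below 2**31
    Column("max_rss", BigInteger()),     # memory counter; may exceed 2**31 - 1
)
```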
diff --git a/main/lib/idds/tests/test_domapanda_big.py b/main/lib/idds/tests/test_domapanda_big.py
index c6d87723..feb568e4 100644
--- a/main/lib/idds/tests/test_domapanda_big.py
+++ b/main/lib/idds/tests/test_domapanda_big.py
@@ -45,8 +45,9 @@
     task_queue = 'CC-IN2P3_Rubin'
     task_queue1 = 'CC-IN2P3_Rubin_Medium'
     task_queue2 = 'CC-IN2P3_Rubin_Himem'
-    task_queue3 = 'CC-IN2P3_Rubin_Extra_Himem'
-    task_queue4 = 'CC-IN2P3_Rubin_Merge'
+    task_queue3 = 'CC-IN2P3_Rubin_Big_Himem'
+    task_queue4 = 'CC-IN2P3_Rubin_Extra_Himem'
+    task_queue5 = 'CC-IN2P3_Rubin_Merge'
 elif len(sys.argv) > 1 and sys.argv[1] == "lancs":
     site = 'lancs'
     task_cloud = 'EU'
@@ -54,9 +55,10 @@
     task_queue = 'LANCS_Rubin'
     task_queue1 = 'LANCS_Rubin_Medium'
     task_queue2 = 'LANCS_Rubin_Himem'
-    task_queue3 = 'LANCS_Rubin_Extra_Himem'
-    task_queue3 = 'LANCS_Rubin_Himem'
-    task_queue4 = 'LANCS_Rubin_Merge'
+    task_queue3 = 'LANCS_Rubin_Big_Himem'
+    task_queue4 = 'LANCS_Rubin_Extra_Himem'
+    # task_queue3 = 'LANCS_Rubin_Himem'
+    task_queue5 = 'LANCS_Rubin_Merge'
 elif len(sys.argv) > 1 and sys.argv[1] == "ral":
     site = 'RAL'
     task_cloud = 'EU'
@@ -64,7 +66,7 @@
     task_queue = 'RAL_Rubin'
     task_queue1 = 'RAL_Rubin_Medium'
     task_queue2 = 'RAL_Rubin_Himem'
-    task_queue3 = 'RAL_Rubin_Extra_Himem'
+    task_queue3 = 'RAL_Rubin_Big_Himem'
     # task_queue3 = 'RAL_Rubin_Himem'
     task_queue4 = 'RAL_Rubin_Merge'
     # task_queue5 = 'RAL_Rubin_IO'
@@ -81,8 +83,9 @@
     task_queue = 'SLAC_Rubin'
     task_queue1 = 'SLAC_Rubin_Medium'
     task_queue2 = 'SLAC_Rubin_Himem'
-    task_queue3 = 'SLAC_Rubin_Extra_Himem'
-    task_queue4 = 'SLAC_Rubin_Merge'
+    task_queue3 = 'SLAC_Rubin_Big_Himem'
+    task_queue4 = 'SLAC_Rubin_Extra_Himem'
+    task_queue5 = 'SLAC_Rubin_Merge'
     # task_queue = 'SLAC_Rubin_Extra_Himem_32Cores'
     # task_queue = 'SLAC_Rubin_Merge'
     # task_queue = 'SLAC_TEST'
@@ -185,7 +188,7 @@ def setup_workflow():
     taskN4.dependencies = [
         {"name": "00004" + str(k),
          "dependencies": [],
-         "submitted": False} for k in range(100)
+         "submitted": False} for k in range(10000)
     ]
 
     taskN5 = PanDATask()
@@ -197,6 +200,15 @@ def setup_workflow():
          "submitted": False} for k in range(100)
     ]
 
+    taskN6 = PanDATask()
+    taskN6.step = "step6"
+    taskN6.name = site + "_" + taskN6.step + "_" + randStr()
+    taskN6.dependencies = [
+        {"name": "00005" + str(k),
+         "dependencies": [],
+         "submitted": False} for k in range(100)
+    ]
+
     work1 = DomaPanDAWork(executable='echo; sleep 180',
                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
@@ -212,7 +224,7 @@ def setup_workflow():
                                     "type": "template",
                                     "value": "log.tgz"},
                           task_cloud=task_cloud)
-    work2 = DomaPanDAWork(executable='echo; sleep 180',
+    work2 = DomaPanDAWork(executable='echo; sleep 180',  # noqa F841
                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#2'},
                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#2'}],
                           log_collections=[], dependency_map=taskN2.dependencies,
@@ -227,7 +239,7 @@ def setup_workflow():
                                     "type": "template",
                                     "value": "log.tgz"},
                           task_cloud=task_cloud)
-    work3 = DomaPanDAWork(executable='echo; sleep 180',
+    work3 = DomaPanDAWork(executable='echo; sleep 180',  # noqa F841
                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#3'},
                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#3'}],
                           log_collections=[], dependency_map=taskN3.dependencies,
@@ -243,7 +255,7 @@ def setup_workflow():
                                     "value": "log.tgz"},
                           task_cloud=task_cloud)
 
-    work4 = DomaPanDAWork(executable='echo; sleep 180',
+    work4 = DomaPanDAWork(executable='echo; sleep 180',  # noqa F841
                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
                           log_collections=[], dependency_map=taskN4.dependencies,
@@ -259,7 +271,7 @@ def setup_workflow():
                                     "value": "log.tgz"},
                           task_cloud=task_cloud)
 
-    work5 = DomaPanDAWork(executable='echo; sleep 180',
+    work5 = DomaPanDAWork(executable='echo; sleep 180',  # noqa F841
                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
                           log_collections=[], dependency_map=taskN5.dependencies,
@@ -275,14 +287,33 @@ def setup_workflow():
                                     "value": "log.tgz"},
                           task_cloud=task_cloud)
 
+    work6 = DomaPanDAWork(executable='echo; sleep 180',  # noqa F841
+                          primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
+                          output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
+                          log_collections=[], dependency_map=taskN6.dependencies,
+                          task_name=taskN6.name, task_queue=task_queue5,
+                          encode_command_line=True,
+                          task_priority=981,
+                          prodSourceLabel='managed',
+                          task_log={"dataset": "PandaJob_#{pandaid}/",
+                                    "destination": "local",
+                                    "param_type": "log",
+                                    "token": "local",
+                                    "type": "template",
+                                    "value": "log.tgz"},
+                          task_cloud=task_cloud)
+
     pending_time = 12
     # pending_time = None
     workflow = Workflow(pending_time=pending_time)
     workflow.add_work(work1)
+    """
     workflow.add_work(work2)
     workflow.add_work(work3)
     workflow.add_work(work4)
     workflow.add_work(work5)
+    workflow.add_work(work6)
+    """
 
     workflow.name = site + "_" + 'test_workflow.idds.%s.test' % time.time()
     return workflow
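For context, the `dependencies` lists built in this test (and consumed by `DomaPanDAWork` as `dependency_map`) are lists of pseudo-job entries; their shape, factored into a helper for clarity (the helper itself is illustrative):

```python
def make_dependencies(prefix, count):
    """Build dependency-map entries like the test above: one pseudo job per
    entry, with no upstream dependencies and not yet submitted."""
    return [{"name": prefix + str(k), "dependencies": [], "submitted": False}
            for k in range(count)]


deps = make_dependencies("00005", 100)
print(deps[0])  # {'name': '000050', 'dependencies': [], 'submitted': False}
```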
diff --git a/main/lib/idds/tests/test_get_request_info_panda.py b/main/lib/idds/tests/test_get_request_info_panda.py
index c8a942d4..fe72f50e 100644
--- a/main/lib/idds/tests/test_get_request_info_panda.py
+++ b/main/lib/idds/tests/test_get_request_info_panda.py
@@ -1,3 +1,4 @@
+import json
 
 # from idds.common.constants import RequestStatus  # noqa F401
 # from idds.common.utils import json_loads  # noqa F401
@@ -8,7 +9,7 @@
 idds_client = pandaclient.idds_api.get_api(idds_utils.json_dumps, idds_host=None, compress=True, manager=True)
 
 # wms_workflow_id = 4112
-wms_workflow_id = 5194
+wms_workflow_id = 2154
 
 # only check the request status
 ret = idds_client.get_requests(request_id=wms_workflow_id)
 print(ret)
@@ -20,19 +21,18 @@
 print(ret)
 
 workloads = []
+transforms = []
 for workload in ret[1][1]:
     workloads.append(workload['transform_workload_id'])
+    transforms.append(workload['transform_id'])
 print(workloads)
+print(transforms)
 
-# show one workload file information
-workload_0 = workloads[0]
-ret = idds_client.get_contents_output_ext(request_id=wms_workflow_id, workload_id=workload_0)
-print(ret)
-
-workload_1 = workloads[1]
-ret = idds_client.get_contents_output_ext(request_id=wms_workflow_id, workload_id=workload_1)
-print(ret)
+for transform_id in transforms:
+    ret = idds_client.get_transform(request_id=wms_workflow_id, transform_id=transform_id)
+    print(json.dumps(ret, indent=4, sort_keys=True))
 
-workload_2 = workloads[2]
-ret = idds_client.get_contents_output_ext(request_id=wms_workflow_id, workload_id=workload_2)
-print(ret)
+# show one workload file information
+for workload_id in workloads:
+    ret = idds_client.get_contents_output_ext(request_id=wms_workflow_id, workload_id=workload_id)
+    print(ret)
diff --git a/monitor/data/conf.js b/monitor/data/conf.js
index d8ee0c03..0856aecf 100644
--- a/monitor/data/conf.js
+++ b/monitor/data/conf.js
@@ -1,9 +1,9 @@
 var appConfig = {
-    'iddsAPI_request': "https://lxplus947.cern.ch:443/idds/monitor_request/null/null",
-    'iddsAPI_transform': "https://lxplus947.cern.ch:443/idds/monitor_transform/null/null",
-    'iddsAPI_processing': "https://lxplus947.cern.ch:443/idds/monitor_processing/null/null",
-    'iddsAPI_request_detail': "https://lxplus947.cern.ch:443/idds/monitor/null/null/true/false/false",
-    'iddsAPI_transform_detail': "https://lxplus947.cern.ch:443/idds/monitor/null/null/false/true/false",
-    'iddsAPI_processing_detail': "https://lxplus947.cern.ch:443/idds/monitor/null/null/false/false/true"
+    'iddsAPI_request': "https://lxplus956.cern.ch:443/idds/monitor_request/null/null",
+    'iddsAPI_transform': "https://lxplus956.cern.ch:443/idds/monitor_transform/null/null",
+    'iddsAPI_processing': "https://lxplus956.cern.ch:443/idds/monitor_processing/null/null",
+    'iddsAPI_request_detail': "https://lxplus956.cern.ch:443/idds/monitor/null/null/true/false/false",
+    'iddsAPI_transform_detail': "https://lxplus956.cern.ch:443/idds/monitor/null/null/false/true/false",
+    'iddsAPI_processing_detail': "https://lxplus956.cern.ch:443/idds/monitor/null/null/false/false/true"
 }
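The test_get_request_info_panda.py rewrite above replaces three hard-coded workload lookups with loops over every transform and workload of the request. Factored into a reusable helper (a sketch that assumes the same `idds_client` and `ret[1][1]` payload shape as in the test):

```python
import json


def dump_transforms(idds_client, request_id, workload_rows):
    """Fetch and pretty-print each transform of a request.

    ``workload_rows`` is the ``ret[1][1]`` payload iterated in the test,
    one dict per workload with a ``transform_id`` key.
    """
    for row in workload_rows:
        ret = idds_client.get_transform(request_id=request_id,
                                        transform_id=row['transform_id'])
        print(json.dumps(ret, indent=4, sort_keys=True))
```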
"https://lxplus947.cern.ch:443/idds/monitor/null/null/true/false/false", - 'iddsAPI_transform_detail': "https://lxplus947.cern.ch:443/idds/monitor/null/null/false/true/false", - 'iddsAPI_processing_detail': "https://lxplus947.cern.ch:443/idds/monitor/null/null/false/false/true" + 'iddsAPI_request': "https://lxplus956.cern.ch:443/idds/monitor_request/null/null", + 'iddsAPI_transform': "https://lxplus956.cern.ch:443/idds/monitor_transform/null/null", + 'iddsAPI_processing': "https://lxplus956.cern.ch:443/idds/monitor_processing/null/null", + 'iddsAPI_request_detail': "https://lxplus956.cern.ch:443/idds/monitor/null/null/true/false/false", + 'iddsAPI_transform_detail': "https://lxplus956.cern.ch:443/idds/monitor/null/null/false/true/false", + 'iddsAPI_processing_detail': "https://lxplus956.cern.ch:443/idds/monitor/null/null/false/false/true" } diff --git a/workflow/bin/run_workflow b/workflow/bin/run_workflow index 7663b907..8e667bf3 100644 --- a/workflow/bin/run_workflow +++ b/workflow/bin/run_workflow @@ -91,6 +91,8 @@ def run_workflow(name, context, original_args, current_job_kwargs): with workflow: ret = workflow.run() logging.info("run workflow result: %s" % str(ret)) + if not ret: + return -1 return 0 @@ -113,6 +115,8 @@ def run_work(name, context, original_args, current_job_kwargs): logging.info("work: %s" % work) ret = work.run() logging.info("run work result: %s" % str(ret)) + if not ret: + return -1 return 0 diff --git a/workflow/lib/idds/iworkflow/work.py b/workflow/lib/idds/iworkflow/work.py index c8cbed0b..b1be2b9d 100644 --- a/workflow/lib/idds/iworkflow/work.py +++ b/workflow/lib/idds/iworkflow/work.py @@ -47,7 +47,7 @@ def __init__(self, name=None, workflow_context=None, source_dir=None, init_env=N self._priority = 500 self._core_count = 1 - self._total_memory = 1000 # MB + self._total_memory = None # MB self._max_walltime = 7 * 24 * 3600 self._max_attempt = 5 @@ -1005,7 +1005,7 @@ def run(self): except: logging.error("Unknow error") logging.error(traceback.format_exc()) - logging.info("finish work run().") + logging.info(f"finish work run() with ret: {ret}") return ret def run_local(self): @@ -1059,7 +1059,7 @@ def run_local(self): else: self._results = MapResult() self._results.add_result(name=self.get_func_name(), args=current_job_kwargs, result=ret_output) - return self._results + return ret_status else: if not multi_jobs_kwargs_list: ret_status, rets, ret_err = self.run_func(self._func, pre_kwargs, args, kwargs) @@ -1068,7 +1068,7 @@ def run_local(self): else: self._results = MapResult() self._results.add_result(name=self.get_func_name(), args=kwargs, result=rets) - return self._results + return ret_status else: if not self.map_results: self._results = [] @@ -1096,7 +1096,7 @@ def run_local(self): ret_status, rets, ret_error = self.run_func(self._func, pre_kwargs_copy, args_copy, kwargs_copy) self._results.add_result(name=self.get_func_name(), args=one_job_kwargs, result=rets) - return self._results + return ret_status def get_run_command(self): cmd = "run_workflow --type work --name %s " % self.name diff --git a/workflow/lib/idds/iworkflow/workflow.py b/workflow/lib/idds/iworkflow/workflow.py index 047ae8ec..16d49f20 100644 --- a/workflow/lib/idds/iworkflow/workflow.py +++ b/workflow/lib/idds/iworkflow/workflow.py @@ -583,6 +583,7 @@ def download_source_files_from_panda(self, filename): logging.info(f"Extract {full_output_filename} to {target_dir}") os.remove(full_output_filename) logging.info("Remove %s" % full_output_filename) + return target_dir def setup_panda(self): 
""" @@ -618,7 +619,8 @@ def setup_panda_source_files(self): :returns command: `str` to setup the workflow. """ if self.remote_source_file: - self.download_source_files_from_panda(self.remote_source_file) + target_dir = self.download_source_files_from_panda(self.remote_source_file) + self._source_dir = target_dir return None def setup_idds_source_files(self): @@ -1226,7 +1228,7 @@ def pre_run(self): return False def run(self): - logging.info("Start work run().") + logging.info("Start workflow run().") ret = None try: ret = self.run_local() @@ -1236,7 +1238,7 @@ def run(self): except: logging.error("Unknow error") logging.error(traceback.format_exc()) - logging.info("finish work run().") + logging.info(f"finish workflow run() with ret: {ret}.") return ret def run_local(self): @@ -1269,7 +1271,7 @@ def run_local(self): logging.info(f"run workflow successfully. output: {output}, error: {error}") else: logging.error(f"run workflow failed. output: {output}, error: {error}") - return output + return status # Context Manager ----------------------------------------------- def __enter__(self): @@ -1322,12 +1324,14 @@ def get_func_name(self): def workflow(func=None, *, local=False, service='idds', source_dir=None, primary=False, queue=None, site=None, cloud=None, max_walltime=24 * 3600, distributed=True, init_env=None, pre_kwargs={}, return_workflow=False, no_wraps=False, source_dir_parent_level=None, exclude_source_files=[], enable_separate_log=False, clean_env=None, + core_count=1, total_memory=4000, # MB container_options=None): if func is None: return functools.partial(workflow, local=local, service=service, source_dir=source_dir, primary=primary, queue=queue, site=site, cloud=cloud, max_walltime=max_walltime, distributed=distributed, init_env=init_env, pre_kwargs=pre_kwargs, no_wraps=no_wraps, return_workflow=return_workflow, source_dir_parent_level=source_dir_parent_level, exclude_source_files=exclude_source_files, clean_env=clean_env, enable_separate_log=enable_separate_log, + core_count=core_count, total_memory=total_memory, container_options=container_options) if 'IDDS_IGNORE_WORKFLOW_DECORATOR' in os.environ: @@ -1344,6 +1348,8 @@ def wrapper(*args, **kwargs): f.queue = queue f.site = site f.cloud = cloud + f.core_count = core_count + f.total_memory = total_memory logging.info("return_workflow %s" % return_workflow) if return_workflow: