From 3c7605d10f89e8db64ec71804fd8e6f07bbeb2fc Mon Sep 17 00:00:00 2001
From: Wen Guan
Date: Thu, 23 May 2024 13:25:28 +0200
Subject: [PATCH] optimize several log levels from error to warning

---
 main/etc/idds/idds.cfg.template               |  1 +
 main/lib/idds/agents/carrier/poller.py        |  2 +-
 main/lib/idds/agents/carrier/submitter.py     |  2 +-
 main/lib/idds/agents/carrier/trigger.py       |  2 +-
 main/lib/idds/agents/clerk/clerk.py           |  6 +-
 .../idds/agents/transformer/transformer.py    |  2 +-
 main/lib/idds/tests/panda_test.py             | 11 ++++
 main/lib/idds/tests/test_domapanda.py         | 62 ++++++++++++++++++-
 main/lib/idds/tests/test_get_source_code.py   |  5 ++
 main/tools/env/setup_panda.sh                 | 11 +++-
 monitor/data/conf.js                          | 12 ++--
 workflow/bin/run_workflow                     | 24 ++++---
 workflow/tools/make/zipheader                 |  2 +
 13 files changed, 116 insertions(+), 26 deletions(-)

diff --git a/main/etc/idds/idds.cfg.template b/main/etc/idds/idds.cfg.template
index f516f1b9..93d89718 100755
--- a/main/etc/idds/idds.cfg.template
+++ b/main/etc/idds/idds.cfg.template
@@ -27,6 +27,7 @@ loglevel = DEBUG
 # dev: aipanda104
 # doma: aipanda105-107
 # idds-mon: aipanda108
+
 [database]
 #default = mysql://idds:idds@pcuwvirt5.cern.ch/idds
 #default = mysql://idds:idds_passwd@aipanda182.cern.ch/idds
diff --git a/main/lib/idds/agents/carrier/poller.py b/main/lib/idds/agents/carrier/poller.py
index 0b9e8a46..86001cba 100644
--- a/main/lib/idds/agents/carrier/poller.py
+++ b/main/lib/idds/agents/carrier/poller.py
@@ -538,7 +538,7 @@ def process_update_processing(self, event):
 
                 pr = self.get_processing(processing_id=event._processing_id, status=None, locking=True)
                 if not pr:
-                    self.logger.error("Cannot find processing for event: %s" % str(event))
+                    self.logger.warn("Cannot find processing for event: %s" % str(event))
                     pro_ret = ReturnCode.Locked.value
                 else:
                     log_pre = self.get_log_prefix(pr)
diff --git a/main/lib/idds/agents/carrier/submitter.py b/main/lib/idds/agents/carrier/submitter.py
index 782f3b09..b6ff6970 100644
--- a/main/lib/idds/agents/carrier/submitter.py
+++ b/main/lib/idds/agents/carrier/submitter.py
@@ -261,7 +261,7 @@ def process_new_processing(self, event):
                 self.logger.info("process_new_processing, event: %s" % str(event))
                 pr = self.get_processing(processing_id=event._processing_id, status=None, locking=True)
                 if not pr:
-                    self.logger.error("Cannot find processing for event: %s" % str(event))
+                    self.logger.warn("Cannot find processing for event: %s" % str(event))
                 else:
                     log_pre = self.get_log_prefix(pr)
                     self.logger.info(log_pre + "process_new_processing")
diff --git a/main/lib/idds/agents/carrier/trigger.py b/main/lib/idds/agents/carrier/trigger.py
index 6adbd37f..8e37e9a1 100644
--- a/main/lib/idds/agents/carrier/trigger.py
+++ b/main/lib/idds/agents/carrier/trigger.py
@@ -196,7 +196,7 @@ def process_trigger_processing_real(self, event):
                 self.logger.info("process_trigger_processing, event: %s" % str(event))
                 pr = self.get_processing(processing_id=event._processing_id, status=None, locking=True)
                 if not pr:
-                    self.logger.error("Cannot find processing for event: %s" % str(event))
+                    self.logger.warn("Cannot find processing for event: %s" % str(event))
                     pro_ret = ReturnCode.Locked.value
                 else:
                     log_pre = self.get_log_prefix(pr)
diff --git a/main/lib/idds/agents/clerk/clerk.py b/main/lib/idds/agents/clerk/clerk.py
index 9b42cea0..63d73cf0 100644
--- a/main/lib/idds/agents/clerk/clerk.py
+++ b/main/lib/idds/agents/clerk/clerk.py
@@ -1193,7 +1193,7 @@ def handle_update_irequest_real(self, req, event):
                 failed_tfs += 1
 
         req_status = RequestStatus.Transforming
-        if req['request_type'] in [RequestType.iWorkflowLocal] and total_tfs == 0:
+        if req['request_type'] in [RequestType.iWorkflowLocal]:
             workflow = req['request_metadata'].get('workflow', None)
             if workflow and req['created_at'] + datetime.timedelta(seconds=workflow.max_walltime) < datetime.datetime.utcnow():
                 req_status = RequestStatus.Finished
@@ -1369,7 +1369,7 @@ def process_abort_request(self, event):
         if event:
             req = self.get_request(request_id=event._request_id, locking=True)
             if not req:
-                self.logger.error("Cannot find request for event: %s" % str(event))
+                self.logger.warn("Cannot find request for event: %s" % str(event))
                 pro_ret = ReturnCode.Locked.value
             else:
                 log_pre = self.get_log_prefix(req)
@@ -1517,7 +1517,7 @@ def process_close_request(self, event):
         if event:
             req = self.get_request(request_id=event._request_id, locking=True)
             if not req:
-                self.logger.error("Cannot find request for event: %s" % str(event))
+                self.logger.warn("Cannot find request for event: %s" % str(event))
                 pro_ret = ReturnCode.Locked.value
             else:
                 log_pre = self.get_log_prefix(req)
diff --git a/main/lib/idds/agents/transformer/transformer.py b/main/lib/idds/agents/transformer/transformer.py
index 62c7f4f6..ff8c00c2 100644
--- a/main/lib/idds/agents/transformer/transformer.py
+++ b/main/lib/idds/agents/transformer/transformer.py
@@ -496,7 +496,7 @@ def process_new_transform(self, event):
                 tf_status = [TransformStatus.New, TransformStatus.Ready, TransformStatus.Extend]
                 tf = self.get_transform(transform_id=event._transform_id, status=tf_status, locking=True)
                 if not tf:
-                    self.logger.error("Cannot find transform for event: %s" % str(event))
+                    self.logger.warn("Cannot find transform for event: %s" % str(event))
                 else:
                     log_pre = self.get_log_prefix(tf)
                     self.logger.info(log_pre + "process_new_transform")
diff --git a/main/lib/idds/tests/panda_test.py b/main/lib/idds/tests/panda_test.py
index 1e51409c..7c3233b7 100644
--- a/main/lib/idds/tests/panda_test.py
+++ b/main/lib/idds/tests/panda_test.py
@@ -55,6 +55,17 @@
 task_ids = [168747, 168761, 168763]
 task_ids = [13413]
 task_ids = [168859, 168861, 168862]
+task_ids = [i for i in range(9021, 9222)]
+task_ids = [i for i in range(169155, 169178)]
+task_ids = [169182, 169183, 169184]
+
+task_ids = [5975, 8442, 10741, 10742, 10744, 10745, 10746, 10747]
+task_ids = [15507, 15516, 15520, 15526, 15534, 15535, 15539, 15679, 15715]
+task_ids = [169181, 169198, 169199, 169201, 169206] + [i for i in range(169210, 169232)]
+task_ids = [169236, 169237, 169238, 169239, 169240, 169241]
+task_ids = [169272, 169273, 169312, 169313]
+task_ids = [169307, 169308, 169309, 169310, 169311, 169312, 169313, 169314]
+task_ids = [i for i in range(169359, 169361)]
 for task_id in task_ids:
     print("Killing %s" % task_id)
     ret = Client.killTask(task_id, verbose=True)
diff --git a/main/lib/idds/tests/test_domapanda.py b/main/lib/idds/tests/test_domapanda.py
index 004cd3d2..2e97d848 100644
--- a/main/lib/idds/tests/test_domapanda.py
+++ b/main/lib/idds/tests/test_domapanda.py
@@ -47,6 +47,8 @@
     task_queue2 = 'CC-IN2P3_Rubin_Himem'
     task_queue3 = 'CC-IN2P3_Rubin_Extra_Himem'
     task_queue4 = 'CC-IN2P3_Rubin_Merge'
+    # task_queue5 = 'CC-IN2P3_Rubin_IO'
+    task_queue5 = 'CC-IN2P3_Rubin_Extra_Himem'
 elif len(sys.argv) > 1 and sys.argv[1] == "lancs":
     site = 'lancs'
     task_cloud = 'EU'
@@ -55,8 +57,10 @@
     task_queue1 = 'LANCS_Rubin_Medium'
     task_queue2 = 'LANCS_Rubin_Himem'
     task_queue3 = 'LANCS_Rubin_Extra_Himem'
-    task_queue3 = 'LANCS_Rubin_Himem'
+    # task_queue3 = 'LANCS_Rubin_Himem'
     task_queue4 = 'LANCS_Rubin_Merge'
+    # task_queue5 = 'LANCS_Rubin_IO'
+    task_queue5 = 'LANCS_Rubin_Extra_Himem'
 else:
     site = 'slac'
     # task_cloud = 'LSST'
@@ -71,6 +75,7 @@
     task_queue2 = 'SLAC_Rubin_Himem'
     task_queue3 = 'SLAC_Rubin_Extra_Himem'
     task_queue4 = 'SLAC_Rubin_Merge'
+    task_queue5 = 'SLAC_Rubin_IO'
     # task_queue = 'SLAC_Rubin_Extra_Himem_32Cores'
     # task_queue = 'SLAC_Rubin_Merge'
     # task_queue = 'SLAC_TEST'
@@ -187,6 +192,26 @@ def setup_workflow():
          "submitted": False} for k in range(6)
     ]
 
+    taskN6 = PanDATask()
+    taskN6.step = "step6"
+    taskN6.name = site + "_" + taskN6.step + "_" + randStr()
+    taskN6.dependencies = [
+        {"name": "00006" + str(k),
+         "order_id": k,
+         "dependencies": [],
+         "submitted": False} for k in range(6)
+    ]
+
+    taskN7 = PanDATask()
+    taskN7.step = "step7"
+    taskN7.name = site + "_" + taskN7.step + "_" + randStr()
+    taskN7.dependencies = [
+        {"name": "00007" + str(k),
+         "order_id": k,
+         "dependencies": [],
+         "submitted": False} for k in range(6)
+    ]
+
     work1 = DomaPanDAWork(executable='echo',
                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
@@ -265,6 +290,39 @@
                                     "value": "log.tgz"},
                           task_cloud=task_cloud)
 
+    work6 = DomaPanDAWork(executable='echo',
+                          primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
+                          output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
+                          log_collections=[], dependency_map=taskN6.dependencies,
+                          task_name=taskN6.name, task_queue=task_queue5,
+                          encode_command_line=True,
+                          task_priority=981,
+                          prodSourceLabel='managed',
+                          task_log={"dataset": "PandaJob_#{pandaid}/",
+                                    "destination": "local",
+                                    "param_type": "log",
+                                    "token": "local",
+                                    "type": "template",
+                                    "value": "log.tgz"},
+                          task_cloud=task_cloud)
+
+    work7 = DomaPanDAWork(executable='echo',
+                          primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
+                          output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
+                          log_collections=[], dependency_map=taskN7.dependencies,
+                          task_name=taskN7.name, task_queue=task_queue3,
+                          encode_command_line=True,
+                          task_priority=981,
+                          core_count=2,
+                          prodSourceLabel='managed',
+                          task_log={"dataset": "PandaJob_#{pandaid}/",
+                                    "destination": "local",
+                                    "param_type": "log",
+                                    "token": "local",
+                                    "type": "template",
+                                    "value": "log.tgz"},
+                          task_cloud=task_cloud)
+
     pending_time = 12
     # pending_time = None
     workflow = Workflow(pending_time=pending_time)
@@ -273,6 +331,8 @@
     workflow.add_work(work3)
     workflow.add_work(work4)
     workflow.add_work(work5)
+    workflow.add_work(work6)
+    workflow.add_work(work7)
 
     workflow.name = site + "_" + 'test_workflow.idds.%s.test' % time.time()
     return workflow
diff --git a/main/lib/idds/tests/test_get_source_code.py b/main/lib/idds/tests/test_get_source_code.py
index d2de7890..772b09e2 100644
--- a/main/lib/idds/tests/test_get_source_code.py
+++ b/main/lib/idds/tests/test_get_source_code.py
@@ -17,3 +17,8 @@ def foo(arg1, arg2):
 
 print(inspect.signature(foo))
 print(dill.dumps(foo))
+
+foo_str = dill.dumps(foo)
+foo_1 = dill.loads(foo_str)
+ret = foo_1(1, 3)
+print(ret)
diff --git a/main/tools/env/setup_panda.sh b/main/tools/env/setup_panda.sh
index 7d7bc097..be983c95 100644
--- a/main/tools/env/setup_panda.sh
+++ b/main/tools/env/setup_panda.sh
@@ -27,7 +27,7 @@ if [ "$instance" == "k8s" ]; then
     export PANDA_SYS=/afs/cern.ch/user/w/wguan/workdisk/iDDS/.conda/iDDS/
    # export PANDA_CONFIG_ROOT=/afs/cern.ch/user/w/wguan/workdisk/iDDS/main/etc/panda/
     export PANDA_CONFIG_ROOT=~/.panda/
-elif [ "$instance" == "slac" ]; then
+elif [ "$instance" == "usdf_dev" ]; then
     export PANDA_AUTH=oidc
     export PANDA_BEHIND_REAL_LB=true
     export PANDA_VERIFY_HOST=off
@@ -36,11 +36,15 @@ elif [ "$instance" == "slac" ]; then
     export PANDAMON_URL=https://rubin-panda-bigmon-dev.slac.stanford.edu
 
     export PANDA_AUTH_VO=Rubin
+    # export PANDA_AUTH_VO=Rubin.production
+
     export PANDACACHE_URL=$PANDA_URL_SSL
 
     export PANDA_SYS=/afs/cern.ch/user/w/wguan/workdisk/iDDS/.conda/iDDS/
     # export PANDA_CONFIG_ROOT=/afs/cern.ch/user/w/wguan/workdisk/iDDS/main/etc/panda/
     export PANDA_CONFIG_ROOT=~/.panda/
+
+    export IDDS_OIDC_TOKEN_FILE=~/.idds/.token_rubin_dev
 elif [ "$instance" == "usdf" ]; then
     export PANDA_AUTH=oidc
     export PANDA_BEHIND_REAL_LB=true
@@ -49,13 +53,14 @@ elif [ "$instance" == "usdf" ]; then
     export PANDA_URL=https://usdf-panda-server.slac.stanford.edu:8443/server/panda
     export PANDACACHE_URL=$PANDA_URL_SSL
     export PANDAMON_URL=https://usdf-panda-bigmon.slac.stanford.edu:8443/
-    export PANDA_AUTH_VO=Rubin:production
+    export PANDA_AUTH_VO=Rubin.production
 
     export PANDACACHE_URL=$PANDA_URL_SSL
 
     export PANDA_SYS=/afs/cern.ch/user/w/wguan/workdisk/iDDS/.conda/iDDS/
     # export PANDA_CONFIG_ROOT=/afs/cern.ch/user/w/wguan/workdisk/iDDS/main/etc/panda/
     export PANDA_CONFIG_ROOT=~/.panda/
+    export IDDS_OIDC_TOKEN_FILE=~/.idds/.token_rubin_prod
 elif [ "$instance" == "new" ]; then
     export PANDA_AUTH=oidc
     export PANDA_URL_SSL=https://ai-idds-05.cern.ch:25443/server/panda
@@ -107,7 +112,7 @@
     # export IDDS_HOST=https://aipanda104.cern.ch:443/idds
 
     # doma
-    export IDDS_HOST=https://aipanda105.cern.ch:443/idds
+    # export IDDS_HOST=https://aipanda105.cern.ch:443/idds
 
     # export IDDS_BROKERS=atlas-test-mb.cern.ch:61013
     # export IDDS_BROKER_DESTINATION=/topic/doma.idds
diff --git a/monitor/data/conf.js b/monitor/data/conf.js
index c5e4e602..3c9fa3ea 100644
--- a/monitor/data/conf.js
+++ b/monitor/data/conf.js
@@ -1,9 +1,9 @@
 var appConfig = {
-    'iddsAPI_request': "https://lxplus935.cern.ch:443/idds/monitor_request/null/null",
-    'iddsAPI_transform': "https://lxplus935.cern.ch:443/idds/monitor_transform/null/null",
-    'iddsAPI_processing': "https://lxplus935.cern.ch:443/idds/monitor_processing/null/null",
-    'iddsAPI_request_detail': "https://lxplus935.cern.ch:443/idds/monitor/null/null/true/false/false",
-    'iddsAPI_transform_detail': "https://lxplus935.cern.ch:443/idds/monitor/null/null/false/true/false",
-    'iddsAPI_processing_detail': "https://lxplus935.cern.ch:443/idds/monitor/null/null/false/false/true"
+    'iddsAPI_request': "https://lxplus954.cern.ch:443/idds/monitor_request/null/null",
+    'iddsAPI_transform': "https://lxplus954.cern.ch:443/idds/monitor_transform/null/null",
+    'iddsAPI_processing': "https://lxplus954.cern.ch:443/idds/monitor_processing/null/null",
+    'iddsAPI_request_detail': "https://lxplus954.cern.ch:443/idds/monitor/null/null/true/false/false",
+    'iddsAPI_transform_detail': "https://lxplus954.cern.ch:443/idds/monitor/null/null/false/true/false",
+    'iddsAPI_processing_detail': "https://lxplus954.cern.ch:443/idds/monitor/null/null/false/false/true"
 }
 
diff --git a/workflow/bin/run_workflow b/workflow/bin/run_workflow
index fe5ece8f..e8eecd6d 100644
--- a/workflow/bin/run_workflow
+++ b/workflow/bin/run_workflow
@@ -37,13 +37,15 @@ setup_logging(__name__, stream=sys.stdout)
 
 
 def get_context_args(context, original_args, current_job_kwargs):
-    func_name, args, kwargs, multi_jobs_kwargs_list = None, None, None, None
+    func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list = None, None, None, None, None
     if original_args:
         original_args = json_loads(original_args)
-        func_name, args, kwargs, multi_jobs_kwargs_list = original_args
+        func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list = original_args
 
     if args:
         args = pickle.loads(base64.b64decode(args))
+    if pre_kwargs:
+        pre_kwargs = pickle.loads(base64.b64decode(pre_kwargs))
     if kwargs:
         kwargs = pickle.loads(base64.b64decode(kwargs))
     if multi_jobs_kwargs_list:
@@ -65,21 +67,23 @@
             # kwargs.update(current_job_kwargs)
         except Exception as ex:
             logging.error("Failed to update kwargs: %s" % ex)
-    return context, func_name, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs
+    return context, func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs
 
 
 def run_workflow(context, original_args, current_job_kwargs):
-    context, func_name, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs = get_context_args(context, original_args, current_job_kwargs)
+    context, func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs = get_context_args(context, original_args, current_job_kwargs)
     logging.info("context: %s" % context)
     logging.info("func_name: %s" % func_name)
+    logging.info("pre_kwargs: %s" % pre_kwargs)
     logging.info("args: %s" % str(args))
     logging.info("kwargs: %s" % kwargs)
-    logging.info("multi_jobs_kwargs_list: %s" % multi_jobs_kwargs_list)
+    logging.info("multi_jobs_kwargs_list: %s" % str(multi_jobs_kwargs_list))
+    logging.info("current_job_kwargs: %s" % str(current_job_kwargs))
 
     context.initialize()
     context.setup_source_files()
 
-    workflow = Workflow(func=func_name, args=args, kwargs=kwargs, multi_jobs_kwargs_list=multi_jobs_kwargs_list, current_job_kwargs=current_job_kwargs, context=context)
+    workflow = Workflow(func=func_name, pre_kwargs=pre_kwargs, args=args, kwargs=kwargs, multi_jobs_kwargs_list=multi_jobs_kwargs_list, current_job_kwargs=current_job_kwargs, context=context)
     logging.info("workflow: %s" % workflow)
     with workflow:
         ret = workflow.run()
@@ -88,17 +92,19 @@
 
 
 def run_work(context, original_args, current_job_kwargs):
-    context, func_name, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs = get_context_args(context, original_args, current_job_kwargs)
+    context, func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs = get_context_args(context, original_args, current_job_kwargs)
     logging.info("context: %s" % context)
     logging.info("func_name: %s" % func_name)
+    logging.info("pre_kwargs: %s" % pre_kwargs)
     logging.info("args: %s" % str(args))
     logging.info("kwargs: %s" % kwargs)
-    logging.info("multi_jobs_kwargs_list: %s" % multi_jobs_kwargs_list)
+    logging.info("multi_jobs_kwargs_list: %s" % str(multi_jobs_kwargs_list))
+    logging.info("current_job_kwargs: %s" % str(current_job_kwargs))
 
     context.initialize()
     context.setup_source_files()
 
-    work = Work(func=func_name, args=args, kwargs=kwargs, multi_jobs_kwargs_list=multi_jobs_kwargs_list, current_job_kwargs=current_job_kwargs, context=context)
+    work = Work(func=func_name, pre_kwargs=pre_kwargs, args=args, kwargs=kwargs, multi_jobs_kwargs_list=multi_jobs_kwargs_list, current_job_kwargs=current_job_kwargs, context=context)
     logging.info("work: %s" % work)
     ret = work.run()
     logging.info("run work result: %s" % str(ret))
diff --git a/workflow/tools/make/zipheader b/workflow/tools/make/zipheader
index 97a8f167..0561725c 100644
--- a/workflow/tools/make/zipheader
+++ b/workflow/tools/make/zipheader
@@ -32,6 +32,8 @@ else
     fi
 fi;
 
+export IDDS_LOG_LEVEL=debug
+
 export PANDA_CONFIG_ROOT=$(pwd);
 export PANDA_VERIFY_HOST=off;
 export PANDA_BEHIND_REAL_LB=true;
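
Notes on the patch:

* All of the "Cannot find ... for event" messages now go through logger.warn. In
  Python's standard logging module, Logger.warn is a deprecated alias that simply
  forwards to Logger.warning (and can emit a DeprecationWarning on newer
  interpreters), so logger.warning would be the safer spelling if these call
  sites are touched again. A minimal sketch; the logger name is only
  illustrative:

      import logging

      logging.basicConfig(level=logging.DEBUG)
      logger = logging.getLogger("idds.agents.carrier")

      # Preferred spelling: warning(); warn() forwards here anyway.
      logger.warning("Cannot find processing for event: %s" % "event-123")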
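* get_context_args in workflow/bin/run_workflow now unpacks a 5-tuple and decodes
  each slot with pickle.loads(base64.b64decode(...)), so whatever builds
  original_args has to apply the inverse encoding, including the new pre_kwargs
  slot. A sketch of that inverse under those assumptions (the helper name and the
  use of json.dumps for the outer list are illustrative, not the client's actual
  API):

      import base64
      import json
      import pickle

      def encode_original_args(func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list):
          # Mirror of get_context_args: pickle each payload, then base64-encode it.
          def enc(obj):
              return base64.b64encode(pickle.dumps(obj)).decode("utf-8") if obj else None

          return json.dumps([func_name, enc(pre_kwargs), enc(args), enc(kwargs),
                             enc(multi_jobs_kwargs_list)])

      original_args = encode_original_args("my_func", {"setup": "echo ok"}, (1, 2), {"x": 3}, None)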
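* The clerk change drops the "total_tfs == 0" guard, so an iWorkflowLocal request
  is now finished whenever its workflow has outlived max_walltime, even after
  transforms were created. The expiry test itself is a plain timestamp
  comparison; a sketch with made-up values (only the field names come from the
  diff):

      import datetime

      def walltime_expired(created_at, max_walltime):
          # Mirrors: req['created_at'] + timedelta(seconds=workflow.max_walltime) < utcnow()
          return created_at + datetime.timedelta(seconds=max_walltime) < datetime.datetime.utcnow()

      created_at = datetime.datetime.utcnow() - datetime.timedelta(hours=2)
      print(walltime_expired(created_at, max_walltime=3600))  # True: 2 h old, 1 h walltime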