diff --git a/common/lib/idds/common/constants.py b/common/lib/idds/common/constants.py index 578cdf71..a57d24ec 100644 --- a/common/lib/idds/common/constants.py +++ b/common/lib/idds/common/constants.py @@ -534,6 +534,24 @@ class ReturnCode(IDDSEnum): Locked = 1 +class GracefulEvent(object): + def __init__(self): + self.__is_set = False + + def set(self): + self.__is_set = True + + def is_set(self): + return self.__is_set + + +class AsyncResultStatus(IDDSEnum): + Running = 0 + Finished = 1 + SubFinished = 2 + Failed = 3 + + def get_work_status_from_transform_processing_status(status): if status in [ProcessingStatus.New, TransformStatus.New]: return WorkStatus.New diff --git a/common/lib/idds/common/imports.py b/common/lib/idds/common/imports.py index 4229f4e3..45422370 100644 --- a/common/lib/idds/common/imports.py +++ b/common/lib/idds/common/imports.py @@ -83,7 +83,7 @@ def import_func(name: str) -> Callable[..., Any]: # module_name_bits, attribute_bits = name_bits[:-1], [name_bits[-1]] if module_name_bits == '__main__': module_name_bits = filename.replace('.py', '').replace('.pyc', '') - module_name_bits = module_name_bits.replace('/', '') + module_name_bits = module_name_bits.replace('/', '.') module_name_bits = module_name_bits.split('.') attribute_bits = attribute_bits.split('.') module = None diff --git a/common/lib/idds/common/utils.py b/common/lib/idds/common/utils.py index 87bcdab0..52aa3c84 100644 --- a/common/lib/idds/common/utils.py +++ b/common/lib/idds/common/utils.py @@ -76,7 +76,12 @@ def setup_logging(name, stream=None, loglevel=None): loglevel = getattr(logging, loglevel) if stream is None: - if config_has_section('common') and config_has_option('common', 'logdir'): + if os.environ.get('IDDS_LOG_FILE', None): + idds_log_file = os.environ.get('IDDS_LOG_FILE', None) + logging.basicConfig(filename=idds_log_file, + level=loglevel, + format='%(asctime)s\t%(threadName)s\t%(name)s\t%(levelname)s\t%(message)s') + elif config_has_section('common') and config_has_option('common', 'logdir'): logging.basicConfig(filename=os.path.join(config_get('common', 'logdir'), name), level=loglevel, format='%(asctime)s\t%(threadName)s\t%(name)s\t%(levelname)s\t%(message)s') diff --git a/main/etc/idds/idds.cfg.template b/main/etc/idds/idds.cfg.template index f516f1b9..93d89718 100755 --- a/main/etc/idds/idds.cfg.template +++ b/main/etc/idds/idds.cfg.template @@ -27,6 +27,7 @@ loglevel = DEBUG # dev: aipanda104 # doma: aipanda105-107 # idds-mon: aipanda108 + [database] #default = mysql://idds:idds@pcuwvirt5.cern.ch/idds #default = mysql://idds:idds_passwd@aipanda182.cern.ch/idds diff --git a/main/lib/idds/agents/carrier/poller.py b/main/lib/idds/agents/carrier/poller.py index 0b9e8a46..86001cba 100644 --- a/main/lib/idds/agents/carrier/poller.py +++ b/main/lib/idds/agents/carrier/poller.py @@ -538,7 +538,7 @@ def process_update_processing(self, event): pr = self.get_processing(processing_id=event._processing_id, status=None, locking=True) if not pr: - self.logger.error("Cannot find processing for event: %s" % str(event)) + self.logger.warn("Cannot find processing for event: %s" % str(event)) pro_ret = ReturnCode.Locked.value else: log_pre = self.get_log_prefix(pr) diff --git a/main/lib/idds/agents/carrier/submitter.py b/main/lib/idds/agents/carrier/submitter.py index 782f3b09..b6ff6970 100644 --- a/main/lib/idds/agents/carrier/submitter.py +++ b/main/lib/idds/agents/carrier/submitter.py @@ -261,7 +261,7 @@ def process_new_processing(self, event): self.logger.info("process_new_processing, event: 
%s" % str(event)) pr = self.get_processing(processing_id=event._processing_id, status=None, locking=True) if not pr: - self.logger.error("Cannot find processing for event: %s" % str(event)) + self.logger.warn("Cannot find processing for event: %s" % str(event)) else: log_pre = self.get_log_prefix(pr) self.logger.info(log_pre + "process_new_processing") diff --git a/main/lib/idds/agents/carrier/trigger.py b/main/lib/idds/agents/carrier/trigger.py index 6adbd37f..8e37e9a1 100644 --- a/main/lib/idds/agents/carrier/trigger.py +++ b/main/lib/idds/agents/carrier/trigger.py @@ -196,7 +196,7 @@ def process_trigger_processing_real(self, event): self.logger.info("process_trigger_processing, event: %s" % str(event)) pr = self.get_processing(processing_id=event._processing_id, status=None, locking=True) if not pr: - self.logger.error("Cannot find processing for event: %s" % str(event)) + self.logger.warn("Cannot find processing for event: %s" % str(event)) pro_ret = ReturnCode.Locked.value else: log_pre = self.get_log_prefix(pr) diff --git a/main/lib/idds/agents/clerk/clerk.py b/main/lib/idds/agents/clerk/clerk.py index 9b42cea0..63d73cf0 100644 --- a/main/lib/idds/agents/clerk/clerk.py +++ b/main/lib/idds/agents/clerk/clerk.py @@ -1193,7 +1193,7 @@ def handle_update_irequest_real(self, req, event): failed_tfs += 1 req_status = RequestStatus.Transforming - if req['request_type'] in [RequestType.iWorkflowLocal] and total_tfs == 0: + if req['request_type'] in [RequestType.iWorkflowLocal]: workflow = req['request_metadata'].get('workflow', None) if workflow and req['created_at'] + datetime.timedelta(seconds=workflow.max_walltime) < datetime.datetime.utcnow(): req_status = RequestStatus.Finished @@ -1369,7 +1369,7 @@ def process_abort_request(self, event): if event: req = self.get_request(request_id=event._request_id, locking=True) if not req: - self.logger.error("Cannot find request for event: %s" % str(event)) + self.logger.warn("Cannot find request for event: %s" % str(event)) pro_ret = ReturnCode.Locked.value else: log_pre = self.get_log_prefix(req) @@ -1517,7 +1517,7 @@ def process_close_request(self, event): if event: req = self.get_request(request_id=event._request_id, locking=True) if not req: - self.logger.error("Cannot find request for event: %s" % str(event)) + self.logger.warn("Cannot find request for event: %s" % str(event)) pro_ret = ReturnCode.Locked.value else: log_pre = self.get_log_prefix(req) diff --git a/main/lib/idds/agents/transformer/transformer.py b/main/lib/idds/agents/transformer/transformer.py index 62c7f4f6..ff8c00c2 100644 --- a/main/lib/idds/agents/transformer/transformer.py +++ b/main/lib/idds/agents/transformer/transformer.py @@ -496,7 +496,7 @@ def process_new_transform(self, event): tf_status = [TransformStatus.New, TransformStatus.Ready, TransformStatus.Extend] tf = self.get_transform(transform_id=event._transform_id, status=tf_status, locking=True) if not tf: - self.logger.error("Cannot find transform for event: %s" % str(event)) + self.logger.warn("Cannot find transform for event: %s" % str(event)) else: log_pre = self.get_log_prefix(tf) self.logger.info(log_pre + "process_new_transform") diff --git a/main/lib/idds/tests/panda_test.py b/main/lib/idds/tests/panda_test.py index 1e51409c..1111bb72 100644 --- a/main/lib/idds/tests/panda_test.py +++ b/main/lib/idds/tests/panda_test.py @@ -55,6 +55,17 @@ task_ids = [168747, 168761, 168763] task_ids = [13413] task_ids = [168859, 168861, 168862] +task_ids = [i for i in range(9021, 9222)] +task_ids = [i for i in range(169155, 
169178)] +task_ids = [169182, 169183, 169184] + +task_ids = [5975, 8442, 10741, 10742, 10744, 10745, 10746, 10747] +task_ids = [15507, 15516, 15520, 15526, 15534, 15535, 15539, 15679, 15715] +task_ids = [169181, 169198, 169199, 169201, 169206] + [i for i in range(169210, 169232)] +task_ids = [169236, 169237, 169238, 169239, 169240, 169241] +task_ids = [169272, 169273, 169312, 169313] +task_ids = [169307, 169308, 169309, 169310, 169311, 169312, 169313, 169314] +task_ids = [i for i in range(169359, 169363)] for task_id in task_ids: print("Killing %s" % task_id) ret = Client.killTask(task_id, verbose=True) diff --git a/main/lib/idds/tests/test_domapanda.py b/main/lib/idds/tests/test_domapanda.py index 004cd3d2..2e97d848 100644 --- a/main/lib/idds/tests/test_domapanda.py +++ b/main/lib/idds/tests/test_domapanda.py @@ -47,6 +47,8 @@ task_queue2 = 'CC-IN2P3_Rubin_Himem' task_queue3 = 'CC-IN2P3_Rubin_Extra_Himem' task_queue4 = 'CC-IN2P3_Rubin_Merge' + # task_queue5 = 'CC-IN2P3_Rubin_IO' + task_queue5 = 'CC-IN2P3_Rubin_Extra_Himem' elif len(sys.argv) > 1 and sys.argv[1] == "lancs": site = 'lancs' task_cloud = 'EU' @@ -55,8 +57,10 @@ task_queue1 = 'LANCS_Rubin_Medium' task_queue2 = 'LANCS_Rubin_Himem' task_queue3 = 'LANCS_Rubin_Extra_Himem' - task_queue3 = 'LANCS_Rubin_Himem' + # task_queue3 = 'LANCS_Rubin_Himem' task_queue4 = 'LANCS_Rubin_Merge' + # task_queue5 = 'LANCS_Rubin_IO' + task_queue5 = 'LANCS_Rubin_Extra_Himem' else: site = 'slac' # task_cloud = 'LSST' @@ -71,6 +75,7 @@ task_queue2 = 'SLAC_Rubin_Himem' task_queue3 = 'SLAC_Rubin_Extra_Himem' task_queue4 = 'SLAC_Rubin_Merge' + task_queue5 = 'SLAC_Rubin_IO' # task_queue = 'SLAC_Rubin_Extra_Himem_32Cores' # task_queue = 'SLAC_Rubin_Merge' # task_queue = 'SLAC_TEST' @@ -187,6 +192,26 @@ def setup_workflow(): "submitted": False} for k in range(6) ] + taskN6 = PanDATask() + taskN6.step = "step6" + taskN6.name = site + "_" + taskN6.step + "_" + randStr() + taskN6.dependencies = [ + {"name": "00006" + str(k), + "order_id": k, + "dependencies": [], + "submitted": False} for k in range(6) + ] + + taskN7 = PanDATask() + taskN7.step = "step7" + taskN7.name = site + "_" + taskN7.step + "_" + randStr() + taskN7.dependencies = [ + {"name": "00007" + str(k), + "order_id": k, + "dependencies": [], + "submitted": False} for k in range(6) + ] + work1 = DomaPanDAWork(executable='echo', primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'}, output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}], @@ -265,6 +290,39 @@ def setup_workflow(): "value": "log.tgz"}, task_cloud=task_cloud) + work6 = DomaPanDAWork(executable='echo', + primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'}, + output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}], + log_collections=[], dependency_map=taskN6.dependencies, + task_name=taskN6.name, task_queue=task_queue5, + encode_command_line=True, + task_priority=981, + prodSourceLabel='managed', + task_log={"dataset": "PandaJob_#{pandaid}/", + "destination": "local", + "param_type": "log", + "token": "local", + "type": "template", + "value": "log.tgz"}, + task_cloud=task_cloud) + + work7 = DomaPanDAWork(executable='echo', + primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'}, + output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}], + log_collections=[], dependency_map=taskN7.dependencies, + task_name=taskN7.name, 
task_queue=task_queue3, + encode_command_line=True, + task_priority=981, + core_count=2, + prodSourceLabel='managed', + task_log={"dataset": "PandaJob_#{pandaid}/", + "destination": "local", + "param_type": "log", + "token": "local", + "type": "template", + "value": "log.tgz"}, + task_cloud=task_cloud) + pending_time = 12 # pending_time = None workflow = Workflow(pending_time=pending_time) @@ -273,6 +331,8 @@ def setup_workflow(): workflow.add_work(work3) workflow.add_work(work4) workflow.add_work(work5) + workflow.add_work(work6) + workflow.add_work(work7) workflow.name = site + "_" + 'test_workflow.idds.%s.test' % time.time() return workflow diff --git a/main/lib/idds/tests/test_get_source_code.py b/main/lib/idds/tests/test_get_source_code.py index d2de7890..772b09e2 100644 --- a/main/lib/idds/tests/test_get_source_code.py +++ b/main/lib/idds/tests/test_get_source_code.py @@ -17,3 +17,8 @@ def foo(arg1, arg2): print(inspect.signature(foo)) print(dill.dumps(foo)) + +foo_str = dill.dumps(foo) +foo_1 = dill.loads(foo_str) +ret = foo_1(1, 3) +print(ret) diff --git a/main/tools/env/setup_panda.sh b/main/tools/env/setup_panda.sh index 7d7bc097..be983c95 100644 --- a/main/tools/env/setup_panda.sh +++ b/main/tools/env/setup_panda.sh @@ -27,7 +27,7 @@ if [ "$instance" == "k8s" ]; then export PANDA_SYS=/afs/cern.ch/user/w/wguan/workdisk/iDDS/.conda/iDDS/ # export PANDA_CONFIG_ROOT=/afs/cern.ch/user/w/wguan/workdisk/iDDS/main/etc/panda/ export PANDA_CONFIG_ROOT=~/.panda/ -elif [ "$instance" == "slac" ]; then +elif [ "$instance" == "usdf_dev" ]; then export PANDA_AUTH=oidc export PANDA_BEHIND_REAL_LB=true export PANDA_VERIFY_HOST=off @@ -36,11 +36,15 @@ elif [ "$instance" == "slac" ]; then export PANDAMON_URL=https://rubin-panda-bigmon-dev.slac.stanford.edu export PANDA_AUTH_VO=Rubin + # export PANDA_AUTH_VO=Rubin.production + export PANDACACHE_URL=$PANDA_URL_SSL export PANDA_SYS=/afs/cern.ch/user/w/wguan/workdisk/iDDS/.conda/iDDS/ # export PANDA_CONFIG_ROOT=/afs/cern.ch/user/w/wguan/workdisk/iDDS/main/etc/panda/ export PANDA_CONFIG_ROOT=~/.panda/ + + export IDDS_OIDC_TOKEN_FILE=~/.idds/.token_rubin_dev elif [ "$instance" == "usdf" ]; then export PANDA_AUTH=oidc export PANDA_BEHIND_REAL_LB=true @@ -49,13 +53,14 @@ elif [ "$instance" == "usdf" ]; then export PANDA_URL=https://usdf-panda-server.slac.stanford.edu:8443/server/panda export PANDACACHE_URL=$PANDA_URL_SSL export PANDAMON_URL=https://usdf-panda-bigmon.slac.stanford.edu:8443/ - export PANDA_AUTH_VO=Rubin:production + export PANDA_AUTH_VO=Rubin.production export PANDACACHE_URL=$PANDA_URL_SSL export PANDA_SYS=/afs/cern.ch/user/w/wguan/workdisk/iDDS/.conda/iDDS/ # export PANDA_CONFIG_ROOT=/afs/cern.ch/user/w/wguan/workdisk/iDDS/main/etc/panda/ export PANDA_CONFIG_ROOT=~/.panda/ + export IDDS_OIDC_TOKEN_FILE=~/.idds/.token_rubin_prod elif [ "$instance" == "new" ]; then export PANDA_AUTH=oidc export PANDA_URL_SSL=https://ai-idds-05.cern.ch:25443/server/panda @@ -107,7 +112,7 @@ else # export IDDS_HOST=https://aipanda104.cern.ch:443/idds # doma - export IDDS_HOST=https://aipanda105.cern.ch:443/idds + # export IDDS_HOST=https://aipanda105.cern.ch:443/idds # export IDDS_BROKERS=atlas-test-mb.cern.ch:61013 # export IDDS_BROKER_DESTINATION=/topic/doma.idds diff --git a/monitor/data/conf.js b/monitor/data/conf.js index c5e4e602..3c9fa3ea 100644 --- a/monitor/data/conf.js +++ b/monitor/data/conf.js @@ -1,9 +1,9 @@ var appConfig = { - 'iddsAPI_request': "https://lxplus935.cern.ch:443/idds/monitor_request/null/null", - 'iddsAPI_transform': 
"https://lxplus935.cern.ch:443/idds/monitor_transform/null/null", - 'iddsAPI_processing': "https://lxplus935.cern.ch:443/idds/monitor_processing/null/null", - 'iddsAPI_request_detail': "https://lxplus935.cern.ch:443/idds/monitor/null/null/true/false/false", - 'iddsAPI_transform_detail': "https://lxplus935.cern.ch:443/idds/monitor/null/null/false/true/false", - 'iddsAPI_processing_detail': "https://lxplus935.cern.ch:443/idds/monitor/null/null/false/false/true" + 'iddsAPI_request': "https://lxplus954.cern.ch:443/idds/monitor_request/null/null", + 'iddsAPI_transform': "https://lxplus954.cern.ch:443/idds/monitor_transform/null/null", + 'iddsAPI_processing': "https://lxplus954.cern.ch:443/idds/monitor_processing/null/null", + 'iddsAPI_request_detail': "https://lxplus954.cern.ch:443/idds/monitor/null/null/true/false/false", + 'iddsAPI_transform_detail': "https://lxplus954.cern.ch:443/idds/monitor/null/null/false/true/false", + 'iddsAPI_processing_detail': "https://lxplus954.cern.ch:443/idds/monitor/null/null/false/false/true" } diff --git a/workflow/bin/run_workflow b/workflow/bin/run_workflow index fe5ece8f..e8eecd6d 100644 --- a/workflow/bin/run_workflow +++ b/workflow/bin/run_workflow @@ -37,13 +37,15 @@ setup_logging(__name__, stream=sys.stdout) def get_context_args(context, original_args, current_job_kwargs): - func_name, args, kwargs, multi_jobs_kwargs_list = None, None, None, None + func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list = None, None, None, None, None if original_args: original_args = json_loads(original_args) - func_name, args, kwargs, multi_jobs_kwargs_list = original_args + func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list = original_args if args: args = pickle.loads(base64.b64decode(args)) + if pre_kwargs: + pre_kwargs = pickle.loads(base64.b64decode(pre_kwargs)) if kwargs: kwargs = pickle.loads(base64.b64decode(kwargs)) if multi_jobs_kwargs_list: @@ -65,21 +67,23 @@ def get_context_args(context, original_args, current_job_kwargs): # kwargs.update(current_job_kwargs) except Exception as ex: logging.error("Failed to update kwargs: %s" % ex) - return context, func_name, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs + return context, func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs def run_workflow(context, original_args, current_job_kwargs): - context, func_name, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs = get_context_args(context, original_args, current_job_kwargs) + context, func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs = get_context_args(context, original_args, current_job_kwargs) logging.info("context: %s" % context) logging.info("func_name: %s" % func_name) + logging.info("pre_kwargs: %s" % pre_kwargs) logging.info("args: %s" % str(args)) logging.info("kwargs: %s" % kwargs) - logging.info("multi_jobs_kwargs_list: %s" % multi_jobs_kwargs_list) + logging.info("multi_jobs_kwargs_list: %s" % str(multi_jobs_kwargs_list)) + logging.info("current_job_kwargs: %s" % str(current_job_kwargs)) context.initialize() context.setup_source_files() - workflow = Workflow(func=func_name, args=args, kwargs=kwargs, multi_jobs_kwargs_list=multi_jobs_kwargs_list, current_job_kwargs=current_job_kwargs, context=context) + workflow = Workflow(func=func_name, pre_kwargs=pre_kwargs, args=args, kwargs=kwargs, multi_jobs_kwargs_list=multi_jobs_kwargs_list, current_job_kwargs=current_job_kwargs, context=context) logging.info("workflow: %s" % workflow) with workflow: ret = workflow.run() @@ 
-88,17 +92,19 @@ def run_workflow(context, original_args, current_job_kwargs): def run_work(context, original_args, current_job_kwargs): - context, func_name, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs = get_context_args(context, original_args, current_job_kwargs) + context, func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs = get_context_args(context, original_args, current_job_kwargs) logging.info("context: %s" % context) logging.info("func_name: %s" % func_name) + logging.info("pre_kwargs: %s" % pre_kwargs) logging.info("args: %s" % str(args)) logging.info("kwargs: %s" % kwargs) - logging.info("multi_jobs_kwargs_list: %s" % multi_jobs_kwargs_list) + logging.info("multi_jobs_kwargs_list: %s" % str(multi_jobs_kwargs_list)) + logging.info("current_job_kwargs: %s" % str(current_job_kwargs)) context.initialize() context.setup_source_files() - work = Work(func=func_name, args=args, kwargs=kwargs, multi_jobs_kwargs_list=multi_jobs_kwargs_list, current_job_kwargs=current_job_kwargs, context=context) + work = Work(func=func_name, pre_kwargs=pre_kwargs, args=args, kwargs=kwargs, multi_jobs_kwargs_list=multi_jobs_kwargs_list, current_job_kwargs=current_job_kwargs, context=context) logging.info("work: %s" % work) ret = work.run() logging.info("run work result: %s" % str(ret)) diff --git a/workflow/lib/idds/iworkflow/asyncresult.py b/workflow/lib/idds/iworkflow/asyncresult.py index 06666233..2f5c2204 100644 --- a/workflow/lib/idds/iworkflow/asyncresult.py +++ b/workflow/lib/idds/iworkflow/asyncresult.py @@ -18,7 +18,7 @@ from queue import Queue -from idds.common.constants import WorkflowType +from idds.common.constants import WorkflowType, GracefulEvent from idds.common.utils import json_dumps, json_loads, setup_logging, get_unique_id_for_dict from .base import Base @@ -95,9 +95,10 @@ def has_result(self, name=None, args=None, key=None): return True return False - def get_result(self, name=None, args=None, key=None): - logging.debug("get_result: key %s, name: %s, args: %s" % (key, name, args)) - logging.debug("get_result: results: %s, name_results: %s" % (self._results, self._name_results)) + def get_result(self, name=None, args=None, key=None, verbose=False): + if verbose: + logging.info("get_result: key %s, name: %s, args: %s" % (key, name, args)) + logging.info("get_result: results: %s, name_results: %s" % (self._results, self._name_results)) name_key = key if name_key is not None: @@ -110,6 +111,8 @@ def get_result(self, name=None, args=None, key=None): ret = self._name_results.get(name_key, None) else: ret = self._results.get(key, None) + if verbose: + logging.info("get_result: name key %s, args key %s, ret: %s" % (name_key, key, ret)) return ret def get_all_results(self): @@ -133,8 +136,9 @@ def __init__(self, work_context, name=None, wait_num=1, wait_keys=[], multi_jobs self._queue = Queue() self._connections = [] - self._graceful_stop = None + self._graceful_stop = False self._subscribe_thread = None + self._subscribed = False self._results = [] self._bad_results = [] @@ -182,10 +186,66 @@ def wait_keys(self): def wait_keys(self, value): self._wait_keys = set(value) + @property + def is_all_results_available(self): + percent = self.get_results_percentage() + if percent >= self._wait_percent: + return True + + @is_all_results_available.setter + def is_all_results_available(self, value): + raise Exception("Not allowd to set is_all_results_available") + + @property + def is_finished(self): + if self._graceful_stop and self._graceful_stop.is_set(): + 
percent = self.get_results_percentage() + if percent >= self._wait_percent: + return True + return False + + @is_finished.setter + def is_finished(self, value): + raise Exception("Not allowd to set is_finished") + + @property + def is_subfinished(self): + if self._graceful_stop and self._graceful_stop.is_set(): + percent = self.get_results_percentage() + if percent > 0 and percent < self._wait_percent: + return True + return False + + @is_subfinished.setter + def is_subfinished(self, value): + raise Exception("Not allowd to set is_subfinished") + + @property + def is_failed(self): + if self._graceful_stop and self._graceful_stop.is_set(): + percent = self.get_results_percentage() + if percent <= 0: + return True + return False + + @is_failed.setter + def is_failed(self, value): + raise Exception("Not allowd to set is_failed") + + @property + def is_terminated(self): + return self._graceful_stop and self._graceful_stop.is_set() + + @is_terminated.setter + def is_terminated(self, value): + raise Exception("Not allowd to set is_terminated") + @property def results(self): + has_new_data = False while not self._queue.empty(): ret = self._queue.get() + has_new_data = True try: internal_id = ret['internal_id'] if internal_id == self.internal_id: @@ -221,6 +281,10 @@ def results(self): ret_map = MapResult() for k in rets: ret_map.add_result(key=k, result=rets[k]) + + if has_new_data: + self.logger.debug('percent %s, results: %s' % (self._results_percentage, str(ret_map))) + return ret_map else: rets = [] @@ -233,6 +297,9 @@ def results(self): rets = [rets_dict[k] for k in rets_dict] self._results_percentage = len(rets) * 1.0 / self._wait_num + if has_new_data: + self.logger.debug('percent %s, results: %s' % (self._results_percentage, str(rets))) + if self._wait_num == 1: if rets: return rets[0] @@ -375,7 +442,7 @@ def run_subscriber(self): try: self.logger.info("run subscriber") self.subscribe_to_messaging_brokers() - while not self._graceful_stop.is_set(): + while self._graceful_stop and not self._graceful_stop.is_set(): has_failed_conns = False for conn in self._connections: if not conn.is_connected(): @@ -383,16 +450,23 @@ def run_subscriber(self): if has_failed_conns: self.subscribe_to_messaging_brokers() time.sleep(1) + self.stop() except Exception as ex: self.logger.error("run subscriber failed with error: %s" % str(ex)) self.logger.error(traceback.format_exc()) + self.stop() - def get_results(self, nologs=False): + def get_results(self, nologs=True): old_nologs = self._nologs self._nologs = nologs rets = self.results if not self._nologs: - self.logger.debug('results: %s' % str(rets)) + self.logger.debug('percent %s, results: %s' % (self.get_results_percentage(), str(rets))) + + percent = self.get_results_percentage() + if percent >= self._wait_percent: + self.stop() + self.logger.info("Got results: %s (number of wrong keys: %s)" % (percent, self._num_wrong_keys)) self._nologs = old_nologs return rets @@ -400,23 +474,24 @@ def get_results_percentage(self): return self._results_percentage def subscribe(self): - self._graceful_stop = threading.Event() - thread = threading.Thread(target=self.run_subscriber, name="RunSubscriber") - thread.start() - time.sleep(1) - self._subscribed = True + if not self._subscribed: + self._graceful_stop = GracefulEvent() + thread = threading.Thread(target=self.run_subscriber, name="RunSubscriber") + thread.start() + time.sleep(1) + self._subscribed = True def stop(self): if self._graceful_stop: self._graceful_stop.set() self.disconnect() + self._subscribed = 
False def __del__(self): self.stop() def wait_results(self, timeout=None, force_return_results=False): - if not self._subscribed: - self.subscribe() + self.subscribe() get_results = False time_log = time.time() @@ -424,14 +499,14 @@ def wait_results(self, timeout=None, force_return_results=False): if timeout is None: self.logger.info("waiting for results") try: - while not get_results and not self._graceful_stop.is_set(): + while not get_results and self._graceful_stop and not self._graceful_stop.is_set(): self.get_results(nologs=True) percent = self.get_results_percentage() if time.time() - time_log > 600: # 10 minutes self.logger.info("waiting for results: %s (number of wrong keys: %s)" % (percent, self._num_wrong_keys)) time_log = time.time() time.sleep(1) - if percent >= self._wait_percent: + if self.is_all_results_available: get_results = True self.waiting_result_terminated = True self.logger.info("Got result percentage %s is not smaller then wait_percent %s, set waiting_result_terminated to True" % (percent, self._wait_percent)) @@ -452,7 +527,7 @@ def wait_results(self, timeout=None, force_return_results=False): self.logger.error(traceback.format_exc()) self._graceful_stop.set() - if get_results or self._graceful_stop.is_set() or percent >= self._wait_percent or force_return_results: + if get_results or self._graceful_stop.is_set() or self.is_all_results_available or force_return_results: # stop the subscriber self._graceful_stop.set() self.logger.info("Got results: %s (number of wrong keys: %s)" % (percent, self._num_wrong_keys)) diff --git a/workflow/lib/idds/iworkflow/base.py b/workflow/lib/idds/iworkflow/base.py index 9d486f63..a478ea70 100644 --- a/workflow/lib/idds/iworkflow/base.py +++ b/workflow/lib/idds/iworkflow/base.py @@ -8,6 +8,7 @@ # Authors: # - Wen Guan, , 2024 +import copy import base64 import logging import inspect @@ -52,18 +53,23 @@ def internal_id(self, value): def get_func_name_and_args(self, func, + pre_kwargs=None, args=None, kwargs=None, multi_jobs_kwargs_list=None): if args is None: args = () + if pre_kwargs is None: + pre_kwargs = {} if kwargs is None: kwargs = {} if multi_jobs_kwargs_list is None: multi_jobs_kwargs_list = [] if not isinstance(args, (tuple, list)): raise TypeError('{0!r} is not a valid args list'.format(args)) + if not isinstance(pre_kwargs, dict): + raise TypeError('{0!r} is not a valid pre_kwargs dict'.format(pre_kwargs)) if not isinstance(kwargs, dict): raise TypeError('{0!r} is not a valid kwargs dict'.format(kwargs)) if not isinstance(multi_jobs_kwargs_list, list): @@ -82,12 +88,14 @@ def get_func_name_and_args(self, if args: args = base64.b64encode(pickle.dumps(args)).decode("utf-8") + if pre_kwargs: + pre_kwargs = base64.b64encode(pickle.dumps(pre_kwargs)).decode("utf-8") if kwargs: kwargs = base64.b64encode(pickle.dumps(kwargs)).decode("utf-8") if multi_jobs_kwargs_list: multi_jobs_kwargs_list = [base64.b64encode(pickle.dumps(k)).decode("utf-8") for k in multi_jobs_kwargs_list] - return func_call, (func_name, args, kwargs, multi_jobs_kwargs_list) + return func_call, (func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list) @property def logger(self): @@ -164,17 +172,26 @@ def load(self, func_name): return func - def run_func(self, func, args, kwargs): + def run_func(self, func, pre_kwargs, args, kwargs): """ Run the function. :raise Exception. 
""" try: - return func(*args, **kwargs) + logging.info("func type: %s: %s" % (type(func), str(func))) + logging.info("pre_kwargs type: %s: %s" % (type(pre_kwargs), str(pre_kwargs))) + logging.info("args type: %s: %s" % (type(args), str(args))) + logging.info("kwargs type: %s: %s" % (type(kwargs), str(kwargs))) + kwargs_copy = copy.deepcopy(pre_kwargs) + kwargs_copy.update(kwargs) + if kwargs_copy: + return func(*args, **kwargs_copy) + else: + return func(*args) except Exception as ex: logging.error("Failed to run the function: %s" % str(ex)) - logging.debug(traceback.format_exc()) + logging.error(traceback.format_exc()) class Context(DictBase): diff --git a/workflow/lib/idds/iworkflow/work.py b/workflow/lib/idds/iworkflow/work.py index 55e47e91..48404ed3 100644 --- a/workflow/lib/idds/iworkflow/work.py +++ b/workflow/lib/idds/iworkflow/work.py @@ -10,6 +10,7 @@ # - Lino Oscar Gerlach, , 2024 import base64 +import copy import datetime import functools import json @@ -20,7 +21,7 @@ import traceback from idds.common import exceptions -from idds.common.constants import WorkflowType, TransformStatus +from idds.common.constants import WorkflowType, TransformStatus, AsyncResultStatus from idds.common.imports import get_func_name from idds.common.utils import setup_logging, json_dumps, json_loads, encode_base64, modified_environ from .asyncresult import AsyncResult, MapResult @@ -337,7 +338,7 @@ def setup(self): class Work(Base): - def __init__(self, func=None, workflow_context=None, context=None, args=None, kwargs=None, multi_jobs_kwargs_list=None, + def __init__(self, func=None, workflow_context=None, context=None, pre_kwargs=None, args=None, kwargs=None, multi_jobs_kwargs_list=None, current_job_kwargs=None, map_results=False, source_dir=None, init_env=None, is_unique_func_name=False): """ Init a workflow. @@ -346,7 +347,8 @@ def __init__(self, func=None, workflow_context=None, context=None, args=None, kw self.prepared = False # self._func = func - self._func, self._func_name_and_args = self.get_func_name_and_args(func, args, kwargs, multi_jobs_kwargs_list) + self._func, self._func_name_and_args = self.get_func_name_and_args(func, pre_kwargs, args, kwargs, multi_jobs_kwargs_list) + self._func = None self._current_job_kwargs = current_job_kwargs if self._current_job_kwargs: @@ -364,8 +366,12 @@ def __init__(self, func=None, workflow_context=None, context=None, args=None, kw else: self._context = WorkContext(name=self._name, workflow_context=workflow_context, init_env=init_env) + self._async_ret = None + self.map_results = map_results self._results = None + self._async_result_initialized = False + self._async_result_status = None @property def logger(self): @@ -563,7 +569,7 @@ def token(self, value): @property def multi_jobs_kwargs_list(self): - return self._func_name_and_args[3] + return self._func_name_and_args[4] @multi_jobs_kwargs_list.setter def multi_jobs_kwargs_list(self, value): @@ -633,6 +639,7 @@ def submit(self): :raise Exception when failing to submit the workflow. 
""" try: + # self._func = None if self._context.get_service() == 'panda': tf_id = self.submit_to_panda_server() else: @@ -678,6 +685,7 @@ def get_status_from_panda_server(self): else: tf = None logging.error("Failed to get transform (request_id: %s, transform_id: %s) status from PanDA-iDDS: %s" % (request_id, transform_id, ret)) + return TransformStatus.Transforming if not tf: logging.info("Get transform (request_id: %s, transform_id: %s) from PanDA-iDDS: %s" % (request_id, transform_id, tf)) @@ -715,57 +723,152 @@ def get_status(self): except Exception as ex: logging.info("Failed to get transform status: %s" % str(ex)) + def get_finished_status(self): + return [TransformStatus.Finished] + + def get_subfinished_status(self): + return [TransformStatus.SubFinished] + + def get_failed_status(self): + return [None, TransformStatus.Failed, TransformStatus.Cancelled, + TransformStatus.Suspended, TransformStatus.Expired] + def get_terminated_status(self): return [None, TransformStatus.Finished, TransformStatus.SubFinished, TransformStatus.Failed, TransformStatus.Cancelled, TransformStatus.Suspended, TransformStatus.Expired] + def is_terminated(self): + status = self.get_status() + if status in self.get_terminated_status(): + self.stop_async_result() + return True + if self._async_ret: + self._async_ret.get_results(nologs=True) + if self._async_ret.is_terminated: + self.stop_async_result() + return True + if self._async_result_status in [AsyncResultStatus.Finished, AsyncResultStatus.SubFinished, AsyncResultStatus.Failed]: + return True + return False + + def is_finished(self): + status = self.get_status() + if status in self.get_finished_status(): + self.stop_async_result() + return True + if self._async_ret: + self._async_ret.get_results(nologs=True) + if self._async_ret.is_finished: + self.stop_async_result() + return True + if self._async_result_status in [AsyncResultStatus.Finished]: + return True + return False + + def is_subfinished(self): + status = self.get_status() + if status in self.get_subfinished_status(): + self.stop_async_result() + return True + if self._async_ret: + self._async_ret.get_results(nologs=True) + if self._async_ret.is_subfinished: + self.stop_async_result() + return True + if self._async_result_status in [AsyncResultStatus.SubFinished]: + return True + return False + + def is_failed(self): + status = self.get_status() + if status in self.get_failed_status(): + self.stop_async_result() + return True + if self._async_ret: + self._async_ret.get_results(nologs=True) + if self._async_ret.is_failed: + self.stop_async_result() + return True + if self._async_result_status in [AsyncResultStatus.Failed]: + return True + return False + def get_func_name(self): func_name = self._func_name_and_args[0] return func_name def get_multi_jobs_kwargs_list(self): - multi_jobs_kwargs_list = self._func_name_and_args[3] + multi_jobs_kwargs_list = self.multi_jobs_kwargs_list multi_jobs_kwargs_list = [pickle.loads(base64.b64decode(k)) for k in multi_jobs_kwargs_list] return multi_jobs_kwargs_list + def init_async_result(self): + if not self._async_result_initialized: + multi_jobs_kwargs_list = self.get_multi_jobs_kwargs_list() + if multi_jobs_kwargs_list: + self._async_ret = AsyncResult(self._context, name=self.get_func_name(), multi_jobs_kwargs_list=multi_jobs_kwargs_list, + map_results=self.map_results, internal_id=self.internal_id) + else: + self._async_ret = AsyncResult(self._context, name=self.get_func_name(), wait_num=1, internal_id=self.internal_id) + + self._async_result_initialized = 
True + self._async_result_status = AsyncResultStatus.Running + self._async_ret.subscribe() + + def stop_async_result(self): + if self._async_ret: + self._async_ret.stop() + self._results = self._async_ret.get_results() + if self._async_ret.is_finished: + self._async_result_status = AsyncResultStatus.Finished + elif self._async_ret.is_subfinished: + self._async_result_status = AsyncResultStatus.SubFinished + elif self._async_ret.is_failed: + self._async_result_status = AsyncResultStatus.Failed + self._async_ret = None + # self._async_result_initialized = False + def wait_results(self): try: terminated_status = self.get_terminated_status() - multi_jobs_kwargs_list = self.get_multi_jobs_kwargs_list() - if multi_jobs_kwargs_list: - async_ret = AsyncResult(self._context, name=self.get_func_name(), multi_jobs_kwargs_list=multi_jobs_kwargs_list, - map_results=self.map_results, internal_id=self.internal_id) - else: - async_ret = AsyncResult(self._context, name=self.get_func_name(), wait_num=1, internal_id=self.internal_id) + # multi_jobs_kwargs_list = self.get_multi_jobs_kwargs_list() + # if multi_jobs_kwargs_list: + # async_ret = AsyncResult(self._context, name=self.get_func_name(), multi_jobs_kwargs_list=multi_jobs_kwargs_list, + # map_results=self.map_results, internal_id=self.internal_id) + # else: + # async_ret = AsyncResult(self._context, name=self.get_func_name(), wait_num=1, internal_id=self.internal_id) - async_ret.subscribe() + # async_ret.subscribe() + self.init_async_result() status = self.get_status() time_last_check_status = time.time() logging.info("waiting for results") while status not in terminated_status: # time.sleep(10) - ret = async_ret.wait_results(timeout=10) + ret = self._async_ret.wait_results(timeout=10) if ret: logging.info("Recevied result: %s" % str(ret)) break - if async_ret.waiting_result_terminated: + if self._async_ret.waiting_result_terminated: logging.info("waiting_result_terminated is set, Received result is: %s" % str(ret)) if time.time() - time_last_check_status > 600: # 10 minutes status = self.get_status() time_last_check_status = time.time() - async_ret.stop() - self._results = async_ret.wait_results(force_return_results=True) + self._results = self._async_ret.wait_results(force_return_results=True) + self.stop_async_result() return self._results except Exception as ex: logging.error("wait_results got some errors: %s" % str(ex)) - async_ret.stop() + self.stop_async_result() return ex def get_results(self): + if self._async_ret: + self._results = self._async_ret.get_results() return self._results def setup(self): @@ -814,11 +917,13 @@ def run(self): """ self.pre_run() - func_name, args, kwargs, multi_jobs_kwargs_list = self._func_name_and_args + func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list = self._func_name_and_args current_job_kwargs = self._current_job_kwargs if args: args = pickle.loads(base64.b64decode(args)) + if pre_kwargs: + pre_kwargs = pickle.loads(base64.b64decode(pre_kwargs)) if kwargs: kwargs = pickle.loads(base64.b64decode(kwargs)) if multi_jobs_kwargs_list: @@ -832,11 +937,15 @@ def run(self): if self._context.distributed: rets = None - kwargs_copy = kwargs.copy() + args_copy = copy.deepcopy(args) + pre_kwargs_copy = copy.deepcopy(pre_kwargs) + kwargs_copy = copy.deepcopy(kwargs) if current_job_kwargs and type(current_job_kwargs) in [dict]: kwargs_copy.update(current_job_kwargs) + elif current_job_kwargs and type(current_job_kwargs) in [tuple, list]: + args_copy = copy.deepcopy(current_job_kwargs) - rets = 
self.run_func(self._func, args, kwargs_copy) + rets = self.run_func(self._func, pre_kwargs_copy, args_copy, kwargs_copy) request_id = self._context.request_id transform_id = self._context.transform_id @@ -852,7 +961,7 @@ def run(self): return self._results else: if not multi_jobs_kwargs_list: - rets = self.run_func(self._func, args, kwargs) + rets = self.run_func(self._func, pre_kwargs, args, kwargs) if not self.map_results: self._results = rets else: @@ -863,16 +972,28 @@ def run(self): if not self.map_results: self._results = [] for one_job_kwargs in multi_jobs_kwargs_list: - kwargs_copy = kwargs.copy() - kwargs_copy.update(one_job_kwargs) - rets = self.run_func(self._func, args, kwargs_copy) + kwargs_copy = copy.deepcopy(kwargs) + args_copy = copy.deepcopy(args) + pre_kwargs_copy = copy.deepcopy(pre_kwargs) + if type(one_job_kwargs) in [dict]: + kwargs_copy.update(one_job_kwargs) + elif type(one_job_kwargs) in [tuple, list]: + args_copy = copy.deepcopy(one_job_kwargs) + + rets = self.run_func(self._func, pre_kwargs_copy, args_copy, kwargs_copy) self._results.append(rets) else: self._results = MapResult() for one_job_kwargs in multi_jobs_kwargs_list: - kwargs_copy = kwargs.copy() - kwargs_copy.update(one_job_kwargs) - rets = self.run_func(self._func, args, kwargs_copy) + kwargs_copy = copy.deepcopy(kwargs) + args_copy = copy.deepcopy(args) + pre_kwargs_copy = copy.deepcopy(pre_kwargs) + if type(one_job_kwargs) in [dict]: + kwargs_copy.update(one_job_kwargs) + elif type(one_job_kwargs) in [tuple, list]: + args_copy = copy.deepcopy(one_job_kwargs) + + rets = self.run_func(self._func, pre_kwargs_copy, args_copy, kwargs_copy) self._results.add_result(name=self.get_func_name(), args=one_job_kwargs, result=rets) return self._results @@ -918,17 +1039,18 @@ def run_work_distributed(w): # foo = work(arg)(foo) -def work(func=None, *, map_results=False, lazy=False, init_env=None): +def work(func=None, *, workflow=None, pre_kwargs={}, return_work=False, map_results=False, lazy=False, init_env=None, no_wraps=False): if func is None: - return functools.partial(work, map_results=map_results, lazy=lazy, init_env=init_env) + return functools.partial(work, workflow=workflow, pre_kwargs=pre_kwargs, return_work=return_work, no_wraps=no_wraps, + map_results=map_results, lazy=lazy, init_env=init_env) if 'IDDS_IGNORE_WORK_DECORATOR' in os.environ: return func - @functools.wraps(func) + # @functools.wraps(func) def wrapper(*args, **kwargs): try: - f = kwargs.pop('workflow', None) or WorkflowCanvas.get_current_workflow() + f = workflow or kwargs.pop('workflow', None) or WorkflowCanvas.get_current_workflow() workflow_context = f._context multi_jobs_kwargs_list = kwargs.pop('multi_jobs_kwargs_list', []) logging.debug("workflow context: %s" % workflow_context) @@ -936,9 +1058,13 @@ def wrapper(*args, **kwargs): logging.debug("work decorator: func: %s, map_results: %s" % (func, map_results)) if workflow_context: logging.debug("setup work") - w = Work(workflow_context=workflow_context, func=func, args=args, kwargs=kwargs, multi_jobs_kwargs_list=multi_jobs_kwargs_list, - map_results=map_results, init_env=init_env) + w = Work(workflow_context=workflow_context, func=func, pre_kwargs=pre_kwargs, args=args, kwargs=kwargs, + multi_jobs_kwargs_list=multi_jobs_kwargs_list, map_results=map_results, init_env=init_env) # if distributed: + + if return_work: + return w + if workflow_context.distributed: ret = run_work_distributed(w) return ret @@ -947,24 +1073,42 @@ def wrapper(*args, **kwargs): else: logging.info("workflow 
context is not defined, run function locally") if not multi_jobs_kwargs_list: - return func(*args, **kwargs) + kwargs_copy = copy.deepcopy(pre_kwargs) + kwargs_copy.update(kwargs) + return func(*args, **kwargs_copy) if not kwargs: kwargs = {} if not map_results: rets = [] for one_job_kwargs in multi_jobs_kwargs_list: - kwargs_copy = kwargs.copy() - kwargs_copy.update(one_job_kwargs) - ret = func(*args, **kwargs_copy) + kwargs_copy = copy.deepcopy(kwargs) + args_copy = copy.deepcopy(args) + pre_kwargs_copy = copy.deepcopy(pre_kwargs) + if type(one_job_kwargs) in [dict]: + kwargs_copy.update(one_job_kwargs) + elif type(one_job_kwargs) in [tuple, list]: + args_copy = copy.deepcopy(one_job_kwargs) + + pre_kwargs_copy.update(kwargs_copy) + + ret = func(*args_copy, **pre_kwargs_copy) rets.append(ret) return rets else: rets = MapResult() for one_job_kwargs in multi_jobs_kwargs_list: - kwargs_copy = kwargs.copy() - kwargs_copy.update(one_job_kwargs) - ret = func(*args, **kwargs_copy) + kwargs_copy = copy.deepcopy(kwargs) + args_copy = copy.deepcopy(args) + pre_kwargs_copy = copy.deepcopy(pre_kwargs) + if type(one_job_kwargs) in [dict]: + kwargs_copy.update(one_job_kwargs) + elif type(one_job_kwargs) in [tuple, list]: + args_copy = copy.deepcopy(one_job_kwargs) + + pre_kwargs_copy.update(kwargs_copy) + + ret = func(*args_copy, **pre_kwargs_copy) rets.add_result(name=get_func_name(func), args=one_job_kwargs, result=ret) return rets except Exception as ex: @@ -972,4 +1116,7 @@ def wrapper(*args, **kwargs): raise ex except: raise - return wrapper + if no_wraps: + return wrapper + else: + return functools.wraps(func)(wrapper) diff --git a/workflow/lib/idds/iworkflow/workflow.py b/workflow/lib/idds/iworkflow/workflow.py index 9d718ae5..3a3f6bcc 100644 --- a/workflow/lib/idds/iworkflow/workflow.py +++ b/workflow/lib/idds/iworkflow/workflow.py @@ -676,8 +676,8 @@ def prepare(self): class Workflow(Base): def __init__(self, func=None, service='panda', context=None, source_dir=None, local=False, distributed=True, - args=None, kwargs={}, multi_jobs_kwargs_list=[], current_job_kwargs=None, init_env=None, is_unique_func_name=False, - max_walltime=24 * 3600): + pre_kwargs={}, args=None, kwargs={}, multi_jobs_kwargs_list=[], current_job_kwargs=None, + init_env=None, is_unique_func_name=False, max_walltime=24 * 3600, source_dir_parent_level=None): """ Init a workflow. 
""" @@ -685,7 +685,7 @@ def __init__(self, func=None, service='panda', context=None, source_dir=None, lo self.prepared = False # self._func = func - self._func, self._func_name_and_args = self.get_func_name_and_args(func, args, kwargs, multi_jobs_kwargs_list) + self._func, self._func_name_and_args = self.get_func_name_and_args(func, pre_kwargs, args, kwargs, multi_jobs_kwargs_list) self._current_job_kwargs = current_job_kwargs if self._current_job_kwargs: self._current_job_kwargs = base64.b64encode(pickle.dumps(self._current_job_kwargs)).decode("utf-8") @@ -696,10 +696,11 @@ def __init__(self, func=None, service='panda', context=None, source_dir=None, lo if not is_unique_func_name: if self._name: self._name = self._name + "_" + datetime.datetime.utcnow().strftime("%Y_%m_%d_%H_%M_%S") - source_dir = self.get_source_dir(self._func, source_dir) + source_dir = self.get_source_dir(self._func, source_dir, source_dir_parent_level=source_dir_parent_level) workflow_type = WorkflowType.iWorkflow if local: + self._func = None workflow_type = WorkflowType.iWorkflowLocal if context is not None: @@ -888,7 +889,7 @@ def get_work_name(self): @property def multi_jobs_kwargs_list(self): - return self._func_name_and_args[3] + return self._func_name_and_args[4] @multi_jobs_kwargs_list.setter def multi_jobs_kwargs_list(self, value): @@ -901,7 +902,7 @@ def to_dict(self): self._func = func return obj - def get_source_dir(self, func, source_dir): + def get_source_dir(self, func, source_dir, source_dir_parent_level=None): if source_dir: return source_dir if func: @@ -911,7 +912,11 @@ def get_source_dir(self, func, source_dir): if not source_file: return None file_path = os.path.abspath(source_file) - return os.path.dirname(file_path) + source_dir = os.path.dirname(file_path) + if source_dir_parent_level and source_dir_parent_level > 0: + for _ in range(0, source_dir_parent_level): + source_dir = os.path.dirname(source_dir) + return source_dir return None def prepare(self): @@ -1040,6 +1045,9 @@ def close(self): except Exception as ex: logging.error("Failed to close request(%s): %s" % (self._context.request_id, str(ex))) + def __del__(self): + self.close() + def setup(self): """ :returns command: `str` to setup the workflow. 
@@ -1098,9 +1106,11 @@ def run(self): if True: self.pre_run() - func_name, args, kwargs, multi_jobs_kwargs_list = self._func_name_and_args + func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list = self._func_name_and_args if args: args = pickle.loads(base64.b64decode(args)) + if pre_kwargs: + pre_kwargs = pickle.loads(base64.b64decode(pre_kwargs)) if kwargs: kwargs = pickle.loads(base64.b64decode(kwargs)) if multi_jobs_kwargs_list: @@ -1109,7 +1119,7 @@ def run(self): if self._func is None: func = self.load(func_name) self._func = func - ret = self.run_func(self._func, args, kwargs) + ret = self.run_func(self._func, pre_kwargs, args, kwargs) return ret @@ -1154,19 +1164,22 @@ def get_func_name(self): # foo = workflow(arg)(foo) -def workflow(func=None, *, local=False, service='idds', source_dir=None, primary=False, queue=None, site=None, cloud=None, max_walltime=24 * 3600, distributed=True, init_env=None): +def workflow(func=None, *, local=False, service='idds', source_dir=None, primary=False, queue=None, site=None, cloud=None, + max_walltime=24 * 3600, distributed=True, init_env=None, pre_kwargs={}, return_workflow=False, no_wraps=False, + source_dir_parent_level=None): if func is None: return functools.partial(workflow, local=local, service=service, source_dir=source_dir, primary=primary, queue=queue, site=site, cloud=cloud, - max_walltime=max_walltime, distributed=distributed, init_env=init_env) + max_walltime=max_walltime, distributed=distributed, init_env=init_env, pre_kwargs=pre_kwargs, no_wraps=no_wraps, + return_workflow=return_workflow, source_dir_parent_level=source_dir_parent_level) if 'IDDS_IGNORE_WORKFLOW_DECORATOR' in os.environ: return func - @functools.wraps(func) + # @functools.wraps(func) def wrapper(*args, **kwargs): try: f = Workflow(func, service=service, source_dir=source_dir, local=local, max_walltime=max_walltime, distributed=distributed, - args=args, kwargs=kwargs, init_env=init_env) + pre_kwargs=pre_kwargs, args=args, kwargs=kwargs, init_env=init_env, source_dir_parent_level=source_dir_parent_level) f.queue = queue f.site = site f.cloud = cloud @@ -1178,6 +1191,10 @@ def wrapper(*args, **kwargs): logging.info("Registering workflow") f.submit() + logging.info("return_workflow %s" % return_workflow) + if return_workflow: + return f + if not local: logging.info("Run workflow at remote sites") return f @@ -1191,7 +1208,10 @@ def wrapper(*args, **kwargs): raise ex except: raise - return wrapper + if no_wraps: + return wrapper + else: + return functools.wraps(func)(wrapper) def workflow_old(func=None, *, lazy=False, service='panda', source_dir=None, primary=False, distributed=True): diff --git a/workflow/tools/make/zipheader b/workflow/tools/make/zipheader index 97a8f167..0561725c 100644 --- a/workflow/tools/make/zipheader +++ b/workflow/tools/make/zipheader @@ -32,6 +32,8 @@ else fi fi; +export IDDS_LOG_LEVEL=debug + export PANDA_CONFIG_ROOT=$(pwd); export PANDA_VERIFY_HOST=off; export PANDA_BEHIND_REAL_LB=true;
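
Notes on the main behavioural changes above, with short standalone sketches; anything labelled hypothetical below is illustration only and not part of the patch.

The setup_logging() change in common/lib/idds/common/utils.py gives a new IDDS_LOG_FILE environment variable priority over the [common] logdir option from idds.cfg, with stream logging as the final fallback. A condensed sketch of that precedence follows; the /tmp path is a made-up example and the config_* helpers used by the real function are omitted:

    import logging
    import os

    # Hypothetical path, used only to demonstrate the new override.
    os.environ['IDDS_LOG_FILE'] = '/tmp/idds_demo.log'

    idds_log_file = os.environ.get('IDDS_LOG_FILE', None)
    if idds_log_file:
        # 1) an explicit log file from the environment wins
        logging.basicConfig(filename=idds_log_file, level=logging.DEBUG,
                            format='%(asctime)s\t%(threadName)s\t%(name)s\t%(levelname)s\t%(message)s')
    else:
        # 2) in the real function: [common]/logdir from idds.cfg, then 3) a stream handler
        logging.basicConfig(level=logging.DEBUG)

    logging.getLogger('demo').info('log messages are written to %s', idds_log_file)

The related zipheader change exports IDDS_LOG_LEVEL=debug for jobs built from the zip bundle.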
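
AsyncResult.subscribe() previously created a threading.Event; the patch swaps it for the new GracefulEvent added to common/constants.py, which exposes the same set()/is_set() surface as a plain boolean attribute (the motivation is not stated in the patch; keeping the object trivially serialisable is one plausible reason). A minimal usage sketch, where poll_until_stopped is a hypothetical stand-in for run_subscriber():

    import threading
    import time

    class GracefulEvent(object):
        # Same shape as the class added in constants.py: a plain flag with the
        # set()/is_set() subset of the threading.Event interface.
        def __init__(self):
            self.__is_set = False

        def set(self):
            self.__is_set = True

        def is_set(self):
            return self.__is_set

    def poll_until_stopped(stop_event, interval=0.1):
        # Hypothetical worker loop mirroring the guard used in run_subscriber():
        # keep polling only while a stop event exists and has not been set.
        while stop_event and not stop_event.is_set():
            time.sleep(interval)

    stop = GracefulEvent()
    worker = threading.Thread(target=poll_until_stopped, args=(stop,), name="RunSubscriber")
    worker.start()
    stop.set()       # request a graceful shutdown; the loop exits on its next check
    worker.join()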
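
The new is_finished/is_subfinished/is_failed properties on AsyncResult, together with the AsyncResultStatus enum, classify a terminated result set purely by the fraction of expected results that arrived. A self-contained sketch of that classification; classify() is a hypothetical helper, not a function in the patch:

    from enum import Enum

    class AsyncResultStatus(Enum):
        # Values mirror the IDDSEnum added in constants.py (a plain Enum is used
        # here so the sketch runs without the idds package).
        Running = 0
        Finished = 1
        SubFinished = 2
        Failed = 3

    def classify(percent, wait_percent, graceful_stop_set):
        # Reproduces the property logic on AsyncResult: nothing is decided
        # until the subscriber has been told to stop.
        if not graceful_stop_set:
            return AsyncResultStatus.Running
        if percent >= wait_percent:
            return AsyncResultStatus.Finished
        if 0 < percent < wait_percent:
            return AsyncResultStatus.SubFinished
        return AsyncResultStatus.Failed

    print(classify(1.0, 0.9, True))   # AsyncResultStatus.Finished
    print(classify(0.4, 0.9, True))   # AsyncResultStatus.SubFinished
    print(classify(0.0, 0.9, True))   # AsyncResultStatus.Failed

Work.is_terminated()/is_finished()/is_subfinished()/is_failed() combine this client-side classification with the transform status reported by the server, so either channel can end the wait.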
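
A new pre_kwargs argument is threaded from the work/workflow decorators through get_func_name_and_args() (the serialised tuple becomes (func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list), which is why multi_jobs_kwargs_list moves from index 3 to index 4) and into Base.run_func(). The merge order is: pre_kwargs supplies defaults, call-time kwargs override them, and the function is called with bare positional args when both are empty. Standalone sketch; demo() and its values are made up for illustration:

    import copy
    import logging

    def run_func(func, pre_kwargs, args, kwargs):
        # Same merge order as the patched Base.run_func(): defaults first,
        # then per-call keyword arguments on top.
        try:
            merged = copy.deepcopy(pre_kwargs or {})
            merged.update(kwargs or {})
            if merged:
                return func(*args, **merged)
            return func(*args)
        except Exception as ex:
            logging.error("Failed to run the function: %s" % str(ex))

    def demo(x, mode="fast", retries=1):
        # Hypothetical target function.
        return (x, mode, retries)

    print(run_func(demo, {"mode": "slow"}, (42,), {"retries": 3}))
    # -> (42, 'slow', 3): kwargs override pre_kwargs, positional args pass through

Work.run() applies the same idea to each entry of multi_jobs_kwargs_list and to current_job_kwargs: a dict entry is merged into the keyword arguments, while a tuple or list entry replaces the positional args.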
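
Both decorators also grow no_wraps and return_work/return_workflow switches: return_work/return_workflow hands back the Work/Workflow object instead of running it, and no_wraps controls whether functools.wraps copies the wrapped function's metadata onto the wrapper. The sketch below isolates only the no_wraps behaviour; the trivial work() body is hypothetical:

    import functools

    def work(func=None, *, no_wraps=False):
        # Reduced decorator skeleton showing only the wrapping switch.
        if func is None:
            return functools.partial(work, no_wraps=no_wraps)

        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        # Same tail as the patch: return the bare wrapper, or apply
        # functools.wraps afterwards instead of via the @ decorator form.
        return wrapper if no_wraps else functools.wraps(func)(wrapper)

    @work
    def wrapped():
        """original docstring"""

    @work(no_wraps=True)
    def raw():
        """original docstring"""

    print(wrapped.__name__)   # 'wrapped' -> metadata copied by functools.wraps
    print(raw.__name__)       # 'wrapper' -> left untouched when no_wraps=True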