From a9a529d7e24a89db90242b43cce4fdf17ba1ddc0 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Wed, 3 Apr 2024 14:56:26 +0200 Subject: [PATCH 1/2] fix workflow wrapper --- workflow/tools/make/zipheader | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/workflow/tools/make/zipheader b/workflow/tools/make/zipheader index d4507d25..97a8f167 100644 --- a/workflow/tools/make/zipheader +++ b/workflow/tools/make/zipheader @@ -70,9 +70,13 @@ run_args=$@ echo "run_args: " $run_args cmdfile="run_workflow.sh" -cat <<- EOF > ./$cmdfile +cat <<- EOF > ${current_dir}/$cmdfile #/bin/bash +echo "current dir: " \$PWD + +# cd ${current_dir} + current_dir=\$PWD export PATH=\${current_dir}:\${current_dir}/tmp_bin:\${current_dir}/bin:\$PATH export PYTHONPATH=\${current_dir}:\${current_dir}/lib_py:\$PYTHONPATH @@ -91,15 +95,16 @@ $run_args EOF -chmod +x ./$cmdfile +chmod +x ${current_dir}/$cmdfile # exec python "$@" # python "$@" # exec "$@" -$pre_setup +# eval $pre_setup -$setup ./$cmdfile +cmd="$pre_setup $setup ${current_dir}/$cmdfile" +eval $cmd ret=$? echo pwd From 91dc366622978339edbd0186963dc1787533122a Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Wed, 3 Apr 2024 14:56:47 +0200 Subject: [PATCH 2/2] rename group_kwargs to multi_jobs_kwargs_list --- main/lib/idds/agents/carrier/plugins/base.py | 4 +- .../test_iworkflow/optimize_iworkflow.py | 6 +- .../test_iworkflow_local_mul.py | 2 +- .../test_iworkflow/test_iworkflow_mul.py | 8 +- monitor/data/conf.js | 12 +-- workflow/bin/run_workflow | 70 ++++++++-------- workflow/lib/idds/iworkflow/asyncresult.py | 16 ++-- workflow/lib/idds/iworkflow/base.py | 16 ++-- workflow/lib/idds/iworkflow/work.py | 84 +++++++++---------- workflow/lib/idds/iworkflow/workflow.py | 26 +++--- 10 files changed, 122 insertions(+), 122 deletions(-) diff --git a/main/lib/idds/agents/carrier/plugins/base.py b/main/lib/idds/agents/carrier/plugins/base.py index 039d6a97..8b11ebef 100644 --- a/main/lib/idds/agents/carrier/plugins/base.py +++ b/main/lib/idds/agents/carrier/plugins/base.py @@ -28,8 +28,8 @@ def get_task_params(self, work): task_name = work.name in_files = [] - group_parameters = work.group_parameters - for p in group_parameters: + multi_jobs_kwargs_list = work.multi_jobs_kwargs_list + for p in multi_jobs_kwargs_list: p = json.dumps(p) p = encode_base64(p) in_files.append(p) diff --git a/main/lib/idds/tests/test_iworkflow/optimize_iworkflow.py b/main/lib/idds/tests/test_iworkflow/optimize_iworkflow.py index 52e99437..3632638b 100644 --- a/main/lib/idds/tests/test_iworkflow/optimize_iworkflow.py +++ b/main/lib/idds/tests/test_iworkflow/optimize_iworkflow.py @@ -82,16 +82,16 @@ def optimize_workflow(): for i in range(n_iterations): print("Iteration %s" % i) points = {} - group_kwargs = [] + multi_jobs_kwargs_list = [] for j in range(n_points_per_iteration): x_probe = bayesopt.suggest(util) u_id = get_unique_id_for_dict(x_probe) print('x_probe (%s): %s' % (u_id, x_probe)) points[u_id] = {'kwargs': x_probe} - group_kwargs.append(x_probe) + multi_jobs_kwargs_list.append(x_probe) results = optimize_work(opt_params=params, opt_method=opt_method, hist=True, saveModel=False, input_weight=None, - retMethod=opt_method, group_kwargs=group_kwargs) + retMethod=opt_method, multi_jobs_kwargs_list=multi_jobs_kwargs_list) print("points: %s" % str(points)) for u_id in points: diff --git a/main/lib/idds/tests/test_iworkflow/test_iworkflow_local_mul.py b/main/lib/idds/tests/test_iworkflow/test_iworkflow_local_mul.py index 5b3f345c..3631769c 100644 --- a/main/lib/idds/tests/test_iworkflow/test_iworkflow_local_mul.py +++ b/main/lib/idds/tests/test_iworkflow/test_iworkflow_local_mul.py @@ -48,7 +48,7 @@ def test_func1(name): @workflow(service='panda', local=True, cloud='US', queue='BNL_OSG_2') def test_workflow(): print("test workflow starts") - ret = test_func(name='idds', group_kwargs=[{'name': 'idds1'}, {'name': 'idds2'}, {'name': 'idds3'}, {'name': 'idds4'}, {'name': 'idds5'}]) + ret = test_func(name='idds', multi_jobs_kwargs_list=[{'name': 'idds1'}, {'name': 'idds2'}, {'name': 'idds3'}, {'name': 'idds4'}, {'name': 'idds5'}]) print(ret) print("test workflow ends") diff --git a/main/lib/idds/tests/test_iworkflow/test_iworkflow_mul.py b/main/lib/idds/tests/test_iworkflow/test_iworkflow_mul.py index 122728dc..9f12e94b 100644 --- a/main/lib/idds/tests/test_iworkflow/test_iworkflow_mul.py +++ b/main/lib/idds/tests/test_iworkflow/test_iworkflow_mul.py @@ -49,12 +49,12 @@ def test_func1(name, name1=None): def test_workflow(): print("test workflow starts") - group_kwargs = [{'name': 1, 'name1': 2}, - {'name': 3, 'name1': 4}] - ret = test_func(name='idds', group_kwargs=group_kwargs) + multi_jobs_kwargs_list = [{'name': 1, 'name1': 2}, + {'name': 3, 'name1': 4}] + ret = test_func(name='idds', multi_jobs_kwargs_list=multi_jobs_kwargs_list) print(ret) - ret = test_func1(name='idds', group_kwargs=group_kwargs) + ret = test_func1(name='idds', multi_jobs_kwargs_list=multi_jobs_kwargs_list) print(ret) print("test workflow ends") diff --git a/monitor/data/conf.js b/monitor/data/conf.js index dee49b18..dd4ac352 100644 --- a/monitor/data/conf.js +++ b/monitor/data/conf.js @@ -1,9 +1,9 @@ var appConfig = { - 'iddsAPI_request': "https://lxplus998.cern.ch:443/idds/monitor_request/null/null", - 'iddsAPI_transform': "https://lxplus998.cern.ch:443/idds/monitor_transform/null/null", - 'iddsAPI_processing': "https://lxplus998.cern.ch:443/idds/monitor_processing/null/null", - 'iddsAPI_request_detail': "https://lxplus998.cern.ch:443/idds/monitor/null/null/true/false/false", - 'iddsAPI_transform_detail': "https://lxplus998.cern.ch:443/idds/monitor/null/null/false/true/false", - 'iddsAPI_processing_detail': "https://lxplus998.cern.ch:443/idds/monitor/null/null/false/false/true" + 'iddsAPI_request': "https://lxplus926.cern.ch:443/idds/monitor_request/null/null", + 'iddsAPI_transform': "https://lxplus926.cern.ch:443/idds/monitor_transform/null/null", + 'iddsAPI_processing': "https://lxplus926.cern.ch:443/idds/monitor_processing/null/null", + 'iddsAPI_request_detail': "https://lxplus926.cern.ch:443/idds/monitor/null/null/true/false/false", + 'iddsAPI_transform_detail': "https://lxplus926.cern.ch:443/idds/monitor/null/null/false/true/false", + 'iddsAPI_processing_detail': "https://lxplus926.cern.ch:443/idds/monitor/null/null/false/false/true" } diff --git a/workflow/bin/run_workflow b/workflow/bin/run_workflow index 1732f3ca..fe5ece8f 100644 --- a/workflow/bin/run_workflow +++ b/workflow/bin/run_workflow @@ -36,50 +36,50 @@ from idds.iworkflow.work import Work setup_logging(__name__, stream=sys.stdout) -def get_context_args(context, original_args, update_args): - func_name, args, kwargs, group_kwargs = None, None, None, None +def get_context_args(context, original_args, current_job_kwargs): + func_name, args, kwargs, multi_jobs_kwargs_list = None, None, None, None if original_args: original_args = json_loads(original_args) - func_name, args, kwargs, group_kwargs = original_args + func_name, args, kwargs, multi_jobs_kwargs_list = original_args if args: args = pickle.loads(base64.b64decode(args)) if kwargs: kwargs = pickle.loads(base64.b64decode(kwargs)) - if group_kwargs: - group_kwargs = [pickle.loads(base64.b64decode(k)) for k in group_kwargs] + if multi_jobs_kwargs_list: + multi_jobs_kwargs_list = [pickle.loads(base64.b64decode(k)) for k in multi_jobs_kwargs_list] - if update_args: - if update_args == "${IN/L}": - logging.info("update_args == original ${IN/L}, is not set") + if current_job_kwargs: + if current_job_kwargs == "${IN/L}": + logging.info("current_job_kwargs == original ${IN/L}, is not set") else: try: - update_args = json_loads(update_args) + current_job_kwargs = json_loads(current_job_kwargs) - if update_args: - update_args = pickle.loads(base64.b64decode(update_args)) + if current_job_kwargs: + current_job_kwargs = pickle.loads(base64.b64decode(current_job_kwargs)) - # update_kwargs = update_args - # if update_kwargs and isinstance(update_kwargs, dict): - # # kwargs = merge_dict(kwargs, update_kwargs) - # kwargs.update(update_kwargs) + # current_job_kwargs = current_job_kwargs + # if current_job_kwargs and isinstance(current_job_kwargs, dict): + # # kwargs = merge_dict(kwargs, current_job_kwargs) + # kwargs.update(current_job_kwargs) except Exception as ex: logging.error("Failed to update kwargs: %s" % ex) - return context, func_name, args, kwargs, group_kwargs, update_args + return context, func_name, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs -def run_workflow(context, original_args, update_args): - context, func_name, args, kwargs, group_kwargs, update_args = get_context_args(context, original_args, update_args) +def run_workflow(context, original_args, current_job_kwargs): + context, func_name, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs = get_context_args(context, original_args, current_job_kwargs) logging.info("context: %s" % context) logging.info("func_name: %s" % func_name) logging.info("args: %s" % str(args)) logging.info("kwargs: %s" % kwargs) - logging.info("group_kwargs: %s" % group_kwargs) + logging.info("multi_jobs_kwargs_list: %s" % multi_jobs_kwargs_list) context.initialize() context.setup_source_files() - workflow = Workflow(func=func_name, args=args, kwargs=kwargs, group_kwargs=group_kwargs, update_kwargs=update_args, context=context) + workflow = Workflow(func=func_name, args=args, kwargs=kwargs, multi_jobs_kwargs_list=multi_jobs_kwargs_list, current_job_kwargs=current_job_kwargs, context=context) logging.info("workflow: %s" % workflow) with workflow: ret = workflow.run() @@ -87,18 +87,18 @@ def run_workflow(context, original_args, update_args): return 0 -def run_work(context, original_args, update_args): - context, func_name, args, kwargs, group_kwargs, update_args = get_context_args(context, original_args, update_args) +def run_work(context, original_args, current_job_kwargs): + context, func_name, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs = get_context_args(context, original_args, current_job_kwargs) logging.info("context: %s" % context) logging.info("func_name: %s" % func_name) logging.info("args: %s" % str(args)) logging.info("kwargs: %s" % kwargs) - logging.info("group_kwargs: %s" % group_kwargs) + logging.info("multi_jobs_kwargs_list: %s" % multi_jobs_kwargs_list) context.initialize() context.setup_source_files() - work = Work(func=func_name, args=args, kwargs=kwargs, group_kwargs=group_kwargs, update_kwargs=update_args, context=context) + work = Work(func=func_name, args=args, kwargs=kwargs, multi_jobs_kwargs_list=multi_jobs_kwargs_list, current_job_kwargs=current_job_kwargs, context=context) logging.info("work: %s" % work) ret = work.run() logging.info("run work result: %s" % str(ret)) @@ -119,13 +119,13 @@ def run_iworkflow(args): # orginal_args = str(binascii.unhexlify(args.original_args).decode()) else: original_args = None - if args.update_args: - # logging.info(args.update_args) - # update_args = str(binascii.unhexlify(args.update_args).decode()) - update_args = decode_base64(args.update_args) - logging.info(update_args) + if args.current_job_kwargs: + # logging.info(args.current_job_kwargs) + # current_job_kwargs = str(binascii.unhexlify(args.current_job_kwargs).decode()) + current_job_kwargs = decode_base64(args.current_job_kwargs) + logging.info(current_job_kwargs) else: - update_args = None + current_job_kwargs = None if args.type == 'workflow': logging.info("run workflow") @@ -134,8 +134,8 @@ def run_iworkflow(args): logging.info("context: %s" % json_dumps(context)) context.broker_password = password logging.info("original_args: %s" % original_args) - logging.info("update_args: %s" % update_args) - exit_code = run_workflow(context, original_args, update_args) + logging.info("current_job_kwargs: %s" % current_job_kwargs) + exit_code = run_workflow(context, original_args, current_job_kwargs) logging.info("exit code: %s" % exit_code) else: logging.info("run work") @@ -144,8 +144,8 @@ def run_iworkflow(args): logging.info("context: %s" % json_dumps(context)) context.broker_password = password logging.info("original_args: %s" % original_args) - logging.info("update_args: %s" % update_args) - exit_code = run_work(context, original_args, update_args) + logging.info("current_job_kwargs: %s" % current_job_kwargs) + exit_code = run_work(context, original_args, current_job_kwargs) logging.info("exit code: %s" % exit_code) return exit_code @@ -175,7 +175,7 @@ def get_parser(): oparser.add_argument('--type', dest='type', action='store', choices=['workflow', 'work'], default='workflow', help='The type in [workflow, work]. Default is workflow.') oparser.add_argument('--context', dest='context', help="The context.") oparser.add_argument('--original_args', dest='original_args', help="The original arguments.") - oparser.add_argument('--update_args', dest='update_args', nargs='?', const=None, help="The updated arguments.") + oparser.add_argument('--current_job_kwargs', dest='current_job_kwargs', nargs='?', const=None, help="The current job arguments.") return oparser diff --git a/workflow/lib/idds/iworkflow/asyncresult.py b/workflow/lib/idds/iworkflow/asyncresult.py index a063192c..41eaefd1 100644 --- a/workflow/lib/idds/iworkflow/asyncresult.py +++ b/workflow/lib/idds/iworkflow/asyncresult.py @@ -118,7 +118,7 @@ def get_all_results(self): class AsyncResult(Base): - def __init__(self, work_context, name=None, wait_num=1, wait_keys=[], group_kwargs=[], run_group_kwarg=None, map_results=False, + def __init__(self, work_context, name=None, wait_num=1, wait_keys=[], multi_jobs_kwargs_list=[], current_job_kwargs=None, map_results=False, wait_percent=1, internal_id=None, timeout=None): """ Init a workflow. @@ -145,8 +145,8 @@ def __init__(self, work_context, name=None, wait_num=1, wait_keys=[], group_kwar if not self._wait_num: self._wait_num = 1 self._wait_keys = set(wait_keys) - self._group_kwargs = group_kwargs - self._run_group_kwarg = run_group_kwarg + self._multi_jobs_kwargs_list = multi_jobs_kwargs_list + self._current_job_kwargs = current_job_kwargs self._wait_percent = wait_percent self._num_wrong_keys = 0 @@ -168,8 +168,8 @@ def wait_keys(self): if len(self._wait_keys) > 0: self._wait_num = len(self._wait_keys) return self._wait_keys - if self._group_kwargs: - for kwargs in self._group_kwargs: + if self._multi_jobs_kwargs_list: + for kwargs in self._multi_jobs_kwargs_list: k = get_unique_id_for_dict(kwargs) k = "%s:%s" % (self._name, k) self.logger.info("args (%s) to key: %s" % (str(kwargs), k)) @@ -336,10 +336,10 @@ def publish(self, ret, key=None): conn = self.connect_to_messaging_broker() workflow_context = self._work_context if key is None: - if self._run_group_kwarg: - key = get_unique_id_for_dict(self._run_group_kwarg) + if self._current_job_kwargs: + key = get_unique_id_for_dict(self._current_job_kwargs) key = "%s:%s" % (self._name, key) - self.logger.info("publish args (%s) to key: %s" % (str(self._run_group_kwarg), key)) + self.logger.info("publish args (%s) to key: %s" % (str(self._current_job_kwargs), key)) if workflow_context.workflow_type in [WorkflowType.iWorkflow, WorkflowType.iWorkflowLocal]: headers = {'persistent': 'true', diff --git a/workflow/lib/idds/iworkflow/base.py b/workflow/lib/idds/iworkflow/base.py index 0bb88b15..9d486f63 100644 --- a/workflow/lib/idds/iworkflow/base.py +++ b/workflow/lib/idds/iworkflow/base.py @@ -54,20 +54,20 @@ def get_func_name_and_args(self, func, args=None, kwargs=None, - group_kwargs=None): + multi_jobs_kwargs_list=None): if args is None: args = () if kwargs is None: kwargs = {} - if group_kwargs is None: - group_kwargs = [] + if multi_jobs_kwargs_list is None: + multi_jobs_kwargs_list = [] if not isinstance(args, (tuple, list)): raise TypeError('{0!r} is not a valid args list'.format(args)) if not isinstance(kwargs, dict): raise TypeError('{0!r} is not a valid kwargs dict'.format(kwargs)) - if not isinstance(group_kwargs, list): - raise TypeError('{0!r} is not a valid group_kwargs list'.format(group_kwargs)) + if not isinstance(multi_jobs_kwargs_list, list): + raise TypeError('{0!r} is not a valid multi_jobs_kwargs_list list'.format(multi_jobs_kwargs_list)) func_call, func_name = None, None if isinstance(func, str): @@ -84,10 +84,10 @@ def get_func_name_and_args(self, args = base64.b64encode(pickle.dumps(args)).decode("utf-8") if kwargs: kwargs = base64.b64encode(pickle.dumps(kwargs)).decode("utf-8") - if group_kwargs: - group_kwargs = [base64.b64encode(pickle.dumps(k)).decode("utf-8") for k in group_kwargs] + if multi_jobs_kwargs_list: + multi_jobs_kwargs_list = [base64.b64encode(pickle.dumps(k)).decode("utf-8") for k in multi_jobs_kwargs_list] - return func_call, (func_name, args, kwargs, group_kwargs) + return func_call, (func_name, args, kwargs, multi_jobs_kwargs_list) @property def logger(self): diff --git a/workflow/lib/idds/iworkflow/work.py b/workflow/lib/idds/iworkflow/work.py index 07d8b901..ea044f10 100644 --- a/workflow/lib/idds/iworkflow/work.py +++ b/workflow/lib/idds/iworkflow/work.py @@ -336,8 +336,8 @@ def setup(self): class Work(Base): - def __init__(self, func=None, workflow_context=None, context=None, args=None, kwargs=None, group_kwargs=None, - update_kwargs=None, map_results=False, source_dir=None, init_env=None, is_unique_func_name=False): + def __init__(self, func=None, workflow_context=None, context=None, args=None, kwargs=None, multi_jobs_kwargs_list=None, + current_job_kwargs=None, map_results=False, source_dir=None, init_env=None, is_unique_func_name=False): """ Init a workflow. """ @@ -345,11 +345,11 @@ def __init__(self, func=None, workflow_context=None, context=None, args=None, kw self.prepared = False # self._func = func - self._func, self._func_name_and_args = self.get_func_name_and_args(func, args, kwargs, group_kwargs) + self._func, self._func_name_and_args = self.get_func_name_and_args(func, args, kwargs, multi_jobs_kwargs_list) - self._update_kwargs = update_kwargs - if self._update_kwargs: - self._update_kwargs = base64.b64encode(pickle.dumps(self._update_kwargs)).decode("utf-8") + self._current_job_kwargs = current_job_kwargs + if self._current_job_kwargs: + self._current_job_kwargs = base64.b64encode(pickle.dumps(self._current_job_kwargs)).decode("utf-8") self._name = self._func_name_and_args[0] if self._name: @@ -561,12 +561,12 @@ def token(self, value): self._context.token = value @property - def group_parameters(self): + def multi_jobs_kwargs_list(self): return self._func_name_and_args[3] - @group_parameters.setter - def group_parameters(self, value): - raise Exception("Not allwed to update group parameters") + @multi_jobs_kwargs_list.setter + def multi_jobs_kwargs_list(self, value): + raise Exception("Not allwed to update multi_jobs_kwargs_list") def get_work_tag(self): return self._context.workflow_type.name @@ -723,18 +723,18 @@ def get_func_name(self): func_name = self._func_name_and_args[0] return func_name - def get_group_kwargs(self): - group_kwargs = self._func_name_and_args[3] - group_kwargs = [pickle.loads(base64.b64decode(k)) for k in group_kwargs] - return group_kwargs + def get_multi_jobs_kwargs_list(self): + multi_jobs_kwargs_list = self._func_name_and_args[3] + multi_jobs_kwargs_list = [pickle.loads(base64.b64decode(k)) for k in multi_jobs_kwargs_list] + return multi_jobs_kwargs_list def wait_results(self): try: terminated_status = self.get_terminated_status() - group_kwargs = self.get_group_kwargs() - if group_kwargs: - async_ret = AsyncResult(self._context, name=self.get_func_name(), group_kwargs=group_kwargs, + multi_jobs_kwargs_list = self.get_multi_jobs_kwargs_list() + if multi_jobs_kwargs_list: + async_ret = AsyncResult(self._context, name=self.get_func_name(), multi_jobs_kwargs_list=multi_jobs_kwargs_list, map_results=self.map_results, internal_id=self.internal_id) else: async_ret = AsyncResult(self._context, name=self.get_func_name(), wait_num=1, internal_id=self.internal_id) @@ -812,17 +812,17 @@ def run(self): """ self.pre_run() - func_name, args, kwargs, group_kwargs = self._func_name_and_args - update_kwargs = self._update_kwargs + func_name, args, kwargs, multi_jobs_kwargs_list = self._func_name_and_args + current_job_kwargs = self._current_job_kwargs if args: args = pickle.loads(base64.b64decode(args)) if kwargs: kwargs = pickle.loads(base64.b64decode(kwargs)) - if group_kwargs: - group_kwargs = [pickle.loads(base64.b64decode(k)) for k in group_kwargs] - if self._update_kwargs: - update_kwargs = pickle.loads(base64.b64decode(update_kwargs)) + if multi_jobs_kwargs_list: + multi_jobs_kwargs_list = [pickle.loads(base64.b64decode(k)) for k in multi_jobs_kwargs_list] + if self._current_job_kwargs: + current_job_kwargs = pickle.loads(base64.b64decode(current_job_kwargs)) if self._func is None: func = self.load(func_name) @@ -831,25 +831,25 @@ def run(self): if self._context.distributed: rets = None kwargs_copy = kwargs.copy() - if update_kwargs and type(update_kwargs) in [dict]: - kwargs_copy.update(update_kwargs) + if current_job_kwargs and type(current_job_kwargs) in [dict]: + kwargs_copy.update(current_job_kwargs) rets = self.run_func(self._func, args, kwargs_copy) request_id = self._context.request_id transform_id = self._context.transform_id logging.info("publishing AsyncResult to (request_id: %s, transform_id: %s): %s" % (request_id, transform_id, rets)) - async_ret = AsyncResult(self._context, name=self.get_func_name(), internal_id=self.internal_id, run_group_kwarg=update_kwargs) + async_ret = AsyncResult(self._context, name=self.get_func_name(), internal_id=self.internal_id, current_job_kwargs=current_job_kwargs) async_ret.publish(rets) if not self.map_results: self._results = rets else: self._results = MapResult() - self._results.add_result(name=self.get_func_name(), args=update_kwargs, result=rets) + self._results.add_result(name=self.get_func_name(), args=current_job_kwargs, result=rets) return self._results else: - if not group_kwargs: + if not multi_jobs_kwargs_list: rets = self.run_func(self._func, args, kwargs) if not self.map_results: self._results = rets @@ -860,25 +860,25 @@ def run(self): else: if not self.map_results: self._results = [] - for group_kwarg in group_kwargs: + for one_job_kwargs in multi_jobs_kwargs_list: kwargs_copy = kwargs.copy() - kwargs_copy.update(group_kwarg) + kwargs_copy.update(one_job_kwargs) rets = self.run_func(self._func, args, kwargs_copy) self._results.append(rets) else: self._results = MapResult() - for group_kwarg in group_kwargs: + for one_job_kwargs in multi_jobs_kwargs_list: kwargs_copy = kwargs.copy() - kwargs_copy.update(group_kwarg) + kwargs_copy.update(one_job_kwargs) rets = self.run_func(self._func, args, kwargs_copy) - self._results.add_result(name=self.get_func_name(), args=group_kwarg, result=rets) + self._results.add_result(name=self.get_func_name(), args=one_job_kwargs, result=rets) return self._results def get_run_command(self): cmd = "run_workflow --type work " cmd += "--context %s --original_args %s " % (encode_base64(json_dumps(self._context)), encode_base64(json_dumps(self._func_name_and_args))) - cmd += "--update_args ${IN/L}" + cmd += "--current_job_kwargs ${IN/L}" return cmd def get_runner(self): @@ -928,13 +928,13 @@ def wrapper(*args, **kwargs): try: f = kwargs.pop('workflow', None) or WorkflowCanvas.get_current_workflow() workflow_context = f._context - group_kwargs = kwargs.pop('group_kwargs', []) + multi_jobs_kwargs_list = kwargs.pop('multi_jobs_kwargs_list', []) logging.debug("workflow context: %s" % workflow_context) logging.debug("work decorator: func: %s, map_results: %s" % (func, map_results)) if workflow_context: logging.debug("setup work") - w = Work(workflow_context=workflow_context, func=func, args=args, kwargs=kwargs, group_kwargs=group_kwargs, + w = Work(workflow_context=workflow_context, func=func, args=args, kwargs=kwargs, multi_jobs_kwargs_list=multi_jobs_kwargs_list, map_results=map_results, init_env=init_env) # if distributed: if workflow_context.distributed: @@ -944,26 +944,26 @@ def wrapper(*args, **kwargs): return w.run() else: logging.info("workflow context is not defined, run function locally") - if not group_kwargs: + if not multi_jobs_kwargs_list: return func(*args, **kwargs) if not kwargs: kwargs = {} if not map_results: rets = [] - for group_kwarg in group_kwargs: + for one_job_kwargs in multi_jobs_kwargs_list: kwargs_copy = kwargs.copy() - kwargs_copy.update(group_kwarg) + kwargs_copy.update(one_job_kwargs) ret = func(*args, **kwargs_copy) rets.append(ret) return rets else: rets = MapResult() - for group_kwarg in group_kwargs: + for one_job_kwargs in multi_jobs_kwargs_list: kwargs_copy = kwargs.copy() - kwargs_copy.update(group_kwarg) + kwargs_copy.update(one_job_kwargs) ret = func(*args, **kwargs_copy) - rets.add_result(name=get_func_name(func), args=group_kwarg, result=ret) + rets.add_result(name=get_func_name(func), args=one_job_kwargs, result=ret) return rets except Exception as ex: logging.error("Failed to run workflow %s: %s" % (func, ex)) diff --git a/workflow/lib/idds/iworkflow/workflow.py b/workflow/lib/idds/iworkflow/workflow.py index 48a58d3e..cf4eb719 100644 --- a/workflow/lib/idds/iworkflow/workflow.py +++ b/workflow/lib/idds/iworkflow/workflow.py @@ -677,7 +677,7 @@ def prepare(self): class Workflow(Base): def __init__(self, func=None, service='panda', context=None, source_dir=None, local=False, distributed=True, - args=None, kwargs={}, group_kwargs=[], update_kwargs=None, init_env=None, is_unique_func_name=False, + args=None, kwargs={}, multi_jobs_kwargs_list=[], current_job_kwargs=None, init_env=None, is_unique_func_name=False, max_walltime=24 * 3600): """ Init a workflow. @@ -686,10 +686,10 @@ def __init__(self, func=None, service='panda', context=None, source_dir=None, lo self.prepared = False # self._func = func - self._func, self._func_name_and_args = self.get_func_name_and_args(func, args, kwargs, group_kwargs) - self._update_kwargs = update_kwargs - if self._update_kwargs: - self._update_kwargs = base64.b64encode(pickle.dumps(self._update_kwargs)).decode("utf-8") + self._func, self._func_name_and_args = self.get_func_name_and_args(func, args, kwargs, multi_jobs_kwargs_list) + self._current_job_kwargs = current_job_kwargs + if self._current_job_kwargs: + self._current_job_kwargs = base64.b64encode(pickle.dumps(self._current_job_kwargs)).decode("utf-8") self._name = self._func_name_and_args[0] if self._name: @@ -888,12 +888,12 @@ def get_work_name(self): return self._name @property - def group_parameters(self): + def multi_jobs_kwargs_list(self): return self._func_name_and_args[3] - @group_parameters.setter - def group_parameters(self, value): - raise Exception("Not allwed to update group parameters") + @multi_jobs_kwargs_list.setter + def multi_jobs_kwargs_list(self, value): + raise Exception("Not allwed to update multi_jobs_kwargs_list") def to_dict(self): func = self._func @@ -1100,13 +1100,13 @@ def run(self): if True: self.pre_run() - func_name, args, kwargs, group_kwargs = self._func_name_and_args + func_name, args, kwargs, multi_jobs_kwargs_list = self._func_name_and_args if args: args = pickle.loads(base64.b64decode(args)) if kwargs: kwargs = pickle.loads(base64.b64decode(kwargs)) - if group_kwargs: - group_kwargs = [pickle.loads(base64.b64decode(k)) for k in group_kwargs] + if multi_jobs_kwargs_list: + multi_jobs_kwargs_list = [pickle.loads(base64.b64decode(k)) for k in multi_jobs_kwargs_list] if self._func is None: func = self.load(func_name) @@ -1131,7 +1131,7 @@ def get_run_command(self): cmd = "run_workflow --type workflow " cmd += "--context %s --original_args %s " % (encode_base64(json_dumps(self._context)), encode_base64(json_dumps(self._func_name_and_args))) - cmd += "--update_args ${IN/L}" + cmd += "--current_job_kwargs ${IN/L}" return cmd def get_runner(self):