Merge pull request #295 from wguanicedew/dev
Dev
wguanicedew authored Apr 3, 2024
2 parents e82b143 + 91dc366 commit 1ee443a
Showing 11 changed files with 131 additions and 126 deletions.
4 changes: 2 additions & 2 deletions main/lib/idds/agents/carrier/plugins/base.py
@@ -28,8 +28,8 @@ def get_task_params(self, work):
         task_name = work.name

         in_files = []
-        group_parameters = work.group_parameters
-        for p in group_parameters:
+        multi_jobs_kwargs_list = work.multi_jobs_kwargs_list
+        for p in multi_jobs_kwargs_list:
             p = json.dumps(p)
             p = encode_base64(p)
             in_files.append(p)
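
Note on the hunk above: each per-job kwargs dict is serialized to JSON, base64-encoded, and appended as one opaque "input file" string for the task. A minimal, self-contained sketch of that encoding (stdlib only; the local encode_base64 mimics the assumed behavior of the iDDS helper of the same name):

import base64
import json

def encode_base64(s):
    # Assumed behavior of the iDDS encode_base64 helper:
    # str -> UTF-8 bytes -> base64 -> str.
    return base64.b64encode(s.encode("utf-8")).decode("utf-8")

# Mirrors the loop in get_task_params: one encoded string per job.
multi_jobs_kwargs_list = [{'name': 'idds1'}, {'name': 'idds2'}]
in_files = [encode_base64(json.dumps(p)) for p in multi_jobs_kwargs_list]
print(in_files)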
6 changes: 3 additions & 3 deletions main/lib/idds/tests/test_iworkflow/optimize_iworkflow.py
@@ -82,16 +82,16 @@ def optimize_workflow():
     for i in range(n_iterations):
         print("Iteration %s" % i)
         points = {}
-        group_kwargs = []
+        multi_jobs_kwargs_list = []
         for j in range(n_points_per_iteration):
             x_probe = bayesopt.suggest(util)
             u_id = get_unique_id_for_dict(x_probe)
             print('x_probe (%s): %s' % (u_id, x_probe))
             points[u_id] = {'kwargs': x_probe}
-            group_kwargs.append(x_probe)
+            multi_jobs_kwargs_list.append(x_probe)

         results = optimize_work(opt_params=params, opt_method=opt_method, hist=True, saveModel=False, input_weight=None,
-                                retMethod=opt_method, group_kwargs=group_kwargs)
+                                retMethod=opt_method, multi_jobs_kwargs_list=multi_jobs_kwargs_list)
         print("points: %s" % str(points))

         for u_id in points:
@@ -48,7 +48,7 @@ def test_func1(name):
 @workflow(service='panda', local=True, cloud='US', queue='BNL_OSG_2')
 def test_workflow():
     print("test workflow starts")
-    ret = test_func(name='idds', group_kwargs=[{'name': 'idds1'}, {'name': 'idds2'}, {'name': 'idds3'}, {'name': 'idds4'}, {'name': 'idds5'}])
+    ret = test_func(name='idds', multi_jobs_kwargs_list=[{'name': 'idds1'}, {'name': 'idds2'}, {'name': 'idds3'}, {'name': 'idds4'}, {'name': 'idds5'}])
     print(ret)
     print("test workflow ends")
8 changes: 4 additions & 4 deletions main/lib/idds/tests/test_iworkflow/test_iworkflow_mul.py
@@ -49,12 +49,12 @@ def test_func1(name, name1=None):

 def test_workflow():
     print("test workflow starts")
-    group_kwargs = [{'name': 1, 'name1': 2},
-                    {'name': 3, 'name1': 4}]
-    ret = test_func(name='idds', group_kwargs=group_kwargs)
+    multi_jobs_kwargs_list = [{'name': 1, 'name1': 2},
+                              {'name': 3, 'name1': 4}]
+    ret = test_func(name='idds', multi_jobs_kwargs_list=multi_jobs_kwargs_list)
     print(ret)

-    ret = test_func1(name='idds', group_kwargs=group_kwargs)
+    ret = test_func1(name='idds', multi_jobs_kwargs_list=multi_jobs_kwargs_list)
     print(ret)
     print("test workflow ends")
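
The two test hunks above illustrate the semantics behind the rename: each dict in multi_jobs_kwargs_list supplies the keyword arguments of one remote job, so a single decorated call fans out into one job per entry. A plain-Python sketch of that fan-out (no iDDS involved; fan_out is a hypothetical stand-in for what the decorator arranges remotely, and the merge order of shared vs. per-job kwargs is an assumption):

def fan_out(func, common_kwargs, multi_jobs_kwargs_list):
    # One call per entry; per-job kwargs are assumed to override shared ones.
    results = []
    for job_kwargs in multi_jobs_kwargs_list:
        merged = {**common_kwargs, **job_kwargs}
        results.append(func(**merged))
    return results

def test_func1(name, name1=None):
    return "%s:%s" % (name, name1)

# Locally mimics the test_func1 call in the diff above.
print(fan_out(test_func1, {'name': 'idds'},
              [{'name': 1, 'name1': 2}, {'name': 3, 'name1': 4}]))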
12 changes: 6 additions & 6 deletions monitor/data/conf.js
@@ -1,9 +1,9 @@

 var appConfig = {
-    'iddsAPI_request': "https://lxplus998.cern.ch:443/idds/monitor_request/null/null",
-    'iddsAPI_transform': "https://lxplus998.cern.ch:443/idds/monitor_transform/null/null",
-    'iddsAPI_processing': "https://lxplus998.cern.ch:443/idds/monitor_processing/null/null",
-    'iddsAPI_request_detail': "https://lxplus998.cern.ch:443/idds/monitor/null/null/true/false/false",
-    'iddsAPI_transform_detail': "https://lxplus998.cern.ch:443/idds/monitor/null/null/false/true/false",
-    'iddsAPI_processing_detail': "https://lxplus998.cern.ch:443/idds/monitor/null/null/false/false/true"
+    'iddsAPI_request': "https://lxplus926.cern.ch:443/idds/monitor_request/null/null",
+    'iddsAPI_transform': "https://lxplus926.cern.ch:443/idds/monitor_transform/null/null",
+    'iddsAPI_processing': "https://lxplus926.cern.ch:443/idds/monitor_processing/null/null",
+    'iddsAPI_request_detail': "https://lxplus926.cern.ch:443/idds/monitor/null/null/true/false/false",
+    'iddsAPI_transform_detail': "https://lxplus926.cern.ch:443/idds/monitor/null/null/false/true/false",
+    'iddsAPI_processing_detail': "https://lxplus926.cern.ch:443/idds/monitor/null/null/false/false/true"
 }
70 changes: 35 additions & 35 deletions workflow/bin/run_workflow
@@ -36,69 +36,69 @@ from idds.iworkflow.work import Work
 setup_logging(__name__, stream=sys.stdout)


-def get_context_args(context, original_args, update_args):
-    func_name, args, kwargs, group_kwargs = None, None, None, None
+def get_context_args(context, original_args, current_job_kwargs):
+    func_name, args, kwargs, multi_jobs_kwargs_list = None, None, None, None
     if original_args:
         original_args = json_loads(original_args)
-        func_name, args, kwargs, group_kwargs = original_args
+        func_name, args, kwargs, multi_jobs_kwargs_list = original_args

     if args:
         args = pickle.loads(base64.b64decode(args))
     if kwargs:
         kwargs = pickle.loads(base64.b64decode(kwargs))
-    if group_kwargs:
-        group_kwargs = [pickle.loads(base64.b64decode(k)) for k in group_kwargs]
+    if multi_jobs_kwargs_list:
+        multi_jobs_kwargs_list = [pickle.loads(base64.b64decode(k)) for k in multi_jobs_kwargs_list]

-    if update_args:
-        if update_args == "${IN/L}":
-            logging.info("update_args == original ${IN/L}, is not set")
+    if current_job_kwargs:
+        if current_job_kwargs == "${IN/L}":
+            logging.info("current_job_kwargs == original ${IN/L}, is not set")
         else:
             try:
-                update_args = json_loads(update_args)
+                current_job_kwargs = json_loads(current_job_kwargs)

-                if update_args:
-                    update_args = pickle.loads(base64.b64decode(update_args))
+                if current_job_kwargs:
+                    current_job_kwargs = pickle.loads(base64.b64decode(current_job_kwargs))

-                # update_kwargs = update_args
-                # if update_kwargs and isinstance(update_kwargs, dict):
-                #     # kwargs = merge_dict(kwargs, update_kwargs)
-                #     kwargs.update(update_kwargs)
+                # current_job_kwargs = current_job_kwargs
+                # if current_job_kwargs and isinstance(current_job_kwargs, dict):
+                #     # kwargs = merge_dict(kwargs, current_job_kwargs)
+                #     kwargs.update(current_job_kwargs)
             except Exception as ex:
                 logging.error("Failed to update kwargs: %s" % ex)
-    return context, func_name, args, kwargs, group_kwargs, update_args
+    return context, func_name, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs


-def run_workflow(context, original_args, update_args):
-    context, func_name, args, kwargs, group_kwargs, update_args = get_context_args(context, original_args, update_args)
+def run_workflow(context, original_args, current_job_kwargs):
+    context, func_name, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs = get_context_args(context, original_args, current_job_kwargs)
     logging.info("context: %s" % context)
     logging.info("func_name: %s" % func_name)
     logging.info("args: %s" % str(args))
     logging.info("kwargs: %s" % kwargs)
-    logging.info("group_kwargs: %s" % group_kwargs)
+    logging.info("multi_jobs_kwargs_list: %s" % multi_jobs_kwargs_list)

     context.initialize()
     context.setup_source_files()

-    workflow = Workflow(func=func_name, args=args, kwargs=kwargs, group_kwargs=group_kwargs, update_kwargs=update_args, context=context)
+    workflow = Workflow(func=func_name, args=args, kwargs=kwargs, multi_jobs_kwargs_list=multi_jobs_kwargs_list, current_job_kwargs=current_job_kwargs, context=context)
     logging.info("workflow: %s" % workflow)
     with workflow:
         ret = workflow.run()
     logging.info("run workflow result: %s" % str(ret))
     return 0


-def run_work(context, original_args, update_args):
-    context, func_name, args, kwargs, group_kwargs, update_args = get_context_args(context, original_args, update_args)
+def run_work(context, original_args, current_job_kwargs):
+    context, func_name, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs = get_context_args(context, original_args, current_job_kwargs)
     logging.info("context: %s" % context)
     logging.info("func_name: %s" % func_name)
     logging.info("args: %s" % str(args))
     logging.info("kwargs: %s" % kwargs)
-    logging.info("group_kwargs: %s" % group_kwargs)
+    logging.info("multi_jobs_kwargs_list: %s" % multi_jobs_kwargs_list)

     context.initialize()
     context.setup_source_files()

-    work = Work(func=func_name, args=args, kwargs=kwargs, group_kwargs=group_kwargs, update_kwargs=update_args, context=context)
+    work = Work(func=func_name, args=args, kwargs=kwargs, multi_jobs_kwargs_list=multi_jobs_kwargs_list, current_job_kwargs=current_job_kwargs, context=context)
     logging.info("work: %s" % work)
     ret = work.run()
     logging.info("run work result: %s" % str(ret))
@@ -119,13 +119,13 @@ def run_iworkflow(args):
         # orginal_args = str(binascii.unhexlify(args.original_args).decode())
     else:
         original_args = None
-    if args.update_args:
-        # logging.info(args.update_args)
-        # update_args = str(binascii.unhexlify(args.update_args).decode())
-        update_args = decode_base64(args.update_args)
-        logging.info(update_args)
+    if args.current_job_kwargs:
+        # logging.info(args.current_job_kwargs)
+        # current_job_kwargs = str(binascii.unhexlify(args.current_job_kwargs).decode())
+        current_job_kwargs = decode_base64(args.current_job_kwargs)
+        logging.info(current_job_kwargs)
     else:
-        update_args = None
+        current_job_kwargs = None

     if args.type == 'workflow':
         logging.info("run workflow")
@@ -134,8 +134,8 @@
         logging.info("context: %s" % json_dumps(context))
         context.broker_password = password
         logging.info("original_args: %s" % original_args)
-        logging.info("update_args: %s" % update_args)
-        exit_code = run_workflow(context, original_args, update_args)
+        logging.info("current_job_kwargs: %s" % current_job_kwargs)
+        exit_code = run_workflow(context, original_args, current_job_kwargs)
         logging.info("exit code: %s" % exit_code)
     else:
         logging.info("run work")
@@ -144,8 +144,8 @@
         logging.info("context: %s" % json_dumps(context))
         context.broker_password = password
         logging.info("original_args: %s" % original_args)
-        logging.info("update_args: %s" % update_args)
-        exit_code = run_work(context, original_args, update_args)
+        logging.info("current_job_kwargs: %s" % current_job_kwargs)
+        exit_code = run_work(context, original_args, current_job_kwargs)
         logging.info("exit code: %s" % exit_code)
     return exit_code
@@ -175,7 +175,7 @@ def get_parser():
     oparser.add_argument('--type', dest='type', action='store', choices=['workflow', 'work'], default='workflow', help='The type in [workflow, work]. Default is workflow.')
     oparser.add_argument('--context', dest='context', help="The context.")
     oparser.add_argument('--original_args', dest='original_args', help="The original arguments.")
-    oparser.add_argument('--update_args', dest='update_args', nargs='?', const=None, help="The updated arguments.")
+    oparser.add_argument('--current_job_kwargs', dest='current_job_kwargs', nargs='?', const=None, help="The current job arguments.")
     return oparser

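As the hunks above show, run_workflow decodes with pickle.loads(base64.b64decode(...)) exactly what get_func_name_and_args in workflow/lib/idds/iworkflow/base.py (below) encodes, both for the single kwargs dict and for each entry of multi_jobs_kwargs_list. A stdlib-only round-trip sketch of that wire format (helper names here are illustrative, not iDDS API):

import base64
import pickle

def encode(obj):
    # Submission side, as in get_func_name_and_args below.
    return base64.b64encode(pickle.dumps(obj)).decode("utf-8")

def decode(blob):
    # Execution side, as in get_context_args above.
    return pickle.loads(base64.b64decode(blob))

kwargs = {'name': 'idds'}
multi_jobs_kwargs_list = [{'name': 1, 'name1': 2}, {'name': 3, 'name1': 4}]

wire_kwargs = encode(kwargs)
wire_list = [encode(k) for k in multi_jobs_kwargs_list]
assert decode(wire_kwargs) == kwargs
assert [decode(k) for k in wire_list] == multi_jobs_kwargs_list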
16 changes: 8 additions & 8 deletions workflow/lib/idds/iworkflow/asyncresult.py
@@ -118,7 +118,7 @@ def get_all_results(self):

 class AsyncResult(Base):

-    def __init__(self, work_context, name=None, wait_num=1, wait_keys=[], group_kwargs=[], run_group_kwarg=None, map_results=False,
+    def __init__(self, work_context, name=None, wait_num=1, wait_keys=[], multi_jobs_kwargs_list=[], current_job_kwargs=None, map_results=False,
                  wait_percent=1, internal_id=None, timeout=None):
         """
         Init a workflow.
@@ -145,8 +145,8 @@ def __init__(self, work_context, name=None, wait_num=1, wait_keys=[], group_kwar
         if not self._wait_num:
             self._wait_num = 1
         self._wait_keys = set(wait_keys)
-        self._group_kwargs = group_kwargs
-        self._run_group_kwarg = run_group_kwarg
+        self._multi_jobs_kwargs_list = multi_jobs_kwargs_list
+        self._current_job_kwargs = current_job_kwargs

         self._wait_percent = wait_percent
         self._num_wrong_keys = 0
@@ -168,8 +168,8 @@ def wait_keys(self):
         if len(self._wait_keys) > 0:
             self._wait_num = len(self._wait_keys)
             return self._wait_keys
-        if self._group_kwargs:
-            for kwargs in self._group_kwargs:
+        if self._multi_jobs_kwargs_list:
+            for kwargs in self._multi_jobs_kwargs_list:
                 k = get_unique_id_for_dict(kwargs)
                 k = "%s:%s" % (self._name, k)
                 self.logger.info("args (%s) to key: %s" % (str(kwargs), k))
@@ -336,10 +336,10 @@ def publish(self, ret, key=None):
         conn = self.connect_to_messaging_broker()
         workflow_context = self._work_context
         if key is None:
-            if self._run_group_kwarg:
-                key = get_unique_id_for_dict(self._run_group_kwarg)
+            if self._current_job_kwargs:
+                key = get_unique_id_for_dict(self._current_job_kwargs)
                 key = "%s:%s" % (self._name, key)
-                self.logger.info("publish args (%s) to key: %s" % (str(self._run_group_kwarg), key))
+                self.logger.info("publish args (%s) to key: %s" % (str(self._current_job_kwargs), key))

         if workflow_context.workflow_type in [WorkflowType.iWorkflow, WorkflowType.iWorkflowLocal]:
             headers = {'persistent': 'true',
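
The renamed attributes make the publish/subscribe pairing explicit: wait_keys derives one key per entry of multi_jobs_kwargs_list, while publish() derives the key of the single current_job_kwargs the running job was started with, so each result lands under its own per-job key. A self-contained sketch of that key scheme (the hash below is an assumed stand-in for the real get_unique_id_for_dict):

import hashlib
import json

def get_unique_id_for_dict(d):
    # Assumed stand-in: hash a canonical JSON rendering of the dict.
    return hashlib.md5(json.dumps(d, sort_keys=True).encode("utf-8")).hexdigest()

name = "test_func1"
multi_jobs_kwargs_list = [{'name': 1, 'name1': 2}, {'name': 3, 'name1': 4}]

# Subscriber side (wait_keys): one expected key per job.
wait_keys = ["%s:%s" % (name, get_unique_id_for_dict(k)) for k in multi_jobs_kwargs_list]

# Job side (publish): the key of this job's own kwargs.
current_job_kwargs = multi_jobs_kwargs_list[0]
publish_key = "%s:%s" % (name, get_unique_id_for_dict(current_job_kwargs))
assert publish_key in wait_keys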
16 changes: 8 additions & 8 deletions workflow/lib/idds/iworkflow/base.py
@@ -54,20 +54,20 @@ def get_func_name_and_args(self,
                                func,
                                args=None,
                                kwargs=None,
-                               group_kwargs=None):
+                               multi_jobs_kwargs_list=None):

         if args is None:
             args = ()
         if kwargs is None:
             kwargs = {}
-        if group_kwargs is None:
-            group_kwargs = []
+        if multi_jobs_kwargs_list is None:
+            multi_jobs_kwargs_list = []
         if not isinstance(args, (tuple, list)):
             raise TypeError('{0!r} is not a valid args list'.format(args))
         if not isinstance(kwargs, dict):
             raise TypeError('{0!r} is not a valid kwargs dict'.format(kwargs))
-        if not isinstance(group_kwargs, list):
-            raise TypeError('{0!r} is not a valid group_kwargs list'.format(group_kwargs))
+        if not isinstance(multi_jobs_kwargs_list, list):
+            raise TypeError('{0!r} is not a valid multi_jobs_kwargs_list list'.format(multi_jobs_kwargs_list))

         func_call, func_name = None, None
         if isinstance(func, str):
@@ -84,10 +84,10 @@ def get_func_name_and_args(self,
             args = base64.b64encode(pickle.dumps(args)).decode("utf-8")
         if kwargs:
             kwargs = base64.b64encode(pickle.dumps(kwargs)).decode("utf-8")
-        if group_kwargs:
-            group_kwargs = [base64.b64encode(pickle.dumps(k)).decode("utf-8") for k in group_kwargs]
+        if multi_jobs_kwargs_list:
+            multi_jobs_kwargs_list = [base64.b64encode(pickle.dumps(k)).decode("utf-8") for k in multi_jobs_kwargs_list]

-        return func_call, (func_name, args, kwargs, group_kwargs)
+        return func_call, (func_name, args, kwargs, multi_jobs_kwargs_list)

     @property
     def logger(self):