diff --git a/main/lib/idds/tests/test_iworkflow/test_iworkflow_local.py b/main/lib/idds/tests/test_iworkflow/test_iworkflow_local.py new file mode 100644 index 00000000..b03c8684 --- /dev/null +++ b/main/lib/idds/tests/test_iworkflow/test_iworkflow_local.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0OA +# +# Authors: +# - Wen Guan, , 2024 + + +""" +Test workflow. +""" + +import inspect # noqa F401 +import logging +import os # noqa F401 +import shutil # noqa F401 +import sys # noqa F401 + +# from nose.tools import assert_equal +from idds.common.utils import setup_logging, run_process, json_dumps, json_loads, create_archive_file # noqa F401 + +from idds.iworkflow.workflow import Workflow, workflow # workflow # noqa F401 +from idds.iworkflow.work import work + + +setup_logging(__name__) + + +@work +def test_func(name): + print('test_func starts') + print(name) + print('test_func ends') + return 'test result: %s' % name + + +def test_func1(name): + print('test_func1 starts') + print(name) + print('test_func1 ends') + return 'test result: %s' % name + + +# @workflow(service='idds', local=True, cloud='US', queue='FUNCX_TEST') # queue = 'BNL_OSG_2' +@workflow(service='panda', local=True, cloud='US', queue='BNL_OSG_2') +def test_workflow(): + print("test workflow starts") + ret = test_func(name='idds') + print(ret) + print("test workflow ends") + + +@work +def get_params(): + list_params = [i for i in range(10)] + return list_params + + +def test_workflow_mulitple_work(): + print("test workflow multiple work starts") + list_params = get_params() + + ret = test_func(list_params) + print(ret) + print("test workflow multiple work ends") + + +def submit_workflow(wf): + req_id = wf.submit() + print("req id: %s" % req_id) + + +def run_workflow_wrapper(wf): + cmd = wf.get_runner() + logging.info(f'To run workflow: {cmd}') + + exit_code = run_process(cmd, wait=True) + logging.info(f'Run workflow finished with exit code: {exit_code}') + return exit_code + + +def run_workflow_remote_wrapper(wf): + cmd = wf.get_runner() + logging.info('To run workflow: %s' % cmd) + + work_dir = '/tmp/idds' + shutil.rmtree(work_dir) + os.makedirs(work_dir) + os.chdir(work_dir) + logging.info("current dir: %s" % os.getcwd()) + + # print(dir(wf)) + # print(inspect.getmodule(wf)) + # print(inspect.getfile(wf)) + setup = wf.setup_source_files() + logging.info("setup: %s" % setup) + + exc_cmd = 'cd %s' % work_dir + exc_cmd += "; wget https://wguan-wisc.web.cern.ch/wguan-wisc/run_workflow_wrapper" + exc_cmd += "; chmod +x run_workflow_wrapper; bash run_workflow_wrapper %s" % cmd + logging.info("exc_cmd: %s" % exc_cmd) + exit_code = run_process(exc_cmd, wait=True) + logging.info(f'Run workflow finished with exit code: {exit_code}') + return exit_code + + +def test_create_archive_file(wf): + archive_name = wf._context.get_archive_name() + source_dir = wf._context._source_dir + logging.info("archive_name :%s, source dir: %s" % (archive_name, source_dir)) + archive_file = create_archive_file('/tmp', archive_name, [source_dir]) + logging.info("created archive file: %s" % archive_file) + + +if __name__ == '__main__': + logging.info("start") + os.chdir(os.path.dirname(os.path.realpath(__file__))) + ret = test_workflow() + print('result: %s' % str(ret)) diff --git a/main/lib/idds/tests/test_iworkflow/test_iworkflow_local_mul.py b/main/lib/idds/tests/test_iworkflow/test_iworkflow_local_mul.py new file mode 100644 index 00000000..5b3f345c --- /dev/null +++ b/main/lib/idds/tests/test_iworkflow/test_iworkflow_local_mul.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0OA +# +# Authors: +# - Wen Guan, , 2024 + + +""" +Test workflow. +""" + +import inspect # noqa F401 +import logging +import os # noqa F401 +import shutil # noqa F401 +import sys # noqa F401 + +# from nose.tools import assert_equal +from idds.common.utils import setup_logging, run_process, json_dumps, json_loads, create_archive_file # noqa F401 + +from idds.iworkflow.workflow import Workflow, workflow # workflow # noqa F401 +from idds.iworkflow.work import work + + +setup_logging(__name__) + + +@work +def test_func(name): + print('test_func starts') + print(name) + print('test_func ends') + return 'test result: %s' % name + + +def test_func1(name): + print('test_func1 starts') + print(name) + print('test_func1 ends') + return 'test result: %s' % name + + +# @workflow(service='idds', local=True, cloud='US', queue='FUNCX_TEST') # queue = 'BNL_OSG_2' +@workflow(service='panda', local=True, cloud='US', queue='BNL_OSG_2') +def test_workflow(): + print("test workflow starts") + ret = test_func(name='idds', group_kwargs=[{'name': 'idds1'}, {'name': 'idds2'}, {'name': 'idds3'}, {'name': 'idds4'}, {'name': 'idds5'}]) + print(ret) + print("test workflow ends") + + +@work +def get_params(): + list_params = [i for i in range(10)] + return list_params + + +def test_workflow_mulitple_work(): + print("test workflow multiple work starts") + list_params = get_params() + + ret = test_func(list_params) + print(ret) + print("test workflow multiple work ends") + + +def submit_workflow(wf): + req_id = wf.submit() + print("req id: %s" % req_id) + + +def run_workflow_wrapper(wf): + cmd = wf.get_runner() + logging.info(f'To run workflow: {cmd}') + + exit_code = run_process(cmd, wait=True) + logging.info(f'Run workflow finished with exit code: {exit_code}') + return exit_code + + +def run_workflow_remote_wrapper(wf): + cmd = wf.get_runner() + logging.info('To run workflow: %s' % cmd) + + work_dir = '/tmp/idds' + shutil.rmtree(work_dir) + os.makedirs(work_dir) + os.chdir(work_dir) + logging.info("current dir: %s" % os.getcwd()) + + # print(dir(wf)) + # print(inspect.getmodule(wf)) + # print(inspect.getfile(wf)) + setup = wf.setup_source_files() + logging.info("setup: %s" % setup) + + exc_cmd = 'cd %s' % work_dir + exc_cmd += "; wget https://wguan-wisc.web.cern.ch/wguan-wisc/run_workflow_wrapper" + exc_cmd += "; chmod +x run_workflow_wrapper; bash run_workflow_wrapper %s" % cmd + logging.info("exc_cmd: %s" % exc_cmd) + exit_code = run_process(exc_cmd, wait=True) + logging.info(f'Run workflow finished with exit code: {exit_code}') + return exit_code + + +def test_create_archive_file(wf): + archive_name = wf._context.get_archive_name() + source_dir = wf._context._source_dir + logging.info("archive_name :%s, source dir: %s" % (archive_name, source_dir)) + archive_file = create_archive_file('/tmp', archive_name, [source_dir]) + logging.info("created archive file: %s" % archive_file) + + +if __name__ == '__main__': + logging.info("start") + os.chdir(os.path.dirname(os.path.realpath(__file__))) + ret = test_workflow() + print('result: %s' % str(ret)) diff --git a/main/tools/container/singularity/commands b/main/tools/container/singularity/commands index 46abe6b9..cb522dbe 100644 --- a/main/tools/container/singularity/commands +++ b/main/tools/container/singularity/commands @@ -1,4 +1,5 @@ +# aipanda180 # cp -r /eos/user/w/wguan/idds_ml/singularity/* /opt/singularity cd /opt/singularity/ diff --git a/main/tools/container/singularity/idds_ml_ax_al9.def b/main/tools/container/singularity/idds_ml_ax_al9.def new file mode 100644 index 00000000..a420dba5 --- /dev/null +++ b/main/tools/container/singularity/idds_ml_ax_al9.def @@ -0,0 +1,32 @@ +Bootstrap: docker +From: almalinux:9.2 + +%post + # yum update -q -y + yum install -q -y wget make git gcc openssl-devel bzip2-devel libffi-devel which pip + + ln -s /usr/bin/python3 /usr/bin/python + + pip install --upgrade pip + pip install nevergrad + pip install theano keras h5py matplotlib tabulate + pip install bayesian-optimization + pip install xgboost + pip install lightgbm + pip install ax-platform + + pip install torch pandas numpy matplotlib wandb botorch + + # clean the cache + rm -fr /root/.cache + +%environment + # export LC_ALL=C + # export PATH=/usr/games:$PATH + +%labels + Maintainer iDDS_HPO_Nevergrad(wen.guan@cern.ch) + Version v1.0 + +%runscript + echo "iDDS ML hyper parameter optimization plugin" diff --git a/main/tools/container/singularity/ml_command b/main/tools/container/singularity/ml_command new file mode 100644 index 00000000..b27baa56 --- /dev/null +++ b/main/tools/container/singularity/ml_command @@ -0,0 +1,62 @@ +[root@aipanda180 singularity]# apptainer remote add --no-login SylabsCloud cloud.sylabs.io +INFO: Remote "SylabsCloud" added. + +[root@aipanda180 singularity]# apptainer remote login +Generate an access token at https://cloud.sylabs.io/auth/tokens, and paste it here. +Token entered will be hidden for security. +Access Token: +INFO: Access Token Verified! +INFO: Token stored in /root/.apptainer/remote.yaml + + +[root@aipanda180 singularity]# singularity keys newpair +Enter your name (e.g., John Doe) : Wen Guan +Enter your email address (e.g., john.doe@example.com) : wguan.icedew@gmail.com +Enter optional comment (e.g., development keys) : dev +Enter a passphrase : +Retype your passphrase : +WARNING: passphrases do not match +Enter a passphrase : +Retype your passphrase : +Generating Entity and OpenPGP Key Pair... done +[root@aipanda180 singularity]# singularity keys list +Public key listing (/root/.apptainer/keys/pgp-public): + +0) User: Wen Guan (dev) + Creation time: 2024-04-02 11:47:04 +0200 CEST + Fingerprint: 11B3BCA23474A37E2BB72F8EAD61E4FD656ABA65 + Length (in bits): 4096 + + +[root@aipanda180 singularity]# singularity keys push 11B3BCA23474A37E2BB72F8EAD61E4FD656ABA65 +public key `11B3BCA23474A37E2BB72F8EAD61E4FD656ABA65' pushed to server successfully +[root@aipanda180 singularity]# ls /root/.apptainer/ +cache keys remote-cache remote.yaml +[root@aipanda180 singularity]# ls /root/.apptainer/keys/ +pgp-public pgp-secret + + +[root@aipanda180 singularity]# singularity sign idds_ml_ax_al9.simg +INFO: Signing image with PGP key material +Enter key passphrase : +INFO: Signature created and applied to image 'idds_ml_ax_al9.simg' +[root@aipanda180 singularity]# singularity verify idds_ml_ax_al9.simg +INFO: Verifying image with PGP key material +[LOCAL] Signing entity: Wen Guan (dev) +[LOCAL] Fingerprint: 11B3BCA23474A37E2BB72F8EAD61E4FD656ABA65 +Objects verified: +ID |GROUP |LINK |TYPE +------------------------------------------------ +1 |1 |NONE |Def.FILE +2 |1 |NONE |JSON.Generic +3 |1 |NONE |FS +INFO: Verified signature(s) from image 'idds_ml_ax_al9.simg' + + +[root@aipanda180 singularity]# cp /root/.apptainer/ +cache/ keys/ remote-cache/ remote.yaml +[root@aipanda180 singularity]# cp /root/.apptainer/remote.yaml /afs/cern.ch/user/w/wguan/private/apptainer/ +[root@aipanda180 singularity]# cp -r /root/.apptainer/keys /afs/cern.ch/user/w/wguan/private/apptainer/ + + +[root@aipanda180 singularity]# singularity push idds_ml_ax_al9.simg library://wguanicedew/ml/idds_ml_ax_al9.sif:latest diff --git a/main/tools/env/setup_panda.sh b/main/tools/env/setup_panda.sh index 6c18c1e0..a379dbea 100644 --- a/main/tools/env/setup_panda.sh +++ b/main/tools/env/setup_panda.sh @@ -71,12 +71,12 @@ else export PANDA_CONFIG_ROOT=~/.panda/ # export IDDS_HOST=https://aipanda015.cern.ch:443/idds - + # dev - export IDDS_HOST=https://aipanda104.cern.ch:443/idds + # export IDDS_HOST=https://aipanda104.cern.ch:443/idds # doma - # export IDDS_HOST=https://aipanda105.cern.ch:443/idds + export IDDS_HOST=https://aipanda105.cern.ch:443/idds # export IDDS_BROKERS=atlas-test-mb.cern.ch:61013 # export IDDS_BROKER_DESTINATION=/topic/doma.idds diff --git a/monitor/data/conf.js b/monitor/data/conf.js index b10e8434..dee49b18 100644 --- a/monitor/data/conf.js +++ b/monitor/data/conf.js @@ -1,9 +1,9 @@ var appConfig = { - 'iddsAPI_request': "https://lxplus964.cern.ch:443/idds/monitor_request/null/null", - 'iddsAPI_transform': "https://lxplus964.cern.ch:443/idds/monitor_transform/null/null", - 'iddsAPI_processing': "https://lxplus964.cern.ch:443/idds/monitor_processing/null/null", - 'iddsAPI_request_detail': "https://lxplus964.cern.ch:443/idds/monitor/null/null/true/false/false", - 'iddsAPI_transform_detail': "https://lxplus964.cern.ch:443/idds/monitor/null/null/false/true/false", - 'iddsAPI_processing_detail': "https://lxplus964.cern.ch:443/idds/monitor/null/null/false/false/true" + 'iddsAPI_request': "https://lxplus998.cern.ch:443/idds/monitor_request/null/null", + 'iddsAPI_transform': "https://lxplus998.cern.ch:443/idds/monitor_transform/null/null", + 'iddsAPI_processing': "https://lxplus998.cern.ch:443/idds/monitor_processing/null/null", + 'iddsAPI_request_detail': "https://lxplus998.cern.ch:443/idds/monitor/null/null/true/false/false", + 'iddsAPI_transform_detail': "https://lxplus998.cern.ch:443/idds/monitor/null/null/false/true/false", + 'iddsAPI_processing_detail': "https://lxplus998.cern.ch:443/idds/monitor/null/null/false/false/true" } diff --git a/workflow/bin/run_workflow b/workflow/bin/run_workflow index 08e8538f..1732f3ca 100644 --- a/workflow/bin/run_workflow +++ b/workflow/bin/run_workflow @@ -17,8 +17,10 @@ from __future__ import print_function import argparse import argcomplete +import base64 # import json import logging +import pickle import os import sys import time @@ -39,16 +41,28 @@ def get_context_args(context, original_args, update_args): if original_args: original_args = json_loads(original_args) func_name, args, kwargs, group_kwargs = original_args + + if args: + args = pickle.loads(base64.b64decode(args)) + if kwargs: + kwargs = pickle.loads(base64.b64decode(kwargs)) + if group_kwargs: + group_kwargs = [pickle.loads(base64.b64decode(k)) for k in group_kwargs] + if update_args: if update_args == "${IN/L}": logging.info("update_args == original ${IN/L}, is not set") else: try: update_args = json_loads(update_args) - update_kwargs = update_args - if update_kwargs and isinstance(update_kwargs, dict): - # kwargs = merge_dict(kwargs, update_kwargs) - kwargs.update(update_kwargs) + + if update_args: + update_args = pickle.loads(base64.b64decode(update_args)) + + # update_kwargs = update_args + # if update_kwargs and isinstance(update_kwargs, dict): + # # kwargs = merge_dict(kwargs, update_kwargs) + # kwargs.update(update_kwargs) except Exception as ex: logging.error("Failed to update kwargs: %s" % ex) return context, func_name, args, kwargs, group_kwargs, update_args diff --git a/workflow/lib/idds/iworkflow/base.py b/workflow/lib/idds/iworkflow/base.py index f42255be..0bb88b15 100644 --- a/workflow/lib/idds/iworkflow/base.py +++ b/workflow/lib/idds/iworkflow/base.py @@ -8,9 +8,11 @@ # Authors: # - Wen Guan, , 2024 +import base64 import logging import inspect import os +import pickle import traceback import uuid @@ -77,6 +79,14 @@ def get_func_name_and_args(self, else: # raise TypeError('Expected a callable or a string, but got: {0}'.format(func)) func_name = func + + if args: + args = base64.b64encode(pickle.dumps(args)).decode("utf-8") + if kwargs: + kwargs = base64.b64encode(pickle.dumps(kwargs)).decode("utf-8") + if group_kwargs: + group_kwargs = [base64.b64encode(pickle.dumps(k)).decode("utf-8") for k in group_kwargs] + return func_call, (func_name, args, kwargs, group_kwargs) @property @@ -123,6 +133,19 @@ def submit(self): self.prepare() return None + def split_setup(self, setup): + """ + Split setup string + """ + if ";" not in setup: + return "", setup + + setup_list = setup.split(";") + main_setup = setup_list[-1] + pre_setup = "; ".join(setup_list[:-1]) + pre_setup = pre_setup + "; " + return pre_setup, main_setup + def setup(self): """ :returns command: `str` to setup the workflow. diff --git a/workflow/lib/idds/iworkflow/work.py b/workflow/lib/idds/iworkflow/work.py index 47c73dda..07d8b901 100644 --- a/workflow/lib/idds/iworkflow/work.py +++ b/workflow/lib/idds/iworkflow/work.py @@ -8,10 +8,13 @@ # Authors: # - Wen Guan, , 2023 - 2024 +import base64 import datetime import functools +import json import logging import os +import pickle import time import traceback @@ -293,6 +296,8 @@ def init_env(self): @init_env.setter def init_env(self, value): self._init_env = value + if self._init_env: + self._init_env = self._init_env + " " def get_idds_server(self): return self._workflow_context.get_idds_server() @@ -343,6 +348,8 @@ def __init__(self, func=None, workflow_context=None, context=None, args=None, kw self._func, self._func_name_and_args = self.get_func_name_and_args(func, args, kwargs, group_kwargs) self._update_kwargs = update_kwargs + if self._update_kwargs: + self._update_kwargs = base64.b64encode(pickle.dumps(self._update_kwargs)).decode("utf-8") self._name = self._func_name_and_args[0] if self._name: @@ -659,7 +666,14 @@ def get_status_from_panda_server(self): ret = client.get_transform(request_id=request_id, transform_id=transform_id) if ret[0] == 0 and ret[1][0]: - tf = json_loads(ret[1][1]) + tf = ret[1][1] + if type(tf) in [dict]: + tf = json_loads(json.dumps(tf)) + elif type(tf) in [str]: + try: + tf = json_loads(tf) + except Exception as ex: + logging.warn("Failed to json loads transform(%s): %s" % (tf, ex)) else: tf = None logging.error("Failed to get transform (request_id: %s, transform_id: %s) status from PanDA-iDDS: %s" % (request_id, transform_id, ret)) @@ -711,6 +725,7 @@ def get_func_name(self): def get_group_kwargs(self): group_kwargs = self._func_name_and_args[3] + group_kwargs = [pickle.loads(base64.b64decode(k)) for k in group_kwargs] return group_kwargs def wait_results(self): @@ -798,6 +813,17 @@ def run(self): self.pre_run() func_name, args, kwargs, group_kwargs = self._func_name_and_args + update_kwargs = self._update_kwargs + + if args: + args = pickle.loads(base64.b64decode(args)) + if kwargs: + kwargs = pickle.loads(base64.b64decode(kwargs)) + if group_kwargs: + group_kwargs = [pickle.loads(base64.b64decode(k)) for k in group_kwargs] + if self._update_kwargs: + update_kwargs = pickle.loads(base64.b64decode(update_kwargs)) + if self._func is None: func = self.load(func_name) self._func = func @@ -805,22 +831,22 @@ def run(self): if self._context.distributed: rets = None kwargs_copy = kwargs.copy() - if self._update_kwargs and type(self._update_kwargs) in [dict]: - kwargs_copy.update(self._update_kwargs) + if update_kwargs and type(update_kwargs) in [dict]: + kwargs_copy.update(update_kwargs) rets = self.run_func(self._func, args, kwargs_copy) request_id = self._context.request_id transform_id = self._context.transform_id logging.info("publishing AsyncResult to (request_id: %s, transform_id: %s): %s" % (request_id, transform_id, rets)) - async_ret = AsyncResult(self._context, name=self.get_func_name(), internal_id=self.internal_id, run_group_kwarg=self._update_kwargs) + async_ret = AsyncResult(self._context, name=self.get_func_name(), internal_id=self.internal_id, run_group_kwarg=update_kwargs) async_ret.publish(rets) if not self.map_results: self._results = rets else: self._results = MapResult() - self._results.add_result(name=self.get_func_name(), args=self._update_kwargs, result=rets) + self._results.add_result(name=self.get_func_name(), args=update_kwargs, result=rets) return self._results else: if not group_kwargs: @@ -829,7 +855,7 @@ def run(self): self._results = rets else: self._results = MapResult() - self._results.add_result(name=self.get_func_name(), args=self._update_kwargs, result=rets) + self._results.add_result(name=self.get_func_name(), args=kwargs, result=rets) return self._results else: if not self.map_results: @@ -861,7 +887,10 @@ def get_runner(self): run_command = self.get_run_command() if setup: - cmd = ' --setup "' + setup + '" ' + pre_setup, main_setup = self.split_setup(setup) + if pre_setup: + cmd = ' --pre_setup "' + pre_setup + '" ' + cmd = cmd + ' --setup "' + main_setup + '" ' if cmd: cmd = cmd + " " + run_command else: @@ -887,9 +916,9 @@ def run_work_distributed(w): # foo = work(arg)(foo) -def work(func=None, *, map_results=False, lazy=False): +def work(func=None, *, map_results=False, lazy=False, init_env=None): if func is None: - return functools.partial(work, map_results=map_results, lazy=lazy) + return functools.partial(work, map_results=map_results, lazy=lazy, init_env=init_env) if 'IDDS_IWORKFLOW_LOAD_WORK' in os.environ: return func @@ -905,7 +934,8 @@ def wrapper(*args, **kwargs): logging.debug("work decorator: func: %s, map_results: %s" % (func, map_results)) if workflow_context: logging.debug("setup work") - w = Work(workflow_context=workflow_context, func=func, args=args, kwargs=kwargs, group_kwargs=group_kwargs, map_results=map_results) + w = Work(workflow_context=workflow_context, func=func, args=args, kwargs=kwargs, group_kwargs=group_kwargs, + map_results=map_results, init_env=init_env) # if distributed: if workflow_context.distributed: ret = run_work_distributed(w) diff --git a/workflow/lib/idds/iworkflow/workflow.py b/workflow/lib/idds/iworkflow/workflow.py index 93abda20..48a58d3e 100644 --- a/workflow/lib/idds/iworkflow/workflow.py +++ b/workflow/lib/idds/iworkflow/workflow.py @@ -8,12 +8,14 @@ # Authors: # - Wen Guan, , 2023 - 2024 +import base64 import collections import datetime import functools import logging import inspect import os +import pickle import tarfile import uuid @@ -21,7 +23,7 @@ # from idds.common import exceptions from idds.common.constants import WorkflowType -from idds.common.utils import setup_logging, create_archive_file, json_dumps, encode_base64 +from idds.common.utils import setup_logging, create_archive_file, json_dumps, json_loads, encode_base64 from .asyncresult import AsyncResult from .base import Base, Context @@ -134,6 +136,8 @@ def init_env(self): @init_env.setter def init_env(self, value): self._init_env = value + if self._init_env: + self._init_env = self._init_env + " " @property def vo(self): @@ -376,6 +380,13 @@ def get_broker_info_from_panda_server(self): ret = client.get_metainfo(name='asyncresult_config') if ret[0] == 0 and ret[1][0]: meta_info = ret[1][1] + if type(meta_info) in [dict]: + pass + elif type(meta_info) in [str]: + try: + meta_info = json_loads(meta_info) + except Exception as ex: + logging.warn("Failed to json loads meta info(%s): %s" % (meta_info, ex)) else: meta_info = None logging.error("Failed to get meta info: %s" % str(ret)) @@ -677,6 +688,8 @@ def __init__(self, func=None, service='panda', context=None, source_dir=None, lo # self._func = func self._func, self._func_name_and_args = self.get_func_name_and_args(func, args, kwargs, group_kwargs) self._update_kwargs = update_kwargs + if self._update_kwargs: + self._update_kwargs = base64.b64encode(pickle.dumps(self._update_kwargs)).decode("utf-8") self._name = self._func_name_and_args[0] if self._name: @@ -1088,6 +1101,13 @@ def run(self): self.pre_run() func_name, args, kwargs, group_kwargs = self._func_name_and_args + if args: + args = pickle.loads(base64.b64decode(args)) + if kwargs: + kwargs = pickle.loads(base64.b64decode(kwargs)) + if group_kwargs: + group_kwargs = [pickle.loads(base64.b64decode(k)) for k in group_kwargs] + if self._func is None: func = self.load(func_name) self._func = func @@ -1120,7 +1140,10 @@ def get_runner(self): run_command = self.get_run_command() if setup: - cmd = ' --setup "' + setup + '" ' + pre_setup, main_setup = self.split_setup(setup) + if pre_setup: + cmd = ' --pre_setup "' + pre_setup + '" ' + cmd = cmd + ' --setup "' + main_setup + '" ' if cmd: cmd = cmd + " " + run_command else: @@ -1133,10 +1156,10 @@ def get_func_name(self): # foo = workflow(arg)(foo) -def workflow(func=None, *, local=False, service='idds', source_dir=None, primary=False, queue=None, site=None, cloud=None, max_walltime=24 * 3600, distributed=True): +def workflow(func=None, *, local=False, service='idds', source_dir=None, primary=False, queue=None, site=None, cloud=None, max_walltime=24 * 3600, distributed=True, init_env=None): if func is None: return functools.partial(workflow, local=local, service=service, source_dir=source_dir, primary=primary, queue=queue, site=site, cloud=cloud, - max_walltime=max_walltime, distributed=distributed) + max_walltime=max_walltime, distributed=distributed, init_env=init_env) if 'IDDS_IWORKFLOW_LOAD_WORKFLOW' in os.environ: return func @@ -1144,7 +1167,8 @@ def workflow(func=None, *, local=False, service='idds', source_dir=None, primary @functools.wraps(func) def wrapper(*args, **kwargs): try: - f = Workflow(func, service=service, source_dir=source_dir, local=local, max_walltime=max_walltime, distributed=distributed) + f = Workflow(func, service=service, source_dir=source_dir, local=local, max_walltime=max_walltime, distributed=distributed, + args=args, kwargs=kwargs, init_env=init_env) f.queue = queue f.site = site f.cloud = cloud diff --git a/workflow/tools/env/environment.yml b/workflow/tools/env/environment.yml index 78877365..b59b2e54 100644 --- a/workflow/tools/env/environment.yml +++ b/workflow/tools/env/environment.yml @@ -5,4 +5,5 @@ dependencies: - pip: - anytree - networkx - - idds-common==2.0.9 \ No newline at end of file + - stomp.py==8.0.1 + - idds-common==2.0.9 diff --git a/workflow/tools/make/environment.yaml b/workflow/tools/make/environment.yaml index 7c5ce80f..f1c8273a 100644 --- a/workflow/tools/make/environment.yaml +++ b/workflow/tools/make/environment.yaml @@ -11,7 +11,7 @@ dependencies: - packaging - anytree - networkx - - stomp.py + - stomp.py==8.0.1 - panda-client - cffi - charset_normalizer diff --git a/workflow/tools/make/make.sh b/workflow/tools/make/make.sh index 60ce6ec4..fda58e12 100644 --- a/workflow/tools/make/make.sh +++ b/workflow/tools/make/make.sh @@ -25,7 +25,7 @@ source $workdir/bin/activate echo "install panda client" pip install panda-client -pip install tabulate requests urllib3==1.26.18 argcomplete packaging anytree networkx stomp.py +pip install tabulate requests urllib3==1.26.18 argcomplete packaging anytree networkx stomp.py==8.0.1 echo "install idds-common" python ${RootDir}/common/setup.py clean --all diff --git a/workflow/tools/make/zipheader b/workflow/tools/make/zipheader index 086dd2cb..d4507d25 100644 --- a/workflow/tools/make/zipheader +++ b/workflow/tools/make/zipheader @@ -38,6 +38,7 @@ export PANDA_BEHIND_REAL_LB=true; myargs="$@" setup="" +pre_setup="" POSITIONAL=() while [[ $# -gt 0 ]]; do @@ -48,6 +49,11 @@ while [[ $# -gt 0 ]]; do shift shift ;; + --pre_setup) + pre_setup="$2" + shift + shift + ;; *) POSITIONAL+=("$1") # save it in an array for later shift @@ -57,10 +63,11 @@ done set -- "${POSITIONAL[@]}" # restore positional parameters -echo $setup +echo "pre_setup: " $pre_setup +echo "setup:" $setup run_args=$@ -echo $run_args +echo "run_args: " $run_args cmdfile="run_workflow.sh" cat <<- EOF > ./$cmdfile @@ -90,6 +97,8 @@ chmod +x ./$cmdfile # python "$@" # exec "$@" +$pre_setup + $setup ./$cmdfile ret=$?