diff --git a/common/lib/idds/common/utils.py b/common/lib/idds/common/utils.py index 52aa3c84..2247f45c 100644 --- a/common/lib/idds/common/utils.py +++ b/common/lib/idds/common/utils.py @@ -909,7 +909,16 @@ def encode_base64(sb): return sb -def create_archive_file(work_dir, archive_filename, files): +def is_execluded_file(file, exclude_files=[]): + if exclude_files: + for f in exclude_files: + reg = re.compile(f) + if re.match(reg, file): + return True + return False + + +def create_archive_file(work_dir, archive_filename, files, exclude_files=[]): if not archive_filename.startswith("/"): archive_filename = os.path.join(work_dir, archive_filename) @@ -921,8 +930,9 @@ def create_archive_file(work_dir, archive_filename, files): elif os.path.isdir(local_file): for root, dirs, fs in os.walk(local_file): for f in fs: - file_path = os.path.join(root, f) - tar.add(file_path, arcname=os.path.relpath(file_path, local_file)) + if not is_execluded_file(f, exclude_files): + file_path = os.path.join(root, f) + tar.add(file_path, arcname=os.path.relpath(file_path, local_file)) return archive_filename diff --git a/workflow/lib/idds/iworkflow/work.py b/workflow/lib/idds/iworkflow/work.py index ef4401ef..c3710819 100644 --- a/workflow/lib/idds/iworkflow/work.py +++ b/workflow/lib/idds/iworkflow/work.py @@ -615,6 +615,7 @@ def store(self): 'original_args': self._func_name_and_args, 'multi_jobs_kwargs_list': self._multi_jobs_kwargs_list, 'current_job_kwargs': self._current_job_kwargs} + content = json_dumps(content) source_dir = self._context.get_source_dir() self.save_context(source_dir, self._name, content) @@ -625,6 +626,7 @@ def load(self, source_dir=None): source_dir = os.getcwd() ret = self.load_context(source_dir, self._name) if ret: + ret = json_loads(ret) logging.info(f"Loaded context: {ret}") if 'multi_jobs_kwargs_list' in ret: self._multi_jobs_kwargs_list = ret['multi_jobs_kwargs_list'] diff --git a/workflow/lib/idds/iworkflow/workflow.py b/workflow/lib/idds/iworkflow/workflow.py index 3d271212..e3b05ea0 100644 --- a/workflow/lib/idds/iworkflow/workflow.py +++ b/workflow/lib/idds/iworkflow/workflow.py @@ -55,7 +55,7 @@ def get_current_workflow(cls): class WorkflowContext(Context): - def __init__(self, name=None, service='panda', source_dir=None, workflow_type=WorkflowType.iWorkflow, distributed=True, max_walltime=24 * 3600, init_env=None): + def __init__(self, name=None, service='panda', source_dir=None, workflow_type=WorkflowType.iWorkflow, distributed=True, max_walltime=24 * 3600, init_env=None, exclude_source_files=[]): super(WorkflowContext, self).__init__() self._service = service # panda, idds, sharefs self._request_id = None @@ -111,6 +111,13 @@ def __init__(self, name=None, service='panda', source_dir=None, workflow_type=Wo self._init_env = init_env + self._exclude_source_files = [] + if exclude_source_files: + if type(exclude_source_files) in [list, tuple]: + self._exclude_source_files = exclude_source_files + else: + self._exclude_source_files = [exclude_source_files] + @property def logger(self): return logging.getLogger(self.__class__.__name__) @@ -593,7 +600,7 @@ def upload_source_files_to_panda(self): return None archive_name = self.get_archive_name() - archive_file = create_archive_file('/tmp', archive_name, [self._source_dir]) + archive_file = create_archive_file('/tmp', archive_name, [self._source_dir], exclude_files=self._exclude_source_files) logging.info("created archive file: %s" % archive_file) from pandaclient import Client @@ -681,7 +688,8 @@ class Workflow(Base): def __init__(self, func=None, service='panda', context=None, source_dir=None, local=False, distributed=True, pre_kwargs={}, args=None, kwargs={}, multi_jobs_kwargs_list=[], current_job_kwargs=None, name=None, - init_env=None, is_unique_func_name=False, max_walltime=24 * 3600, source_dir_parent_level=None): + init_env=None, is_unique_func_name=False, max_walltime=24 * 3600, source_dir_parent_level=None, + exclude_source_files=[]): """ Init a workflow. """ @@ -715,7 +723,8 @@ def __init__(self, func=None, service='panda', context=None, source_dir=None, lo self._context = context else: self._context = WorkflowContext(name=self._name, service=service, workflow_type=workflow_type, source_dir=source_dir, - distributed=distributed, init_env=init_env, max_walltime=max_walltime) + distributed=distributed, init_env=init_env, max_walltime=max_walltime, + exclude_source_files=exclude_source_files) @property def service(self): @@ -935,6 +944,7 @@ def store(self): 'original_args': self._func_name_and_args, 'multi_jobs_kwargs_list': self._multi_jobs_kwargs_list, 'current_job_kwargs': self._current_job_kwargs} + content = json_dumps(content) source_dir = self._context.get_source_dir() self.save_context(source_dir, self._name, content) @@ -945,6 +955,7 @@ def load(self, source_dir=None): source_dir = os.getcwd() ret = self.load_context(source_dir, self._name) if ret: + ret = json_loads(ret) logging.info(f"Loaded context: {ret}") if 'multi_jobs_kwargs_list' in ret: self._multi_jobs_kwargs_list = ret['multi_jobs_kwargs_list'] @@ -1198,11 +1209,12 @@ def get_func_name(self): # foo = workflow(arg)(foo) def workflow(func=None, *, local=False, service='idds', source_dir=None, primary=False, queue=None, site=None, cloud=None, max_walltime=24 * 3600, distributed=True, init_env=None, pre_kwargs={}, return_workflow=False, no_wraps=False, - source_dir_parent_level=None): + source_dir_parent_level=None, exclude_source_files=[]): if func is None: return functools.partial(workflow, local=local, service=service, source_dir=source_dir, primary=primary, queue=queue, site=site, cloud=cloud, max_walltime=max_walltime, distributed=distributed, init_env=init_env, pre_kwargs=pre_kwargs, no_wraps=no_wraps, - return_workflow=return_workflow, source_dir_parent_level=source_dir_parent_level) + return_workflow=return_workflow, source_dir_parent_level=source_dir_parent_level, + exclude_source_files=exclude_source_files) if 'IDDS_IGNORE_WORKFLOW_DECORATOR' in os.environ: return func @@ -1211,7 +1223,9 @@ def workflow(func=None, *, local=False, service='idds', source_dir=None, primary def wrapper(*args, **kwargs): try: f = Workflow(func, service=service, source_dir=source_dir, local=local, max_walltime=max_walltime, distributed=distributed, - pre_kwargs=pre_kwargs, args=args, kwargs=kwargs, init_env=init_env, source_dir_parent_level=source_dir_parent_level) + pre_kwargs=pre_kwargs, args=args, kwargs=kwargs, init_env=init_env, source_dir_parent_level=source_dir_parent_level, + exclude_source_files=exclude_source_files) + f.queue = queue f.site = site f.cloud = cloud