From b57580d443865ba1e81c2626d8246ab0344363d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Denis=20Rivi=C3=A8re?= Date: Fri, 4 Aug 2023 18:11:17 +0200 Subject: [PATCH] add a first process to create output directories in a pipeline --- capsul/execution_context.py | 107 ++++++++++++++++++++++++++++++++++-- 1 file changed, 101 insertions(+), 6 deletions(-) diff --git a/capsul/execution_context.py b/capsul/execution_context.py index 79dbe5d97..fdf63a2ba 100644 --- a/capsul/execution_context.py +++ b/capsul/execution_context.py @@ -2,8 +2,10 @@ from uuid import uuid4 import importlib +import os -from soma.controller import Controller, OpenKeyDictController, File +from soma.controller import ( + Controller, OpenKeyDictController, File, Directory, field) from soma.api import DictWithProxy, undefined from .dataset import Dataset @@ -99,7 +101,7 @@ class CapsulWorkflow(Controller): # parameters: DictWithProxy jobs: dict - def __init__(self, executable, debug=False): + def __init__(self, executable, create_output_dirs=True, debug=False): super().__init__() top_parameters = DictWithProxy(all_proxies=True) self.jobs = {} @@ -159,6 +161,10 @@ def __init__(self, executable, debug=False): self.jobs[job]['wait_for'].remove(disabled_job[0]) del self.jobs[disabled_job[0]] + out_dirs = set() + out_deps = [] + out_job_id = None + # Transform wait_for sets to lists for json storage # and add waited_by for job_id, job in self.jobs.items(): @@ -185,6 +191,49 @@ def __init__(self, executable, debug=False): parameters_index[k] = i job['parameters_index'] = parameters_index + if create_output_dirs: + # record output directories + outputs = job.get('write_parameters', []) + add_dep = False + for param in outputs: + value = self.parameters_values[parameters_index[param]] + todo = [value] + while todo: + value = todo.pop(0) + if isinstance(value, (list, tuple)): + todo += value + elif isinstance(value, str): + if value.startswith('!{dataset.') \ + and value.find('}') >= 11: + dpath = 
os.path.dirname(value) + # remove redundant paths (parents of others) + dpathf = os.path.join(dpath, '') + do_add = True + to_remove = [] + for p in out_dirs: + if p.startswith(dpathf): + # already a deeper one present + do_add = False + break + if dpath.startswith(os.path.join(p, '')): + # current is deeper + to_remove.append(p) + for p in to_remove: + out_dirs.remove(p) + if do_add: + out_dirs.add(dpath) + # anyway make a dependency of job over dir_job + add_dep = True + if add_dep: + out_deps.append(job_id) + if out_job_id is None: + out_job_id = str(uuid4()) + wait_for.insert(0, out_job_id) + + # print('out_dirs:', out_dirs) + if len(out_dirs) != 0: + self._create_directories_job(out_job_id, out_dirs, out_deps) + @staticmethod def _no_proxy(parameters, i): if DictWithProxy.is_proxy(i): @@ -235,7 +284,7 @@ def _create_jobs(self, process_iterations=process_iterations, disabled=disabled or node in disabled_nodes) nodes.append(node) - for field in process.user_fields(): + for field in process.user_fields(): # noqa: F402 links = list(executable.get_linked_items( process, field.name, @@ -362,20 +411,21 @@ def _create_jobs(self, elif isinstance(process, Process): job_uuid = str(uuid4()) if disabled: - self.jobs[job_uuid] = { + job = { 'uuid': job_uuid, 'disabled': True, 'wait_for': set(), 'waited_by': set(), } else: - self.jobs[job_uuid] = { + job = { 'uuid': job_uuid, 'disabled': False, 'wait_for': set(), 'process': process.json(include_parameters=False), 'parameters_location': parameters_location } + self.jobs[job_uuid] = job for parent_executable in parent_executables: jobs_per_process.setdefault( parent_executable.uuid + ','.join(process_iterations.get(parent_executable.uuid, [])), @@ -384,8 +434,14 @@ def _create_jobs(self, process.uuid + ','.join(process_iterations.get(process.uuid, [])), set()).add(job_uuid) # print('!create_job!', process.full_name) + opar = [] + wpar = [] for field in process.user_fields(): value = undefined + if field.metadata().get('write', 
False): + wpar.append(field.name) + if field.metadata().get('output', False): + opar.append(field.name) if getattr(field, 'generate_temporary', False): if field.type is File: prefix = f'!{{dataset.tmp.path}}/{process.full_name}' @@ -430,6 +486,11 @@ def _create_jobs(self, dest_node.uuid + ','.join(process_iterations.get(dest_node.uuid, [])), set()).add( process.uuid + ','.join(process_iterations.get(process.uuid, []))) + if opar: + job['output_parameters'] = opar + if wpar: + job['write_parameters'] = wpar + return parameters def find_job(self, full_name): @@ -440,6 +501,29 @@ def find_job(self, full_name): return job return None + def _create_directories_job(self, job_uuid, out_dirs, out_deps): + if len(out_dirs) == 0: + return None # no dirs to create. + name = 'directories_creation' + + pindex = len(self.parameters_values) + self.parameters_dict['nodes'][name] = {'directories': ['&', pindex]} + self.parameters_values.append(list(out_dirs)) + self.jobs[job_uuid] = { + 'uuid': job_uuid, + 'disabled': False, + 'process': { + 'type': 'process', + 'definition': 'capsul.execution_context.CreateDirectories', + 'uuid': str(uuid4()) + }, + 'parameters_location': ['nodes', name], + 'parameters_index': {'directories': pindex}, + 'wait_for': [], + 'waited_by': list(out_deps), + 'write_parameters': ['directories'], + } + def find_temporary_to_generate(executable): # print('!temporaries! ->', executable.label) @@ -451,7 +535,7 @@ def find_temporary_to_generate(executable): nodes = [executable] for node in nodes: # print('!temporaries! initialize node', node.full_name) - for field in node.user_fields(): + for field in node.user_fields(): # noqa: F402 if (field.output or not field.metadata('write', False) or not node.plugs[field.name].activated): @@ -481,3 +565,14 @@ def find_temporary_to_generate(executable): # print('!temporaries! parameters with temporary') # for n, p in temporaries: # print('!temporaries! 
class CreateDirectories(Process):
    """Process which creates each directory listed in ``directories``."""

    directories: list[Directory] = field(
        type_=list[Directory],
        write=True,
        doc=('create output directories so that later processes can write '
             'their output files there.'),
    )

    def execute(self, execution_context):
        """Create every listed directory, with its missing parents.

        Directories which already exist are left as they are.
        ``execution_context`` is not used here.
        """
        for path in self.directories:
            os.makedirs(path, exist_ok=True)