diff --git a/capsul/dataset.py b/capsul/dataset.py index 76a4af2e0..aada8701c 100644 --- a/capsul/dataset.py +++ b/capsul/dataset.py @@ -581,7 +581,7 @@ def add_modifier(self, modifier): raise ValueError(f'Invalid value for schema modification for parameter {self.parameter}: {modifier}') def apply(self, metadata, process, parameter, initial_meta): - debug = False # (parameter == 't1mri_nobias') + debug = False # (parameter == 'nobias') if debug: print('apply modifier to', parameter, ':', self, metadata, initial_meta) for modifier in self.modifiers: if isinstance(modifier, dict): @@ -798,6 +798,7 @@ def metadata_modifications(self, process): for field in process.user_fields(): # self.debug = (field.name == 't1mri_nobias') + done_mod = set() if process.plugs[field.name].activated: self.dprint( f' Parse schema modifications for {field.name}') @@ -858,17 +859,22 @@ def metadata_modifications(self, process): if intra_node is not None: filtered_meta = self.get_linked_metadata( schema, intra_node, intra_src, intra_dst) - modifier = MetadataModifier( - schema, node, node_parameter, - filtered_meta=filtered_meta) - if not modifier.is_empty: - self.dprint(f' {modifier.modifiers}') - if filtered_meta is not None: - self.dprint( - ' filtered_meta: ' - f'{filtered_meta}') - result.setdefault(field.name, - []).append(modifier) + if (node, node_parameter) not in done_mod: + # (avoid having several times the same modifier + # via different paths, some Prepend(), Append() + # may be duplicate) + modifier = MetadataModifier( + schema, node, node_parameter, + filtered_meta=filtered_meta) + if not modifier.is_empty: + self.dprint(f' {modifier.modifiers}') + if filtered_meta is not None: + self.dprint( + ' filtered_meta: ' + f'{filtered_meta}') + result.setdefault(field.name, + []).append(modifier) + done_mod.add((node, node_parameter)) else: self.dprint(f' {field.name} ignored (inactive)') @@ -941,6 +947,7 @@ def get_schema(self, schema_name, index=None): return schema def generate_paths(self, executable): + # self.debug = True if self.debug: if self._current_iteration is not None: iteration = f'[{self._current_iteration}]' diff --git a/capsul/execution_context.py b/capsul/execution_context.py index daf8c9863..5b680fcd0 100644 --- a/capsul/execution_context.py +++ b/capsul/execution_context.py @@ -2,13 +2,17 @@ from uuid import uuid4 import importlib +import os -from soma.controller import Controller, OpenKeyDictController, File +from soma.controller import ( + Controller, OpenKeyDictController, File, Directory, field) from soma.api import DictWithProxy, undefined from .dataset import Dataset from .pipeline.pipeline import Process, Pipeline -from .pipeline.process_iteration import IndependentExecutables, ProcessIteration +from .pipeline.process_iteration import (IndependentExecutables, + ProcessIteration) +from .pipeline import pipeline_tools from capsul.config.configuration import get_config_class from .config.configuration import ModuleConfiguration @@ -97,13 +101,17 @@ class CapsulWorkflow(Controller): # parameters: DictWithProxy jobs: dict - def __init__(self, executable, debug=False): + def __init__(self, executable, create_output_dirs=True, debug=False): super().__init__() top_parameters = DictWithProxy(all_proxies=True) self.jobs = {} jobs_per_process = {} process_chronology = {} processes_proxies = {} + if isinstance(executable, Pipeline): + nodes = pipeline_tools.topological_sort_nodes( + executable.all_nodes()) + pipeline_tools.propagate_meta(executable, nodes) job_parameters = self._create_jobs( 
top_parameters=top_parameters, executable=executable, @@ -132,7 +140,8 @@ def __init__(self, executable, debug=False): bj['waited_by'].add(after_job) # Resolve disabled jobs - disabled_jobs = [(uuid, job) for uuid, job in self.jobs.items() if job['disabled']] + disabled_jobs = [(uuid, job) for uuid, job in self.jobs.items() + if job['disabled']] for disabled_job in disabled_jobs: wait_for = set() stack = disabled_job[1]['wait_for'] @@ -153,7 +162,11 @@ def __init__(self, executable, debug=False): for job in disabled_job[1]['waited_by']: self.jobs[job]['wait_for'].remove(disabled_job[0]) del self.jobs[disabled_job[0]] - + + out_dirs = set() + out_deps = [] + out_job_id = None + # Transform wait_for sets to lists for json storage # and add waited_by for job_id, job in self.jobs.items(): @@ -180,6 +193,58 @@ def __init__(self, executable, debug=False): parameters_index[k] = i job['parameters_index'] = parameters_index + if create_output_dirs: + # record output directories + outputs = job.get('write_parameters', []) + add_dep = False + for param in outputs: + todo = [parameters_index[param]] + pindices = [] + while todo: + pind = todo.pop(0) + if isinstance(pind, (list, tuple)): + todo += pind + else: + pindices.append(pind) + + value = [self.parameters_values[pind] for pind in pindices] + todo = value + while todo: + value = todo.pop(0) + if isinstance(value, (list, tuple)): + todo += value + elif isinstance(value, str): + if value.startswith('!{dataset.') \ + and value.find('}') >= 11: + dpath = os.path.dirname(value) + # remove redundant paths (parents of others) + dpathf = os.path.join(dpath, '') + do_add = True + to_remove = [] + for p in out_dirs: + if p.startswith(dpathf): + # already a deeper one present + do_add = False + break + if dpath.startswith(os.path.join(p, '')): + # current is deeper + to_remove.append(p) + for p in to_remove: + out_dirs.remove(p) + if do_add: + out_dirs.add(dpath) + # anyway make a dependency of job over dir_job + add_dep = True + if add_dep: + out_deps.append(job_id) + if out_job_id is None: + out_job_id = str(uuid4()) + wait_for.insert(0, out_job_id) + + # print('out_dirs:', out_dirs) + if len(out_dirs) != 0: + self._create_directories_job(out_job_id, out_dirs, out_deps) + @staticmethod def _no_proxy(parameters, i): if DictWithProxy.is_proxy(i): @@ -211,10 +276,10 @@ def _create_jobs(self, find_temporary_to_generate(executable) disabled_nodes = process.disabled_pipeline_steps_nodes() for node_name, node in process.nodes.items(): - if (node is process - or not node.activated - or not isinstance(node, Process) - or node in disabled_nodes): + if (node is process + or not node.activated + or not isinstance(node, Process) + or node in disabled_nodes): continue nodes_dict[node_name] = {} job_parameters = self._create_jobs( @@ -230,10 +295,18 @@ def _create_jobs(self, process_iterations=process_iterations, disabled=disabled or node in disabled_nodes) nodes.append(node) - for field in process.user_fields(): - for dest_node, plug_name in executable.get_linked_items(process, - field.name, - in_sub_pipelines=False): + for field in process.user_fields(): # noqa: F402 + links = list(executable.get_linked_items( + process, + field.name, + in_sub_pipelines=False, + direction='links_from')) \ + + list(executable.get_linked_items( + process, + field.name, + in_sub_pipelines=False, + direction='links_to')) + for dest_node, plug_name in links: if dest_node in disabled_nodes: continue if field.metadata('write', False) \ @@ -243,12 +316,18 @@ def _create_jobs(self, else: 
parameters.content[field.name] \ = nodes_dict.get(dest_node.name, {}).get(plug_name) - break + # break if field.is_output(): - for dest_node_name, dest_plug_name, dest_node, dest_plug, is_weak in process.plugs[field.name].links_to: - if (isinstance(dest_node, Process) and dest_node.activated and - dest_node not in disabled_nodes and - not dest_node.field(dest_plug_name).is_output()): + for dest_node, dest_plug_name \ + in executable.get_linked_items( + process, field.name, direction='links_to', + in_sub_pipelines=True, + in_outer_pipelines=True): + if (isinstance(dest_node, Process) + and dest_node.activated + and dest_node not in disabled_nodes + and not dest_node.field( + dest_plug_name).is_output()): if isinstance(dest_node, Pipeline): continue process_chronology.setdefault( @@ -258,9 +337,11 @@ def _create_jobs(self, for node in nodes: for plug_name in node.plugs: first = nodes_dict[node.name].get(plug_name) - for dest_node, dest_plug_name in process.get_linked_items(node, plug_name, - in_sub_pipelines=False): - + for dest_node, dest_plug_name in process.get_linked_items( + node, plug_name, + in_sub_pipelines=False, + direction=('links_from', 'links_to')): + second = nodes_dict.get(dest_node.name, {}).get(dest_plug_name) if dest_node.pipeline is not node.pipeline: continue @@ -325,8 +406,12 @@ def _create_jobs(self, if isinstance(executable, Pipeline): for field in process.user_fields(): if field.is_output(): - for dest_node, plug_name in executable.get_linked_items(process, field.name, - direction='links_to'): + for dest_node, plug_name \ + in executable.get_linked_items( + process, field.name, + direction='links_to', + in_sub_pipelines=True, + in_outer_pipelines=True): if isinstance(dest_node, Pipeline): continue process_chronology.setdefault( @@ -337,20 +422,21 @@ def _create_jobs(self, elif isinstance(process, Process): job_uuid = str(uuid4()) if disabled: - self.jobs[job_uuid] = { + job = { 'uuid': job_uuid, 'disabled': True, 'wait_for': set(), 'waited_by': set(), } else: - self.jobs[job_uuid] = { + job = { 'uuid': job_uuid, 'disabled': False, 'wait_for': set(), 'process': process.json(include_parameters=False), 'parameters_location': parameters_location } + self.jobs[job_uuid] = job for parent_executable in parent_executables: jobs_per_process.setdefault( parent_executable.uuid + ','.join(process_iterations.get(parent_executable.uuid, [])), @@ -359,8 +445,14 @@ def _create_jobs(self, process.uuid + ','.join(process_iterations.get(process.uuid, [])), set()).add(job_uuid) # print('!create_job!', process.full_name) + opar = [] + wpar = [] for field in process.user_fields(): value = undefined + if field.metadata().get('write', False): + wpar.append(field.name) + if field.metadata().get('output', False): + opar.append(field.name) if getattr(field, 'generate_temporary', False): if field.type is File: prefix = f'!{{dataset.tmp.path}}/{process.full_name}' @@ -385,22 +477,65 @@ def _create_jobs(self, suffix = '' uuid = str(uuid4()) value[i] = f'{prefix}.{field.name}_{i}_{uuid}{suffix}' + # print('generate tmp:', value) if value is undefined: value = process.getattr(field.name, None) # print(' ', field.name, '<-', repr(value), getattr(field, 'generate_temporary', False)) proxy = parameters.proxy(executable.json_value(value)) parameters[field.name] = proxy - if field.is_output() and isinstance(executable, (Pipeline, ProcessIteration)): - for dest_node, plug_name in executable.get_linked_items(process, field.name, - direction='links_to'): + if field.is_output() and isinstance( + executable, 
(Pipeline, ProcessIteration)): + for dest_node, plug_name \ + in executable.get_linked_items( + process, field.name, + direction='links_to', + in_sub_pipelines=True, + in_outer_pipelines=True): if isinstance(dest_node, Pipeline): continue process_chronology.setdefault( dest_node.uuid + ','.join(process_iterations.get(dest_node.uuid, [])), set()).add( process.uuid + ','.join(process_iterations.get(process.uuid, []))) + if opar: + job['output_parameters'] = opar + if wpar: + job['write_parameters'] = wpar + return parameters + def find_job(self, full_name): + for job in self.jobs.values(): + pl = [p for p in job['parameters_location'] if p != 'nodes'] + name = '.'.join(pl) + if name == full_name: + return job + return None + + def _create_directories_job(self, job_uuid, out_dirs, out_deps): + if len(out_dirs) == 0: + return None # no dirs to create. + name = 'directories_creation' + + pindex = len(self.parameters_values) + self.parameters_dict['nodes'][name] = {'directories': ['&', pindex]} + self.parameters_values.append(list(out_dirs)) + self.jobs[job_uuid] = { + 'uuid': job_uuid, + 'disabled': False, + 'process': { + 'type': 'process', + 'definition': 'capsul.execution_context.CreateDirectories', + 'uuid': str(uuid4()) + }, + 'parameters_location': ['nodes', name], + 'parameters_index': {'directories': pindex}, + 'wait_for': [], + 'waited_by': list(out_deps), + 'write_parameters': ['directories'], + } + + def find_temporary_to_generate(executable): # print('!temporaries! ->', executable.label) if isinstance(executable, Pipeline): @@ -411,7 +546,7 @@ def find_temporary_to_generate(executable): nodes = [executable] for node in nodes: # print('!temporaries! initialize node', node.full_name) - for field in node.user_fields(): + for field in node.user_fields(): # noqa: F402 if (field.output or not field.metadata('write', False) or not node.plugs[field.name].activated): @@ -441,3 +576,14 @@ def find_temporary_to_generate(executable): # print('!temporaries! parameters with temporary') # for n, p in temporaries: # print('!temporaries! 
', n, ':', p) + + +class CreateDirectories(Process): + directories: list[Directory] = field( + type_=list[Directory], write=True, + doc='create output directories so that later processes can write ' + 'their output files there.') + + def execute(self, execution_context): + for odir in self.directories: + os.makedirs(odir, exist_ok=True) diff --git a/capsul/pipeline/pipeline.py b/capsul/pipeline/pipeline.py index 5c4c750b3..fdf57f85d 100644 --- a/capsul/pipeline/pipeline.py +++ b/capsul/pipeline/pipeline.py @@ -1518,7 +1518,7 @@ def workflow_ordered_nodes(self, remove_disabled_steps=True): graph = self.workflow_graph(remove_disabled_steps) - # Start the topologival sort + # Start the topological sort ordered_list = graph.topological_sort() def walk_workflow(wokflow, workflow_list): @@ -1541,7 +1541,7 @@ def walk_workflow(wokflow, workflow_list): workflow_list = [] walk_workflow(ordered_list, workflow_list) - return workflow_list + return workflow_list def find_empty_parameters(self): """ Find internal File/Directory parameters not exported to the main @@ -2248,18 +2248,21 @@ def dispatch_plugs(self, node, name): name, in_sub_pipelines=False, activated_only=False, - process_only=False)) + process_only=False, + direction=('links_from', 'links_to'))) while stack: item = stack.pop() if item not in done: node, plug = item yield (node, plug) done.add(item) - stack.extend(self.get_linked_items(node, - plug, + stack.extend(self.get_linked_items( + node, + plug, in_sub_pipelines=False, activated_only=False, - process_only=False)) + process_only=False, + direction=('links_from', 'links_to'))) self.enable_parameter_links = enable_parameter_links def dispatch_all_values(self): @@ -2278,19 +2281,31 @@ def get_linked_items(self, node, plug_name=None, in_sub_pipelines=True, Going through switches and inside subpipelines, ignoring nodes that are not activated. The result is a generator of pairs (node, plug_name). + + direction may be a string, 'links_from' or 'links_to', or the tuple + ('links_from', 'links_to'). 
''' if plug_name is None: stack = [(node, plug) for plug in node.plugs] else: stack = [(node, plug_name)] + done = set() + while stack: - node, plug_name = stack.pop(0) + current = stack.pop(0) + if current in done: + continue + done.add(current) + node, plug_name = current if activated_only and not node.activated: continue plug = node.plugs.get(plug_name) if plug: if direction is not None: - directions = (direction,) + if isinstance(direction, (tuple, list)): + directions = direction + else: + directions = (direction,) else: if isinstance(node, Pipeline): if in_outer_pipelines: @@ -2304,41 +2319,67 @@ def get_linked_items(self, node, plug_name=None, in_sub_pipelines=True, else: directions = ('links_from',) for current_direction in directions: - for dest_plug_name, dest_node in (i[1:3] for i in getattr(plug, current_direction)): - if dest_node is node or (activated_only - and not dest_node.activated): + for dest_plug_name, dest_node in \ + (i[1:3] for i in getattr(plug, current_direction)): + if dest_node is node \ + or (activated_only + and not dest_node.activated): continue if isinstance(dest_node, Pipeline): - if ((in_sub_pipelines and dest_node is not self) or - (in_outer_pipelines and isinstance(dest_node, Pipeline))): - for n, p in self.get_linked_items(dest_node, - dest_plug_name, - activated_only=activated_only, - process_only=process_only, - in_sub_pipelines=in_sub_pipelines, - direction=current_direction, - in_outer_pipelines=in_outer_pipelines): + if ((in_sub_pipelines and dest_node is not self) + or in_outer_pipelines): + for n, p in self.get_linked_items( + dest_node, + dest_plug_name, + activated_only=activated_only, + process_only=process_only, + in_sub_pipelines=in_sub_pipelines, + direction=current_direction, + in_outer_pipelines=in_outer_pipelines): if n is not node: - yield (n, p) - yield (dest_node, dest_plug_name) + if (n, p) not in done: + yield (n, p) + if (dest_node, dest_plug_name) not in done: + yield (dest_node, dest_plug_name) elif isinstance(dest_node, Switch): if dest_plug_name == 'switch': if not process_only: - yield (dest_node, dest_plug_name) + if (dest_node, dest_plug_name) \ + not in done: + yield (dest_node, dest_plug_name) else: - for input_plug_name, output_plug_name in dest_node.connections(): - if plug.output ^ isinstance(node, Pipeline): + if direction is None \ + or (isinstance(direction, + (tuple, list)) + and len(direction) == 2): + # if bidirectional search only + stack.append((dest_node, dest_plug_name)) + for input_plug_name, output_plug_name \ + in dest_node.connections(): + if current_direction == 'links_to': if dest_plug_name == input_plug_name: - if not process_only: - yield (dest_node, output_plug_name) - stack.append((dest_node, output_plug_name)) + if not process_only \ + and (dest_node, + output_plug_name) \ + not in done: + yield ( + dest_node, + output_plug_name) + stack.append((dest_node, + output_plug_name)) else: if dest_plug_name == output_plug_name: - if not process_only: - yield (dest_node, input_plug_name) - stack.append((dest_node, input_plug_name)) + if not process_only \ + and (dest_node, + input_plug_name) \ + not in done: + yield ( + dest_node, input_plug_name) + stack.append((dest_node, + input_plug_name)) else: - yield (dest_node, dest_plug_name) + if (dest_node, dest_plug_name) not in done: + yield (dest_node, dest_plug_name) def json(self, include_parameters=True): result = super().json(include_parameters=include_parameters) diff --git a/capsul/pipeline/pipeline_nodes.py b/capsul/pipeline/pipeline_nodes.py index 
008e8684c..dc78cdecb 100644 --- a/capsul/pipeline/pipeline_nodes.py +++ b/capsul/pipeline/pipeline_nodes.py @@ -235,21 +235,13 @@ def _switch_changed(self, new_selection, old_selection): setattr(self, output_plug_name, getattr(self, corresponding_input_plug_name, undefined)) - if self.pipeline is not None: - f = self.field(output_plug_name) - for n, p in self.pipeline.get_linked_items( - self, corresponding_input_plug_name, - direction='links_from'): - # copy input field metadata - for k, v in n.field(p).metadata().items(): - setattr(f, k, v) - break - # Propagate the associated field documentation out_field = self.field(output_plug_name) in_field = self.field(corresponding_input_plug_name) out_field.doc = in_field.metadata('doc', None) + self.propagate_fields_metadata() + self.pipeline.restore_update_nodes_and_plugs_activation() self.__block_output_propagation = False @@ -314,14 +306,6 @@ def _any_attribute_changed(self, new, old, name): if self.switch == switch_selection: self.__block_output_propagation = True setattr(self, output_plug_name, new) - if self.pipeline is not None: - f = self.field(output_plug_name) - for n, p in self.pipeline.get_linked_items( - self, name, direction='links_from'): - # copy input field metadata - for k, v in n.field(p).metadata().items(): - setattr(f, k, v) - break self.__block_output_propagation = False def __setstate__(self, state): @@ -393,6 +377,26 @@ def configured_controller(self): c.optional_params = [self.field(p).optional for p in self.inputs] return c + def propagate_fields_metadata(self): + ''' Propagate metadata from connected inputs (that is, outputs of + upstream processes) to outputs. + This is needed to get correct status (read/write) on output pipeline + plugs once the switch state is chosen. + ''' + for output_plug_name in self._outputs: + # Get the associated input name + input_plug_name = f'{self.switch}_switch_{output_plug_name}' + + if self.pipeline is not None: + f = self.field(output_plug_name) + for n, p in self.pipeline.get_linked_items( + self, input_plug_name, + direction='links_from'): + # copy input field metadata + for k, v in n.field(p).metadata().items(): + setattr(f, k, v) + break + @classmethod def build_node(cls, pipeline, name, conf_controller): node = Switch(pipeline, name, conf_controller.inputs, diff --git a/capsul/pipeline/pipeline_tools.py b/capsul/pipeline/pipeline_tools.py index d9807dcf9..3780f38e3 100644 --- a/capsul/pipeline/pipeline_tools.py +++ b/capsul/pipeline/pipeline_tools.py @@ -28,10 +28,6 @@ ----------------------------------- :func:`set_pipeline_state_from_dict` ------------------------------------ -:func:`get_output_directories` ------------------------------- -:func:`create_output_directories` ---------------------------------- :func:`save_pipeline` --------------------- :func:`load_pipeline_parameters` @@ -1113,68 +1109,6 @@ def set_pipeline_state_from_dict(pipeline, state_dict): for node_name, sub_dict in sub_nodes.items()] -def get_output_directories(process): - ''' - Get output directories for a process, pipeline, or node - - Returns - ------- - dirs: dict - organized directories list: a dict with recursive nodes mapping. - In each element, the "directories" key holds a directories names set, - and "nodes" is a dict with sub-nodes (node_name, dict mapping, - organized the same way) - flat_dirs: set - set of all directories in the pipeline, as a flat set. 
- ''' - all_dirs = set() - root_dirs = {} - nodes = [(process, '', root_dirs)] - disabled_nodes = set() - if isinstance(process, Pipeline): - disabled_nodes = process.disabled_pipeline_steps_nodes() - - while nodes: - node, node_name, dirs = nodes.pop(0) - plugs = getattr(node, 'plugs', None) - if plugs is None: - plugs = [field.name for field in node.fields()] - process = node - dirs_set = set() - dirs['directories'] = dirs_set - for param_name in plugs: - field = process.field(param_name) - if process.metadata(field).get('write', False) \ - and field.is_path(): - value = getattr(process, param_name, undefined) - if value is not None and value is not undefined: - directory = os.path.dirname(value) - if directory not in ('', '.'): - all_dirs.add(directory) - dirs_set.add(directory) - sub_nodes = getattr(process, 'nodes', None) - if sub_nodes: - # TODO: handle disabled steps - sub_dict = {} - dirs['nodes'] = sub_dict - for node_name, node in sub_nodes.items(): - if node_name != '' and node.activated and node.enabled \ - and not node in disabled_nodes: - sub_node_dict = {} - sub_dict[node_name] = sub_node_dict - nodes.append((node, node_name, sub_node_dict)) - return root_dirs, all_dirs - - -def create_output_directories(process): - ''' - Create output directories for a process, pipeline or node. - ''' - for directory in get_output_directories(process)[1]: - if not os.path.exists(directory): - os.makedirs(directory) - - def save_pipeline(pipeline, filename): ''' Save the pipeline either in JSON or .py source file @@ -1526,3 +1460,95 @@ def replace_node(node, module_name, dirname, done, parent, node_name): save_pipeline(pipeline, filename) del sys.path[0] + + +def topological_sort_nodes(nodes): + ''' Sort nodes topologically according to their links. + All linked nodes must be in the nodes list: if switches or pipelines are + removed, the sort will be broken. + + In the output list, pipeline nodes will appear twice, in tuples: + + (pipeline, 0) is the position of the input plugs of the pipeline + + (pipeline, 1) is the position of the output plugs of the pipeline + + nodes inside the pipeline will logically be between both. + ''' + nsort = [] + done = set() + todo = list(nodes) + while todo: + node = todo.pop(0) + if node in done: + continue + + i = -1 + cont = False + for plug in node.plugs.values(): + if not plug.output: + for ld in plug.links_from: + n = ld[2] + n0 = n + if isinstance(n, Pipeline): + if not ld[3].output: + n = (n, 0) # beginning of pipeline + else: + n = (n, 1) # end of pipeline + if n in done: + ni = nsort.index(n) # WARNING: expensive search + if ni > i: + i = ni + else: + todo.insert(0, n0) + cont = True + break + if cont: + break + if cont: + continue + + # print('insert', node.full_name, ':', i+1) + # if i >= 0: + # print(' after', nsort[i].full_name if not isinstance(nsort[i], tuple) else (nsort[i][0].full_name, nsort[i][1]) ) + if isinstance(node, Pipeline): + nsort.insert(i+1, (node, 0)) + nsort.insert(i+2, (node, 1)) + done.add((node, 0)) + done.add((node, 1)) + else: + nsort.insert(i+1, node) + done.add(node) + return nsort + + +def propagate_meta(executable, nodes=None): + ''' Propagate metadata from process output plugs to downstream + switches and upper-level pipeline plugs, recursively in topological order. + + If ``nodes`` is provided, it should be the nodes list already in + topological order. It may be passed when reused, in order to avoid + calling :func:`topological_sort_nodes` several times. 
+ ''' + if nodes is None: + nodes = topological_sort_nodes( + executable.all_nodes()) + for node in nodes: + if isinstance(node, Switch): + node.propagate_fields_metadata() + if isinstance(node, tuple): + if node[1] == 0: + # pipeline inputs + continue + node = node[0] + for pname, plug in node.plugs.items(): + if plug.output: + f = node.field(pname) + for ld in plug.links_to: + n = ld[2] + p = ld[1] + fo = n.field(p) + if fo.is_output(): + # copy field metadata + for k, v in f.metadata().items(): + setattr(fo, k, v) diff --git a/capsul/pipeline/test/test_pipeline.py b/capsul/pipeline/test/test_pipeline.py index b3d1b8a1f..da46278c8 100644 --- a/capsul/pipeline/test/test_pipeline.py +++ b/capsul/pipeline/test/test_pipeline.py @@ -150,7 +150,7 @@ def run_pipeline_io(self, filename): from capsul.pipeline import pipeline_tools pipeline_tools.save_pipeline(pipeline, filename) pipeline2 = executable(filename) - wf = CapsulWorkflow(pipeline2) + wf = CapsulWorkflow(pipeline2, create_output_dirs=False) if self.debug: from soma.qt_gui.qt_backend import QtGui from capsul.qt_gui.widgets import PipelineDeveloperView diff --git a/capsul/pipeline/test/test_pipeline_workflow.py b/capsul/pipeline/test/test_pipeline_workflow.py index 8f480824f..e4fc9ae0f 100644 --- a/capsul/pipeline/test/test_pipeline_workflow.py +++ b/capsul/pipeline/test/test_pipeline_workflow.py @@ -164,30 +164,37 @@ def tearDown(self): def test_full_wf(self): self.pipeline.enable_all_pipeline_steps() - wf = CapsulWorkflow(self.pipeline) + wf = CapsulWorkflow(self.pipeline, create_output_dirs=False) # 4 jobs self.assertEqual(len(wf.jobs), 4) # 3 deps - self.assertEqual(sum(len(job['wait_for']) for job in wf.jobs.values()), 3) + self.assertEqual(sum(len(job['wait_for']) for job in wf.jobs.values()), + 3) + wf = CapsulWorkflow(self.pipeline, create_output_dirs=True) + # 5 jobs with the directories creation + self.assertEqual(len(wf.jobs), 5) + # 4 deps + self.assertEqual(sum(len(job['wait_for']) for job in wf.jobs.values()), + 4) def test_partial_wf1(self): self.pipeline.enable_all_pipeline_steps() self.pipeline.pipeline_steps.step3 = False - wf = CapsulWorkflow(self.pipeline) + wf = CapsulWorkflow(self.pipeline, create_output_dirs=False) self.assertEqual(len(wf.jobs), 3) self.assertEqual(sum(len(job['wait_for']) for job in wf.jobs.values()), 2) def test_partial_wf2(self): self.pipeline.enable_all_pipeline_steps() self.pipeline.pipeline_steps.step2 = False - wf = CapsulWorkflow(self.pipeline) + wf = CapsulWorkflow(self.pipeline, create_output_dirs=False) self.assertEqual(len(wf.jobs), 3) self.assertEqual(sum(len(job['wait_for']) for job in wf.jobs.values()), 0) def test_partial_wf3_fail(self): self.pipeline.enable_all_pipeline_steps() self.pipeline.pipeline_steps.step1 = False - wf = CapsulWorkflow(self.pipeline) + wf = CapsulWorkflow(self.pipeline, create_output_dirs=False) self.assertEqual(len(wf.jobs), 3) self.assertEqual(sum(len(job['wait_for']) for job in wf.jobs.values()), 2) @@ -219,7 +226,7 @@ def test_iter_without_temp(self): pipeline.intermediate = [osp.join(self.tmpdir, 'file_out1'), osp.join(self.tmpdir, 'file_out2')] - wf = CapsulWorkflow(pipeline) + wf = CapsulWorkflow(pipeline, create_output_dirs=False) njobs = niter + 1 # 1 after self.assertEqual(len(wf.jobs), njobs) @@ -247,7 +254,7 @@ def test_iter_workflow(self): pipeline.output2 = osp.join(self.tmpdir, 'file_out2') pipeline.output3 = osp.join(self.tmpdir, 'file_out3') - wf = CapsulWorkflow(pipeline) + wf = CapsulWorkflow(pipeline, create_output_dirs=False) njobs = 
4*niter + 3 # 3 after self.assertEqual(len(wf.jobs), njobs) diff --git a/capsul/schemas/brainvisa.py b/capsul/schemas/brainvisa.py index 884d7f3c8..0a2a422af 100644 --- a/capsul/schemas/brainvisa.py +++ b/capsul/schemas/brainvisa.py @@ -110,7 +110,7 @@ def _path_list(self): elif self.data_id == 'sulci_cnn_recognition_param': path_list = [ 'models', f'models_20{self.model_version}', 'cnn_models'] - filename = f'sulci_unet_model_params_{full_side[self.side]}.mdsm' + filename = f'sulci_unet_model_params_{full_side[self.side]}.json' else: filename = self.data_id @@ -695,6 +695,7 @@ class BrainVolumesBrainVISA(ProcessSchema, schema='brainvisa', 'sulci_graph_version': None, 'sulci_recognition_session': None, 'suffix': None, + 'extension': 'nii.gz', # should not be hard-coded but I failed } right_csf = { 'prefix': 'csf', @@ -703,6 +704,7 @@ class BrainVolumesBrainVISA(ProcessSchema, schema='brainvisa', 'sulci_graph_version': None, 'sulci_recognition_session': None, 'suffix': None, + 'extension': 'nii.gz', # should not be hard-coded but I failed } brain_volumes_file = { 'prefix': 'brain_volumes', @@ -725,7 +727,11 @@ class BrainVolumesBrainVISA(ProcessSchema, schema='brainvisa', 'extension': None } _meta_links = { - '*_labelled_graph': {'*': []} + '*_labelled_graph': {'*': []}, + '*_grey_white': {'*': []}, + '*_mesh': {'*': []}, + 'left_grey_white': {'left_csf': ['extension']}, # no effect ..? + 'right_grey_white': {'right_csf': ['extension']}, # no effect ..? } class MorphoReportBIDS(ProcessSchema, schema='bids', diff --git a/capsul/test/test_completion.py b/capsul/test/test_completion.py index d782aa769..a7bf6f7aa 100644 --- a/capsul/test/test_completion.py +++ b/capsul/test/test_completion.py @@ -88,8 +88,11 @@ class TestPipelineBrainVISA(ProcessSchema, schema='brainvisa', '*': {'process': 'test_pipeline'}, } _nodes = { + 'nobias': { + 'output': {'seg_directory': None}, #, 'prefix': 'nobias'}, + }, 'split': { - '*': {'seg_directory': 'segmentation'}, + '*_output': {'seg_directory': 'plouf'}, 'left_output':{'side': 'L', 'suffix': ''}, 'right_output': {'side': 'R', 'suffix': ''}, } @@ -99,7 +102,6 @@ class TestPipelineBrainVISA(ProcessSchema, schema='brainvisa', - datasets = { 'template': 'shared', }
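
Note (reviewer sketch, not part of the patch): a minimal illustration of the new create_output_dirs option of CapsulWorkflow, assuming `pipeline` is an already-built capsul Pipeline whose output parameters use '!{dataset...}' path patterns; the exact job layout depends on the pipeline, so this shows expected behaviour rather than a test.

# Sketch only: `pipeline` is assumed to be an existing, configured Pipeline.
from capsul.execution_context import CapsulWorkflow

wf_plain = CapsulWorkflow(pipeline, create_output_dirs=False)
wf_dirs = CapsulWorkflow(pipeline, create_output_dirs=True)  # default

# With create_output_dirs=True, at most one extra 'directories_creation' job
# is added; it runs a CreateDirectories process, and every job writing
# dataset outputs waits for it (its uuid is prepended to their 'wait_for').
dir_jobs = [job for job in wf_dirs.jobs.values()
            if job.get('process', {}).get('definition')
            == 'capsul.execution_context.CreateDirectories']
assert len(wf_dirs.jobs) <= len(wf_plain.jobs) + 1
assert len(dir_jobs) <= 1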
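
Note (reviewer sketch, not part of the patch): how the extended Pipeline.get_linked_items() direction argument can be used after this change; 'some_node' and 'output' are placeholder names for a node and plug of an assumed `pipeline` instance.

# Sketch only: placeholder node/plug names on an assumed Pipeline instance.
node = pipeline.nodes['some_node']

# Follow only downstream links, descending into sub-pipelines and
# crossing the outer pipeline boundary.
downstream = list(pipeline.get_linked_items(
    node, 'output', direction='links_to',
    in_sub_pipelines=True, in_outer_pipelines=True))

# direction may now also be a tuple to follow links both ways; the new
# internal `done` set ensures each (node, plug_name) pair is yielded once.
both_ways = list(pipeline.get_linked_items(
    node, 'output', direction=('links_from', 'links_to')))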
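
Note (reviewer sketch, not part of the patch): the two new pipeline_tools helpers as they are combined in CapsulWorkflow.__init__(); the same calls can be issued directly on an assumed `pipeline` instance.

# Sketch only, mirroring the calls added in CapsulWorkflow.__init__().
from capsul.pipeline import pipeline_tools

nodes = pipeline_tools.topological_sort_nodes(pipeline.all_nodes())
# `nodes` is in dependency order; each sub-pipeline appears twice,
# as (pipeline_node, 0) for its input side and (pipeline_node, 1) for
# its output side, with its child nodes in between.
pipeline_tools.propagate_meta(pipeline, nodes)
# After propagation, switch outputs (via Switch.propagate_fields_metadata)
# and parent pipeline output plugs carry the metadata, such as the
# read/write status, of the upstream process outputs they are linked to.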