Merge pull request #287 from populse/debug_workflow
Debug workflow
denisri authored Aug 6, 2023
2 parents 4eef044 + 390b587 commit de6bb8f
Showing 9 changed files with 408 additions and 169 deletions.
31 changes: 19 additions & 12 deletions capsul/dataset.py
@@ -581,7 +581,7 @@ def add_modifier(self, modifier):
raise ValueError(f'Invalid value for schema modification for parameter {self.parameter}: {modifier}')

def apply(self, metadata, process, parameter, initial_meta):
debug = False # (parameter == 't1mri_nobias')
debug = False # (parameter == 'nobias')
if debug: print('apply modifier to', parameter, ':', self, metadata, initial_meta)
for modifier in self.modifiers:
if isinstance(modifier, dict):
@@ -798,6 +798,7 @@ def metadata_modifications(self, process):

for field in process.user_fields():
# self.debug = (field.name == 't1mri_nobias')
done_mod = set()
if process.plugs[field.name].activated:
self.dprint(
f' Parse schema modifications for {field.name}')
@@ -858,17 +859,22 @@ def metadata_modifications(self, process):
if intra_node is not None:
filtered_meta = self.get_linked_metadata(
schema, intra_node, intra_src, intra_dst)
modifier = MetadataModifier(
schema, node, node_parameter,
filtered_meta=filtered_meta)
if not modifier.is_empty:
self.dprint(f' {modifier.modifiers}')
if filtered_meta is not None:
self.dprint(
' filtered_meta: '
f'{filtered_meta}')
result.setdefault(field.name,
[]).append(modifier)
if (node, node_parameter) not in done_mod:
                                # (avoid recording the same modifier several
                                # times via different paths: some Prepend(),
                                # Append() modifiers may be duplicated)
modifier = MetadataModifier(
schema, node, node_parameter,
filtered_meta=filtered_meta)
if not modifier.is_empty:
self.dprint(f' {modifier.modifiers}')
if filtered_meta is not None:
self.dprint(
' filtered_meta: '
f'{filtered_meta}')
result.setdefault(field.name,
[]).append(modifier)
done_mod.add((node, node_parameter))
else:
self.dprint(f' {field.name} ignored (inactive)')
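The done_mod guard above ensures that a modifier reachable through several link paths is recorded only once per (node, parameter) pair. A minimal sketch of the pattern, with purely illustrative names in place of Capsul's MetadataModifier machinery:

    # Collect one entry per (node, parameter) pair, even when the same
    # pair is reached through several link paths.
    linked = [('node_a', 'out'), ('node_b', 'in'), ('node_a', 'out')]
    done_mod = set()
    result = {}
    for node, parameter in linked:
        if (node, parameter) in done_mod:
            continue  # already handled via another path
        result.setdefault('some_field', []).append((node, parameter))
        done_mod.add((node, parameter))
    print(result)  # {'some_field': [('node_a', 'out'), ('node_b', 'in')]}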

@@ -941,6 +947,7 @@ def get_schema(self, schema_name, index=None):
return schema

def generate_paths(self, executable):
# self.debug = True
if self.debug:
if self._current_iteration is not None:
iteration = f'[{self._current_iteration}]'
204 changes: 175 additions & 29 deletions capsul/execution_context.py

Large diffs are not rendered by default.

105 changes: 73 additions & 32 deletions capsul/pipeline/pipeline.py
@@ -1518,7 +1518,7 @@ def workflow_ordered_nodes(self, remove_disabled_steps=True):
graph = self.workflow_graph(remove_disabled_steps)


# Start the topologival sort
# Start the topological sort
ordered_list = graph.topological_sort()

def walk_workflow(wokflow, workflow_list):
@@ -1541,7 +1541,7 @@ def walk_workflow(wokflow, workflow_list):
workflow_list = []
walk_workflow(ordered_list, workflow_list)

return workflow_list
return workflow_list

def find_empty_parameters(self):
""" Find internal File/Directory parameters not exported to the main
@@ -2248,18 +2248,21 @@ def dispatch_plugs(self, node, name):
name,
in_sub_pipelines=False,
activated_only=False,
process_only=False))
process_only=False,
direction=('links_from', 'links_to')))
while stack:
item = stack.pop()
if item not in done:
node, plug = item
yield (node, plug)
done.add(item)
stack.extend(self.get_linked_items(node,
plug,
stack.extend(self.get_linked_items(
node,
plug,
in_sub_pipelines=False,
activated_only=False,
process_only=False))
process_only=False,
direction=('links_from', 'links_to')))
self.enable_parameter_links = enable_parameter_links

def dispatch_all_values(self):
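dispatch_plugs() now traverses links in both directions and keeps a done set so that cyclic plug links cannot loop forever. The traversal skeleton, reduced to its essentials (neighbours stands in for pipeline.get_linked_items(..., direction=('links_from', 'links_to'))):

    def iter_dispatch(start, neighbours):
        # Worklist traversal with a visited set, mirroring the
        # stack/done logic of dispatch_plugs().
        stack = [start]
        done = set()
        while stack:
            item = stack.pop()
            if item in done:
                continue
            done.add(item)
            yield item
            stack.extend(neighbours(item))

    links = {('n1', 'out'): [('n2', 'in')],
             ('n2', 'in'): [('n1', 'out')]}  # deliberately cyclic
    print(list(iter_dispatch(('n1', 'out'),
                             lambda item: links.get(item, []))))
    # -> [('n1', 'out'), ('n2', 'in')]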
@@ -2278,19 +2281,31 @@ def get_linked_items(self, node, plug_name=None, in_sub_pipelines=True,
Going through switches and inside subpipelines, ignoring nodes that are
not activated.
The result is a generator of pairs (node, plug_name).
        direction may be a string, 'links_from' or 'links_to', or a tuple
        ('links_from', 'links_to').
'''
if plug_name is None:
stack = [(node, plug) for plug in node.plugs]
else:
stack = [(node, plug_name)]
done = set()

while stack:
node, plug_name = stack.pop(0)
current = stack.pop(0)
if current in done:
continue
done.add(current)
node, plug_name = current
if activated_only and not node.activated:
continue
plug = node.plugs.get(plug_name)
if plug:
if direction is not None:
directions = (direction,)
if isinstance(direction, (tuple, list)):
directions = direction
else:
directions = (direction,)
else:
if isinstance(node, Pipeline):
if in_outer_pipelines:
@@ -2304,41 +2319,67 @@
else:
directions = ('links_from',)
for current_direction in directions:
for dest_plug_name, dest_node in (i[1:3] for i in getattr(plug, current_direction)):
if dest_node is node or (activated_only
and not dest_node.activated):
for dest_plug_name, dest_node in \
(i[1:3] for i in getattr(plug, current_direction)):
if dest_node is node \
or (activated_only
and not dest_node.activated):
continue
if isinstance(dest_node, Pipeline):
if ((in_sub_pipelines and dest_node is not self) or
(in_outer_pipelines and isinstance(dest_node, Pipeline))):
for n, p in self.get_linked_items(dest_node,
dest_plug_name,
activated_only=activated_only,
process_only=process_only,
in_sub_pipelines=in_sub_pipelines,
direction=current_direction,
in_outer_pipelines=in_outer_pipelines):
if ((in_sub_pipelines and dest_node is not self)
or in_outer_pipelines):
for n, p in self.get_linked_items(
dest_node,
dest_plug_name,
activated_only=activated_only,
process_only=process_only,
in_sub_pipelines=in_sub_pipelines,
direction=current_direction,
in_outer_pipelines=in_outer_pipelines):
if n is not node:
yield (n, p)
yield (dest_node, dest_plug_name)
if (n, p) not in done:
yield (n, p)
if (dest_node, dest_plug_name) not in done:
yield (dest_node, dest_plug_name)
elif isinstance(dest_node, Switch):
if dest_plug_name == 'switch':
if not process_only:
yield (dest_node, dest_plug_name)
if (dest_node, dest_plug_name) \
not in done:
yield (dest_node, dest_plug_name)
else:
for input_plug_name, output_plug_name in dest_node.connections():
if plug.output ^ isinstance(node, Pipeline):
if direction is None \
or (isinstance(direction,
(tuple, list))
and len(direction) == 2):
                                # only when the search is bidirectional
stack.append((dest_node, dest_plug_name))
for input_plug_name, output_plug_name \
in dest_node.connections():
if current_direction == 'links_to':
if dest_plug_name == input_plug_name:
if not process_only:
yield (dest_node, output_plug_name)
stack.append((dest_node, output_plug_name))
if not process_only \
and (dest_node,
output_plug_name) \
not in done:
yield (
dest_node,
output_plug_name)
stack.append((dest_node,
output_plug_name))
else:
if dest_plug_name == output_plug_name:
if not process_only:
yield (dest_node, input_plug_name)
stack.append((dest_node, input_plug_name))
if not process_only \
and (dest_node,
input_plug_name) \
not in done:
yield (
dest_node, input_plug_name)
stack.append((dest_node,
input_plug_name))
else:
yield (dest_node, dest_plug_name)
if (dest_node, dest_plug_name) not in done:
yield (dest_node, dest_plug_name)

def json(self, include_parameters=True):
result = super().json(include_parameters=include_parameters)
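The new direction argument of get_linked_items() accepts either a single direction name or a tuple of both, as the docstring describes. A small stand-alone sketch of the normalization applied above (normalize_direction is a hypothetical helper, not part of Capsul):

    def normalize_direction(direction):
        # None lets get_linked_items() pick a default from the node
        # type; a tuple/list is used as-is; a single string becomes a
        # one-element tuple.
        if direction is None:
            return None
        if isinstance(direction, (tuple, list)):
            return tuple(direction)
        return (direction,)

    assert normalize_direction('links_to') == ('links_to',)
    assert normalize_direction(['links_from']) == ('links_from',)
    assert normalize_direction(('links_from', 'links_to')) == \
        ('links_from', 'links_to')

A two-element direction means a bidirectional search, which is why the switch-crossing branch above only re-queues (dest_node, dest_plug_name) when direction is None or has length 2.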
40 changes: 22 additions & 18 deletions capsul/pipeline/pipeline_nodes.py
@@ -235,21 +235,13 @@ def _switch_changed(self, new_selection, old_selection):
setattr(self, output_plug_name,
getattr(self, corresponding_input_plug_name, undefined))

if self.pipeline is not None:
f = self.field(output_plug_name)
for n, p in self.pipeline.get_linked_items(
self, corresponding_input_plug_name,
direction='links_from'):
# copy input field metadata
for k, v in n.field(p).metadata().items():
setattr(f, k, v)
break

# Propagate the associated field documentation
out_field = self.field(output_plug_name)
in_field = self.field(corresponding_input_plug_name)
out_field.doc = in_field.metadata('doc', None)

self.propagate_fields_metadata()

self.pipeline.restore_update_nodes_and_plugs_activation()
self.__block_output_propagation = False

@@ -314,14 +306,6 @@ def _any_attribute_changed(self, new, old, name):
if self.switch == switch_selection:
self.__block_output_propagation = True
setattr(self, output_plug_name, new)
if self.pipeline is not None:
f = self.field(output_plug_name)
for n, p in self.pipeline.get_linked_items(
self, name, direction='links_from'):
# copy input field metadata
for k, v in n.field(p).metadata().items():
setattr(f, k, v)
break
self.__block_output_propagation = False

def __setstate__(self, state):
@@ -393,6 +377,26 @@ def configured_controller(self):
c.optional_params = [self.field(p).optional for p in self.inputs]
return c

def propagate_fields_metadata(self):
''' Propagate metadata from connected inputs (that is, outputs of
upstream processes) to outputs.
This is needed to get correct status (read/write) on output pipeline
plugs once the switch state is chosen.
'''
for output_plug_name in self._outputs:
# Get the associated input name
input_plug_name = f'{self.switch}_switch_{output_plug_name}'

if self.pipeline is not None:
f = self.field(output_plug_name)
for n, p in self.pipeline.get_linked_items(
self, input_plug_name,
direction='links_from'):
# copy input field metadata
for k, v in n.field(p).metadata().items():
setattr(f, k, v)
break

@classmethod
def build_node(cls, pipeline, name, conf_controller):
node = Switch(pipeline, name, conf_controller.inputs,
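The metadata copy that was previously inlined in _switch_changed() and _any_attribute_changed() is now centralized in propagate_fields_metadata(). The core of the copy loop, shown here with simplified stand-in field objects rather than the real soma-controller API:

    class FakeField:
        # Stand-in for a controller field carrying metadata.
        def __init__(self, **meta):
            self._meta = dict(meta)

        def metadata(self):
            return dict(self._meta)

    in_field = FakeField(read=True, write=False, doc='input image')
    out_field = FakeField()
    # copy input field metadata, as in the loop above
    for k, v in in_field.metadata().items():
        setattr(out_field, k, v)
    print(out_field.read, out_field.write, out_field.doc)
    # -> True False input image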