Skip to content

Commit

Permalink
add a first process to create output directories in a pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
denisri committed Aug 4, 2023
1 parent ea52b95 commit b57580d
Showing 1 changed file with 101 additions and 6 deletions.
107 changes: 101 additions & 6 deletions capsul/execution_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

from uuid import uuid4
import importlib
import os

from soma.controller import Controller, OpenKeyDictController, File
from soma.controller import (
Controller, OpenKeyDictController, File, Directory, field)
from soma.api import DictWithProxy, undefined

from .dataset import Dataset
Expand Down Expand Up @@ -99,7 +101,7 @@ class CapsulWorkflow(Controller):
# parameters: DictWithProxy
jobs: dict

def __init__(self, executable, debug=False):
def __init__(self, executable, create_output_dirs=True, debug=False):
super().__init__()
top_parameters = DictWithProxy(all_proxies=True)
self.jobs = {}
Expand Down Expand Up @@ -159,6 +161,10 @@ def __init__(self, executable, debug=False):
self.jobs[job]['wait_for'].remove(disabled_job[0])
del self.jobs[disabled_job[0]]

out_dirs = set()
out_deps = []
out_job_id = None

# Transform wait_for sets to lists for json storage
# and add waited_by
for job_id, job in self.jobs.items():
Expand All @@ -185,6 +191,49 @@ def __init__(self, executable, debug=False):
parameters_index[k] = i
job['parameters_index'] = parameters_index

if create_output_dirs:
# record output directories
outputs = job.get('write_parameters', [])
add_dep = False
for param in outputs:
value = self.parameters_values[parameters_index[param]]
todo = [value]
while todo:
value = todo.pop(0)
if isinstance(value, (list, tuple)):
todo += value
elif isinstance(value, str):
if value.startswith('!{dataset.') \
and value.find('}') >= 11:
dpath = os.path.dirname(value)
# remove redundant paths (parents of others)
dpathf = os.path.join(dpath, '')
do_add = True
to_remove = []
for p in out_dirs:
if p.startswith(dpathf):
# already a deeper one present
do_add = False
break
if dpath.startswith(os.path.join(p, '')):
# current is deeper
to_remove.append(p)
for p in to_remove:
out_dirs.remove(p)
if do_add:
out_dirs.add(dpath)
# anyway make a dependency of job over dir_job
add_dep = True
if add_dep:
out_deps.append(job_id)
if out_job_id is None:
out_job_id = str(uuid4())
wait_for.insert(0, out_job_id)

# print('out_dirs:', out_dirs)
if len(out_dirs) != 0:
self._create_directories_job(out_job_id, out_dirs, out_deps)

@staticmethod
def _no_proxy(parameters, i):
if DictWithProxy.is_proxy(i):
Expand Down Expand Up @@ -235,7 +284,7 @@ def _create_jobs(self,
process_iterations=process_iterations,
disabled=disabled or node in disabled_nodes)
nodes.append(node)
for field in process.user_fields():
for field in process.user_fields(): # noqa: F402
links = list(executable.get_linked_items(
process,
field.name,
Expand Down Expand Up @@ -362,20 +411,21 @@ def _create_jobs(self,
elif isinstance(process, Process):
job_uuid = str(uuid4())
if disabled:
self.jobs[job_uuid] = {
job = {
'uuid': job_uuid,
'disabled': True,
'wait_for': set(),
'waited_by': set(),
}
else:
self.jobs[job_uuid] = {
job = {
'uuid': job_uuid,
'disabled': False,
'wait_for': set(),
'process': process.json(include_parameters=False),
'parameters_location': parameters_location
}
self.jobs[job_uuid] = job
for parent_executable in parent_executables:
jobs_per_process.setdefault(
parent_executable.uuid + ','.join(process_iterations.get(parent_executable.uuid, [])),
Expand All @@ -384,8 +434,14 @@ def _create_jobs(self,
process.uuid + ','.join(process_iterations.get(process.uuid, [])),
set()).add(job_uuid)
# print('!create_job!', process.full_name)
opar = []
wpar = []
for field in process.user_fields():
value = undefined
if field.metadata().get('write', False):
wpar.append(field.name)
if field.metadata().get('output', False):
opar.append(field.name)
if getattr(field, 'generate_temporary', False):
if field.type is File:
prefix = f'!{{dataset.tmp.path}}/{process.full_name}'
Expand Down Expand Up @@ -430,6 +486,11 @@ def _create_jobs(self,
dest_node.uuid + ','.join(process_iterations.get(dest_node.uuid, [])),
set()).add(
process.uuid + ','.join(process_iterations.get(process.uuid, [])))
if opar:
job['output_parameters'] = opar
if wpar:
job['write_parameters'] = wpar

return parameters

def find_job(self, full_name):
Expand All @@ -440,6 +501,29 @@ def find_job(self, full_name):
return job
return None

def _create_directories_job(self, job_uuid, out_dirs, out_deps):
    """Register the job that creates the workflow's output directories.

    A ``directories_creation`` entry is added under
    ``self.parameters_dict['nodes']``, the list of directories is appended
    to ``self.parameters_values``, and a job description pointing to
    ``capsul.execution_context.CreateDirectories`` is stored in
    ``self.jobs[job_uuid]``.

    Parameters
    ----------
    job_uuid: str
        Key under which the new job is stored in ``self.jobs``; it is also
        recorded as the job's own ``uuid``.
    out_dirs: iterable of str
        Directories to create. When empty, nothing is registered.
    out_deps: iterable of str
        Identifiers of the jobs recorded in the new job's ``waited_by``
        list.

    Returns
    -------
    None
    """
    # Sort the directories: ``out_dirs`` is typically a set of strings,
    # whose iteration order changes from one interpreter run to the next.
    # Sorting keeps the stored workflow reproducible.
    directories = sorted(out_dirs)
    if not directories:
        return None  # no dirs to create.
    name = 'directories_creation'

    # The value is appended at the end of the flat values list; both the
    # parameters tree and the job refer to it through this index.
    pindex = len(self.parameters_values)
    # NOTE(review): ['&', index] is presumably the DictWithProxy proxy
    # notation -- confirm against soma.api.
    self.parameters_dict['nodes'][name] = {'directories': ['&', pindex]}
    self.parameters_values.append(directories)
    self.jobs[job_uuid] = {
        'uuid': job_uuid,
        'disabled': False,
        'process': {
            'type': 'process',
            'definition': 'capsul.execution_context.CreateDirectories',
            'uuid': str(uuid4()),
        },
        'parameters_location': ['nodes', name],
        'parameters_index': {'directories': pindex},
        # this job waits for nothing; the jobs in out_deps wait for it
        'wait_for': [],
        'waited_by': list(out_deps),
        'write_parameters': ['directories'],
    }


def find_temporary_to_generate(executable):
# print('!temporaries! ->', executable.label)
Expand All @@ -451,7 +535,7 @@ def find_temporary_to_generate(executable):
nodes = [executable]
for node in nodes:
# print('!temporaries! initialize node', node.full_name)
for field in node.user_fields():
for field in node.user_fields(): # noqa: F402
if (field.output or
not field.metadata('write', False) or
not node.plugs[field.name].activated):
Expand Down Expand Up @@ -481,3 +565,14 @@ def find_temporary_to_generate(executable):
# print('!temporaries! parameters with temporary')
# for n, p in temporaries:
# print('!temporaries! ', n, ':', p)


class CreateDirectories(Process):
    """Process that makes sure a list of output directories exists."""

    # Directories to create; declared as a written parameter.
    directories: list[Directory] = field(
        type_=list[Directory],
        write=True,
        doc=('create output directories so that later processes can write '
             'their output files there.'),
    )

    def execute(self, execution_context):
        """Create each directory of ``directories``.

        Missing parent directories are created as well, and directories
        that already exist are left untouched. ``execution_context`` is
        not used here.
        """
        for path in self.directories:
            os.makedirs(path, exist_ok=True)

0 comments on commit b57580d

Please sign in to comment.