
Commit

Merge remote-tracking branch 'refs/remotes/origin/3.0' into 3.0
nsouedet committed Dec 7, 2023
2 parents d79dc0f + 3c9d039 commit dfeca74
Showing 4 changed files with 54 additions and 26 deletions.
7 changes: 6 additions & 1 deletion capsul/database/__init__.py
@@ -271,6 +271,9 @@ def job_parameters_from_values(self, job_dict, parameters_values):
result[k] = parameters_values[i]
return result

def successful_node_paths(self, engine_id, execution_id):
raise NotImplementedError

def print_execution_report(self, report, file=sys.stdout):
print(
"====================\n" "| Execution report |\n" "====================\n",
@@ -410,6 +413,8 @@ def update_executable(self, engine_id, execution_id, executable):
# pprint(parameters.content)
while stack:
node, parameters = stack.pop(0)
if parameters is None:
continue
for field in node.user_fields():
value = parameters.get(field.name, undefined)
value = parameters.no_proxy(value)
@@ -421,7 +426,7 @@ def update_executable(self, engine_id, execution_id, executable):
setattr(node, field.name, value)
if isinstance(node, Pipeline):
stack.extend(
(n, parameters["nodes"][n.name])
(n, parameters["nodes"].get(n.name))
for n in node.nodes.values()
if n is not node and isinstance(n, Process) and n.activated
)
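
The two guards above make the traversal in update_executable tolerant of pipeline nodes that have no recorded parameters: dict.get() returns None for a missing entry, and the loop now skips it instead of raising a KeyError. A minimal sketch of the pattern, with a hypothetical parameters structure:

    # Hypothetical parameters structure; node "b" has no recorded entry
    parameters = {"nodes": {"a": {"threshold": 1}}}
    stack = [(name, parameters["nodes"].get(name)) for name in ("a", "b")]
    while stack:
        name, params = stack.pop(0)
        if params is None:
            continue  # skip nodes without recorded parameters
        print(name, params)  # only "a" is printed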
38 changes: 13 additions & 25 deletions capsul/database/redis.py
@@ -310,31 +310,6 @@ def _enter(self):
"""
)

self._update_workflow_parameters = self.redis.register_script(
"""
local execution_key = KEYS[1]
local parameters_location = cjson.decode(ARGV[1])
local output_parameters = cjson.decode(ARGV[2])
local workflow_parameters = cjson.decode(redis.call('hget', execution_key, 'workflow_parameters'))
local parameters = workflow_parameters['content']
for index, value in ipairs(parameters_location) do
local i = tonumber(value)
if i then
parameters = parameters[i+1]
else
parameters = parameters[value]
end
end
for k, v in pairs(output_parameters) do
workflow_parameters['proxy_values'][parameters[k][2]+1] = v
end
redis.call('hset', execution_key, 'workflow_parameters', cjson.encode(workflow_parameters))
"""
)

self._dispose = self.redis.register_script(
"""
local function table_find(array, value)
@@ -808,6 +783,19 @@ def execution_report_json(self, engine_id, execution_id):

return result

def successful_node_paths(self, engine_id, execution_id):
    execution_key = f"capsul:{engine_id}:{execution_id}"
    # The "done" hash field holds the UUIDs of successfully finished jobs
    done = json.loads(self.redis.hget(execution_key, "done"))
    for job_uuid in done:
        job = json.loads(self.redis.hget(execution_key, f"job:{job_uuid}"))
        parameters_location = job.get("parameters_location")
        if parameters_location:
            # Drop the "nodes" markers to keep only the node names
            result = tuple(i for i in parameters_location if i != "nodes")
            # Ignore the internal directories-creation job
            if result != ("directories_creation",):
                yield result
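
For illustration, the filtering above turns a stored parameters location into a plain node path. A minimal sketch with a hypothetical location list:

    # Hypothetical parameters_location as stored with a job
    parameters_location = ["nodes", "preproc", "nodes", "normalization"]
    node_path = tuple(i for i in parameters_location if i != "nodes")
    print(node_path)  # ('preproc', 'normalization')

This is the tuple form that Pipeline.node_from_path(), added below in capsul/pipeline/pipeline.py, expects.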

def dispose(self, engine_id, execution_id, bypass_persistence=False):
keys = [f"capsul:{engine_id}", f"capsul:{engine_id}:{execution_id}"]
args = [execution_id, int(bool(bypass_persistence))]
19 changes: 19 additions & 0 deletions capsul/engine/__init__.py
@@ -410,6 +410,25 @@ def run(self, executable, timeout=None, print_report=False, debug=False, **kwargs):
self.dispose(execution_id)
return status

def prepare_pipeline_for_retry(self, pipeline, execution_id):
    """Modify a pipeline, given a previous execution, so that only the
    nodes that were not successful are selected. Running the pipeline
    after this step will retry the execution of the failed jobs. This
    method adds (or modifies, if it already exists) an unselected
    pipeline step called "succesfully_executed" containing all nodes
    that were successfully executed.
    """
    successful_nodes = []
    for path in self.database.successful_node_paths(self.engine_id, execution_id):
        successful_nodes.append(pipeline.node_from_path(path).name)
    step_field = None
    if pipeline.field("pipeline_steps"):
        step_field = pipeline.pipeline_steps.field("succesfully_executed")
    if step_field is None:
        pipeline.add_pipeline_step("succesfully_executed", successful_nodes, False)
    else:
        step_field.nodes = successful_nodes
        setattr(pipeline.pipeline_steps, "succesfully_executed", False)
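
A hedged usage sketch of the retry flow. Apart from run(), dispose() and prepare_pipeline_for_retry(), which appear in this commit, everything here is an assumption: my_pipeline is a hypothetical executable, and start(), wait(), status() and the status value are assumed to behave as in the rest of the Capsul 3 engine API:

    # Hypothetical retry loop; run() disposes its execution, so the
    # first run uses start()/wait() to keep the execution id alive
    execution_id = engine.start(my_pipeline)
    engine.wait(execution_id)
    if engine.status(execution_id) != "done":  # assumed status value
        # Mark already-successful nodes so the next run skips them
        engine.prepare_pipeline_for_retry(my_pipeline, execution_id)
        engine.dispose(execution_id)
        engine.run(my_pipeline)  # retries only the failed jobs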


class Workers(Controller):
def __init__(self, engine_label, engine_config, database):
16 changes: 16 additions & 0 deletions capsul/pipeline/pipeline.py
@@ -2449,6 +2449,13 @@ def __setattr__(self, name, value):
self.dispatch_value(self, name, value)
return result

def __setitem__(self, path, value):
    """Set a parameter through a dotted node path, e.g.
    pipeline["node.sub_node.param"] = value."""
    path = path.split(".")
    node_path = path[:-1]
    node = self.node_from_path(node_path)
    setattr(node, path[-1], value)
    self.dispatch_value(node, path[-1], value)
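
With this accessor, a parameter anywhere in a nested pipeline can be set in one line. A small sketch with hypothetical node and parameter names:

    # "preproc" is a sub-pipeline, "smoothing" a node in it (hypothetical)
    pipeline["preproc.smoothing.fwhm"] = 6.0
    # equivalent to:
    # node = pipeline.node_from_path(["preproc", "smoothing"])
    # setattr(node, "fwhm", 6.0)  # plus dispatch_value() for link propagation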

def dispatch_value(self, node, name, value):
"""Propagate the value from a pipeline plug through links"""
# print(f"!dispatch! {node.name}.{name} = {value}")
@@ -2650,6 +2657,15 @@ def import_json(self, json):
):
self.dispatch_value(node, names[-1], json_value)

def node_from_path(self, path):
    """Return the node reached by following a sequence of node names,
    descending into sub-pipelines along the way."""
    node = self
    for path_item in path:
        if isinstance(node, ProcessIteration):
            # An iteration level is traversed through its wrapped process
            node = node.process
        else:
            node = node.nodes[path_item]
    return node
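
For illustration, resolving a path of the kind yielded by successful_node_paths() (hypothetical node names):

    node = pipeline.node_from_path(("preproc", "normalization"))
    print(node.name)  # 'normalization'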


class CustomPipeline(Pipeline):
def __init__(self, definition="custom_pipeline", json_executable={}):
