Skip to content

Commit

Permalink
Fixed persistence
Browse files Browse the repository at this point in the history
  • Loading branch information
sapetnioc committed Dec 6, 2023
1 parent f6aec6b commit 1f32422
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 15 deletions.
7 changes: 5 additions & 2 deletions capsul/database/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ def _enter(self):
redis.call('hset', execution_key, 'dispose', 1)
if (redis.call('hget', execution_key, 'status') == 'ended') and
(bypass_persistence or (redis.call('hget', engine_key, 'persistent') == '0')) then
(bypass_persistence ~= 0 or (redis.call('hget', engine_key, 'persistent') == '0')) then
redis.call('del', execution_key)
local executions = cjson.decode(redis.call('hget', engine_key, 'executions'))
table.remove(executions, table_find(executions, execution_id))
Expand Down Expand Up @@ -837,7 +837,10 @@ def end_execution(self, engine_id, execution_id):
self.redis.hset(f"capsul:{engine_id}:{execution_id}", "status", "ended")
tmp = self.redis.hget(f"capsul:{engine_id}:{execution_id}", "tmp")
self.redis.hdel(f"capsul:{engine_id}:{execution_id}", "tmp")
if self.redis.hget(f"capsul:{engine_id}:{execution_id}", "dispose"):
if (
self.redis.hget(f"capsul:{engine_id}:{execution_id}", "dispose")
and not self.persistent
):
executions = json.loads(
self.redis.hget(f"capsul:{engine_id}", "executions")
)
Expand Down
2 changes: 2 additions & 0 deletions capsul/database/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ def get_or_create_engine(self, engine, update_database=False):
sql = "SELECT engine_id FROM capsul_engine WHERE label=?"
row = sqlite.execute(sql, [engine.label]).fetchone()
if row:
print("!sqlite update!", engine.config.persistent)
engine_id = row[0]
if update_database:
# Update configuration stored in database
Expand All @@ -171,6 +172,7 @@ def get_or_create_engine(self, engine, update_database=False):
],
)
else:
print("!sqlite create!", engine.config.persistent)
# Create new engine in database
engine_id = str(uuid4())
sql = (
Expand Down
24 changes: 16 additions & 8 deletions capsul/dataset.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""
Metadata handling and attributes-based path generation system. In other words, this module is the completion system for Capsul processes and other executables.
Metadata handling and attributes-based path generation system. In other words,
this module is the completion system for Capsul processes and other
executables.
The main function to be used contains most of the doc: see :func:`generate_paths`
"""
Expand Down Expand Up @@ -29,9 +31,11 @@
class Dataset(Controller):
"""
Dataset representation.
You don't need to define or instantiate this class yourself, it will be done automatically and internally in the path generation system.
You don't need to define or instantiate this class yourself, it will be
done automatically and internally in the path generation system.
Instead, users need to define datasets in the Capsul config. See :func:`generate_paths`.
Instead, users need to define datasets in the Capsul config. See
:func:`generate_paths`.
"""

path: Directory
Expand Down Expand Up @@ -530,9 +534,10 @@ def map_schemas(source, dest):


class process_schema:
"""Decorator used to register functions that defines how
path parameters can be generated for an executable in the
context of a dataset schema::
"""Decorator used to register functions that defines how path parameters can
be generated for a process or pipeline in the context of a dataset schema.
These process schema functions are typically written by pipeline developers.
The following example gives a typical syntax to use this decorator::
from capsul.api import Process, process_schema
from soma.controller import File, field
Expand All @@ -547,8 +552,8 @@ def execute(self, context):
@process_schema('bids', MyProcess)
def bids_MyProcess(executable, metadata):
metadata.output = metadata.input
metadata.output.prefix.prepend("my_process")
"""

modifier_function = {}
Expand Down Expand Up @@ -654,7 +659,10 @@ def __call__(self, function):


class MetadataModification:
"""Record a simple modification request of process metadata"""
"""This class aims to offer pipeline developers an uncomplicated syntax for
defining modifications to a MetadataSchema. These modifications are intended to
be applied in a sequence of chained operations whose order is determined
dynamically at runtime based on the pipeline structure and parameters."""

def __init__(
self,
Expand Down
9 changes: 4 additions & 5 deletions capsul/engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ def execution_context(engine_label, engine_config, executable):
f"valid configuration for module {module_name}"
)
if len(valid_module_configs) > 1:
print(f"several {module_name} valid condfigs:")
for field, v in valid_module_configs.items():
print(field.name, ":", v.asdict())
print(valid_module_configs)
# print(f"several {module_name} valid condfigs:")
# for field, v in valid_module_configs.items():
# print(field.name, ":", v.asdict())
# print(valid_module_configs)
raise RuntimeError(
f'Execution environment "{engine_label}" has '
f"{len(valid_configs)} possible configurations for "
Expand Down Expand Up @@ -353,7 +353,6 @@ def restart(self, execution_id):
"""Restart a workflow which has failed or has been stopped, and is
thus not currently running.
"""
print("Engine.restart")
raise NotImplementedError()

def raise_for_status(self, *args, **kwargs):
Expand Down

0 comments on commit 1f32422

Please sign in to comment.