From 1f3242227d63b23424f0d66d392336b9330ea1c3 Mon Sep 17 00:00:00 2001
From: sapetnioc
Date: Wed, 6 Dec 2023 15:23:34 +0100
Subject: [PATCH] Fixed persistence

---
 capsul/database/redis.py  | 7 +++++--
 capsul/database/sqlite.py | 2 ++
 capsul/dataset.py         | 24 ++++++++++++++++--------
 capsul/engine/__init__.py | 9 ++++-----
 4 files changed, 27 insertions(+), 15 deletions(-)

diff --git a/capsul/database/redis.py b/capsul/database/redis.py
index 9363cef25..51015be0e 100644
--- a/capsul/database/redis.py
+++ b/capsul/database/redis.py
@@ -352,7 +352,7 @@ def _enter(self):
                 redis.call('hset', execution_key, 'dispose', 1)
                 if (redis.call('hget', execution_key, 'status') == 'ended') and
-                    (bypass_persistence or (redis.call('hget', engine_key, 'persistent') == '0')) then
+                    (bypass_persistence ~= 0 or (redis.call('hget', engine_key, 'persistent') == '0')) then
                     redis.call('del', execution_key)
                     local executions = cjson.decode(redis.call('hget', engine_key, 'executions'))
                     table.remove(executions, table_find(executions, execution_id))
@@ -837,7 +837,10 @@ def end_execution(self, engine_id, execution_id):
         self.redis.hset(f"capsul:{engine_id}:{execution_id}", "status", "ended")
         tmp = self.redis.hget(f"capsul:{engine_id}:{execution_id}", "tmp")
         self.redis.hdel(f"capsul:{engine_id}:{execution_id}", "tmp")
-        if self.redis.hget(f"capsul:{engine_id}:{execution_id}", "dispose"):
+        if (
+            self.redis.hget(f"capsul:{engine_id}:{execution_id}", "dispose")
+            and not self.persistent
+        ):
             executions = json.loads(
                 self.redis.hget(f"capsul:{engine_id}", "executions")
             )
diff --git a/capsul/database/sqlite.py b/capsul/database/sqlite.py
index 458877f3f..ec8bb1146 100644
--- a/capsul/database/sqlite.py
+++ b/capsul/database/sqlite.py
@@ -158,6 +158,7 @@ def get_or_create_engine(self, engine, update_database=False):
         sql = "SELECT engine_id FROM capsul_engine WHERE label=?"
         row = sqlite.execute(sql, [engine.label]).fetchone()
         if row:
+            print("!sqlite update!", engine.config.persistent)
             engine_id = row[0]
             if update_database:
                 # Update configuration stored in database
@@ -171,6 +172,7 @@
                 ],
             )
         else:
+            print("!sqlite create!", engine.config.persistent)
             # Create new engine in database
             engine_id = str(uuid4())
             sql = (
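Note: the redis.py hunk above fixes a Lua truthiness pitfall. In Lua only nil
and false are falsy, so a numeric bypass_persistence of 0 satisfied the old
"or" test and an ended execution could be deleted even from a persistent
database; the explicit "~= 0" comparison restores the intended numeric check.
The Python-side guard added to end_execution follows the same logic. The code
below is a minimal dict-backed sketch of the corrected dispose behaviour, not
Capsul's actual database class: the capsul:{engine_id} key layout and the
"dispose"/"persistent" names follow the patch, everything else is illustrative.

    import json


    class DisposeSketch:
        """Dict-backed stand-in for the Redis execution database."""

        def __init__(self, persistent):
            self.persistent = persistent
            self.hashes = {}  # key -> dict, mimicking Redis hashes

        def hset(self, key, field, value):
            self.hashes.setdefault(key, {})[field] = value

        def hget(self, key, field):
            return self.hashes.get(key, {}).get(field)

        def end_execution(self, engine_id, execution_id):
            key = f"capsul:{engine_id}:{execution_id}"
            self.hset(key, "status", "ended")
            # Before the patch, "dispose" alone triggered deletion, so a
            # persistent database could lose a finished execution. The added
            # "and not self.persistent" keeps it available for later queries.
            if self.hget(key, "dispose") and not self.persistent:
                executions = json.loads(self.hget(f"capsul:{engine_id}", "executions"))
                executions.remove(execution_id)
                self.hset(f"capsul:{engine_id}", "executions", json.dumps(executions))
                del self.hashes[key]


    db = DisposeSketch(persistent=True)
    db.hset("capsul:e1", "executions", json.dumps(["x1"]))
    db.hset("capsul:e1:x1", "dispose", 1)
    db.end_execution("e1", "x1")
    assert db.hget("capsul:e1:x1", "status") == "ended"  # record survives disposal
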
""" path: Directory @@ -530,9 +534,10 @@ def map_schemas(source, dest): class process_schema: - """Decorator used to register functions that defines how - path parameters can be generated for an executable in the - context of a dataset schema:: + """Decorator used to register functions that defines how path parameters can + be generated for a process or pipeline in the context of a dataset schema. + These process schema functions are typically written by pipeline developers. + The following example gives a typical syntax to use this decorator:: from capsul.api import Process, process_schema from soma.controller import File, field @@ -547,8 +552,8 @@ def execute(self, context): @process_schema('bids', MyProcess) def bids_MyProcess(executable, metadata): + metadata.output = metadata.input metadata.output.prefix.prepend("my_process") - """ modifier_function = {} @@ -654,7 +659,10 @@ def __call__(self, function): class MetadataModification: - """Record a simple modification request of process metadata""" + """This class aims to offer pipeline developers an uncomplicated syntax for + defining modifications to a MetadataSchema. These modifications are intended to + be applied in a sequence of chained operations whose order is determined + dynamically at runtime based on the pipeline structure and parameters.""" def __init__( self, diff --git a/capsul/engine/__init__.py b/capsul/engine/__init__.py index c8b79bb81..030ddbb31 100644 --- a/capsul/engine/__init__.py +++ b/capsul/engine/__init__.py @@ -68,10 +68,10 @@ def execution_context(engine_label, engine_config, executable): f"valid configuration for module {module_name}" ) if len(valid_module_configs) > 1: - print(f"several {module_name} valid condfigs:") - for field, v in valid_module_configs.items(): - print(field.name, ":", v.asdict()) - print(valid_module_configs) + # print(f"several {module_name} valid condfigs:") + # for field, v in valid_module_configs.items(): + # print(field.name, ":", v.asdict()) + # print(valid_module_configs) raise RuntimeError( f'Execution environment "{engine_label}" has ' f"{len(valid_configs)} possible configurations for " @@ -353,7 +353,6 @@ def restart(self, execution_id): """Restart a workflow which has failed or has been stopped, and is thus not currently running. """ - print("Engine.restart") raise NotImplementedError() def raise_for_status(self, *args, **kwargs):