diff --git a/capsul/database/__init__.py b/capsul/database/__init__.py
index 4620afe3d..25c0282ee 100644
--- a/capsul/database/__init__.py
+++ b/capsul/database/__init__.py
@@ -16,12 +16,14 @@
 from ..execution_context import ExecutionContext
 from ..pipeline.pipeline import Process, Pipeline
 
+
 database_classes = {
     'sqlite': 'capsul.database.populse_db:Populse_dbExecutionDatabase',
     'redis': 'capsul.database.redis:RedisExecutionDatabase',
     'redis+socket': 'capsul.database.redis:RedisExecutionDatabase',
 }
 
+
 class URL:
     pattern = re.compile(
         r'^(?P<scheme>[^:]+)://'
@@ -39,7 +41,7 @@ def __init__(self, string):
             raise ValueError(f'Invalid URL: {string}')
         for k, v in m.groupdict().items():
             setattr(self, k, v)
-    
+
     def __str__(self):
         if self.login:
             if self.password:
@@ -152,7 +154,8 @@ def workers_command(self, engine_id):
                            db_config]
         return workers_command
 
-    def new_execution(self, executable, engine_id, execution_context, workflow, start_time):
+    def new_execution(self, executable, engine_id, execution_context, workflow,
+                      start_time):
         executable_json = json_encode(executable.json(include_parameters=False))
         execution_context_json = execution_context.json()
         workflow_parameters_values_json = json_encode(workflow.parameters_values)
@@ -179,15 +182,13 @@ def new_execution(self, executable, engine_id, execution_context, workflow, star
         )
         return execution_id
 
-
     def _executable_from_json(self, executable_json):
         return Capsul.executable(json_decode(executable_json))
-    
+
     def executable(self, execution_id):
         j = self.executable_json(execution_id)
         if j is not None:
             return self._executable_from_json(j)
 
-
     @staticmethod
     def _time_from_json(time_json):
@@ -196,7 +197,6 @@ def _time_from_json(time_json):
     @staticmethod
     def _time_to_json(time):
         return time.isoformat()
-
 
     def _job_from_json(self, job):
         for k in ('start_time', 'end_time'):
@@ -205,7 +205,6 @@ def _job_from_json(self, job):
                 job[k] = self._time_from_json(t)
         return job
 
-
     def _job_to_json(self, job):
         for k in ('start_time', 'end_time'):
             t = job.get(k)
@@ -213,14 +212,12 @@ def _job_to_json(self, job):
                 job[k] = self._time_to_json(t)
         return job
 
-
     def job(self, engine_id, execution_id, job_uuid):
         j = self.job_json(engine_id, execution_id, job_uuid)
         if j:
             return self._job_from_json(j)
         return None
 
-
     def execution_report(self, engine_id, execution_id):
         report = self.execution_report_json(engine_id, execution_id)
         execution_context = report['execution_context']
@@ -400,29 +397,23 @@ def update_executable(self, engine_id, execution_id, executable):
         if enable_parameter_links is not None:
             executable.enable_parameter_links = enable_parameter_links
 
-
     @property
     def is_ready(self):
         raise NotImplementedError
 
-
     @property
     def is_connected(self):
         raise NotImplementedError
 
-
     def engine_id(self, label):
         raise NotImplementedError
 
-
     def _enter(self):
         raise NotImplementedError
 
-
     def _exit(self):
         raise NotImplementedError
 
-
     def get_or_create_engine(self, engine, update_database=False):
         '''
         If engine with given label is in the database, simply return its
@@ -434,7 +425,6 @@ def get_or_create_engine(self, engine, update_database=False):
         '''
         raise NotImplementedError
 
-
     def engine_connections(self, engine_id):
         '''
         Return the current number of active connections to
@@ -443,21 +433,18 @@ def engine_connections(self, engine_id):
         '''
         raise NotImplementedError
 
-
     def engine_config(self, engine_id):
         '''
         Return the configuration dict stored for an engine
         '''
         raise NotImplementedError
 
-
     def workers_count(self, engine_id):
         '''
         Return the number of workers that are running.
         '''
         raise NotImplementedError
 
-
     def worker_database_config(self, engine_id):
         '''
         Return database connection settings for workers. This
@@ -467,7 +454,6 @@ def worker_database_config(self, engine_id):
         '''
         raise NotImplementedError
 
-
     def worker_started(self, engine_id):
         '''
         Register a new worker that had been started for this engine and
@@ -475,38 +461,33 @@ def worker_started(self, engine_id):
         '''
         raise NotImplementedError
 
-
     def worker_ended(self, engine_id, worker_id):
         '''
         Remove a worker from the list of workers for this engine.
         '''
         raise NotImplementedError
 
-
     def persistent(self, engine_id):
         '''
         Return wether an engine is persistent or not.
         '''
         raise NotImplementedError
 
-
     def set_persistent(self, engine_id, persistent):
         '''
         Sets the persitency status of an engine.
         '''
         raise NotImplementedError
 
-
     def dispose_engine(self, engine_id):
         '''
         Tell Capsul that this engine will not be used anymore by any client.
-        The ressource it uses must be freed as soon as possible. If no 
+        The resource it uses must be freed as soon as possible. If no
         execution is running, engine is destroyed. Otherwise, workers will
         process ongoing executions and cleanup when done.
         '''
         raise NotImplementedError
 
-
     def executions_summary(self, engine_id):
         '''
         Returns a JSON-compatible list whose elements contains some
@@ -524,44 +505,36 @@ def executions_summary(self, engine_id):
         '''
         raise NotImplementedError
 
-
     def store_execution(self,
-            engine_id,
-            label,
-            start_time,
-            executable_json,
-            execution_context_json,
-            workflow_parameters_values_json,
-            workflow_parameters_dict_json,
-            jobs,
-            ready,
-            waiting
-            ):
+                        engine_id,
+                        label,
+                        start_time,
+                        executable_json,
+                        execution_context_json,
+                        workflow_parameters_values_json,
+                        workflow_parameters_dict_json,
+                        jobs,
+                        ready,
+                        waiting):
         raise NotImplementedError
 
-
     def execution_context(self, engine_id, execution_id):
         j = self.execution_context_json(engine_id, execution_id)
         if j is not None:
             return ExecutionContext(config=j)
 
-
     def execution_context_json(self, engine_id, execution_id):
         raise NotImplementedError
 
-
     def pop_job(self, engine_id, start_time):
         '''
         Convert its parameters to JSON and calls pop_job_json()
         '''
         return self.pop_job_json(engine_id, self._time_to_json(start_time))
 
-
-
     def pop_job_json(self, engine_id, start_time):
         raise NotImplementedError
 
-
     def get_job_input_parameters(self, engine_id, execution_id, job_uuid):
         '''
         Return a dictionary of input parameters to use for a job. Also store
@@ -569,60 +542,56 @@ def get_job_input_parameters(self, engine_id, execution_id, job_uuid):
         monitoring tools.
         '''
         raise NotImplementedError
 
-
-    def job_finished(self, engine_id, execution_id, job_uuid, end_time, return_code, stdout, stderr):
+    def job_finished(self, engine_id, execution_id, job_uuid, end_time,
+                     return_code, stdout, stderr):
         '''
         Convert its parameters to JSON and calls job_finished_json()
         '''
-        self.job_finished_json(engine_id, execution_id, job_uuid, 
-            self._time_to_json(end_time), return_code, stdout, stderr)
-    
+        self.job_finished_json(engine_id, execution_id, job_uuid,
+                               self._time_to_json(end_time), return_code,
+                               stdout, stderr)
 
-    def job_finished_json(self, engine_id, execution_id, job_uuid, end_time, return_code, stdout, stderr):
+    def job_finished_json(self, engine_id, execution_id, job_uuid, end_time,
+                          return_code, stdout, stderr):
         raise NotImplementedError
 
-
-
     def status(self, engine_id, execution_id):
         raise NotImplementedError
 
-
     def workflow_parameters_values(self, engine_id, execution_id):
-        return json_decode(self.workflow_parameters_values_json(engine_id, execution_id))
-    
+        return json_decode(self.workflow_parameters_values_json(engine_id,
+                                                                execution_id))
 
     def workflow_parameters_values_json(self, engine_id, execution_id):
         raise NotImplementedError
 
-
     def workflow_parameters_dict_json(self, engine_id, execution_id):
         raise NotImplementedError
 
-
     def job_json(self, engine_id, execution_id, job_uuid):
         raise NotImplementedError
 
-
     def execution_report_json(self, engine_id, execution_id):
         raise NotImplementedError
 
-
     def dispose(self, engine_id, execution_id):
         raise NotImplementedError
 
-
     def start_execution(self, engine_id, execution_id, tmp):
         raise NotImplementedError
 
-
     def end_execution(self, engine_id, execution_id):
         raise NotImplementedError
 
-
     def tmp(self, engine_id, execution_id):
         raise NotImplementedError
 
-
     def error(self, engine_id, execution_id):
         raise NotImplementedError
+
+    def stop_execution(self, engine_id, execution_id):
+        raise NotImplementedError
+
+    def kill_jobs(self, engine_id, execution_id, job_ids):
+        raise NotImplementedError
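A note on the paired methods above (pop_job/pop_job_json, job_finished/job_finished_json): the base class converts datetime values to strings with _time_to_json() (shown above as time.isoformat()) before handing them to a backend, and converts them back with _time_from_json(). A minimal standalone sketch of that round trip, assuming the inverse conversion is datetime.fromisoformat() (the body of _time_from_json is not part of this diff):

    from datetime import datetime

    start_time = datetime.now()
    as_json = start_time.isoformat()            # what a backend's pop_job_json() receives
    restored = datetime.fromisoformat(as_json)  # what _time_from_json() is expected to return
    assert restored == start_time
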
diff --git a/capsul/database/redis.py b/capsul/database/redis.py
index 52247154f..5cf45a7eb 100644
--- a/capsul/database/redis.py
+++ b/capsul/database/redis.py
@@ -561,26 +561,24 @@ def executions_summary(self, engine_id):
                     self.redis.hget(execution_key, 'done'))),
                 'failed': len(json.loads(
                     self.redis.hget(execution_key, 'failed'))),
-                'engine_label': self.redis.hget(f'capsul:{engine_id}', 'label'),
+                'engine_label': self.redis.hget(f'capsul:{engine_id}',
+                                                'label'),
                 'execution_id': execution_id,
             }
             result.append(info)
         return result
 
-
-
     def store_execution(self,
-            engine_id,
-            label,
-            start_time,
-            executable_json,
-            execution_context_json,
-            workflow_parameters_values_json,
-            workflow_parameters_dict_json,
-            jobs,
-            ready,
-            waiting
-            ):
+                        engine_id,
+                        label,
+                        start_time,
+                        executable_json,
+                        execution_context_json,
+                        workflow_parameters_values_json,
+                        workflow_parameters_dict_json,
+                        jobs,
+                        ready,
+                        waiting):
         execution_id = str(uuid4())
         keys = [
             f'capsul:{engine_id}',
@@ -601,13 +599,12 @@ def store_execution(self,
         self._store_execution(keys=keys, args=args)
         return execution_id
 
-
     def execution_context_json(self, engine_id, execution_id):
-        return json.loads(self.redis.hget(f'capsul:{engine_id}:{execution_id}', 'execution_context'))
-    
+        return json.loads(self.redis.hget(f'capsul:{engine_id}:{execution_id}',
+                                          'execution_context'))
 
     def pop_job_json(self, engine_id, start_time):
-        executions = self.redis.hget(f'capsul:{engine_id}','executions')
+        executions = self.redis.hget(f'capsul:{engine_id}', 'executions')
         if executions is None:
             # engine_id does not exists anymore
             # return None to say to workers that they can die
@@ -615,11 +612,12 @@
         else:
             all_disposed = True
             for execution_id in json.loads(executions):
-                all_disposed = all_disposed and self.redis.hget(f'capsul:{engine_id}:{execution_id}', 'dispose')
+                all_disposed = all_disposed and self.redis.hget(
+                    f'capsul:{engine_id}:{execution_id}', 'dispose')
                 keys = [
                     f'capsul:{engine_id}:{execution_id}'
                 ]
-                args=[start_time]
+                args = [start_time]
                 job_uuid = self._pop_job(keys=keys, args=args)
                 if job_uuid:
                     return execution_id, job_uuid
@@ -630,34 +628,34 @@
         # Empty string means "no job ready yet"
         return '', ''
 
-    def job_finished_json(self, engine_id, execution_id, job_uuid, 
+    def job_finished_json(self, engine_id, execution_id, job_uuid,
                           end_time, return_code, stdout, stderr):
         keys = [
             f'capsul:{engine_id}',
             f'capsul:{engine_id}:{execution_id}'
         ]
         args = [
-            execution_id, 
-            job_uuid, 
-            end_time, 
-            return_code, 
-            stdout, 
+            execution_id,
+            job_uuid,
+            end_time,
+            return_code,
+            stdout,
             stderr
         ]
         self._job_finished(keys=keys, args=args)
 
-
     def status(self, engine_id, execution_id):
         return self.redis.hget(f'capsul:{engine_id}:{execution_id}', 'status')
 
-
     def workflow_parameters_values_json(self, engine_id, execution_id):
-        return json.loads(self.redis.hget(f'capsul:{engine_id}:{execution_id}', 'workflow_parameters_values') or 'null')
-    
+        return json.loads(self.redis.hget(
+            f'capsul:{engine_id}:{execution_id}', 'workflow_parameters_values')
+            or 'null')
 
     def workflow_parameters_dict(self, engine_id, execution_id):
-        return json.loads(self.redis.hget(f'capsul:{engine_id}:{execution_id}', 'workflow_parameters_dict') or 'null')
-    
+        return json.loads(self.redis.hget(
+            f'capsul:{engine_id}:{execution_id}', 'workflow_parameters_dict')
+            or 'null')
 
     def get_job_input_parameters(self, engine_id, execution_id, job_uuid):
         values = self.workflow_parameters_values_json(engine_id, execution_id)
@@ -670,25 +668,28 @@ def get_job_input_parameters(self, engine_id, execution_id, job_uuid):
             else:
                 result[k] = values[i]
         job['input_parameters'] = result
-        self.redis.hset(f'capsul:{engine_id}:{execution_id}', f'job:{job_uuid}', json.dumps(job))
-        return result
-    
+        self.redis.hset(f'capsul:{engine_id}:{execution_id}',
+                        f'job:{job_uuid}', json.dumps(job))
+        return result
 
-    def set_job_output_parameters(self, engine_id, execution_id, job_uuid, output_parameters):
+    def set_job_output_parameters(self, engine_id, execution_id, job_uuid,
+                                  output_parameters):
         keys = [
             f'capsul:{engine_id}:{execution_id}'
         ]
         args = [
-            job_uuid, 
+            job_uuid,
             json.dumps(output_parameters),
         ]
         self._set_job_output_parameters(keys=keys, args=args)
 
-
     def job_json(self, engine_id, execution_id, job_uuid):
-        job = json.loads(self.redis.hget(f'capsul:{engine_id}:{execution_id}', f'job:{job_uuid}'))
+        job = json.loads(self.redis.hget(f'capsul:{engine_id}:{execution_id}',
+                                         f'job:{job_uuid}'))
         return job
 
+    def kill_jobs(self, engine_id, execution_id, job_ids):
+        raise NotImplementedError
 
     def execution_report_json(self, engine_id, execution_id):
         (label, execution_context, status, error,
@@ -724,7 +725,6 @@ def execution_report_json(self, engine_id, execution_id):
 
         return result
 
-
     def dispose(self, engine_id, execution_id):
         keys = [
             f'capsul:{engine_id}',
@@ -735,7 +735,6 @@ def dispose(self, engine_id, execution_id):
         ]
         self._dispose(keys=keys, args=args)
 
-
     def check_shutdown(self):
         try:
             if self._check_shutdown():
@@ -750,7 +749,7 @@ def check_shutdown(self):
                 keys = self.redis.keys('capsul:*')
                 if not keys or keys == ['capsul:shutting_down']:
                     # Nothing in the database, do not save it
-                    save= False
+                    save = False
                 else:
                     save = True
                 self.redis.save()
@@ -772,32 +771,39 @@ def check_shutdown(self):
                 exc.with_traceback(e.__traceback__)
                 raise exc from None
 
-
     def start_execution(self, engine_id, execution_id, tmp):
         self.redis.hset(f'capsul:{engine_id}:{execution_id}', 'tmp', tmp)
-        self.redis.hset(f'capsul:{engine_id}:{execution_id}', 'status', 'running')
+        self.redis.hset(f'capsul:{engine_id}:{execution_id}', 'status',
+                        'running')
+    def stop_execution(self, engine_id, execution_id):
+        status = self.status(engine_id, execution_id)
+        if status == 'running':
+            self.redis.hset(f'capsul:{engine_id}:{execution_id}', 'error',
+                            'Aborted by user')
+            self.redis.hset(f'capsul:{engine_id}:{execution_id}', 'status',
+                            'finalization')
 
     def end_execution(self, engine_id, execution_id):
-        self.redis.hset(f'capsul:{engine_id}:{execution_id}', 'status', 'ended')
+        self.redis.hset(f'capsul:{engine_id}:{execution_id}', 'status',
+                        'ended')
         tmp = self.redis.hget(f'capsul:{engine_id}:{execution_id}', 'tmp')
         self.redis.hdel(f'capsul:{engine_id}:{execution_id}', 'tmp')
         if self.redis.hget(f'capsul:{engine_id}:{execution_id}', 'dispose'):
-            executions = json.loads(self.redis.hget(f'capsul:{engine_id}', 'executions'))
+            executions = json.loads(self.redis.hget(f'capsul:{engine_id}',
+                                                    'executions'))
             executions.remove(execution_id)
-            self.redis.hset(f'capsul:{engine_id}', 'executions', json.dumps(executions))
+            self.redis.hset(f'capsul:{engine_id}', 'executions',
+                            json.dumps(executions))
             self.redis.delete(f'capsul:{engine_id}:{execution_id}')
-            if not executions and not self.redis.hget(f'capsul:{engine_id}', 'label'):
+            if not executions and not self.redis.hget(f'capsul:{engine_id}',
+                                                      'label'):
                 # Engine is already disopsed: delete it
                 self.redis.delete(f'capsul:{engine_id}')
         return tmp
 
-
     def tmp(self, engine_id, execution_id):
         return self.redis.hget(f'capsul:{engine_id}:{execution_id}', 'tmp')
 
-
     def error(self, engine_id, execution_id):
         return self.redis.hget(f'capsul:{engine_id}:{execution_id}', 'error')
 
-
-
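The new RedisExecutionDatabase.stop_execution() only touches two fields of the per-execution hash, so its effect can be checked directly with redis-py. A hypothetical inspection snippet (assumes a reachable Redis server and existing engine_id / execution_id values; not part of the patch):

    import redis

    r = redis.Redis(decode_responses=True)
    key = f'capsul:{engine_id}:{execution_id}'
    print(r.hget(key, 'status'))   # 'finalization' right after stop_execution() on a running execution
    print(r.hget(key, 'error'))    # 'Aborted by user'
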
diff --git a/capsul/engine/__init__.py b/capsul/engine/__init__.py
index 8339d266a..9da2179e0 100644
--- a/capsul/engine/__init__.py
+++ b/capsul/engine/__init__.py
@@ -271,6 +271,52 @@ def wait(self, execution_id, *args, **kwargs):
         '''
         self.database.wait(self.engine_id, execution_id, *args, **kwargs)
 
+    def stop(self, execution_id, kill_running=True):
+        ''' Stop a running execution
+        '''
+        self.database.stop_execution(self.engine_id, execution_id)
+        if kill_running:
+            self.kill_jobs(execution_id)
+
+    def kill_jobs(self, execution_id, job_ids=None):
+        ''' Kill running jobs during execution
+
+        Passing None as the job_ids argument kills all currently running jobs
+        '''
+        self.database.kill_jobs(self.engine_id, execution_id, job_ids)
+
+    def restart_jobs(self, execution_id: str, job_ids: list[str],
+                     force_stop: bool = False,
+                     allow_restart_execution: bool = False):
+        ''' Restart jobs which have been stopped or have failed.
+
+        Jobs are reset to ready or waiting state in the execution workflow,
+        thus can be run again when their dependencies are satisfied.
+
+        Parameters
+        ----------
+        execution_id: str
+            execution ID
+        job_ids: list[str]
+            list of jobs to be restarted
+        force_stop: bool
+            if True, jobs in the job_ids list which are currently running are
+            killed then reset to ready state. Otherwise a running job is not
+            modified (we let it finish and do not restart it)
+        allow_restart_execution: bool
+            if the execution workflow is stopped, by default only the jobs
+            state is modified, the workflow is left waiting. If
+            allow_restart_execution is True, then restart() is called and the
+            workflow starts execution again.
+        '''
+        raise NotImplementedError()
+
+    def restart(self, execution_id):
+        ''' Restart a workflow which has failed or has been stopped, and is
+        thus not currently running.
+        '''
+        raise NotImplementedError()
+
     def raise_for_status(self, *args, **kwargs):
         ''' Raises an exception according to the execution status.
         '''
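Taken together, the engine/__init__.py additions form the client-side control surface for a running execution. A hypothetical usage sketch (the Capsul()/engine()/start() entry points are pre-existing Capsul API and are only assumed here; 'executable' and 'failed_jobs' are placeholders, and restart_jobs()/restart() still raise NotImplementedError in this patch):

    from capsul.api import Capsul

    capsul = Capsul()
    with capsul.engine() as engine:
        execution_id = engine.start(executable)   # 'executable' is a placeholder process or pipeline
        ...
        engine.stop(execution_id)                 # marks the execution as aborted and kills its running jobs
        engine.restart_jobs(execution_id, job_ids=failed_jobs,
                            allow_restart_execution=True)  # not implemented yet in this patch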