Skip to content

Commit

Permalink
preparing API for stop/kill/restart operations (not implemented yet)
Browse files Browse the repository at this point in the history
  • Loading branch information
denisri committed Oct 13, 2023
1 parent e6f9524 commit 087a107
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 115 deletions.
95 changes: 32 additions & 63 deletions capsul/database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
from ..execution_context import ExecutionContext
from ..pipeline.pipeline import Process, Pipeline


database_classes = {
'sqlite': 'capsul.database.populse_db:Populse_dbExecutionDatabase',
'redis': 'capsul.database.redis:RedisExecutionDatabase',
'redis+socket': 'capsul.database.redis:RedisExecutionDatabase',
}


class URL:
pattern = re.compile(
r'^(?P<scheme>[^:]+)://'
Expand All @@ -39,7 +41,7 @@ def __init__(self, string):
raise ValueError(f'Invalid URL: {string}')
for k, v in m.groupdict().items():
setattr(self, k, v)

def __str__(self):
if self.login:
if self.password:
Expand Down Expand Up @@ -152,7 +154,8 @@ def workers_command(self, engine_id):
db_config]
return workers_command

def new_execution(self, executable, engine_id, execution_context, workflow, start_time):
def new_execution(self, executable, engine_id, execution_context, workflow,
start_time):
executable_json = json_encode(executable.json(include_parameters=False))
execution_context_json = execution_context.json()
workflow_parameters_values_json = json_encode(workflow.parameters_values)
Expand All @@ -179,15 +182,13 @@ def new_execution(self, executable, engine_id, execution_context, workflow, star
)
return execution_id


def _executable_from_json(self, executable_json):
return Capsul.executable(json_decode(executable_json))

def executable(self, execution_id):
j = self.executable_json(execution_id)
if j is not None:
return self._executable_from_json(j)


@staticmethod
def _time_from_json(time_json):
Expand All @@ -196,7 +197,6 @@ def _time_from_json(time_json):
@staticmethod
def _time_to_json(time):
return time.isoformat()


def _job_from_json(self, job):
for k in ('start_time', 'end_time'):
Expand All @@ -205,22 +205,19 @@ def _job_from_json(self, job):
job[k] = self._time_from_json(t)
return job


def _job_to_json(self, job):
for k in ('start_time', 'end_time'):
t = job.get(k)
if t:
job[k] = self._time_to_json(t)
return job


def job(self, engine_id, execution_id, job_uuid):
j = self.job_json(engine_id, execution_id, job_uuid)
if j:
return self._job_from_json(j)
return None


def execution_report(self, engine_id, execution_id):
report = self.execution_report_json(engine_id, execution_id)
execution_context = report['execution_context']
Expand Down Expand Up @@ -400,29 +397,23 @@ def update_executable(self, engine_id, execution_id, executable):
if enable_parameter_links is not None:
executable.enable_parameter_links = enable_parameter_links


@property
def is_ready(self):
raise NotImplementedError


@property
def is_connected(self):
raise NotImplementedError


def engine_id(self, label):
raise NotImplementedError


def _enter(self):
raise NotImplementedError


def _exit(self):
raise NotImplementedError


def get_or_create_engine(self, engine, update_database=False):
'''
If engine with given label is in the database, simply return its
Expand All @@ -434,7 +425,6 @@ def get_or_create_engine(self, engine, update_database=False):
'''
raise NotImplementedError


def engine_connections(self, engine_id):
'''
Return the current number of active connections to
Expand All @@ -443,21 +433,18 @@ def engine_connections(self, engine_id):
'''
raise NotImplementedError


def engine_config(self, engine_id):
'''
Return the configuration dict stored for an engine
'''
raise NotImplementedError


def workers_count(self, engine_id):
'''
Return the number of workers that are running.
'''
raise NotImplementedError


def worker_database_config(self, engine_id):
'''
Return database connection settings for workers. This
Expand All @@ -467,46 +454,40 @@ def worker_database_config(self, engine_id):
'''
raise NotImplementedError


def worker_started(self, engine_id):
'''
Register a new worker that had been started for this engine and
return an identifier for it.
'''
raise NotImplementedError


def worker_ended(self, engine_id, worker_id):
'''
Remove a worker from the list of workers for this engine.
'''
raise NotImplementedError


def persistent(self, engine_id):
'''
Return wether an engine is persistent or not.
'''
raise NotImplementedError


def set_persistent(self, engine_id, persistent):
'''
Sets the persitency status of an engine.
'''
raise NotImplementedError


def dispose_engine(self, engine_id):
'''
Tell Capsul that this engine will not be used anymore by any client.
The ressource it uses must be freed as soon as possible. If no
The ressource it uses must be freed as soon as possible. If no
execution is running, engine is destroyed. Otherwise, workers will
process ongoing executions and cleanup when done.
'''
raise NotImplementedError


def executions_summary(self, engine_id):
'''
Returns a JSON-compatible list whose elements contains some
Expand All @@ -524,105 +505,93 @@ def executions_summary(self, engine_id):
'''
raise NotImplementedError


def store_execution(self,
engine_id,
label,
start_time,
executable_json,
execution_context_json,
workflow_parameters_values_json,
workflow_parameters_dict_json,
jobs,
ready,
waiting
):
engine_id,
label,
start_time,
executable_json,
execution_context_json,
workflow_parameters_values_json,
workflow_parameters_dict_json,
jobs,
ready,
waiting):
raise NotImplementedError


def execution_context(self, engine_id, execution_id):
j = self.execution_context_json(engine_id, execution_id)
if j is not None:
return ExecutionContext(config=j)


def execution_context_json(self, engine_id, execution_id):
raise NotImplementedError


def pop_job(self, engine_id, start_time):
'''
Convert its parameters to JSON and calls pop_job_json()
'''
return self.pop_job_json(engine_id, self._time_to_json(start_time))



def pop_job_json(self, engine_id, start_time):
raise NotImplementedError


def get_job_input_parameters(self, engine_id, execution_id, job_uuid):
'''
Return a dictionary of input parameters to use for a job. Also store
the returned dict with the job to ease the creation of job execution
monitoring tools.
'''
raise NotImplementedError


def job_finished(self, engine_id, execution_id, job_uuid, end_time, return_code, stdout, stderr):
def job_finished(self, engine_id, execution_id, job_uuid, end_time,
return_code, stdout, stderr):
'''
Convert its parameters to JSON and calls job_finished_json()
'''
self.job_finished_json(engine_id, execution_id, job_uuid,
self._time_to_json(end_time), return_code, stdout, stderr)

self.job_finished_json(engine_id, execution_id, job_uuid,
self._time_to_json(end_time), return_code,
stdout, stderr)

def job_finished_json(self, engine_id, execution_id, job_uuid, end_time, return_code, stdout, stderr):
def job_finished_json(self, engine_id, execution_id, job_uuid, end_time,
return_code, stdout, stderr):
raise NotImplementedError



def status(self, engine_id, execution_id):
raise NotImplementedError


def workflow_parameters_values(self, engine_id, execution_id):
return json_decode(self.workflow_parameters_values_json(engine_id, execution_id))

return json_decode(self.workflow_parameters_values_json(engine_id,
execution_id))

def workflow_parameters_values_json(self, engine_id, execution_id):
raise NotImplementedError


def workflow_parameters_dict_json(self, engine_id, execution_id):
raise NotImplementedError


def job_json(self, engine_id, execution_id, job_uuid):
raise NotImplementedError


def execution_report_json(self, engine_id, execution_id):
raise NotImplementedError


def dispose(self, engine_id, execution_id):
raise NotImplementedError


def start_execution(self, engine_id, execution_id, tmp):
raise NotImplementedError


def end_execution(self, engine_id, execution_id):
raise NotImplementedError


def tmp(self, engine_id, execution_id):
raise NotImplementedError


def error(self, engine_id, execution_id):
raise NotImplementedError

def stop_execution(self, engine_id, execution_id):
raise NotImplementedError

def kill_jobs(self, engine_id, execution_id, job_ids):
raise NotImplementedError
Loading

0 comments on commit 087a107

Please sign in to comment.