
Commit

Fixed populse_db implementation
sapetnioc committed Mar 19, 2024
1 parent 680527c · commit 8cac1e9
Showing 9 changed files with 62 additions and 49 deletions.
2 changes: 1 addition & 1 deletion capsul/config/configuration.py
@@ -156,7 +156,7 @@ class EngineConfiguration(Controller):
python_modules: list[str]

database: str = "builtin"
persistent: bool = True
persistent: bool

start_workers: field(
type_=dict, default_factory=lambda: default_engine_start_workers
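Dropping the "= True" default makes persistent a tri-state setting: explicitly true, explicitly false, or left unset, in which case it reads as the undefined sentinel and the database layer decides (see the get_or_create_engine hunk in populse_db.py below). A minimal sketch of that resolution; resolve_persistent and database_is_temporary are illustrative names, not capsul API:

    from soma.undefined import undefined

    def resolve_persistent(config, database_is_temporary):
        # An explicit True/False in the configuration wins; an unset field
        # reads as the `undefined` sentinel and falls back to a heuristic:
        # a database living in a temporary location is not persistent.
        persistent = config.persistent
        if persistent is undefined:
            persistent = not database_is_temporary
        return persistent
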
5 changes: 0 additions & 5 deletions capsul/config/test/test_config.py
@@ -35,7 +35,6 @@ def test_single_configuration(self):
"builtin": {
"config_modules": ["spm", "matlab"],
"database": "builtin",
"persistent": True,
"start_workers": default_engine_start_workers,
"matlab": {},
"spm": {
@@ -66,7 +65,6 @@ def test_single_configuration(self):
"site": {
"builtin": {
"database": "builtin",
"persistent": True,
"start_workers": default_engine_start_workers,
},
"databases": {
@@ -84,7 +82,6 @@ def test_config_as_dict(self):
"builtin": {
"config_modules": ["spm", "matlab"],
"database": "builtin",
"persistent": True,
"start_workers": default_engine_start_workers,
"matlab": {},
"spm": {
@@ -112,7 +109,6 @@ def test_config_as_dict(self):
"app_name": "single_conf",
"site": {
"builtin": {
"persistent": True,
"database": "builtin",
"start_workers": default_engine_start_workers,
},
@@ -162,7 +158,6 @@ def test_config_merge(self):
"builtin": {
"config_modules": ["spm", "matlab"],
"database": "builtin",
"persistent": True,
"start_workers": default_engine_start_workers,
"matlab": {},
"spm": {
1 change: 0 additions & 1 deletion capsul/database/__init__.py
@@ -16,7 +16,6 @@

database_classes = {
"populse-db": "capsul.database.populse_db:PopulseDBExecutionDatabase",
"sqlite": "capsul.database.sqlite:SQliteExecutionDatabase",
"redis": "capsul.database.redis:RedisExecutionDatabase",
"redis+socket": "capsul.database.redis:RedisExecutionDatabase",
}
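The values in database_classes are "module:ClassName" strings (the sqlite entry is gone along with its backend). A hedged sketch of how such a string can be resolved to a class; load_database_class is an illustrative helper, not necessarily capsul's actual loader:

    import importlib

    def load_database_class(spec: str) -> type:
        # Split "package.module:ClassName" and import the named attribute.
        module_name, class_name = spec.split(":")
        return getattr(importlib.import_module(module_name), class_name)

    # e.g. load_database_class("capsul.database.redis:RedisExecutionDatabase")
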
53 changes: 30 additions & 23 deletions capsul/database/populse_db.py
@@ -1,11 +1,11 @@
import json
import os
import tempfile
from contextlib import contextmanager
from datetime import datetime
from uuid import uuid4

from populse_db import Storage
from soma.undefined import undefined

from . import ExecutionDatabase

@@ -114,22 +114,27 @@ def _enter(self):
db.capsul_connection[self.uuid] = {"date": datetime.now()}

def _exit(self):
with self.storage.data(write=True) as db:
del db.capsul_connection[self.uuid]
if os.path.exists(self.path):
with self.storage.data(write=True) as db:
del db.capsul_connection[self.uuid]
self.check_shutdown()

def get_or_create_engine(self, engine, update_database=False):
with self.storage.data(write=True) as db:
row = db.capsul_engine.search(
label=engine.label, fields=["engine_id"], as_list=True
)
persistent = engine.config.persistent
if persistent is undefined:
persistent = db.tmp.get() is None
if row:
engine_id = row[0]
if update_database:
# Update configuration stored in database
db.capsul_engine[engine_id].update(
{
"congig": engine.config.json(),
"persistent": engine.config.persistent,
"persistent": persistent,
}
)
else:
@@ -143,7 +148,7 @@ def get_or_create_engine(self, engine, update_database=False):
"workers": [],
# executions: list of execution_id
"executions": [],
"persistent": engine.config.persistent,
"persistent": persistent,
# connections: number of connections
"connections": 0,
}
@@ -188,14 +193,12 @@ def worker_started(self, engine_id):

def worker_ended(self, engine_id, worker_id):
with self.storage.data(write=True) as db:
if not db:
return
workers = db.capsul_engine[engine_id].workers.get()
if workers:
if workers and worker_id in workers:
workers.remove(worker_id)
db.capsul_engine[engine_id].workers = workers
else:
raise ValueError(
f"Invalid engine_id or worker_id: engine_id={engine_id}, worker_id={worker_id}"
)

def persistent(self, engine_id):
if os.path.exists(self.path):
@@ -233,6 +236,7 @@ def dispose_engine(self, engine_id):
db.capsul_job.search_and_delete(engine_id=engine_id)
else:
db.capsul_engine[engine_id].connections = connections
self.check_shutdown()

def executions_summary(self, engine_id):
result = []
@@ -318,6 +322,8 @@ def execution_context_json(self, engine_id, execution_id):
].execution_context.get()

def pop_job_json(self, engine_id, start_time):
if not os.path.exists(self.path):
return None, None
with self.storage.data(write=True) as db:
executions = db.capsul_engine[engine_id].executions.get()
if executions is None:
@@ -540,22 +546,22 @@ def failed_node_paths(self, engine_id, execution_id):
if result != ("directories_creation",):
yield result

def dispose(self, engine_id, execution_id):
def dispose(self, engine_id, execution_id, bypass_persistence=False):
with self.storage.data(write=True) as db:
db.capsul_execution[engine_id, execution_id].dispose = True
status = db.capsul_execution[engine_id, execution_id].status.get()
if status == "ended":
persistent, executions = db.capsul_engine[engine_id].get(
fields=["persistent", "executions"], as_list=True
if status == "ended" and (
bypass_persistence or not db.capsul_engine[engine_id].persistent.get()
):
db.capsul_job.search_and_delete(
engine_id=engine_id, execution_id=execution_id
)
if not persistent:
db.capsul_job.search_and_delete(
engine_id=engine_id, execution_id=execution_id
)
db.capsul_execution.search_and_delete(
engine_id=engine_id, execution_id=execution_id
)
executions.remove(execution_id)
db.capsul_engine[engine_id].executions = executions
db.capsul_execution.search_and_delete(
engine_id=engine_id, execution_id=execution_id
)
executions = db.capsul_engine[engine_id].executions.get()
executions.remove(execution_id)
db.capsul_engine[engine_id].executions = executions

def check_shutdown(self):
database_empty = False
@@ -603,6 +609,7 @@ def end_execution(self, engine_id, execution_id):
db.capsul_execution.search_and_delete(engine_id=engine_id)
db.capsul_engine.search_and_delete(engine_id=engine_id)
db.capsul_job.search_and_delete(engine_id=engine_id)
self.check_shutdown()
return tmp

def tmp(self, engine_id, execution_id):
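The common thread in these hunks: a non-persistent database file can be removed by check_shutdown() while other holders are still unwinding, so late accessors (_exit, pop_job_json) now guard on the file's existence before opening a write transaction. A condensed sketch of the pattern, assuming only the attributes visible in the diff (self.path, self.storage, self.uuid):

    import os

    def _exit(self):
        # The file may already be gone if another connection shut down a
        # non-persistent database; clean up only if it still exists, then
        # give this holder its own chance to trigger shutdown.
        if os.path.exists(self.path):
            with self.storage.data(write=True) as db:
                del db.capsul_connection[self.uuid]
        self.check_shutdown()
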
22 changes: 14 additions & 8 deletions capsul/engine/builtin.py
@@ -56,13 +56,13 @@ def set_pid(self, pid):
self.job_pid = pid


def worflow_loop(db_config, engine_id):
def worker_loop(db_config, engine_id):
try:
with engine_database(db_config) as database:
worker_id = database.worker_started(engine_id)
poll_thread = PollingThread(database, engine_id)
poll_thread.start()
# print(f'!worker {worker_id}! started', engine_id)
# print(f"!worker {worker_id}! started", engine_id)
try:
execution_id, job_uuid = database.pop_job(
engine_id, start_time=datetime.now()
@@ -71,8 +71,9 @@ def worflow_loop(db_config, engine_id):
if not job_uuid:
# Empty string means no job available yet
time.sleep(0.2)
# print(f"!worker {worker_id}! wait", (execution_id, job_uuid))
elif job_uuid == "start_execution":
# print(f'!worker {worker_id}! start', execution_id)
# print(f"!worker {worker_id}! start", execution_id)
# This part is done before the processing of any job
tmp = os.path.join(
tempfile.gettempdir(), f"capsul_execution_{execution_id}"
@@ -83,7 +84,7 @@ def worflow_loop(db_config, engine_id):
except Exception:
os.rmdir(tmp)
elif job_uuid == "end_execution":
# print(f'!worker {worker_id}! end', execution_id)
# print(f"!worker {worker_id}! end", execution_id)
tmp = database.end_execution(engine_id, execution_id)
if tmp and os.path.exists(tmp):
shutil.rmtree(tmp)
@@ -101,7 +102,12 @@ def worflow_loop(db_config, engine_id):
)
with poll_thread.lock:
poll_thread.job_pid = None
# print(f'!worker {worker_id}! job', execution_id, job_uuid, database.job_finished)
# print(
# f"!worker {worker_id}! job",
# execution_id,
# job_uuid,
# database.job_finished,
# )
database.job_finished(
engine_id,
execution_id,
@@ -115,7 +121,7 @@ def worflow_loop(db_config, engine_id):
engine_id, start_time=datetime.now()
)
finally:
# print(f'!worker {worker_id}! ended' )
# print(f"!worker {worker_id}! ended")
with poll_thread.lock:
poll_thread.stop_me = True
poll_thread.join()
@@ -138,10 +144,10 @@ def worflow_loop(db_config, engine_id):
pid = os.fork()
if pid == 0:
os.setsid()
worflow_loop(db_config, engine_id)
worker_loop(db_config, engine_id)

# import cProfile
# import tempfile
# f = tempfile.mktemp(prefix='worker_profile_', dir='/tmp')
# print('!!!', f)
# cProfile.run('worflow_loop(db_config, engine_id)', f)
# cProfile.run('worker_loop(db_config, engine_id)', f)
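
For reference, the renamed worker_loop drives everything off pop_job, which multiplexes real job UUIDs with sentinel strings. A condensed sketch of one polling iteration; the sentinel values come from the diff above, the poll_once helper is illustrative:

    import time
    from datetime import datetime

    def poll_once(database, engine_id):
        execution_id, job_uuid = database.pop_job(
            engine_id, start_time=datetime.now()
        )
        if not job_uuid:
            time.sleep(0.2)  # empty string: no job available yet, back off
        elif job_uuid == "start_execution":
            pass  # runs once before any job: create the execution temp dir
        elif job_uuid == "end_execution":
            pass  # runs once after all jobs: remove the execution temp dir
        else:
            pass  # a real job UUID: execute it, then report job_finished
        return execution_id, job_uuid
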
5 changes: 2 additions & 3 deletions capsul/test/test_fake_morphologist.py
@@ -184,7 +184,7 @@ def setUp(self):
"test_fake_morphologist",
site_file=self.config_file,
user_file=None,
database_path=osp.join(self.tmp, "capsul_engine_database.rdb"),
database_path=osp.join(self.tmp, "capsul_engine_database.sqlite"),
)
return super().setUp()

@@ -200,14 +200,13 @@ def test_fake_morphologist_config(self):
expected_config = {
"databases": {
"builtin": {
"path": osp.join(self.tmp, "capsul_engine_database.rdb"),
"path": osp.join(self.tmp, "capsul_engine_database.sqlite"),
"type": default_builtin_database["type"],
}
},
"builtin": {
"config_modules": ["spm", "matlab"],
"database": "builtin",
"persistent": True,
"start_workers": default_engine_start_workers,
"dataset": {
"input": {
9 changes: 7 additions & 2 deletions capsul/test/test_retry.py
@@ -94,7 +94,7 @@ def test_retry_pipeline():
executable.file = tmp_failure.name
executable.output = tmp_result.name

with Capsul().engine() as engine:
with Capsul(database_path="").engine() as engine:
engine.assess_ready_to_start(executable)
execution_id = engine.start(executable)
engine.wait(execution_id, timeout=30)
@@ -104,6 +104,7 @@ def test_retry_pipeline():
assert error == "Some jobs failed"
assert result == "initial_value\nsuccessful"
engine.prepare_pipeline_for_retry(executable, execution_id)
engine.dispose(execution_id)
execution_id = engine.start(executable)
engine.wait(execution_id, timeout=30)
error = engine.database.error(engine.engine_id, execution_id)
@@ -112,6 +113,7 @@ def test_retry_pipeline():
assert error is None
assert result == "initial_value\nsuccessful\nmust_restart\nfinal_value"
engine.raise_for_status(execution_id)
engine.dispose(execution_id)


def test_retry_sub_pipeline():
@@ -123,7 +125,7 @@ def test_retry_sub_pipeline():
executable.file2 = tmp_failure2.name
executable.output = tmp_result.name

with Capsul().engine() as engine:
with Capsul(database_path="").engine() as engine:
engine.assess_ready_to_start(executable)
execution_id = engine.start(executable)
engine.wait(execution_id, timeout=30)
@@ -133,6 +135,7 @@ def test_retry_sub_pipeline():
assert error == "Some jobs failed"
assert result == "initial_value_1\nsuccessful"
engine.prepare_pipeline_for_retry(executable, execution_id)
engine.dispose(execution_id)
execution_id = engine.start(executable)
engine.wait(execution_id, timeout=30)
error = engine.database.error(engine.engine_id, execution_id)
@@ -144,6 +147,7 @@ def test_retry_sub_pipeline():
== "initial_value_1\nsuccessful\nmust_restart\nfinal_value\ninitial_value_2\nsuccessful"
)
engine.prepare_pipeline_for_retry(executable, execution_id)
engine.dispose(execution_id)
execution_id = engine.start(executable)
engine.wait(execution_id, timeout=30)
error = engine.database.error(engine.engine_id, execution_id)
@@ -155,3 +159,4 @@
== "initial_value_1\nsuccessful\nmust_restart\nfinal_value\ninitial_value_2\nsuccessful\nmust_restart\nfinal_value"
)
engine.raise_for_status(execution_id)
engine.dispose(execution_id)
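
Condensed from the tests above: every finished execution is now paired with an explicit engine.dispose(execution_id), and database_path="" requests a throwaway, non-persistent database. A sketch of the run/retry lifecycle, assuming an already configured executable and the Capsul import from capsul.api:

    from capsul.api import Capsul

    with Capsul(database_path="").engine() as engine:
        engine.assess_ready_to_start(executable)
        execution_id = engine.start(executable)
        engine.wait(execution_id, timeout=30)
        if engine.database.error(engine.engine_id, execution_id):
            engine.prepare_pipeline_for_retry(executable, execution_id)
            engine.dispose(execution_id)  # release the failed execution first
            execution_id = engine.start(executable)
            engine.wait(execution_id, timeout=30)
        engine.raise_for_status(execution_id)
        engine.dispose(execution_id)
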
8 changes: 4 additions & 4 deletions capsul/test/test_simple.py
@@ -55,7 +55,7 @@ def test_input_and_output_files(self):
p = capsul.api.executable(PipelineWithInputAndOutputFiles)
p.input_file = "input.txt"
p.output_file = "output.txt"
with Capsul().engine() as ce:
with Capsul(database_path="").engine() as ce:
ce.run(p)
assert p.process_has_run

@@ -82,15 +82,15 @@ def test_custom_pipeline_with_input_and_output_files(self):
)
p.input = "input.txt"
p.output = "output.txt"
with Capsul().engine() as ce:
with Capsul(database_path="").engine() as ce:
ce.run(p)
assert p.process_has_run

def test_pipeline_loading_from_json_file(self):
p = capsul.api.executable("capsul.test.simple_pipeline_with_output")
with tempfile.NamedTemporaryFile("r") as tmpfile:
p.output_file = tmpfile.name
with Capsul().engine() as ce:
with Capsul(database_path="").engine() as ce:
ce.run(p)
assert tmpfile.read() == "ProcessWithOutputFile has run!\n"

@@ -119,7 +119,7 @@ def test_intermediate_temporary_file(self):
},
}
)
with Capsul().engine() as ce:
with Capsul(database_path="").engine() as ce:
ce.run(p)
assert p.process_has_run
assert p.name_of_intermediate_file
6 changes: 4 additions & 2 deletions capsul/test/test_tiny_morphologist.py
@@ -375,7 +375,10 @@ def setUp(self):
json.dump(config, f)

self.capsul = Capsul(
"test_tiny_morphologist", site_file=self.config_file, user_file=None
"test_tiny_morphologist",
site_file=self.config_file,
user_file=None,
database_path="",
)
return super().setUp()

@@ -392,7 +395,6 @@ def test_tiny_morphologist_config(self):
},
"builtin": {
"database": "builtin",
"persistent": True,
"start_workers": default_engine_start_workers,
"dataset": {
"input": {
