diff --git a/dictdatabase/locking.py b/dictdatabase/locking.py index 069088a..74a3bb9 100644 --- a/dictdatabase/locking.py +++ b/dictdatabase/locking.py @@ -11,7 +11,9 @@ # Constants SLEEP_TIMEOUT = 0.001 -LOCK_TIMEOUT = 60.0 # Duration to wait before considering a lock as orphaned. +LOCK_KEEP_ALIVE_TIMEOUT = 0.001 +REMOVE_ORPHAN_LOCK_TIMEOUT = 20.0 # Duration to wait before considering a lock as orphaned. +AQUIRE_LOCK_TIMEOUT = 60.0 def os_touch(path: str) -> None: @@ -50,6 +52,9 @@ def __init__(self, ddb_dir: str, name: str, id: str, time_ns: str, stage: str, m lock_file = f"{name}.{id}.{time_ns}.{stage}.{mode}.lock" self.path = os.path.join(ddb_dir, lock_file) + def __repr__(self) -> str: + return f"LockFileMeta({self.ddb_dir=}, {self.name=}, {self.id=}, {self.time_ns=}, {self.stage=}, {self.mode=})" + def new_with_updated_time(self) -> LockFileMeta: """ Create a new instance with an updated timestamp. @@ -91,7 +96,7 @@ def __init__(self, need_lock: LockFileMeta) -> None: # Remove orphaned locks if lock_meta.path != need_lock.path: lock_age = time.time_ns() - int(lock_meta.time_ns) - if lock_age > LOCK_TIMEOUT * 1_000_000_000: + if lock_age > REMOVE_ORPHAN_LOCK_TIMEOUT * 1_000_000_000: os.unlink(lock_meta.path) print(f"Removed orphaned lock ({lock_meta.path})") continue @@ -129,13 +134,15 @@ class AbstractLock: provides a blueprint for derived classes to implement. """ - __slots__ = ("db_name", "need_lock", "has_lock", "snapshot", "mode") + __slots__ = ("db_name", "need_lock", "has_lock", "snapshot", "mode", "is_alive" "keep_alive_thread") db_name: str need_lock: LockFileMeta has_lock: LockFileMeta snapshot: FileLocksSnapshot mode: str + is_alive: bool + keep_alive_thread: threading.Thread def __init__(self, db_name: str) -> None: # Normalize db_name to avoid file naming conflicts @@ -147,16 +154,53 @@ def __init__(self, db_name: str) -> None: self.need_lock = LockFileMeta(dir, self.db_name, t_id, time_ns, "need", self.mode) self.has_lock = LockFileMeta(dir, self.db_name, t_id, time_ns, "has", self.mode) + self.is_alive = False + self.keep_alive_thread = None + # Ensure lock directory exists if not os.path.isdir(dir): os.makedirs(dir, exist_ok=True) + def _keep_alive_thread(self) -> None: + """ + Keep the lock alive by updating the timestamp of the lock file. + """ + time.sleep(LOCK_KEEP_ALIVE_TIMEOUT) + while self.is_alive: + new_has_lock = self.has_lock.new_with_updated_time() + os_touch(new_has_lock.path) + old_has_lock_path = self.has_lock.path + self.has_lock = new_has_lock + try: + os.unlink(old_has_lock_path) + except FileNotFoundError: + pass + time.sleep(LOCK_KEEP_ALIVE_TIMEOUT) + + def _start_keep_alive_thread(self) -> None: + """ + Start a thread that keeps the lock alive by updating the timestamp of the lock file. + """ + + if self.keep_alive_thread is not None: + raise RuntimeError("Keep alive thread already exists.") + + self.is_alive = True + self.keep_alive_thread = threading.Thread(target=self._keep_alive_thread, daemon=False) + self.keep_alive_thread.start() + def _lock(self) -> None: """Override this method to implement locking mechanism.""" raise NotImplementedError def _unlock(self) -> None: """Remove the lock files associated with this lock.""" + + if self.keep_alive_thread is not None: + self.is_alive = False + self.keep_alive_thread.join() + self.keep_alive_thread = None + for p in ("need_lock", "has_lock"): try: if lock := getattr(self, p, None): @@ -202,9 +246,10 @@ def _lock(self) -> None: self.has_lock = self.has_lock.new_with_updated_time() os_touch(self.has_lock.path) os.unlink(self.need_lock.path) + self._start_keep_alive_thread() return time.sleep(SLEEP_TIMEOUT) - if time.time() - start_time > LOCK_TIMEOUT: + if time.time() - start_time > AQUIRE_LOCK_TIMEOUT: raise RuntimeError("Timeout while waiting for read lock.") self.snapshot = FileLocksSnapshot(self.need_lock) @@ -236,8 +281,9 @@ def _lock(self) -> None: self.has_lock = self.has_lock.new_with_updated_time() os_touch(self.has_lock.path) os.unlink(self.need_lock.path) + self._start_keep_alive_thread() return time.sleep(SLEEP_TIMEOUT) - if time.time() - start_time > LOCK_TIMEOUT: + if time.time() - start_time > AQUIRE_LOCK_TIMEOUT: raise RuntimeError("Timeout while waiting for write lock.") self.snapshot = FileLocksSnapshot(self.need_lock) diff --git a/tests/test_locking.py b/tests/test_locking.py index dc3e013..17beca1 100644 --- a/tests/test_locking.py +++ b/tests/test_locking.py @@ -39,17 +39,26 @@ def test_get_lock_names(use_compression): def test_remove_orphaned_locks(): - prev_config = locking.LOCK_TIMEOUT - locking.LOCK_TIMEOUT = 0.1 + # SLEEP_TIMEOUT = 0.001 + # LOCK_KEEP_ALIVE_TIMEOUT = 0.001 + # REMOVE_ORPHAN_LOCK_TIMEOUT = 20.0 # Duration to wait before considering a lock as orphaned. + # AQUIRE_LOCK_TIMEOUT = 60.0 + + prev = locking.AQUIRE_LOCK_TIMEOUT, locking.LOCK_KEEP_ALIVE_TIMEOUT, locking.REMOVE_ORPHAN_LOCK_TIMEOUT + + locking.AQUIRE_LOCK_TIMEOUT = 10.0 + locking.LOCK_KEEP_ALIVE_TIMEOUT = 1.0 + locking.REMOVE_ORPHAN_LOCK_TIMEOUT = 0.1 lock = locking.ReadLock("test_remove_orphaned_locks") lock._lock() ls = locking.FileLocksSnapshot(lock.need_lock) - assert len(ls.locks) == 1 + assert len(ls.locks) >= 1 ## The one lock or two if currently in keep alive handover time.sleep(0.2) # Trigger the removal of orphaned locks ls = locking.FileLocksSnapshot(lock.need_lock) assert len(ls.locks) == 0 - locking.LOCK_TIMEOUT = prev_config + + locking.AQUIRE_LOCK_TIMEOUT, locking.LOCK_KEEP_ALIVE_TIMEOUT, locking.REMOVE_ORPHAN_LOCK_TIMEOUT = prev