Skip to content

Commit

Permalink
Initial version
Browse files Browse the repository at this point in the history
  • Loading branch information
mkrd committed Nov 25, 2023
1 parent 5a38b61 commit 0cc543e
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 9 deletions.
56 changes: 51 additions & 5 deletions dictdatabase/locking.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@

# Constants
SLEEP_TIMEOUT = 0.001
LOCK_TIMEOUT = 60.0 # Duration to wait before considering a lock as orphaned.
LOCK_KEEP_ALIVE_TIMEOUT = 0.001
REMOVE_ORPHAN_LOCK_TIMEOUT = 20.0 # Duration to wait before considering a lock as orphaned.
AQUIRE_LOCK_TIMEOUT = 60.0


def os_touch(path: str) -> None:
Expand Down Expand Up @@ -50,6 +52,9 @@ def __init__(self, ddb_dir: str, name: str, id: str, time_ns: str, stage: str, m
lock_file = f"{name}.{id}.{time_ns}.{stage}.{mode}.lock"
self.path = os.path.join(ddb_dir, lock_file)

def __repr__(self) -> str:
return f"LockFileMeta({self.ddb_dir=}, {self.name=}, {self.id=}, {self.time_ns=}, {self.stage=}, {self.mode=})"

def new_with_updated_time(self) -> LockFileMeta:
"""
Create a new instance with an updated timestamp.
Expand Down Expand Up @@ -91,7 +96,7 @@ def __init__(self, need_lock: LockFileMeta) -> None:
# Remove orphaned locks
if lock_meta.path != need_lock.path:
lock_age = time.time_ns() - int(lock_meta.time_ns)
if lock_age > LOCK_TIMEOUT * 1_000_000_000:
if lock_age > REMOVE_ORPHAN_LOCK_TIMEOUT * 1_000_000_000:
os.unlink(lock_meta.path)
print(f"Removed orphaned lock ({lock_meta.path})")
continue
Expand Down Expand Up @@ -129,13 +134,15 @@ class AbstractLock:
provides a blueprint for derived classes to implement.
"""

__slots__ = ("db_name", "need_lock", "has_lock", "snapshot", "mode")
__slots__ = ("db_name", "need_lock", "has_lock", "snapshot", "mode", "is_alive" "keep_alive_thread")

db_name: str
need_lock: LockFileMeta
has_lock: LockFileMeta
snapshot: FileLocksSnapshot
mode: str
is_alive: bool
keep_alive_thread: threading.Thread

def __init__(self, db_name: str) -> None:
# Normalize db_name to avoid file naming conflicts
Expand All @@ -147,16 +154,53 @@ def __init__(self, db_name: str) -> None:
self.need_lock = LockFileMeta(dir, self.db_name, t_id, time_ns, "need", self.mode)
self.has_lock = LockFileMeta(dir, self.db_name, t_id, time_ns, "has", self.mode)

self.is_alive = False
self.keep_alive_thread = None

# Ensure lock directory exists
if not os.path.isdir(dir):
os.makedirs(dir, exist_ok=True)

def _keep_alive_thread(self) -> None:
"""
Keep the lock alive by updating the timestamp of the lock file.
"""
time.sleep(LOCK_KEEP_ALIVE_TIMEOUT)
while self.is_alive:
new_has_lock = self.has_lock.new_with_updated_time()
os_touch(new_has_lock.path)
old_has_lock_path = self.has_lock.path
self.has_lock = new_has_lock
try:
os.unlink(old_has_lock_path)
except FileNotFoundError:
pass
time.sleep(LOCK_KEEP_ALIVE_TIMEOUT)

def _start_keep_alive_thread(self) -> None:
"""
Start a thread that keeps the lock alive by updating the timestamp of the lock file.
"""

if self.keep_alive_thread is not None:
raise RuntimeError("Keep alive thread already exists.")

self.is_alive = True
self.keep_alive_thread = threading.Thread(target=self._keep_alive_thread, daemon=False)
self.keep_alive_thread.start()

def _lock(self) -> None:
"""Override this method to implement locking mechanism."""
raise NotImplementedError

def _unlock(self) -> None:
"""Remove the lock files associated with this lock."""

if self.keep_alive_thread is not None:
self.is_alive = False
self.keep_alive_thread.join()
self.keep_alive_thread = None

for p in ("need_lock", "has_lock"):
try:
if lock := getattr(self, p, None):
Expand Down Expand Up @@ -202,9 +246,10 @@ def _lock(self) -> None:
self.has_lock = self.has_lock.new_with_updated_time()
os_touch(self.has_lock.path)
os.unlink(self.need_lock.path)
self._start_keep_alive_thread()
return
time.sleep(SLEEP_TIMEOUT)
if time.time() - start_time > LOCK_TIMEOUT:
if time.time() - start_time > AQUIRE_LOCK_TIMEOUT:
raise RuntimeError("Timeout while waiting for read lock.")
self.snapshot = FileLocksSnapshot(self.need_lock)

Expand Down Expand Up @@ -236,8 +281,9 @@ def _lock(self) -> None:
self.has_lock = self.has_lock.new_with_updated_time()
os_touch(self.has_lock.path)
os.unlink(self.need_lock.path)
self._start_keep_alive_thread()
return
time.sleep(SLEEP_TIMEOUT)
if time.time() - start_time > LOCK_TIMEOUT:
if time.time() - start_time > AQUIRE_LOCK_TIMEOUT:
raise RuntimeError("Timeout while waiting for write lock.")
self.snapshot = FileLocksSnapshot(self.need_lock)
17 changes: 13 additions & 4 deletions tests/test_locking.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,26 @@ def test_get_lock_names(use_compression):


def test_remove_orphaned_locks():
prev_config = locking.LOCK_TIMEOUT
locking.LOCK_TIMEOUT = 0.1
# SLEEP_TIMEOUT = 0.001
# LOCK_KEEP_ALIVE_TIMEOUT = 0.001
# REMOVE_ORPHAN_LOCK_TIMEOUT = 20.0 # Duration to wait before considering a lock as orphaned.
# AQUIRE_LOCK_TIMEOUT = 60.0

prev = locking.AQUIRE_LOCK_TIMEOUT, locking.LOCK_KEEP_ALIVE_TIMEOUT, locking.REMOVE_ORPHAN_LOCK_TIMEOUT

locking.AQUIRE_LOCK_TIMEOUT = 10.0
locking.LOCK_KEEP_ALIVE_TIMEOUT = 1.0
locking.REMOVE_ORPHAN_LOCK_TIMEOUT = 0.1
lock = locking.ReadLock("test_remove_orphaned_locks")
lock._lock()

ls = locking.FileLocksSnapshot(lock.need_lock)
assert len(ls.locks) == 1
assert len(ls.locks) >= 1 ## The one lock or two if currently in keep alive handover

time.sleep(0.2)
# Trigger the removal of orphaned locks
ls = locking.FileLocksSnapshot(lock.need_lock)

assert len(ls.locks) == 0
locking.LOCK_TIMEOUT = prev_config

locking.AQUIRE_LOCK_TIMEOUT, locking.LOCK_KEEP_ALIVE_TIMEOUT, locking.REMOVE_ORPHAN_LOCK_TIMEOUT = prev

0 comments on commit 0cc543e

Please sign in to comment.