Skip to content

Commit

Permalink
Allow exclusive access to database via context manager
Browse files Browse the repository at this point in the history
  • Loading branch information
gerlero committed Jul 30, 2023
1 parent 0eefb2b commit b027621
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 31 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ You can look up components in the `database` as you would with `dict` (with comp
def is_user_defined(self, name: str) -> bool: ...
```

The `database` object is also usable as a context manager (i.e. `with database:`), which allows multiple operations to be performed with exclusive access to the database (locking out any other processes for the duration).

`Constituent` names are case insensitive and will be automatically converted to all uppercase. Any instances added to (or removed from) the `database` will be saved for the current operating system user. Default components cannot be changed or removed (expect a `ValueError` if you try).

The public interface of the `Constituent` class is:
Expand Down
70 changes: 40 additions & 30 deletions electrolytes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from functools import cached_property
else:
from backports.cached_property import cached_property
from contextlib import ContextDecorator
from warnings import warn

from pydantic import BaseModel, Field, field_validator, FieldValidationInfo, model_validator, TypeAdapter
Expand Down Expand Up @@ -121,13 +122,12 @@ def _dump_constituents(constituents: List[Constituent]) -> bytes:
return _StoredConstituents.dump_json({"constituents": constituents}, by_alias=True, indent=4)


class _FileLock(FileLock):
def __enter__(self) -> Any:
Path(self.lock_file).parent.mkdir(parents=True, exist_ok=True) # https://github.com/tox-dev/py-filelock/issues/176
return super().__enter__()
class _Database(ContextDecorator):
def __init__(self, user_constituents_file: Path) -> None:
self._user_constituents_file = user_constituents_file
self._user_constituents_lock = FileLock(self._user_constituents_file.with_suffix(".lock"))
self._user_constituents_dirty = False


class _Database:
@cached_property
def _default_constituents(self) -> Dict[str, Constituent]:
data = pkgutil.get_data(__package__, "db1.json")
Expand All @@ -136,34 +136,46 @@ def _default_constituents(self) -> Dict[str, Constituent]:
constituents = _load_constituents(data, context={"fix": "db1"})
return {c.name: c for c in constituents}

_USER_CONSTITUENTS_FILE = Path(get_app_dir(__package__), "user_constituents.json")
_user_constituents_file_lock = _FileLock(_USER_CONSTITUENTS_FILE.with_suffix(".lock"))

@cached_property
def _user_constituents(self) -> Dict[str, Constituent]:
try:
with self._user_constituents_file_lock:
data = self._USER_CONSTITUENTS_FILE.read_bytes()
with self:
user_data = self._user_constituents_file.read_bytes()
except OSError:
return {}
try:
constituents = _load_constituents(data)
user_constituents = _load_constituents(user_data)
except Exception as e:
warn(f"failed to load user constituents from {self._USER_CONSTITUENTS_FILE}: {type(e).__name__}", RuntimeWarning)
warn(f"failed to load user constituents from {self._user_constituents_file}: {type(e).__name__}", RuntimeWarning)
return {}
return {c.name: c for c in constituents}
return {c.name: c for c in user_constituents}

def _invalidate_user_constituents(self) -> None:
assert not self._user_constituents_dirty
try:
del self._user_constituents
except AttributeError:
pass

@_user_constituents_file_lock
def _save_user_constituents(self) -> None:
data = _dump_constituents(list(self._user_constituents.values()))
self._USER_CONSTITUENTS_FILE.parent.mkdir(parents=True, exist_ok=True)
self._USER_CONSTITUENTS_FILE.write_bytes(data)
self._user_constituents_file.parent.mkdir(parents=True, exist_ok=True)
with self:
self._user_constituents_file.write_bytes(data)
self._user_constituents_dirty = False

def __enter__(self) -> None:
if not self._user_constituents_lock.is_locked:
Path(self._user_constituents_lock.lock_file).parent.mkdir(parents=True, exist_ok=True) # https://github.com/tox-dev/py-filelock/issues/176
self._invalidate_user_constituents()
self._user_constituents_lock.acquire()

def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
try:
if self._user_constituents_lock.lock_counter == 1 and self._user_constituents_dirty:
self._save_user_constituents()
finally:
self._user_constituents_lock.release()


def __iter__(self) -> Iterator[str]:
Expand All @@ -176,22 +188,20 @@ def __getitem__(self, name: str) -> Constituent:
except KeyError:
return self._default_constituents[name]

@_user_constituents_file_lock
def add(self, constituent: Constituent) -> None:
self._invalidate_user_constituents()
if constituent.name not in self:
self._user_constituents[constituent.name] = constituent
self._save_user_constituents()
else:
warn(f"{constituent.name}: component was not added (name already exists in database)")
with self:
if constituent.name not in self:
self._user_constituents[constituent.name] = constituent
self._user_constituents_dirty = True
else:
warn(f"{constituent.name}: component was not added (name already exists in database)")

@_user_constituents_file_lock
def __delitem__(self, name: str) -> None:
name = name.upper()
try:
self._invalidate_user_constituents()
del self._user_constituents[name]
self._save_user_constituents()
with self:
del self._user_constituents[name]
self._user_constituents_dirty = True
except KeyError:
if name in self._default_constituents:
raise ValueError(f"{name}: cannot remove default component")
Expand Down Expand Up @@ -221,4 +231,4 @@ def is_user_defined(self, name: str) -> bool:
return name in self._user_constituents


database = _Database()
database = _Database(Path(get_app_dir(__package__), "user_constituents.json"))
2 changes: 1 addition & 1 deletion electrolytes/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def add(name: Annotated[str, typer.Argument(autocompletion=complete_name_user_de
pkas_neg=[x[1] for x in neg],
pkas_pos=[x[1] for x in pos])

with database._user_constituents_file_lock:
with database:
if name in database:
if not database.is_user_defined(name):
typer.echo(f"Error: {name}: is a default component", err=True)
Expand Down

0 comments on commit b027621

Please sign in to comment.