From 1571746040b1495d281699c02c2c25b0d1b039b5 Mon Sep 17 00:00:00 2001 From: Johannes Brachem <37882800+jobrachem@users.noreply.github.com> Date: Tue, 13 Feb 2024 18:46:23 +0100 Subject: [PATCH] Fix database locks (#35) --- src/alfred3_interact/_util.py | 16 ++++-- src/alfred3_interact/_version.py | 2 +- src/alfred3_interact/group.py | 34 ++++++++---- src/alfred3_interact/match.py | 95 +++++++++++++++++--------------- src/alfred3_interact/member.py | 12 +++- src/alfred3_interact/page.py | 4 +- src/alfred3_interact/spec.py | 11 +--- 7 files changed, 103 insertions(+), 71 deletions(-) diff --git a/src/alfred3_interact/_util.py b/src/alfred3_interact/_util.py index 7dd5252..e51804b 100644 --- a/src/alfred3_interact/_util.py +++ b/src/alfred3_interact/_util.py @@ -13,17 +13,25 @@ def saving_method(exp) -> str: return None -class MatchingTimeout(Exception): +class AlfredInteractError(Exception): pass -class MatchingError(Exception): +class MatchingTimeout(AlfredInteractError): pass -class BusyGroup(Exception): +class MatchingError(AlfredInteractError): pass -class NoMatch(Exception): +class BusyGroup(AlfredInteractError): + pass + + +class NoMatch(AlfredInteractError): + pass + + +class MatchMakerBusy(AlfredInteractError): pass diff --git a/src/alfred3_interact/_version.py b/src/alfred3_interact/_version.py index 19d6c32..ec751c0 100644 --- a/src/alfred3_interact/_version.py +++ b/src/alfred3_interact/_version.py @@ -4,4 +4,4 @@ # 3) we can import it into your module module -__version__ = "0.2.5-dev1" +__version__ = "0.2.5-dev8" diff --git a/src/alfred3_interact/group.py b/src/alfred3_interact/group.py index 63fb1c2..c9dbf8c 100644 --- a/src/alfred3_interact/group.py +++ b/src/alfred3_interact/group.py @@ -36,7 +36,7 @@ class GroupData: members: list = field(default_factory=list) timestamp: float = field(default_factory=time.time) active: bool = True - busy: bool = False + busy: str = "false" shared_data: dict = field(default_factory=dict) type: str = "match_group" @@ -229,10 +229,10 @@ def load_markbusy(self) -> GroupData: def _load_markbusy_mongo(self) -> dict: q = self.query - q["busy"] = False + q["busy"] = "false" data = self.db.find_one_and_update( - q, - {"$set": {"busy": True}}, + filter=q, + update={"$set": {"busy": self.exp.session_id}}, return_document=ReturnDocument.AFTER, projection={"_id": False}, ) @@ -241,8 +241,8 @@ def _load_markbusy_mongo(self) -> dict: def _load_markbusy_local(self): data = self._load_local() - if not data["busy"]: - data["busy"] = True + if data["busy"] == "false": + data["busy"] = self.exp.session_id self._save_local(data) return data else: @@ -250,19 +250,26 @@ def _load_markbusy_local(self): def release(self): if self.saving_method == "mongo": - self._release_mongo() + data = self._release_mongo() elif self.saving_method == "local": - self._release_local() + data = self._release_local() self.group.data.busy = False + return data def _release_mongo(self): - self.db.find_one_and_update(self.query, {"$set": {"busy": False}}) + data = self.db.find_one_and_update( + filter=self.query, + update={"$set": {"busy": "false"}}, + return_document=ReturnDocument.AFTER, + ) + return data def _release_local(self): data = self._load_local() - data["busy"] = False + data["busy"] = "false" self._save_local(data) + return data class GroupRoles(GroupHelper): @@ -751,7 +758,12 @@ def __exit__(self, exc_type, exc_value, tb): self.data.active = False self.io.save() - self.io.release() + data = self.io.release() + + if not data: + self.exp.log.debug( + f"{self} seems to be busy. GroupIO.release() returned {data}" + ) class GroupManager: diff --git a/src/alfred3_interact/match.py b/src/alfred3_interact/match.py index 463c96d..cbc60d6 100644 --- a/src/alfred3_interact/match.py +++ b/src/alfred3_interact/match.py @@ -14,7 +14,7 @@ from alfred3_interact.group import GroupManager -from ._util import MatchingError, NoMatch, saving_method +from ._util import MatchingError, MatchMakerBusy, NoMatch, saving_method from .group import Group from .member import GroupMember, MemberManager from .quota import MetaQuota @@ -29,7 +29,7 @@ class MatchMakerData: matchmaker_id: str type: str members: dict = field(default_factory=dict) - busy: bool = False + busy: str = "false" active: bool = False ping_timeout: int = None @@ -90,12 +90,27 @@ def load_markbusy(self) -> MatchMakerData: def release(self) -> MatchMakerData: """ Releases MatchMakerData from a 'busy' state. - Happens only in groupwise matching, so there is only a mongoDB - version of this one. """ + self.mm.busy = False + if saving_method(self.mm.exp) == "mongo": + return self._release_mongo() + elif saving_method(self.mm.exp) == "local": + return self._release_local() + + def _release_mongo(self): q = copy.copy(self.query) - q["busy"] = True - self.db.find_one_and_update(q, {"$set": {"busy": False}}) + q["busy"] = self.mm.exp.session_id + return self.db.find_one_and_update( + filter=q, + update={"$set": {"busy": "false"}}, + return_document=ReturnDocument.AFTER, + ) + + def _release_local(self): + data = self._load_local() + data.busy = "false" + self._save_local(data) + return data def _save_mongo(self, data: MatchMakerData): self.db.find_one_and_replace(self.query, asdict(data)) @@ -148,8 +163,8 @@ def _load_local(self) -> MatchMakerData: def _load_markbusy_local(self): data = self._load_local() - if not data.busy: - data.busy = True + if data.busy == "false": + data.busy = self.mm.exp.session_id self._save_local(data) return data else: @@ -157,27 +172,21 @@ def _load_markbusy_local(self): def _load_markbusy_mongo(self): q = self.query - q["busy"] = False - data = self.db.find_one_and_update(q, {"$set": {"busy": True}}) + q["busy"] = "false" + + data = self.db.find_one_and_update( + filter=q, + update={"$set": {"busy": self.mm.exp.session_id}}, + projection={"_id": False}, + return_document=ReturnDocument.AFTER, + ) if data is not None: - self.mm.exp.log.debug( - f"Found non-busy MatchMaker dataset. Timestamp: {time.time()}" - ) - data.pop("_id", None) - data["busy"] = True return MatchMakerData(**data) else: - self.mm.exp.log.debug( - f"Did NOT find non-busy MatchMaker dataset. Timestamp: {time.time()}" - ) return None def __enter__(self): - self.mm.exp.log.debug( - "Entering MatchMakerIO Trying to load MatchMakerData. Timestamp:" - f" {time.time()}" - ) return self.load_markbusy() def __exit__(self, exc_type, exc_value, traceback): @@ -192,12 +201,15 @@ def __exit__(self, exc_type, exc_value, traceback): f" responsible member {self.mm.member} and released the lock.\n{tb}" ) - self.mm.exp.log.debug( - f"Releasing MatchMakerData lock. Timestamp: {time.time()}" - ) - self.release() - self.mm.exp.log.debug(f"MatchMakerData lock released. Timestamp: {time.time()}") + data = self.release() + + if not data: + self.mm.exp.log.debug( + f"MatchMaker seems to be busy. MatchMakerIO.release() returned {data}" + ) + raise MatchMakerBusy + self.mm.exp.log.debug(f"MatchMakerData lock released. Timestamp: {time.time()}") self.mm._data = self.load() @@ -542,14 +554,7 @@ def on_first_show(self): self.member = self._init_member() if self.member.matched: - self.exp.log.debug( - "Member is matched. Retrieving group. This call is logged BEFORE." - ) group = self._get_group(self.member) - self.exp.log.debug( - f"Member is matched. Returning group {group}. This call is logged" - " AFTER." - ) return group self.member.io.ping() @@ -568,14 +573,7 @@ def on_first_show(self): if enough_members or waited_enough: random.shuffle(self.groupspecs) - self.exp.log.debug( - "Member is not matched. Calling _match_quota. This call is logged" - " BEFORE." - ) group = self._match_quota(self.groupspecs) - self.exp.log.debug( - f"Match completed. Returning group {group}. This call is logged AFTER." - ) return group raise NoMatch @@ -873,13 +871,21 @@ def _validate_specs(self, specs): return specs def _init_member(self) -> GroupMember: + if self.member: self.member.io.load() + return self.member - member = GroupMember(self) - member.io.save() - return member + with self.io as data: + + if data is not None: + member = GroupMember(self) + member.io.save() + + return member + + raise MatchMakerBusy def _update_additional_data(self): prefix = "interact" @@ -887,4 +893,5 @@ def _update_additional_data(self): prefix = "_" + prefix self.exp.adata[prefix] = {} self.exp.adata[prefix]["groupid"] = self.group.data.group_id + self.exp.adata[prefix]["spec_name"] = self.group.data.spec_name self.exp.adata[prefix]["role"] = self.member.data.role diff --git a/src/alfred3_interact/member.py b/src/alfred3_interact/member.py index 18f0745..bfd95d1 100644 --- a/src/alfred3_interact/member.py +++ b/src/alfred3_interact/member.py @@ -10,6 +10,7 @@ from alfred3.data_manager import DataManager as dm from alfred3.quota import SessionGroup +from pymongo.collection import ReturnDocument from ._util import saving_method @@ -62,6 +63,7 @@ def load(self): data = self._load_local() data = data["members"][self.sid] + self.member.data = GroupMemberData(**data) return @@ -102,7 +104,15 @@ def _save_local(self): def _save_mongo(self): data = {"members": {self.sid: asdict(self.member.data)}} - self.db.find_one_and_update(self.query, [{"$set": data}]) + + from pprint import pprint + + data_before = self.db.find_one(self.query) + self.exp.log.debug(f"MEBER DATA BEFORE: {pprint(data_before)}") + data_after = self.db.find_one_and_update( + self.query, [{"$set": data}], return_document=ReturnDocument.AFTER + ) + self.exp.log.debug(f"MEBER DATA AFTER: {pprint(data_after)}") def ping(self): if saving_method(self.exp) == "local": diff --git a/src/alfred3_interact/page.py b/src/alfred3_interact/page.py index c7b7c9d..3844568 100644 --- a/src/alfred3_interact/page.py +++ b/src/alfred3_interact/page.py @@ -11,7 +11,7 @@ from alfred3._helper import inherit_kwargs from alfred3.element.misc import RepeatedCallback -from ._util import NoMatch +from ._util import MatchMakerBusy, NoMatch from .element import ToggleMatchMakerActivation, ViewMembers @@ -467,7 +467,7 @@ def _wait_for(self) -> bool: try: wait_status = self.wait_for() # can return None - except NoMatch: + except (NoMatch, MatchMakerBusy): return False # return False so that the repeated callback will try again except Exception: diff --git a/src/alfred3_interact/spec.py b/src/alfred3_interact/spec.py index 61e8812..c9c7f97 100644 --- a/src/alfred3_interact/spec.py +++ b/src/alfred3_interact/spec.py @@ -163,22 +163,17 @@ def start_group(self, data, waiting_members: t.List[GroupMember]) -> Group: group.io.save() self.mm.io.save(data=data) - self.log.info(f"Group {group} filled. Returning group") + self.log.info(f"{group} filled. Returning group") return group def get_group(self) -> Group: member = self.mm.member member.io.load() + if member.status.matched: group = self.group_manager.find_one(member.data.group_id) - self.mm.exp.log.debug( - f"We are in ParallelMatchMaker.get_group(). Found group {group}." - " Returning." - ) + return group - self.mm.exp.log.debug( - "We are in ParallelMatchMaker.get_group(). Found NO group. Returning." - ) class Spec(ABC):