Skip to content

Commit

Permalink
Fix database locks (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
jobrachem authored Feb 13, 2024
1 parent 5e9f239 commit 1571746
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 71 deletions.
16 changes: 12 additions & 4 deletions src/alfred3_interact/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,25 @@ def saving_method(exp) -> str:
return None


class MatchingTimeout(Exception):
class AlfredInteractError(Exception):
pass


class MatchingError(Exception):
class MatchingTimeout(AlfredInteractError):
pass


class BusyGroup(Exception):
class MatchingError(AlfredInteractError):
pass


class NoMatch(Exception):
class BusyGroup(AlfredInteractError):
pass


class NoMatch(AlfredInteractError):
pass


class MatchMakerBusy(AlfredInteractError):
pass
2 changes: 1 addition & 1 deletion src/alfred3_interact/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
# 3) we can import it into your module module


__version__ = "0.2.5-dev1"
__version__ = "0.2.5-dev8"
34 changes: 23 additions & 11 deletions src/alfred3_interact/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class GroupData:
members: list = field(default_factory=list)
timestamp: float = field(default_factory=time.time)
active: bool = True
busy: bool = False
busy: str = "false"
shared_data: dict = field(default_factory=dict)
type: str = "match_group"

Expand Down Expand Up @@ -229,10 +229,10 @@ def load_markbusy(self) -> GroupData:

def _load_markbusy_mongo(self) -> dict:
q = self.query
q["busy"] = False
q["busy"] = "false"
data = self.db.find_one_and_update(
q,
{"$set": {"busy": True}},
filter=q,
update={"$set": {"busy": self.exp.session_id}},
return_document=ReturnDocument.AFTER,
projection={"_id": False},
)
Expand All @@ -241,28 +241,35 @@ def _load_markbusy_mongo(self) -> dict:

def _load_markbusy_local(self):
data = self._load_local()
if not data["busy"]:
data["busy"] = True
if data["busy"] == "false":
data["busy"] = self.exp.session_id
self._save_local(data)
return data
else:
return None

def release(self):
if self.saving_method == "mongo":
self._release_mongo()
data = self._release_mongo()
elif self.saving_method == "local":
self._release_local()
data = self._release_local()

self.group.data.busy = False
return data

def _release_mongo(self):
self.db.find_one_and_update(self.query, {"$set": {"busy": False}})
data = self.db.find_one_and_update(
filter=self.query,
update={"$set": {"busy": "false"}},
return_document=ReturnDocument.AFTER,
)
return data

def _release_local(self):
data = self._load_local()
data["busy"] = False
data["busy"] = "false"
self._save_local(data)
return data


class GroupRoles(GroupHelper):
Expand Down Expand Up @@ -751,7 +758,12 @@ def __exit__(self, exc_type, exc_value, tb):
self.data.active = False
self.io.save()

self.io.release()
data = self.io.release()

if not data:
self.exp.log.debug(
f"{self} seems to be busy. GroupIO.release() returned {data}"
)


class GroupManager:
Expand Down
95 changes: 51 additions & 44 deletions src/alfred3_interact/match.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from alfred3_interact.group import GroupManager

from ._util import MatchingError, NoMatch, saving_method
from ._util import MatchingError, MatchMakerBusy, NoMatch, saving_method
from .group import Group
from .member import GroupMember, MemberManager
from .quota import MetaQuota
Expand All @@ -29,7 +29,7 @@ class MatchMakerData:
matchmaker_id: str
type: str
members: dict = field(default_factory=dict)
busy: bool = False
busy: str = "false"
active: bool = False
ping_timeout: int = None

Expand Down Expand Up @@ -90,12 +90,27 @@ def load_markbusy(self) -> MatchMakerData:
def release(self) -> MatchMakerData:
"""
Releases MatchMakerData from a 'busy' state.
Happens only in groupwise matching, so there is only a mongoDB
version of this one.
"""
self.mm.busy = False
if saving_method(self.mm.exp) == "mongo":
return self._release_mongo()
elif saving_method(self.mm.exp) == "local":
return self._release_local()

def _release_mongo(self):
q = copy.copy(self.query)
q["busy"] = True
self.db.find_one_and_update(q, {"$set": {"busy": False}})
q["busy"] = self.mm.exp.session_id
return self.db.find_one_and_update(
filter=q,
update={"$set": {"busy": "false"}},
return_document=ReturnDocument.AFTER,
)

def _release_local(self):
data = self._load_local()
data.busy = "false"
self._save_local(data)
return data

def _save_mongo(self, data: MatchMakerData):
self.db.find_one_and_replace(self.query, asdict(data))
Expand Down Expand Up @@ -148,36 +163,30 @@ def _load_local(self) -> MatchMakerData:

def _load_markbusy_local(self):
data = self._load_local()
if not data.busy:
data.busy = True
if data.busy == "false":
data.busy = self.mm.exp.session_id
self._save_local(data)
return data
else:
return None

def _load_markbusy_mongo(self):
q = self.query
q["busy"] = False
data = self.db.find_one_and_update(q, {"$set": {"busy": True}})
q["busy"] = "false"

data = self.db.find_one_and_update(
filter=q,
update={"$set": {"busy": self.mm.exp.session_id}},
projection={"_id": False},
return_document=ReturnDocument.AFTER,
)

if data is not None:
self.mm.exp.log.debug(
f"Found non-busy MatchMaker dataset. Timestamp: {time.time()}"
)
data.pop("_id", None)
data["busy"] = True
return MatchMakerData(**data)
else:
self.mm.exp.log.debug(
f"Did NOT find non-busy MatchMaker dataset. Timestamp: {time.time()}"
)
return None

def __enter__(self):
self.mm.exp.log.debug(
"Entering MatchMakerIO Trying to load MatchMakerData. Timestamp:"
f" {time.time()}"
)
return self.load_markbusy()

def __exit__(self, exc_type, exc_value, traceback):
Expand All @@ -192,12 +201,15 @@ def __exit__(self, exc_type, exc_value, traceback):
f" responsible member {self.mm.member} and released the lock.\n{tb}"
)

self.mm.exp.log.debug(
f"Releasing MatchMakerData lock. Timestamp: {time.time()}"
)
self.release()
self.mm.exp.log.debug(f"MatchMakerData lock released. Timestamp: {time.time()}")
data = self.release()

if not data:
self.mm.exp.log.debug(
f"MatchMaker seems to be busy. MatchMakerIO.release() returned {data}"
)
raise MatchMakerBusy

self.mm.exp.log.debug(f"MatchMakerData lock released. Timestamp: {time.time()}")
self.mm._data = self.load()


Expand Down Expand Up @@ -542,14 +554,7 @@ def on_first_show(self):
self.member = self._init_member()

if self.member.matched:
self.exp.log.debug(
"Member is matched. Retrieving group. This call is logged BEFORE."
)
group = self._get_group(self.member)
self.exp.log.debug(
f"Member is matched. Returning group {group}. This call is logged"
" AFTER."
)
return group

self.member.io.ping()
Expand All @@ -568,14 +573,7 @@ def on_first_show(self):

if enough_members or waited_enough:
random.shuffle(self.groupspecs)
self.exp.log.debug(
"Member is not matched. Calling _match_quota. This call is logged"
" BEFORE."
)
group = self._match_quota(self.groupspecs)
self.exp.log.debug(
f"Match completed. Returning group {group}. This call is logged AFTER."
)
return group

raise NoMatch
Expand Down Expand Up @@ -873,18 +871,27 @@ def _validate_specs(self, specs):
return specs

def _init_member(self) -> GroupMember:

if self.member:
self.member.io.load()

return self.member

member = GroupMember(self)
member.io.save()
return member
with self.io as data:

if data is not None:
member = GroupMember(self)
member.io.save()

return member

raise MatchMakerBusy

def _update_additional_data(self):
prefix = "interact"
while prefix in self.exp.adata:
prefix = "_" + prefix
self.exp.adata[prefix] = {}
self.exp.adata[prefix]["groupid"] = self.group.data.group_id
self.exp.adata[prefix]["spec_name"] = self.group.data.spec_name
self.exp.adata[prefix]["role"] = self.member.data.role
12 changes: 11 additions & 1 deletion src/alfred3_interact/member.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from alfred3.data_manager import DataManager as dm
from alfred3.quota import SessionGroup
from pymongo.collection import ReturnDocument

from ._util import saving_method

Expand Down Expand Up @@ -62,6 +63,7 @@ def load(self):
data = self._load_local()

data = data["members"][self.sid]

self.member.data = GroupMemberData(**data)
return

Expand Down Expand Up @@ -102,7 +104,15 @@ def _save_local(self):

def _save_mongo(self):
data = {"members": {self.sid: asdict(self.member.data)}}
self.db.find_one_and_update(self.query, [{"$set": data}])

from pprint import pprint

data_before = self.db.find_one(self.query)
self.exp.log.debug(f"MEBER DATA BEFORE: {pprint(data_before)}")
data_after = self.db.find_one_and_update(
self.query, [{"$set": data}], return_document=ReturnDocument.AFTER
)
self.exp.log.debug(f"MEBER DATA AFTER: {pprint(data_after)}")

def ping(self):
if saving_method(self.exp) == "local":
Expand Down
4 changes: 2 additions & 2 deletions src/alfred3_interact/page.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from alfred3._helper import inherit_kwargs
from alfred3.element.misc import RepeatedCallback

from ._util import NoMatch
from ._util import MatchMakerBusy, NoMatch
from .element import ToggleMatchMakerActivation, ViewMembers


Expand Down Expand Up @@ -467,7 +467,7 @@ def _wait_for(self) -> bool:
try:
wait_status = self.wait_for() # can return None

except NoMatch:
except (NoMatch, MatchMakerBusy):
return False # return False so that the repeated callback will try again

except Exception:
Expand Down
11 changes: 3 additions & 8 deletions src/alfred3_interact/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,22 +163,17 @@ def start_group(self, data, waiting_members: t.List[GroupMember]) -> Group:
group.io.save()
self.mm.io.save(data=data)

self.log.info(f"Group {group} filled. Returning group")
self.log.info(f"{group} filled. Returning group")
return group

def get_group(self) -> Group:
member = self.mm.member
member.io.load()

if member.status.matched:
group = self.group_manager.find_one(member.data.group_id)
self.mm.exp.log.debug(
f"We are in ParallelMatchMaker.get_group(). Found group {group}."
" Returning."
)

return group
self.mm.exp.log.debug(
"We are in ParallelMatchMaker.get_group(). Found NO group. Returning."
)


class Spec(ABC):
Expand Down

0 comments on commit 1571746

Please sign in to comment.