Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix database locks #35

Merged
merged 19 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
13459b8
Update busy flag to use string value with session id: Only locking se…
jobrachem Feb 13, 2024
ff0b044
Refactor exception classes in _util.py
jobrachem Feb 13, 2024
7a8daeb
Fix MatchMakerIO release
jobrachem Feb 13, 2024
3cb4c4e
Update _release_mongo method in MatchMakerIO class
jobrachem Feb 13, 2024
fc1b467
Update busy flag to use exp session id: Only the flagging session is …
jobrachem Feb 13, 2024
84d8810
Fix condition for checking if group is busy
jobrachem Feb 13, 2024
a450f96
Update version number to 0.2.5-dev2
jobrachem Feb 13, 2024
f424f6e
Handle MatchMakerBusy exception in WaitingPage.
jobrachem Feb 13, 2024
29b237b
Update version to 0.2.5-dev3
jobrachem Feb 13, 2024
60d3a20
Add debug logs for member initialization and loading
jobrachem Feb 13, 2024
5452c1b
Update version to 0.2.5-dev4
jobrachem Feb 13, 2024
cc63072
Update version and add debug logs
jobrachem Feb 13, 2024
e2875af
Add debug logging for member data before and after saving
jobrachem Feb 13, 2024
a368708
Refactor MatchMaker._init_member() method throw MatchMakerBusy exception
jobrachem Feb 13, 2024
f47941b
Update version number to 0.2.5-dev6
jobrachem Feb 13, 2024
fc0830d
Update version number to 0.2.5-dev7 and fix variable name in match.py
jobrachem Feb 13, 2024
54fc444
Update version and fix MatchMaker initialization
jobrachem Feb 13, 2024
2179701
Refactor MatchMakerIO class and remove unnecessary log statements
jobrachem Feb 13, 2024
3a887ab
Add spec_name to exp.adata
jobrachem Feb 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions src/alfred3_interact/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,25 @@ def saving_method(exp) -> str:
return None


class MatchingTimeout(Exception):
class AlfredInteractError(Exception):
pass


class MatchingError(Exception):
class MatchingTimeout(AlfredInteractError):
pass


class BusyGroup(Exception):
class MatchingError(AlfredInteractError):
pass


class NoMatch(Exception):
class BusyGroup(AlfredInteractError):
pass


class NoMatch(AlfredInteractError):
pass


class MatchMakerBusy(AlfredInteractError):
pass
2 changes: 1 addition & 1 deletion src/alfred3_interact/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
# 3) we can import it into your module module


__version__ = "0.2.5-dev1"
__version__ = "0.2.5-dev8"
34 changes: 23 additions & 11 deletions src/alfred3_interact/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class GroupData:
members: list = field(default_factory=list)
timestamp: float = field(default_factory=time.time)
active: bool = True
busy: bool = False
busy: str = "false"
shared_data: dict = field(default_factory=dict)
type: str = "match_group"

Expand Down Expand Up @@ -229,10 +229,10 @@ def load_markbusy(self) -> GroupData:

def _load_markbusy_mongo(self) -> dict:
q = self.query
q["busy"] = False
q["busy"] = "false"
data = self.db.find_one_and_update(
q,
{"$set": {"busy": True}},
filter=q,
update={"$set": {"busy": self.exp.session_id}},
return_document=ReturnDocument.AFTER,
projection={"_id": False},
)
Expand All @@ -241,28 +241,35 @@ def _load_markbusy_mongo(self) -> dict:

def _load_markbusy_local(self):
data = self._load_local()
if not data["busy"]:
data["busy"] = True
if data["busy"] == "false":
data["busy"] = self.exp.session_id
self._save_local(data)
return data
else:
return None

def release(self):
if self.saving_method == "mongo":
self._release_mongo()
data = self._release_mongo()
elif self.saving_method == "local":
self._release_local()
data = self._release_local()

self.group.data.busy = False
return data

def _release_mongo(self):
self.db.find_one_and_update(self.query, {"$set": {"busy": False}})
data = self.db.find_one_and_update(
filter=self.query,
update={"$set": {"busy": "false"}},
return_document=ReturnDocument.AFTER,
)
return data

def _release_local(self):
data = self._load_local()
data["busy"] = False
data["busy"] = "false"
self._save_local(data)
return data


class GroupRoles(GroupHelper):
Expand Down Expand Up @@ -751,7 +758,12 @@ def __exit__(self, exc_type, exc_value, tb):
self.data.active = False
self.io.save()

self.io.release()
data = self.io.release()

if not data:
self.exp.log.debug(
f"{self} seems to be busy. GroupIO.release() returned {data}"
)


class GroupManager:
Expand Down
95 changes: 51 additions & 44 deletions src/alfred3_interact/match.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from alfred3_interact.group import GroupManager

from ._util import MatchingError, NoMatch, saving_method
from ._util import MatchingError, MatchMakerBusy, NoMatch, saving_method
from .group import Group
from .member import GroupMember, MemberManager
from .quota import MetaQuota
Expand All @@ -29,7 +29,7 @@ class MatchMakerData:
matchmaker_id: str
type: str
members: dict = field(default_factory=dict)
busy: bool = False
busy: str = "false"
active: bool = False
ping_timeout: int = None

Expand Down Expand Up @@ -90,12 +90,27 @@ def load_markbusy(self) -> MatchMakerData:
def release(self) -> MatchMakerData:
"""
Releases MatchMakerData from a 'busy' state.
Happens only in groupwise matching, so there is only a mongoDB
version of this one.
"""
self.mm.busy = False
if saving_method(self.mm.exp) == "mongo":
return self._release_mongo()
elif saving_method(self.mm.exp) == "local":
return self._release_local()

def _release_mongo(self):
q = copy.copy(self.query)
q["busy"] = True
self.db.find_one_and_update(q, {"$set": {"busy": False}})
q["busy"] = self.mm.exp.session_id
return self.db.find_one_and_update(
filter=q,
update={"$set": {"busy": "false"}},
return_document=ReturnDocument.AFTER,
)

def _release_local(self):
data = self._load_local()
data.busy = "false"
self._save_local(data)
return data

def _save_mongo(self, data: MatchMakerData):
self.db.find_one_and_replace(self.query, asdict(data))
Expand Down Expand Up @@ -148,36 +163,30 @@ def _load_local(self) -> MatchMakerData:

def _load_markbusy_local(self):
data = self._load_local()
if not data.busy:
data.busy = True
if data.busy == "false":
data.busy = self.mm.exp.session_id
self._save_local(data)
return data
else:
return None

def _load_markbusy_mongo(self):
q = self.query
q["busy"] = False
data = self.db.find_one_and_update(q, {"$set": {"busy": True}})
q["busy"] = "false"

data = self.db.find_one_and_update(
filter=q,
update={"$set": {"busy": self.mm.exp.session_id}},
projection={"_id": False},
return_document=ReturnDocument.AFTER,
)

if data is not None:
self.mm.exp.log.debug(
f"Found non-busy MatchMaker dataset. Timestamp: {time.time()}"
)
data.pop("_id", None)
data["busy"] = True
return MatchMakerData(**data)
else:
self.mm.exp.log.debug(
f"Did NOT find non-busy MatchMaker dataset. Timestamp: {time.time()}"
)
return None

def __enter__(self):
self.mm.exp.log.debug(
"Entering MatchMakerIO Trying to load MatchMakerData. Timestamp:"
f" {time.time()}"
)
return self.load_markbusy()

def __exit__(self, exc_type, exc_value, traceback):
Expand All @@ -192,12 +201,15 @@ def __exit__(self, exc_type, exc_value, traceback):
f" responsible member {self.mm.member} and released the lock.\n{tb}"
)

self.mm.exp.log.debug(
f"Releasing MatchMakerData lock. Timestamp: {time.time()}"
)
self.release()
self.mm.exp.log.debug(f"MatchMakerData lock released. Timestamp: {time.time()}")
data = self.release()

if not data:
self.mm.exp.log.debug(
f"MatchMaker seems to be busy. MatchMakerIO.release() returned {data}"
)
raise MatchMakerBusy

self.mm.exp.log.debug(f"MatchMakerData lock released. Timestamp: {time.time()}")
self.mm._data = self.load()


Expand Down Expand Up @@ -542,14 +554,7 @@ def on_first_show(self):
self.member = self._init_member()

if self.member.matched:
self.exp.log.debug(
"Member is matched. Retrieving group. This call is logged BEFORE."
)
group = self._get_group(self.member)
self.exp.log.debug(
f"Member is matched. Returning group {group}. This call is logged"
" AFTER."
)
return group

self.member.io.ping()
Expand All @@ -568,14 +573,7 @@ def on_first_show(self):

if enough_members or waited_enough:
random.shuffle(self.groupspecs)
self.exp.log.debug(
"Member is not matched. Calling _match_quota. This call is logged"
" BEFORE."
)
group = self._match_quota(self.groupspecs)
self.exp.log.debug(
f"Match completed. Returning group {group}. This call is logged AFTER."
)
return group

raise NoMatch
Expand Down Expand Up @@ -873,18 +871,27 @@ def _validate_specs(self, specs):
return specs

def _init_member(self) -> GroupMember:

if self.member:
self.member.io.load()

return self.member

member = GroupMember(self)
member.io.save()
return member
with self.io as data:

if data is not None:
member = GroupMember(self)
member.io.save()

return member

raise MatchMakerBusy

def _update_additional_data(self):
prefix = "interact"
while prefix in self.exp.adata:
prefix = "_" + prefix
self.exp.adata[prefix] = {}
self.exp.adata[prefix]["groupid"] = self.group.data.group_id
self.exp.adata[prefix]["spec_name"] = self.group.data.spec_name
self.exp.adata[prefix]["role"] = self.member.data.role
12 changes: 11 additions & 1 deletion src/alfred3_interact/member.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from alfred3.data_manager import DataManager as dm
from alfred3.quota import SessionGroup
from pymongo.collection import ReturnDocument

from ._util import saving_method

Expand Down Expand Up @@ -62,6 +63,7 @@ def load(self):
data = self._load_local()

data = data["members"][self.sid]

self.member.data = GroupMemberData(**data)
return

Expand Down Expand Up @@ -102,7 +104,15 @@ def _save_local(self):

def _save_mongo(self):
data = {"members": {self.sid: asdict(self.member.data)}}
self.db.find_one_and_update(self.query, [{"$set": data}])

from pprint import pprint

data_before = self.db.find_one(self.query)
self.exp.log.debug(f"MEBER DATA BEFORE: {pprint(data_before)}")
data_after = self.db.find_one_and_update(
self.query, [{"$set": data}], return_document=ReturnDocument.AFTER
)
self.exp.log.debug(f"MEBER DATA AFTER: {pprint(data_after)}")

def ping(self):
if saving_method(self.exp) == "local":
Expand Down
4 changes: 2 additions & 2 deletions src/alfred3_interact/page.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from alfred3._helper import inherit_kwargs
from alfred3.element.misc import RepeatedCallback

from ._util import NoMatch
from ._util import MatchMakerBusy, NoMatch
from .element import ToggleMatchMakerActivation, ViewMembers


Expand Down Expand Up @@ -467,7 +467,7 @@ def _wait_for(self) -> bool:
try:
wait_status = self.wait_for() # can return None

except NoMatch:
except (NoMatch, MatchMakerBusy):
return False # return False so that the repeated callback will try again

except Exception:
Expand Down
11 changes: 3 additions & 8 deletions src/alfred3_interact/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,22 +163,17 @@ def start_group(self, data, waiting_members: t.List[GroupMember]) -> Group:
group.io.save()
self.mm.io.save(data=data)

self.log.info(f"Group {group} filled. Returning group")
self.log.info(f"{group} filled. Returning group")
return group

def get_group(self) -> Group:
member = self.mm.member
member.io.load()

if member.status.matched:
group = self.group_manager.find_one(member.data.group_id)
self.mm.exp.log.debug(
f"We are in ParallelMatchMaker.get_group(). Found group {group}."
" Returning."
)

return group
self.mm.exp.log.debug(
"We are in ParallelMatchMaker.get_group(). Found NO group. Returning."
)


class Spec(ABC):
Expand Down
Loading