Lock-based thread safety #92
Conversation
crusaderky commented Mar 25, 2023 (edited)
- Partially closes [WIP] Asynchronous SpillBuffer distributed#7686
- The whole lockless thread-safety design from Partial thread-safety #82 and Enhanced thread-safety in zict.File #90 was extremely brittle. This PR swaps it for an industry-standard RLock-based design.
- Full thread safety for all classes (except Zip and LMDB), with far fewer caveats than before (see the usage sketch below)
- Follow-up: get(), pop(), popitem(), and setdefault() are not thread-safe #99
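As a rough, hedged usage sketch of what "full thread safety" is meant to allow: several threads writing to the same mapping with no external locking. It uses zict.LRU with the (n, d) signature shown later in this PR; the capacity and thread counts are arbitrary.

import threading
import zict

lru = zict.LRU(100, {})  # n=100, d={} per the __init__ signature shown below

def writer(start: int) -> None:
    for i in range(start, start + 1_000):
        lru[i] = i  # concurrent __setitem__ calls are safe after this PR

threads = [threading.Thread(target=writer, args=(k * 1_000,)) for k in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()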
with self._lock:
    discard(self.slow, key)
    if key in self._cancel_restore:
        self._cancel_restore[key] = True
crusaderky commented:
Annoyingly, I can't just call

self.set_noevict(key, value)
self.fast.evict_until_below_capacity()

due to the bespoke exception handling in LRU.__setitem__.
I'll clean this up once distributed has switched to async mode.
def __init__(
    self,
    n: float,
    d: MutableMapping[KT, VT],
    *,
crusaderky commented:
Very small API breakage; I doubt anybody will mind
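For illustration, a hedged sketch of the breakage implied by the new `*` in the signature above: every parameter after d becomes keyword-only. The on_evict parameter is used here only as an example.

import zict

def on_evict(key, value):
    print("evicted", key)

lru = zict.LRU(100, {}, on_evict=on_evict)  # OK: keyword argument
# zict.LRU(100, {}, on_evict)               # positional form now raises TypeError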
crusaderky force-pushed the branch from 98ca2c4 to 87c8572, from 526dc10 to bf4174a, and from f9b559a to 981f796.
crusaderky commented:
@milesgranger this is ready for review.
# How many times to repeat non-deterministic stress tests.
# You may set it as high as 50 if you wish to run in CI.
REPEAT_STRESS_TESTS = 1
crusaderky commented:
I ran it 100 times just before review; all green.
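As an aside, a hypothetical sketch of how a knob like REPEAT_STRESS_TESTS can drive repeated runs of a non-deterministic test with pytest; the test name and body are illustrative, not zict's actual test suite.

import threading
import pytest

REPEAT_STRESS_TESTS = 1  # bump to ~50 for a longer soak run, e.g. in CI

@pytest.mark.parametrize("attempt", range(REPEAT_STRESS_TESTS))
def test_stress_concurrent_setitem(attempt):
    d = {}
    lock = threading.RLock()

    def writer():
        for i in range(10_000):
            with lock:
                d[i] = i

    threads = [threading.Thread(target=writer) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    assert len(d) == 10_000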
milesgranger commented:
Looks pretty darn good to me; just some nits and clarifications about the implementation: specifically, why we still need to account for potential state changes caused by the temporary releases of the locks, and whether releasing the locks at those points is worth it.
However, it seems well tested and thought out, so feel free to just comment and move along with approval; I have nothing that warrants holding you up here. :)
zict/common.py (Outdated)
def locked(func: Callable[P, VT]) -> Callable[P, VT]:
    """Decorator for a method of ZictBase, which wraps the whole method in a
    mapping-global rlock.
milesgranger commented:
Nit: Not a global lock, right? One lock per instance of ZictBase, it appears.
crusaderky commented:
Yes, per-instance.
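For illustration, a minimal sketch of a per-instance (not global) lock decorator in the spirit of the zict/common.py hunk above; the lock attribute name and the Example class are assumptions, not zict's actual code.

import threading
from functools import wraps

def locked(func):
    """Run the wrapped method under the instance's re-entrant lock."""
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        with self.lock:  # one threading.RLock per instance, not a module global
            return func(self, *args, **kwargs)
    return wrapper

class Example:
    def __init__(self):
        self.lock = threading.RLock()  # created per instance
        self.data = {}

    @locked
    def __setitem__(self, key, value):
        self.data[key] = value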
gen += 1
self._last_updated[key] = self._gen = gen

with self.unlock():
milesgranger commented:
Can we add a comment here on why this benefits from being unlocked? It seems like some of the second condition below could be removed if this were left locked. I assume there is a reason for the trade-off though: to allow another thread to make progress? If so, I'm curious whether it's worth the trade-off, given the added complexity of checking / correcting state (the second conditional call to discard(self.cache, key) in that same second conditional).
crusaderky commented:
It's explained in the notes at the top of every class.
These methods can take arbitrarily long to run, so two threads should be able to run in unison on the same mapping while they're running:

- Buffer.slow.__setitem__
- Buffer.slow.__getitem__
- LRU.on_evict
- Cache.data.__setitem__
- Cache.data.__getitem__
- File.__setitem__ (specifically fh.write / fh.writelines; not the other syscalls)
- File.__getitem__ (specifically fh.readinto; not the other syscalls)
- Sieve.mappings[*].__setitem__
- Sieve.mappings[*].__getitem__

All other methods are expected to be fast, meaning it's OK for a thread that must not be busy (read: the Worker's event loop) to block waiting for them to finish.
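To make the trade-off concrete, here is a condensed, self-contained sketch of the pattern being discussed in this thread: release the per-instance RLock around the slow store, then detect a lost race via a generation counter and invalidate the cache entry. It is a toy class in the spirit of the Cache hunk above, not zict's actual code, and it assumes the lock is held exactly once when unlock() is entered.

import threading
from contextlib import contextmanager

class CachedStore:
    """Toy illustration of the unlock-and-recheck pattern."""

    def __init__(self, data):
        self.data = data                  # slow underlying MutableMapping
        self.cache = {}                   # fast in-memory cache
        self.lock = threading.RLock()
        self.gen = 0
        self.last_updated = {}

    @contextmanager
    def unlock(self):
        # Temporarily release the lock held by the caller (assumes a single
        # level of acquisition) so other threads can make progress meanwhile.
        self.lock.release()
        try:
            yield
        finally:
            self.lock.acquire()

    def __setitem__(self, key, value):
        with self.lock:
            self.gen += 1
            gen = self.last_updated[key] = self.gen
            with self.unlock():
                self.data[key] = value    # arbitrarily slow (serialization, I/O)
            if self.last_updated.get(key) == gen:
                self.cache[key] = value   # no race: safe to cache our value
            else:
                # Another thread touched the same key while we were unlocked;
                # we can't tell whose write landed last in self.data, so make
                # sure the cache doesn't hold a possibly-stale value.
                self.cache.pop(key, None)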
  fn = self._safe_key(key)
- with open(os.path.join(self.directory, fn), "wb") as fh:
+ with open(os.path.join(self.directory, fn), "wb") as fh, self.unlock():
milesgranger commented:
Curious why writing files is considered 'safe' enough to release the lock, but deleting them is not (see delitem)? It seems mutation of files ought to (probably) be locked, or consistently unlocked and left to the user to worry about.
crusaderky commented:
We are not mutating files though; we're always creating a new one. If two threads call __setitem__ on the same key, they'll end up writing to two different files.
I just noticed that I forgot about a race condition there though: one of the two files would remain there indefinitely, littering the hard drive. Fixed.
It's not that deleting files is unsafe; it's that os.remove is expected to be fast (see the definition above), so I chose to keep the method simple. For the record, dict.pop (first line of __delitem__) is not thread-safe and would need to be wrapped in a lock.
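A simplified sketch of the scheme described above, using hypothetical names (it is not zict.File's actual code): each __setitem__ writes to its own fresh file outside the lock, and the bookkeeping under the lock then removes whichever file has been superseded, so nothing is left littering the disk.

import os
import threading
from itertools import count

class TinyFileStore:
    def __init__(self, directory):
        self.directory = directory
        self.lock = threading.RLock()
        self.key_to_filename = {}
        self._counter = count()       # makes every written filename unique

    def __setitem__(self, key, value: bytes):
        with self.lock:
            fn = os.path.join(self.directory, f"{key}-{next(self._counter)}")
        # Slow part runs unlocked: two threads setting the same key write to
        # two different files, so they never mutate the same file.
        with open(fn, "wb") as fh:
            fh.write(value)
        with self.lock:
            old = self.key_to_filename.get(key)
            self.key_to_filename[key] = fn
        if old is not None:
            os.remove(old)            # drop the file that was superseded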
with self.unlock():
    mapping[key] = value

if gen != self.gen and self.key_to_mapping.get(key) is not mapping:
    # Multithreaded race condition
    discard(mapping, key)
milesgranger commented:
I suppose this is a similar clarification; is the tradeoff worth potentially corrupting the expected state and trying to rectify it?
crusaderky commented:
Mappings in Sieve can be slow. Realistic example: we have appetite to change the current distributed.spill.SpillBuffer.slow from

slow = zict.Func(dumps, loads, zict.File(local_directory))

to

def selector(k, v):
    if isinstance(v, (pandas.DataFrame, pandas.Series)):
        return "pandas"
    else:
        return "generic"

slow = zict.Sieve(
    {
        "pandas": zict.ParquetFile(local_directory),
        "generic": zict.Func(dumps, loads, zict.File(local_directory)),
    },
    selector=selector,
)
milesgranger commented:
Thanks for the clarifications @crusaderky 😃