Lock-based thread safety #92

Merged
merged 2 commits on Apr 3, 2023
Changes from 1 commit
6 changes: 3 additions & 3 deletions doc/source/changelog.rst
@@ -14,10 +14,10 @@ Changelog
   (:pr:`78`) `Guido Imperiale`_
 - ``LMDB`` now uses memory-mapped I/O on MacOSX and is usable on Windows.
   (:pr:`78`) `Guido Imperiale`_
-- The library is now partially thread-safe.
-  (:pr:`82`, :pr:`90`, :pr:`93`) `Guido Imperiale`_
+- The library is now almost completely thread-safe.
+  (:pr:`82`, :pr:`90`, :pr:`92`, :pr:`93`) `Guido Imperiale`_
 - :class:`LRU` and :class:`Buffer` now support delayed eviction.
-  New objects :class:`Accumulator` and :class:`InsertionSortedSet`.
+  New object :class:`InsertionSortedSet`.
   (:pr:`87`) `Guido Imperiale`_
 - All mappings now return proper KeysView, ItemsView, and ValuesView objects from their
   keys(), items(), and values() methods (:pr:`93`) `Guido Imperiale`_
6 changes: 2 additions & 4 deletions doc/source/index.rst
@@ -36,8 +36,8 @@ zlib-compressed, directory of files.
 
 Thread-safety
 -------------
-This library is only partially thread-safe. Refer to the documentation of the individual
-mappings for details.
+Most classes in this library are thread-safe.
+Refer to the documentation of the individual mappings for exceptions.
 
 API
 ---
@@ -64,8 +64,6 @@ API
 
 Additionally, **zict** makes available the following general-purpose objects:
 
-.. autoclass:: Accumulator
-   :members:
 .. autoclass:: InsertionSortedSet
    :members:
 .. autoclass:: WeakValueMapping
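To make the documented guarantee concrete, a small smoke test such as the following (illustrative only, not part of this PR) exercises one mapping from several threads through the public zict API; it assumes the default weight of 1 per key:

```python
import threading

from zict import LRU

lru = LRU(100, {})  # keep at most 100 keys in the plain dict

def worker(start: int) -> None:
    # Each thread writes, reads and deletes its own key range; the LRU's
    # internal bookkeeping is shared and now protected by a lock.
    for i in range(start, start + 1_000):
        lru[i] = i * 2
        _ = lru.get(i)      # may be None if the key was already evicted
        if i % 7 == 0:
            lru.pop(i, None)

threads = [threading.Thread(target=worker, args=(k * 1_000,)) for k in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

assert len(lru) <= 100  # eviction kept the mapping within its target weight
```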
6 changes: 3 additions & 3 deletions setup.cfg
@@ -75,9 +75,9 @@ addopts =
 # 'thread' kills off the whole test suite. 'signal' only kills the offending test.
 # However, 'signal' doesn't work on Windows (due to lack of SIGALRM).
 timeout_method = thread
-# This should not be reduced; Windows CI has been observed to be occasionally
-# exceptionally slow.
-timeout = 300
+timeout = 180
+markers =
+    stress: slow-running stress test with a random component. Pass --stress <n> to change number of reruns.
 
 
 [isort]
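The --stress <n> flag mentioned in the marker description is a custom pytest option defined in the project's conftest.py, which is not part of this diff. A minimal sketch of how such an option is typically wired (an assumption about the real conftest; stress_count is a hypothetical fixture name):

```python
# conftest.py (illustrative sketch only)
import pytest

def pytest_addoption(parser):
    parser.addoption(
        "--stress",
        action="store",
        type=int,
        default=1,
        help="number of reruns for tests marked with @pytest.mark.stress",
    )

@pytest.fixture
def stress_count(request):
    # Stress tests read this fixture and repeat their randomized workload
    # that many times.
    return request.config.getoption("--stress")
```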
1 change: 0 additions & 1 deletion zict/__init__.py
@@ -6,7 +6,6 @@
 from zict.lmdb import LMDB as LMDB
 from zict.lru import LRU as LRU
 from zict.sieve import Sieve as Sieve
-from zict.utils import Accumulator as Accumulator
 from zict.utils import InsertionSortedSet as InsertionSortedSet
 from zict.zip import Zip as Zip

91 changes: 71 additions & 20 deletions zict/buffer.py
@@ -7,7 +7,7 @@
ValuesView,
)

from zict.common import KT, VT, ZictBase, close, flush
from zict.common import KT, VT, ZictBase, close, discard, flush, locked
from zict.lru import LRU


@@ -37,8 +37,10 @@ class Buffer(ZictBase[KT, VT]):

Notes
-----
``__contains__`` and ``__len__`` are thread-safe if the same methods on both
``fast`` and ``slow`` are thread-safe. All other methods are not thread-safe.
If you call methods of this class from multiple threads, access will be fast as long
as all methods of ``fast``, plus ``slow.__contains__`` and ``slow.__delitem__``, are
fast. ``slow.__getitem__``, ``slow.__setitem__`` and callbacks are not protected
by locks.

Examples
--------
@@ -58,6 +60,7 @@ class Buffer(ZictBase[KT, VT]):
weight: Callable[[KT, VT], float]
fast_to_slow_callbacks: list[Callable[[KT, VT], None]]
slow_to_fast_callbacks: list[Callable[[KT, VT], None]]
_cancel_restore: dict[KT, bool]

def __init__(
self,
@@ -72,7 +75,14 @@ def __init__(
| list[Callable[[KT, VT], None]]
| None = None,
):
self.fast = LRU(n, fast, weight=weight, on_evict=[self.fast_to_slow])
super().__init__()
self.fast = LRU(
n,
fast,
weight=weight,
on_evict=[self.fast_to_slow],
on_cancel_evict=[self._cancel_evict],
)
self.slow = slow
self.weight = weight
if callable(fast_to_slow_callbacks):
@@ -81,6 +91,7 @@ def __init__(
slow_to_fast_callbacks = [slow_to_fast_callbacks]
self.fast_to_slow_callbacks = fast_to_slow_callbacks or []
self.slow_to_fast_callbacks = slow_to_fast_callbacks or []
self._cancel_restore = {}

@property
def n(self) -> float:
@@ -98,56 +109,96 @@ def fast_to_slow(self, key: KT, value: VT) -> None:
raise

def slow_to_fast(self, key: KT) -> VT:
value = self.slow[key]
self._cancel_restore[key] = False
try:
with self.unlock():
value = self.slow[key]
if self._cancel_restore[key]:
raise KeyError(key)
finally:
del self._cancel_restore[key]

# Avoid useless movement for heavy values
w = self.weight(key, value)
if w <= self.n:
# Multithreaded edge case:
# - Thread 1 starts slow_to_fast(x) and puts it at the top of fast
# - This causes the eviction of older key(s)
# - While thread 1 is evicting older keys, thread 2 is loading fast with
# set_noevict()
# - By the time the eviction of the older key(s) is done, there is
# enough weight in fast that thread 1 will spill x
# - If the below code was just `self.fast[key] = value; del
# self.slow[key]` now the key would be in neither slow nor fast!
self.fast.set_noevict(key, value)
del self.slow[key]
self.fast[key] = value
for cb in self.slow_to_fast_callbacks:
cb(key, value)

with self.unlock():
self.fast.evict_until_below_target()
for cb in self.slow_to_fast_callbacks:
cb(key, value)

return value

@locked
def __getitem__(self, key: KT) -> VT:
try:
return self.fast[key]
except KeyError:
return self.slow_to_fast(key)

def __setitem__(self, key: KT, value: VT) -> None:
try:
del self.slow[key]
except KeyError:
pass
# This may trigger an eviction from fast to slow of older keys.
# If the weight is individually greater than n, then key/value will be stored
# into self.slow instead (see LRU.__setitem__).
with self.lock:
discard(self.slow, key)
if key in self._cancel_restore:
self._cancel_restore[key] = True
crusaderky (Collaborator Author) commented:

Annoyingly I can't just call

self.set_noevict(key, value)
self.fast.evict_until_below_capacity()

due to the bespoke exception handling in LRU.__setitem__.
I'll clean this up once distributed has switched to async mode.
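For reference, the simpler form alluded to above would look roughly like the sketch below (hypothetical, not what this PR does; the PR keeps the expanded body because LRU.__setitem__'s bespoke exception handling cannot simply be reused):

```python
from zict import Buffer

class SimplifiedBuffer(Buffer):
    # Hypothetical simplification: reuse set_noevict and then run one explicit
    # eviction pass, instead of duplicating the same logic inline.
    def __setitem__(self, key, value):
        self.set_noevict(key, value)
        self.evict_until_below_target()
```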

self.fast[key] = value

@locked
def set_noevict(self, key: KT, value: VT) -> None:
"""Variant of ``__setitem__`` that does not move keys from fast to slow if the
total weight exceeds n
"""
try:
del self.slow[key]
except KeyError:
pass
discard(self.slow, key)
if key in self._cancel_restore:
self._cancel_restore[key] = True
self.fast.set_noevict(key, value)

def evict_until_below_target(self, n: float | None = None) -> None:
"""Wrapper around :meth:`zict.LRU.evict_until_below_target`.
Presented here to allow easier overriding.
"""
self.fast.evict_until_below_target(n)

@locked
def __delitem__(self, key: KT) -> None:
if key in self._cancel_restore:
self._cancel_restore[key] = True
try:
del self.fast[key]
except KeyError:
del self.slow[key]

@locked
def _cancel_evict(self, key: KT, value: VT) -> None:
discard(self.slow, key)

def values(self) -> ValuesView[VT]:
return BufferValuesView(self)

def items(self) -> ItemsView[KT, VT]:
return BufferItemsView(self)

def __len__(self) -> int:
return len(self.fast) + len(self.slow)
with self.lock, self.fast.lock:
return (
len(self.fast)
+ len(self.slow)
- sum(
k in self.fast and k in self.slow
for k in chain(self._cancel_restore, self.fast._cancel_evict)
)
)

def __iter__(self) -> Iterator[KT]:
"""Make sure that the iteration is not disrupted if you evict/restore a key in
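The @locked decorator, self.lock and self.unlock() used throughout this file appear to be provided by ZictBase in zict.common, which this commit does not show. A minimal sketch of the pattern, assuming a per-instance reentrant lock and ignoring the re-entrancy bookkeeping the real helper would need:

```python
import threading
import time
from contextlib import contextmanager
from functools import wraps

def locked(method):
    # Hold self.lock for the whole duration of the wrapped method.
    @wraps(method)
    def wrapper(self, *args, **kwargs):
        with self.lock:
            return method(self, *args, **kwargs)
    return wrapper

class Example:
    def __init__(self):
        self.lock = threading.RLock()
        self.data = {}

    @contextmanager
    def unlock(self):
        # Drop the lock around a slow section (disk I/O, a callback, ...) and
        # take it back before continuing; assumes the lock is held exactly once.
        self.lock.release()
        try:
            yield
        finally:
            self.lock.acquire()

    @locked
    def __setitem__(self, key, value):
        with self.unlock():
            time.sleep(0.01)  # stand-in for a slow write, e.g. to disk
        self.data[key] = value
```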
73 changes: 51 additions & 22 deletions zict/cache.py
@@ -4,7 +4,7 @@
from collections.abc import Iterator, MutableMapping
from typing import TYPE_CHECKING

from zict.common import KT, VT, ZictBase, close, flush
from zict.common import KT, VT, ZictBase, close, discard, flush, locked


class Cache(ZictBase[KT, VT]):
@@ -24,11 +24,9 @@ class Cache(ZictBase[KT, VT]):

Notes
-----
All methods are thread-safe if all methods on both ``data`` and ``cache`` are
thread-safe; however, only one thread can call ``__setitem__`` and ``__delitem__``
at any given time.
``__contains__`` and ``__len__`` are thread-safe if the same methods on ``data`` are
thread-safe.
If you call methods of this class from multiple threads, access will be fast as long
as all methods of ``cache``, plus ``data.__delitem__``, are fast. Other methods of
``data`` are not protected by locks.

Examples
--------
@@ -44,44 +42,75 @@ class Cache(ZictBase[KT, VT]):
data: MutableMapping[KT, VT]
cache: MutableMapping[KT, VT]
update_on_set: bool
_gen: int
_last_updated: dict[KT, int]

def __init__(
self,
data: MutableMapping[KT, VT],
cache: MutableMapping[KT, VT],
update_on_set: bool = True,
):
super().__init__()
self.data = data
self.cache = cache
self.update_on_set = update_on_set
self._gen = 0
self._last_updated = {}

@locked
def __getitem__(self, key: KT) -> VT:
try:
return self.cache[key]
except KeyError:
pass
value = self.data[key]
self.cache[key] = value
return value
gen = self._last_updated[key]

def __setitem__(self, key: KT, value: VT) -> None:
# If the item was already in cache and data.__setitem__ fails, e.g. because it's
# a File and the disk is full, make sure that the cache is invalidated.
try:
del self.cache[key]
except KeyError:
pass
with self.unlock():
value = self.data[key]

self.data[key] = value
if self.update_on_set:
# Could another thread have called __setitem__ or __delitem__ on the
# same key in the meantime? If not, update the cache
if gen == self._last_updated.get(key):
self.cache[key] = value
self._last_updated[key] += 1
return value

@locked
def __setitem__(self, key: KT, value: VT) -> None:
# If the item was already in cache and data.__setitem__ fails, e.g. because
# it's a File and the disk is full, make sure that the cache is invalidated.
discard(self.cache, key)
gen = self._gen
gen += 1
self._last_updated[key] = self._gen = gen

with self.unlock():

Can we comment here why this benefits from being unlocked? It seems like some of the second condition below can be removed if this were left locked. Assume there is a reason for the trade-off though; to allow another thread to make progress? If so, I'm curious if it's worth the tradeoff for added complexity of checking / correcting state (second conditional call to discard(self.cache, key)) in that same second conditional.

crusaderky (Collaborator Author) replied on Apr 3, 2023:

It's explained in the notes at the top of every class.
These methods can take arbitrarily long to run, so two threads should be able to run in unison on the same mapping while they're running:

  • Buffer.slow.__setitem__
  • Buffer.slow.__getitem__
  • LRU.on_evict
  • Cache.data.__setitem__
  • Cache.data.__getitem__
  • File.__setitem__ (specifically fh.write / fh.writelines; not the other syscalls)
  • File.__getitem__ (specifically fh.readinto; not the other syscalls)
  • Sieve.mappings[*].__setitem__
  • Sieve.mappings[*].__getitem__

All other methods are expected to be fast, meaning it's OK for a thread that must not stay busy for long (read: the Worker's event loop) to block on the lock waiting for them to finish.
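As an illustration of this trade-off (a sketch, not part of the PR; SlowStore is a made-up stand-in for something like zict.File), slow backend writes run with the lock released, so cheap operations on the same Cache keep responding while a write is in flight:

```python
import threading
import time
from collections.abc import MutableMapping

from zict import Cache

class SlowStore(MutableMapping):
    """Hypothetical slow backend: every write takes ~50 ms, like disk I/O."""

    def __init__(self):
        self._d = {}

    def __setitem__(self, key, value):
        time.sleep(0.05)
        self._d[key] = value

    def __getitem__(self, key):
        return self._d[key]

    def __delitem__(self, key):
        del self._d[key]

    def __contains__(self, key):  # keep membership tests cheap
        return key in self._d

    def __iter__(self):
        return iter(self._d)

    def __len__(self):
        return len(self._d)

c = Cache(SlowStore(), {})

def writer():
    for i in range(20):
        c[i] = i  # the slow data.__setitem__ runs with the Cache lock released

t = threading.Thread(target=writer)
t.start()
while t.is_alive():
    _ = len(c)   # cheap bookkeeping; does not wait for the 50 ms writes
    _ = 3 in c
    time.sleep(0.01)
t.join()
```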

self.data[key] = value

if key not in self._last_updated:
# Another thread called __delitem__ in the meantime
discard(self.data, key)
elif gen != self._last_updated[key]:
# Another thread called __setitem__ in the meantime. We have no idea which
# of the two ended up actually setting self.data.
# Case 1: the other thread did not enter this locked code block yet.
# Prevent it from setting the cache.
self._last_updated[key] += 1
# Case 2: the other thread already exited this locked code block and set the
# cache. Invalidate it.
discard(self.cache, key)
else:
# No race condition
self._last_updated[key] += 1
if self.update_on_set:
self.cache[key] = value

@locked
def __delitem__(self, key: KT) -> None:
try:
del self.cache[key]
except KeyError:
pass
del self.data[key]
del self._last_updated[key]
discard(self.cache, key)

def __len__(self) -> int:
return len(self.data)
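In isolation, the race-detection scheme introduced in Cache.__setitem__ works as follows: each write bumps a per-key generation counter under the lock, releases the lock for the slow write, and afterwards refreshes the cache only if the counter is unchanged. A simplified, self-contained sketch of that pattern (not zict's actual code; it ignores concurrent deletion, which the real method also handles):

```python
import threading
from contextlib import contextmanager

class VersionedWriteThrough:
    """Simplified sketch of the generation-counter pattern."""

    def __init__(self):
        self.lock = threading.RLock()
        self._gen = 0
        self._last_updated = {}  # key -> generation of the most recent write
        self.data = {}           # slow store (stand-in for e.g. a File mapping)
        self.cache = {}          # fast store

    @contextmanager
    def _unlocked(self):
        self.lock.release()
        try:
            yield
        finally:
            self.lock.acquire()

    def set(self, key, value):
        with self.lock:
            self._gen += 1
            gen = self._last_updated[key] = self._gen

            with self._unlocked():
                self.data[key] = value  # slow write, done without the lock

            if self._last_updated[key] != gen:
                # Another thread wrote the same key while we were unlocked, and
                # we don't know whose data write landed last. Bump the counter
                # so the other writer won't populate the cache either, and drop
                # anything it may have cached already.
                self._last_updated[key] += 1
                self.cache.pop(key, None)
            else:
                self._last_updated[key] += 1
                self.cache[key] = value
```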