Skip to content

Commit

Permalink
Lock-based thread safety
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Mar 31, 2023
1 parent fea097d commit 981f796
Show file tree
Hide file tree
Showing 23 changed files with 707 additions and 387 deletions.
6 changes: 3 additions & 3 deletions doc/source/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ Changelog
(:pr:`78`) `Guido Imperiale`_
- ``LMDB`` now uses memory-mapped I/O on MacOSX and is usable on Windows.
(:pr:`78`) `Guido Imperiale`_
- The library is now partially thread-safe.
(:pr:`82`, :pr:`90`, :pr:`93`) `Guido Imperiale`_
- The library is now almost completely thread-safe.
(:pr:`82`, :pr:`90`, :pr:`92`, :pr:`93`) `Guido Imperiale`_
- :class:`LRU` and :class:`Buffer` now support delayed eviction.
New objects :class:`Accumulator` and :class:`InsertionSortedSet`.
New object :class:`InsertionSortedSet`.
(:pr:`87`) `Guido Imperiale`_
- All mappings now return proper KeysView, ItemsView, and ValuesView objects from their
keys(), items(), and values() methods (:pr:`93`) `Guido Imperiale`_
Expand Down
6 changes: 2 additions & 4 deletions doc/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ zlib-compressed, directory of files.
Thread-safety
-------------
This library is only partially thread-safe. Refer to the documentation of the individual
mappings for details.
Most classes in this library are thread-safe.
Refer to the documentation of the individual mappings for exceptions.

API
---
Expand All @@ -64,8 +64,6 @@ API

Additionally, **zict** makes available the following general-purpose objects:

.. autoclass:: Accumulator
:members:
.. autoclass:: InsertionSortedSet
:members:
.. autoclass:: WeakValueMapping
Expand Down
6 changes: 3 additions & 3 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ addopts =
# 'thread' kills off the whole test suite. 'signal' only kills the offending test.
# However, 'signal' doesn't work on Windows (due to lack of SIGALRM).
timeout_method = thread
# This should not be reduced; Windows CI has been observed to be occasionally
# exceptionally slow.
timeout = 300
timeout = 30
markers =
stress: slow-running stress test with a random component. Pass --stress <n> to change number of reruns.


[isort]
Expand Down
1 change: 0 additions & 1 deletion zict/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from zict.lmdb import LMDB as LMDB
from zict.lru import LRU as LRU
from zict.sieve import Sieve as Sieve
from zict.utils import Accumulator as Accumulator
from zict.utils import InsertionSortedSet as InsertionSortedSet
from zict.zip import Zip as Zip

Expand Down
91 changes: 71 additions & 20 deletions zict/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
ValuesView,
)

from zict.common import KT, VT, ZictBase, close, flush
from zict.common import KT, VT, ZictBase, close, discard, flush, locked
from zict.lru import LRU


Expand Down Expand Up @@ -37,8 +37,10 @@ class Buffer(ZictBase[KT, VT]):
Notes
-----
``__contains__`` and ``__len__`` are thread-safe if the same methods on both
``fast`` and ``slow`` are thread-safe. All other methods are not thread-safe.
If you call methods of this class from multiple threads, access will be fast as long
as all methods of ``fast``, plus ``slow.__contains__`` and ``slow.__delitem__``, are
fast. ``slow.__getitem__``, ``slow.__setitem__`` and callbacks are not protected
by locks.
Examples
--------
Expand All @@ -58,6 +60,7 @@ class Buffer(ZictBase[KT, VT]):
weight: Callable[[KT, VT], float]
fast_to_slow_callbacks: list[Callable[[KT, VT], None]]
slow_to_fast_callbacks: list[Callable[[KT, VT], None]]
_cancel_restore: dict[KT, bool]

def __init__(
self,
Expand All @@ -72,7 +75,14 @@ def __init__(
| list[Callable[[KT, VT], None]]
| None = None,
):
self.fast = LRU(n, fast, weight=weight, on_evict=[self.fast_to_slow])
super().__init__()
self.fast = LRU(
n,
fast,
weight=weight,
on_evict=[self.fast_to_slow],
on_cancel_evict=[self._cancel_evict],
)
self.slow = slow
self.weight = weight
if callable(fast_to_slow_callbacks):
Expand All @@ -81,6 +91,7 @@ def __init__(
slow_to_fast_callbacks = [slow_to_fast_callbacks]
self.fast_to_slow_callbacks = fast_to_slow_callbacks or []
self.slow_to_fast_callbacks = slow_to_fast_callbacks or []
self._cancel_restore = {}

@property
def n(self) -> float:
Expand All @@ -98,56 +109,96 @@ def fast_to_slow(self, key: KT, value: VT) -> None:
raise

def slow_to_fast(self, key: KT) -> VT:
value = self.slow[key]
self._cancel_restore[key] = False
try:
with self.unlock():
value = self.slow[key]
if self._cancel_restore[key]:
raise KeyError(key)
finally:
self._cancel_restore.pop(key)

# Avoid useless movement for heavy values
w = self.weight(key, value)
if w <= self.n:
# Multithreaded edge case:
# - Thread 1 starts slow_to_fast(x) and puts it at the top of fast
# - This causes the eviction of older key(s)
# - While thread 1 is evicting older keys, thread 2 is loading fast with
# set_noevict()
# - By the time the eviction of the older key(s) is done, there is
# enough weight in fast that thread 1 will spill x
# - If the code below were just `self.fast[key] = value; del
# self.slow[key]`, the key would now be in neither slow nor fast!
self.fast.set_noevict(key, value)
del self.slow[key]
self.fast[key] = value
for cb in self.slow_to_fast_callbacks:
cb(key, value)

with self.unlock():
self.fast.evict_until_below_target()
for cb in self.slow_to_fast_callbacks:
cb(key, value)

return value

@locked
def __getitem__(self, key: KT) -> VT:
try:
return self.fast[key]
except KeyError:
return self.slow_to_fast(key)

def __setitem__(self, key: KT, value: VT) -> None:
try:
del self.slow[key]
except KeyError:
pass
# This may trigger an eviction from fast to slow of older keys.
# If the weight is individually greater than n, then key/value will be stored
# into self.slow instead (see LRU.__setitem__).
with self.lock:
discard(self.slow, key)
if key in self._cancel_restore:
self._cancel_restore[key] = True
self.fast[key] = value

@locked
def set_noevict(self, key: KT, value: VT) -> None:
"""Variant of ``__setitem__`` that does not move keys from fast to slow if the
total weight exceeds n
"""
try:
del self.slow[key]
except KeyError:
pass
discard(self.slow, key)
if key in self._cancel_restore:
self._cancel_restore[key] = True
self.fast.set_noevict(key, value)

def evict_until_below_target(self, n: float | None = None) -> None:
"""Wrapper around :meth:`zict.LRU.evict_until_below_target`.
Presented here to allow easier overriding.
"""
self.fast.evict_until_below_target(n)

@locked
def __delitem__(self, key: KT) -> None:
if key in self._cancel_restore:
self._cancel_restore[key] = True
try:
del self.fast[key]
except KeyError:
del self.slow[key]

@locked
def _cancel_evict(self, key: KT, value: VT) -> None:
discard(self.slow, key)

def values(self) -> ValuesView[VT]:
return BufferValuesView(self)

def items(self) -> ItemsView[KT, VT]:
return BufferItemsView(self)

def __len__(self) -> int:
return len(self.fast) + len(self.slow)
with self.lock, self.fast.lock:
return (
len(self.fast)
+ len(self.slow)
- sum(
k in self.fast and k in self.slow
for k in chain(self._cancel_restore, self.fast._cancel_evict)
)
)

def __iter__(self) -> Iterator[KT]:
"""Make sure that the iteration is not disrupted if you evict/restore a key in
Expand Down
48 changes: 30 additions & 18 deletions zict/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from collections.abc import Iterator, MutableMapping
from typing import TYPE_CHECKING

from zict.common import KT, VT, ZictBase, close, flush
from zict.common import KT, VT, ZictBase, close, discard, flush


class Cache(ZictBase[KT, VT]):
Expand All @@ -24,11 +24,9 @@ class Cache(ZictBase[KT, VT]):
Notes
-----
All methods are thread-safe if all methods on both ``data`` and ``cache`` are
thread-safe; however, only one thread can call ``__setitem__`` and ``__delitem__``
at any given time.
``__contains__`` and ``__len__`` are thread-safe if the same methods on ``data`` are
thread-safe.
If you call methods of this class from multiple threads, access will be fast as long
as all methods of ``cache`` are fast. Methods of ``data`` are not protected by
locks.
Examples
--------
Expand All @@ -43,6 +41,7 @@ class Cache(ZictBase[KT, VT]):

data: MutableMapping[KT, VT]
cache: MutableMapping[KT, VT]
gen: int
update_on_set: bool

def __init__(
Expand All @@ -51,36 +50,49 @@ def __init__(
cache: MutableMapping[KT, VT],
update_on_set: bool = True,
):
super().__init__()
self.data = data
self.cache = cache
self.gen = 0
self.update_on_set = update_on_set

def __getitem__(self, key: KT) -> VT:
try:
return self.cache[key]
except KeyError:
pass

gen = self.gen
value = self.data[key]
self.cache[key] = value
# Could another thread have called __setitem__ or __delitem__ in the
# meantime? Note that ``gen`` is a single counter bumped by writes to any
# key, not just this one. If not, update the cache
with self.lock:
if gen == self.gen:
self.cache[key] = value
return value

def __setitem__(self, key: KT, value: VT) -> None:
# If the item was already in cache and data.__setitem__ fails, e.g. because it's
# a File and the disk is full, make sure that the cache is invalidated.
try:
del self.cache[key]
except KeyError:
pass
with self.lock:
# If the item was already in cache and data.__setitem__ fails, e.g. because
# it's a File and the disk is full, make sure that the cache is invalidated.
discard(self.cache, key)
self.gen += 1
gen = self.gen

self.data[key] = value

if self.update_on_set:
self.cache[key] = value
# Could another thread have called __setitem__ or __delitem__ in the
# meantime? Note that ``gen`` is a single counter bumped by writes to any
# key, not just this one. If not, update the cache
with self.lock:
if gen == self.gen:
self.cache[key] = value

def __delitem__(self, key: KT) -> None:
try:
del self.cache[key]
except KeyError:
pass
with self.lock:
discard(self.cache, key)
self.gen += 1

del self.data[key]

def __len__(self) -> int:
Expand Down
Loading

0 comments on commit 981f796

Please sign in to comment.