From 7f8293b10404df9416b8b60b6a6f113bd6d95ce1 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Thu, 20 Apr 2023 22:59:34 +0100 Subject: [PATCH] Buffer(keep_slow=True) --- doc/source/changelog.rst | 3 + zict/buffer.py | 107 +++++++++++------------------ zict/lru.py | 20 +++--- zict/tests/test_async_buffer.py | 6 +- zict/tests/test_buffer.py | 115 ++++++++++++++++++++++++++++++-- zict/tests/utils_test.py | 4 +- 6 files changed, 165 insertions(+), 90 deletions(-) diff --git a/doc/source/changelog.rst b/doc/source/changelog.rst index a9499c2..3b2a915 100644 --- a/doc/source/changelog.rst +++ b/doc/source/changelog.rst @@ -7,6 +7,9 @@ Changelog - New method ``File.link()``, which acquires a file-based key from another source (e.g. a different memory-mapped File object) (:pr:`80`) `Guido Imperiale`_ +- ``Buffer`` has gained the option to preserve keys in ``slow`` when they are + moved back to ``fast`` + (:pr:`80`) `Guido Imperiale`_ 3.0.0 - 2023-04-17 diff --git a/zict/buffer.py b/zict/buffer.py index 720dd3f..d3dd789 100644 --- a/zict/buffer.py +++ b/zict/buffer.py @@ -1,14 +1,10 @@ from __future__ import annotations from collections.abc import Callable, Iterator, MutableMapping -from itertools import chain -from typing import ( # TODO import from collections.abc (needs Python >=3.9) - ItemsView, - ValuesView, -) from zict.common import KT, VT, ZictBase, close, discard, flush, locked from zict.lru import LRU +from zict.utils import InsertionSortedSet class Buffer(ZictBase[KT, VT]): @@ -34,6 +30,11 @@ class Buffer(ZictBase[KT, VT]): storing to disk and raised a disk full error) the key will remain in the LRU. slow_to_fast_callbacks: list of callables These functions run every time data moves form the slow to the fast mapping. + keep_slow: bool, optional + If False (default), delete key/value pairs in slow when they are moved back to + fast. + If True, keep them in slow until deleted; this will avoid repeating the fast to + slow transition when they are evicted again, but at the cost of duplication. Notes ----- @@ -60,7 +61,9 @@ class Buffer(ZictBase[KT, VT]): weight: Callable[[KT, VT], float] fast_to_slow_callbacks: list[Callable[[KT, VT], None]] slow_to_fast_callbacks: list[Callable[[KT, VT], None]] + keep_slow: bool _cancel_restore: dict[KT, bool] + _keys: InsertionSortedSet[KT] def __init__( self, @@ -74,6 +77,7 @@ def __init__( slow_to_fast_callbacks: Callable[[KT, VT], None] | list[Callable[[KT, VT], None]] | None = None, + keep_slow: bool = False, ): super().__init__() self.fast = LRU( @@ -91,7 +95,9 @@ def __init__( slow_to_fast_callbacks = [slow_to_fast_callbacks] self.fast_to_slow_callbacks = fast_to_slow_callbacks or [] self.slow_to_fast_callbacks = slow_to_fast_callbacks or [] + self.keep_slow = keep_slow self._cancel_restore = {} + self._keys = InsertionSortedSet((*self.fast, *self.slow)) @property def n(self) -> float: @@ -136,6 +142,9 @@ def offset(self, value: float) -> None: self.fast.offset = value def fast_to_slow(self, key: KT, value: VT) -> None: + if self.keep_slow and key in self.slow: + return + self.slow[key] = value try: for cb in self.fast_to_slow_callbacks: @@ -169,7 +178,8 @@ def slow_to_fast(self, key: KT) -> VT: # - If the below code was just `self.fast[key] = value; del # self.slow[key]` now the key would be in neither slow nor fast! self.fast.set_noevict(key, value) - del self.slow[key] + if not self.keep_slow: + del self.slow[key] with self.unlock(): self.fast.evict_until_below_target() @@ -180,27 +190,31 @@ def slow_to_fast(self, key: KT) -> VT: @locked def __getitem__(self, key: KT) -> VT: + if key not in self._keys: + raise KeyError(key) try: return self.fast[key] except KeyError: return self.slow_to_fast(key) def __setitem__(self, key: KT, value: VT) -> None: - with self.lock: - discard(self.slow, key) - if key in self._cancel_restore: - self._cancel_restore[key] = True - self.fast[key] = value + self.set_noevict(key, value) + try: + self.fast.evict_until_below_target() + except Exception: + self.fast._setitem_exception(key) + raise @locked def set_noevict(self, key: KT, value: VT) -> None: """Variant of ``__setitem__`` that does not move keys from fast to slow if the total weight exceeds n """ - discard(self.slow, key) + discard(self, key) if key in self._cancel_restore: self._cancel_restore[key] = True self.fast.set_noevict(key, value) + self._keys.add(key) def evict_until_below_target(self, n: float | None = None) -> None: """Wrapper around :meth:`zict.LRU.evict_until_below_target`. @@ -210,55 +224,32 @@ def evict_until_below_target(self, n: float | None = None) -> None: @locked def __delitem__(self, key: KT) -> None: + self._keys.remove(key) if key in self._cancel_restore: self._cancel_restore[key] = True - try: - del self.fast[key] - except KeyError: - del self.slow[key] + discard(self.fast, key) + discard(self.slow, key) @locked def _cancel_evict(self, key: KT, value: VT) -> None: discard(self.slow, key) - def values(self) -> ValuesView[VT]: - return BufferValuesView(self) - - def items(self) -> ItemsView[KT, VT]: - return BufferItemsView(self) - def __len__(self) -> int: - with self.lock, self.fast.lock: - return ( - len(self.fast) - + len(self.slow) - - sum( - k in self.fast and k in self.slow - for k in chain(self._cancel_restore, self.fast._cancel_evict) - ) - ) + return len(self._keys) def __iter__(self) -> Iterator[KT]: - """Make sure that the iteration is not disrupted if you evict/restore a key in - the middle of it - """ - seen = set() - while True: - try: - for d in (self.fast, self.slow): - for key in d: - if key not in seen: - seen.add(key) - yield key - return - except RuntimeError: - pass + return iter(self._keys) def __contains__(self, key: object) -> bool: - return key in self.fast or key in self.slow + return key in self._keys + @locked def __str__(self) -> str: - return f"Buffer<{self.fast}, {self.slow}>" + s = f"Buffer" __repr__ = __str__ @@ -267,25 +258,3 @@ def flush(self) -> None: def close(self) -> None: close(self.fast, self.slow) - - -class BufferItemsView(ItemsView[KT, VT]): - _mapping: Buffer # FIXME CPython implementation detail - __slots__ = () - - def __iter__(self) -> Iterator[tuple[KT, VT]]: - # Avoid changing the LRU - return chain(self._mapping.fast.items(), self._mapping.slow.items()) - - -class BufferValuesView(ValuesView[VT]): - _mapping: Buffer # FIXME CPython implementation detail - __slots__ = () - - def __contains__(self, value: object) -> bool: - # Avoid changing the LRU - return any(value == v for v in self) - - def __iter__(self) -> Iterator[VT]: - # Avoid changing the LRU - return chain(self._mapping.fast.values(), self._mapping.slow.values()) diff --git a/zict/lru.py b/zict/lru.py index c3f8b42..40b27da 100644 --- a/zict/lru.py +++ b/zict/lru.py @@ -137,16 +137,20 @@ def __setitem__(self, key: KT, value: VT) -> None: try: self.evict_until_below_target() except Exception: - if self.weights.get(key, 0) > self.n and key not in self.heavy: - # weight(value) > n and evicting the key we just inserted failed. - # Evict the rest of the LRU instead. - try: - while len(self.d) > 1: - self.evict() - except Exception: - pass + self._setitem_exception(key) raise + @locked + def _setitem_exception(self, key: KT) -> None: + if self.weights.get(key, 0) > self.n and key not in self.heavy: + # weight(value) > n and evicting the key we just inserted failed. + # Evict the rest of the LRU instead. + try: + while len(self.d) > 1: + self.evict() + except Exception: + pass + @locked def set_noevict(self, key: KT, value: VT) -> None: """Variant of ``__setitem__`` that does not evict if the total weight exceeds n. diff --git a/zict/tests/test_async_buffer.py b/zict/tests/test_async_buffer.py index 5165faf..2e73e0c 100644 --- a/zict/tests/test_async_buffer.py +++ b/zict/tests/test_async_buffer.py @@ -132,8 +132,7 @@ async def test_close_during_evict(check_thread_leaks): @pytest.mark.asyncio async def test_close_during_get(check_thread_leaks): - buff = AsyncBuffer({}, utils_test.SlowDict(0.01), n=100) - buff.slow.data.update({i: i for i in range(100)}) + buff = AsyncBuffer({}, utils_test.SlowDict(0.01, {i: i for i in range(100)}), n=100) assert len(buff) == 100 assert not buff.fast @@ -199,8 +198,7 @@ def __getitem__(self, key): time.sleep(0.01) return super().__getitem__(key) - with AsyncBuffer({}, Slow(), n=100) as buff: - buff.slow.update({i: i for i in range(100)}) + with AsyncBuffer({}, Slow({i: i for i in range(100)}), n=100) as buff: assert len(buff) == 100 future = buff.async_get(list(range(100)), missing=missing) diff --git a/zict/tests/test_buffer.py b/zict/tests/test_buffer.py index b5d23bd..191b26a 100644 --- a/zict/tests/test_buffer.py +++ b/zict/tests/test_buffer.py @@ -68,6 +68,98 @@ def test_simple(): assert buff["b"] == 1000 +def test_keep_slow(): + a = {} + b = {} + f2s = [] + s2f = [] + buff = Buffer( + a, + b, + n=10, + weight=lambda k, v: v, + keep_slow=True, + fast_to_slow_callbacks=lambda k, v: f2s.append(k), + slow_to_fast_callbacks=lambda k, v: s2f.append(k), + ) + + buff["x"] = 1 + buff["y"] = 2 + buff["z"] = 11 + buff.fast.evict() + assert a == {"y": 2} + assert b == {"x": 1, "z": 11} + assert f2s == ["z", "x"] + assert s2f == [] + assert buff.fast.total_weight == 2 + f2s.clear() + + assert buff["x"] == 1 # Get from slow + assert buff["x"] == 1 # It's in both + assert buff["z"] == 11 # Too large to stay in fast + assert a == {"x": 1, "y": 2} + assert b == {"x": 1, "z": 11} + assert f2s == [] + assert s2f == ["x", "z"] # x has been moved only once + assert buff.fast.total_weight == 3 + # Test no duplicates + assert len(buff) == 3 + assert list(buff) == list(buff.keys()) == ["x", "y", "z"] + assert list(buff.items()) == [("x", 1), ("y", 2), ("z", 11)] + assert list(buff.values()) == [1, 2, 11] + f2s.clear() + s2f.clear() + + assert ( + str(buff) + == repr(buff) + == ("Buffer") + ) + + # Evict a key that is already in slow + _ = buff["y"] + buff.fast.evict() + assert a == {"y": 2} + assert b == {"x": 1, "z": 11} + assert f2s == [] # fast_to_slow_callback was not called + assert s2f == [] + assert buff.fast.total_weight == 2 + assert len(buff) == 3 + _ = buff["x"] + s2f.clear() + + # Overwrite + buff["x"] = 3 + buff["y"] = 4 + buff["z"] = 12 + assert a == {"x": 3, "y": 4} + assert b == {"z": 12} + assert f2s == ["z"] # One more spill for z + assert s2f == [] + assert buff.fast.total_weight == 7 + assert len(buff) == 3 + f2s.clear() + + # Delete + del buff["x"] + del buff["y"] + del buff["z"] + assert a == b == {} + assert f2s == s2f == [] + assert buff.fast.total_weight == 0 + assert len(buff) == 0 + + +@pytest.mark.parametrize("keep_slow", [False, True]) +def test_init_nonempty(keep_slow): + a = {1: 10, 2: 20} + b = {2: 20, 3: 30} + buff = Buffer(a, b, n=10) + assert len(buff) == 3 + assert list(buff) == [1, 2, 3] + assert buff == {1: 10, 2: 20, 3: 30} + + def test_setitem_avoid_fast_slow_duplicate(): a = {} b = {} @@ -89,13 +181,14 @@ def test_setitem_avoid_fast_slow_duplicate(): assert "a" not in b -def test_mapping(): +@pytest.mark.parametrize("keep_slow", [False, True]) +def test_mapping(keep_slow): """ Test mapping interface for Buffer(). """ a = {} b = {} - buff = Buffer(a, b, n=2) + buff = Buffer(a, b, n=2, keep_slow=keep_slow) utils_test.check_mapping(buff) utils_test.check_closing(buff) @@ -107,6 +200,7 @@ def test_mapping(): assert not buff.fast.weights assert not buff.fast.total_weight assert not buff.fast._cancel_evict + assert len(buff) == 0 def test_callbacks(): @@ -222,7 +316,8 @@ def test_n_offset(): assert buff.fast.offset == 2 -def test_set_noevict(): +@pytest.mark.parametrize("keep_slow", [False, True]) +def test_set_noevict(keep_slow): a = {} b = {} f2s = [] @@ -234,6 +329,7 @@ def test_set_noevict(): weight=lambda k, v: v, fast_to_slow_callbacks=lambda k, v: f2s.append(k), slow_to_fast_callbacks=lambda k, v: s2f.append(k), + keep_slow=keep_slow, ) buff.set_noevict("x", 3) buff.set_noevict("y", 3) # Would cause x to move to slow @@ -264,9 +360,10 @@ def test_set_noevict(): assert s2f == [] -def test_evict_restore_during_iter(): +@pytest.mark.parametrize("keep_slow", [False, True]) +def test_evict_restore_during_iter(keep_slow): """Test that __iter__ won't be disrupted if another thread evicts or restores a key""" - buff = Buffer({"x": 1, "y": 2}, {"z": 3}, n=5) + buff = Buffer({"x": 1, "y": 2}, {"z": 3}, n=5, keep_slow=keep_slow) assert list(buff) == ["x", "y", "z"] it = iter(buff) assert next(it) == "x" @@ -393,7 +490,8 @@ def __getitem__(self, k): @pytest.mark.stress @pytest.mark.repeat(utils_test.REPEAT_STRESS_TESTS) -def test_stress_different_keys_threadsafe(): +@pytest.mark.parametrize("keep_slow", [False, True]) +def test_stress_different_keys_threadsafe(keep_slow): # Sometimes x and y can cohexist without triggering eviction # Sometimes x and y are individually