Buffer(keep_slow=True)
crusaderky committed Apr 20, 2023
1 parent fed29a4 commit 22a3405
Showing 6 changed files with 165 additions and 89 deletions.
3 changes: 3 additions & 0 deletions doc/source/changelog.rst
@@ -7,6 +7,9 @@ Changelog
- New method ``File.link()``, which acquires a file-based key from another source
(e.g. a different memory-mapped File object)
(:pr:`80`) `Guido Imperiale`_
- ``Buffer`` has gained the option to preserve keys in ``slow`` when they are
moved back to ``fast``
(:pr:`80`) `Guido Imperiale`_


3.0.0 - 2023-04-17
105 changes: 37 additions & 68 deletions zict/buffer.py
@@ -1,14 +1,10 @@
from __future__ import annotations

from collections.abc import Callable, Iterator, MutableMapping
from itertools import chain
from typing import ( # TODO import from collections.abc (needs Python >=3.9)
ItemsView,
ValuesView,
)

from zict.common import KT, VT, ZictBase, close, discard, flush, locked
from zict.lru import LRU
from zict.utils import InsertionSortedSet


class Buffer(ZictBase[KT, VT]):
@@ -34,6 +30,11 @@ class Buffer(ZictBase[KT, VT]):
storing to disk and raised a disk full error) the key will remain in the LRU.
slow_to_fast_callbacks: list of callables
These functions run every time data moves from the slow to the fast mapping.
keep_slow: bool, optional
If False (default), delete key/value pairs in slow when they are moved back to
fast.
If True, keep them in slow until deleted; this will avoid repeating the fast to
slow transition when they are evicted again, but at the cost of duplication.
Notes
-----
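
To put the new flag in context, here is a minimal usage sketch. It is not part of the commit: the plain dicts, the n=1 capacity, and the keys are illustrative, and it assumes a zict version that already includes this change.

# Illustrative only: plain dicts stand in for the fast (memory) and slow
# (disk) mappings; with the default weight of 1 per key, fast holds one key.
from zict import Buffer

fast, slow = {}, {}
buff = Buffer(fast, slow, n=1, keep_slow=True)

buff["x"] = 1
buff["y"] = 2           # fast is full, so "x" is evicted into slow
assert "x" in slow

buff["x"]               # "x" moves back to fast ("y" is evicted instead)...
assert "x" in fast
assert "x" in slow      # ...but with keep_slow=True its slow copy is retained

del buff["x"]           # deleting drops the key from both mappings
assert "x" not in slow
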
@@ -60,7 +61,9 @@ class Buffer(ZictBase[KT, VT]):
weight: Callable[[KT, VT], float]
fast_to_slow_callbacks: list[Callable[[KT, VT], None]]
slow_to_fast_callbacks: list[Callable[[KT, VT], None]]
keep_slow: bool
_cancel_restore: dict[KT, bool]
_keys: InsertionSortedSet[KT]

def __init__(
self,
@@ -74,6 +77,7 @@ def __init__(
slow_to_fast_callbacks: Callable[[KT, VT], None]
| list[Callable[[KT, VT], None]]
| None = None,
keep_slow: bool = False,
):
super().__init__()
self.fast = LRU(
@@ -91,7 +95,9 @@ def __init__(
slow_to_fast_callbacks = [slow_to_fast_callbacks]
self.fast_to_slow_callbacks = fast_to_slow_callbacks or []
self.slow_to_fast_callbacks = slow_to_fast_callbacks or []
self.keep_slow = keep_slow
self._cancel_restore = {}
self._keys = InsertionSortedSet((*self.fast, *self.slow))

@property
def n(self) -> float:
@@ -136,6 +142,9 @@ def offset(self, value: float) -> None:
self.fast.offset = value

def fast_to_slow(self, key: KT, value: VT) -> None:
if self.keep_slow and key in self.slow:
return

self.slow[key] = value
try:
for cb in self.fast_to_slow_callbacks:
@@ -169,7 +178,8 @@ def slow_to_fast(self, key: KT) -> VT:
# - If the below code was just `self.fast[key] = value; del
# self.slow[key]` now the key would be in neither slow nor fast!
self.fast.set_noevict(key, value)
del self.slow[key]
if not self.keep_slow:
del self.slow[key]

with self.unlock():
self.fast.evict_until_below_target()
@@ -180,17 +190,20 @@ def slow_to_fast(self, key: KT) -> VT:

@locked
def __getitem__(self, key: KT) -> VT:
if key not in self._keys:
raise KeyError(key)
try:
return self.fast[key]
except KeyError:
return self.slow_to_fast(key)

def __setitem__(self, key: KT, value: VT) -> None:
with self.lock:
discard(self.slow, key)
if key in self._cancel_restore:
self._cancel_restore[key] = True
self.fast[key] = value
self.set_noevict(key, value)
try:
self.fast.evict_until_below_target()
except Exception:
self.fast._setitem_exception(key)
raise

@locked
def set_noevict(self, key: KT, value: VT) -> None:
@@ -201,6 +214,7 @@ def set_noevict(self, key: KT, value: VT) -> None:
if key in self._cancel_restore:
self._cancel_restore[key] = True
self.fast.set_noevict(key, value)
self._keys.add(key)

def evict_until_below_target(self, n: float | None = None) -> None:
"""Wrapper around :meth:`zict.LRU.evict_until_below_target`.
@@ -210,55 +224,32 @@ def evict_until_below_target(self, n: float | None = None) -> None:

@locked
def __delitem__(self, key: KT) -> None:
self._keys.remove(key)
if key in self._cancel_restore:
self._cancel_restore[key] = True
try:
del self.fast[key]
except KeyError:
del self.slow[key]
discard(self.fast, key)
discard(self.slow, key)

@locked
def _cancel_evict(self, key: KT, value: VT) -> None:
discard(self.slow, key)

def values(self) -> ValuesView[VT]:
return BufferValuesView(self)

def items(self) -> ItemsView[KT, VT]:
return BufferItemsView(self)

def __len__(self) -> int:
with self.lock, self.fast.lock:
return (
len(self.fast)
+ len(self.slow)
- sum(
k in self.fast and k in self.slow
for k in chain(self._cancel_restore, self.fast._cancel_evict)
)
)
return len(self._keys)

def __iter__(self) -> Iterator[KT]:
"""Make sure that the iteration is not disrupted if you evict/restore a key in
the middle of it
"""
seen = set()
while True:
try:
for d in (self.fast, self.slow):
for key in d:
if key not in seen:
seen.add(key)
yield key
return
except RuntimeError:
pass
return iter(self._keys)

def __contains__(self, key: object) -> bool:
return key in self.fast or key in self.slow
return key in self._keys

@locked
def __str__(self) -> str:
return f"Buffer<{self.fast}, {self.slow}>"
s = f"Buffer<fast: {len(self.fast)}, slow: {len(self.slow)}"
if self.keep_slow:
ndup = len(self.fast) + len(self.slow) - len(self._keys)
s += f", unique: {len(self._keys)}, duplicates: {ndup}"
return s + ">"

__repr__ = __str__

@@ -267,25 +258,3 @@ def flush(self) -> None:

def close(self) -> None:
close(self.fast, self.slow)


class BufferItemsView(ItemsView[KT, VT]):
_mapping: Buffer # FIXME CPython implementation detail
__slots__ = ()

def __iter__(self) -> Iterator[tuple[KT, VT]]:
# Avoid changing the LRU
return chain(self._mapping.fast.items(), self._mapping.slow.items())


class BufferValuesView(ValuesView[VT]):
_mapping: Buffer # FIXME CPython implementation detail
__slots__ = ()

def __contains__(self, value: object) -> bool:
# Avoid changing the LRU
return any(value == v for v in self)

def __iter__(self) -> Iterator[VT]:
# Avoid changing the LRU
return chain(self._mapping.fast.values(), self._mapping.slow.values())
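
The view classes removed above and the chain-based ``__len__``/``__iter__``/``__contains__`` are replaced by a single insertion-ordered key set. A rough sketch of that bookkeeping idea, using a plain dict as a stand-in for ``InsertionSortedSet`` (simplified, not the library's actual code):

# With keep_slow=True the same key can live in both fast and slow, so the
# key set, not len(fast) + len(slow), is the source of truth.
fast = {"x": 1}                      # "x" was promoted back from slow
slow = {"x": 1, "y": 2}              # its slow copy was retained
keys = dict.fromkeys(["x", "y"])     # stands in for InsertionSortedSet

assert len(keys) == 2                           # what the new __len__ reports
duplicates = len(fast) + len(slow) - len(keys)
assert duplicates == 1                          # what the new __str__ reports
assert list(keys) == ["x", "y"]                 # stable iteration order
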
20 changes: 12 additions & 8 deletions zict/lru.py
@@ -137,16 +137,20 @@ def __setitem__(self, key: KT, value: VT) -> None:
try:
self.evict_until_below_target()
except Exception:
if self.weights.get(key, 0) > self.n and key not in self.heavy:
# weight(value) > n and evicting the key we just inserted failed.
# Evict the rest of the LRU instead.
try:
while len(self.d) > 1:
self.evict()
except Exception:
pass
self._setitem_exception(key)
raise

@locked
def _setitem_exception(self, key: KT) -> None:
if self.weights.get(key, 0) > self.n and key not in self.heavy:
# weight(value) > n and evicting the key we just inserted failed.
# Evict the rest of the LRU instead.
try:
while len(self.d) > 1:
self.evict()
except Exception:
pass

@locked
def set_noevict(self, key: KT, value: VT) -> None:
"""Variant of ``__setitem__`` that does not evict if the total weight exceeds n.
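
The recovery logic that used to be inlined in ``LRU.__setitem__`` is now the ``_setitem_exception`` helper, so code that triggers eviction itself (such as ``Buffer.__setitem__`` above) can reuse it. A sketch of the shared calling pattern, paraphrased from the two call sites rather than new API:

# Insert without evicting, then try to shrink; if eviction fails and the new
# key itself is too heavy, the helper evicts everything else before re-raising.
def insert_and_shrink(lru, key, value):
    lru.set_noevict(key, value)
    try:
        lru.evict_until_below_target()
    except Exception:
        lru._setitem_exception(key)
        raise
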
6 changes: 2 additions & 4 deletions zict/tests/test_async_buffer.py
@@ -132,8 +132,7 @@ async def test_close_during_evict(check_thread_leaks):

@pytest.mark.asyncio
async def test_close_during_get(check_thread_leaks):
buff = AsyncBuffer({}, utils_test.SlowDict(0.01), n=100)
buff.slow.data.update({i: i for i in range(100)})
buff = AsyncBuffer({}, utils_test.SlowDict(0.01, {i: i for i in range(100)}), n=100)
assert len(buff) == 100
assert not buff.fast

@@ -199,8 +198,7 @@ def __getitem__(self, key):
time.sleep(0.01)
return super().__getitem__(key)

with AsyncBuffer({}, Slow(), n=100) as buff:
buff.slow.update({i: i for i in range(100)})
with AsyncBuffer({}, Slow({i: i for i in range(100)}), n=100) as buff:
assert len(buff) == 100

future = buff.async_get(list(range(100)), missing=missing)
(Diffs for the remaining two changed files are not shown.)
