From 6a6fb334ed3aa55443a3687ce53f70dfec178481 Mon Sep 17 00:00:00 2001
From: Thomas Waldmann
Date: Sat, 28 Sep 2024 11:25:29 +0200
Subject: [PATCH] remove MStore from master branch for now

We first have to develop the Store better before building stuff on top of it.
MStore was rather in the way of doing that.

Before ripping MStore out, I made a branch "mstore" with the current state
(as in master branch before this commit), so it can be developed there.
---
 README.rst              |  12 +-
 src/borgstore/mstore.py | 204 ----------------------------------
 tests/test_mstore.py    | 236 ----------------------------------------
 3 files changed, 1 insertion(+), 451 deletions(-)
 delete mode 100644 src/borgstore/mstore.py
 delete mode 100644 tests/test_mstore.py

diff --git a/README.rst b/README.rst
index 6c0d2a3..4f652b6 100644
--- a/README.rst
+++ b/README.rst
@@ -1,8 +1,7 @@
 BorgStore
 =========
 
-A key/value store implementation in Python, supporting multiple backends,
-data redundancy and distribution.
+A key/value store implementation in Python, supporting multiple backends.
 
 Keys
 ----
@@ -103,15 +102,6 @@ Currently, these storage backends are implemented:
 - Rclone - access any of the 100s of cloud providers [rclone](https://rclone.org/) supports
 - (more might come in future)
 
-MStore
-------
-
-API of MStore is very similar to Store, but instead of directly using one backend
-only (like Store does), it uses multiple Stores internally to implement:
-
-- redundancy (keep same data at multiple places)
-- distribution (keep different data at multiple places)
-
 Scalability
 -----------
 
diff --git a/src/borgstore/mstore.py b/src/borgstore/mstore.py
deleted file mode 100644
index ebded16..0000000
--- a/src/borgstore/mstore.py
+++ /dev/null
@@ -1,204 +0,0 @@
-"""
-Multi-Store Key/Value Implementation.
-
-Distributed: MStore can store into multiple stores (e.g. different directories on different disks, on diff. servers)
-             with different sizes.
-Redundant: The same mechanism also implements simple redundancy (like storing same item N times).
-
-Similar to a hashtable, we use 256 buckets within the MStore and create a map mapping the bucket number to the Store(s)
-it resides on. When storing an item, the key part of the name (namespace/key) is assumed to be a hex hash value and
-the first 2 hex digits determine which bucket the data goes into (and thus: which Store(s) it is stored into).
-
-Examples:
-MStore gets a list of stores and a list of related bucket counts. Bucket numbers are calculated modulo 256, so if
-the total bucket count is more than 256 (like 512, 768, ...), stuff will get stored multiple times (usually into
-different stores).
-
-MStore([store0], [256]) - simplest configuration: store everything into store0
-MStore([st0, st1], [192, 64]) - JBOD-like: store 3/4 into st0 (bucket 0..191), 1/4 into st1 (bucket 192..255)
-MStore([st0, st1], [256, 256]) - Mirror: store each item into st0 **and** into st1 (both have buckets 0..255)
-MStore([st0, st1, st2], [256, 256, 256]) - store each item into st0, st1 **and** st2
-"""
-
-from collections import defaultdict
-from typing import Iterator, Optional
-
-from .utils.nesting import split_key
-from .store import Store, ItemInfo, ObjectNotFound
-
-
-def create_bucket_map(buckets: list[int]) -> dict[int, list[int]]:
-    """
-    use a list of bucket counts (of the stores) and create a lookup dictionary:
-    bucket (0..255) -> list of store indexes that store this bucket
-    """
-    total = sum(buckets)
-    if total < 256:
-        raise ValueError("each of the 256 possible values must have at least one corresponding bucket")
-    if total % 256 != 0:
-        raise ValueError("all 256 values should be covered equally with buckets")
-    map = defaultdict(list)
-    base = 0
-    for store_index, bucket_count in enumerate(buckets):
-        for offset in range(bucket_count):
-            bucket = (base + offset) % 256
-            map[bucket].append(store_index)
-        base += bucket_count
-    return map
-
-
-def lookup_stores(map: dict, bucket: int) -> list[int]:
-    """lookup the store index(es) for a specific bucket"""
-    if not isinstance(bucket, int):
-        raise TypeError("bucket must be an integer")
-    if bucket < 0 or bucket > 255:
-        raise ValueError("bucket must be between 0 and 255")
-    return map[bucket]
-
-
-class MStore:
-    def __init__(self, stores: list[Store], buckets: list[int], kinds: Optional[dict] = None):
-        if not len(stores):
-            raise ValueError("stores list must not be empty")
-        if len(stores) != len(buckets):
-            raise ValueError("stores list and buckets count list must have same length")
-        self.stores = stores
-        self.buckets = buckets
-        self.all_stores = list(range(len(self.stores)))
-        self.map = create_bucket_map(buckets)
-        # kinds = prefix -> kind, kind can be "hex-hash", "generic".
-        kinds = kinds if kinds else {}
-        # we accept kinds as a dict, but we rather want a list of (prefix, kind) tuples, longest prefix first:
-        self.kinds = [entry for entry in sorted(kinds.items(), key=lambda item: len(item[0]), reverse=True)]
-
-    def __repr__(self):
-        return f""
-
-    def create(self) -> None:
-        for store in self.stores:
-            store.create()
-
-    def destroy(self) -> None:
-        for store in self.stores:
-            store.destroy()
-
-    def __enter__(self):
-        self.open()
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        self.close()
-        return False
-
-    def open(self) -> None:
-        for store in self.stores:
-            store.open()
-
-    def close(self) -> None:
-        for store in self.stores:
-            store.close()
-
-    def _get_kind(self, name):
-        """get kind of store from configuration depending on namespace"""
-        for prefix, kind in self.kinds:
-            if name.startswith(prefix):
-                return kind
-        return "generic"  # "generic" is the default, if no prefix matched
-
-    def _find_stores(self, name: str, mode: str = "r") -> list[int]:
-        kind = self._get_kind(name)
-        if kind == "hex-hash":
-            key = split_key(name)[1]  # we do not care for the namespace part here
-            key_binary = bytes.fromhex(key)  # and assume key is a good hash, represented as a hex str
-            bucket = key_binary[0]  # use first 8bits of key to determine bucket (int)
-            w_stores = self.map[bucket]  # list of store indexes (for writing)
-            if mode not in ["r", "w", "d", "m"]:
-                raise ValueError("mode must be either 'r', 'w', 'd' or 'm'.")
-            if mode == "w":
-                # for writing just return the stores currently configured
-                return w_stores
-            else:  # mode == "r" or "d" or "m"
-                # for reading, return the stores currently configured *first*,
-                # but also add all other stores after these, so items can be found
-                # there while we redistribute them.
-                # for deleting, guess we also want to try deleting an item from all stores.
-                # for moving, guess we want to try to move an item in all stores.
-                fallback_r_stores = [idx for idx in self.all_stores if idx not in w_stores]
-                return w_stores + fallback_r_stores
-        elif kind == "generic":
-            # for generic storage, we store to ALL stores.
-            # usually this is important and small stuff, like configs, keys, ...
-            return self.all_stores
-        else:
-            raise NotImplementedError(f"kind '{kind}' is not implemented.")
-
-    def info(self, name: str, *, deleted=False) -> ItemInfo:
-        for store_idx in self._find_stores(name, mode="r"):
-            store = self.stores[store_idx]
-            try:
-                return store.info(name, deleted=deleted)
-            except ObjectNotFound:
-                pass  # TODO: we expected the key to be there, but it was not. fix that by storing it there.
-        else:
-            raise ObjectNotFound(name)  # didn't find it in any store
-
-    def load(self, name: str, *, size=None, offset=0, deleted=False) -> bytes:
-        for store_idx in self._find_stores(name, mode="r"):
-            store = self.stores[store_idx]
-            try:
-                return store.load(name, size=size, offset=offset, deleted=deleted)
-            except ObjectNotFound:
-                pass  # TODO: we expected the key to be there, but it was not. fix that by storing it there.
-        else:
-            raise ObjectNotFound(name)  # didn't find it in any store
-
-    def store(self, name: str, value: bytes) -> None:
-        for store_idx in self._find_stores(name, mode="w"):
-            store = self.stores[store_idx]
-            store.store(name, value)
-
-    def delete(self, name: str, *, deleted=False) -> None:
-        for store_idx in self._find_stores(name, mode="d"):
-            store = self.stores[store_idx]
-            try:
-                store.delete(name, deleted=deleted)
-            except ObjectNotFound:
-                pass  # ignore it if it is already gone
-
-    def move(
-        self,
-        name: str,
-        new_name: Optional[str] = None,
-        *,
-        delete: bool = False,
-        undelete: bool = False,
-        change_level: bool = False,
-        deleted: bool = False,
-    ) -> None:
-        for store_idx in self._find_stores(name, mode="m"):
-            store = self.stores[store_idx]
-            try:
-                if delete:
-                    # use case: keep name, but soft "delete" the item
-                    store.move(name, delete=True)
-                elif undelete:
-                    # use case: keep name, undelete a previously soft "deleted" item
-                    store.move(name, undelete=True)
-                elif change_level:
-                    # use case: keep name, changing to another nesting level
-                    store.move(name, change_level=True, deleted=deleted)
-                else:
-                    # generic use (be careful!)
-                    if not new_name:
-                        raise ValueError("generic move needs new_name to be given.")
-                    store.move(name, new_name, deleted=deleted)
-            except ObjectNotFound:
-                pass  # ignore it, if it is not present in this store
-
-    def list(self, name: str, deleted: bool = False) -> Iterator[ItemInfo]:
-        # when using multiple stores, the results yielded might be only partially sorted.
-        seen = set()
-        for store in self.stores:
-            for item_info in store.list(name, deleted=deleted):
-                if item_info.name not in seen:
-                    yield item_info
-                    seen.add(item_info.name)
diff --git a/tests/test_mstore.py b/tests/test_mstore.py
deleted file mode 100644
index 20d3257..0000000
--- a/tests/test_mstore.py
+++ /dev/null
@@ -1,236 +0,0 @@
-"""
-Testing for high-level MStore API.
-"""
-
-import pytest
-
-from . import key, lkey, list_store_names, list_store_names_sorted
-
-from borgstore.backends.errors import ObjectNotFound
-from borgstore.store import Store
-from borgstore.mstore import create_bucket_map, lookup_stores, MStore
-
-
-@pytest.mark.parametrize("buckets", [[], [0], [42], [300], [256, 23], [23, 256]])
-def test_bucket_map_invalid(buckets):
-    with pytest.raises(ValueError):
-        create_bucket_map(buckets)  # does not cover 256 buckets exactly N times
-
-
-@pytest.mark.parametrize(
-    "buckets, n_stores",
-    [
-        ([256], 1),  # single store having all buckets ("single disk")
-        ([128, 128], 1),  # 2 stores each having half of the buckets ("raid0")
-        ([256, 256], 2),  # 2 stores each having all the buckets ("raid1")
-        ([128, 128, 128, 128], 2),  # 4 stores each having half of the buckets ("raid10")
-        ([256, 128, 128], 2),  # one big store mirroring 2 smaller ones
-        ([200, 56], 1),  # store 0 is bigger than store 1 ("jbod")
-        ([256, 256, 256], 3),  # 3 stores each having all buckets ("3-disk mirror")
-    ],
-)
-def test_bucket_map_valid(buckets, n_stores):
-    # n_stores means an item is stored in n stores (1 = standard, 2+ = with redundancy)
-    map = create_bucket_map(buckets)
-    for bucket in range(256):
-        assert bucket in map  # we want to map ALL the 256 buckets
-        stores = map[bucket]
-        assert len(stores) == n_stores  # each bucket shall exist in N stores
-        assert len(set(stores)) == n_stores  # each bucket shall exist in N *different* stores
-
-
-@pytest.mark.parametrize(
-    "buckets,key,store",
-    [
-        ([256], 0, [0]),
-        ([256], 255, [0]),
-        ([128, 128], 0, [0]),
-        ([128, 128], 127, [0]),
-        ([128, 128], 128, [1]),
-        ([128, 128], 255, [1]),
-        ([256, 256], 0, [0, 1]),
-        ([256, 256], 127, [0, 1]),
-        ([256, 256], 128, [0, 1]),
-        ([256, 256], 255, [0, 1]),
-    ],
-)
-def test_lookup_bucket(buckets, key, store):
-    map = create_bucket_map(buckets)
-    assert lookup_stores(map, key) == store
-
-
-@pytest.fixture()
-def mstore_jbod_created(tmp_path):
-    stores = [Store(url=f"file://{tmp_path}0"), Store(url=f"file://{tmp_path}1")]
-    mstore = MStore(stores=stores, buckets=[192, 64], kinds={"": "hex-hash"})
-    mstore.create()
-    try:
-        yield mstore
-    finally:
-        mstore.destroy()
-
-
-@pytest.fixture()
-def mstore_mirror_created(tmp_path):
-    stores = [Store(url=f"file://{tmp_path}0"), Store(url=f"file://{tmp_path}1")]
-    mstore = MStore(stores=stores, buckets=[256, 256], kinds={"": "hex-hash"})
-    mstore.create()
-    try:
-        yield mstore
-    finally:
-        mstore.destroy()
-
-
-def fill_storage(store: MStore, count: int, *, start: int = 0) -> None:
-    for i in range(start, start + count, 1):
-        k, v = lkey(i), str(i).encode()
-        store.store(k, v)
-
-
-def read_storage(store: MStore, count: int, *, start: int = 0) -> None:
-    # can we still read all data?
-    for i in range(start, start + count, 1):
-        k, v = lkey(i), str(i).encode()
-        assert store.load(k) == v
-
-
-def test_list(mstore_mirror_created):
-    with mstore_mirror_created as mstore:
-        fill_storage(mstore, 1024)
-        # there must be no duplication of keys from the mirror mstore
-        assert list_store_names(mstore, "") == sorted([lkey(i) for i in range(1024)])
-
-
-def test_list(mstore_jbod_created):
-    with mstore_jbod_created as mstore:
-        fill_storage(mstore, 1024)
-        # check if we get all expected keys from the jbod mstore
-        assert list_store_names(mstore, "") == sorted([lkey(i) for i in range(1024)])
-
-
-def test_load_store_list_distribution(mstore_jbod_created):
-    with mstore_jbod_created as mstore:
-        fill_storage(mstore, 1024)
-        # check if all data is readable and as expected:
-        for i in range(1024):
-            k, v = lkey(i), str(i).encode()
-            assert mstore.load(k) == v
-        # check if data ended up in the stores according to the ratio configured in mstore_jbod (192 : 64)
-        keys_mstore = list_store_names(mstore, "")
-        keys_store0 = list_store_names(mstore.stores[0], "")
-        keys_store1 = list_store_names(mstore.stores[1], "")
-        assert len(keys_mstore) == len(set(keys_mstore)) == 1024
-        assert len(keys_store0) == len(set(keys_store0)) == 768
-        assert len(keys_store1) == len(set(keys_store1)) == 256
-
-
-def test_load_store_list_redundancy(mstore_mirror_created):
-    with mstore_mirror_created as mstore:
-        fill_storage(mstore, 1024)
-        # delete stuff from store 0:
-        for i in 0, 23, 42, 1001:
-            mstore.stores[0].delete(lkey(i))
-        # check if it is really gone:
-        for i in 0, 23, 42, 1001:
-            with pytest.raises(ObjectNotFound):
-                mstore.stores[0].load(lkey(i))
-        # delete other stuff from store 1:
-        for i in 123, 456, 789:
-            mstore.stores[1].delete(lkey(i))
-        # check if it is really gone:
-        for i in 123, 456, 789:
-            with pytest.raises(ObjectNotFound):
-                mstore.stores[1].load(lkey(i))
-        # check if we can still read everything from the mirror:
-        for i in range(1024):
-            k, v = lkey(i), str(i).encode()
-            assert mstore.load(k) == v
-        # also check if list still works ok:
-        assert list_store_names_sorted(mstore, "") == sorted([lkey(i) for i in range(1024)])
-        # now delete some values also from the other side of the mirror:
-        for i in 0, 23, 42, 1001:
-            mstore.stores[1].delete(lkey(i))
-        for i in 123, 456, 789:
-            mstore.stores[0].delete(lkey(i))
-        # now the mirror is expected to be partially corrupted at these places:
-        for i in 0, 23, 42, 1001, 123, 456, 789:
-            with pytest.raises(ObjectNotFound):
-                mstore.load(lkey(i))
-        # list is expected to miss some elements:
-        assert list_store_names(mstore, "") == sorted(
-            [lkey(i) for i in range(1024) if i not in [0, 23, 42, 1001, 123, 456, 789]]
-        )
-
-
-def test_move_delete_undelete(mstore_mirror_created):
-    k0, v0 = key(0), b"value0"
-    k1, v1 = key(1), b"value1"
-    with mstore_mirror_created as mstore:
-        mstore.store(k0, v0)
-        mstore.store(k1, v1)
-        # delete
-        mstore.move(k0, delete=True)  # soft delete
-        assert list_store_names(mstore, "", deleted=False) == [k1]
-        assert list_store_names(mstore, "", deleted=True) == [k0, k1]
-        # undelete
-        mstore.move(k0, undelete=True)  # undelete previously soft deleted item
-        assert list_store_names(mstore, "", deleted=False) == [k0, k1]
-        assert list_store_names(mstore, "", deleted=True) == [k0, k1]
-
-
-def test_namespaces(mstore_jbod_created):
-    with mstore_jbod_created as mstore:
-        mstore.kinds = [("config/", "generic"), ("data/", "hex-hash")]
-        mstore.store("config/main", b"some config")
-        mstore.store("data/0000", b"value_00")
-        mstore.store("data/bf00", b"value_bf")
-        mstore.store("data/c000", b"value_c0")
-        mstore.store("data/ff00", b"value_ff")
-        # now let's check where stuff ended up being stored.
-        st0, st1 = mstore.stores
-        # hex-hash kind of data should be spread into buckets according to its hash:
-        assert st0.load("data/0000") == b"value_00"
-        assert st0.load("data/bf00") == b"value_bf"
-        with pytest.raises(ObjectNotFound):
-            st0.load("data/c000")
-        with pytest.raises(ObjectNotFound):
-            st0.load("data/ff00")
-        with pytest.raises(ObjectNotFound):
-            st1.load("data/0000")
-        with pytest.raises(ObjectNotFound):
-            st1.load("data/bf00")
-        assert st1.load("data/c000") == b"value_c0"
-        assert st1.load("data/ff00") == b"value_ff"
-        # generic kind config should be mirrored to all stores:
-        assert st0.load("config/main") == b"some config"
-        assert st1.load("config/main") == b"some config"
-
-
-def test_reduce_prepare(tmp_path):
-    # assume we want to stop using a store, then:
-    # - we don't want to write new data to it
-    # - we want to be able to read all data from the mstore at all times
-    #
-    # test setup: we have 3 stores with data distributed over them:
-    entries = 1024
-    stores = [Store(url=f"file://{tmp_path}0"), Store(url=f"file://{tmp_path}1"), Store(url=f"file://{tmp_path}2")]
-    mstore = MStore(stores=stores, buckets=[128, 64, 64], kinds={"": "hex-hash"})
-    mstore.create()
-    with mstore:
-        fill_storage(mstore, entries)
-        read_storage(mstore, entries)
-        assert len(list_store_names(mstore.stores[0], "")) == 512
-        assert len(list_store_names(mstore.stores[1], "")) == 256
-        assert len(list_store_names(mstore.stores[2], "")) == 256
-    # test: still have the 3 stores available, but bucket count 0 in store 2 means no new data will go into it:
-    stores = [Store(url=f"file://{tmp_path}0"), Store(url=f"file://{tmp_path}1"), Store(url=f"file://{tmp_path}2")]
-    mstore = MStore(stores=stores, buckets=[128, 128, 0], kinds={"": "hex-hash"})
-    with mstore:
-        read_storage(mstore, entries)
-        # store new stuff into the mstore:
-        fill_storage(mstore, entries, start=entries)
-        read_storage(mstore, entries * 2)
-        assert len(list_store_names(mstore.stores[0], "")) == 512 + 512
-        assert len(list_store_names(mstore.stores[1], "")) == 256 + 512
-        assert len(list_store_names(mstore.stores[2], "")) == 256  # no new data was written to store 2
-    mstore.destroy()