diff --git a/.gitignore b/.gitignore
index 5f68a7b..260ca61 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,7 @@
 .venv/
 .ddb_storage_testing/
 .ddb_pytest_storage
+.ddb/
 ddb_storage
 test_db/
 *.prof
diff --git a/dictdatabase/io_unsafe.py b/dictdatabase/io_unsafe.py
index 0edcf6d..22c3333 100644
--- a/dictdatabase/io_unsafe.py
+++ b/dictdatabase/io_unsafe.py
@@ -4,6 +4,8 @@
 import json
 import zlib
 import os
+import hashlib
+from pathlib import Path
 
 from . import config, utils
 
@@ -16,6 +18,7 @@ class PartialFileHandle:
	value_end_index: int
	original_data_str: str
	indent_level: int
+	indent_with: str
 
 
 ################################################################################
@@ -59,6 +62,26 @@ def read(db_name: str) -> dict:
	return orjson.loads(data) if config.use_orjson else json.loads(data)
 
 
+
+def read_index_file(db_name: str):
+	path = f"{config.storage_directory}/.ddb/{db_name.replace('/', '___')}.index"
+	Path(path).parent.mkdir(parents=True, exist_ok=True)
+	if not os.path.exists(path):
+		return {}
+	with open(path, "r") as f:
+		return json.load(f)
+
+
+def write_index_file(db_name: str, key, start_index, end_index, indent_level, indent_with, value_hash):
+	path = f"{config.storage_directory}/.ddb/{db_name}.index"
+	Path(path).parent.mkdir(parents=True, exist_ok=True)
+	indices = read_index_file(db_name)
+	indices[key] = [start_index, end_index, indent_level, indent_with, value_hash]
+	with open(path, "w") as f:
+		json.dump(indices, f)
+
+
+
 def partial_read(db_name: str, key: str) -> PartialFileHandle:
	"""
	Partially read a key from a db.
@@ -70,26 +93,55 @@ def partial_read(db_name: str, key: str) -> PartialFileHandle:
	"""
 
	data = read_string(db_name)
+	index = read_index_file(db_name).get(key, None)
+	if index is not None:
+		partial_str = data[index[0]:index[1]]
+		if index[4] == hashlib.sha256(partial_str.encode()).hexdigest():
+			return PartialFileHandle(
+				db_name=db_name,
+				key=key,
+				key_value=json.loads(partial_str),
+				value_start_index=index[0],
+				value_end_index=index[1],
+				indent_level=index[2],
+				indent_with=index[3],
+				original_data_str=data,
+			)
+
	key_str = f"\"{key}\":"
	key_str_index = utils.find_outermost_key_str_index(data, key_str)
	if key_str_index == -1:
		raise KeyError(f"Key \"{key}\" not found in db \"{db_name}\"")
 
+	space_after_semicolon = 1 if data[key_str_index + len(key_str)] == " " else 0
+
	# Count the amount of whitespace before the key
	# to determine the indentation level
-	indentation_level = 0
+	indentation_str = ""
	for i in range(key_str_index-1, -1, -1):
		if data[i] not in [" ", "\t"]:
			break
-		indentation_level += 1
+		indentation_str += data[i]
+
+	if "\t" in indentation_str:
+		indent_with = "\t"
+		indent_level = len(indentation_str)
+	elif isinstance(config.indent, int) and config.indent > 0:
+		indent_with = " " * (len(indentation_str) // config.indent)
+		indent_level = len(indentation_str) // config.indent
+	elif isinstance(config.indent, str):
+		indent_with = " "
+		indent_level = len(indentation_str) // 2
+	else:
+		indent_with, indent_level = "", 0
 
-	if isinstance(config.indent, int) and config.indent > 0:
-		indentation_level //= config.indent
-
-	value_start_index = key_str_index + len(key_str)
+	value_start_index = key_str_index + len(key_str) + space_after_semicolon
	value_end_index = utils.seek_index_through_value(data, value_start_index)
 
+	write_index_file(db_name, key, value_start_index, value_end_index, indent_level, indent_with, hashlib.sha256(data[value_start_index:value_end_index].encode()).hexdigest())
+
	return PartialFileHandle(
		db_name=db_name,
		key=key,
@@ -97,7 +149,8 @@ def partial_read(db_name: str, key: str) -> PartialFileHandle:
		value_start_index=value_start_index,
		value_end_index=value_end_index,
		original_data_str=data,
-		indent_level=indentation_level,
+		indent_level=indent_level,
+		indent_with=indent_with,
	)
 
 
@@ -159,10 +212,10 @@ def partial_write(pf: PartialFileHandle):
	else:
		partial_dump = json.dumps(pf.key_value, indent=config.indent, sort_keys=config.sort_keys)
 
-	if config.indent is not None:
-		indent_with = " " * config.indent if isinstance(config.indent, int) else config.indent
-		partial_dump = partial_dump.replace("\n", "\n" + (pf.indent_level * indent_with))
+	if pf.indent_level > 0 and pf.indent_with:
+		partial_dump = partial_dump.replace("\n", "\n" + (pf.indent_level * pf.indent_with))
 
	dump_start = pf.original_data_str[:pf.value_start_index]
	dump_end = pf.original_data_str[pf.value_end_index:]
-	write_dump(pf.db_name, f"{dump_start} {partial_dump}{dump_end}")
+	write_index_file(pf.db_name, pf.key, len(dump_start), len(dump_start) + len(partial_dump), pf.indent_level, pf.indent_with, hashlib.sha256(partial_dump.encode()).hexdigest())
+	write_dump(pf.db_name, f"{dump_start}{partial_dump}{dump_end}")
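
The io_unsafe.py changes above cache, per database, the byte range of every partially read key in a .ddb/<db_name>.index file, together with the detected indentation and a SHA-256 hash of the raw value string. A later partial_read can then reuse those offsets directly and only falls back to scanning the file when the hash no longer matches. A rough standalone sketch of that validation step (the helper name and the example values are illustrative, not part of the patch):

import hashlib
import json

def cached_value(data: str, entry: list):
	# entry layout, as written by write_index_file above:
	# [value_start_index, value_end_index, indent_level, indent_with, value_hash]
	start, end, _indent_level, _indent_with, value_hash = entry
	partial_str = data[start:end]
	# Only trust the cached offsets if the value bytes are unchanged
	if hashlib.sha256(partial_str.encode()).hexdigest() != value_hash:
		return None
	return json.loads(partial_str)

raw = '{"a": 1, "b": 42}'
entry = [14, 16, 0, "", hashlib.sha256(b"42").hexdigest()]
assert cached_value(raw, entry) == 42
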
diff --git a/dictdatabase/locking.py b/dictdatabase/locking.py
index 3d17204..e6a7470 100644
--- a/dictdatabase/locking.py
+++ b/dictdatabase/locking.py
@@ -24,14 +24,10 @@ def remove_dead_locks(db_name, ignore=None):
 
 
 def path_str(db_name, lock_id, time_ns, lock_type):
-	path = f"{config.storage_directory}/"
-	if "/" in db_name:
-		db_name = db_name.split("/")
-		db_name[-1] = f".{db_name[-1]}"
-		db_name = "/".join(db_name)
-	else:
-		path += "."
-	return f"{path}{db_name}.{lock_id}.{time_ns}.{lock_type}.lock"
+	path = f"{config.storage_directory}/.ddb/"
+	path = f"{path}{db_name}.{lock_id}.{time_ns}.{lock_type}.lock"
+	Path(path).parent.mkdir(parents=True, exist_ok=True)
+	return path
 
 
 def check_if_lock_exists(db_name: str, thread_id: str, lock_type: str):
diff --git a/profiler.py b/profiler.py
index cb87deb..6649927 100644
--- a/profiler.py
+++ b/profiler.py
@@ -1,3 +1,4 @@
+from distutils.command.config import config
 import dictdatabase as DDB
 from path_dict import PathDict
 from pyinstrument import profiler
@@ -6,8 +7,10 @@
 DDB.config.storage_directory = "./test_db/production_database"
 DDB.config.use_orjson = True
+DDB.config.indent = 2
 
-p = profiler.Profiler(interval=0.00001)
+
+p = profiler.Profiler(interval=0.0001)
 with p:
	# fM44 is small
	# a2lU has many annotations
diff --git a/tests/__init__.py b/tests/__init__.py
index e69de29..d893b26 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -0,0 +1 @@
+TEST_DIR = ".ddb_pytest_storage"
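
With the locking.py change, lock files are no longer dot-prefixed siblings of the database files; they all live under <storage_directory>/.ddb/, and path_str creates that directory on demand. A usage sketch of the new layout, assuming the patched path_str above (the database name and lock parameters are made up):

import dictdatabase as DDB
from dictdatabase.locking import path_str

DDB.config.storage_directory = "./ddb_storage"

# before this patch: ./ddb_storage/.mydb.1234.1.read.lock  (dot-prefixed, next to the db file)
# after this patch:  ./ddb_storage/.ddb/mydb.1234.1.read.lock  (.ddb/ is created if missing)
assert path_str("mydb", "1234", 1, "read") == "./ddb_storage/.ddb/mydb.1234.1.read.lock"
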
diff --git a/tests/benchmark/run_parallel.py b/tests/benchmark/run_parallel.py
index f643065..5406f03 100644
--- a/tests/benchmark/run_parallel.py
+++ b/tests/benchmark/run_parallel.py
@@ -1,37 +1,43 @@
+from calendar import c
+import json
 import dictdatabase as DDB
 from multiprocessing import Pool
 import shutil
 import time
 import os
-from utils import incrementor, print_and_assert_results
+from utils import print_and_assert_results, db_job, make_table
 
 
-def proc_job(i, n, tables, sd, uc, uo, id, sk):
+def proc_job(mode, n, tables, sd, uc, uo, id, sk):
	DDB.config.storage_directory = sd
	DDB.config.use_compression = uc
	DDB.config.use_orjson = uo
	DDB.config.indent = id
	DDB.config.sort_keys = sk
-	DDB.locking.SLEEP_TIMEOUT = 0.0 if i % 4 < 2 else 0.001
-	incrementor(i, n, tables)
+	DDB.locking.SLEEP_TIMEOUT = 0.001
+	db_job(mode, tables, n)
 
 
-def parallel_stress(tables=2, proc_count=8, per_process=512):
+def parallel_stressor(file_count, readers, writers, operations_per_process, big_file, compression):
	# Create Tables
-	for t in range(tables):
-		DDB.at(f"incr{t}").create({"counter": 0}, force_overwrite=True)
+	for t in range(file_count):
+		if big_file:
+			with open(os.path.join(os.getcwd(), "test_db/production_database/tasks.json"), "r") as f:
+				db = json.loads(f.read())
+			db["counter"] = {"counter": 0}
+		else:
+			db = {"counter": {"counter": 0}}
+
+		DDB.at(f"incr{t}").create(db, force_overwrite=True)
 
	# Execute process pool running incrementor as the target task
	t1 = time.monotonic()
-	pool = Pool(processes=proc_count)
-	for i in range(proc_count):
-		# Each process will enter this file again, but not as __main__
-		# So only the outside context is executed, and then the incrementor function
-		# This means we need to pass the config since the process is "fresh"
-		pool.apply_async(proc_job, args=(i, per_process, tables,
+	pool = Pool(processes=readers + writers)
+	for mode in "w" * writers + "r" * readers:
+		pool.apply_async(proc_job, args=(mode, operations_per_process, file_count,
			DDB.config.storage_directory,
-			DDB.config.use_compression,
+			compression,
			DDB.config.use_orjson,
			DDB.config.indent,
			DDB.config.sort_keys,
@@ -39,14 +45,35 @@ def parallel_stress(tables=2, proc_count=8, per_process=512):
	pool.close()
	pool.join()
	t2 = time.monotonic()
-	print_and_assert_results(proc_count, per_process, tables, t1, t2)
+	print_and_assert_results(readers, writers, operations_per_process, file_count, big_file, compression, t1, t2)
+
+
 
 
 if __name__ == "__main__":
	DDB.config.storage_directory = ".ddb_bench_parallel"
-	try:
-		shutil.rmtree(".ddb_bench_parallel", ignore_errors=True)
-		os.mkdir(".ddb_bench_parallel")
-		parallel_stress()
-	finally:
-		shutil.rmtree(".ddb_bench_parallel", ignore_errors=True)
+	operations_per_process = 4
+	for file_count, readers, writers in [(1, 4, 4), (1, 8, 1), (1, 1, 8), (4, 8, 8)]:
+		print("")
+		print(f"✨ Scenario: {file_count} files, {readers} readers, {writers} writers, {operations_per_process} operations per process")
+		for big_file, compression in [(False, False), (False, True), (True, False), (True, True)]:
+			try:
+				shutil.rmtree(".ddb_bench_parallel", ignore_errors=True)
+				os.mkdir(".ddb_bench_parallel")
+				parallel_stressor(file_count, readers, writers, operations_per_process, big_file, compression)
+			finally:
+				shutil.rmtree(".ddb_bench_parallel", ignore_errors=True)
+
+
+
+# Test Matrix Rows (Scenarios)
+# 1: 1 file, 4 reading 4 writing 200 times each
+# 2: 1 file, 8 reading 200 times each
+# 3: 1 file, 8 writing 200 times each
+# 4: 4 files, 8 reading 8 writing 200 times each
+
+# Test Matrix Columns (Configurations)
+# 1: Big File (50mb), compression
+# 2: Small File, compression
+# 3: Big File (50mb), no compression
+# 4: Small File, no compression
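
The benchmark driver now assigns each pooled process a role up front instead of deriving it from the process index: the mode string contains one character per worker, "w" for a writer and "r" for a reader, and print_and_assert_results expects the final counter to equal operations_per_process * writers. A minimal sketch of that dispatch (the numbers are arbitrary):

readers, writers, operations_per_process = 2, 3, 4

modes = "w" * writers + "r" * readers   # -> "wwwrr", one character per worker process
assert len(modes) == readers + writers

# Only writers increment the shared counter, so after the run:
expected_counter = operations_per_process * writers   # 12 in this example
print(modes, expected_counter)
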
f"{sorted(durations)[len(durations) // 2]:.0f}" min_t = f"{min(durations):.0f}" max_t = f"{max(durations):.0f}" - type = "read" if i % 2 == 0 else "write" - print(f"{type}: total: {len(durations)}, avg: {avg}ms (med: {median}), {min_t}-{max_t}ms") + + # print(f"{i}: total: {len(durations)}, avg: {avg}ms (med: {median}), {min_t}-{max_t}ms") -def print_and_assert_results(proc_count, per_proc, tables, t1, t2): - ops = proc_count * per_proc * tables +def print_and_assert_results(readers, writers, per_proc, tables, big_file, compression, t1, t2): + ops = (writers + readers) * per_proc * tables ops_sec = f"{(ops / (t2 - t1)):.0f}" - print(f"⏱️ {ops_sec}op/s ({ops} in {t2 - t1:.2f}s), {tables = }, {proc_count = }") + print(f"⏱️ {ops_sec}op/s ({ops} in {t2 - t1:.2f}s), {tables = }, {writers = }, {readers = }, {per_proc = }, {big_file = }, {compression = }") for t in range(tables): db = DDB.at(f"incr{t}").read() - assert db["counter"] == per_proc * (proc_count // 2) - print(f"✅ counter={db['counter']}") + assert db["counter"]["counter"] == per_proc * writers + # print(f"✅ counter={db['counter']}") def random_reads(file_count): """ Read the n tables in random order """ for t in sorted(range(file_count), key=lambda _: random.random()): - DDB.at(f"incr{t}").read() + DDB.at(f"incr{t}").read(key="counter") def random_writes(file_count): """ Iterated the n tables in random order and increment the counter """ for t in sorted(range(file_count), key=lambda _: random.random()): - with DDB.at(f"incr{t}").session(as_type=pd) as (session, d): - d["counter"] = lambda x: (x or 0) + 1 + with DDB.at(f"incr{t}").session(key="counter", as_type=pd) as (session, d): + + d.at("counter").set(d.at("counter").get() + 1) + session.write() @@ -46,3 +56,13 @@ def incrementor(i, iterations, file_count): t_end = time.monotonic_ns() durations.append((t_end - t_start) / 1e6) print_stats(i, durations) + + +def db_job(mode="r", file_count=1, per_proc=1): + durations = [] + for _ in range(per_proc): + t_start = time.monotonic_ns() + random_writes(file_count) if mode == "w" else random_reads(file_count) + t_end = time.monotonic_ns() + durations.append((t_end - t_start) / 1e6) + print_stats(mode, durations) diff --git a/tests/conftest.py b/tests/conftest.py index 7dffa4c..af2f422 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,12 +1,13 @@ import dictdatabase as DDB +from tests import TEST_DIR import pytest import shutil + @pytest.fixture(scope="session") def env(request): - dir = "./.ddb_pytest_storage" - DDB.config.storage_directory = dir - request.addfinalizer(lambda: shutil.rmtree(dir)) + DDB.config.storage_directory = TEST_DIR + request.addfinalizer(lambda: shutil.rmtree(TEST_DIR)) diff --git a/tests/test_locking.py b/tests/test_locking.py new file mode 100644 index 0000000..4dc94b0 --- /dev/null +++ b/tests/test_locking.py @@ -0,0 +1,8 @@ +from dictdatabase.locking import path_str +from tests import TEST_DIR + + +def test_path_str(): + # Testing the function path_str. + assert path_str("db", "1", 1, "1") == f"{TEST_DIR}/.ddb/db.1.1.1.lock" + assert path_str("db/nest", "1", 1, "1") == f"{TEST_DIR}/.ddb/db/nest.1.1.1.lock"