Merge pull request #14 from mkrd/benchmark-scenarios
Add automatic indexing
mkrd authored Oct 29, 2022
2 parents 30f7421 + 1ee9c2d commit 889de6c
Showing 9 changed files with 163 additions and 53 deletions.
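
In short, this change gives every database a companion index file under .ddb/ in the storage directory. For each key it records the start and end offsets of the key's value inside the serialized JSON, the indentation used, and a SHA-256 hash of that value substring, so partial reads can jump straight to the value. A minimal sketch of the lookup this enables, with names inferred from the diff below (the real code lives in dictdatabase/io_unsafe.py):

import hashlib
import json

def cached_partial_read(index_path: str, data: str, key: str):
    # The index maps key -> [start, end, indent_level, indent_with, value_hash].
    with open(index_path, "r") as f:
        indices = json.load(f)
    entry = indices.get(key)
    if entry is None:
        return None  # key never indexed yet: caller falls back to a full scan
    start, end, indent_level, indent_with, value_hash = entry
    partial = data[start:end]
    if hashlib.sha256(partial.encode()).hexdigest() != value_hash:
        return None  # file changed since indexing: offsets are stale, rescan
    return json.loads(partial)  # parse only the value, not the whole document

On a miss, or when the hash no longer matches the file contents, partial_read falls back to a full scan and rewrites the index entry, so a stale index only costs one slow read.
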
1 change: 1 addition & 0 deletions .gitignore
@@ -1,6 +1,7 @@
.venv/
.ddb_storage_testing/
.ddb_pytest_storage
.ddb/
ddb_storage
test_db/
*.prof
73 changes: 63 additions & 10 deletions dictdatabase/io_unsafe.py
@@ -4,6 +2,8 @@
import json
import zlib
import os
import hashlib
from pathlib import Path
from . import config, utils


@@ -16,6 +18,7 @@ class PartialFileHandle:
value_end_index: int
original_data_str: str
indent_level: int
indent_with: str


################################################################################
@@ -59,6 +62,26 @@ def read(db_name: str) -> dict:
return orjson.loads(data) if config.use_orjson else json.loads(data)



def read_index_file(db_name: str):
path = f"{config.storage_directory}/.ddb/{db_name.replace('/', '___')}.index"
Path(path).parent.mkdir(parents=True, exist_ok=True)
if not os.path.exists(path):
return {}
with open(path, "r") as f:
return json.load(f)


def write_index_file(db_name: str, key, start_index, end_index, indent_level, indent_with, value_hash):
path = f"{config.storage_directory}/.ddb/{db_name}.index"
Path(path).parent.mkdir(parents=True, exist_ok=True)
indices = read_index_file(db_name)
indices[key] = [start_index, end_index, indent_level, indent_with, value_hash]
with open(path, "w") as f:
json.dump(indices, f)



def partial_read(db_name: str, key: str) -> PartialFileHandle:
"""
Partially read a key from a db.
@@ -70,34 +93,64 @@ def partial_read(db_name: str, key: str) -> PartialFileHandle:
"""

data = read_string(db_name)
index = read_index_file(db_name).get(key, None)
if index is not None:
partial_str = data[index[0]:index[1]]
if index[4] == hashlib.sha256(partial_str.encode()).hexdigest():
return PartialFileHandle(
db_name=db_name,
key=key,
key_value=json.loads(partial_str),
value_start_index=index[0],
value_end_index=index[1],
indent_level=index[2],
indent_with=index[3],
original_data_str=data,
)

key_str = f"\"{key}\":"
key_str_index = utils.find_outermost_key_str_index(data, key_str)

if key_str_index == -1:
raise KeyError(f"Key \"{key}\" not found in db \"{db_name}\"")

space_after_semicolon = 1 if data[key_str_index + len(key_str)] == " " else 0

# Count the amount of whitespace before the key
# to determine the indentation level
indentation_level = 0
indentation_str = ""
for i in range(key_str_index-1, -1, -1):
if data[i] not in [" ", "\t"]:
break
indentation_level += 1
indentation_str += data[i]

if "\t" in indentation_str:
indent_with = "\t"
indent_level = len(indentation_str)
elif isinstance(config.indent, int) and config.indent > 0:
indent_with = " " * (len(indentation_str) // config.indent)
indent_level = len(indentation_str) // config.indent
elif isinstance(config.indent, str):
indent_with = " "
indent_level = len(indentation_str) // 2
else:
indent_with, indent_level = "", 0

if isinstance(config.indent, int) and config.indent > 0:
indentation_level //= config.indent

value_start_index = key_str_index + len(key_str)
value_start_index = key_str_index + len(key_str) + space_after_semicolon
value_end_index = utils.seek_index_through_value(data, value_start_index)

write_index_file(db_name, key, value_start_index, value_end_index, indent_level, indent_with, hashlib.sha256(data[value_start_index:value_end_index].encode()).hexdigest())

return PartialFileHandle(
db_name=db_name,
key=key,
key_value=json.loads(data[value_start_index:value_end_index]),
value_start_index=value_start_index,
value_end_index=value_end_index,
original_data_str=data,
indent_level=indentation_level,
indent_level=indent_level,
indent_with=indent_with,
)


@@ -159,10 +212,10 @@ def partial_write(pf: PartialFileHandle):
else:
partial_dump = json.dumps(pf.key_value, indent=config.indent, sort_keys=config.sort_keys)

if config.indent is not None:
indent_with = " " * config.indent if isinstance(config.indent, int) else config.indent
partial_dump = partial_dump.replace("\n", "\n" + (pf.indent_level * indent_with))
if pf.indent_level > 0 and pf.indent_with:
partial_dump = partial_dump.replace("\n", "\n" + (pf.indent_level * pf.indent_with))

dump_start = pf.original_data_str[:pf.value_start_index]
dump_end = pf.original_data_str[pf.value_end_index:]
write_dump(pf.db_name, f"{dump_start} {partial_dump}{dump_end}")
write_index_file(pf.db_name, pf.key, len(dump_start), len(dump_start) + len(partial_dump), pf.indent_level, pf.indent_with, hashlib.sha256(partial_dump.encode()).hexdigest())
write_dump(pf.db_name, f"{dump_start}{partial_dump}{dump_end}")
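
The write path in partial_write (above) keeps the index consistent: it re-serializes only the key's value, splices it back between the untouched prefix and suffix of the original file, and stores the new offsets and hash via write_index_file. A rough sketch of that splice, with indentation handling omitted and a simplified three-field index entry (the real entries above carry five fields):

import hashlib
import json

def splice_value(original: str, start: int, end: int, new_value):
    # Re-serialize only the changed value; keep everything around it verbatim.
    new_dump = json.dumps(new_value)
    before, after = original[:start], original[end:]
    new_content = before + new_dump + after
    # Refreshed index entry: the start offset is unchanged, the end shifts with
    # the new value length, and the hash guards against external edits.
    entry = [len(before), len(before) + len(new_dump),
             hashlib.sha256(new_dump.encode()).hexdigest()]
    return new_content, entry
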
12 changes: 4 additions & 8 deletions dictdatabase/locking.py
@@ -24,14 +24,10 @@ def remove_dead_locks(db_name, ignore=None):


def path_str(db_name, lock_id, time_ns, lock_type):
path = f"{config.storage_directory}/"
if "/" in db_name:
db_name = db_name.split("/")
db_name[-1] = f".{db_name[-1]}"
db_name = "/".join(db_name)
else:
path += "."
return f"{path}{db_name}.{lock_id}.{time_ns}.{lock_type}.lock"
path = f"{config.storage_directory}/.ddb/"
path = f"{path}{db_name}.{lock_id}.{time_ns}.{lock_type}.lock"
Path(path).parent.mkdir(parents=True, exist_ok=True)
return path


def check_if_lock_exists(db_name: str, thread_id: str, lock_type: str):
5 changes: 4 additions & 1 deletion profiler.py
@@ -1,3 +1,4 @@
from distutils.command.config import config
import dictdatabase as DDB
from path_dict import PathDict
from pyinstrument import profiler
@@ -6,8 +7,10 @@

DDB.config.storage_directory = "./test_db/production_database"
DDB.config.use_orjson = True
DDB.config.indent = 2

p = profiler.Profiler(interval=0.00001)

p = profiler.Profiler(interval=0.0001)
with p:
# fM44 is small
# a2lU has many annotations
1 change: 1 addition & 0 deletions tests/__init__.py
@@ -0,0 +1 @@
TEST_DIR = ".ddb_pytest_storage"
69 changes: 48 additions & 21 deletions tests/benchmark/run_parallel.py
@@ -1,52 +1,79 @@
from calendar import c
import json
import dictdatabase as DDB
from multiprocessing import Pool
import shutil
import time
import os

from utils import incrementor, print_and_assert_results
from utils import print_and_assert_results, db_job, make_table


def proc_job(i, n, tables, sd, uc, uo, id, sk):
def proc_job(mode, n, tables, sd, uc, uo, id, sk):
DDB.config.storage_directory = sd
DDB.config.use_compression = uc
DDB.config.use_orjson = uo
DDB.config.indent = id
DDB.config.sort_keys = sk
DDB.locking.SLEEP_TIMEOUT = 0.0 if i % 4 < 2 else 0.001
incrementor(i, n, tables)
DDB.locking.SLEEP_TIMEOUT = 0.001
db_job(mode, tables, n)


def parallel_stress(tables=2, proc_count=8, per_process=512):
def parallel_stressor(file_count, readers, writers, operations_per_process, big_file, compression):
# Create Tables
for t in range(tables):
DDB.at(f"incr{t}").create({"counter": 0}, force_overwrite=True)
for t in range(file_count):
if big_file:
with open(os.path.join(os.getcwd(), "test_db/production_database/tasks.json"), "r") as f:
db = json.loads(f.read())
db["counter"] = {"counter": 0}
else:
db = {"counter": {"counter": 0}}

DDB.at(f"incr{t}").create(db, force_overwrite=True)

# Execute process pool running incrementor as the target task
t1 = time.monotonic()
pool = Pool(processes=proc_count)
for i in range(proc_count):
# Each process will enter this file again, but not as __main__
# So only the outside context is executed, and then the incrementor function
# This means we need to pass the config since the process is "fresh"
pool.apply_async(proc_job, args=(i, per_process, tables,
pool = Pool(processes=readers + writers)
for mode in "w" * writers + "r" * readers:
pool.apply_async(proc_job, args=(mode, operations_per_process, file_count,
DDB.config.storage_directory,
DDB.config.use_compression,
compression,
DDB.config.use_orjson,
DDB.config.indent,
DDB.config.sort_keys,
))
pool.close()
pool.join()
t2 = time.monotonic()
print_and_assert_results(proc_count, per_process, tables, t1, t2)
print_and_assert_results(readers, writers, operations_per_process, file_count, big_file, compression, t1, t2)




if __name__ == "__main__":
DDB.config.storage_directory = ".ddb_bench_parallel"
try:
shutil.rmtree(".ddb_bench_parallel", ignore_errors=True)
os.mkdir(".ddb_bench_parallel")
parallel_stress()
finally:
shutil.rmtree(".ddb_bench_parallel", ignore_errors=True)
operations_per_process = 4
for file_count, readers, writers in [(1, 4, 4), (1, 8, 1), (1, 1, 8), (4, 8, 8)]:
print("")
print(f"✨ Scenario: {file_count} files, {readers} readers, {writers} writers, {operations_per_process} operations per process")
for big_file, compression in [(False, False), (False, True), (True, False), (True, True)]:
try:
shutil.rmtree(".ddb_bench_parallel", ignore_errors=True)
os.mkdir(".ddb_bench_parallel")
parallel_stressor(file_count, readers, writers, operations_per_process, big_file, compression)
finally:
shutil.rmtree(".ddb_bench_parallel", ignore_errors=True)



# Test Matrix Rows (Scenarios)
# 1: 1 file, 4 reading 4 writing 200 times each
# 2: 1 file, 8 reading 200 times each
# 3: 1 file, 8 writing 200 times each
# 4: 4 files, 8 reading 8 writing 200 times each

# Test Matrix Columns (Configurations)
# 1: Big File (50mb), compression
# 2: Small File, compression
# 3: Big File (50mb), no compression
# 4: Small File, no compression
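
For reference, the numbers that print_and_assert_results (defined in tests/benchmark/utils.py below) expects for the largest scenario above, assuming the scenario tuple of 4 files, 8 readers, 8 writers and 4 operations per process:

readers, writers, per_proc, tables = 8, 8, 4, 4

# Every process touches every table once per iteration.
ops = (readers + writers) * per_proc * tables   # 16 * 4 * 4 = 256 operations

# Only writer processes increment, so each table's nested counter must end at:
expected_counter = per_proc * writers           # 4 * 8 = 32
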
40 changes: 30 additions & 10 deletions tests/benchmark/utils.py
@@ -2,39 +2,49 @@
from path_dict import pd
import random
import time
import json


def make_table(recursion_depth=3, keys_per_level=50):
d = {"key1": "val1", "key2": 2, "key3": [1, "2", [3, 3]]}
for i in range(recursion_depth):
d = {f"key{i}{j}": d for j in range(keys_per_level)}
#print(f"Made table of size {len(json.dumps(d)) // 1e6}mb")
return {"counter": {"counter": 0}, "big": d}


def print_stats(i, durations):
avg = f"{sum(durations) / len(durations):.0f}"
median = f"{sorted(durations)[len(durations) // 2]:.0f}"
min_t = f"{min(durations):.0f}"
max_t = f"{max(durations):.0f}"
type = "read" if i % 2 == 0 else "write"
print(f"{type}: total: {len(durations)}, avg: {avg}ms (med: {median}), {min_t}-{max_t}ms")

# print(f"{i}: total: {len(durations)}, avg: {avg}ms (med: {median}), {min_t}-{max_t}ms")


def print_and_assert_results(proc_count, per_proc, tables, t1, t2):
ops = proc_count * per_proc * tables
def print_and_assert_results(readers, writers, per_proc, tables, big_file, compression, t1, t2):
ops = (writers + readers) * per_proc * tables
ops_sec = f"{(ops / (t2 - t1)):.0f}"
print(f"⏱️ {ops_sec}op/s ({ops} in {t2 - t1:.2f}s), {tables = }, {proc_count = }")
print(f"⏱️ {ops_sec}op/s ({ops} in {t2 - t1:.2f}s), {tables = }, {writers = }, {readers = }, {per_proc = }, {big_file = }, {compression = }")
for t in range(tables):
db = DDB.at(f"incr{t}").read()
assert db["counter"] == per_proc * (proc_count // 2)
print(f"✅ counter={db['counter']}")
assert db["counter"]["counter"] == per_proc * writers
# print(f"✅ counter={db['counter']}")


def random_reads(file_count):
""" Read the n tables in random order """
for t in sorted(range(file_count), key=lambda _: random.random()):
DDB.at(f"incr{t}").read()
DDB.at(f"incr{t}").read(key="counter")


def random_writes(file_count):
""" Iterated the n tables in random order and increment the counter """
for t in sorted(range(file_count), key=lambda _: random.random()):
with DDB.at(f"incr{t}").session(as_type=pd) as (session, d):
d["counter"] = lambda x: (x or 0) + 1
with DDB.at(f"incr{t}").session(key="counter", as_type=pd) as (session, d):

d.at("counter").set(d.at("counter").get() + 1)

session.write()


@@ -46,3 +56,13 @@ def incrementor(i, iterations, file_count):
t_end = time.monotonic_ns()
durations.append((t_end - t_start) / 1e6)
print_stats(i, durations)


def db_job(mode="r", file_count=1, per_proc=1):
durations = []
for _ in range(per_proc):
t_start = time.monotonic_ns()
random_writes(file_count) if mode == "w" else random_reads(file_count)
t_end = time.monotonic_ns()
durations.append((t_end - t_start) / 1e6)
print_stats(mode, durations)
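
For context, the key-level access pattern these benchmarks exercise, and that the new index accelerates, looks roughly like this. The storage directory and table name are illustrative; the read(key=...) and session(key=...) calls are the ones used in random_reads and random_writes above:

import dictdatabase as DDB
from path_dict import pd

DDB.config.storage_directory = ".ddb_bench_parallel"  # illustrative directory
DDB.at("incr0").create({"counter": {"counter": 0}}, force_overwrite=True)

# Key-level read: only the "counter" value is parsed, and its offsets are cached
# in the .ddb/*.index file so later reads can skip the full-document scan.
counter = DDB.at("incr0").read(key="counter")

# Key-level write session, as in random_writes() above.
with DDB.at("incr0").session(key="counter", as_type=pd) as (session, d):
    d.at("counter").set(d.at("counter").get() + 1)
    session.write()
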
7 changes: 4 additions & 3 deletions tests/conftest.py
@@ -1,12 +1,13 @@
import dictdatabase as DDB
from tests import TEST_DIR
import pytest
import shutil


@pytest.fixture(scope="session")
def env(request):
dir = "./.ddb_pytest_storage"
DDB.config.storage_directory = dir
request.addfinalizer(lambda: shutil.rmtree(dir))
DDB.config.storage_directory = TEST_DIR
request.addfinalizer(lambda: shutil.rmtree(TEST_DIR))



8 changes: 8 additions & 0 deletions tests/test_locking.py
@@ -0,0 +1,8 @@
from dictdatabase.locking import path_str
from tests import TEST_DIR


def test_path_str():
# Testing the function path_str.
assert path_str("db", "1", 1, "1") == f"{TEST_DIR}/.ddb/db.1.1.1.lock"
assert path_str("db/nest", "1", 1, "1") == f"{TEST_DIR}/.ddb/db/nest.1.1.1.lock"
