Better locking #12

Open
mkrd opened this issue Oct 28, 2022 · 2 comments
Labels
enhancement New feature or request

Comments

mkrd (Owner) commented Oct 28, 2022

Problems

  1. Currently, a lock is removed after a certain time threshold, so that dead locks are cleaned up automatically. But if a session takes longer than the timeout, its still-active lock is removed as well (see the sketch after this list).
  2. Locking is relatively slow under heavy multithreaded work.
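
To make problem 1 concrete, here is a minimal sketch of how timeout-based cleanup can delete a live lock, assuming locks are files whose age is taken from their modification time (LOCK_TIMEOUT and remove_dead_locks are hypothetical names, not part of the current codebase):

import os
import time

LOCK_TIMEOUT = 30.0  # hypothetical threshold in seconds


def remove_dead_locks(lock_dir: str):
    now = time.time()
    for name in os.listdir(lock_dir):
        path = os.path.join(lock_dir, name)
        # A lock older than LOCK_TIMEOUT is assumed to belong to a crashed
        # process and is deleted. A slow but still-active session loses its
        # lock in exactly the same way; that is the bug described above.
        if now - os.path.getmtime(path) > LOCK_TIMEOUT:
            os.remove(path)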

Solution for 1

  • The timeout problem could be fixed if the lock holder periodically (at an interval shorter than the timeout) rewrites its lock with a newer timestamp, so that no other process removes it. The difficulty is how this should be implemented; having an extra thread or process do the refresh work is one option (see the sketch below).
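
A minimal sketch of the refresh idea, assuming the lock is a file whose modification time serves as its timestamp; LOCK_TIMEOUT, REFRESH_INTERVAL, and LockHeartbeat are hypothetical names, not part of the current codebase:

import os
import threading

LOCK_TIMEOUT = 30.0     # hypothetical: locks older than this count as dead
REFRESH_INTERVAL = 5.0  # hypothetical: must be well below LOCK_TIMEOUT


class LockHeartbeat:
    # Background daemon thread that re-touches the lock file so that
    # other processes never see it as older than LOCK_TIMEOUT.
    def __init__(self, lock_path: str):
        self.lock_path = lock_path
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        # wait() returns False on timeout, True once stop is signalled
        while not self._stop.wait(REFRESH_INTERVAL):
            try:
                os.utime(self.lock_path)  # bump mtime to "now"
            except FileNotFoundError:
                break  # lock was released or removed in the meantime

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, *exc):
        self._stop.set()
        self._thread.join()

A session would then hold its lock inside "with LockHeartbeat(lock_path):", and the refresh thread stops as soon as the session exits.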

Solution for 1 and 2

Using a different locking mechanism could fix both problems at once.

mkrd added this to the v2.1.0 milestone Oct 28, 2022
mkrd removed this from the v2.2.0 milestone Nov 9, 2022
mkrd added the enhancement New feature or request label Nov 17, 2022
mkrd changed the title Refresh locks if process is still active Better locking Nov 28, 2022
mkrd (Owner, Author) commented Nov 28, 2022

Proof of concept with fcntl:

from multiprocessing.pool import Pool
import os
import json
import contextlib
import fcntl
import time


RUNS_PER_WORKER = 30_000
WORKERS = 10


def writer():
    for _ in range(RUNS_PER_WORKER):
        with open("test.json", "r+b") as f:
            fcntl.lockf(f, fcntl.LOCK_EX)  # exclusive lock for the read-modify-write
            counter = json.loads(f.read())
            counter["counter"] += 1
            f.seek(0)
            f.write(json.dumps(counter).encode())
            f.truncate()  # drop leftover bytes from a longer previous value
            fcntl.lockf(f, fcntl.LOCK_UN)


def reader():
    for _ in range(RUNS_PER_WORKER):
        with open("test.json", "r+") as f:
            fcntl.lockf(f, fcntl.LOCK_SH)  # shared lock: concurrent readers are fine
            counter = json.loads(f.read())
            fcntl.lockf(f, fcntl.LOCK_UN)


if __name__ == "__main__":
    t1 = time.time()
    with contextlib.suppress(FileExistsError):
        # Atomically create test.json with an initial counter if it is missing
        fd = os.open("test.json", os.O_CREAT | os.O_RDWR | os.O_EXCL)
        os.write(fd, json.dumps({"counter": 0}).encode())
        os.close(fd)
    pool = Pool(WORKERS)
    for _ in range(WORKERS):
        pool.apply_async(writer)
        pool.apply_async(reader)
    pool.close()
    pool.join()
    td = time.time() - t1
    print(f"Time: {td:.2f} seconds, per second: {WORKERS * RUNS_PER_WORKER / td:.2f}")

-> 19076 op/s
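
One caveat: fcntl is POSIX-only, so building on it would presumably need a separate code path on Windows (for example via the stdlib msvcrt.locking); that portability concern is an assumption here, not something tested above.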

mkrd (Owner, Author) commented Nov 28, 2022

Current solution, for comparison:

from multiprocessing.pool import Pool
import os
import json
import contextlib
import time

import dictdatabase as DDB

RUNS_PER_WORKER = 3_000
WORKERS = 10


def writer():
    DDB.config.storage_directory = "."
    for _ in range(RUNS_PER_WORKER):
        with DDB.at("test").session() as (session, t):  # exclusive write session
            t["counter"] = t["counter"] + 1
            session.write()


def reader():
    DDB.config.storage_directory = "."
    for _ in range(RUNS_PER_WORKER):
        DDB.at("test").read()


if __name__ == "__main__":
    t1 = time.time()
    with contextlib.suppress(FileExistsError):
        fd = os.open("test.json", os.O_CREAT | os.O_RDWR | os.O_EXCL)
        os.write(fd, json.dumps({"counter": 0}).encode())
        os.close(fd)
    pool = Pool(WORKERS)
    for _ in range(WORKERS):
        pool.apply_async(writer)
        pool.apply_async(reader)
    pool.close()
    pool.join()
    td = time.time() - t1
    print(f"Time: {td:.2f} seconds, per second: {WORKERS * RUNS_PER_WORKER / td:.2f}")

-> 538 op/s
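
For comparison, the fcntl proof of concept above reaches roughly 35x the throughput of the current locking scheme (19076 vs. 538 op/s).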
