-
Notifications
You must be signed in to change notification settings - Fork 3
/
_filelock.py
57 lines (47 loc) · 1.49 KB
/
_filelock.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import os
import time
base_dir = os.path.dirname(os.path.abspath(__file__))
try:
from filelock import FileLock
except:
class FileLock:
def __init__(self, path):
self.__path = path
@property
def lock_file(self):
return self.__path
def create_file(self):
try:
open(self.__path, "x")
return True
except:
return False
def delete_file(self):
try:
os.remove(self.__path)
return True
except:
return False
def lock(self, timeout=None):
future = time.time() + timeout if timeout is not None else 0xffff
while not self.create_file():
time.sleep(0.005)
if timeout is not None and time.time() < future:
return False
return True
def unlock(self, timeout=None):
future = time.time() + timeout if timeout is not None else 0xffff
while not self.delete_file():
time.sleep(0.005)
if timeout is not None and time.time() < future:
return False
return True
acquire = lock
release = unlock
def __enter__(self):
self.lock()
# print("__enter__")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
# print("__exit__")
self.unlock()