-
Notifications
You must be signed in to change notification settings - Fork 0
/
db.py
196 lines (160 loc) · 4.5 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
from threading import Lock, Thread
try:
from queue import Queue, Empty
except ImportError:
from Queue import Queue, Empty
class Database(object):
    """Bookmark store that funnels every query through one worker thread.

    All instances share a single class-wide worker (``Database.thread_``),
    which is created lazily by :meth:`check_thread` and re-created if it has
    died.  The worker is built from the ``db`` / ``args`` / ``kws`` of
    whichever instance happens to start it.

    :param db: object handed to the worker thread as its database driver.
    :param args: positional arguments forwarded to the worker.
    :param kws: keyword arguments forwarded to the worker.
    """

    # Shared worker thread (None until first use) and the lock guarding it.
    thread_ = None
    thread_lock_ = Lock()

    def __init__(self, db, *args, **kws):
        self.db = db
        self.db_args = args
        self.db_kws = kws

    def check_thread(self):
        """Make sure a live worker thread exists, starting one if needed."""
        with Database.thread_lock_:
            worker = Database.thread_
            if worker is not None and not worker.is_alive():
                # Forget a dead worker so a fresh one is started below.
                worker = Database.thread_ = None
            if worker is None:
                worker = DatabaseThread(self.db, self.db_args, self.db_kws)
                worker.daemon = False
                Database.thread_ = worker
                worker.start()

    def close(self):
        """Ask the shared worker to shut down.

        Does nothing when no worker is running; a worker is never started
        merely to be closed again.
        """
        with Database.thread_lock_:
            worker = Database.thread_
            if worker is not None and worker.is_alive():
                worker.close()

    def execute(self, query, args):
        """Queue a statement on the worker without waiting for a result."""
        self.check_thread()
        Database.thread_.execute(query, args)

    def select(self, query, args):
        """Queue a query on the worker and return an iterator over its rows."""
        self.check_thread()
        return Database.thread_.select(query, args)

    def get_tags(self, bookmark_id=None):
        """Yield the distinct tag names attached to ``bookmark_id``.

        Yields nothing when ``bookmark_id`` is falsy.
        """
        if not bookmark_id:
            return
        rows = self.select(
            'select tag_name from tag where tag.bookmark_id=? group by tag_name',
            (bookmark_id,)
        )
        for row in rows:
            yield row[0]

    def get_bookmarks(self, url=None, id=None, quickmark=None, tags=()):
        """Yield bookmarks matching every given filter.

        Falsy filters are ignored; with no filters all bookmarks are
        returned.  Each name in ``tags`` must be attached to the bookmark.
        This is a generator, so nothing is queried until it is iterated.

        :returns: dicts with the keys ``id``, ``url``, ``title``,
            ``quickmark`` and ``tags`` (a set of tag names).
        """
        conditions = []
        params = []
        if url:
            conditions.append('url=?')
            params.append(url)
        if id:
            conditions.append('id=?')
            params.append(id)
        if quickmark:
            conditions.append('quickmark=?')
            params.append(quickmark)
        for tag in tags:
            conditions.append(
                'exists(select * from tag where tag.tag_name=? '
                'and tag.bookmark_id = bookmark.id)'
            )
            params.append(tag)

        where = ''
        if conditions:
            where = 'where ' + ' and '.join(conditions) + ' '

        rows = self.select(
            'select bookmark.id, bookmark.url, bookmark.title, bookmark.quickmark '
            'from bookmark ' +
            where +
            'group by bookmark.id, bookmark.url, bookmark.title, bookmark.quickmark ',
            params
        )
        for row in rows:
            yield {
                'id': row[0],
                'url': row[1],
                'title': row[2],
                'quickmark': row[3],
                'tags': set(self.get_tags(bookmark_id=row[0])),
            }

    def delete_bookmark(self, id):
        """Delete the bookmark ``id`` together with its tag rows."""
        self.execute('delete from tag where bookmark_id=?', (id,))
        self.execute('delete from bookmark where id=?', (id,))

    def update(self, bookmark):
        """Insert ``bookmark`` or update the stored one it refers to.

        The existing row is looked up by ``bookmark['id']`` when that key is
        present, otherwise by ``bookmark['url']``.  When a row is found, keys
        missing from ``bookmark`` (including ``'tags'``) keep their stored
        values.  When none is found a new row is inserted; ``'url'`` is then
        required, ``'title'`` defaults to the URL and ``'tags'`` to none.

        :param bookmark: dict with any of ``id``, ``url``, ``title``,
            ``quickmark`` and ``tags`` (an iterable of tag names).
        :raises KeyError: if a new bookmark has no ``'url'``.
        """
        old = None
        if 'id' in bookmark:
            matches = tuple(self.get_bookmarks(id=bookmark['id']))
            if matches:
                old = matches[0]
        elif 'url' in bookmark:
            matches = tuple(self.get_bookmarks(url=bookmark['url']))
            if matches:
                old = matches[0]

        if old:  # UPDATE
            current_tags = set(old['tags'])
            # An omitted 'tags' key means "leave tags alone", mirroring how
            # omitted url/title/quickmark fall back to the stored values.
            if 'tags' in bookmark:
                wanted_tags = set(bookmark['tags'])
            else:
                wanted_tags = current_tags
            for tag_name in current_tags - wanted_tags:
                self.execute(
                    'delete from tag where tag_name=? and bookmark_id=?',
                    (tag_name, old['id'])
                )
            for tag_name in wanted_tags - current_tags:
                self.execute(
                    'insert into tag (tag_name, bookmark_id) values (?, ?)',
                    (tag_name, old['id'])
                )
            self.execute(
                'update bookmark set url=?, title=?, quickmark=? where id=?',
                (
                    bookmark.get('url', old['url']),
                    bookmark.get('title', old['title']),
                    # Empty quickmarks are stored as NULL.
                    bookmark.get('quickmark', old['quickmark']) or None,
                    old['id']
                )
            )
        else:  # INSERT
            self.execute(
                'insert into bookmark (url, title, quickmark) values (?, ?, ?)',
                (
                    bookmark['url'],
                    bookmark.get('title', bookmark['url']),
                    bookmark.get('quickmark') or None
                )
            )
            # Tags are optional for a new bookmark.
            new_tags = bookmark.get('tags', ())
            # Read the new row's id back by URL so the tags can reference it.
            for bm in self.select('select id from bookmark where url=?', (bookmark['url'],)):
                for tag in new_tags:
                    self.execute(
                        'insert into tag (bookmark_id, tag_name) values (?, ?)',
                        (bm[0], tag)
                    )
class DatabaseThread(Thread):
    """Worker thread that opens one connection and runs queued statements.

    Statements are submitted with :meth:`execute` / :meth:`select` from any
    thread and executed one at a time, in submission order, inside
    :meth:`run`.

    :param db: object providing ``connect(*db_args, **db_kws)``; the returned
        connection must offer ``cursor()``, ``commit()`` and ``close()``.
    :param db_args: positional arguments for ``db.connect``.
    :param db_kws: keyword arguments for ``db.connect``.
    """

    class Close:
        """Sentinel task: tells :meth:`run` to stop."""

    class End:
        """Sentinel row: marks the end of a result set."""

    def __init__(self, db, db_args, db_kws):
        Thread.__init__(self)
        self.db = db
        self.db_args = db_args
        self.db_kws = db_kws
        self.tasks = Queue()

    def execute(self, query, args, result=None):
        """Queue ``query`` for execution.

        :param result: optional queue that receives each result row followed
            by the ``End`` sentinel.
        """
        self.tasks.put((query, args, result))

    def select(self, query, args, timeout=5):
        """Queue ``query`` and yield its rows as the worker delivers them.

        This is a generator: the query is only queued once iteration starts.

        :param timeout: seconds to wait for each row (and for the end
            marker) before giving up.
        :raises Empty: if the worker delivers nothing within ``timeout``.
        """
        result = Queue()
        self.execute(query, args, result)
        while True:
            row = result.get(True, timeout)
            if row is self.End:
                return
            yield row

    def close(self):
        """Queue the shutdown request; tasks queued before it still run."""
        self.tasks.put((self.Close, None, None))

    def run(self):
        """Open the connection and process tasks until ``Close`` arrives.

        An exception from a statement ends the thread; the connection is
        closed either way.
        """
        # Connect outside the try block: if this fails there is no connection
        # to close, and the original error must not be masked by the cleanup.
        connection = self.db.connect(*self.db_args, **self.db_kws)
        try:
            cursor = connection.cursor()
            while True:
                try:
                    query, args, result = self.tasks.get(True, 20)
                except Empty:
                    # Nothing to do yet; keep waiting.
                    continue
                if query is self.Close:
                    break
                print("EXECUTING:", query)
                cursor.execute(query, args)
                if result is not None:
                    # Drain the cursor before committing; a commit may
                    # invalidate a result set that is still open.
                    for row in cursor:
                        result.put(row)
                connection.commit()
                if result is not None:
                    result.put(self.End)
        finally:
            connection.close()