-
Notifications
You must be signed in to change notification settings - Fork 34
/
sqlite-python-concurrency-test
executable file
·202 lines (177 loc) · 8.64 KB
/
sqlite-python-concurrency-test
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
#!/usr/bin/env python
import contextlib as cl, collections as cs, multiprocessing as mp
import os, sys, random, time, secrets, pathlib as pl
class TestDB:
_db, _db_schema = None, ['''
create table if not exists data (
id integer not null primary key autoincrement,
datapoint integer not null, some_text text not null );
create unique index if not exists data_points on data (datapoint);''']
def __init__(self, opts):
import sqlite3
self._sqlite, conn_kws = sqlite3, dict( (k, opts[k]) for k in
'autocommit isolation_level timeout'.split() if opts.get(k) is not None )
self._db = self._sqlite.connect(database=opts.path, **conn_kws )
with self() as c:
if opts.journal_mode: c.execute(f'pragma journal_mode={opts.journal_mode}')
c.execute('pragma user_version')
if (sv := c.fetchall()[0][0]) == (sv_chk := len(self._db_schema)): return
elif sv > sv_chk:
raise RuntimeError('DB schema [{sv}] newer than the script [{sv_chk}]')
for sv, sql in enumerate(self._db_schema[sv:], sv+1):
for st in sql.split(';'): c.execute(st)
c.execute(f'pragma user_version = {sv}')
def close(self):
if not self._db: return
self._db = self._db.close()
def __enter__(self): return self
def __exit__(self, *err): self.close()
@cl.contextmanager
def __call__(self):
with self._db as conn, cl.closing(conn.cursor()) as c: yield c
def test_reads(self, stats):
with self() as c:
c.execute('select id from data order by id desc limit 1')
if not ((rows := c.fetchall()) and (id_max := rows[0][0])): return
stats['id_max'] = id_max
for n in range(id_max): # query roughly all IDs, in random order
n = random.randint(0, id_max)
c.execute('select datapoint, some_text from data where id = ?', (n,))
stats['select'] += 1
if not (rows := c.fetchall()): stats['select_miss'] += 1
return stats
def test_writes(self, stats, update_chance=0.2):
with self() as c:
data_n = random.randint(0, 2**32 - 1)
data_text = secrets.token_urlsafe( random.randint(3, 30)
if random.random() > 0.2 else random.randint(30, 100) )
if random.random() < update_chance:
c.execute('select id from data order by id desc limit 1')
if not ((rows := c.fetchall()) and (id_max := rows[0][0])): return
stats['update'] += 1; stats['id_max'] = id_max
try:
c.execute( 'update data set datapoint = ?, some_text = ?'
' where id = ? ', (data_n, data_text, random.randint(0, id_max)) )
if not c.rowcount: stats['update_missing'] += 1
except self._sqlite.IntegrityError: stats['collision'] += 1
else:
stats['insert'] += 1
try: c.execute( 'insert into data (datapoint,'
' some_text) values (?, ?)', (data_n, data_text) )
except self._sqlite.IntegrityError: stats['collision'] += 1
stats['id_max'] = c.lastrowid
return stats
class adict(dict):
def __init__(self, *args, **kws):
super().__init__(*args, **kws)
self.__dict__ = self
def test_reader(opts, pipe):
random.seed()
with TestDB(opts.db) as db:
stats, ts0 = cs.Counter(), time.monotonic()
while time.monotonic() < opts.ts_max:
stats['runs'] += 1
db.test_reads(stats)
if opts.r_loops and stats['runs'] >= opts.r_loops: break
stats = dict( stats, type='reader',
pid=str(os.getpid()), runtime=time.monotonic() - ts0 )
pipe.send(stats)
def test_writer(opts, pipe):
random.seed()
with TestDB(opts.db) as db:
stats, ts0 = cs.Counter(), time.monotonic()
while time.monotonic() < opts.ts_max:
stats['runs'] += 1
db.test_writes(stats)
stats = dict( stats, type='writer',
pid=str(os.getpid()), runtime=time.monotonic() - ts0 )
pipe.send(stats)
def main(argv=None):
import argparse, re, textwrap
dd = lambda text: re.sub( r' \t+', ' ',
textwrap.dedent(text).strip('\n') + '\n' ).replace('\t', ' ')
parser = argparse.ArgumentParser(
usage='%(prog)s [opts] db.sqlite',
formatter_class=argparse.RawTextHelpFormatter, description=dd('''
Tool to test various sqlite db options and its python wrapper pathologies.
Creates db file at specified argument path and runs requested tests.'''))
parser.add_argument('db_file', help=dd('''
SQLite database file to create/use. Will be reused as-is, if exists already.'''))
parser.add_argument('-c', '--create-db', action='store_true', help=dd('''
Create fresh db file, removing the old one, if necessary.'''))
parser.add_argument('-r', '--rm-db', action='store_true', help=dd('''
Remove specified sqlite db file after test finishes
successfully, i.e. unless interrupted by Ctrl-C or whatever errors.'''))
group = parser.add_argument_group('Test-run limits/parameters')
group.add_argument('-t', '--test-duration', type=float, metavar='sec', default=10, help=dd('''
Stop running the test roughly after specified number
of seconds, rounded up to test-loop durations. Default: %(default)ss'''))
group.add_argument('-n', '--reader-procs', type=int, metavar='n', default=0, help=dd('''
Run looped queries from specified number of "reader" processes,
setup same as writers, just not running any actual insert/update/delete ops.'''))
group.add_argument('-l', '--reader-loops', type=int, metavar='n', help=dd('''
Run N db-read-loops in every started reader process, instead of using time limit.'''))
group.add_argument('-m', '--writer-procs', type=int, metavar='n', default=0, help=dd('''
Run looped insert/update queries from specified number of "writer" processes.'''))
group = parser.add_argument_group(
'SQLite db/access opts (single-letter ones are all capitalized)' )
group.add_argument('-I', '--sqlite-isolation-level',
metavar='level', default='deferred', help=dd('''
isolation_level option to use for all sqlite.connect() operations.
One of (case-insensitive): deferred, exclusive, immediate or none. Default: %(default)s'''))
group.add_argument('-J', '--sqlite-journal-mode', metavar='mode', help=dd('''
Use "pragma journal_mode=..." after opening sqlite database.
Not set by default (not the same as "-W off"!), meaning that previously-set
value will be used, or whatever is sqlite default when creating a new database file.
Can be set to one of (case-insensitive): delete, truncate, persist, memory, wal, off.
See https://www.sqlite.org/pragma.html#pragma_journal_mode for details.'''))
group.add_argument('-L', '--sqlite-lock-timeout',
type=float, metavar='sec', default=60, help=dd('''
lock_timeout option to use for sqlite.connect(). Default: %(default)ss'''))
group.add_argument('-A', '--sqlite-autocommit', metavar='on/off', help=dd('''
Explicitly set python sqlite module "autocommit" behavior.
Can be one of: on, off. Default is to not set the value, i.e. use module default.'''))
opts = parser.parse_args(argv)
pn, pm = opts.reader_procs, opts.writer_procs
if not (pn > 0 or pm > 0): parser.error('No number for readers/writers to test specified')
test_opts = adict(
db=adict(
path=pl.Path(opts.db_file), autocommit=opts.sqlite_autocommit,
isolation_level=opts.sqlite_isolation_level.upper(),
journal_mode=(jm := opts.sqlite_journal_mode) and jm.upper(),
timeout=opts.sqlite_lock_timeout ),
ts_max=0, r_loops=opts.reader_loops )
if test_opts.db.isolation_level == 'NONE': test_opts.db.isolation_level = None
if v := test_opts.db.autocommit: test_opts.db.autocommit = bool(['off', 'on'].index(v))
if opts.create_db: test_opts.db.path.unlink(missing_ok=True)
with TestDB(test_opts.db) as db: pass # check access/options
pipe, pipe_send = mp.Pipe()
procs, totals, ts0 = list(), cs.Counter(), time.monotonic()
test_opts.ts_max = time.monotonic() + opts.test_duration
for (func, name_tpl, n_proc) in [
(test_writer, 'writer.{}', pm), (test_reader, 'reader.{}', pn) ]:
for n in range(n_proc):
procs.append(p := mp.Process(
target=func, args=(test_opts, pipe_send),
name=name_tpl.format(n), daemon=True ))
p.start()
for n, p in enumerate(procs):
p.join()
if not pipe.poll(): raise RuntimeError('Process exited without sending result')
procs[n] = pipe.recv()
totals['runtime'] = time.monotonic() - ts0
def fmt_vals(d):
for k, v in d.items():
if isinstance(v, int): yield f'{k}={v:,d}'
elif isinstance(v, float): yield f'{k}={v:,.2f}'.removesuffix('.00')
else: yield f'{k}={v}'
for stats in procs:
st_pid, st_t = (stats.pop(k) for k in 'pid type'.split())
totals.update(dict((k, stats.get(k, 0)) for k in 'select insert update'.split()))
totals[f'mean_runtime_{st_t}'] += stats['runtime']; totals[f'n_{st_t}'] += 1
print(f'- pid {st_pid} {st_t} ::', ' '.join(fmt_vals(stats)))
for st_t in 'reader writer'.split():
totals[f'mean_runtime_{st_t}'] = totals[f'mean_runtime_{st_t}'] / totals[f'n_{st_t}']
print('Totals:', ' '.join(fmt_vals(totals)))
if opts.rm_db: test_opts.db.path.unlink(missing_ok=True)
if __name__ == '__main__': sys.exit(main())