-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathquery.lua
288 lines (248 loc) · 8.79 KB
/
query.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
--[==[
SQL database access
Written by Cosmin Apreutesei. Public Domain.
PREPROCESSOR
sqlval(s) -> s quote string to SQL literal
sqlname(s) -> s quote string to SQL identifier
sqlparams(s, t) -> s quote query with :name placeholders.
sqlquery(s, t) -> s quote query with any preprocessor directives.
sqlrows(rows[, opt]) -> s quote rows to SQL insert values list
null placeholder for null value in params and results
sql_default placeholder for "default value" in params
qsubst(typedef) create a substitution definition
qmacro.<name> = f(args...) create a macro definition
sqlpp([ns]) get the sqlpp instance, for extending.
sqlpps.ENGINE get the sqlpp instance of a specific engine (e.g. sqlpps.mysql), for extending.
EXECUTION
db([ns]) -> db get a sqlpp connection
[db:]create_db([ns]) create database
[db:]query([opt,]sql, ...) -> rows query and return rows in a table
[db:]first_row([opt,]sql, ...) -> t query and return first row or value
[db:]first_row_vals([opt,]sql, ...) -> v1,... query and return the first row unpacked
[db:]each_row([opt,]sql, ...) -> iter query and iterate rows
[db:]each_row_vals([opt,]sql, ...) -> iter query and iterate rows unpacked
[db:]each_group(col, [opt,]sql, ...) -> iter query, group rows and iterate groups
[db:]atomic(f) run f in transaction
[db:]on_table_changed(f) run f(schema, table) when a table changes
[db:]start_transaction() start transaction
[db:]end_transaction('commit'|'rollback') end transaction
[db:]commit() commit
[db:]rollback() rollback
release_dbs() release db connections back into the pool
DDL
[db:]db_exists(dbname) -> t|f check if db exists
[db:]table_def(tbl) -> def table definition
[db:]drop_table(name) drop table
[db:]drop_tables('T1 T2 ...') drop multiple tables
[db:]table_exists(tbl) -> t|f check if table exists
[db:]add_column(tbl, name, type, pos) add column
[db:]rename_column(tbl, old_name, new_name) rename column
[db:]drop_column(tbl, col) remove column
[db:][re]add_fk(tbl, col, ...) (re)create foreign key
[db:][re]add_uk(tbl, col) (re)create unique key
[db:][re]add_ix(tbl, col) (re)create index
[db:]drop_fk(tbl, col) drop foreign key
[db:]drop_uk(tbl, col) drop unique key
[db:]drop_ix(tbl, col) drop index
[db:][re]add_trigger(name, tbl, on, code) (re)create trigger
[db:]drop_trigger(name, tbl, on) drop trigger
[db:][re]add_proc(name, args, code) (re)create stored proc
[db:]drop_proc(name) drop stored proc
[db:][re]add_column_locks(tbl, cols) trigger to make columns read-only
DEBUGGING
pqr(opt | rows,fields) pretty-print query result
outpqr(opt | rows,fields) same but using out()
]==]
require'webb'
require'connpool'
require'mysql_print'
require'sqlpp'
--SQL preprocessor instances, one per supported engine, keyed by engine name.
sqlpps = {}
sqlpps.mysql     = sqlpp'mysql'
sqlpps.tarantool = sqlpp'tarantool'

--port used for an engine when `db_port` is not configured.
local default_port = {mysql = 3306, tarantool = 3301}

--pool from which connections are borrowed and to which they are released.
local pool = connpool()

--placeholder for "default value" in query params.
sql_default = {'default'}

--register the `null` and `default` symbols with both engines
--(all engines get `null` first, then all engines get `default`).
do
	local symbols = {{'null', null}, {'default', sql_default}}
	for _, sym in ipairs(symbols) do
		for _, engine in ipairs{'mysql', 'tarantool'} do
			sqlpps[engine].define_symbol(sym[1], sym[2])
		end
	end
end

--global shortcuts for defining substitutions and macros per engine.
qsubst  = sqlpps.mysql.subst
qmacro  = sqlpps.mysql.macro
ttsubst = sqlpps.tarantool.subst
ttmacro = sqlpps.tarantool.macro
--Read config key `k`, optionally scoped to namespace `ns`.
--Without `ns` this is just `config(k, default)`.
--With `ns` the key `<ns>_<k>` is looked up, and the value of the plain key
--`k` (itself looked up with `default`) is passed as config()'s second
--argument, unless `use_default` is `false`, in which case nil is passed.
local function pconfig(ns, k, default, use_default)
	if not ns then
		return config(k, default)
	end
	local fallback = nil
	if use_default ~= false then
		fallback = config(k, default) or nil
	end
	return config(ns..'_'..k, fallback)
end
--Database name for namespace `ns`: the configured `db_name` if there is
--one, otherwise `db_name` looked up again with a default made of
--`scriptname`, suffixed with `_<ns>` when a namespace is given.
function dbname(ns)
	local configured = pconfig(ns, 'db_name')
	if configured then
		return configured
	end
	local suffix = ns and '_'..ns or ''
	return (pconfig(ns, 'db_name', scriptname..suffix))
end
--Connection options for namespace `ns` (callers pass `false` for "no
--namespace"). The builder is wrapped in memoize().
--The config lookups below are kept in a fixed order on purpose.
local conn_opt = memoize(function(ns)
	local engine = pconfig(ns, 'db_engine', 'mysql')
	local pp = assert(sqlpps[engine])
	if ns then
		--a namespaced connection must configure at least its own host or
		--port; use_default=false keeps the un-namespaced keys out of this.
		local ns_host = pconfig(ns, 'db_host', nil, false)
		local ns_port = pconfig(ns, 'db_port', nil, false)
		assertf(ns_host or ns_port, 'host and/or port expected for %s', ns)
	end
	local host = pconfig(ns, 'db_host', '127.0.0.1')
	local port = pconfig(ns, 'db_port', assert(default_port[engine]))
	local user = pconfig(ns, 'db_user', 'root')
	local pass = pconfig(ns, 'db_pass', 'root')
	local name = dbname(ns)
	--key under which connections with these options are pooled.
	local pool_key = user..'@'..host..':'..port..':'..(name or '')
	local schema = pconfig(ns, 'db_schema')
	return {
		sqlpp      = pp,
		host       = host,
		port       = port,
		user       = user,
		pass       = pass,
		db         = name,
		charset    = 'utf8mb4',
		pool_key   = pool_key,
		tracebacks = true,
		schema     = schema,
	}
end)
--Get the sqlpp instance configured for namespace `ns`, for extending.
--NOTE: this replaces the global `sqlpp` constructor that was used earlier
--in this file to create the per-engine instances.
function sqlpp(ns)
	--a missing namespace is normalized to `false` before the lookup.
	local opt = conn_opt(ns or false)
	return opt.sqlpp
end
--unique key under which a thread's connection table is stored in its env.
local DBS = {}
--set of all currently open connections: all_dbs[conn] = true.
local all_dbs = {}

--Close every open connection created by this module.
--Each connection is expected to remove itself from `all_dbs` when it
--closes, which is asserted at the end.
function close_all_dbs()
	for conn in pairs(all_dbs) do
		conn:close()
	end
	assert(isempty(all_dbs))
end
--End any open transaction on every connection in `dbs` (commit when `ok`
--is truthy, rollback otherwise), then release all of them back to the
--pool and empty `dbs`. `err` is accepted but not used.
local function _release_dbs(dbs, ok, err)
	local how = ok and 'commit' or 'rollback'
	--first pass: finish all transactions before anything is released.
	for _, conn in pairs(dbs) do
		if conn:in_transaction() then
			conn:end_transaction(how)
		end
	end
	--second pass: give the connections back and forget them.
	for pool_key, conn in pairs(dbs) do
		conn:release_to_pool()
		dbs[pool_key] = nil
	end
end
--Get the table of connections (pool_key -> connection) stored in the
--environment of `thread` under the `DBS` key.
--If there is none yet and `create_env` is not `false`, an empty table is
--stored and a thread-finish hook is installed that releases its
--connections. With `create_env == false` nothing is created and the
--(falsy) lookup result is returned as is.
local function ownthreaddbs(thread, create_env)
	local env = ownthreadenv(thread, create_env)
	local dbs = env and rawget(env, DBS)
	if dbs or create_env == false then
		return dbs
	end
	dbs = {}
	rawset(env, DBS, dbs)
	onthreadfinish(thread, function(_, ok, err)
		_release_dbs(dbs, ok, err)
	end)
	return dbs
end
--Release this thread's db connections back into the pool, committing any
--open transactions. Does nothing if the thread has no connection table.
function release_dbs()
	local dbs = ownthreaddbs(nil, false)
	if dbs then
		_release_dbs(dbs, true)
	end
end
--Get the current thread's connection for namespace `ns`.
--The first call in a thread takes a connection from the pool (or opens a
--new one if the pool is empty), starts a transaction on it and remembers
--it for the thread; later calls return the remembered connection.
--  ns                  optional config namespace.
--  without_current_db  if truthy, a newly opened connection is made
--                      without selecting a database (see create_db()).
--Raises if the pool fails for a reason other than 'empty', or if starting
--the transaction fails with anything but an 'io' error.
function db(ns, without_current_db)
	local opt = conn_opt(ns or false)
	local key = opt.pool_key
	local thread = currentthread()
	local dbs = ownthreaddbs(thread)
	local req = _G.http_request and http_request(thread)
	--NOTE: browsers keep multiple connections open, and even keep them
	--open for a while after closing the last browser window(!), and
	--we don't want to hold on to pooled resources like db connections
	--on idle http connections, so we're releasing them after each request.
	if req and thread == req.thread then
		if not req._release_dbs_hooked then
			http_request():onfinish(function(req, ok, err)
				_release_dbs(dbs, ok, err)
			end)
			req._release_dbs_hooked = true
		end
	end
	local db, err = dbs[key]
	if not db then
		::again::
		db, err = pool:get(key)
		if not db then
			assert(err == 'empty', err)
			if without_current_db then
				opt = update({}, opt)
				opt.db = nil
			end
			db = opt.sqlpp.connect(opt)
			--pin the new connection in its own local: `db` is reassigned
			--when we retry via `goto again`, and the onclose handler must
			--always unregister the connection it was installed on.
			local conn = db
			all_dbs[conn] = true
			conn.rawconn.f:onclose(function()
				all_dbs[conn] = nil
			end)
			pool:put(key, conn, conn.rawconn.f)
		end
		local ok, err = pcall(db.start_transaction, db)
		if not ok then
			if iserror(err, 'io') then --disconnected
				--get another connection (or open a new one) and try again.
				goto again
			else
				error(err)
			end
		end
		dbs[key] = db
	end
	return db
end
--Create the database configured for namespace `ns` and switch the
--connection to it (and to the configured schema). Returns the connection.
function create_db(ns)
	--connect without selecting a database.
	local conn = db(ns, true)
	local name = dbname(ns)
	conn:create_db(name)
	local opt = conn_opt(ns or false)
	conn:use(name, opt.schema)
	return conn
end
--Publish the listed connection methods as global functions that operate
--on the connection returned by `db()`: calling `query(...)` is the same
--as calling `db():query(...)`.
for method in pairs{
	--preprocessor
	sqlval=1, sqlrows=1, sqlname=1, sqlparams=1, sqlquery=1,
	--query execution
	query=1, first_row=1, first_row_vals=1, each_row=1, each_row_vals=1, each_group=1,
	atomic=1, on_table_changed=1,
	start_transaction=1, end_transaction=1, commit=1, rollback=1, in_transaction=1,
	--schema reflection
	dbs=1, db_exists=1, table_def=1,
	--ddl
	drop_table=1, drop_tables=1, table_exists=1,
	add_column=1, rename_column=1, drop_column=1,
	add_check=1, readd_check=1, drop_check=1,
	add_fk=1, readd_fk=1, drop_fk=1,
	add_uk=1, readd_uk=1, drop_uk=1,
	add_ix=1, readd_ix=1, drop_ix=1,
	add_trigger=1, readd_trigger=1, drop_trigger=1,
	--NOTE: this entry used to be misspelled `read_proc`, which left the
	--documented `readd_proc` without a global wrapper.
	add_proc=1, readd_proc=1, drop_proc=1,
	add_column_locks=1, readd_column_locks=1,
	--mdl
	insert_row=1, insert_or_update_row=1, update_row=1, delete_row=1,
} do
	_G[method] = function(...)
		--resolved on each call, so the connection is the one returned
		--by db() at call time.
		local db = db()
		return db[method](db, ...)
	end
end
--Pretty-print a query result.
--Accepts either an options table that has a `rows` field, which is passed
--through as is, or `rows, fields`, which are wrapped into one.
--Returns whatever mysql_print_result() returns.
function pqr(rows, fields)
	local opt = rows
	if not rows.rows then
		opt = {rows = rows, fields = fields}
	end
	return mysql_print_result(opt)
end
--Same as pqr() but printing with `outprint`.
--An options table passed by the caller is copied first, so setting
--`print` does not modify it. Returns nothing.
function outpqr(rows, fields)
	local opt = rows.rows and update({}, rows)
	if not opt then
		opt = {rows = rows, fields = fields}
	end
	opt.print = outprint
	mysql_print_result(opt)
end
--Self-test: runs only when no arguments were passed to this chunk.
--Configures the connection, then prints the names of the tables in the
--`mysql` schema.
if not ... then
	config('db_host', '10.0.0.5')
	config('db_port', 3307)
	config('db_name', 'information_schema')
	local list_tables_sql = [[
select table_name from tables where table_schema = ?
]]
	run(function()
		for _, table_name in each_row_vals(list_tables_sql, 'mysql') do
			pr(table_name)
		end
	end)
end