-
Notifications
You must be signed in to change notification settings - Fork 6
/
tarantool.lua
443 lines (392 loc) · 12.3 KB
/
tarantool.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
--[=[
Tarantool client for Tarantool 2.8 (based on sock and msgpack).
Written by Cosmin Apreutesei. Public Domain.
Features: streams, prepared statements, UUID & DECIMAL types.
CONECTING
[try_]tarantool_connect(opt) -> tt connect to server
opt.host host (`'127.0.0.1'`)
opt.port port (`3301`)
opt.user user (optional)
opt.password password (optional)
opt.timeout timeout (`5`)
opt.mp msgpack instance to use (optional)
tt:stream() -> tt create a stream
SELECTING
tt:select(space,[index],[key],[sopt]) -> tuples select tuples from a space
sopt.limit limit (`4GB-1`)
sopt.offset offset (`0`)
sopt.iterator iterator
UPDATING
tt:[try_]insert(space, tuple) insert a tuple in a space
tt:[try_]replace(space, tuple) insert or update a tuple in a space
tt:[try_]delete(space, key) delete tuples from a space
tt:[try_]update(space, index, key, oplist) update tuples in bulk
https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/update/
tt:[try_]upsert(space, index, key, oplist) insert or update tuples in bulk
https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/upsert/
LUA RPC
tt:[try_]eval(expr, ...) -> ... eval Lua expression on the server
tt:[try_]call(fn, ...) -> ... call Lua function on the server
SQL QUERIES
tt:[try_]exec(sql, params, [xopt]) -> rows execute SQL statement
SQL PREPARED STATEMENTS
tt:[try_]prepare(sql) -> st prepare SQL statement
st:[try_]exec(params, [xopt]) -> rows exec prepared statement
st:[try_]free() unprepare statement
st.fields field list with field info
st.params param list with param info
MISC
tt:[try_]ping() ping
tt:clear_metadata_cache() clear `space` and `index` names
tt.mp msgpack instance used
What the args mean:
* `space` and `index` can be given by name or number. Resolved names are
cached so you need to call `tt:clear_metadata_cache()` if you know that
a space or index got renamed or removed (but not when new ones are created).
If you're using SQL exclusively, you don't have to worry about this.
* `tuple` is an array of values.
* `key` can be a string or an array of values.
* `oplist` is an array of update operations of form `{op, field, value}`.
* `params` in `tt:exec()` must always be an array, even when you're using
named params in the query. `st:exec()` doesn't have that limitation and
requires you to put `'?'` params in the array part and the named params in
the hash part of the params table.
* there's no valid `xopt` options yet.
]=]
if not ... then require'tarantool_test'; return end
require'sock'
require'glue'
require'base64'
require'sha1'
require'msgpack'
local
u8a, u8p, buffer, empty, memoize, object =
u8a, u8p, buffer, empty, memoize, object
local c = {host = '127.0.0.1', port = 3301, timeout = 5, tracebacks = false}
--IPROTO_*
local OK = 0
local SELECT = 1
local INSERT = 2
local REPLACE = 3
local UPDATE = 4
local DELETE = 5
local AUTH = 7
local EVAL = 8
local UPSERT = 9
local CALL = 10
local EXECUTE = 11
local NOP = 12
local PREPARE = 13
local PING = 0x40
local REQUEST_TYPE = 0x00
local SYNC = 0x01
local SPACE_ID = 0x10
local INDEX_ID = 0x11
local LIMIT = 0x12
local OFFSET = 0x13
local ITERATOR = 0x14
local KEY = 0x20
local TUPLE = 0x21
local FUNCTION_NAME = 0x22
local USER_NAME = 0x23
local EXPR = 0x27
local OPS = 0x28
local OPTIONS = 0x2b
local DATA = 0x30
local ERROR = 0x31
local METADATA = 0x32
local BIND_METADATA = 0x33
local BIND_COUNT = 0x34
local SQL_TEXT = 0x40
local SQL_BIND = 0x41
local SQL_INFO = 0x42
local STMT_ID = 0x43
local FIELD_NAME = 0x00
local FIELD_TYPE = 0x01
local FIELD_COLL = 0x02
local FIELD_IS_NULLABLE = 0x03
local FIELD_IS_AUTOINCREMENT = 0x04
local FIELD_SPAN = 0x05
local STREAM_ID = 0x0a
local SQL_INFO_ROW_COUNT = 0
local SQL_INFO_AUTOINCREMENT_IDS = 1
-- default views
local VIEW_SPACE = 281
local VIEW_INDEX = 289
-- index info
local INDEX_SPACE_NAME = 2
local INDEX_INDEX_NAME = 2
local function xor_strings(s1, s2)
assert(#s1 == #s2)
local n = #s1
local p1 = cast(u8p, s1)
local p2 = cast(u8p, s2)
local b = u8a(n)
for i = 0, n-1 do
b[i] = xor(p1[i], p2[i])
end
return str(b, n)
end
local request, tselect --fw. decl.
local MP_DECIMAL = 1
local MP_UUID = 2
--NOTE: only works with luapower's ldecnumber which has frompacked().
local function decode_decimal(mp, p, i, len)
local ldecnumber = require'ldecnumber'
local i2 = i + len
local i1, scale = mp:decode_next(p, i2, i)
local s = str(p+i1, i2-i1) --lame that we have to intern a string for this.
return ldecnumber.frompacked(s, scale)
end
local function decode_uuid(mp, p, i, len) --16 bytes binary UUID
return str(p+i, len)
end
tarantool_connect = function(opt)
local c = object(c, opt)
log('note', 'taran', 'connect', '%s:%s user=%s', c.host, c.port, c.user or '')
c:clear_metadata_cache()
c.tcp = tcp()
c.tcp:settimeout(c.timeout)
c.tcp:connect(c.host, c.port)
c._b = buffer()
c.mp = opt.mp or msgpack()
c.mp.error = function(err) c.tcp:checkp(false, '%s', err) end
c.mp.decoder[MP_DECIMAL] = decode_decimal
c.mp.decoder[MP_UUID ] = decode_uuid
c._mb = c.mp:encoding_buffer()
local b = c._b(64)
c.tcp:recvn(b, 64) --greeting
local salt = str(c.tcp:recvn(b, 64), 44)
if c.user then
local body = {[USER_NAME] = c.user}
if c.password and c.password ~= '' then
local salt = base64_decode(salt):sub(1, 20)
local s1 = sha1(c.password)
local s2 = sha1(s1)
local s3 = sha1(salt .. s2)
local scramble = xor_strings(s1, s3)
body[TUPLE] = c.mp.array('chap-sha1', scramble)
end
request(c, AUTH, body)
end
return c
end
try_tarantool_connect = protect_io(tarantool_connect)
c.stream = function(c)
c.last_stream_id = (c.last_stream_id or 0) + 1
return object(c, {stream_id = c.last_stream_id})
end
c.close = function(c)
log('note', 'taran', 'close', '%s:%s', c.host, c.port)
return c.tcp:close()
end
--[[local]] function request(c, req_type, body)
c.tcp:settimeout(c.timeout)
c.sync_num = (c.sync_num or 0) + 1
local header = {
[SYNC] = c.sync_num,
[REQUEST_TYPE] = req_type,
[STREAM_ID] = c.stream_id,
}
local mp = c.mp
local mb = c._mb
local req = mb:reset():encode_map(header):encode_map(body):tostring()
local len = mb:reset():encode_int(#req):tostring()
c.tcp:send(len .. req)
local p = c.tcp:recvn(c._b(5), 5)
local _, size = mp:decode_next(p, 5)
local p = c.tcp:recvn(c._b(size), size)
local i, res_header = mp:decode_next(p, size)
c.tcp:checkp(res_header[SYNC] == c.sync_num)
local i, res_body = mp:decode_next(p, size, i)
local code = res_header[REQUEST_TYPE]
if code ~= OK then
c.tcp:checkp(false, res_body[ERROR])
end
return res_body
end
local function resolve_space(c, space)
return type(space) == 'number' and space or c._lookup_space(space)
end
local function resolve_index(c, space, index)
index = index or 0
local space = resolve_space(c, space)
return space, type(index) == 'number' and index or c._lookup_index(space, index)
end
c.clear_metadata_cache = function(c)
c._lookup_space = memoize(function(space)
local t = tselect(c, VIEW_SPACE, INDEX_SPACE_NAME, space)
return c.tcp:checknp(t[1] and t[1][1], "no space '%s'", space)
end)
c._lookup_index = memoize(function(spaceno, index)
if not spaceno then return end
local t = tselect(c, VIEW_INDEX, INDEX_INDEX_NAME, {spaceno, index})
return c.tcp:checknp(t[1] and t[1][2], "no index '%s'", index)
end)
end
local function key_arg(mp, key)
return mp.toarray(type(key) == 'table' and key or key == nil and {} or {key})
end
local function fields(t)
if not t then return end
local dt = {}
for i, t in ipairs(t) do
dt[i] = {
name = t[FIELD_NAME],
type = t[FIELD_TYPE],
collation = t[FIELD_COLL],
not_null = not t[FIELD_IS_NULLABLE],
autoinc = t[FIELD_IS_AUTOINCREMENT],
span = t[FIELD_SPAN],
}
end
return dt
end
local function apply_sqlinfo(dt, t)
if t then --update query
dt.affected_rows = t[SQL_INFO_ROW_COUNT]
dt.autoinc_ids = t[SQL_INFO_AUTOINCREMENT_IDS]
end
return dt
end
local function exec_response(res)
return apply_sqlinfo(res[DATA] or {}, res[SQL_INFO]), fields(res[METADATA])
end
function c.select(c, space, index, key, opt)
opt = opt or empty
local space, index = resolve_index(c, space, index)
local body = {
[SPACE_ID] = space,
[INDEX_ID] = index,
[KEY] = key_arg(c.mp, key),
}
body[LIMIT] = opt.limit or 0xFFFFFFFF
body[OFFSET] = opt.offset or 0
body[ITERATOR] = opt.iterator
return exec_response(request(c, SELECT, body))
end
tselect = c.select
c.try_select = protect_io(c.select)
function c.insert(c, space, tuple)
return request(c, INSERT, {
[SPACE_ID] = resolve_space(c, space),
[TUPLE] = mp.toarray(tuple),
})[DATA]
end
c.try_insert = protect_io(c.insert)
function c.replace(c, space, tuple)
return request(c, REPLACE, {
[SPACE_ID] = resolve_space(c, space),
[TUPLE] = mp.toarray(tuple),
})[DATA]
end
c.try_replace = protect_io(c.replace)
function c.update(c, space, index, key, oplist)
local space, index = resolve_index(c, space, index)
return request(c, UPDATE, {
[SPACE_ID] = space,
[INDEX_ID] = index,
[KEY] = key_arg(c.mp, key),
[TUPLE] = mp.toarray(oplist),
})[DATA]
end
c.try_update = protect_io(c.update)
function c.delete(c, space, key)
local space, index = resolve_index(c, space, index)
return request(c, DELETE, {
[SPACE_ID] = space,
[INDEX_ID] = index,
[KEY] = key_arg(c.mp, key),
})[DATA]
end
c.try_delete = protect_io(c.delete)
function c.upsert(c, space, index, key, oplist)
return request(c, UPSERT, {
[SPACE_ID] = resolve_space(c, space),
[INDEX_ID] = index,
[OPS] = oplist,
[TUPLE] = key_arg(c.mp, key),
})[DATA]
end
c.try_upsert = protect_io(c.upsert)
function c.eval(c, expr, ...)
if type(expr) == 'function' then
expr = require'pp'.format(expr)
expr = format('return assert(%s)(...)', expr)
end
return unpack(request(c, EVAL, {[EXPR] = expr, [TUPLE] = c.mp.array(...)})[DATA])
end
c.try_eval = protect_io(c.eval)
function c.call(c, fn, ...)
return unpack(request(c, CALL, {[FUNCTION_NAME] = fn, [TUPLE] = c.mp.array(...)})[DATA])
end
c.try_call = protect_io(c.call)
function c.exec(c, sql, params, xopt, param_meta)
if param_meta and param_meta.has_named_params then --pick params from named keys
local t = params
params = {}
for i,f in ipairs(param_meta) do
if f.index then
params[i] = t[f.index]
else
params[i] = t[f.name]
end
end
end
log('', 'taran', 'exec', '%s', sql)
return exec_response(request(c, EXECUTE, {
[STMT_ID] = type(sql) == 'number' and sql or nil,
[SQL_TEXT] = type(sql) == 'string' and sql or nil,
[SQL_BIND] = params,
[OPTIONS] = xopt or empty,
}))
end
c.try_exec = protect_io(c.exec)
local st = {}
local function params(t)
t = fields(t)
local j = 0
for i,f in ipairs(t) do
if f.name:sub(1, 1) == ':' then
f.name = f.name:sub(2)
t.has_named_params = true
else
j = j + 1
f.index = j
end
end
return t
end
function c.prepare(c, sql)
local res = request(c, PREPARE, {
[SQL_TEXT] = type(sql) == 'string' and sql or nil,
})
return object(st, {
id = res[STMT_ID],
conn = c,
fields = fields(res[METADATA]),
params = params(res[BIND_METADATA]),
})
end
c.try_prepare = protect_io(c.prepare)
function st:exec(params, xopt)
return self.conn:exec(self.id, params, xopt, self.params)
end
local function unprepare(c, stmt_id)
return request(c, PREPARE, {[STMT_ID] = stmt_id})[STMT_ID]
end
local try_unprepare = protect_io(unprepare)
function st:free()
return unprepare(self.conn, self.id)
end
function st:try_free()
return try_unprepare(self.conn, self.id)
end
function c.ping(c)
return request(c, PING, empty)
end
c.try_ping = protect_io(c.ping)
local function esc_quote(s) return "''" end
function c.esc(s)
return s:gsub("'", esc_quote)
end