-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathconnpool.lua
185 lines (152 loc) · 4.39 KB
/
connpool.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
--[=[
Connection pools.
Written by Cosmin Apreutesei. Public domain.
Connection pools allow reusing and sharing a limited number of connections
between multiple threads in order to 1) avoid creating too many connections
and 2) avoid the lag of connecting and authenticating every time a connection
is needed.
connpool([opt]) -> pools
* max_connections : max connections for all pools (100)
* max_waiting_threads : max threads to queue up (1000)
pools:setlimits(key, opt) set limits for a specific pool
pools:get(key, [expires]) -> c get a connection from a pool
pools:put(key, c, s) put a connection in a pool
pools:setlimits(key, opt)
Set limits for a specific pool identified by `key`.
* max_connections : max connections for all pools (pools.max_connections)
* max_waiting_threads : max threads to queue up (pools.max_waiting_threads)
pools:get(key, [expires]) -> c
Get a connection from the pool identified by `key`. Returns nil if the
pool is empty, in which case the caller has to create a connection itself,
use it, and put it in the pool after it's done with it.
The optional `expires` arg specifies how much to wait for a connection
when the pool is full. If not given, there's no waiting.
pools:put(key, c, s)
Put a connection in a pool to be reused.
* `s` is a connected TCP client socket.
* `c` is the hi-level protocol state object that encapsulates the
low-level socket connection.
IMPLEMENTATION
The pool mechanics is simple (it's just a free list) until the connection
limit is reached and then it gets more complicated because we need to put
the threads on a waiting list and resume them in fifo order and we also
need to remove them from wherever they are on the waiting list on timeout.
This is made easy because we have: 1) a ring buffer that allows removal at
arbitrary positions and 2) sock's interruptible timers.
]=]
if not ... then require'connpool_test'; return end
require'glue'
require'sock'
require'queue'
function connpool(opt)
local all_limit = opt and opt.max_connections or 100
local all_waitlist_limit = opt and opt.max_waiting_threads or 1000
assert(all_limit >= 1)
assert(all_waitlist_limit >= 0)
local pools = {}
local servers = {}
local function pool(key)
local pool = servers[key]
if pool then
return pool
end
pool = {}
servers[key] = pool
local n = 0
local free = {}
local limit = all_limit
local waitlist_limit = all_waitlist_limit
local function dbg(event, c)
log('', 'cnpool', event, '%-4s %-4s n=%d free=%d',
currentthread(), c or '', n, #free)
end
function pool:setlimits(opt)
limit = opt.max_connections or limit
waitlist_limit = opt.max_waiting_threads or waitlist_limit
assert(limit >= 1)
assert(waitlist_limit >= 0)
end
local q
local function wait(expires)
if waitlist_limit < 1 or not expires or expires <= clock() then
dbg'notime'
return nil, 'timeout'
end
q = q or queue(waitlist_limit, 'queue_index')
if q:full() then
dbg'q-full'
return nil, 'timeout'
end
local wait_job = wait_job()
q:push(wait_job)
if wait_job:wait_until(expires) then
return true
else
q:remove(wait_job)
return nil, 'timeout'
end
end
local function check_waitlist()
local wait_job = q and q:pop()
if not wait_job then return end
wait_job:resume(true)
end
function pool:get(expires)
local c = pop(free)
if c then
dbg('pop', c)
return c
end
if n >= limit then
dbg('wait', _('%.2ds', expires - clock()))
local ok, err = wait(expires)
if not ok then return nil, err end
local c = pop(free)
if c then
dbg('pop', c)
return c
end
if n >= limit then
dbg'full'
return nil, 'busy'
end
end
dbg'empty'
return nil, 'empty'
end
function pool:put(c, s)
assert(n < limit)
assert(not pool[c])
pool[c] = true
n = n + 1
dbg'put'
function c:release_to_pool()
local c = self
add(free, c)
dbg('release', c)
check_waitlist()
end
s:onclose(function()
assert(pool[c])
pool[c] = nil
n = n - 1
remove_value(free, c)
dbg('close', c)
check_waitlist()
end)
return c
end
return pool
end
function pools:setlimits(key, opt)
assert(limit >= 1)
pool(key):setlimits(opt)
end
function pools:get(key, expires)
return pool(key):get(expires)
end
function pools:put(key, c, s)
return pool(key):put(c, s)
end
return pools
end