forked from koding/kite
-
Notifications
You must be signed in to change notification settings - Fork 0
/
kontrolclient.go
468 lines (387 loc) · 11.9 KB
/
kontrolclient.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
// kontrolclient implements a kite.Client for interacting with Kontrol kite.
package kite
import (
"errors"
"fmt"
"math/rand"
"net/url"
"os"
"sync"
"time"
"github.com/koding/kite/dnode"
"github.com/koding/kite/protocol"
)
const (
	// kontrolRetryDuration is the delay before RegisterForever retries a
	// failed registration to Kontrol.
	kontrolRetryDuration = 10 * time.Second
	// proxyRetryDuration is how long RegisterToProxy sleeps before retrying
	// after it failed to find or to register to a proxy kite.
	proxyRetryDuration = 10 * time.Second
	// kontrolConnectTimeout is the timeout for connecting to Kontrol in
	// TellKontrol-like methods.
	kontrolConnectTimeout = 10 * time.Second
)
// ErrNoKitesAvailable is returned from GetKites when the query matches no
// kites.
var ErrNoKitesAvailable = errors.New("no kites availabile")
// kontrolClient wraps the Client connected to Kontrol together with the state
// used for registering and querying Kites from Kontrol.
type kontrolClient struct {
	*Client
	sync.Mutex // protects Client
	// onceConnected and onceRegistered guarantee that readyConnected and
	// readyRegistered are each closed only once.
	onceConnected sync.Once
	onceRegistered sync.Once
	// readyConnected is closed after the first successful connection to
	// Kontrol, readyRegistered after the first successful registration.
	// Methods that need either of those to have happened wait on them.
	readyConnected chan struct{}
	readyRegistered chan struct{}
	// lastRegisteredURL stores the Kite URL that was sent to and
	// successfully registered with Kontrol.
	lastRegisteredURL *url.URL
	// registerChan registers the URLs it receives from the channel to Kontrol.
	registerChan chan *url.URL
}
// registerResult is the value returned by Register. URL is the parsed form of
// the URL string that Kontrol reported back for the registration.
type registerResult struct {
	URL *url.URL
}
// SetupKontrolClient sets up and prepares the kontrol client. It connects to
// kontrol and reconnects again if the connection is lost. This method is
// called internally whenever a kontrol client specific action is taken;
// call it directly only if you wish to connect earlier.
//
// The "already prepared" check and the assignment of the client happen in a
// single critical section, so concurrent callers cannot both create and dial
// a kontrol client.
//
// It returns an error if no kontrol URL is configured or if DialForever
// fails.
func (k *Kite) SetupKontrolClient() error {
	k.kontrol.Lock()
	if k.kontrol.Client != nil {
		k.kontrol.Unlock()
		return nil // already prepared
	}

	if k.Config.KontrolURL == "" {
		k.kontrol.Unlock()
		return errors.New("no kontrol URL given in config")
	}

	client := k.NewClient(k.Config.KontrolURL)
	client.Kite = protocol.Kite{Name: "kontrol"} // for logging purposes
	client.Auth = &Auth{
		Type: "kiteKey",
		Key:  k.Config.KiteKey,
	}
	k.kontrol.Client = client
	// Release the lock before registering handlers and dialing so that no
	// callback ever runs while the mutex is held.
	k.kontrol.Unlock()

	k.kontrol.OnConnect(func() {
		k.Log.Info("Connected to Kontrol ")

		// try to re-register on connect; the send is non-blocking so a
		// connect event never stalls when the URL cannot be handed over
		// right away.
		if k.kontrol.lastRegisteredURL != nil {
			select {
			case k.kontrol.registerChan <- k.kontrol.lastRegisteredURL:
			default:
			}
		}

		// signal all other methods that are listening on this channel, that
		// we are connected to kontrol.
		k.kontrol.onceConnected.Do(func() { close(k.kontrol.readyConnected) })
	})

	k.kontrol.OnDisconnect(func() {
		k.Log.Warning("Disconnected from Kontrol.")
	})

	// non blocking, is going to reconnect if the connection goes down.
	if _, err := k.kontrol.DialForever(); err != nil {
		return err
	}

	return nil
}
// GetKites asks Kontrol for the Kites matching query and returns them as
// Client instances that are ready to connect. The caller has to call
// Client.Dial() on a returned Client before using it. When the query matches
// nothing, ErrNoKitesAvailable is returned.
func (k *Kite) GetKites(query *protocol.KontrolQuery) ([]*Client, error) {
	if err := k.SetupKontrolClient(); err != nil {
		return nil, err
	}

	found, err := k.getKites(protocol.GetKitesArgs{Query: query})
	switch {
	case err != nil:
		return nil, err
	case len(found) == 0:
		return nil, ErrNoKitesAvailable
	}

	return found, nil
}
// getKites is the shared implementation behind GetKites() and WatchKites().
// It waits for the kontrol connection, calls the "getKites" method and turns
// every returned kite into a token-authenticated Client.
func (k *Kite) getKites(args protocol.GetKitesArgs) ([]*Client, error) {
	// Block until the connection to kontrol has been established.
	<-k.kontrol.readyConnected

	reply, err := k.kontrol.TellWithTimeout("getKites", 4*time.Second, args)
	if err != nil {
		return nil, err
	}

	result := new(protocol.GetKitesResult)
	if err = reply.Unmarshal(&result); err != nil {
		return nil, err
	}

	found := make([]*Client, len(result.Kites))
	for idx, entry := range result.Kites {
		c := k.NewClient(entry.URL)
		c.Kite = entry.Kite
		c.Auth = &Auth{
			Type: "token",
			Key:  entry.Token,
		}
		found[idx] = c
	}

	// Renew tokens. A client whose renewer cannot be created is still
	// returned; its token is simply not refreshed.
	for _, c := range found {
		renewer, err := NewTokenRenewer(c, k)
		if err != nil {
			k.Log.Error("Error in token. Token will not be renewed when it expires: %s", err.Error())
			continue
		}
		renewer.RenewWhenExpires()
	}

	return found, nil
}
// GetToken requests a fresh token for the given Kite from kontrol. It sets up
// the kontrol client if needed and waits until the connection is ready.
func (k *Kite) GetToken(kite *protocol.Kite) (string, error) {
	if err := k.SetupKontrolClient(); err != nil {
		return "", err
	}

	<-k.kontrol.readyConnected

	reply, err := k.kontrol.TellWithTimeout("getToken", 4*time.Second, kite)
	if err != nil {
		return "", err
	}

	var token string
	if err := reply.Unmarshal(&token); err != nil {
		return "", err
	}

	return token, nil
}
// GetKey fetches a new public key from kontrol, for the case that the current
// one got invalidated. The received key replaces the one held in memory
// (Config.KontrolKey) and is used by every following request, so the kite uses
// the new key even if kite.key still contains the old one.
func (k *Kite) GetKey() (string, error) {
	if err := k.SetupKontrolClient(); err != nil {
		return "", err
	}

	<-k.kontrol.readyConnected

	reply, err := k.kontrol.TellWithTimeout("getKey", 4*time.Second)
	if err != nil {
		return "", err
	}

	var publicKey string
	if err := reply.Unmarshal(&publicKey); err != nil {
		return "", err
	}

	k.Config.KontrolKey = publicKey
	return publicKey, nil
}
// NewKeyRenewer renews the internal key once per interval by calling GetKey.
// Failed renewals are logged as warnings and retried on the next tick. The
// loop never terminates, so this call does not return.
func (k *Kite) NewKeyRenewer(interval time.Duration) {
	ticks := time.NewTicker(interval).C

	for {
		<-ticks

		if _, err := k.GetKey(); err != nil {
			k.Log.Warning("Key renew failed: %s", err)
		}
	}
}
// KontrolReadyNotify returns a channel that is closed when a successful
// registration to kontrol is done.
func (k *Kite) KontrolReadyNotify() chan struct{} {
	return k.kontrol.readyRegistered
}
// signalReady is an internal method to notify that a successful registration
// is done. It closes the readyRegistered channel; the sync.Once guard makes
// repeated calls harmless.
func (k *Kite) signalReady() {
	k.kontrol.onceRegistered.Do(func() { close(k.kontrol.readyRegistered) })
}
// RegisterForever is equivalent to Register(), but it tries to re-register if
// there is a disconnection. The returned error is for the first register
// attempt. It returns nil once KontrolReadyNotify() is ready, i.e. the kite
// is registered successfully. If kiteURL is nil it only starts the background
// registration loop and returns nil immediately.
func (k *Kite) RegisterForever(kiteURL *url.URL) error {
	// Buffered with one slot so the registration loop can hand over an error
	// without blocking; further errors are dropped while the slot is full.
	errs := make(chan error, 1)
	// Background loop: registers every URL received on registerChan.
	// NOTE(review): each call to RegisterForever starts another loop and
	// nothing visible here closes registerChan, so the goroutine appears to
	// live as long as the process — confirm this is intended.
	go func() {
		for u := range k.kontrol.registerChan {
			_, err := k.Register(u)
			if err == nil {
				// Remember the URL so it can be registered again after a
				// reconnect, then wake up everybody waiting for readiness.
				k.kontrol.lastRegisteredURL = u
				k.signalReady()
				continue
			}
			// Non-blocking report of the failure to the caller below.
			select {
			case errs <- err:
			default:
			}
			k.Log.Error("Cannot register to Kontrol: %s Will retry after %d seconds",
				err, kontrolRetryDuration/time.Second)
			// Schedule a retry. The send is non-blocking, so the retry is
			// dropped if the channel cannot accept it at that moment.
			// NOTE(review): the closure captures the loop variable u; before
			// Go 1.22 that variable is shared between iterations, so the
			// retry may send whichever URL was received last — confirm.
			time.AfterFunc(kontrolRetryDuration, func() {
				select {
				case k.kontrol.registerChan <- u:
				default:
				}
			})
		}
	}()
	// don't block if the given url is nil
	if kiteURL == nil {
		return nil
	}
	// initiate a registration if a url is given
	k.kontrol.registerChan <- kiteURL
	// Wait for the outcome of the first attempt: either readiness is
	// signalled or the loop reports an error.
	select {
	case <-k.KontrolReadyNotify():
		return nil
	case err := <-errs:
		return err
	}
}
// Register registers current Kite to Kontrol. After registration other Kites
// can find it via GetKites() or WatchKites() method. This method does not
// handle the reconnection case. If you want to keep registered to kontrol, use
// RegisterForever().
func (k *Kite) Register(kiteURL *url.URL) (*registerResult, error) {
if err := k.SetupKontrolClient(); err != nil {
return nil, err
}
<-k.kontrol.readyConnected
args := protocol.RegisterArgs{
URL: kiteURL.String(),
}
k.Log.Info("Registering to kontrol with URL: %s", kiteURL.String())
response, err := k.kontrol.TellWithTimeout("register", 4*time.Second, args)
if err != nil {
return nil, err
}
var rr protocol.RegisterResult
err = response.Unmarshal(&rr)
if err != nil {
return nil, err
}
k.Log.Info("Registered to kontrol with URL: %s and Kite query: %s",
rr.URL, k.Kite())
parsed, err := url.Parse(rr.URL)
if err != nil {
k.Log.Error("Cannot parse registered URL: %s", err.Error())
}
// we also received a new public key (means the old one was invalidated).
// Use it now.
if rr.PublicKey != "" {
k.Config.KontrolKey = rr.PublicKey
}
return ®isterResult{parsed}, nil
}
// RegisterToTunnel looks up a tunnel proxy kite through kontrol and registers
// this kite on it by delegating to RegisterToProxy with a query for the
// "tunnelproxy" kite. No register URL is passed because the tunnel proxy
// obtains the IP from the tunnel itself. This is a blocking function.
func (k *Kite) RegisterToTunnel() {
	k.RegisterToProxy(nil, &protocol.KontrolQuery{
		Username:    k.Config.KontrolUser,
		Environment: k.Config.Environment,
		Name:        "tunnelproxy",
	})
}
// RegisterToProxy works like RegisterForever, but the URL registered to
// kontrol is the one handed out by a kite-proxy. A kite-proxy is a reverse
// proxy that can be used for SSL termination or for serving many kites behind
// a single endpoint. The function loops forever: it picks a proxy, registers
// to it, publishes the proxied URL on registerChan and waits for the proxy
// connection to drop before starting over. This is a blocking function.
func (k *Kite) RegisterToProxy(registerURL *url.URL, query *protocol.KontrolQuery) {
	go k.RegisterForever(nil)

	for {
		var proxy *Client

		// The environment variable "KITE_PROXY_URL" overrides the proxy
		// lookup. In that case the kiteKey is used as authentication
		// information, so be careful when using this feature. Without the
		// override Kontrol is asked for the available proxy kites.
		if override := os.Getenv("KITE_PROXY_URL"); override == "" {
			candidates, err := k.GetKites(query)
			if err != nil {
				k.Log.Error("Cannot get Proxy kites from Kontrol: %s", err.Error())
				time.Sleep(proxyRetryDuration)
				continue
			}

			// Any of the available proxies will do, so pick one at random.
			proxy = candidates[rand.Int()%len(candidates)]
		} else {
			proxy = k.NewClient(override)
			proxy.Auth = &Auth{
				Type: "kiteKey",
				Key:  k.Config.KiteKey,
			}
		}

		// Get told when the proxy connection goes away. The send never
		// blocks, one pending notification is enough.
		gone := make(chan bool, 1)
		proxy.OnDisconnect(func() {
			select {
			case gone <- true:
			default:
			}
		})

		proxied, err := k.registerToProxyKite(proxy, registerURL)
		if err != nil {
			time.Sleep(proxyRetryDuration)
			continue
		}

		k.kontrol.registerChan <- proxied

		// Stay here until the proxy kite disconnects.
		<-gone
	}
}
// registerToProxyKite connects to the given proxy kite, calls its "register"
// method and returns the reverse-proxy URL contained in the answer. Every
// failure is logged and returned; if a failure happens after the dial
// succeeded, the connection to the proxy kite is closed again.
func (k *Kite) registerToProxyKite(c *Client, kiteURL *url.URL) (*url.URL, error) {
	err := c.Dial()
	if err != nil {
		k.Log.Error("Cannot connect to Proxy kite: %s", err.Error())
		return nil, err
	}

	// Hang up on the proxy kite when any of the steps below fails. The
	// closure reads the function-level err, which all later steps assign to.
	defer func() {
		if err != nil {
			c.Close()
		}
	}()

	// A nil URL would panic on String() below, use an empty one instead.
	target := kiteURL
	if target == nil {
		target = &url.URL{}
	}

	// The remote side is either a tunnelproxy, which does not need a URL, or
	// a reverseproxy, which does.
	reply, err := c.TellWithTimeout("register", 4*time.Second, target.String())
	if err != nil {
		k.Log.Error("Proxy register error: %s", err.Error())
		return nil, err
	}

	raw, err := reply.String()
	if err != nil {
		k.Log.Error("Proxy register result error: %s", err.Error())
		return nil, err
	}

	proxied, err := url.Parse(raw)
	if err != nil {
		k.Log.Error("Cannot parse Proxy URL: %s", err.Error())
		return nil, err
	}

	return proxied, nil
}
// TellKontrolWithTimeout is a lower level function for communicating directly
// with kontrol. Like GetKites and GetToken, this automatically sets up and
// connects to kontrol as needed.
//
// method and args are passed on to kontrol, timeout bounds the call itself.
// Independently of timeout, the function waits at most kontrolConnectTimeout
// for the connection to kontrol to become ready and returns an *Error of type
// "timeout" if it does not.
func (k *Kite) TellKontrolWithTimeout(method string, timeout time.Duration, args ...interface{}) (result *dnode.Partial, err error) {
	if err = k.SetupKontrolClient(); err != nil {
		return nil, err
	}

	// An explicit timer instead of time.After, so it can be stopped as soon
	// as the connection is ready rather than staying pending until it fires.
	timer := time.NewTimer(kontrolConnectTimeout)

	// Wait for readyConnected, or timeout
	select {
	case <-timer.C:
		return nil, &Error{
			Type: "timeout",
			Message: fmt.Sprintf(
				"Timed out registering to kontrol for %s method after %s",
				method, kontrolConnectTimeout,
			),
		}
	case <-k.kontrol.readyConnected:
		timer.Stop()
	}

	return k.kontrol.TellWithTimeout(method, timeout, args...)
}