-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmain.go
114 lines (93 loc) · 2.09 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package main
import (
"os"
"os/signal"
"syscall"
"github.com/criteo/consul-timeline/consul"
"github.com/criteo/consul-timeline/storage"
_ "github.com/go-sql-driver/mysql"
"github.com/criteo/consul-timeline/server"
cass "github.com/criteo/consul-timeline/storage/cassandra"
"github.com/criteo/consul-timeline/storage/memory"
"github.com/criteo/consul-timeline/storage/mysql"
tl "github.com/criteo/consul-timeline/timeline"
"github.com/criteo/consul-timeline/watch"
log "github.com/sirupsen/logrus"
)
const (
eventsBuffer = 200
)
func dupEvents(in <-chan tl.Event) (<-chan tl.Event, <-chan tl.Event) {
o1 := make(chan tl.Event, eventsBuffer)
o2 := make(chan tl.Event, eventsBuffer)
go func() {
for e := range in {
o1 <- e
o2 <- e
}
}()
return o1, o2
}
func main() {
cfg := GetConfig()
if cfg.Mysql.PrintSchema {
mysql.PrintSchema()
return
}
logLvl, err := log.ParseLevel(cfg.LogLevel)
if err != nil {
log.Fatal(err)
}
log.SetLevel(logLvl)
// consul client
consul := consul.New(cfg.Consul)
// storage
var strg storage.Storage
switch cfg.Storage {
case mysql.Name:
strg, err = mysql.New(cfg.Mysql, consul.Datacenter)
if err != nil {
log.Fatal(err)
}
case cass.Name:
strg, err = cass.New(cfg.Cassandra)
if err != nil {
log.Fatal(err)
}
case memory.Name:
fallthrough
default:
log.Warnf("storing up to %d events in memory", cfg.Memory.MaxSize)
strg = memory.New(cfg.Memory)
}
if cfg.Consul.EnableDistributedLock {
dstrg := storage.NewDistributed(consul, strg)
defer dstrg.Stop()
strg = dstrg
}
strg = storage.NewMetrics(strg)
// consul watch
w := watch.New(consul, eventsBuffer)
events := w.Run()
storageEvents, apiEvents := dupEvents(events)
// HTTP api
api := server.New(cfg.Server, strg, w, apiEvents)
go func() {
err := api.Serve()
if err != nil {
log.Fatal(err)
}
}()
go func() {
for e := range storageEvents {
err := strg.Store(e)
if err != nil {
log.Error(err)
}
}
}()
c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
<-c
log.Info("stopping...")
}