forked from zoni/nagios-check-runner
-
Notifications
You must be signed in to change notification settings - Fork 1
/
runner.go
136 lines (122 loc) · 3.14 KB
/
runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package nca
import (
log "gopkg.in/inconshreveable/log15.v2"
"os"
"os/signal"
)
// Runner is the main component of the application. It runs the Checker and
// Publisher subcomponents and facilitates communication between them.
type Runner struct {
	// config is the configuration the Runner was initialized with.
	config Config
	// checker is the component whose Start method hands back the channel
	// that check results arrive on.
	checker *checker
	// publishers holds the configured publishers, keyed by their label.
	publishers map[string]Publisher
	// publishChan is the channel check results are received from.
	publishChan chan *CheckResult
	// log is the logger used for the Runner's own messages.
	log log.Logger
	// done is used for signalling goroutines that we're shutting down.
	done chan struct{}
}
// NewRunner builds a Runner and initializes it with cfg. When initialization
// fails the error is returned and no Runner is handed back.
func NewRunner(cfg Config) (*Runner, error) {
	runner := new(Runner)
	err := runner.Init(cfg)
	if err != nil {
		return nil, err
	}
	return runner, nil
}
// NewRunnerFromFile opens filename, reads a configuration from it with
// ReadConfig and returns a Runner built from that configuration. Any error
// from opening the file, reading the configuration or creating the Runner is
// returned unchanged.
func NewRunnerFromFile(filename string) (*Runner, error) {
	file, openErr := os.Open(filename)
	if openErr != nil {
		return nil, openErr
	}
	defer file.Close()

	cfg, readErr := ReadConfig(file)
	if readErr != nil {
		return nil, readErr
	}

	// NewRunner already yields (nil, err) on failure, so its result can be
	// passed straight through.
	return NewRunner(*cfg)
}
// Init initializes the Runner with the given configuration.
//
// It stores cfg, creates the Runner's logger, registers the configured checks
// with a fresh checker and creates and configures one publisher per entry in
// cfg.Publishers. An error is returned when a publisher entry has no string
// "type" setting or when a publisher rejects its configuration.
func (r *Runner) Init(cfg Config) error {
	r.config = cfg
	r.log = Log.New("component", "runner")

	r.checker = &checker{}
	r.checker.RegisterChecks(r.config.Checks)

	r.publishers = make(map[string]Publisher, len(cfg.Publishers))
	for label, pcfg := range cfg.Publishers {
		// Use the comma-ok form: a missing or non-string "type" must surface
		// as a configuration error rather than a runtime panic.
		ptype, ok := pcfg["type"].(string)
		if !ok {
			err := &publisherTypeError{label: label}
			r.log.Error("Invalid publisher configuration", "publisher", label, "error", err)
			return err
		}

		// NOTE(review): how newPublisher reacts to an unknown type is not
		// visible from here — confirm it cannot return nil.
		p := newPublisher(ptype)
		if err := p.Configure(pcfg); err != nil {
			r.log.Error("Invalid publisher configuration", "publisher", label, "error", err)
			return err
		}
		r.publishers[label] = p
	}
	return nil
}

// publisherTypeError reports a publisher whose configuration lacks a usable
// "type" setting.
type publisherTypeError struct {
	label string // label of the offending publisher
}

// Error implements the error interface.
func (e *publisherTypeError) Error() string {
	return "publisher " + e.label + `: missing or non-string "type" setting`
}
// Start brings the Runner to life: it creates the shutdown channel, starts
// the checker and every publisher, and launches the goroutine that forwards
// check results to the publishers. It always returns nil.
func (r *Runner) Start() error {
	r.log.Info("Runner starting")

	r.done = make(chan struct{})

	// NOTE(review): the second value returned by checker.Start is discarded;
	// confirm against the checker whether it needs handling.
	r.publishChan, _ = r.checker.Start()

	for label := range r.publishers {
		r.publishers[label].Start()
	}

	go r.process()
	return nil
}
// process reads results produced by the checker and distributes them
// to the publishers.
//
// Every non-nil result is handed to each publisher in its own goroutine. The
// loop ends when the done channel is closed or when the result channel itself
// is closed.
func (r *Runner) process() {
	r.log.Debug("process() loop start")
	// Deferred so the message is actually emitted; a statement placed after
	// the infinite loop would be unreachable.
	defer r.log.Debug("process() loop end")

	for {
		select {
		case result, ok := <-r.publishChan:
			if !ok {
				// A closed channel yields nil forever; leave instead of
				// spinning on it.
				return
			}
			if result == nil {
				continue
			}
			for label, publisher := range r.publishers {
				// Pass the loop variables as arguments so each goroutine
				// works on its own copy rather than on variables shared
				// across iterations (Go < 1.22 semantics).
				go func(label string, publisher Publisher) {
					l := r.log.New("check", result.Name, "publisher", label)
					l.Debug("Publishing check result to publisher")
					if err := publisher.Publish(result); err != nil {
						l.Warn("Error publishing check result", "error", err)
						return
					}
					l.Debug("Check result published")
				}(label, publisher)
			}
		case <-r.done:
			return
		}
	}
}
// Stop stops and shuts down the Runner.
//
// It stops the checker, signals the result-processing goroutine to exit by
// closing the done channel, and then stops every publisher. It always
// returns nil. Calling Stop again after a previous Stop does not close the
// done channel a second time.
func (r *Runner) Stop() error {
	r.log.Info("Runner stopping")

	// Stop the producer first so the processing goroutine is still draining
	// results while the checker winds down.
	r.checker.Stop()

	// Tell process() to return; without this the goroutine would never exit.
	// r.done is nil when Start was never called.
	if r.done != nil {
		select {
		case <-r.done:
			// Already closed by an earlier Stop call.
		default:
			close(r.done)
		}
	}

	for _, publisher := range r.publishers {
		publisher.Stop()
	}
	return nil
}
// Run starts the runner and blocks until an interrupt signal is received.
//
// It returns the error from Start if the Runner fails to start; otherwise it
// waits for os.Interrupt, stops the Runner and returns the error from Stop.
func (r *Runner) Run() error {
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	// Release the signal subscription once Run returns.
	defer signal.Stop(c)

	if err := r.Start(); err != nil {
		return err
	}

	// Only os.Interrupt is registered above, so the first value received is
	// the interrupt.
	<-c
	r.log.Info("Received interrupt, shutting down")
	return r.Stop()
}