Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bugs introduced by hot reloading of targets #108

Merged
merged 3 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 57 additions & 40 deletions collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,48 +12,66 @@ import (
"github.com/czerwonk/ping_exporter/config"
)

var (
labelNames []string
type pingCollector struct {
monitor *mon.Monitor
enableDeprecatedMetrics bool
rttUnit rttUnit

cfg *config.Config

mutex sync.RWMutex

customLabels *customLabelSet
metrics map[string]*mon.Metrics

rttDesc scaledMetrics
bestDesc scaledMetrics
worstDesc scaledMetrics
meanDesc scaledMetrics
stddevDesc scaledMetrics
lossDesc *prometheus.Desc
progDesc *prometheus.Desc
mutex *sync.Mutex
)
}

type pingCollector struct {
cfg *config.Config
customLabels *customLabelSet
monitor *mon.Monitor
metrics map[string]*mon.Metrics
func NewPingCollector(enableDeprecatedMetrics bool, unit rttUnit, monitor *mon.Monitor, cfg *config.Config) *pingCollector {
ret := &pingCollector{
monitor: monitor,
enableDeprecatedMetrics: enableDeprecatedMetrics,
rttUnit: unit,
cfg: cfg,
}
ret.customLabels = newCustomLabelSet(cfg.Targets)
ret.createDesc()
return ret
}

func (p *pingCollector) Describe(ch chan<- *prometheus.Desc) {
p.createDesc()
func (p *pingCollector) UpdateConfig(cfg *config.Config) {
p.mutex.Lock()
defer p.mutex.Unlock()
p.cfg.Targets = cfg.Targets
}

if enableDeprecatedMetrics {
rttDesc.Describe(ch)
func (p *pingCollector) Describe(ch chan<- *prometheus.Desc) {
if p.enableDeprecatedMetrics {
p.rttDesc.Describe(ch)
}
bestDesc.Describe(ch)
worstDesc.Describe(ch)
meanDesc.Describe(ch)
stddevDesc.Describe(ch)
ch <- lossDesc
ch <- progDesc
p.bestDesc.Describe(ch)
p.worstDesc.Describe(ch)
p.meanDesc.Describe(ch)
p.stddevDesc.Describe(ch)
ch <- p.lossDesc
ch <- p.progDesc
}

func (p *pingCollector) Collect(ch chan<- prometheus.Metric) {
mutex.Lock()
defer mutex.Unlock()
p.mutex.Lock()
defer p.mutex.Unlock()

if m := p.monitor.Export(); len(m) > 0 {
p.metrics = m
}

ch <- prometheus.MustNewConstMetric(progDesc, prometheus.GaugeValue, 1)
ch <- prometheus.MustNewConstMetric(p.progDesc, prometheus.GaugeValue, 1)

for target, metrics := range p.metrics {
l := strings.SplitN(target, " ", 3)
Expand All @@ -63,35 +81,34 @@ func (p *pingCollector) Collect(ch chan<- prometheus.Metric) {

if metrics.PacketsSent > metrics.PacketsLost {
if enableDeprecatedMetrics {
rttDesc.Collect(ch, metrics.Best, append(l, "best")...)
rttDesc.Collect(ch, metrics.Worst, append(l, "worst")...)
rttDesc.Collect(ch, metrics.Mean, append(l, "mean")...)
rttDesc.Collect(ch, metrics.StdDev, append(l, "std_dev")...)
p.rttDesc.Collect(ch, metrics.Best, append(l, "best")...)
p.rttDesc.Collect(ch, metrics.Worst, append(l, "worst")...)
p.rttDesc.Collect(ch, metrics.Mean, append(l, "mean")...)
p.rttDesc.Collect(ch, metrics.StdDev, append(l, "std_dev")...)
}

bestDesc.Collect(ch, metrics.Best, l...)
worstDesc.Collect(ch, metrics.Worst, l...)
meanDesc.Collect(ch, metrics.Mean, l...)
stddevDesc.Collect(ch, metrics.StdDev, l...)
p.bestDesc.Collect(ch, metrics.Best, l...)
p.worstDesc.Collect(ch, metrics.Worst, l...)
p.meanDesc.Collect(ch, metrics.Mean, l...)
p.stddevDesc.Collect(ch, metrics.StdDev, l...)
}

loss := float64(metrics.PacketsLost) / float64(metrics.PacketsSent)
ch <- prometheus.MustNewConstMetric(lossDesc, prometheus.GaugeValue, loss, l...)
ch <- prometheus.MustNewConstMetric(p.lossDesc, prometheus.GaugeValue, loss, l...)
}
}

func (p *pingCollector) createDesc() {
labelNames = []string{"target", "ip", "ip_version"}
labelNames := []string{"target", "ip", "ip_version"}
labelNames = append(labelNames, p.customLabels.labelNames()...)

rttDesc = newScaledDesc("rtt", "Round trip time", append(labelNames, "type"))
bestDesc = newScaledDesc("rtt_best", "Best round trip time", labelNames)
worstDesc = newScaledDesc("rtt_worst", "Worst round trip time", labelNames)
meanDesc = newScaledDesc("rtt_mean", "Mean round trip time", labelNames)
stddevDesc = newScaledDesc("rtt_std_deviation", "Standard deviation", labelNames)
lossDesc = newDesc("loss_ratio", "Packet loss from 0.0 to 1.0", labelNames, nil)
progDesc = newDesc("up", "ping_exporter version", nil, prometheus.Labels{"version": version})
mutex = &sync.Mutex{}
p.rttDesc = newScaledDesc("rtt", "Round trip time", p.rttUnit, append(labelNames, "type"))
p.bestDesc = newScaledDesc("rtt_best", "Best round trip time", p.rttUnit, labelNames)
p.worstDesc = newScaledDesc("rtt_worst", "Worst round trip time", p.rttUnit, labelNames)
p.meanDesc = newScaledDesc("rtt_mean", "Mean round trip time", p.rttUnit, labelNames)
p.stddevDesc = newScaledDesc("rtt_std_deviation", "Standard deviation", p.rttUnit, labelNames)
p.lossDesc = newDesc("loss_ratio", "Packet loss from 0.0 to 1.0", labelNames, nil)
p.progDesc = newDesc("up", "ping_exporter version", nil, prometheus.Labels{"version": version})
}

func newDesc(name, help string, variableLabels []string, constLabels prometheus.Labels) *prometheus.Desc {
Expand Down
80 changes: 51 additions & 29 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/http"
"os"
"strings"
"sync"
"time"

"github.com/digineo/go-ping"
Expand Down Expand Up @@ -116,13 +117,18 @@ func main() {
kingpin.FatalUsage("No targets specified")
}

m, err := startMonitor(cfg)
resolver := setupResolver(cfg)

m, err := startMonitor(cfg, resolver)
if err != nil {
log.Errorln(err)
os.Exit(2)
}

startServer(cfg, m)
collector := NewPingCollector(enableDeprecatedMetrics, rttMetricsScale, m, cfg)
go watchConfig(desiredTargets, resolver, m, collector)

startServer(cfg, collector)
}

func printVersion() {
Expand All @@ -132,8 +138,7 @@ func printVersion() {
fmt.Println("Metric exporter for go-icmp")
}

func startMonitor(cfg *config.Config) (*mon.Monitor, error) {
resolver := setupResolver(cfg)
func startMonitor(cfg *config.Config, resolver *net.Resolver) (*mon.Monitor, error) {
var bind4, bind6 string
if ln, err := net.Listen("tcp4", "127.0.0.1:0"); err == nil {
// ipv4 enabled
Expand Down Expand Up @@ -165,42 +170,50 @@ func startMonitor(cfg *config.Config) (*mon.Monitor, error) {
}

go startDNSAutoRefresh(cfg.DNS.Refresh.Duration(), desiredTargets, monitor, cfg)
go watchConfig(desiredTargets, resolver, monitor)
return monitor, nil
}

func upsertTargets(globalTargets *targets, resolver *net.Resolver, cfg *config.Config, monitor *mon.Monitor) error {
oldTargets := globalTargets.Targets()
newTargets := make([]*target, len(cfg.Targets))
var wg sync.WaitGroup
for i, t := range cfg.Targets {
t := &target{
host: t.Addr,
addresses: make([]net.IPAddr, 0),
delay: time.Duration(10*i) * time.Millisecond,
resolver: resolver,
}
newTargets[i] = t

err := t.addOrUpdateMonitor(monitor, targetOpts{
disableIPv4: cfg.Options.DisableIPv4,
disableIPv6: cfg.Options.DisableIPv6,
})
if err != nil {
return fmt.Errorf("failed to setup target: %w", err)
newTarget := globalTargets.Get(t.Addr)
if newTarget == nil {
newTarget = &target{
host: t.Addr,
addresses: make([]net.IPAddr, 0),
delay: time.Duration(10*i) * time.Millisecond,
resolver: resolver,
}
}
}

newTargets[i] = newTarget

wg.Add(1)
go func() {
err := newTarget.addOrUpdateMonitor(monitor, targetOpts{
disableIPv4: cfg.Options.DisableIPv4,
disableIPv6: cfg.Options.DisableIPv6,
})
if err != nil {
log.Errorf("failed to setup target: %v", err)
}
wg.Done()
}()
}
wg.Wait()
globalTargets.SetTargets(newTargets)

removed := removedTargets(oldTargets, globalTargets)
for _, removedTarget := range removed {
log.Infof("remove target: %s\n", removedTarget.host)
log.Infof("remove target: %s", removedTarget.host)
removedTarget.removeFromMonitor(monitor)
}
return nil
}

func watchConfig(globalTargets *targets, resolver *net.Resolver, monitor *mon.Monitor) {
func watchConfig(globalTargets *targets, resolver *net.Resolver, monitor *mon.Monitor, collector *pingCollector) {
watcher, err := inotify.NewWatcher()
if err != nil {
log.Fatalf("unable to create file watcher: %v", err)
Expand All @@ -212,17 +225,30 @@ func watchConfig(globalTargets *targets, resolver *net.Resolver, monitor *mon.Mo
}
for {
select {
case <-watcher.Events:
case event := <-watcher.Events:
log.Debugf("Got file inotify event: %s", event)
// If the file is removed, the inotify watcher will lose track of the file. Add it again.
if event.Op == inotify.Remove {
if err = watcher.Add(*configFile); err != nil {
log.Fatalf("failed to renew watch for file: %v", err)
}
}
cfg, err := loadConfig()
if err != nil {
log.Errorf("unable to load config: %v", err)
continue
}
// We get zero targets if the file was truncated. This happens if an automation tool rewrites
// the complete file, instead of altering only parts of it.
if len(cfg.Targets) == 0 {
continue
}
log.Infof("reloading config file %s", *configFile)
if err := upsertTargets(globalTargets, resolver, cfg, monitor); err != nil {
log.Errorf("failed to reload config: %v", err)
continue
}
collector.UpdateConfig(cfg)
case err := <-watcher.Errors:
log.Errorf("watching file failed: %v", err)
}
Expand Down Expand Up @@ -264,19 +290,15 @@ func refreshDNS(tar *targets, monitor *mon.Monitor, cfg *config.Config) {
}
}

func startServer(cfg *config.Config, monitor *mon.Monitor) {
func startServer(cfg *config.Config, collector *pingCollector) {
var err error
log.Infof("Starting ping exporter (Version: %s)", version)
http.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
fmt.Fprintf(w, indexHTML, *metricsPath)
})

reg := prometheus.NewRegistry()
reg.MustRegister(&pingCollector{
cfg: cfg,
monitor: monitor,
customLabels: newCustomLabelSet(cfg.Targets),
})
reg.MustRegister(collector)

l := log.New()
l.Level = log.ErrorLevel
Expand Down
12 changes: 7 additions & 5 deletions rttscale.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,30 @@ func rttUnitFromString(s string) rttUnit {
type scaledMetrics struct {
Millis *prometheus.Desc
Seconds *prometheus.Desc
scale rttUnit
}

func (s *scaledMetrics) Describe(ch chan<- *prometheus.Desc) {
if rttMetricsScale == rttInMills || rttMetricsScale == rttBoth {
if s.scale == rttInMills || s.scale == rttBoth {
ch <- s.Millis
}
if rttMetricsScale == rttInSeconds || rttMetricsScale == rttBoth {
if s.scale == rttInSeconds || s.scale == rttBoth {
ch <- s.Seconds
}
}

func (s *scaledMetrics) Collect(ch chan<- prometheus.Metric, value float32, labelValues ...string) {
if rttMetricsScale == rttInMills || rttMetricsScale == rttBoth {
if s.scale == rttInMills || s.scale == rttBoth {
ch <- prometheus.MustNewConstMetric(s.Millis, prometheus.GaugeValue, float64(value), labelValues...)
}
if rttMetricsScale == rttInSeconds || rttMetricsScale == rttBoth {
if s.scale == rttInSeconds || s.scale == rttBoth {
ch <- prometheus.MustNewConstMetric(s.Seconds, prometheus.GaugeValue, float64(value)/1000, labelValues...)
}
}

func newScaledDesc(name, help string, variableLabels []string) scaledMetrics {
func newScaledDesc(name, help string, scale rttUnit, variableLabels []string) scaledMetrics {
return scaledMetrics{
scale: scale,
Millis: newDesc(name+"_ms", help+" in millis (deprecated)", variableLabels, nil),
Seconds: newDesc(name+"_seconds", help+" in seconds", variableLabels, nil),
}
Expand Down
13 changes: 13 additions & 0 deletions target.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ func (t *targets) SetTargets(tar []*target) {
}

func (t *targets) Contains(tar *target) bool {
t.mutex.RLock()
defer t.mutex.RUnlock()
for _, ta := range t.t {
if ta.host == tar.host {
return true
Expand All @@ -45,6 +47,17 @@ func (t *targets) Contains(tar *target) bool {
return false
}

func (t *targets) Get(host string) *target {
t.mutex.RLock()
defer t.mutex.RUnlock()
for _, ta := range t.t {
if ta.host == host {
return ta
}
}
return nil
}

func (t *targets) Targets() []*target {
t.mutex.RLock()
defer t.mutex.RUnlock()
Expand Down
Loading