Skip to content

Commit

Permalink
Merge pull request #13 from nordicdyno/master
Browse files Browse the repository at this point in the history
Fixes for better Prometheus support
  • Loading branch information
Ed Ganiukov authored Feb 15, 2017
2 parents e250db4 + ab866b7 commit 3354bbb
Show file tree
Hide file tree
Showing 58 changed files with 2,475 additions and 3,545 deletions.
8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ GOX_ARGS = -output="$(BUILD_DIR)/{{.Dir}}_{{.OS}}_{{.Arch}}" -osarch="linux/amd6
build:
$(GO) build -o $(BUILD_DIR)/nsq_exporter .

.PHONY: deps-init deps-get
deps-init:
@go get -u github.com/kardianos/govendor
$(GOPATH)/bin/govendor init

deps-get: deps-init
@$(GOPATH)/bin/govendor get github.com/lovoo/nsq_exporter

.PHONY: clean
clean:
rm -R $(BUILD_DIR)/* || true
Expand Down
10 changes: 7 additions & 3 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ package collector

import "github.com/prometheus/client_golang/prometheus"

// Collector defines the interface for collecting all metrics for Prometheus.
type Collector interface {
Collect(out chan<- prometheus.Metric) error
// StatsCollector defines an interface for collecting specific stats
// from a nsqd exported stats data.
type StatsCollector interface {
set(s *stats)
collect(out chan<- prometheus.Metric)
describe(ch chan<- *prometheus.Desc)
reset()
}
85 changes: 48 additions & 37 deletions collector/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,63 +11,74 @@ import (
// This type implements the prometheus.Collector interface and can be
// registered in the metrics collection.
//
// The executor takes the time needed by each registered collector and
// The executor takes the time needed for scraping nsqd stat endpoint and
// provides an extra metric for this. This metric is labeled with the
// result ("success" or "error") and the collector.
// scrape result ("success" or "error").
type NsqExecutor struct {
collectors map[string]Collector
nsqdURL string

collectors []StatsCollector
summary *prometheus.SummaryVec
mutex sync.RWMutex
}

// NewNsqExecutor creates a new executor for the NSQ metrics.
func NewNsqExecutor(namespace string) *NsqExecutor {
// NewNsqExecutor creates a new executor for collecting NSQ metrics.
func NewNsqExecutor(namespace, nsqdURL string) *NsqExecutor {
sum := prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: "exporter",
Name: "scrape_duration_seconds",
Help: "Duration of a scrape job of the NSQ exporter",
}, []string{"result"})
prometheus.MustRegister(sum)
return &NsqExecutor{
collectors: make(map[string]Collector),
summary: prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: "exporter",
Name: "scape_duration_seconds",
Help: "Duration of a scrape job of the NSQ exporter",
}, []string{"collector", "result"}),
nsqdURL: nsqdURL,
summary: sum,
}
}

// AddCollector adds a new collector for the metrics collection.
// Each collector needs a unique name which is used as a label
// for the executor metric.
func (e *NsqExecutor) AddCollector(name string, c Collector) {
e.collectors[name] = c
// Use configures a specific stats collector, so the stats could be
// exposed to the Prometheus system.
func (e *NsqExecutor) Use(c StatsCollector) {
e.mutex.Lock()
defer e.mutex.Unlock()
e.collectors = append(e.collectors, c)
}

// Describe implements the prometheus.Collector interface.
func (e *NsqExecutor) Describe(out chan<- *prometheus.Desc) {
e.summary.Describe(out)
func (e *NsqExecutor) Describe(ch chan<- *prometheus.Desc) {
for _, c := range e.collectors {
c.describe(ch)
}
}

// Collect implements the prometheus.Collector interface.
func (e *NsqExecutor) Collect(out chan<- prometheus.Metric) {
var wg sync.WaitGroup
wg.Add(len(e.collectors))
for name, coll := range e.collectors {
go func(name string, coll Collector) {
e.exec(name, coll, out)
wg.Done()
}(name, coll)
start := time.Now()
e.mutex.Lock()
defer e.mutex.Unlock()

// reset state, because metrics can gone
for _, c := range e.collectors {
c.reset()
}
wg.Wait()
}

func (e *NsqExecutor) exec(name string, coll Collector, out chan<- prometheus.Metric) {
start := time.Now()
err := coll.Collect(out)
dur := time.Since(start)
stats, err := getNsqdStats(e.nsqdURL)
tScrape := time.Since(start).Seconds()

labels := prometheus.Labels{"collector": name}
result := "success"
if err != nil {
labels["result"] = "error"
} else {
labels["result"] = "success"
result = "error"
}

e.summary.With(labels).Observe(dur.Seconds())
e.summary.WithLabelValues(result).Observe(tScrape)

if err == nil {
for _, c := range e.collectors {
c.set(stats)
}
for _, c := range e.collectors {
c.collect(out)
}
}
}
62 changes: 0 additions & 62 deletions collector/nsqd.go

This file was deleted.

25 changes: 21 additions & 4 deletions collector/stats_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ type channelStats []struct {
// expose the channel metrics of a nsqd node to Prometheus. The
// channel metrics are reported per topic.
func ChannelStats(namespace string) StatsCollector {
labels := []string{"type", "topic", "channel", "paused"}
labels := []string{"topic", "channel", "paused"}
namespace += "_channel"

return channelStats{
{
Expand Down Expand Up @@ -101,20 +102,36 @@ func ChannelStats(namespace string) StatsCollector {
}
}

func (cs channelStats) collect(s *stats, out chan<- prometheus.Metric) {
func (cs channelStats) set(s *stats) {
for _, topic := range s.Topics {
for _, channel := range topic.Channels {
labels := prometheus.Labels{
"type": "channel",
"topic": topic.Name,
"channel": channel.Name,
"paused": strconv.FormatBool(channel.Paused),
}

for _, c := range cs {
c.vec.With(labels).Set(c.val(channel))
c.vec.Collect(out)
}
}
}
}

func (cs channelStats) collect(out chan<- prometheus.Metric) {
for _, c := range cs {
c.vec.Collect(out)
}
}

func (cs channelStats) describe(ch chan<- *prometheus.Desc) {
for _, c := range cs {
c.vec.Describe(ch)
}
}

func (cs channelStats) reset() {
for _, c := range cs {
c.vec.Reset()
}
}
25 changes: 21 additions & 4 deletions collector/stats_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ type clientStats []struct {
// Prometheus collection process. So be sure the number of clients
// is small enough when using this collector.
func ClientStats(namespace string) StatsCollector {
labels := []string{"type", "topic", "channel", "deflate", "snappy", "tls", "client_id", "hostname", "version", "remote_address"}
labels := []string{"topic", "channel", "deflate", "snappy", "tls", "client_id", "hostname", "version", "remote_address"}
namespace += "_client"

return clientStats{
{
Expand Down Expand Up @@ -90,12 +91,11 @@ func ClientStats(namespace string) StatsCollector {
}
}

func (cs clientStats) collect(s *stats, out chan<- prometheus.Metric) {
func (cs clientStats) set(s *stats) {
for _, topic := range s.Topics {
for _, channel := range topic.Channels {
for _, client := range channel.Clients {
labels := prometheus.Labels{
"type": "client",
"topic": topic.Name,
"channel": channel.Name,
"deflate": strconv.FormatBool(client.Deflate),
Expand All @@ -109,9 +109,26 @@ func (cs clientStats) collect(s *stats, out chan<- prometheus.Metric) {

for _, c := range cs {
c.vec.With(labels).Set(c.val(client))
c.vec.Collect(out)
}
}
}
}
}

func (cs clientStats) collect(out chan<- prometheus.Metric) {
for _, c := range cs {
c.vec.Collect(out)
}
}

func (cs clientStats) describe(ch chan<- *prometheus.Desc) {
for _, c := range cs {
c.vec.Describe(ch)
}
}

func (cs clientStats) reset() {
for _, c := range cs {
c.vec.Reset()
}
}
24 changes: 20 additions & 4 deletions collector/stats_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ type topicStats []struct {
// TopicStats creates a new stats collector which is able to
// expose the topic metrics of a nsqd node to Prometheus.
func TopicStats(namespace string) StatsCollector {
labels := []string{"type", "topic", "paused"}
labels := []string{"topic", "paused"}
namespace += "_topic"

return topicStats{
{
Expand Down Expand Up @@ -68,17 +69,32 @@ func TopicStats(namespace string) StatsCollector {
}
}

func (ts topicStats) collect(s *stats, out chan<- prometheus.Metric) {
func (ts topicStats) set(s *stats) {
for _, topic := range s.Topics {
labels := prometheus.Labels{
"type": "topic",
"topic": topic.Name,
"paused": strconv.FormatBool(topic.Paused),
}

for _, c := range ts {
c.vec.With(labels).Set(c.val(topic))
c.vec.Collect(out)
}
}
}
func (ts topicStats) collect(out chan<- prometheus.Metric) {
for _, c := range ts {
c.vec.Collect(out)
}
}

func (ts topicStats) describe(ch chan<- *prometheus.Desc) {
for _, c := range ts {
c.vec.Describe(ch)
}
}

func (ts topicStats) reset() {
for _, c := range ts {
c.vec.Reset()
}
}
Loading

0 comments on commit 3354bbb

Please sign in to comment.