Skip to content

Commit

Permalink
Add TLS support for nsqd stats connections.
Browse files Browse the repository at this point in the history
  • Loading branch information
ickymettle committed Nov 29, 2017
1 parent 34a11d6 commit db85b3f
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 8 deletions.
34 changes: 31 additions & 3 deletions collector/executor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package collector

import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"net/http"
"sync"
"time"

Expand All @@ -19,22 +23,46 @@ type NsqExecutor struct {

collectors []StatsCollector
summary *prometheus.SummaryVec
client *http.Client
mutex sync.RWMutex
}

// NewNsqExecutor creates a new executor for collecting NSQ metrics.
func NewNsqExecutor(namespace, nsqdURL string) *NsqExecutor {
func NewNsqExecutor(namespace, nsqdURL, tlsCACert, tlsCert, tlsKey string) (*NsqExecutor, error) {
sum := prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: "exporter",
Name: "scrape_duration_seconds",
Help: "Duration of a scrape job of the NSQ exporter",
}, []string{"result"})
prometheus.MustRegister(sum)

transport := &http.Transport{}
if tlsCert != "" && tlsKey != "" {
cert, err := tls.LoadX509KeyPair(tlsCert, tlsKey)
if err != nil {
return nil, err
}
caCertPool := x509.NewCertPool()
if tlsCACert != "" {
caCert, err := ioutil.ReadFile(tlsCACert)
if err != nil {
return nil, err
}
caCertPool.AppendCertsFromPEM(caCert)
}
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}
tlsConfig.BuildNameToCertificate()
transport.TLSClientConfig = tlsConfig
}
return &NsqExecutor{
nsqdURL: nsqdURL,
summary: sum,
}
client: &http.Client{Transport: transport},
}, nil
}

// Use configures a specific stats collector, so the stats could be
Expand Down Expand Up @@ -63,7 +91,7 @@ func (e *NsqExecutor) Collect(out chan<- prometheus.Metric) {
c.reset()
}

stats, err := getNsqdStats(e.nsqdURL)
stats, err := getNsqdStats(e.client, e.nsqdURL)
tScrape := time.Since(start).Seconds()

result := "success"
Expand Down
5 changes: 2 additions & 3 deletions collector/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,8 @@ func getPercentile(t *topic, percentile int) float64 {
return 0
}

func getNsqdStats(nsqdURL string) (*stats, error) {

resp, err := http.Get(nsqdURL)
func getNsqdStats(client *http.Client, nsqdURL string) (*stats, error) {
resp, err := client.Get(nsqdURL)
if err != nil {
return nil, err
}
Expand Down
9 changes: 7 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ var (
nsqdURL = flag.String("nsqd.addr", "http://localhost:4151/stats", "Address of the nsqd node.")
enabledCollectors = flag.String("collect", "stats.topics,stats.channels", "Comma-separated list of collectors to use.")
namespace = flag.String("namespace", "nsq", "Namespace for the NSQ metrics.")
tlsCACert = flag.String("tls.ca_cert", "", "CA certificate file to be used for nsqd connections.")
tlsCert = flag.String("tls.cert", "", "TLS certificate file to be used for client connections to nsqd.")
tlsKey = flag.String("tls.key", "", "TLS key file to be used for TLS client connections to nsqd.")

statsRegistry = map[string]func(namespace string) collector.StatsCollector{
"topics": collector.TopicStats,
Expand Down Expand Up @@ -60,13 +63,15 @@ func main() {
}

func createNsqExecutor() (*collector.NsqExecutor, error) {

nsqdURL, err := normalizeURL(*nsqdURL)
if err != nil {
return nil, err
}

ex := collector.NewNsqExecutor(*namespace, nsqdURL)
ex, err := collector.NewNsqExecutor(*namespace, nsqdURL, *tlsCACert, *tlsCert, *tlsKey)
if err != nil {
log.Fatal(err)
}
for _, param := range strings.Split(*enabledCollectors, ",") {
param = strings.TrimSpace(param)
parts := strings.SplitN(param, ".", 2)
Expand Down

0 comments on commit db85b3f

Please sign in to comment.