Skip to content

Commit

Permalink
Rewire metrics. (#78)
Browse files Browse the repository at this point in the history
* Rewire metrics.

Prior to this metrics were not being reported correctly.
  • Loading branch information
robholland authored Feb 22, 2024
1 parent 98efd24 commit 5f4065a
Showing 1 changed file with 80 additions and 28 deletions.
108 changes: 80 additions & 28 deletions cmd/cmdoptions/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package cmdoptions

import (
"context"
"errors"
"fmt"
"net"
"net/http"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand All @@ -16,49 +17,98 @@ import (
)

type metricsHandler struct {
registry *prometheus.Registry
tags map[string]string
metrics *Metrics
labels []string
values []string
}

var _ client.MetricsHandler = (*metricsHandler)(nil)

func (h *metricsHandler) WithTags(tags map[string]string) client.MetricsHandler {
// Make enough space for the handlers tags which are populated first
mergedTags := make(map[string]string, len(h.tags))
for t, v := range h.tags {
mergedTags[t] = v
mergedTags := make(map[string]string, len(h.labels))
for i, l := range h.labels {
mergedTags[l] = h.values[i]
}
for t, v := range tags {
mergedTags[t] = v
for l, v := range tags {
mergedTags[l] = v
}
return &metricsHandler{registry: h.registry, tags: mergedTags}
}

func (h *metricsHandler) mustRegisterIgnoreDuplicate(c prometheus.Collector) {
err := h.registry.Register(c)
var alreadyRegisteredError prometheus.AlreadyRegisteredError
if err != nil && !errors.As(err, &alreadyRegisteredError) {
panic(err)
var labels, values []string
for l, v := range mergedTags {
labels = append(labels, l)
values = append(values, v)
}

return &metricsHandler{
metrics: h.metrics,
labels: labels,
values: values,
}
}

func (h *metricsHandler) Counter(name string) client.MetricsCounter {
ctr := prometheus.NewCounter(prometheus.CounterOpts{Name: name, ConstLabels: prometheus.Labels(h.tags)})
h.mustRegisterIgnoreDuplicate(ctr)
return metricsCounter{ctr}
h.metrics.mutex.Lock()
defer h.metrics.mutex.Unlock()

var ctr *prometheus.CounterVec
if c, ok := h.metrics.cache[name]; ok {
ctr, ok = c.(*prometheus.CounterVec)
if !ok {
panic(fmt.Errorf("duplicate metric with different type: %s", name))
}
} else {
ctr = prometheus.NewCounterVec(
prometheus.CounterOpts{Name: name},
h.labels,
)
h.metrics.registry.MustRegister(ctr)
h.metrics.cache[name] = ctr
}

return metricsCounter{ctr.WithLabelValues(h.values...)}
}

func (h *metricsHandler) Gauge(name string) client.MetricsGauge {
gauge := prometheus.NewGauge(prometheus.GaugeOpts{Name: name, ConstLabels: prometheus.Labels(h.tags)})
h.mustRegisterIgnoreDuplicate(gauge)
return metricsGauge{gauge}
h.metrics.mutex.Lock()
defer h.metrics.mutex.Unlock()

var gauge *prometheus.GaugeVec
if c, ok := h.metrics.cache[name]; ok {
gauge, ok = c.(*prometheus.GaugeVec)
if !ok {
panic(fmt.Errorf("duplicate metric with different type: %s", name))
}
} else {
gauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{Name: name},
h.labels,
)
h.metrics.registry.MustRegister(gauge)
h.metrics.cache[name] = gauge
}

return metricsGauge{gauge.WithLabelValues(h.values...)}
}

func (h *metricsHandler) Timer(name string) client.MetricsTimer {
// TODO: buckets
timer := prometheus.NewHistogram(prometheus.HistogramOpts{Name: name, ConstLabels: prometheus.Labels(h.tags)})
h.mustRegisterIgnoreDuplicate(timer)
return metricsTimer{timer}
h.metrics.mutex.Lock()
defer h.metrics.mutex.Unlock()

var timer *prometheus.HistogramVec
if c, ok := h.metrics.cache[name]; ok {
timer, ok = c.(*prometheus.HistogramVec)
if !ok {
panic(fmt.Errorf("duplicate metric with different type: %s", name))
}
} else {
// TODO: buckets
timer = prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: name}, h.labels)
h.metrics.registry.MustRegister(timer)
h.metrics.cache[name] = timer
}

return metricsTimer{timer.WithLabelValues(h.values...)}
}

type metricsCounter struct {
Expand All @@ -80,7 +130,7 @@ func (m metricsGauge) Update(x float64) {
}

type metricsTimer struct {
prom prometheus.Histogram
prom prometheus.Observer
}

// Record records a duration.
Expand All @@ -102,6 +152,8 @@ type MetricsOptions struct {
type Metrics struct {
server *http.Server
registry *prometheus.Registry
cache map[string]interface{}
mutex sync.Mutex
}

// MustCreateMetrics sets up Prometheus based metrics and starts an HTTP server
Expand All @@ -116,14 +168,14 @@ func (m *MetricsOptions) MustCreateMetrics(logger *zap.SugaredLogger) *Metrics {
return &Metrics{
server: server,
registry: registry,
cache: make(map[string]interface{}),
}
}

// Handler returns a new Temporal-client-compatible metrics handler.
func (m *Metrics) NewHandler() client.MetricsHandler {
return &metricsHandler{
registry: m.registry,
tags: make(map[string]string),
metrics: m,
}
}

Expand Down

0 comments on commit 5f4065a

Please sign in to comment.