Skip to content

Commit

Permalink
Integrate waku and api prometheus metrics correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
snormore committed Sep 27, 2023
1 parent 69e9675 commit 5996678
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 315 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ require (
github.com/xmtp/go-msgio v0.2.1-0.20220510223757-25a701b79cd3
github.com/xmtp/proto/v3 v3.27.0
github.com/yoheimuta/protolint v0.39.0
go.opencensus.io v0.24.0
go.uber.org/zap v1.24.0
golang.org/x/sync v0.3.0
google.golang.org/grpc v1.53.0
Expand Down Expand Up @@ -165,6 +164,7 @@ require (
github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230914230901-6057b9728a32 // indirect
github.com/wk8/go-ordered-map v1.0.0 // indirect
github.com/yoheimuta/go-protoparser/v4 v4.6.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/dig v1.17.0 // indirect
go.uber.org/fx v1.20.0 // indirect
Expand Down
92 changes: 32 additions & 60 deletions pkg/e2e/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,48 +4,41 @@ import (
"context"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/xmtp/xmtp-node-go/pkg/metrics"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
metricstag "go.opencensus.io/tag"
"go.uber.org/zap"
)

var (
successfulRuns = stats.Int64("successful_runs", "Number of successful runs", stats.UnitDimensionless)
failedRuns = stats.Int64("failed_runs", "Number of failed runs", stats.UnitDimensionless)
runDurationSeconds = stats.Float64("run_duration_seconds", "Duration of the run in seconds", stats.UnitSeconds)
testNameTagKey = "test"
testStatusTagKey = "status"

testNameTagKey = metricstag.MustNewKey("test")
testStatusTagKey = metricstag.MustNewKey("status")

views = []*view.View{
{
Name: "xmtpd_e2e_successful_runs",
Measure: successfulRuns,
Description: "Number of successful runs",
Aggregation: view.Count(),
TagKeys: []metricstag.Key{testNameTagKey},
successfulRuns = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "xmtpd_e2e_successful_runs",
Help: "Number of successful runs",
},
{
Name: "xmtpd_e2e_failed_runs",
Measure: failedRuns,
Description: "Number of failed runs",
Aggregation: view.Count(),
TagKeys: []metricstag.Key{testNameTagKey},
[]string{testNameTagKey},
)
failedRuns = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "xmtpd_e2e_failed_runs",
Help: "Number of failed runs",
},
{
Name: "xmtpd_e2e_run_duration_seconds",
Measure: runDurationSeconds,
Description: "Duration of the run in seconds",
Aggregation: view.Distribution(append(floatRange(30), 40, 50, 60, 90, 120, 300)...),
TagKeys: []metricstag.Key{testNameTagKey, testStatusTagKey},
[]string{testNameTagKey},
)
runDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "xmtpd_e2e_run_duration_seconds",
Help: "Duration of the run in seconds",
Buckets: append(floatRange(30), 40, 50, 60, 90, 120, 300),
},
}
[]string{testNameTagKey, testStatusTagKey},
)
)

func (r *Runner) withMetricsServer(fn func() error) error {
metrics := metrics.NewMetricsServer("0.0.0.0", 8008, r.log)
metrics := metrics.NewMetricsServer("0.0.0.0", 8008, r.log, r.prom)
metrics.Start(context.Background())
defer func() {
err := metrics.Stop(r.ctx)
Expand All @@ -54,44 +47,23 @@ func (r *Runner) withMetricsServer(fn func() error) error {
}
}()

err := view.Register(views...)
if err != nil {
return err
}
r.prom.MustRegister(successfulRuns)
r.prom.MustRegister(failedRuns)
r.prom.MustRegister(runDuration)

return fn()
}

func recordSuccessfulRun(ctx context.Context, tags ...tag) error {
return recordWithTags(ctx, tags, successfulRuns.M(1))
func recordSuccessfulRun(ctx context.Context, testName string) {
successfulRuns.WithLabelValues(testName).Inc()
}

func recordFailedRun(ctx context.Context, tags ...tag) error {
return recordWithTags(ctx, tags, failedRuns.M(1))
func recordFailedRun(ctx context.Context, testName string) {
failedRuns.WithLabelValues(testName).Inc()
}

func recordRunDuration(ctx context.Context, duration time.Duration, tags ...tag) error {
return recordWithTags(ctx, tags, runDurationSeconds.M(duration.Seconds()))
}

type tag struct {
key metricstag.Key
value string
}

func newTag(key metricstag.Key, value string) tag {
return tag{
key: key,
value: value,
}
}

func recordWithTags(ctx context.Context, tags []tag, ms ...stats.Measurement) error {
mutators := make([]metricstag.Mutator, len(tags))
for i, tag := range tags {
mutators[i] = metricstag.Upsert(tag.key, tag.value)
}
return stats.RecordWithTags(ctx, mutators, ms...)
func recordRunDuration(ctx context.Context, duration time.Duration, testName, testStatus string) {
runDuration.WithLabelValues(testName, testStatus).Observe(duration.Seconds())
}

func floatRange(n int) []float64 {
Expand Down
32 changes: 7 additions & 25 deletions pkg/e2e/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/http"
"time"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

Expand All @@ -15,6 +16,7 @@ type Runner struct {
ctx context.Context
log *zap.Logger
config *Config
prom *prometheus.Registry
suite *Suite
}

Expand All @@ -23,6 +25,7 @@ func NewRunner(ctx context.Context, log *zap.Logger, config *Config) *Runner {
ctx: ctx,
log: log,
config: config,
prom: prometheus.NewRegistry(),
suite: NewSuite(ctx, log, config),
}
}
Expand Down Expand Up @@ -61,7 +64,6 @@ func (r *Runner) Start() error {
}

func (r *Runner) runTest(test *Test) error {
nameTag := newTag(testNameTagKey, test.Name)
started := time.Now().UTC()
log := r.log.With(zap.String("test", test.Name))

Expand All @@ -70,35 +72,15 @@ func (r *Runner) runTest(test *Test) error {
duration := ended.Sub(started)
log = log.With(zap.Duration("duration", duration))
if err != nil {
recordErr := recordFailedRun(r.ctx, nameTag)
if recordErr != nil {
log.Error("recording failed run metric", zap.Error(recordErr))
}
recordFailedRun(r.ctx, test.Name)
log.Error("test failed", zap.Error(err))

statusTag := newTag(testStatusTagKey, "failed")
err = recordRunDuration(r.ctx, duration, nameTag, statusTag)
if err != nil {
log.Error("recording run duration", zap.Error(err))
return err
}

recordRunDuration(r.ctx, duration, test.Name, "failed")
return err
}
log.Info("test passed")

err = recordSuccessfulRun(r.ctx, nameTag)
if err != nil {
log.Error("recording successful run metric", zap.Error(err))
return err
}

statusTag := newTag(testStatusTagKey, "success")
err = recordRunDuration(r.ctx, duration, nameTag, statusTag)
if err != nil {
log.Error("recording run duration", zap.Error(err))
return err
}
recordSuccessfulRun(r.ctx, test.Name)
recordRunDuration(r.ctx, duration, test.Name, "success")

return nil
}
55 changes: 20 additions & 35 deletions pkg/metrics/api-limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,46 +3,31 @@ package metrics
import (
"context"

"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.uber.org/zap"
"github.com/prometheus/client_golang/prometheus"
)

var bucketsNameKey = newTagKey("name")
var bucketsNameKey = "name"

var ratelimiterBucketsGaugeMeasure = stats.Int64("ratelimiter_buckets", "size of ratelimiter buckets map", stats.UnitDimensionless)
var ratelimiterBucketsGaugeView = &view.View{
Name: "xmtp_ratelimiter_buckets",
Measure: ratelimiterBucketsGaugeMeasure,
Description: "Size of rate-limiter buckets maps",
Aggregation: view.LastValue(),
TagKeys: []tag.Key{bucketsNameKey},
}
var ratelimiterBuckets = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "xmtp_ratelimiter_buckets",
Help: "Size of rate-limiter buckets maps",
},
[]string{bucketsNameKey},
)

func EmitRatelimiterBucketsSize(ctx context.Context, log *zap.Logger, name string, size int) {
err := recordWithTags(ctx, []tag.Mutator{tag.Insert(bucketsNameKey, name)}, ratelimiterBucketsGaugeMeasure.M(int64(size)))
if err != nil {
log.Warn("recording metric",
zap.String("metric", ratelimiterBucketsGaugeMeasure.Name()),
zap.Error(err))
}
func EmitRatelimiterBucketsSize(ctx context.Context, name string, size int) {
ratelimiterBuckets.WithLabelValues(name).Add(float64(size))
}

var ratelimiterBucketsDeletedCounterMeasure = stats.Int64("xmtp_ratelimiter_entries_deleted", "Count of deleted entries from ratelimiter buckets map", stats.UnitDimensionless)
var ratelimiterBucketsDeletedCounterView = &view.View{
Name: "xmtp_ratelimiter_entries_deleted",
Measure: ratelimiterBucketsDeletedCounterMeasure,
Description: "Count of deleted entries from rate-limiter buckets maps",
Aggregation: view.Count(),
TagKeys: []tag.Key{bucketsNameKey},
}
var ratelimiterBucketsDeleted = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "xmtp_ratelimiter_entries_deleted",
Help: "Count of deleted entries from rate-limiter buckets maps",
},
[]string{bucketsNameKey},
)

func EmitRatelimiterDeletedEntries(ctx context.Context, log *zap.Logger, name string, count int) {
err := recordWithTags(ctx, []tag.Mutator{tag.Insert(bucketsNameKey, name)}, ratelimiterBucketsDeletedCounterMeasure.M(int64(count)))
if err != nil {
log.Warn("recording metric",
zap.String("metric", ratelimiterBucketsDeletedCounterMeasure.Name()),
zap.Error(err))
}
func EmitRatelimiterDeletedEntries(ctx context.Context, name string, count int) {
ratelimiterBucketsDeleted.WithLabelValues(name).Add(float64(count))
}
Loading

0 comments on commit 5996678

Please sign in to comment.