Skip to content

Commit

Permalink
Merge pull request #12 from AH-dark/feat/options
Browse files Browse the repository at this point in the history
Tracer configuration using options
  • Loading branch information
GuangmingLuo authored Oct 7, 2023
2 parents 7644d6d + c6f3c75 commit 907fed1
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 22 deletions.
105 changes: 105 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright 2021 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package prometheus

import (
"net/http"

prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
)

var defaultBuckets = []float64{5000, 10000, 25000, 50000, 100000, 250000, 500000, 1000000}

// Option opts for monitor prometheus
type Option interface {
apply(cfg *config)
}

type option func(cfg *config)

func (fn option) apply(cfg *config) {
fn(cfg)
}

type config struct {
buckets []float64
enableGoCollector bool
runtimeMetricRules []collectors.GoRuntimeMetricsRule
registry *prom.Registry
serveMux *http.ServeMux
disableServer bool
}

func defaultConfig() *config {
return &config{
buckets: defaultBuckets,
enableGoCollector: false,
runtimeMetricRules: []collectors.GoRuntimeMetricsRule{},
registry: prom.NewRegistry(),
serveMux: http.DefaultServeMux,
disableServer: false,
}
}

// WithEnableGoCollector enable go collector
func WithEnableGoCollector(enable bool) Option {
return option(func(cfg *config) {
cfg.enableGoCollector = enable
})
}

// WithGoCollectorRule define your custom go collector rule
func WithGoCollectorRule(rules ...collectors.GoRuntimeMetricsRule) Option {
return option(func(cfg *config) {
cfg.runtimeMetricRules = rules
})
}

// WithHistogramBuckets define your custom histogram buckets base on your biz
func WithHistogramBuckets(buckets []float64) Option {
return option(func(cfg *config) {
if len(buckets) > 0 {
cfg.buckets = buckets
}
})
}

// WithRegistry define your custom registry
func WithRegistry(registry *prom.Registry) Option {
return option(func(cfg *config) {
if registry != nil {
cfg.registry = registry
}
})
}

// WithServeMux define your custom serve mux
func WithServeMux(serveMux *http.ServeMux) Option {
return option(func(cfg *config) {
if serveMux != nil {
cfg.serveMux = serveMux
}
})
}

// WithDisableServer disable prometheus server
func WithDisableServer(disable bool) Option {
return option(func(cfg *config) {
cfg.disableServer = disable
})
}
71 changes: 49 additions & 22 deletions tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/http"

prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/cloudwego/kitex/pkg/rpcinfo"
Expand Down Expand Up @@ -100,14 +101,23 @@ func (c *clientTracer) Finish(ctx context.Context) {
}

// NewClientTracer provide tracer for client call, addr and path is the scrape_configs for prometheus server.
func NewClientTracer(addr, path string) stats.Tracer {
registry := prom.NewRegistry()
http.Handle(path, promhttp.HandlerFor(registry, promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError}))
go func() {
if err := http.ListenAndServe(addr, nil); err != nil {
log.Fatal("Unable to start a promhttp server, err: " + err.Error())
}
}()
func NewClientTracer(addr, path string, options ...Option) stats.Tracer {
cfg := defaultConfig()
for _, opt := range options {
opt.apply(cfg)
}

if !cfg.disableServer {
cfg.serveMux.Handle(path, promhttp.HandlerFor(cfg.registry, promhttp.HandlerOpts{
ErrorHandling: promhttp.ContinueOnError,
Registry: cfg.registry,
}))
go func() {
if err := http.ListenAndServe(addr, cfg.serveMux); err != nil {
log.Fatal("Unable to start a promhttp server, err: " + err.Error())
}
}()
}

clientHandledCounter := prom.NewCounterVec(
prom.CounterOpts{
Expand All @@ -116,17 +126,21 @@ func NewClientTracer(addr, path string) stats.Tracer {
},
[]string{labelKeyCaller, labelKeyCallee, labelKeyMethod, labelKeyStatus, labelKeyRetry},
)
registry.MustRegister(clientHandledCounter)
cfg.registry.MustRegister(clientHandledCounter)

clientHandledHistogram := prom.NewHistogramVec(
prom.HistogramOpts{
Name: "kitex_client_latency_us",
Help: "Latency (microseconds) of the RPC until it is finished.",
Buckets: []float64{5000, 10000, 25000, 50000, 100000, 250000, 500000, 1000000},
Buckets: cfg.buckets,
},
[]string{labelKeyCaller, labelKeyCallee, labelKeyMethod, labelKeyStatus, labelKeyRetry},
)
registry.MustRegister(clientHandledHistogram)
cfg.registry.MustRegister(clientHandledHistogram)

if cfg.enableGoCollector {
cfg.registry.MustRegister(collectors.NewGoCollector(collectors.WithGoCollectorRuntimeMetrics(cfg.runtimeMetricRules...)))
}

return &clientTracer{
clientHandledCounter: clientHandledCounter,
Expand Down Expand Up @@ -166,14 +180,23 @@ func (c *serverTracer) Finish(ctx context.Context) {
}

// NewServerTracer provides tracer for server access, addr and path is the scrape_configs for prometheus server.
func NewServerTracer(addr, path string) stats.Tracer {
registry := prom.NewRegistry()
http.Handle(path, promhttp.HandlerFor(registry, promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError}))
go func() {
if err := http.ListenAndServe(addr, nil); err != nil {
log.Fatal("Unable to start a promhttp server, err: " + err.Error())
}
}()
func NewServerTracer(addr, path string, options ...Option) stats.Tracer {
cfg := defaultConfig()
for _, opt := range options {
opt.apply(cfg)
}

if !cfg.disableServer {
cfg.serveMux.Handle(path, promhttp.HandlerFor(cfg.registry, promhttp.HandlerOpts{
ErrorHandling: promhttp.ContinueOnError,
Registry: cfg.registry,
}))
go func() {
if err := http.ListenAndServe(addr, cfg.serveMux); err != nil {
log.Fatal("Unable to start a promhttp server, err: " + err.Error())
}
}()
}

serverHandledCounter := prom.NewCounterVec(
prom.CounterOpts{
Expand All @@ -182,17 +205,21 @@ func NewServerTracer(addr, path string) stats.Tracer {
},
[]string{labelKeyCaller, labelKeyCallee, labelKeyMethod, labelKeyStatus, labelKeyRetry},
)
registry.MustRegister(serverHandledCounter)
cfg.registry.MustRegister(serverHandledCounter)

serverHandledHistogram := prom.NewHistogramVec(
prom.HistogramOpts{
Name: "kitex_server_latency_us",
Help: "Latency (microseconds) of RPC that had been application-level handled by the server.",
Buckets: []float64{5000, 10000, 25000, 50000, 100000, 250000, 500000, 1000000},
Buckets: cfg.buckets,
},
[]string{labelKeyCaller, labelKeyCallee, labelKeyMethod, labelKeyStatus, labelKeyRetry},
)
registry.MustRegister(serverHandledHistogram)
cfg.registry.MustRegister(serverHandledHistogram)

if cfg.enableGoCollector {
cfg.registry.MustRegister(collectors.NewGoCollector(collectors.WithGoCollectorRuntimeMetrics(cfg.runtimeMetricRules...)))
}

return &serverTracer{
serverHandledCounter: serverHandledCounter,
Expand Down

0 comments on commit 907fed1

Please sign in to comment.