Skip to content

Commit

Permalink
instrument with otel metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
cedi committed Mar 10, 2023
1 parent 11859ff commit cb01d19
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 16 deletions.
47 changes: 43 additions & 4 deletions controllers/redirect_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ package controllers

import (
"context"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/trace"
networkingv1 "k8s.io/api/networking/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -57,16 +60,36 @@ type RedirectReconciler struct {
scheme *runtime.Scheme
log *logr.Logger
tracer trace.Tracer

redirectCount int
redirects instrument.Int64UpDownCounter
latency instrument.Int64Histogram
}

// NewRedirectReconciler returns a new RedirectReconciler
func NewRedirectReconciler(client client.Client, rClient *redirectclient.RedirectClient, scheme *runtime.Scheme, log *logr.Logger, tracer trace.Tracer) *RedirectReconciler {
// NewRedirectReconciler returns a new RedirectReconciler.
//
// Besides storing the supplied clients, scheme, logger and tracer, it creates
// the two metric instruments the reconciler reports through meter:
//   - "urlshortener.active_redirects": an up/down counter of redirects
//   - "urlshortener.redirect_controller.reconcile_latency": a histogram of
//     how long a Reconcile call ran
//
// The constructor has no error return, so a failure to create an instrument
// is reported through log (when log is non-nil) instead of being silently
// discarded, and construction continues.
func NewRedirectReconciler(client client.Client, rClient *redirectclient.RedirectClient, scheme *runtime.Scheme, log *logr.Logger, tracer trace.Tracer, meter metric.Meter) *RedirectReconciler {
	redirects, err := meter.Int64UpDownCounter(
		"urlshortener.active_redirects",
		instrument.WithUnit("count"),
		instrument.WithDescription("Amount of redirects (redirect one URL to another)"),
	)
	if err != nil && log != nil {
		// NOTE(review): whether the returned instrument is usable after an
		// error depends on the meter implementation - confirm before relying on it.
		log.Error(err, "Failed to create metric instrument", "instrument", "urlshortener.active_redirects")
	}

	redirectReconcileLatency, err := meter.Int64Histogram(
		"urlshortener.redirect_controller.reconcile_latency",
		instrument.WithUnit("microseconds"),
		instrument.WithDescription("How long does the reconcile function run for"),
	)
	if err != nil && log != nil {
		log.Error(err, "Failed to create metric instrument", "instrument", "urlshortener.redirect_controller.reconcile_latency")
	}

	return &RedirectReconciler{
		client:  client,
		rClient: rClient,
		scheme:  scheme,
		log:     log,
		tracer:  tracer,

		// redirectCount remembers the last value reported on the up/down
		// counter; it starts at zero like the counter itself.
		redirectCount: 0,
		redirects:     redirects,
		latency:       redirectReconcileLatency,
	}
}

Expand All @@ -82,15 +105,31 @@ func NewRedirectReconciler(client client.Client, rClient *redirectclient.Redirec
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *RedirectReconciler) Reconcile(c context.Context, req ctrl.Request) (ctrl.Result, error) {
ctx, span := r.tracer.Start(c, "RedirectReconciler.Reconcile", trace.WithAttributes(attribute.String("redirect", req.Name)))
defer span.End()
func (r *RedirectReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
startTime := time.Now()

defer func() {
r.latency.Record(ctx, time.Since(startTime).Microseconds(), attribute.String("redirect", req.NamespacedName.String()))
}()

log := r.log.WithName("reconciler").WithValues("redirect", req.NamespacedName)

span := trace.SpanFromContext(ctx)

// Check if the span was sampled and is recording the data
if !span.IsRecording() {
ctx, span = r.tracer.Start(ctx, "RedirectReconciler.Reconcile")
defer span.End()
}

span.SetAttributes(attribute.String("redirect", req.Name))

// Monitor the number of redirects
if redirectList, err := r.rClient.List(ctx); redirectList != nil && err == nil {
activeRedirects.Set(float64(len(redirectList.Items)))

r.redirects.Add(ctx, int64(len(redirectList.Items)-r.redirectCount))
r.redirectCount = len(redirectList.Items)
}

// get Redirect from etcd
Expand Down
48 changes: 43 additions & 5 deletions controllers/shortlink_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ package controllers

import (
"context"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/trace"

"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -61,15 +64,35 @@ type ShortLinkReconciler struct {
scheme *runtime.Scheme
log *logr.Logger
tracer trace.Tracer

shortlinkCount int
shortlinks instrument.Int64UpDownCounter
latency instrument.Int64Histogram
}

// NewShortLinkReconciler returns a new ShortLinkReconciler
func NewShortLinkReconciler(client *shortlinkclient.ShortlinkClient, scheme *runtime.Scheme, log *logr.Logger, tracer trace.Tracer) *ShortLinkReconciler {
// NewShortLinkReconciler returns a new ShortLinkReconciler.
//
// Besides storing the supplied client, scheme, logger and tracer, it creates
// the two metric instruments the reconciler reports through meter:
//   - "urlshortener.active_shortlinks": an up/down counter of shortlinks
//   - "urlshortener.shortlink_controller.reconcile_latency": a histogram of
//     how long a Reconcile call ran
//
// The constructor has no error return, so a failure to create an instrument
// is reported through log (when log is non-nil) instead of being silently
// discarded, and construction continues.
func NewShortLinkReconciler(client *shortlinkclient.ShortlinkClient, scheme *runtime.Scheme, log *logr.Logger, tracer trace.Tracer, meter metric.Meter) *ShortLinkReconciler {
	shortlinks, err := meter.Int64UpDownCounter(
		"urlshortener.active_shortlinks",
		instrument.WithUnit("count"),
		instrument.WithDescription("Amount of shortlinks (redirect a short-name to another URI)"),
	)
	if err != nil && log != nil {
		// NOTE(review): whether the returned instrument is usable after an
		// error depends on the meter implementation - confirm before relying on it.
		log.Error(err, "Failed to create metric instrument", "instrument", "urlshortener.active_shortlinks")
	}

	shortlinkReconcileLatency, err := meter.Int64Histogram(
		"urlshortener.shortlink_controller.reconcile_latency",
		instrument.WithUnit("microseconds"),
		instrument.WithDescription("How long does the reconcile function run for"),
	)
	if err != nil && log != nil {
		log.Error(err, "Failed to create metric instrument", "instrument", "urlshortener.shortlink_controller.reconcile_latency")
	}

	return &ShortLinkReconciler{
		client: client,
		scheme: scheme,
		log:    log,
		tracer: tracer,

		// shortlinkCount remembers the last value reported on the up/down
		// counter; it starts at zero like the counter itself.
		shortlinkCount: 0,
		shortlinks:     shortlinks,
		latency:        shortlinkReconcileLatency,
	}
}

Expand All @@ -82,24 +105,39 @@ func NewShortLinkReconciler(client *shortlinkclient.ShortlinkClient, scheme *run
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *ShortLinkReconciler) Reconcile(c context.Context, req ctrl.Request) (ctrl.Result, error) {
ctx, span := r.tracer.Start(c, "ShortLinkReconciler.Reconcile", trace.WithAttributes(attribute.String("shortlink", req.Name)))
defer span.End()
func (r *ShortLinkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
startTime := time.Now()

defer func() {
r.latency.Record(ctx, time.Since(startTime).Microseconds(), attribute.String("shortlink", req.NamespacedName.String()))
}()

log := r.log.WithName("reconciler").WithValues("shortlink", req.NamespacedName.String())

span := trace.SpanFromContext(ctx)

// Check if the span was sampled and is recording the data
if !span.IsRecording() {
ctx, span = r.tracer.Start(ctx, "ShortLinkReconciler.Reconcile")
defer span.End()
}

span.SetAttributes(attribute.String("shortlink", req.NamespacedName.String()))

// Get ShortLink from etcd
shortlink, err := r.client.GetNamespaced(ctx, req.NamespacedName)
if err != nil || shortlink == nil {
if errors.IsNotFound(err) {
activeShortlinks.Dec()
observability.RecordInfo(span, &log, "Shortlink resource not found. Ignoring since object must be deleted")
} else {
observability.RecordError(span, &log, err, "Failed to fetch ShortLink resource")
}
}

if shortlinkList, err := r.client.ListNamespaced(ctx, req.Namespace); shortlinkList != nil && err == nil {
r.shortlinks.Add(ctx, int64(len(shortlinkList.Items)-r.shortlinkCount))
r.shortlinkCount = len(shortlinkList.Items)

activeShortlinks.Set(float64(len(shortlinkList.Items)))

for _, shortlink := range shortlinkList.Items {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ require (
require (
github.com/felixge/httpsnoop v1.0.3 // indirect
go.opentelemetry.io/otel/metric v0.37.0 // indirect
go.opentelemetry.io/otel/sdk/metric v0.37.0 // indirect
)

require (
Expand Down Expand Up @@ -84,6 +85,7 @@ require (
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.9 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.37.0
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -415,11 +415,15 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.13.0 h1:Any/nVxaoMq1T2w0W85
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.13.0/go.mod h1:46vAP6RWfNn7EKov73l5KBFlNxz8kYlxR1woU+bJ4ZY=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.13.0 h1:Ntu7izEOIRHEgQNjbGc7j3eNtYMAiZfElJJ4JiiRDH4=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.13.0/go.mod h1:wZ9SAjm2sjw3vStBhlCfMZWZusyOQrwrHOFo00jyMC4=
go.opentelemetry.io/otel/exporters/prometheus v0.37.0 h1:NQc0epfL0xItsmGgSXgfbH2C1fq2VLXkZoDFsfRNHpc=
go.opentelemetry.io/otel/exporters/prometheus v0.37.0/go.mod h1:hB8qWjsStK36t50/R0V2ULFb4u95X/Q6zupXLgvjTh8=
go.opentelemetry.io/otel/metric v0.37.0 h1:pHDQuLQOZwYD+Km0eb657A25NaRzy0a+eLyKfDXedEs=
go.opentelemetry.io/otel/metric v0.37.0/go.mod h1:DmdaHfGt54iV6UKxsV9slj2bBRJcKC1B1uvDLIioc1s=
go.opentelemetry.io/otel/sdk v1.7.0/go.mod h1:uTEOTwaqIVuTGiJN7ii13Ibp75wJmYUDe374q6cZwUU=
go.opentelemetry.io/otel/sdk v1.14.0 h1:PDCppFRDq8A1jL9v6KMI6dYesaq+DFcDZvjsoGvxGzY=
go.opentelemetry.io/otel/sdk v1.14.0/go.mod h1:bwIC5TjrNG6QDCHNWvW4HLHtUQ4I+VQDsnjhvyZCALM=
go.opentelemetry.io/otel/sdk/metric v0.37.0 h1:haYBBtZZxiI3ROwSmkZnI+d0+AVzBWeviuYQDeBWosU=
go.opentelemetry.io/otel/sdk/metric v0.37.0/go.mod h1:mO2WV1AZKKwhwHTV3AKOoIEb9LbUaENZDuGUQd+j4A0=
go.opentelemetry.io/otel/trace v1.7.0/go.mod h1:fzLSB9nqR2eXzxPXb2JW9IKE+ScyXA48yyE4TNvoHqU=
go.opentelemetry.io/otel/trace v1.14.0 h1:wp2Mmvj41tDsyAJXiWDWpfNsOiIyd38fy85pyKcFq/M=
go.opentelemetry.io/otel/trace v1.14.0/go.mod h1:8avnQLK+CG77yNLUae4ea2JDQ6iT+gozhnZjy/rw9G8=
Expand Down
17 changes: 16 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,19 @@ func main() {
}
}()

// Initialize Metrics (OpenTelemetry)
meterProvider, meter, err := observability.InitMetrics(serviceName, serviceVersion)
if err != nil {
setupLog.Error(err, "failed initializing tracing")
os.Exit(1)
}

defer func() {
if err := meterProvider.Shutdown(context.Background()); err != nil {
shutdownLog.Error(err, "Error shutting down metrics provider")
}
}()

// Start namespaced
namespace := ""

Expand Down Expand Up @@ -161,6 +174,7 @@ func main() {
mgr.GetScheme(),
&ctrl.Log,
tracer,
meter,
)

if err = shortlinkReconciler.SetupWithManager(mgr); err != nil {
Expand All @@ -175,6 +189,7 @@ func main() {
mgr.GetScheme(),
&ctrl.Log,
tracer,
meter,
)

if err = redirectReconciler.SetupWithManager(mgr); err != nil {
Expand Down Expand Up @@ -213,7 +228,7 @@ func main() {

// Init Gin Framework
gin.SetMode(gin.ReleaseMode)
r, srv := router.NewGinGonicHTTPServer(&setupLog, bindAddr)
r, srv := router.NewGinGonicHTTPServer(&setupLog, bindAddr, serviceName)

setupLog.Info("Load API routes")
router.Load(r, shortlinkController)
Expand Down
3 changes: 1 addition & 2 deletions pkg/client/redirect_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package client

import (
"context"
"io/ioutil"
"os"

"github.com/cedi/urlshortener/api/v1alpha1"
Expand Down Expand Up @@ -37,7 +36,7 @@ func (c *RedirectClient) Get(ct context.Context, name string) (*v1alpha1.Redirec
defer span.End()

// try to read the namespace from /var/run
namespace, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
namespace, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
if err != nil {
span.RecordError(err)
return nil, errors.Wrap(err, "Unable to read current namespace")
Expand Down
75 changes: 73 additions & 2 deletions pkg/observability/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,28 @@ package observability

import (
"context"
"fmt"
"os"
"strings"

"github.com/MrAlias/flow"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
otelProm "go.opentelemetry.io/otel/exporters/prometheus"
otelMetric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
sdkTrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
)

func InitTracer(serviceName, serviceVersion string) (*sdkTrace.TracerProvider, trace.Tracer, error) {
ctx := context.Background()

otlpEndpoint, ok := os.LookupEnv("OTLP_ENDPOINT")
otlpInsecure := os.Getenv("OTLP_INSECURE")
Expand All @@ -37,7 +43,7 @@ func InitTracer(serviceName, serviceVersion string) (*sdkTrace.TracerProvider, t

client := otlptracehttp.NewClient(otlpOptions...)

otlptracehttpExporter, err := otlptrace.New(context.Background(), client)
otlptracehttpExporter, err := otlptrace.New(ctx, client)
if err != nil {
return nil, nil, errors.Wrap(err, "failed creating OTLP trace exporter")
}
Expand All @@ -48,7 +54,7 @@ func InitTracer(serviceName, serviceVersion string) (*sdkTrace.TracerProvider, t
}

resources, err := resource.New(
context.Background(),
ctx,
resource.WithFromEnv(), // pull attributes from OTEL_RESOURCE_ATTRIBUTES and OTEL_SERVICE_NAME environment variables
resource.WithOS(), // This option configures a set of Detectors that discover OS information
resource.WithContainer(), // This option configures a set of Detectors that discover container information
Expand Down Expand Up @@ -80,3 +86,68 @@ func InitTracer(serviceName, serviceVersion string) (*sdkTrace.TracerProvider, t

return traceProvider, trace, nil
}

// InitMetrics sets up the OpenTelemetry metrics pipeline for the service.
//
// It creates a Prometheus exporter registered on a newly created Prometheus
// registry, builds a resource describing the service (name, version, and the
// hostname as instance ID, plus environment, OS, container and host
// detectors), and wires both into a new MeterProvider.
//
// It returns the provider (the caller is expected to shut it down), a Meter
// named "<serviceName>Meter" obtained from that provider, and an error
// wrapped with the step that failed. On error both other results are nil.
func InitMetrics(serviceName, serviceVersion string) (*metric.MeterProvider, otelMetric.Meter, error) {
	ctx := context.Background()

	// NOTE(review): this registry is local to the function and is not
	// returned, so nothing visible here can gather from it - confirm how the
	// exported metrics are meant to be scraped.
	registry := prometheus.NewRegistry()
	exporter, err := otelProm.New(
		otelProm.WithoutUnits(),
		otelProm.WithRegisterer(registry),
	)
	if err != nil {
		return nil, nil, errors.Wrap(err, "failed creating Prometheus metrics exporter")
	}

	// The hostname doubles as the service instance ID in the resource below.
	hostname, err := os.Hostname()
	if err != nil {
		return nil, nil, errors.Wrap(err, "failed to get hostname")
	}

	resources, err := resource.New(
		ctx,
		resource.WithFromEnv(),   // pull attributes from OTEL_RESOURCE_ATTRIBUTES and OTEL_SERVICE_NAME environment variables
		resource.WithOS(),        // This option configures a set of Detectors that discover OS information
		resource.WithContainer(), // This option configures a set of Detectors that discover container information
		resource.WithHost(),      // This option configures a set of Detectors that discover host information
		resource.WithAttributes(
			semconv.ServiceNameKey.String(serviceName),
			semconv.ServiceVersionKey.String(serviceVersion),
			semconv.ServiceInstanceIDKey.String(hostname),
		),
	)
	if err != nil {
		return nil, nil, errors.Wrap(err, "failed creating metrics resource")
	}

	// Layer the service-specific attributes on top of the SDK defaults.
	resources, err = resource.Merge(resource.Default(), resources)
	if err != nil {
		return nil, nil, errors.Wrap(err, "failed merging metrics resource with defaults")
	}

	provider := metric.NewMeterProvider(
		metric.WithResource(resources),
		metric.WithReader(exporter),
	)

	meter := provider.Meter(fmt.Sprintf("%sMeter", serviceName))

	return provider, meter, nil
}
Loading

0 comments on commit cb01d19

Please sign in to comment.