Skip to content

Commit

Permalink
chore: minor refactor to get all otel metric code following the same …
Browse files Browse the repository at this point in the history
…patterns (#2288)

part 1 of #2286
  • Loading branch information
deniseli authored Aug 7, 2024
1 parent 005f605 commit 4f39234
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 118 deletions.
4 changes: 0 additions & 4 deletions backend/controller/observability/async_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,6 @@ func initAsyncCallMetrics() (*AsyncCallMetrics, error) {
return result, nil
}

func wrapErr(signalName string, err error) error {
return fmt.Errorf("failed to create %q signal: %w", signalName, err)
}

func (m *AsyncCallMetrics) Created(ctx context.Context, verb schema.RefKey, origin string, remainingAttempts int64, maybeErr error) {
attrs := extractRefAttrs(verb, origin)
attrs = append(attrs, observability.SuccessOrFailureStatusAttr(maybeErr == nil))
Expand Down
29 changes: 16 additions & 13 deletions backend/controller/observability/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"

"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/observability"
Expand All @@ -28,36 +29,38 @@ type CronMetrics struct {
}

func initCronMetrics() (*CronMetrics, error) {
result := &CronMetrics{}
result := &CronMetrics{
jobsActive: noop.Int64UpDownCounter{},
jobsCompleted: noop.Int64Counter{},
jobLatency: noop.Int64Histogram{},
}

var errs error
var err error

meter := otel.Meter(deploymentMeterName)

counter := fmt.Sprintf("%s.jobs.completed", cronMeterName)
signalName := fmt.Sprintf("%s.jobs.completed", cronMeterName)
if result.jobsCompleted, err = meter.Int64Counter(
counter,
signalName,
metric.WithDescription("the number of cron jobs completed; successful or otherwise")); err != nil {
result.jobsCompleted, errs = handleInt64CounterError(counter, err, errs)
return nil, wrapErr(signalName, err)
}

counter = fmt.Sprintf("%s.jobs.active", cronMeterName)
signalName = fmt.Sprintf("%s.jobs.active", cronMeterName)
if result.jobsActive, err = meter.Int64UpDownCounter(
counter,
signalName,
metric.WithDescription("the number of actively executing cron jobs")); err != nil {
result.jobsActive, errs = handleInt64UpDownCounterError(counter, err, errs)
return nil, wrapErr(signalName, err)
}

counter = fmt.Sprintf("%s.job.latency", cronMeterName)
signalName = fmt.Sprintf("%s.job.latency", cronMeterName)
if result.jobLatency, err = meter.Int64Histogram(
counter,
signalName,
metric.WithDescription("the latency between the scheduled execution time of a cron job"),
metric.WithUnit("ms")); err != nil {
result.jobLatency, errs = handleInt64HistogramCounterError(counter, err, errs)
return nil, wrapErr(signalName, err)
}

return result, errs
return result, nil
}

func (m *CronMetrics) JobStarted(ctx context.Context, job model.CronJob) {
Expand Down
36 changes: 20 additions & 16 deletions backend/controller/observability/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"

"github.com/TBD54566975/ftl/internal/observability"
)
Expand All @@ -23,42 +24,45 @@ type DeploymentMetrics struct {
}

func initDeploymentMetrics() (*DeploymentMetrics, error) {
result := &DeploymentMetrics{}
result := &DeploymentMetrics{
reconciliationFailures: noop.Int64Counter{},
reconciliationsActive: noop.Int64UpDownCounter{},
replicasAdded: noop.Int64Counter{},
replicasRemoved: noop.Int64Counter{},
}

var errs error
var err error

meter := otel.Meter(deploymentMeterName)

counter := fmt.Sprintf("%s.reconciliation.failures", deploymentMeterName)
signalName := fmt.Sprintf("%s.reconciliation.failures", deploymentMeterName)
if result.reconciliationFailures, err = meter.Int64Counter(
counter,
signalName,
metric.WithDescription("the number of failed runner deployment reconciliation tasks")); err != nil {
result.reconciliationFailures, errs = handleInt64CounterError(counter, err, errs)
return nil, wrapErr(signalName, err)
}

counter = fmt.Sprintf("%s.reconciliations.active", deploymentMeterName)
signalName = fmt.Sprintf("%s.reconciliations.active", deploymentMeterName)
if result.reconciliationsActive, err = meter.Int64UpDownCounter(
counter,
signalName,
metric.WithDescription("the number of active deployment reconciliation tasks")); err != nil {
result.reconciliationsActive, errs = handleInt64UpDownCounterError(counter, err, errs)
return nil, wrapErr(signalName, err)
}

counter = fmt.Sprintf("%s.replicas.added", deploymentMeterName)
signalName = fmt.Sprintf("%s.replicas.added", deploymentMeterName)
if result.replicasAdded, err = meter.Int64Counter(
counter,
signalName,
metric.WithDescription("the number of runner replicas added by the deployment reconciliation tasks")); err != nil {
result.replicasAdded, errs = handleInt64CounterError(counter, err, errs)
return nil, wrapErr(signalName, err)
}

counter = fmt.Sprintf("%s.replicas.removed", deploymentMeterName)
signalName = fmt.Sprintf("%s.replicas.removed", deploymentMeterName)
if result.replicasRemoved, err = meter.Int64Counter(
counter,
signalName,
metric.WithDescription("the number of runner replicas removed by the deployment reconciliation tasks")); err != nil {
result.replicasRemoved, errs = handleInt64CounterError(counter, err, errs)
return nil, wrapErr(signalName, err)
}

return result, errs
return result, nil
}

func (m *DeploymentMetrics) ReconciliationFailure(ctx context.Context, module string, key string) {
Expand Down
45 changes: 19 additions & 26 deletions backend/controller/observability/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package observability

import (
"context"
"errors"
"fmt"

"go.opentelemetry.io/otel"
Expand All @@ -21,45 +20,43 @@ const (
)

type FSMMetrics struct {
meter metric.Meter
instancesActive metric.Int64UpDownCounter
transitionsActive metric.Int64UpDownCounter
transitionAttempts metric.Int64Counter
}

func initFSMMetrics() (*FSMMetrics, error) {
result := &FSMMetrics{}
result := &FSMMetrics{
instancesActive: noop.Int64UpDownCounter{},
transitionsActive: noop.Int64UpDownCounter{},
transitionAttempts: noop.Int64Counter{},
}

var errs error
var err error
meter := otel.Meter(fsmMeterName)

result.meter = otel.Meter(fsmMeterName)

counter := fmt.Sprintf("%s.instances.active", fsmMeterName)
if result.instancesActive, err = result.meter.Int64UpDownCounter(
counter,
signalName := fmt.Sprintf("%s.instances.active", fsmMeterName)
if result.instancesActive, err = meter.Int64UpDownCounter(
signalName,
metric.WithDescription("counts the number of active FSM instances")); err != nil {
errs = joinInitErrors(counter, err, errs)
result.instancesActive = noop.Int64UpDownCounter{}
return nil, wrapErr(signalName, err)
}

counter = fmt.Sprintf("%s.transitions.active", fsmMeterName)
if result.transitionsActive, err = result.meter.Int64UpDownCounter(
counter,
signalName = fmt.Sprintf("%s.transitions.active", fsmMeterName)
if result.transitionsActive, err = meter.Int64UpDownCounter(
signalName,
metric.WithDescription("counts the number of active FSM transitions")); err != nil {
errs = joinInitErrors(counter, err, errs)
result.transitionsActive = noop.Int64UpDownCounter{}
return nil, wrapErr(signalName, err)
}

counter = fmt.Sprintf("%s.transitions.attempts", fsmMeterName)
if result.transitionAttempts, err = result.meter.Int64Counter(
counter,
signalName = fmt.Sprintf("%s.transitions.attempts", fsmMeterName)
if result.transitionAttempts, err = meter.Int64Counter(
signalName,
metric.WithDescription("counts the number of attempted FSM transitions")); err != nil {
errs = joinInitErrors(counter, err, errs)
result.transitionAttempts = noop.Int64Counter{}
return nil, wrapErr(signalName, err)
}

return result, errs
return result, nil
}

func (m *FSMMetrics) InstanceCreated(ctx context.Context, fsm schema.RefKey) {
Expand Down Expand Up @@ -90,7 +87,3 @@ func (m *FSMMetrics) TransitionCompleted(ctx context.Context, fsm schema.RefKey)
attribute.String(observability.ModuleNameAttribute, fsm.Module),
attribute.String(fsmRefAttribute, fsm.String())))
}

func joinInitErrors(counter string, err error, errs error) error {
return errors.Join(errs, fmt.Errorf("%q counter init failed; falling back to noop: %w", counter, err))
}
18 changes: 2 additions & 16 deletions backend/controller/observability/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ import (
"fmt"
"math"
"time"

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
)

var (
Expand Down Expand Up @@ -41,19 +38,8 @@ func init() {
}
}

//nolint:unparam
func handleInt64CounterError(counter string, err error, errs error) (metric.Int64Counter, error) {
return noop.Int64Counter{}, errors.Join(errs, fmt.Errorf("%q counter init failed; falling back to noop: %w", counter, err))
}

//nolint:unparam
func handleInt64UpDownCounterError(counter string, err error, errs error) (metric.Int64UpDownCounter, error) {
return noop.Int64UpDownCounter{}, errors.Join(errs, fmt.Errorf("%q counter init failed; falling back to noop: %w", counter, err))
}

//nolint:unparam
func handleInt64HistogramCounterError(counter string, err error, errs error) (metric.Int64Histogram, error) {
return noop.Int64Histogram{}, errors.Join(errs, fmt.Errorf("%q counter init failed; falling back to noop: %w", counter, err))
func wrapErr(signalName string, err error) error {
return fmt.Errorf("failed to create %q signal: %w", signalName, err)
}

func timeSinceMS(start time.Time) int64 {
Expand Down
34 changes: 14 additions & 20 deletions backend/controller/observability/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package observability

import (
"context"
"errors"
"fmt"

"github.com/alecthomas/types/optional"
Expand Down Expand Up @@ -31,51 +30,46 @@ const (
)

type PubSubMetrics struct {
meter metric.Meter
published metric.Int64Counter
propagationFailed metric.Int64Counter
sinkCalled metric.Int64Counter
}

func initPubSubMetrics() (*PubSubMetrics, error) {
result := &PubSubMetrics{}
var errs error
var err error
result := &PubSubMetrics{
published: noop.Int64Counter{},
propagationFailed: noop.Int64Counter{},
sinkCalled: noop.Int64Counter{},
}

result.meter = otel.Meter(pubsubMeterName)
var err error
meter := otel.Meter(pubsubMeterName)

counterName := fmt.Sprintf("%s.published", pubsubMeterName)
if result.published, err = result.meter.Int64Counter(
if result.published, err = meter.Int64Counter(
counterName,
metric.WithUnit("1"),
metric.WithDescription("the number of times that an event is published to a topic")); err != nil {
errs = handleInitCounterError(errs, err, counterName)
result.published = noop.Int64Counter{}
return nil, wrapErr(counterName, err)
}

counterName = fmt.Sprintf("%s.propagation.failed", pubsubMeterName)
if result.propagationFailed, err = result.meter.Int64Counter(
if result.propagationFailed, err = meter.Int64Counter(
counterName,
metric.WithUnit("1"),
metric.WithDescription("the number of times that subscriptions fail to progress")); err != nil {
errs = handleInitCounterError(errs, err, counterName)
result.propagationFailed = noop.Int64Counter{}
return nil, wrapErr(counterName, err)
}

counterName = fmt.Sprintf("%s.sink.called", pubsubMeterName)
if result.sinkCalled, err = result.meter.Int64Counter(
if result.sinkCalled, err = meter.Int64Counter(
counterName,
metric.WithUnit("1"),
metric.WithDescription("the number of times that a pubsub event has been enqueued to asynchronously send to a subscriber")); err != nil {
errs = handleInitCounterError(errs, err, counterName)
result.sinkCalled = noop.Int64Counter{}
return nil, wrapErr(counterName, err)
}

return result, errs
}

func handleInitCounterError(errs error, err error, counterName string) error {
return errors.Join(errs, fmt.Errorf("%q counter init failed; falling back to noop: %w", counterName, err))
return result, nil
}

func (m *PubSubMetrics) Published(ctx context.Context, module, topic, caller string, maybeErr error) {
Expand Down
14 changes: 8 additions & 6 deletions backend/runner/observability/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"

"github.com/TBD54566975/ftl/internal/observability"
)
Expand All @@ -22,28 +23,29 @@ type DeploymentMetrics struct {
}

func initDeploymentMetrics() (*DeploymentMetrics, error) {
result := &DeploymentMetrics{}
result := &DeploymentMetrics{
failure: noop.Int64Counter{},
active: noop.Int64UpDownCounter{},
}

var errs error
var err error

meter := otel.Meter(deploymentMeterName)

counter := fmt.Sprintf("%s.failures", deploymentMeterName)
if result.failure, err = meter.Int64Counter(
counter,
metric.WithDescription("the number of deployment failures")); err != nil {
result.failure, errs = handleInt64CounterError(counter, err, errs)
return nil, wrapErr(counter, err)
}

counter = fmt.Sprintf("%s.active", deploymentMeterName)
if result.active, err = meter.Int64UpDownCounter(
counter,
metric.WithDescription("the number of active deployments")); err != nil {
result.active, errs = handleInt64UpDownCounterError(counter, err, errs)
return nil, wrapErr(counter, err)
}

return result, errs
return result, nil
}

func (m *DeploymentMetrics) Failure(ctx context.Context, key optional.Option[string]) {
Expand Down
13 changes: 2 additions & 11 deletions backend/runner/observability/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package observability
import (
"errors"
"fmt"

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
)

var (
Expand All @@ -27,12 +24,6 @@ func init() {
}
}

//nolint:unparam
func handleInt64CounterError(counter string, err error, errs error) (metric.Int64Counter, error) {
return noop.Int64Counter{}, errors.Join(errs, fmt.Errorf("%q counter init failed; falling back to noop: %w", counter, err))
}

//nolint:unparam
func handleInt64UpDownCounterError(counter string, err error, errs error) (metric.Int64UpDownCounter, error) {
return noop.Int64UpDownCounter{}, errors.Join(errs, fmt.Errorf("%q counter init failed; falling back to noop: %w", counter, err))
func wrapErr(signalName string, err error) error {
return fmt.Errorf("failed to create %q signal: %w", signalName, err)
}
Loading

0 comments on commit 4f39234

Please sign in to comment.