Skip to content

Commit

Permalink
feat: part 1 of adding metrics for pubsub (#2193)
Browse files Browse the repository at this point in the history
Implement `ftl.pubsub.published` and `ftl.pubsub.subscriber.called`

```
ScopeMetrics #0
ScopeMetrics SchemaURL: 
InstrumentationScope ftl.pubsub 

Metric #0
Descriptor:
     -> Name: ftl.pubsub.published
     -> Description: the number of times that an event is published to a topic
     -> Unit: 1
     -> DataType: Sum
     -> IsMonotonic: true
     -> AggregationTemporality: Cumulative

NumberDataPoints #0
Data point attributes:
     -> ftl.module.name: Str(echo)
     -> ftl.pubsub.topic.name: Str(echotopic)
StartTimestamp: 2024-07-30 00:53:26.173902 +0000 UTC
Timestamp: 2024-07-30 00:53:41.175052 +0000 UTC
Value: 1

Metric #1
Descriptor:
     -> Name: ftl.pubsub.subscriber.called
     -> Description: the number of times that a pubsub event has been enqueued to asynchronously send to a subscriber
     -> Unit: 1
     -> DataType: Sum
     -> IsMonotonic: true
     -> AggregationTemporality: Cumulative

NumberDataPoints #0
Data point attributes:
     -> ftl.module.name: Str(echo)
     -> ftl.pubsub.subscriber.sink.ref: Str(echo.echoSinkOne)
     -> ftl.pubsub.subscription.ref: Str(echo.sub)
     -> ftl.pubsub.topic.name: Str(echotopic)
StartTimestamp: 2024-07-30 00:53:26.173906 +0000 UTC
Timestamp: 2024-07-30 00:53:41.175069 +0000 UTC
Value: 1
```

PubSub design doc for reference: https://hackmd.io/@ftl/HyDTmzMdp
Issue: #2194

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
deniseli and github-actions[bot] authored Jul 30, 2024
1 parent c437a63 commit 749fb19
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 2 deletions.
4 changes: 4 additions & 0 deletions backend/controller/dal/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

"github.com/TBD54566975/ftl/backend/controller/observability"
"github.com/TBD54566975/ftl/backend/controller/sql"
dalerrs "github.com/TBD54566975/ftl/backend/dal"
"github.com/TBD54566975/ftl/backend/schema"
Expand All @@ -23,6 +24,7 @@ func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic string, pa
if err != nil {
return dalerrs.TranslatePGError(err)
}
observability.PubSub.Published(ctx, module, topic)
return nil
}

Expand Down Expand Up @@ -100,6 +102,8 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t
if err != nil {
return 0, fmt.Errorf("failed to schedule async task for subscription: %w", dalerrs.TranslatePGError(err))
}

observability.PubSub.SubscriberCalled(ctx, subscription.Topic.Payload.Name, schema.RefKey{Module: subscription.Key.Payload.Module, Name: subscription.Name}, subscriber.Sink)
successful++
}
return successful, nil
Expand Down
10 changes: 8 additions & 2 deletions backend/controller/observability/observability.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
package observability

import (
"errors"
"fmt"
)

var (
FSM *FSMMetrics
FSM *FSMMetrics
PubSub *PubSubMetrics
)

func init() {
var errs error
var err error

FSM, err = initFSMMetrics()
errs = errors.Join(errs, err)
PubSub, err = initPubSubMetrics()
errs = errors.Join(errs, err)

if err != nil {
panic(fmt.Errorf("could not initialize controller metrics: %w", err))
panic(fmt.Errorf("could not initialize controller metrics: %w", errs))
}
}
76 changes: 76 additions & 0 deletions backend/controller/observability/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package observability

import (
"context"
"errors"
"fmt"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"

"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/observability"
)

const (
pubsubMeterName = "ftl.pubsub"
pubsubTopicNameAttribute = "ftl.pubsub.topic.name"
pubsubSubscriptionRefAttribute = "ftl.pubsub.subscription.ref"
pubsubSubscriberRefAttribute = "ftl.pubsub.subscriber.sink.ref"
)

type PubSubMetrics struct {
meter metric.Meter
published metric.Int64Counter
subscriberCalled metric.Int64Counter
}

func initPubSubMetrics() (*PubSubMetrics, error) {
result := &PubSubMetrics{}
var errs error
var err error

result.meter = otel.Meter(pubsubMeterName)

counterName := fmt.Sprintf("%s.published", pubsubMeterName)
if result.published, err = result.meter.Int64Counter(
counterName,
metric.WithUnit("1"),
metric.WithDescription("the number of times that an event is published to a topic")); err != nil {
errs = handleInitCounterError(errs, err, counterName)
result.published = noop.Int64Counter{}
}

counterName = fmt.Sprintf("%s.subscriber.called", pubsubMeterName)
if result.subscriberCalled, err = result.meter.Int64Counter(
counterName,
metric.WithUnit("1"),
metric.WithDescription("the number of times that a pubsub event has been enqueued to asynchronously send to a subscriber")); err != nil {
errs = handleInitCounterError(errs, err, counterName)
result.subscriberCalled = noop.Int64Counter{}
}

return result, errs
}

func handleInitCounterError(errs error, err error, counterName string) error {
return errors.Join(errs, fmt.Errorf("%q counter init failed; falling back to noop: %w", counterName, err))
}

func (m *PubSubMetrics) Published(ctx context.Context, module, topic string) {
m.published.Add(ctx, 1, metric.WithAttributes(
attribute.String(observability.ModuleNameAttribute, module),
attribute.String(pubsubTopicNameAttribute, topic),
))
}

func (m *PubSubMetrics) SubscriberCalled(ctx context.Context, topic string, subscription, sink schema.RefKey) {
m.subscriberCalled.Add(ctx, 1, metric.WithAttributes(
attribute.String(observability.ModuleNameAttribute, sink.Module),
attribute.String(pubsubTopicNameAttribute, topic),
attribute.String(pubsubSubscriptionRefAttribute, subscription.String()),
attribute.String(pubsubSubscriberRefAttribute, sink.String()),
))
}

0 comments on commit 749fb19

Please sign in to comment.