From 749fb196c31577516766b370948fe5a1352cd0e8 Mon Sep 17 00:00:00 2001 From: Denise Li Date: Mon, 29 Jul 2024 21:04:25 -0400 Subject: [PATCH] feat: part 1 of adding metrics for pubsub (#2193) Implement `ftl.pubsub.published` and `ftl.pubsub.subscriber.called` ``` ScopeMetrics #0 ScopeMetrics SchemaURL: InstrumentationScope ftl.pubsub Metric #0 Descriptor: -> Name: ftl.pubsub.published -> Description: the number of times that an event is published to a topic -> Unit: 1 -> DataType: Sum -> IsMonotonic: true -> AggregationTemporality: Cumulative NumberDataPoints #0 Data point attributes: -> ftl.module.name: Str(echo) -> ftl.pubsub.topic.name: Str(echotopic) StartTimestamp: 2024-07-30 00:53:26.173902 +0000 UTC Timestamp: 2024-07-30 00:53:41.175052 +0000 UTC Value: 1 Metric #1 Descriptor: -> Name: ftl.pubsub.subscriber.called -> Description: the number of times that a pubsub event has been enqueued to asynchronously send to a subscriber -> Unit: 1 -> DataType: Sum -> IsMonotonic: true -> AggregationTemporality: Cumulative NumberDataPoints #0 Data point attributes: -> ftl.module.name: Str(echo) -> ftl.pubsub.subscriber.sink.ref: Str(echo.echoSinkOne) -> ftl.pubsub.subscription.ref: Str(echo.sub) -> ftl.pubsub.topic.name: Str(echotopic) StartTimestamp: 2024-07-30 00:53:26.173906 +0000 UTC Timestamp: 2024-07-30 00:53:41.175069 +0000 UTC Value: 1 ``` PubSub design doc for reference: https://hackmd.io/@ftl/HyDTmzMdp Issue: https://github.com/TBD54566975/ftl/issues/2194 --------- Co-authored-by: github-actions[bot] --- backend/controller/dal/pubsub.go | 4 + .../controller/observability/observability.go | 10 ++- backend/controller/observability/pubsub.go | 76 +++++++++++++++++++ 3 files changed, 88 insertions(+), 2 deletions(-) create mode 100644 backend/controller/observability/pubsub.go diff --git a/backend/controller/dal/pubsub.go b/backend/controller/dal/pubsub.go index 8f086934c..c2f2a3ab5 100644 --- a/backend/controller/dal/pubsub.go +++ b/backend/controller/dal/pubsub.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/TBD54566975/ftl/backend/controller/observability" "github.com/TBD54566975/ftl/backend/controller/sql" dalerrs "github.com/TBD54566975/ftl/backend/dal" "github.com/TBD54566975/ftl/backend/schema" @@ -23,6 +24,7 @@ func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic string, pa if err != nil { return dalerrs.TranslatePGError(err) } + observability.PubSub.Published(ctx, module, topic) return nil } @@ -100,6 +102,8 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t if err != nil { return 0, fmt.Errorf("failed to schedule async task for subscription: %w", dalerrs.TranslatePGError(err)) } + + observability.PubSub.SubscriberCalled(ctx, subscription.Topic.Payload.Name, schema.RefKey{Module: subscription.Key.Payload.Module, Name: subscription.Name}, subscriber.Sink) successful++ } return successful, nil diff --git a/backend/controller/observability/observability.go b/backend/controller/observability/observability.go index 176e61af4..da8165527 100644 --- a/backend/controller/observability/observability.go +++ b/backend/controller/observability/observability.go @@ -1,19 +1,25 @@ package observability import ( + "errors" "fmt" ) var ( - FSM *FSMMetrics + FSM *FSMMetrics + PubSub *PubSubMetrics ) func init() { + var errs error var err error FSM, err = initFSMMetrics() + errs = errors.Join(errs, err) + PubSub, err = initPubSubMetrics() + errs = errors.Join(errs, err) if err != nil { - panic(fmt.Errorf("could not initialize controller metrics: %w", err)) + panic(fmt.Errorf("could not initialize controller metrics: %w", errs)) } } diff --git a/backend/controller/observability/pubsub.go b/backend/controller/observability/pubsub.go new file mode 100644 index 000000000..242bf3291 --- /dev/null +++ b/backend/controller/observability/pubsub.go @@ -0,0 +1,76 @@ +package observability + +import ( + "context" + "errors" + "fmt" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" + + "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/internal/observability" +) + +const ( + pubsubMeterName = "ftl.pubsub" + pubsubTopicNameAttribute = "ftl.pubsub.topic.name" + pubsubSubscriptionRefAttribute = "ftl.pubsub.subscription.ref" + pubsubSubscriberRefAttribute = "ftl.pubsub.subscriber.sink.ref" +) + +type PubSubMetrics struct { + meter metric.Meter + published metric.Int64Counter + subscriberCalled metric.Int64Counter +} + +func initPubSubMetrics() (*PubSubMetrics, error) { + result := &PubSubMetrics{} + var errs error + var err error + + result.meter = otel.Meter(pubsubMeterName) + + counterName := fmt.Sprintf("%s.published", pubsubMeterName) + if result.published, err = result.meter.Int64Counter( + counterName, + metric.WithUnit("1"), + metric.WithDescription("the number of times that an event is published to a topic")); err != nil { + errs = handleInitCounterError(errs, err, counterName) + result.published = noop.Int64Counter{} + } + + counterName = fmt.Sprintf("%s.subscriber.called", pubsubMeterName) + if result.subscriberCalled, err = result.meter.Int64Counter( + counterName, + metric.WithUnit("1"), + metric.WithDescription("the number of times that a pubsub event has been enqueued to asynchronously send to a subscriber")); err != nil { + errs = handleInitCounterError(errs, err, counterName) + result.subscriberCalled = noop.Int64Counter{} + } + + return result, errs +} + +func handleInitCounterError(errs error, err error, counterName string) error { + return errors.Join(errs, fmt.Errorf("%q counter init failed; falling back to noop: %w", counterName, err)) +} + +func (m *PubSubMetrics) Published(ctx context.Context, module, topic string) { + m.published.Add(ctx, 1, metric.WithAttributes( + attribute.String(observability.ModuleNameAttribute, module), + attribute.String(pubsubTopicNameAttribute, topic), + )) +} + +func (m *PubSubMetrics) SubscriberCalled(ctx context.Context, topic string, subscription, sink schema.RefKey) { + m.subscriberCalled.Add(ctx, 1, metric.WithAttributes( + attribute.String(observability.ModuleNameAttribute, sink.Module), + attribute.String(pubsubTopicNameAttribute, topic), + attribute.String(pubsubSubscriptionRefAttribute, subscription.String()), + attribute.String(pubsubSubscriberRefAttribute, sink.String()), + )) +}