Skip to content

Watermill messaging subscriber for http push subscriptions on GCP, instrumented by OpenTelemetry

License

Notifications You must be signed in to change notification settings

dentech-floss/subscriber

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

16 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

subscriber

Watermill subscriber that is set up to use our custom dentech-floss/watermill-googlecloud-http lib to subscribe to messages delivered by HTTP push subscriptions in GCP. But why push with a custom lib instead of pulling with the official watermill-googlecloud lib, you may ask? Because we're running on Cloud Run, where we are limited to push subscriptions (and want to be able to scale down to 0 instances).

The subscriber is preconfigured for distributed OpenTelemetry tracing. For this we use both the official watermill-opentelemetry project and our custom complement, the dentech-floss/watermill-opentelemetry-go-extra lib, to extract a propagated parent span and create a child span of it when we receive a message. With this support we get quite awesome observability, since we can see and follow events flowing through the system in an APM of choice!

Install

go get github.com/dentech-floss/subscriber@latest

Usage

Create the subscriber and start subscribing to a topic/url using a router with support for tracing:

package example

import (
    "github.com/dentech-floss/logging/pkg/logging"
    "github.com/dentech-floss/metadata/pkg/metadata"
    "github.com/dentech-floss/revision/pkg/revision"
    "github.com/dentech-floss/subscriber/pkg/subscriber"

    "github.com/ThreeDotsLabs/watermill/message/router/plugin"

    "github.com/go-chi/chi"
)

// main wires the example together: it builds a logger, creates the subscriber
// on top of an HTTP router, registers one handler on a traced Watermill router,
// runs that router in a goroutine and then serves HTTP on the configured port.
func main() {

    // NOTE: from here on the local variable shadows the imported metadata package.
    metadata := metadata.NewMetadata()

    logger := logging.NewLogger(
        &logging.LoggerConfig{
            OnGCP:       metadata.OnGCP,
            ServiceName: revision.ServiceName,
        },
    )
    defer logger.Sync()

    // Elided setup. NOTE(review): presumably this is where repo, ctx and config
    // (all used below) are created - confirm against the real service code.
    ...

    bqIngestionService := service.NewAppointmentBigQueryIngestionService(repo, logger)

    httpRouter := chi.NewRouter() // chi is not required; any mux of your choice works

    _subscriber := subscriber.NewSubscriber(
        logger.Logger.Logger, // the *zap.Logger is wrapped like a matryoshka doll :)
        &subscriber.SubscriberConfig{}, // nothing needs to be provided here at the moment
        httpRouter.Handle, // registers the http handler for the topic/url on chi
    )

    // this Watermill router has tracing middleware added to it
    router := subscriber.InitTracedRouter(logger.Logger.Logger) // the *zap.Logger is wrapped like a matryoshka doll :)
    router.AddPlugin(plugin.SignalsHandler) // kills the router after SIGINT or SIGTERM is sent to the process

    router.AddNoPublisherHandler(
        "pubsub.Subscribe/appointment/claimed", // the name of our handler
        "/push-handlers/pubsub/appointment/claimed", // topic/url that messages are pushed to us on
        _subscriber,
        bqIngestionService.HandleAppointmentClaimedEvent, // our handler to invoke
    )

    // Run the Watermill router in the background; an error from Run panics.
    go func() {
        err := router.Run(ctx)
        if err != nil {
            panic(err)
        }
    }()

    // Serve the push endpoints; this blocks until ListenAndServe returns.
    err := http.ListenAndServe(":"+strconv.Itoa(*config.Port), httpRouter)
    if errors.Is(err, http.ErrServerClosed) {
        logger.Info("Server closed")
    } else {
        logger.Error("Server failed", logging.ErrorField(err))
    }
}

Handle the Watermill message by unmarshalling the payload and ack/nack'ing the message:

package example

import (
    "github.com/dentech-floss/logging/pkg/logging"
    "github.com/dentech-floss/subscriber/pkg/subscriber"

    appointment_service_v1 "go.buf.build/dentechse/go-grpc-gateway-openapiv2/dentechse/service-definitions/api/appointment/v1"
)

...

// HandleAppointmentClaimedEvent is the Watermill handler for the
// "appointment claimed" push subscription. It passes msg, an empty
// AppointmentEvent and an insert callback to subscriber.HandleMessage and
// returns whatever error HandleMessage returns.
//
// According to the example's own description, HandleMessage takes care of
// unmarshalling the payload into the event and of ack/nack'ing the message.
func (s *AppointmentBigQueryIngestionService) HandleAppointmentClaimedEvent(msg *message.Message) error {
    event := &appointment_service_v1.AppointmentEvent{}

    // insertClaimed stores the claimed-appointment part of the event and logs
    // (with the context it is called with) when the repository rejects it.
    insertClaimed := func(ctx context.Context) error {
        insertErr := s.repo.InsertAppointmentClaimedEvent(ctx, event.GetAppointmentClaimed())
        if insertErr == nil {
            return nil
        }
        s.logger.WithContext(ctx).Error(
            "Failed to insert 'AppointmentClaimedEvent'",
            logging.StringField("msg_uuid", msg.UUID),
            logging.ProtoField("event", event),
            logging.ErrorField(insertErr),
        )
        return insertErr
    }

    handleErr := subscriber.HandleMessage(msg, event, insertClaimed)
    if handleErr == nil {
        return nil
    }

    // NOTE(review): this log line assumes every error returned by HandleMessage
    // is an unmarshal failure; confirm that callback errors are not returned too.
    s.logger.WithContext(msg.Context()).Error(
        "Failed to unmarshal 'AppointmentClaimedEvent', ack'ed the message to get rid of it",
        logging.StringField("msg_uuid", msg.UUID),
        logging.StringField("payload", string(msg.Payload)),
        logging.ErrorField(handleErr),
    )
    return handleErr
}