Skip to content

Commit

Permalink
feat: timeline events for pubsub (#3322)
Browse files Browse the repository at this point in the history
  • Loading branch information
safeer authored Nov 6, 2024
1 parent 03861f5 commit 7e8abfa
Show file tree
Hide file tree
Showing 35 changed files with 2,031 additions and 715 deletions.
57 changes: 57 additions & 0 deletions backend/controller/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,10 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]timeline.TimelineFilter
eventTypes = append(eventTypes, timeline.EventTypeCronScheduled)
case pbconsole.EventType_EVENT_TYPE_ASYNC_EXECUTE:
eventTypes = append(eventTypes, timeline.EventTypeAsyncExecute)
case pbconsole.EventType_EVENT_TYPE_PUBSUB_PUBLISH:
eventTypes = append(eventTypes, timeline.EventTypePubSubPublish)
case pbconsole.EventType_EVENT_TYPE_PUBSUB_CONSUME:
eventTypes = append(eventTypes, timeline.EventTypePubSubConsume)
default:
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("unknown event type %v", eventType))
}
Expand Down Expand Up @@ -827,6 +831,59 @@ func eventDALToProto(event timeline.Event) *pbconsole.Event {
},
}

case *timeline.PubSubPublishEvent:
var requestKey *string
if r, ok := event.RequestKey.Get(); ok {
requestKey = &r
}

return &pbconsole.Event{
TimeStamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbconsole.Event_PubsubPublish{
PubsubPublish: &pbconsole.PubSubPublishEvent{
DeploymentKey: event.DeploymentKey.String(),
RequestKey: requestKey,
VerbRef: event.SourceVerb.ToProto().(*schemapb.Ref), //nolint:forcetypeassert
TimeStamp: timestamppb.New(event.Time),
Duration: durationpb.New(event.Duration),
Topic: event.Topic,
Request: string(event.Request),
Error: event.Error.Ptr(),
},
},
}

case *timeline.PubSubConsumeEvent:
var requestKey *string
if r, ok := event.RequestKey.Get(); ok {
requestKey = &r
}

var destVerbModule string
var destVerbName string
if destVerb, ok := event.DestVerb.Get(); ok {
destVerbModule = destVerb.Module
destVerbName = destVerb.Name
}

return &pbconsole.Event{
TimeStamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbconsole.Event_PubsubConsume{
PubsubConsume: &pbconsole.PubSubConsumeEvent{
DeploymentKey: event.DeploymentKey.String(),
RequestKey: requestKey,
DestVerbModule: &destVerbModule,
DestVerbName: &destVerbName,
TimeStamp: timestamppb.New(event.Time),
Duration: durationpb.New(event.Duration),
Topic: event.Topic,
Error: event.Error.Ptr(),
},
},
}

default:
panic(fmt.Errorf("unknown event type %T", event))
}
Expand Down
34 changes: 31 additions & 3 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,13 +293,12 @@ func New(
}
svc.schemaState.Store(schemaState{routes: map[string]Route{}, schema: &schema.Schema{}})

pubSub := pubsub.New(ctx, conn, encryption, optional.Some[pubsub.AsyncCallListener](svc))
svc.pubSub = pubSub

svc.registry = artefacts.New(conn)

timelineSvc := timeline.New(ctx, conn, encryption)
svc.timeline = timelineSvc
pubSub := pubsub.New(ctx, conn, encryption, optional.Some[pubsub.AsyncCallListener](svc), timelineSvc)
svc.pubSub = pubSub
cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, encryption, timelineSvc, conn)
svc.cronJobs = cronSvc
svc.dal = dal.New(ctx, conn, encryption, pubSub, cronSvc)
Expand Down Expand Up @@ -859,7 +858,36 @@ func (s *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallReque

func (s *Service) PublishEvent(ctx context.Context, req *connect.Request[ftlv1.PublishEventRequest]) (*connect.Response[ftlv1.PublishEventResponse], error) {
// Publish the event.
now := time.Now().UTC()
pubishError := optional.None[string]()
err := s.pubSub.PublishEventForTopic(ctx, req.Msg.Topic.Module, req.Msg.Topic.Name, req.Msg.Caller, req.Msg.Body)
if err != nil {
pubishError = optional.Some(err.Error())
}

requestKey := optional.None[string]()
if rk, err := rpc.RequestKeyFromContext(ctx); err == nil {
if rk, ok := rk.Get(); ok {
requestKey = optional.Some(rk.String())
}
}

// Add to timeline.
sstate := s.schemaState.Load()
module := req.Msg.Topic.Module
route, ok := sstate.routes[module]
if ok {
s.timeline.EnqueueEvent(ctx, &timeline.PubSubPublish{
DeploymentKey: route.Deployment,
RequestKey: requestKey,
Time: now,
SourceVerb: schema.Ref{Name: req.Msg.Caller, Module: req.Msg.Topic.Module},
Topic: req.Msg.Topic.Name,
Request: req.Msg,
Error: pubishError,
})
}

if err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to publish a event to topic %s:%s: %w", req.Msg.Topic.Module, req.Msg.Topic.Name, err))
}
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/cronjobs/internal/cronjobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestNewCronJobsForModule(t *testing.T) {
timelineSrv := timeline.New(ctx, conn, encryption)
cjs := cronjobs.NewForTesting(ctx, key, "test.com", encryption, timelineSrv, *dal, clk)

pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener]())
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener](), timelineSrv)
parentDAL := parentdal.New(ctx, conn, encryption, pubSub, cjs)
moduleName := "initial"
jobsToCreate := newCronJobs(t, moduleName, "* * * * * *", clk, 2) // every minute
Expand Down
5 changes: 4 additions & 1 deletion backend/controller/dal/async_calls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/TBD54566975/ftl/backend/controller/encryption"
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
"github.com/TBD54566975/ftl/backend/controller/timeline"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
Expand All @@ -22,7 +23,9 @@ func TestNoCallToAcquire(t *testing.T) {
conn := sqltest.OpenForTesting(ctx, t)
encryption, err := encryption.New(ctx, conn, encryption.NewBuilder())
assert.NoError(t, err)
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener]())

timelineSvc := timeline.New(ctx, conn, encryption)
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener](), timelineSvc)
dal := New(ctx, conn, encryption, pubSub, nil)

_, _, err = dal.AcquireAsyncCall(ctx)
Expand Down
6 changes: 3 additions & 3 deletions backend/controller/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func TestDAL(t *testing.T) {
encryption, err := encryption.New(ctx, conn, encryption.NewBuilder())
assert.NoError(t, err)

pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener]())
timelineSrv := timeline.New(ctx, conn, encryption)
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener](), timelineSrv)
key := model.NewControllerKey("localhost", "8081")
cjs := cronjobs.New(ctx, key, "test.com", encryption, timelineSrv, conn)
dal := New(ctx, conn, encryption, pubSub, cjs)
Expand Down Expand Up @@ -195,9 +195,9 @@ func TestCreateArtefactConflict(t *testing.T) {
encryption, err := encryption.New(ctx, conn, encryption.NewBuilder())
assert.NoError(t, err)

pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener]())

timelineSrv := timeline.New(ctx, conn, encryption)
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener](), timelineSrv)

key := model.NewControllerKey("localhost", "8081")
cjs := cronjobs.New(ctx, key, "test.com", encryption, timelineSrv, conn)
dal := New(ctx, conn, encryption, pubSub, cjs)
Expand Down
23 changes: 17 additions & 6 deletions backend/controller/dal/internal/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 32 additions & 6 deletions backend/controller/pubsub/internal/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/TBD54566975/ftl/backend/controller/observability"
dalsql "github.com/TBD54566975/ftl/backend/controller/pubsub/internal/sql"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltypes"
"github.com/TBD54566975/ftl/backend/controller/timeline"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
Expand Down Expand Up @@ -97,7 +98,7 @@ func (d *DAL) GetSubscriptionsNeedingUpdate(ctx context.Context) ([]model.Subscr
}), nil
}

func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay time.Duration) (count int, err error) {
func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay time.Duration, timelineSvc *timeline.Service) (count int, err error) {
tx, err := d.Begin(ctx)
if err != nil {
return 0, fmt.Errorf("failed to begin transaction: %w", err)
Expand All @@ -115,36 +116,58 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t

successful := 0
for _, subscription := range subs {
now := time.Now().UTC()
enqueueTimelineEvent := func(destVerb optional.Option[schema.RefKey], err optional.Option[string]) {
timelineSvc.EnqueueEvent(ctx, &timeline.PubSubConsume{
DeploymentKey: subscription.DeploymentKey,
RequestKey: subscription.RequestKey,
Time: now,
DestVerb: destVerb,
Topic: subscription.Topic.Payload.Name,
Error: err,
})
}

nextCursor, err := tx.db.GetNextEventForSubscription(ctx, sqltypes.Duration(eventConsumptionDelay), subscription.Topic, subscription.Cursor)
if err != nil {
observability.PubSub.PropagationFailed(ctx, "GetNextEventForSubscription", subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), optional.None[schema.RefKey]())
return 0, fmt.Errorf("failed to get next cursor: %w", libdal.TranslatePGError(err))
err = fmt.Errorf("failed to get next cursor: %w", libdal.TranslatePGError(err))
enqueueTimelineEvent(optional.None[schema.RefKey](), optional.Some(err.Error()))
return 0, err
}
payload, ok := nextCursor.Payload.Get()
if !ok {
observability.PubSub.PropagationFailed(ctx, "GetNextEventForSubscription-->Payload.Get", subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), optional.None[schema.RefKey]())
return 0, fmt.Errorf("could not find payload to progress subscription: %w", libdal.TranslatePGError(err))
err = fmt.Errorf("could not find payload to progress subscription: %w", libdal.TranslatePGError(err))
enqueueTimelineEvent(optional.None[schema.RefKey](), optional.Some(err.Error()))
return 0, err
}
nextCursorKey, ok := nextCursor.Event.Get()
if !ok {
observability.PubSub.PropagationFailed(ctx, "GetNextEventForSubscription-->Event.Get", subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), optional.None[schema.RefKey]())
return 0, fmt.Errorf("could not find event to progress subscription: %w", libdal.TranslatePGError(err))
err = fmt.Errorf("could not find event to progress subscription: %w", libdal.TranslatePGError(err))
enqueueTimelineEvent(optional.None[schema.RefKey](), optional.Some(err.Error()))
return 0, err
}
if !nextCursor.Ready {
logger.Tracef("Skipping subscription %s because event is too new", subscription.Key)
enqueueTimelineEvent(optional.None[schema.RefKey](), optional.Some(fmt.Sprintf("Skipping subscription %s because event is too new", subscription.Key)))
continue
}

subscriber, err := tx.db.GetRandomSubscriber(ctx, subscription.Key)
if err != nil {
logger.Tracef("no subscriber for subscription %s", subscription.Key)
enqueueTimelineEvent(optional.None[schema.RefKey](), optional.Some(fmt.Sprintf("no subscriber for subscription %s", subscription.Key)))
continue
}

err = tx.db.BeginConsumingTopicEvent(ctx, subscription.Key, nextCursorKey)
if err != nil {
observability.PubSub.PropagationFailed(ctx, "BeginConsumingTopicEvent", subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), optional.Some(subscriber.Sink))
return 0, fmt.Errorf("failed to progress subscription: %w", libdal.TranslatePGError(err))
err = fmt.Errorf("failed to progress subscription: %w", libdal.TranslatePGError(err))
enqueueTimelineEvent(optional.Some(subscriber.Sink), optional.Some(err.Error()))
return 0, err
}

origin := async.AsyncOriginPubSub{
Expand All @@ -169,10 +192,13 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t
observability.AsyncCalls.Created(ctx, subscriber.Sink, subscriber.CatchVerb, origin.String(), int64(subscriber.RetryAttempts), err)
if err != nil {
observability.PubSub.PropagationFailed(ctx, "CreateAsyncCall", subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), optional.Some(subscriber.Sink))
return 0, fmt.Errorf("failed to schedule async task for subscription: %w", libdal.TranslatePGError(err))
err = fmt.Errorf("failed to schedule async task for subscription: %w", libdal.TranslatePGError(err))
enqueueTimelineEvent(optional.Some(subscriber.Sink), optional.Some(err.Error()))
return 0, err
}

observability.PubSub.SinkCalled(ctx, subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), subscriber.Sink)
enqueueTimelineEvent(optional.Some(subscriber.Sink), optional.None[string]())
successful++
}

Expand Down
10 changes: 8 additions & 2 deletions backend/controller/pubsub/internal/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,12 @@ SELECT
subs.key::subscription_key as key,
curser.key as cursor,
topics.key::topic_key as topic,
subs.name
subs.name,
deployments.key as deployment_key,
curser.request_key as request_key
FROM topic_subscriptions subs
JOIN runner_count on subs.deployment_id = runner_count.deployment
JOIN deployments ON subs.deployment_id = deployments.id
LEFT JOIN topics ON subs.topic_id = topics.id
LEFT JOIN topic_events curser ON subs.cursor = curser.id
WHERE subs.cursor IS DISTINCT FROM topics.head
Expand All @@ -138,6 +141,7 @@ ORDER BY curser.created_at
LIMIT 3
FOR UPDATE OF subs SKIP LOCKED;


-- name: GetNextEventForSubscription :one
WITH cursor AS (
SELECT
Expand Down Expand Up @@ -166,9 +170,11 @@ SELECT
subscribers.retry_attempts as retry_attempts,
subscribers.backoff as backoff,
subscribers.max_backoff as max_backoff,
subscribers.catch_verb as catch_verb
subscribers.catch_verb as catch_verb,
deployments.key as deployment_key
FROM topic_subscribers as subscribers
JOIN topic_subscriptions ON subscribers.topic_subscriptions_id = topic_subscriptions.id
JOIN deployments ON subscribers.deployment_id = deployments.id
WHERE topic_subscriptions.key = sqlc.arg('key')::subscription_key
ORDER BY RANDOM()
LIMIT 1;
Expand Down
Loading

0 comments on commit 7e8abfa

Please sign in to comment.