Skip to content

Commit

Permalink
reintroduced the ability to synchronize with topic publishing when th…
Browse files Browse the repository at this point in the history
…ere are no live subscriptions.
  • Loading branch information
jonathanj-square committed Aug 14, 2024
1 parent 80d4330 commit 8a15be5
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 2 deletions.
6 changes: 6 additions & 0 deletions go-runtime/ftl/ftltest/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ func newFakePubSub(ctx context.Context) *fakePubSub {
}

func (f *fakePubSub) publishEvent(topic *schema.Ref, event any) error {
// tracks event publication to a topic
f.publishWaitGroup.Add(1)
// tracks event subscription consumption completion
f.publishWaitGroup.Add(f.fetchTopicState(topic).subscriptionCount)
return f.globalTopic.PublishSync(publishEvent{topic: topic, content: event})
}
Expand Down Expand Up @@ -169,6 +172,8 @@ func (f *fakePubSub) handlePubSubEvent(ctx context.Context, e pubSubEvent) {
logger.Debugf("publishing to %s: %v", event.topic.Name, event.content)
ts := f.fetchTopicState(event.topic)
ts.events = append(ts.events, event.content)
// indicate that the event has been published to the topic
f.publishWaitGroup.Done()
case subscriptionDidConsumeEvent:
sub, ok := f.subscriptions[event.subscription]
if !ok {
Expand All @@ -178,6 +183,7 @@ func (f *fakePubSub) handlePubSubEvent(ctx context.Context, e pubSubEvent) {
sub.errors[sub.cursor.MustGet()] = event.err
}
sub.isExecuting = false
// indicate that the subscription has processed the event
f.publishWaitGroup.Done()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@ package subscriber_test

import (
"ftl/pubsub"
"testing"

"github.com/TBD54566975/ftl/go-runtime/ftl/ftltest"
"github.com/alecthomas/assert/v2"
"testing"
)

func TestPublishToExternalModule(t *testing.T) {
ctx := ftltest.Context()

assert.NoError(t, pubsub.Topic1.Publish(ctx, pubsub.Event{Value: "external"}))

ftltest.WaitForSubscriptionsToComplete(ctx)

assert.Equal(t, 1, len(ftltest.EventsForTopic(ctx, pubsub.Topic1)))

// Make sure we correctly made the right ref for the external module.
Expand Down

0 comments on commit 8a15be5

Please sign in to comment.