Skip to content

Commit

Permalink
FINALLY Fixed all of the topic based routing plus delayed messaging c…
Browse files Browse the repository at this point in the history
…ombinations
  • Loading branch information
jeremydmiller committed Jan 30, 2024
1 parent 4f133c9 commit 61fccf7
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 15 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<TargetFrameworks>net6.0;net7.0;net8.0</TargetFrameworks>
<NoWarn>1570;1571;1572;1573;1574;1587;1591;1701;1702;1711;1735;0618</NoWarn>
<ImplicitUsings>true</ImplicitUsings>
<Version>1.16.1</Version>
<Version>1.16.2</Version>
<RepositoryUrl>$(PackageProjectUrl)</RepositoryUrl>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public void throws_descriptive_exception()
{
var route = new NoNamedEndpointRoute("foo", new[] { "bar", "baz" });

var ex = Should.Throw<UnknownEndpointException>(() => route.CreateForSending(null, null, null, null));
var ex = Should.Throw<UnknownEndpointException>(() => route.CreateForSending(null, null, null, null, null));

ex.Message.ShouldBe("Endpoint name 'foo' is invalid. Known endpoints are bar, baz");
}
Expand Down
187 changes: 187 additions & 0 deletions src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/send_by_topics.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using IntegrationTests;
using JasperFx.Core;
using Marten;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;
using TestingSupport;
using TestMessages;
using Wolverine.Attributes;
using Wolverine.Marten;
using Wolverine.Tracking;
using Xunit;

Expand Down Expand Up @@ -193,6 +196,190 @@ public async Task publish_by_user_message_topic_logic_and_delay()
}
}

public class send_by_topics_durable : IDisposable
{
private readonly IHost theGreenReceiver;
private readonly IHost theBlueReceiver;
private readonly IHost theSender;
private readonly IHost theThirdReceiver;

public send_by_topics_durable()
{

theSender = Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Durability.Mode = DurabilityMode.Solo;

opts.Services.AddMarten(Servers.PostgresConnectionString)
.IntegrateWithWolverine("sender");

opts.Policies.UseDurableOutboxOnAllSendingEndpoints();

opts.UseRabbitMq("host=localhost;port=5672").AutoProvision();
opts.PublishAllMessages().ToRabbitTopics("wolverine.topics", exchange =>
{
exchange.BindTopic("color.green").ToQueue("green");
exchange.BindTopic("color.blue").ToQueue("blue");
exchange.BindTopic("color.*").ToQueue("all");
});

opts.PublishMessagesToRabbitMqExchange<RoutedMessage>("wolverine.topics", m => m.TopicName);
}).Start();

theGreenReceiver = WolverineHost.For(opts =>
{
opts.ServiceName = "Green";
opts.ListenToRabbitQueue("green");
opts.UseRabbitMq();
});

theBlueReceiver = WolverineHost.For(opts =>
{
opts.ServiceName = "Blue";
opts.ListenToRabbitQueue("blue");
opts.UseRabbitMq();
});

theThirdReceiver = WolverineHost.For(opts =>
{
opts.ServiceName = "Third";
opts.ListenToRabbitQueue("all");
opts.UseRabbitMq();
});
}

public void Dispose()
{
theSender?.Dispose();
theGreenReceiver?.Dispose();
theBlueReceiver?.Dispose();
theThirdReceiver?.Dispose();
}

internal async Task send_by_topic_sample()
{
#region sample_send_to_topic

var publisher = theSender.Services
.GetRequiredService<IMessageBus>();

await publisher.BroadcastToTopicAsync("color.purple", new Message1());

#endregion
}

[Fact]
public async Task send_by_message_topic()
{
var session = await theSender
.TrackActivity()
.IncludeExternalTransports()
.AlsoTrack(theGreenReceiver, theBlueReceiver, theThirdReceiver)
.SendMessageAndWaitAsync(new PurpleMessage());

session.FindEnvelopesWithMessageType<PurpleMessage>()
.Where(x => x.MessageEventType == MessageEventType.Received)
.Select(x => x.ServiceName)
.Single().ShouldBe("Third");
}

[Fact]
public async Task send_by_message_topic_to_multiple_listeners()
{
var session = await theSender
.TrackActivity()
.IncludeExternalTransports()
.AlsoTrack(theGreenReceiver, theBlueReceiver, theThirdReceiver)
.SendMessageAndWaitAsync(new FirstMessage());

session.FindEnvelopesWithMessageType<FirstMessage>()
.Where(x => x.MessageEventType == MessageEventType.Received)
.Select(x => x.ServiceName)
.OrderBy(x => x).ShouldHaveTheSameElementsAs("Blue", "Third");
}

[Fact]
public async Task send_by_explicit_topic()
{
var session = await theSender
.TrackActivity()
.IncludeExternalTransports()
.AlsoTrack(theGreenReceiver, theBlueReceiver, theThirdReceiver)
.BroadcastMessageToTopicAndWaitAsync("color.green", new PurpleMessage());

session.FindEnvelopesWithMessageType<PurpleMessage>()
.Where(x => x.MessageEventType == MessageEventType.Received)
.Select(x => x.ServiceName)
.OrderBy(x => x)
.ShouldHaveTheSameElementsAs("Green", "Third");
}

[Fact] // this is occasionally failing with timeouts when running in combination with the entire suite
public async Task send_by_explicit_topic_2()
{
var session = await theSender
.TrackActivity()
.IncludeExternalTransports()
.AlsoTrack(theGreenReceiver, theBlueReceiver, theThirdReceiver)
.BroadcastMessageToTopicAndWaitAsync("color.blue", new PurpleMessage());

session.FindEnvelopesWithMessageType<PurpleMessage>()
.Where(x => x.MessageEventType == MessageEventType.Received)
.Select(x => x.ServiceName)
.OrderBy(x => x)
.ShouldHaveTheSameElementsAs("Blue", "Third");
}

[Fact]
public async Task send_to_topic_with_delay()
{
var session = await theSender
.TrackActivity()
.IncludeExternalTransports()
.WaitForMessageToBeReceivedAt<FirstMessage>(theBlueReceiver)
.AlsoTrack(theGreenReceiver, theBlueReceiver, theThirdReceiver)
.InvokeMessageAndWaitAsync(new TriggerTopicMessage());
}

[Fact]
public async Task publish_by_user_message_topic_logic()
{
var routed = new RoutedMessage { TopicName = "color.blue" };

var session = await theSender
.TrackActivity()
.IncludeExternalTransports()
.WaitForMessageToBeReceivedAt<RoutedMessage>(theBlueReceiver)
.AlsoTrack(theGreenReceiver, theBlueReceiver, theThirdReceiver)
.SendMessageAndWaitAsync(routed);

var record = session.Received.RecordsInOrder().Single(x => x.ServiceName == "Blue");

record.Envelope.Message.ShouldBeOfType<RoutedMessage>()
.Id.ShouldBe(routed.Id);
}

[Fact]
public async Task publish_by_user_message_topic_logic_and_delay()
{
var routed = new RoutedMessage { TopicName = "color.blue" };

var session = await theSender
.TrackActivity()
.IncludeExternalTransports()
.Timeout(15.Seconds())
.WaitForMessageToBeReceivedAt<RoutedMessage>(theBlueReceiver)
.AlsoTrack(theGreenReceiver, theBlueReceiver, theThirdReceiver)
.SendMessageAndWaitAsync(routed, new DeliveryOptions{ScheduleDelay = 3.Seconds()});

var record = session.Received.RecordsInOrder().Single(x => x.ServiceName == "Blue");

record.Envelope.Message.ShouldBeOfType<RoutedMessage>()
.Id.ShouldBe(routed.Id);
}
}

[Topic("color.purple")]
public class PurpleMessage
{
Expand Down
13 changes: 7 additions & 6 deletions src/Wolverine/Runtime/Routing/IMessageRoute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Wolverine.Runtime.Routing;
public interface IMessageRoute
{
Envelope CreateForSending(object message, DeliveryOptions? options, ISendingAgent localDurableQueue,
WolverineRuntime runtime);
WolverineRuntime runtime, string? topicName);

}

Expand Down Expand Up @@ -36,10 +36,10 @@ public TransformedMessageRoute(Func<TSource, TDestination> transformation, IMess
}

public Envelope CreateForSending(object message, DeliveryOptions? options, ISendingAgent localDurableQueue,
WolverineRuntime runtime)
WolverineRuntime runtime, string? topicName)
{
var transformed = _transformation((TSource)message);
return _inner.CreateForSending(transformed!, options, localDurableQueue, runtime);
return _inner.CreateForSending(transformed!, options, localDurableQueue, runtime, topicName);
}
}

Expand Down Expand Up @@ -69,13 +69,14 @@ public IEnumerable<IMessageRoute> FindRoutes(Type messageType, IWolverineRuntime
public bool IsAdditive => true;

public Envelope CreateForSending(object message, DeliveryOptions? options, ISendingAgent localDurableQueue,
WolverineRuntime runtime)
WolverineRuntime runtime, string? topicName)
{
if (message is T typedMessage)
{
_route ??= _topicEndpoint.RouteFor(typeof(T), runtime);
var envelope = _route.CreateForSending(message, options, localDurableQueue, runtime);
envelope.TopicName = _topicSource(typedMessage);
topicName ??= _topicSource(typedMessage);

var envelope = _route.CreateForSending(message, options, localDurableQueue, runtime, topicName);

// This is an unfortunate timing of operation issue.
if (envelope is { Message: Envelope scheduled, Status: EnvelopeStatus.Scheduled })
Expand Down
5 changes: 3 additions & 2 deletions src/Wolverine/Runtime/Routing/MessageRoute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,13 @@ public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancel
}

public Envelope CreateForSending(object message, DeliveryOptions? options, ISendingAgent localDurableQueue,
WolverineRuntime runtime)
WolverineRuntime runtime, string? topicName)
{
var envelope = new Envelope(message, Sender)
{
Serializer = Serializer,
ContentType = Serializer.ContentType
ContentType = Serializer.ContentType,
TopicName = topicName
};

if (Sender.Endpoint is LocalQueue)
Expand Down
2 changes: 1 addition & 1 deletion src/Wolverine/Runtime/Routing/MessageRouter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public override Envelope[] RouteForPublish(T message, DeliveryOptions? options)
var envelopes = new Envelope[Routes.Length];
for (var i = 0; i < envelopes.Length; i++)
{
envelopes[i] = Routes[i].CreateForSending(message, options, LocalDurableQueue, Runtime);
envelopes[i] = Routes[i].CreateForSending(message, options, LocalDurableQueue, Runtime, null);
}

return envelopes;
Expand Down
5 changes: 2 additions & 3 deletions src/Wolverine/Runtime/Routing/MessageRouterBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public Envelope RouteToDestination(T message, Uri uri, DeliveryOptions? options)
}

return RouteForUri(uri)
.CreateForSending(message, options, LocalDurableQueue, Runtime);
.CreateForSending(message, options, LocalDurableQueue, Runtime, null);
}

public IMessageRoute RouteForUri(Uri destination)
Expand Down Expand Up @@ -104,8 +104,7 @@ public Envelope[] RouteToTopic(T message, string topicName, DeliveryOptions? opt
var envelopes = new Envelope[_topicRoutes.Length];
for (var i = 0; i < envelopes.Length; i++)
{
envelopes[i] = _topicRoutes[i].CreateForSending(message, options, LocalDurableQueue, Runtime);
envelopes[i].TopicName = topicName;
envelopes[i] = _topicRoutes[i].CreateForSending(message, options, LocalDurableQueue, Runtime, topicName);
}

return envelopes;
Expand Down
2 changes: 1 addition & 1 deletion src/Wolverine/Runtime/Routing/NoNamedEndpointRoute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public NoNamedEndpointRoute(string endpointName, string[] allNames)
public string EndpointName { get; }

public Envelope CreateForSending(object message, DeliveryOptions? options, ISendingAgent localDurableQueue,
WolverineRuntime runtime)
WolverineRuntime runtime, string? topicName)
{
throw new UnknownEndpointException(_message);
}
Expand Down

0 comments on commit 61fccf7

Please sign in to comment.