From 61fccf76b1b881f4f1dd1195e84b5e8681b0836c Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Tue, 30 Jan 2024 15:20:28 -0600 Subject: [PATCH] FINALLY Fixed all of the topic based routing plus delayed messaging combinations --- Directory.Build.props | 2 +- .../Routing/NoNamedEndpointRouteTests.cs | 2 +- .../send_by_topics.cs | 187 ++++++++++++++++++ .../Runtime/Routing/IMessageRoute.cs | 13 +- src/Wolverine/Runtime/Routing/MessageRoute.cs | 5 +- .../Runtime/Routing/MessageRouter.cs | 2 +- .../Runtime/Routing/MessageRouterBase.cs | 5 +- .../Runtime/Routing/NoNamedEndpointRoute.cs | 2 +- 8 files changed, 203 insertions(+), 15 deletions(-) diff --git a/Directory.Build.props b/Directory.Build.props index 5d5c18f3e..373c25f9c 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -11,7 +11,7 @@ net6.0;net7.0;net8.0 1570;1571;1572;1573;1574;1587;1591;1701;1702;1711;1735;0618 true - 1.16.1 + 1.16.2 $(PackageProjectUrl) true true diff --git a/src/Testing/CoreTests/Runtime/Routing/NoNamedEndpointRouteTests.cs b/src/Testing/CoreTests/Runtime/Routing/NoNamedEndpointRouteTests.cs index b8c014d54..b21ff193b 100644 --- a/src/Testing/CoreTests/Runtime/Routing/NoNamedEndpointRouteTests.cs +++ b/src/Testing/CoreTests/Runtime/Routing/NoNamedEndpointRouteTests.cs @@ -10,7 +10,7 @@ public void throws_descriptive_exception() { var route = new NoNamedEndpointRoute("foo", new[] { "bar", "baz" }); - var ex = Should.Throw(() => route.CreateForSending(null, null, null, null)); + var ex = Should.Throw(() => route.CreateForSending(null, null, null, null, null)); ex.Message.ShouldBe("Endpoint name 'foo' is invalid. Known endpoints are bar, baz"); } diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/send_by_topics.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/send_by_topics.cs index 5289589f1..9ca0ee1b4 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/send_by_topics.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/send_by_topics.cs @@ -1,13 +1,16 @@ using System; using System.Linq; using System.Threading.Tasks; +using IntegrationTests; using JasperFx.Core; +using Marten; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Shouldly; using TestingSupport; using TestMessages; using Wolverine.Attributes; +using Wolverine.Marten; using Wolverine.Tracking; using Xunit; @@ -193,6 +196,190 @@ public async Task publish_by_user_message_topic_logic_and_delay() } } +public class send_by_topics_durable : IDisposable +{ + private readonly IHost theGreenReceiver; + private readonly IHost theBlueReceiver; + private readonly IHost theSender; + private readonly IHost theThirdReceiver; + + public send_by_topics_durable() + { + + theSender = Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Durability.Mode = DurabilityMode.Solo; + + opts.Services.AddMarten(Servers.PostgresConnectionString) + .IntegrateWithWolverine("sender"); + + opts.Policies.UseDurableOutboxOnAllSendingEndpoints(); + + opts.UseRabbitMq("host=localhost;port=5672").AutoProvision(); + opts.PublishAllMessages().ToRabbitTopics("wolverine.topics", exchange => + { + exchange.BindTopic("color.green").ToQueue("green"); + exchange.BindTopic("color.blue").ToQueue("blue"); + exchange.BindTopic("color.*").ToQueue("all"); + }); + + opts.PublishMessagesToRabbitMqExchange("wolverine.topics", m => m.TopicName); + }).Start(); + + theGreenReceiver = WolverineHost.For(opts => + { + opts.ServiceName = "Green"; + opts.ListenToRabbitQueue("green"); + opts.UseRabbitMq(); + }); + + theBlueReceiver = WolverineHost.For(opts => + { + opts.ServiceName = "Blue"; + opts.ListenToRabbitQueue("blue"); + opts.UseRabbitMq(); + }); + + theThirdReceiver = WolverineHost.For(opts => + { + opts.ServiceName = "Third"; + opts.ListenToRabbitQueue("all"); + opts.UseRabbitMq(); + }); + } + + public void Dispose() + { + theSender?.Dispose(); + theGreenReceiver?.Dispose(); + theBlueReceiver?.Dispose(); + theThirdReceiver?.Dispose(); + } + + internal async Task send_by_topic_sample() + { + #region sample_send_to_topic + + var publisher = theSender.Services + .GetRequiredService(); + + await publisher.BroadcastToTopicAsync("color.purple", new Message1()); + + #endregion + } + + [Fact] + public async Task send_by_message_topic() + { + var session = await theSender + .TrackActivity() + .IncludeExternalTransports() + .AlsoTrack(theGreenReceiver, theBlueReceiver, theThirdReceiver) + .SendMessageAndWaitAsync(new PurpleMessage()); + + session.FindEnvelopesWithMessageType() + .Where(x => x.MessageEventType == MessageEventType.Received) + .Select(x => x.ServiceName) + .Single().ShouldBe("Third"); + } + + [Fact] + public async Task send_by_message_topic_to_multiple_listeners() + { + var session = await theSender + .TrackActivity() + .IncludeExternalTransports() + .AlsoTrack(theGreenReceiver, theBlueReceiver, theThirdReceiver) + .SendMessageAndWaitAsync(new FirstMessage()); + + session.FindEnvelopesWithMessageType() + .Where(x => x.MessageEventType == MessageEventType.Received) + .Select(x => x.ServiceName) + .OrderBy(x => x).ShouldHaveTheSameElementsAs("Blue", "Third"); + } + + [Fact] + public async Task send_by_explicit_topic() + { + var session = await theSender + .TrackActivity() + .IncludeExternalTransports() + .AlsoTrack(theGreenReceiver, theBlueReceiver, theThirdReceiver) + .BroadcastMessageToTopicAndWaitAsync("color.green", new PurpleMessage()); + + session.FindEnvelopesWithMessageType() + .Where(x => x.MessageEventType == MessageEventType.Received) + .Select(x => x.ServiceName) + .OrderBy(x => x) + .ShouldHaveTheSameElementsAs("Green", "Third"); + } + + [Fact] // this is occasionally failing with timeouts when running in combination with the entire suite + public async Task send_by_explicit_topic_2() + { + var session = await theSender + .TrackActivity() + .IncludeExternalTransports() + .AlsoTrack(theGreenReceiver, theBlueReceiver, theThirdReceiver) + .BroadcastMessageToTopicAndWaitAsync("color.blue", new PurpleMessage()); + + session.FindEnvelopesWithMessageType() + .Where(x => x.MessageEventType == MessageEventType.Received) + .Select(x => x.ServiceName) + .OrderBy(x => x) + .ShouldHaveTheSameElementsAs("Blue", "Third"); + } + + [Fact] + public async Task send_to_topic_with_delay() + { + var session = await theSender + .TrackActivity() + .IncludeExternalTransports() + .WaitForMessageToBeReceivedAt(theBlueReceiver) + .AlsoTrack(theGreenReceiver, theBlueReceiver, theThirdReceiver) + .InvokeMessageAndWaitAsync(new TriggerTopicMessage()); + } + + [Fact] + public async Task publish_by_user_message_topic_logic() + { + var routed = new RoutedMessage { TopicName = "color.blue" }; + + var session = await theSender + .TrackActivity() + .IncludeExternalTransports() + .WaitForMessageToBeReceivedAt(theBlueReceiver) + .AlsoTrack(theGreenReceiver, theBlueReceiver, theThirdReceiver) + .SendMessageAndWaitAsync(routed); + + var record = session.Received.RecordsInOrder().Single(x => x.ServiceName == "Blue"); + + record.Envelope.Message.ShouldBeOfType() + .Id.ShouldBe(routed.Id); + } + + [Fact] + public async Task publish_by_user_message_topic_logic_and_delay() + { + var routed = new RoutedMessage { TopicName = "color.blue" }; + + var session = await theSender + .TrackActivity() + .IncludeExternalTransports() + .Timeout(15.Seconds()) + .WaitForMessageToBeReceivedAt(theBlueReceiver) + .AlsoTrack(theGreenReceiver, theBlueReceiver, theThirdReceiver) + .SendMessageAndWaitAsync(routed, new DeliveryOptions{ScheduleDelay = 3.Seconds()}); + + var record = session.Received.RecordsInOrder().Single(x => x.ServiceName == "Blue"); + + record.Envelope.Message.ShouldBeOfType() + .Id.ShouldBe(routed.Id); + } +} + [Topic("color.purple")] public class PurpleMessage { diff --git a/src/Wolverine/Runtime/Routing/IMessageRoute.cs b/src/Wolverine/Runtime/Routing/IMessageRoute.cs index 948fee48c..0659f858d 100644 --- a/src/Wolverine/Runtime/Routing/IMessageRoute.cs +++ b/src/Wolverine/Runtime/Routing/IMessageRoute.cs @@ -8,7 +8,7 @@ namespace Wolverine.Runtime.Routing; public interface IMessageRoute { Envelope CreateForSending(object message, DeliveryOptions? options, ISendingAgent localDurableQueue, - WolverineRuntime runtime); + WolverineRuntime runtime, string? topicName); } @@ -36,10 +36,10 @@ public TransformedMessageRoute(Func transformation, IMess } public Envelope CreateForSending(object message, DeliveryOptions? options, ISendingAgent localDurableQueue, - WolverineRuntime runtime) + WolverineRuntime runtime, string? topicName) { var transformed = _transformation((TSource)message); - return _inner.CreateForSending(transformed!, options, localDurableQueue, runtime); + return _inner.CreateForSending(transformed!, options, localDurableQueue, runtime, topicName); } } @@ -69,13 +69,14 @@ public IEnumerable FindRoutes(Type messageType, IWolverineRuntime public bool IsAdditive => true; public Envelope CreateForSending(object message, DeliveryOptions? options, ISendingAgent localDurableQueue, - WolverineRuntime runtime) + WolverineRuntime runtime, string? topicName) { if (message is T typedMessage) { _route ??= _topicEndpoint.RouteFor(typeof(T), runtime); - var envelope = _route.CreateForSending(message, options, localDurableQueue, runtime); - envelope.TopicName = _topicSource(typedMessage); + topicName ??= _topicSource(typedMessage); + + var envelope = _route.CreateForSending(message, options, localDurableQueue, runtime, topicName); // This is an unfortunate timing of operation issue. if (envelope is { Message: Envelope scheduled, Status: EnvelopeStatus.Scheduled }) diff --git a/src/Wolverine/Runtime/Routing/MessageRoute.cs b/src/Wolverine/Runtime/Routing/MessageRoute.cs index 698aa316e..9905303c3 100644 --- a/src/Wolverine/Runtime/Routing/MessageRoute.cs +++ b/src/Wolverine/Runtime/Routing/MessageRoute.cs @@ -58,12 +58,13 @@ public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancel } public Envelope CreateForSending(object message, DeliveryOptions? options, ISendingAgent localDurableQueue, - WolverineRuntime runtime) + WolverineRuntime runtime, string? topicName) { var envelope = new Envelope(message, Sender) { Serializer = Serializer, - ContentType = Serializer.ContentType + ContentType = Serializer.ContentType, + TopicName = topicName }; if (Sender.Endpoint is LocalQueue) diff --git a/src/Wolverine/Runtime/Routing/MessageRouter.cs b/src/Wolverine/Runtime/Routing/MessageRouter.cs index 3861330c1..66cb3ca13 100644 --- a/src/Wolverine/Runtime/Routing/MessageRouter.cs +++ b/src/Wolverine/Runtime/Routing/MessageRouter.cs @@ -37,7 +37,7 @@ public override Envelope[] RouteForPublish(T message, DeliveryOptions? options) var envelopes = new Envelope[Routes.Length]; for (var i = 0; i < envelopes.Length; i++) { - envelopes[i] = Routes[i].CreateForSending(message, options, LocalDurableQueue, Runtime); + envelopes[i] = Routes[i].CreateForSending(message, options, LocalDurableQueue, Runtime, null); } return envelopes; diff --git a/src/Wolverine/Runtime/Routing/MessageRouterBase.cs b/src/Wolverine/Runtime/Routing/MessageRouterBase.cs index 39af60b04..15bff127b 100644 --- a/src/Wolverine/Runtime/Routing/MessageRouterBase.cs +++ b/src/Wolverine/Runtime/Routing/MessageRouterBase.cs @@ -72,7 +72,7 @@ public Envelope RouteToDestination(T message, Uri uri, DeliveryOptions? options) } return RouteForUri(uri) - .CreateForSending(message, options, LocalDurableQueue, Runtime); + .CreateForSending(message, options, LocalDurableQueue, Runtime, null); } public IMessageRoute RouteForUri(Uri destination) @@ -104,8 +104,7 @@ public Envelope[] RouteToTopic(T message, string topicName, DeliveryOptions? opt var envelopes = new Envelope[_topicRoutes.Length]; for (var i = 0; i < envelopes.Length; i++) { - envelopes[i] = _topicRoutes[i].CreateForSending(message, options, LocalDurableQueue, Runtime); - envelopes[i].TopicName = topicName; + envelopes[i] = _topicRoutes[i].CreateForSending(message, options, LocalDurableQueue, Runtime, topicName); } return envelopes; diff --git a/src/Wolverine/Runtime/Routing/NoNamedEndpointRoute.cs b/src/Wolverine/Runtime/Routing/NoNamedEndpointRoute.cs index 62d66ec22..8642c5341 100644 --- a/src/Wolverine/Runtime/Routing/NoNamedEndpointRoute.cs +++ b/src/Wolverine/Runtime/Routing/NoNamedEndpointRoute.cs @@ -18,7 +18,7 @@ public NoNamedEndpointRoute(string endpointName, string[] allNames) public string EndpointName { get; } public Envelope CreateForSending(object message, DeliveryOptions? options, ISendingAgent localDurableQueue, - WolverineRuntime runtime) + WolverineRuntime runtime, string? topicName) { throw new UnknownEndpointException(_message); }