From c70402ea047e91e9e791bd56e2ef2c681c34c64e Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Mon, 13 Nov 2023 11:43:03 -0600 Subject: [PATCH] Better Marten event subscriptions. Closes GH-132 --- .../Marten/event_streaming.cs | 47 ++++++++-- .../Wolverine.Marten/MartenIntegration.cs | 85 ++++++++++++++++++- .../PublishIncomingEventsBeforeCommit.cs | 2 +- .../WolverineOptionsMartenExtensions.cs | 26 +++++- .../Runtime/Routing/IMessageRoute.cs | 32 +++++++ .../Runtime/WolverineRuntime.Routing.cs | 10 +-- ...WolverineOptions.MessageTransformations.cs | 66 ++++++++++++++ 7 files changed, 247 insertions(+), 21 deletions(-) create mode 100644 src/Wolverine/WolverineOptions.MessageTransformations.cs diff --git a/src/Persistence/PersistenceTests/Marten/event_streaming.cs b/src/Persistence/PersistenceTests/Marten/event_streaming.cs index 44684333b..140c71f29 100644 --- a/src/Persistence/PersistenceTests/Marten/event_streaming.cs +++ b/src/Persistence/PersistenceTests/Marten/event_streaming.cs @@ -1,6 +1,7 @@ using IntegrationTests; using JasperFx.Core; using Marten; +using Marten.Events; using Microsoft.Extensions.Hosting; using Oakton.Resources; using Shouldly; @@ -8,6 +9,7 @@ using Wolverine; using Wolverine.Attributes; using Wolverine.Marten; +using Wolverine.Tracking; using Wolverine.Transports.Tcp; using Wolverine.Util; using Xunit; @@ -24,7 +26,11 @@ public async Task InitializeAsync() var receiverPort = PortFinder.GetAvailablePort(); theReceiver = await Host.CreateDefaultBuilder() - .UseWolverine(opts => opts.ListenAtPort(receiverPort)) + .UseWolverine(opts => + { + opts.ListenAtPort(receiverPort); + opts.Durability.Mode = DurabilityMode.Solo; + }) .ConfigureServices(services => { services.AddMarten(Servers.PostgresConnectionString) @@ -40,11 +46,16 @@ public async Task InitializeAsync() { opts.PublishAllMessages().ToPort(receiverPort).UseDurableOutbox(); opts.DisableConventionalDiscovery().IncludeType(); + opts.Durability.Mode = DurabilityMode.Solo; + opts.ServiceName = "sender"; }) .ConfigureServices(services => { services.AddMarten(Servers.PostgresConnectionString) - .IntegrateWithWolverine("sender").EventForwardingToWolverine(); + .IntegrateWithWolverine("sender").EventForwardingToWolverine(opts => + { + opts.SubscribeToEvent().TransformedTo(e => new SecondMessage(e.Sequence)); + }); services.AddResourceSetupOnStartup(); }).StartAsync(); @@ -69,16 +80,18 @@ public async Task event_should_be_published_from_sender_to_receiver() { var command = new TriggerCommand(); - var waiter = TriggerEventHandler.Waiter; - - await theSender.InvokeAsync(command); + var results = await theSender.TrackActivity().AlsoTrack(theReceiver).InvokeMessageAndWaitAsync(command); - var @event = await waiter.TimeoutAfterAsync(5000); - - @event.Id.ShouldBe(command.Id); + var triggered = results.Received.SingleMessage(); + triggered.ShouldNotBeNull(); + triggered.Id.ShouldBe(command.Id); + + results.Received.SingleMessage() + .Sequence.ShouldBeGreaterThan(0); } } + public class TriggerCommand { public Guid Id { get; set; } = Guid.NewGuid(); @@ -89,10 +102,24 @@ public class TriggerHandler [Transactional] public void Handle(TriggerCommand command, IDocumentSession session) { - session.Events.StartStream(command.Id, new TriggeredEvent { Id = command.Id }); + session.Events.StartStream(command.Id, new TriggeredEvent { Id = command.Id }, new SecondEvent(), new ThirdEvent()); } + + public void Handle(IEvent e) + { + + } +} + +public record SecondMessage(long Sequence); + +public class SecondEvent +{ + } +public class ThirdEvent{} + public class TriggeredEvent { public Guid Id { get; set; } @@ -107,4 +134,6 @@ public void Handle(TriggeredEvent message) { _source.SetResult(message); } + + public void Handle(SecondMessage message){} } \ No newline at end of file diff --git a/src/Persistence/Wolverine.Marten/MartenIntegration.cs b/src/Persistence/Wolverine.Marten/MartenIntegration.cs index 9b492551b..bb93889e7 100644 --- a/src/Persistence/Wolverine.Marten/MartenIntegration.cs +++ b/src/Persistence/Wolverine.Marten/MartenIntegration.cs @@ -1,11 +1,17 @@ +using JasperFx.Core.Reflection; +using Marten.Events; using Wolverine.Marten.Codegen; using Wolverine.Marten.Persistence.Sagas; using Wolverine.Persistence.Sagas; +using Wolverine.Runtime; +using Wolverine.Runtime.Routing; namespace Wolverine.Marten; -internal class MartenIntegration : IWolverineExtension +internal class MartenIntegration : IWolverineExtension, IEventForwarding { + private readonly List> _actions = new(); + /// /// This directs the Marten integration to try to publish events out of the enrolled outbox /// for a Marten session on SaveChangesAsync() @@ -23,5 +29,82 @@ public void Configure(WolverineOptions options) options.Policies.Add(); options.Discovery.CustomizeHandlerDiscovery(x => x.Includes.WithAttribute()); + + options.PublishWithMessageRoutingSource(EventRouter); + } + + internal MartenEventRouter EventRouter { get; } = new(); + + EventForwardingTransform IEventForwarding.SubscribeToEvent() + { + return new EventForwardingTransform(EventRouter); + } +} + +internal class MartenEventRouter : IMessageRouteSource +{ + + + public IEnumerable FindRoutes(Type messageType, IWolverineRuntime runtime) + { + if (messageType.Closes(typeof(IEvent<>))) + { + var eventType = messageType.GetGenericArguments().Single(); + var wrappedType = typeof(IEvent<>).MakeGenericType(eventType); + + // First look for explicit transformations + var transformers = Transformers.Where(x => x.SourceType == wrappedType); + var transformed = transformers.SelectMany(x => + runtime.RoutingFor(x.DestinationType).Routes.Select(x.CreateRoute)); + + var forInner = runtime.RoutingFor(eventType).Routes.Select(route => + typeof(EventUnwrappingMessageRoute<>).CloseAndBuildAs(route, eventType)); + + var locals = new LocalRouting().FindRoutes(wrappedType, runtime); + + + var candidates = forInner.Concat(transformed).Concat(locals).ToArray(); + return candidates; + } + else + { + return Array.Empty(); + } + } + + public bool IsAdditive => false; + public List Transformers { get; } = new(); +} + +internal class EventUnwrappingMessageRoute : TransformedMessageRoute, T> +{ + public EventUnwrappingMessageRoute(IMessageRoute inner) : base(e => e.Data, inner) + { + } +} + +public interface IEventForwarding +{ + /// + /// Subscribe to an event, but with a transformation. The transformed message will be + /// published to Wolverine with its normal routing rules + /// + /// + EventForwardingTransform SubscribeToEvent(); +} + +public class EventForwardingTransform +{ + private readonly MartenEventRouter _martenEventWrapper; + + internal EventForwardingTransform(MartenEventRouter martenEventWrapper) + { + _martenEventWrapper = martenEventWrapper; + } + + public void TransformedTo(Func, TDestination> transformer) + { + var transformation = new MessageTransformation, TDestination>(transformer); + _martenEventWrapper.Transformers.Add(transformation); } } \ No newline at end of file diff --git a/src/Persistence/Wolverine.Marten/PublishIncomingEventsBeforeCommit.cs b/src/Persistence/Wolverine.Marten/PublishIncomingEventsBeforeCommit.cs index 45e0ae413..e65fa0d97 100644 --- a/src/Persistence/Wolverine.Marten/PublishIncomingEventsBeforeCommit.cs +++ b/src/Persistence/Wolverine.Marten/PublishIncomingEventsBeforeCommit.cs @@ -15,7 +15,7 @@ public PublishIncomingEventsBeforeCommit(IMessageContext bus) public override async Task BeforeSaveChangesAsync(IDocumentSession session, CancellationToken token) { - var events = session.PendingChanges.As().GetEvents().Select(x => x.Data).ToArray(); + var events = session.PendingChanges.As().GetEvents().ToArray(); if (events.Any()) { diff --git a/src/Persistence/Wolverine.Marten/WolverineOptionsMartenExtensions.cs b/src/Persistence/Wolverine.Marten/WolverineOptionsMartenExtensions.cs index 2b53ae6f5..f5104466b 100644 --- a/src/Persistence/Wolverine.Marten/WolverineOptionsMartenExtensions.cs +++ b/src/Persistence/Wolverine.Marten/WolverineOptionsMartenExtensions.cs @@ -155,4 +155,28 @@ public static MartenServiceCollectionExtensions.MartenConfigurationExpression Ev return expression; } -} \ No newline at end of file + + /// + /// Enable publishing of events to Wolverine message routing when captured in Marten sessions that are enrolled in a + /// Wolverine outbox + /// + /// + /// + public static MartenServiceCollectionExtensions.MartenConfigurationExpression EventForwardingToWolverine( + this MartenServiceCollectionExtensions.MartenConfigurationExpression expression, Action configure) + { + var integration = expression.Services.FindMartenIntegration(); + if (integration == null) + { + expression.IntegrateWithWolverine(); + integration = expression.Services.FindMartenIntegration(); + } + + integration!.ShouldPublishEvents = true; + + configure(integration); + + return expression; + } +} + diff --git a/src/Wolverine/Runtime/Routing/IMessageRoute.cs b/src/Wolverine/Runtime/Routing/IMessageRoute.cs index 9f9e1006a..5f67b8fbd 100644 --- a/src/Wolverine/Runtime/Routing/IMessageRoute.cs +++ b/src/Wolverine/Runtime/Routing/IMessageRoute.cs @@ -1,3 +1,4 @@ +using JasperFx.Core.Reflection; using Wolverine.Transports.Sending; namespace Wolverine.Runtime.Routing; @@ -7,4 +8,35 @@ public interface IMessageRoute Envelope CreateForSending(object message, DeliveryOptions? options, ISendingAgent localDurableQueue, WolverineRuntime runtime); +} + +internal class TransformedMessageRouteSource : IMessageRouteSource +{ + public IEnumerable FindRoutes(Type messageType, IWolverineRuntime runtime) + { + var transformations = runtime.Options.MessageTransformations.Where(x => x.SourceType == messageType); + return transformations.SelectMany(t => runtime.RoutingFor(t.DestinationType).Routes.Select(t.CreateRoute)).ToArray(); + + } + + public bool IsAdditive => true; +} + +internal class TransformedMessageRoute : IMessageRoute +{ + private readonly Func _transformation; + private readonly IMessageRoute _inner; + + public TransformedMessageRoute(Func transformation, IMessageRoute inner) + { + _transformation = transformation; + _inner = inner; + } + + public Envelope CreateForSending(object message, DeliveryOptions? options, ISendingAgent localDurableQueue, + WolverineRuntime runtime) + { + var transformed = _transformation((TSource)message); + return _inner.CreateForSending(transformed!, options, localDurableQueue, runtime); + } } \ No newline at end of file diff --git a/src/Wolverine/Runtime/WolverineRuntime.Routing.cs b/src/Wolverine/Runtime/WolverineRuntime.Routing.cs index 206679617..dae855e71 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.Routing.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.Routing.cs @@ -61,14 +61,6 @@ public partial class WolverineRuntime { private ImHashMap _messageTypeRouting = ImHashMap.Empty; - // TODO -- expand this later so you can track routing source - private readonly List _routeSources = new() - { - new ExplicitRouting(), - new LocalRouting(), - new MessageRoutingConventions() - }; - public IMessageRouter RoutingFor(Type messageType) { @@ -97,7 +89,7 @@ public IMessageRouter RoutingFor(Type messageType) private List findRoutes(Type messageType) { var routes = new List(); - foreach (var source in _routeSources) + foreach (var source in Options.RouteSources()) { routes.AddRange(source.FindRoutes(messageType, this)); diff --git a/src/Wolverine/WolverineOptions.MessageTransformations.cs b/src/Wolverine/WolverineOptions.MessageTransformations.cs new file mode 100644 index 000000000..307661756 --- /dev/null +++ b/src/Wolverine/WolverineOptions.MessageTransformations.cs @@ -0,0 +1,66 @@ +using Wolverine.Runtime; +using Wolverine.Runtime.Routing; + +namespace Wolverine; + +public sealed partial class WolverineOptions +{ + internal readonly List MessageTransformations = new(); + + internal readonly List InternalRouteSources = new() + { + new TransformedMessageRouteSource(), + new ExplicitRouting(), + new LocalRouting(), + new MessageRoutingConventions() + }; + + internal readonly List CustomRouteSources = new(); + + internal IEnumerable RouteSources() + { + foreach (var routeSource in CustomRouteSources) + { + yield return routeSource; + } + + foreach (var routeSource in InternalRouteSources) + { + yield return routeSource; + } + } + + + /// + /// Advanced usage of Wolverine to register programmatic message routing rules + /// + /// + public void PublishWithMessageRoutingSource(IMessageRouteSource messageRouteSource) + { + CustomRouteSources.Add(messageRouteSource); + } +} + +internal interface IMessageTransformation +{ + Type SourceType { get; } + Type DestinationType { get; } + IMessageRoute CreateRoute(IMessageRoute inner); +} + +internal class MessageTransformation : IMessageTransformation +{ + private readonly Func _transformation; + + public MessageTransformation(Func transformation) + { + _transformation = transformation; + } + + public Type SourceType => typeof(TSource); + public Type DestinationType => typeof(TDestination); + public IMessageRoute CreateRoute(IMessageRoute inner) + { + return new TransformedMessageRoute(_transformation, inner); + } +} \ No newline at end of file