Skip to content

Commit

Permalink
Better Marten event subscriptions. Closes GH-132
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed Nov 13, 2023
1 parent f860b17 commit c70402e
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 21 deletions.
47 changes: 38 additions & 9 deletions src/Persistence/PersistenceTests/Marten/event_streaming.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
using IntegrationTests;
using JasperFx.Core;
using Marten;
using Marten.Events;
using Microsoft.Extensions.Hosting;
using Oakton.Resources;
using Shouldly;
using TestingSupport;
using Wolverine;
using Wolverine.Attributes;
using Wolverine.Marten;
using Wolverine.Tracking;
using Wolverine.Transports.Tcp;
using Wolverine.Util;
using Xunit;
Expand All @@ -24,7 +26,11 @@ public async Task InitializeAsync()
var receiverPort = PortFinder.GetAvailablePort();

theReceiver = await Host.CreateDefaultBuilder()
.UseWolverine(opts => opts.ListenAtPort(receiverPort))
.UseWolverine(opts =>
{
opts.ListenAtPort(receiverPort);
opts.Durability.Mode = DurabilityMode.Solo;
})
.ConfigureServices(services =>
{
services.AddMarten(Servers.PostgresConnectionString)
Expand All @@ -40,11 +46,16 @@ public async Task InitializeAsync()
{
opts.PublishAllMessages().ToPort(receiverPort).UseDurableOutbox();
opts.DisableConventionalDiscovery().IncludeType<TriggerHandler>();
opts.Durability.Mode = DurabilityMode.Solo;
opts.ServiceName = "sender";
})
.ConfigureServices(services =>
{
services.AddMarten(Servers.PostgresConnectionString)
.IntegrateWithWolverine("sender").EventForwardingToWolverine();
.IntegrateWithWolverine("sender").EventForwardingToWolverine(opts =>
{
opts.SubscribeToEvent<SecondEvent>().TransformedTo(e => new SecondMessage(e.Sequence));
});

services.AddResourceSetupOnStartup();
}).StartAsync();
Expand All @@ -69,16 +80,18 @@ public async Task event_should_be_published_from_sender_to_receiver()
{
var command = new TriggerCommand();

var waiter = TriggerEventHandler.Waiter;

await theSender.InvokeAsync(command);
var results = await theSender.TrackActivity().AlsoTrack(theReceiver).InvokeMessageAndWaitAsync(command);

var @event = await waiter.TimeoutAfterAsync(5000);

@event.Id.ShouldBe(command.Id);
var triggered = results.Received.SingleMessage<TriggeredEvent>();
triggered.ShouldNotBeNull();
triggered.Id.ShouldBe(command.Id);

results.Received.SingleMessage<SecondMessage>()
.Sequence.ShouldBeGreaterThan(0);
}
}


public class TriggerCommand
{
public Guid Id { get; set; } = Guid.NewGuid();
Expand All @@ -89,10 +102,24 @@ public class TriggerHandler
[Transactional]
public void Handle(TriggerCommand command, IDocumentSession session)
{
session.Events.StartStream(command.Id, new TriggeredEvent { Id = command.Id });
session.Events.StartStream(command.Id, new TriggeredEvent { Id = command.Id }, new SecondEvent(), new ThirdEvent());
}

public void Handle(IEvent<ThirdEvent> e)
{

}
}

public record SecondMessage(long Sequence);

public class SecondEvent
{

}

public class ThirdEvent{}

public class TriggeredEvent
{
public Guid Id { get; set; }
Expand All @@ -107,4 +134,6 @@ public void Handle(TriggeredEvent message)
{
_source.SetResult(message);
}

public void Handle(SecondMessage message){}
}
85 changes: 84 additions & 1 deletion src/Persistence/Wolverine.Marten/MartenIntegration.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
using JasperFx.Core.Reflection;
using Marten.Events;
using Wolverine.Marten.Codegen;
using Wolverine.Marten.Persistence.Sagas;
using Wolverine.Persistence.Sagas;
using Wolverine.Runtime;
using Wolverine.Runtime.Routing;

namespace Wolverine.Marten;

internal class MartenIntegration : IWolverineExtension
internal class MartenIntegration : IWolverineExtension, IEventForwarding
{
private readonly List<Action<WolverineOptions>> _actions = new();

/// <summary>
/// This directs the Marten integration to try to publish events out of the enrolled outbox
/// for a Marten session on SaveChangesAsync()
Expand All @@ -23,5 +29,82 @@ public void Configure(WolverineOptions options)
options.Policies.Add<MartenAggregateHandlerStrategy>();

options.Discovery.CustomizeHandlerDiscovery(x => x.Includes.WithAttribute<AggregateHandlerAttribute>());

options.PublishWithMessageRoutingSource(EventRouter);
}

internal MartenEventRouter EventRouter { get; } = new();

EventForwardingTransform<T> IEventForwarding.SubscribeToEvent<T>()
{
return new EventForwardingTransform<T>(EventRouter);
}
}

internal class MartenEventRouter : IMessageRouteSource
{


public IEnumerable<IMessageRoute> FindRoutes(Type messageType, IWolverineRuntime runtime)
{
if (messageType.Closes(typeof(IEvent<>)))
{
var eventType = messageType.GetGenericArguments().Single();
var wrappedType = typeof(IEvent<>).MakeGenericType(eventType);

// First look for explicit transformations
var transformers = Transformers.Where(x => x.SourceType == wrappedType);
var transformed = transformers.SelectMany(x =>
runtime.RoutingFor(x.DestinationType).Routes.Select(x.CreateRoute));

var forInner = runtime.RoutingFor(eventType).Routes.Select(route =>
typeof(EventUnwrappingMessageRoute<>).CloseAndBuildAs<IMessageRoute>(route, eventType));

var locals = new LocalRouting().FindRoutes(wrappedType, runtime);


var candidates = forInner.Concat(transformed).Concat(locals).ToArray();
return candidates;
}
else
{
return Array.Empty<IMessageRoute>();
}
}

public bool IsAdditive => false;
public List<IMessageTransformation> Transformers { get; } = new();
}

internal class EventUnwrappingMessageRoute<T> : TransformedMessageRoute<IEvent<T>, T>
{
public EventUnwrappingMessageRoute(IMessageRoute inner) : base(e => e.Data, inner)
{
}
}

public interface IEventForwarding
{
/// <summary>
/// Subscribe to an event, but with a transformation. The transformed message will be
/// published to Wolverine with its normal routing rules
/// </summary>
/// <typeparam name="T"></typeparam>
EventForwardingTransform<T> SubscribeToEvent<T>();
}

public class EventForwardingTransform<TSource>
{
private readonly MartenEventRouter _martenEventWrapper;

internal EventForwardingTransform(MartenEventRouter martenEventWrapper)
{
_martenEventWrapper = martenEventWrapper;
}

public void TransformedTo<TDestination>(Func<IEvent<TSource>, TDestination> transformer)
{
var transformation = new MessageTransformation<IEvent<TSource>, TDestination>(transformer);
_martenEventWrapper.Transformers.Add(transformation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public PublishIncomingEventsBeforeCommit(IMessageContext bus)

public override async Task BeforeSaveChangesAsync(IDocumentSession session, CancellationToken token)
{
var events = session.PendingChanges.As<IChangeSet>().GetEvents().Select(x => x.Data).ToArray();
var events = session.PendingChanges.As<IChangeSet>().GetEvents().ToArray();

if (events.Any())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,28 @@ public static MartenServiceCollectionExtensions.MartenConfigurationExpression Ev

return expression;
}
}

/// <summary>
/// Enable publishing of events to Wolverine message routing when captured in Marten sessions that are enrolled in a
/// Wolverine outbox
/// </summary>
/// <param name="expression"></param>
/// <returns></returns>
public static MartenServiceCollectionExtensions.MartenConfigurationExpression EventForwardingToWolverine(
this MartenServiceCollectionExtensions.MartenConfigurationExpression expression, Action<IEventForwarding> configure)
{
var integration = expression.Services.FindMartenIntegration();
if (integration == null)
{
expression.IntegrateWithWolverine();
integration = expression.Services.FindMartenIntegration();
}

integration!.ShouldPublishEvents = true;

configure(integration);

return expression;
}
}

32 changes: 32 additions & 0 deletions src/Wolverine/Runtime/Routing/IMessageRoute.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using JasperFx.Core.Reflection;
using Wolverine.Transports.Sending;

namespace Wolverine.Runtime.Routing;
Expand All @@ -7,4 +8,35 @@ public interface IMessageRoute
Envelope CreateForSending(object message, DeliveryOptions? options, ISendingAgent localDurableQueue,
WolverineRuntime runtime);

}

internal class TransformedMessageRouteSource : IMessageRouteSource
{
public IEnumerable<IMessageRoute> FindRoutes(Type messageType, IWolverineRuntime runtime)
{
var transformations = runtime.Options.MessageTransformations.Where(x => x.SourceType == messageType);
return transformations.SelectMany(t => runtime.RoutingFor(t.DestinationType).Routes.Select(t.CreateRoute)).ToArray();

}

public bool IsAdditive => true;
}

internal class TransformedMessageRoute<TSource, TDestination> : IMessageRoute
{
private readonly Func<TSource, TDestination> _transformation;
private readonly IMessageRoute _inner;

public TransformedMessageRoute(Func<TSource, TDestination> transformation, IMessageRoute inner)
{
_transformation = transformation;
_inner = inner;
}

public Envelope CreateForSending(object message, DeliveryOptions? options, ISendingAgent localDurableQueue,
WolverineRuntime runtime)
{
var transformed = _transformation((TSource)message);
return _inner.CreateForSending(transformed!, options, localDurableQueue, runtime);
}
}
10 changes: 1 addition & 9 deletions src/Wolverine/Runtime/WolverineRuntime.Routing.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,6 @@ public partial class WolverineRuntime
{
private ImHashMap<Type, IMessageRouter> _messageTypeRouting = ImHashMap<Type, IMessageRouter>.Empty;

// TODO -- expand this later so you can track routing source
private readonly List<IMessageRouteSource> _routeSources = new()
{
new ExplicitRouting(),
new LocalRouting(),
new MessageRoutingConventions()
};


public IMessageRouter RoutingFor(Type messageType)
{
Expand Down Expand Up @@ -97,7 +89,7 @@ public IMessageRouter RoutingFor(Type messageType)
private List<IMessageRoute> findRoutes(Type messageType)
{
var routes = new List<IMessageRoute>();
foreach (var source in _routeSources)
foreach (var source in Options.RouteSources())
{
routes.AddRange(source.FindRoutes(messageType, this));

Expand Down
66 changes: 66 additions & 0 deletions src/Wolverine/WolverineOptions.MessageTransformations.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
using Wolverine.Runtime;
using Wolverine.Runtime.Routing;

namespace Wolverine;

public sealed partial class WolverineOptions
{
internal readonly List<IMessageTransformation> MessageTransformations = new();

internal readonly List<IMessageRouteSource> InternalRouteSources = new()
{
new TransformedMessageRouteSource(),
new ExplicitRouting(),
new LocalRouting(),
new MessageRoutingConventions()
};

internal readonly List<IMessageRouteSource> CustomRouteSources = new();

internal IEnumerable<IMessageRouteSource> RouteSources()
{
foreach (var routeSource in CustomRouteSources)
{
yield return routeSource;
}

foreach (var routeSource in InternalRouteSources)
{
yield return routeSource;
}
}


/// <summary>
/// Advanced usage of Wolverine to register programmatic message routing rules
/// </summary>
/// <param name="messageRouteSource"></param>
public void PublishWithMessageRoutingSource(IMessageRouteSource messageRouteSource)
{
CustomRouteSources.Add(messageRouteSource);
}
}

internal interface IMessageTransformation
{
Type SourceType { get; }
Type DestinationType { get; }
IMessageRoute CreateRoute(IMessageRoute inner);
}

internal class MessageTransformation<TSource, TDestination> : IMessageTransformation
{
private readonly Func<TSource, TDestination> _transformation;

public MessageTransformation(Func<TSource, TDestination> transformation)
{
_transformation = transformation;
}

public Type SourceType => typeof(TSource);
public Type DestinationType => typeof(TDestination);
public IMessageRoute CreateRoute(IMessageRoute inner)
{
return new TransformedMessageRoute<TSource, TDestination>(_transformation, inner);
}
}

0 comments on commit c70402e

Please sign in to comment.