diff --git a/src/NServiceBus.Router/Pipeline/Forwarding/1_ForwardPublishContext.cs b/src/NServiceBus.Router/Pipeline/Forwarding/1_ForwardPublishContext.cs index 1d64092..ce049a0 100644 --- a/src/NServiceBus.Router/Pipeline/Forwarding/1_ForwardPublishContext.cs +++ b/src/NServiceBus.Router/Pipeline/Forwarding/1_ForwardPublishContext.cs @@ -7,6 +7,9 @@ /// public class ForwardPublishContext : BaseForwardRuleContext { + internal bool Forwarded; + internal bool Dropped; + /// /// Creates new instance. /// @@ -38,5 +41,22 @@ public ForwardPublishContext(string outgoingInterface, Type rootType, PublishPre /// The headers associated with the received message. /// public byte[] ReceivedBody { get; } + + /// + /// Mark this message as forwarded. + /// + /// + public void MarkForwarded() + { + Forwarded = true; + } + + /// + /// Marks this message as OK to be dropped if no chain terminator forwards it. + /// + public void DoNotRequireThisMessageToBeForwarded() + { + Dropped = true; + } } } \ No newline at end of file diff --git a/src/NServiceBus.Router/Pipeline/Postrouting/PostroutingVerificationRule.cs b/src/NServiceBus.Router/Pipeline/Postrouting/PostroutingVerificationRule.cs deleted file mode 100644 index 2aa8e97..0000000 --- a/src/NServiceBus.Router/Pipeline/Postrouting/PostroutingVerificationRule.cs +++ /dev/null @@ -1,39 +0,0 @@ -using System; -using System.Threading.Tasks; -using NServiceBus.Router; - -class PostroutingVerificationRule : IRule -{ - public async Task Invoke(PostroutingContext context, Func next) - { - await next(context).ConfigureAwait(false); - - if (context.Extensions.TryGet(out var state)) - { - foreach (var message in context.Messages) - { - state.MessageSent(message.Message.MessageId); - } - } - } - - public class State - { - string incomingMessageId; - - public State(string incomingMessageId) - { - this.incomingMessageId = incomingMessageId; - } - - public bool HasBeenForwarded { get; private set; } - - public void MessageSent(string messageId) - { - if (messageId == incomingMessageId) - { - HasBeenForwarded = true; - } - } - } -} \ No newline at end of file diff --git a/src/NServiceBus.Router/Pipeline/Prerouting/PublishPreroutingTerminator.cs b/src/NServiceBus.Router/Pipeline/Prerouting/PublishPreroutingTerminator.cs index 0f4a0d7..e62e744 100644 --- a/src/NServiceBus.Router/Pipeline/Prerouting/PublishPreroutingTerminator.cs +++ b/src/NServiceBus.Router/Pipeline/Prerouting/PublishPreroutingTerminator.cs @@ -17,26 +17,23 @@ protected override async Task Terminate(PublishPreroutingContext context) { var outgoingInterfaces = allInterfaces.Where(i => i != context.IncomingInterface); - var verificationState = new PostroutingVerificationRule.State(context.MessageId); - var interfaces = context.Extensions.Get(); var forkTasks = outgoingInterfaces - .Select(iface => + .Select(async iface => { var chains = interfaces.GetChainsFor(iface); var chain = chains.Get(); var forwardPublishContext = new ForwardPublishContext(iface, typeGenerator.GetType(context.Types.First()), context); - forwardPublishContext.Extensions.Set(verificationState); - return chain.Invoke(forwardPublishContext); - }); + await chain.Invoke(forwardPublishContext).ConfigureAwait(false); - await Task.WhenAll(forkTasks).ConfigureAwait(false); + return forwardPublishContext.Dropped || forwardPublishContext.Forwarded; + }); - if (!verificationState.HasBeenForwarded) - { + var results = await Task.WhenAll(forkTasks).ConfigureAwait(false); + if (!results.Any()) + { throw new UnforwardableMessageException($"The incoming message {context.MessageId} has not been forwarded. This might indicate a configuration problem."); } - return true; } } \ No newline at end of file diff --git a/src/NServiceBus.Router/Router.cs b/src/NServiceBus.Router/Router.cs index 9681f75..5e31889 100644 --- a/src/NServiceBus.Router/Router.cs +++ b/src/NServiceBus.Router/Router.cs @@ -62,7 +62,6 @@ public static IRouter Create(RouterConfiguration config) chains.AddChain(cb => cb.Begin().Terminate()); chains.AddRule(c => new PublishPreroutingTerminator(interfaces.Select(i => i.Name).ToArray(), c.TypeGenerator)); chains.AddChain(cb => cb.Begin().Terminate()); - chains.AddRule(c => new PostroutingVerificationRule()); chains.AddRule(_ => new PreroutingToReplyPreroutingFork()); chains.AddChain(cb => cb.Begin().Terminate());