diff --git a/src/AcceptanceTesting/Infrastructure/RouterComponent.cs b/src/AcceptanceTesting/Infrastructure/RouterComponent.cs index 85ffb7d..ed6c1f2 100644 --- a/src/AcceptanceTesting/Infrastructure/RouterComponent.cs +++ b/src/AcceptanceTesting/Infrastructure/RouterComponent.cs @@ -39,23 +39,28 @@ Interface NewFactory() class Runner : ComponentRunner { - IRouter @switch; + IRouter router; - public Runner(IRouter @switch, string name) + public Runner(IRouter router, string name) { - this.@switch = @switch; + this.router = router; Name = name; } public override Task Start(CancellationToken token) { - return @switch.Start(); + return router.Initialize(); + } + + public override Task ComponentsStarted(CancellationToken token) + { + return router.Start(); } public override Task Stop() { - return @switch != null - ? @switch.Stop() + return router != null + ? router.Stop() : Task.CompletedTask; } diff --git a/src/AcceptanceTesting/TestTransport/TestTransportInfrastructure.cs b/src/AcceptanceTesting/TestTransport/TestTransportInfrastructure.cs index bc84ff1..3b7f56a 100644 --- a/src/AcceptanceTesting/TestTransport/TestTransportInfrastructure.cs +++ b/src/AcceptanceTesting/TestTransport/TestTransportInfrastructure.cs @@ -70,7 +70,8 @@ string FindSolutionRoot() public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure() { - return new TransportReceiveInfrastructure(() => new TestTransportMessagePump(storagePath), () => new TestTransportQueueCreator(), () => Task.FromResult(StartupCheckResult.Success)); + return new TransportReceiveInfrastructure(() => new TestTransportMessagePump(storagePath), + () => new TestTransportQueueCreator(storagePath), () => Task.FromResult(StartupCheckResult.Success)); } public override TransportSendInfrastructure ConfigureSendInfrastructure() diff --git a/src/AcceptanceTesting/TestTransport/TestTransportQueueCreator.cs b/src/AcceptanceTesting/TestTransport/TestTransportQueueCreator.cs index c9ca494..17faaad 100644 --- a/src/AcceptanceTesting/TestTransport/TestTransportQueueCreator.cs +++ b/src/AcceptanceTesting/TestTransport/TestTransportQueueCreator.cs @@ -1,10 +1,33 @@ namespace NServiceBus { + using System.IO; using System.Threading.Tasks; using Transport; class TestTransportQueueCreator : ICreateQueues { - public Task CreateQueueIfNecessary(QueueBindings queueBindings, string identity) => Task.CompletedTask; + readonly string storagePath; + + public TestTransportQueueCreator(string storagePath) + { + this.storagePath = storagePath; + } + + public Task CreateQueueIfNecessary(QueueBindings queueBindings, string identity) + { + foreach (var queueBinding in queueBindings.ReceivingAddresses) + { + var queuePath = Path.Combine(storagePath, queueBinding); + Directory.CreateDirectory(queuePath); + } + + foreach (var queueBinding in queueBindings.SendingAddresses) + { + var queuePath = Path.Combine(storagePath, queueBinding); + Directory.CreateDirectory(queuePath); + } + + return Task.CompletedTask; + } } } \ No newline at end of file diff --git a/src/NServiceBus.Router.AcceptanceTests/SingleRouter/When_intentionally_dropping_messages_in_prerouting.cs b/src/NServiceBus.Router.AcceptanceTests/SingleRouter/When_intentionally_dropping_messages_in_prerouting.cs new file mode 100644 index 0000000..29d19e5 --- /dev/null +++ b/src/NServiceBus.Router.AcceptanceTests/SingleRouter/When_intentionally_dropping_messages_in_prerouting.cs @@ -0,0 +1,87 @@ +using System.Threading.Tasks; +using NServiceBus.AcceptanceTesting; +using NServiceBus.AcceptanceTests; +using NServiceBus.AcceptanceTests.EndpointTemplates; +using NUnit.Framework; + +namespace NServiceBus.Router.AcceptanceTests.SingleRouter +{ + using System; + using Pipeline; + + [TestFixture] + public class When_intentionally_dropping_messages_in_prerouting : NServiceBusAcceptanceTest + { + [Test] + public async Task Should_not_complain() + { + var result = await Scenario.Define() + .WithRouter("Router", (ctx, cfg) => + { + cfg.AddInterface("Left", t => t.BrokerAlpha()).InMemorySubscriptions(); + cfg.AddInterface("Right", t => t.BrokerBravo()).InMemorySubscriptions(); + + cfg.UseStaticRoutingProtocol().AddForwardRoute("Left", "Right"); + cfg.AddRule(_ => new DropMessagesRule(ctx)); + }) + .WithEndpoint(c => c.When(s => s.Send(new MyRequest()))) + .Done(c => c.Dropped) + .Run(TimeSpan.FromSeconds(20)); + + Assert.IsTrue(result.Dropped); + } + + class DropMessagesRule : IRule + { + Context scenarioContext; + + public DropMessagesRule(Context scenarioContext) + { + this.scenarioContext = scenarioContext; + } + + public async Task Invoke(PreroutingContext context, Func next) + { + context.DoNotRequireThisMessageToBeForwarded(); + await next(context); + scenarioContext.Dropped = true; + } + } + + class Context : ScenarioContext + { + public bool Dropped { get; set; } + } + + class Sender : EndpointConfigurationBuilder + { + public Sender() + { + EndpointSetup(c => + { + var routing = c.UseTransport().BrokerAlpha().Routing(); + var router = routing.ConnectToRouter("Router"); + router.DelegateRouting(typeof(MyRequest)); + c.Pipeline.Register(new RemoveIntentBehavior(), "Remove message intent header"); + }); + } + + class RemoveIntentBehavior : Behavior + { + public override Task Invoke(IDispatchContext context, Func next) + { + foreach (var operation in context.Operations) + { + operation.Message.Headers.Remove(Headers.MessageIntent); + } + + return next(); + } + } + } + + class MyRequest : IMessage + { + } + } +} diff --git a/src/NServiceBus.Router/IRouter.cs b/src/NServiceBus.Router/IRouter.cs index 82fb9a9..416e293 100644 --- a/src/NServiceBus.Router/IRouter.cs +++ b/src/NServiceBus.Router/IRouter.cs @@ -8,7 +8,13 @@ namespace NServiceBus.Router public interface IRouter { /// - /// Starts the router. + /// Initializes the router. + /// + /// + Task Initialize(); + + /// + /// Initializes and starts the router. /// Task Start(); diff --git a/src/NServiceBus.Router/Pipeline/Prerouting/2_PreroutingContext.cs b/src/NServiceBus.Router/Pipeline/Prerouting/2_PreroutingContext.cs index bdb396d..2184e88 100644 --- a/src/NServiceBus.Router/Pipeline/Prerouting/2_PreroutingContext.cs +++ b/src/NServiceBus.Router/Pipeline/Prerouting/2_PreroutingContext.cs @@ -10,19 +10,22 @@ /// public class PreroutingContext : BasePreroutingContext { + internal bool Forwarded; + internal bool Dropped; + internal PreroutingContext(RawContext parent) : base(parent) { Body = parent.Body; - Intent = GetMesssageIntent(parent.Headers); + Intent = GetMessageIntent(parent.Headers); } - static MessageIntentEnum? GetMesssageIntent(IReadOnlyDictionary headers) + static MessageIntentEnum? GetMessageIntent(IReadOnlyDictionary headers) { - var messageIntent = default(MessageIntentEnum); if (headers.TryGetValue(NServiceBus.Headers.MessageIntent, out var messageIntentString)) { - Enum.TryParse(messageIntentString, true, out messageIntent); + Enum.TryParse(messageIntentString, true, out var messageIntent); + return messageIntent; } - return messageIntent; + return null; } /// @@ -34,5 +37,22 @@ internal PreroutingContext(RawContext parent) : base(parent) /// The body of the received message. /// public byte[] Body { get; set; } + + /// + /// Mark this message as forwarded. + /// + /// + public void MarkForwarded() + { + Forwarded = true; + } + + /// + /// Marks this message as OK to be dropped if no chain terminator forwards it. + /// + public void DoNotRequireThisMessageToBeForwarded() + { + Dropped = true; + } } } \ No newline at end of file diff --git a/src/NServiceBus.Router/Pipeline/Prerouting/PreroutingTerminator.cs b/src/NServiceBus.Router/Pipeline/Prerouting/PreroutingTerminator.cs new file mode 100644 index 0000000..7028781 --- /dev/null +++ b/src/NServiceBus.Router/Pipeline/Prerouting/PreroutingTerminator.cs @@ -0,0 +1,14 @@ +using System.Threading.Tasks; +using NServiceBus.Router; + +class PreroutingTerminator : ChainTerminator +{ + protected override Task Terminate(PreroutingContext context) + { + if (!context.Dropped && !context.Forwarded) + { + throw new UnforwardableMessageException($"The incoming message {context.MessageId} has not been forwarded. This might indicate a configuration problem."); + } + return Task.FromResult(true); + } +} \ No newline at end of file diff --git a/src/NServiceBus.Router/Pipeline/Prerouting/PreroutingToPublishPreroutingFork.cs b/src/NServiceBus.Router/Pipeline/Prerouting/PreroutingToPublishPreroutingFork.cs index 3c9fe03..8e5ce9a 100644 --- a/src/NServiceBus.Router/Pipeline/Prerouting/PreroutingToPublishPreroutingFork.cs +++ b/src/NServiceBus.Router/Pipeline/Prerouting/PreroutingToPublishPreroutingFork.cs @@ -3,9 +3,9 @@ using NServiceBus; using NServiceBus.Router; -class PreroutingToPublishPreroutingFork : ChainTerminator +class PreroutingToPublishPreroutingFork : IRule { - protected override async Task Terminate(PreroutingContext context) + public async Task Invoke(PreroutingContext context, Func next) { if (context.Intent == MessageIntentEnum.Publish) { @@ -20,10 +20,10 @@ await context.Chains.Get() .Invoke(new PublishPreroutingContext(types, context)) .ConfigureAwait(false); - return true; + context.MarkForwarded(); } - return false; + await next(context).ConfigureAwait(false); } } diff --git a/src/NServiceBus.Router/Pipeline/Prerouting/PreroutingToReplyPreroutingFork.cs b/src/NServiceBus.Router/Pipeline/Prerouting/PreroutingToReplyPreroutingFork.cs index 32a994e..dd25ee4 100644 --- a/src/NServiceBus.Router/Pipeline/Prerouting/PreroutingToReplyPreroutingFork.cs +++ b/src/NServiceBus.Router/Pipeline/Prerouting/PreroutingToReplyPreroutingFork.cs @@ -1,10 +1,11 @@ -using System.Threading.Tasks; +using System; +using System.Threading.Tasks; using NServiceBus; using NServiceBus.Router; -class PreroutingToReplyPreroutingFork : ChainTerminator +class PreroutingToReplyPreroutingFork : IRule { - protected override async Task Terminate(PreroutingContext context) + public async Task Invoke(PreroutingContext context, Func next) { if (context.Intent == MessageIntentEnum.Reply) { @@ -12,10 +13,10 @@ await context.Chains.Get() .Invoke(new ReplyPreroutingContext(context)) .ConfigureAwait(false); - return true; + context.MarkForwarded(); } - return false; + await next(context).ConfigureAwait(false); } } diff --git a/src/NServiceBus.Router/Pipeline/Prerouting/PreroutingToSendPreroutingFork.cs b/src/NServiceBus.Router/Pipeline/Prerouting/PreroutingToSendPreroutingFork.cs index 10e903e..5c8656e 100644 --- a/src/NServiceBus.Router/Pipeline/Prerouting/PreroutingToSendPreroutingFork.cs +++ b/src/NServiceBus.Router/Pipeline/Prerouting/PreroutingToSendPreroutingFork.cs @@ -1,10 +1,11 @@ -using System.Threading.Tasks; +using System; +using System.Threading.Tasks; using NServiceBus; using NServiceBus.Router; -class PreroutingToSendPreroutingFork : ChainTerminator +class PreroutingToSendPreroutingFork : IRule { - protected override async Task Terminate(PreroutingContext context) + public async Task Invoke(PreroutingContext context, Func next) { if (context.Intent == MessageIntentEnum.Send) { @@ -12,10 +13,10 @@ await context.Chains.Get() .Invoke(new SendPreroutingContext(context)) .ConfigureAwait(false); - return true; + context.MarkForwarded(); } - return false; + await next(context).ConfigureAwait(false); } } diff --git a/src/NServiceBus.Router/Pipeline/Prerouting/PreroutingToSubscribePreroutingFork.cs b/src/NServiceBus.Router/Pipeline/Prerouting/PreroutingToSubscribePreroutingFork.cs index 7210616..48af62d 100644 --- a/src/NServiceBus.Router/Pipeline/Prerouting/PreroutingToSubscribePreroutingFork.cs +++ b/src/NServiceBus.Router/Pipeline/Prerouting/PreroutingToSubscribePreroutingFork.cs @@ -1,10 +1,11 @@ -using System.Threading.Tasks; +using System; +using System.Threading.Tasks; using NServiceBus; using NServiceBus.Router; -class PreroutingToSubscribePreroutingFork : ChainTerminator +class PreroutingToSubscribePreroutingFork : IRule { - protected override async Task Terminate(PreroutingContext context) + public async Task Invoke(PreroutingContext context, Func next) { if (context.Intent == MessageIntentEnum.Subscribe || context.Intent == MessageIntentEnum.Unsubscribe) @@ -41,10 +42,10 @@ await context.Chains.Get() .ConfigureAwait(false); } - return true; + context.MarkForwarded(); } - return false; + await next(context).ConfigureAwait(false); } static string GetReplyToAddress(PreroutingContext message) diff --git a/src/NServiceBus.Router/Router.cs b/src/NServiceBus.Router/Router.cs index 5e31889..2af442d 100644 --- a/src/NServiceBus.Router/Router.cs +++ b/src/NServiceBus.Router/Router.cs @@ -35,6 +35,7 @@ public static IRouter Create(RouterConfiguration config) chains.AddChain(cb => cb.Begin().AddSection().Terminate()); chains.AddRule(_ => new RawToPreroutingConnector()); chains.AddRule(c => new DetectCyclesRule(c.Endpoint.EndpointName)); + chains.AddRule(_ => new PreroutingTerminator()); chains.AddRule(_ => new PreroutingToSubscribePreroutingFork()); diff --git a/src/NServiceBus.Router/RouterImpl.cs b/src/NServiceBus.Router/RouterImpl.cs index 41878fd..b064446 100644 --- a/src/NServiceBus.Router/RouterImpl.cs +++ b/src/NServiceBus.Router/RouterImpl.cs @@ -1,4 +1,5 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using NServiceBus.Logging; @@ -16,10 +17,14 @@ public RouterImpl(string name, Interface[] interfaces, IModule[] modules, IRouti this.interfaces = interfaces.ToDictionary(x => x.Name, x => x); } - public async Task Start() + public async Task Initialize() { - var rootContext = new RootContext(interfaceChains, name); + if (initialized) + { + throw new Exception("The router has already been initialized."); + } + rootContext = new RootContext(interfaceChains, name); await routingProtocol.Start(new RouterMetadata(name, interfaces.Keys.ToList())).ConfigureAwait(false); foreach (var iface in interfaces.Values) @@ -27,6 +32,16 @@ public async Task Start() await iface.Initialize(interfaceChains, rootContext).ConfigureAwait(false); } + initialized = true; + } + + public async Task Start() + { + if (!initialized) + { + await Initialize().ConfigureAwait(false); + } + //Start modules in order foreach (var module in modules) { @@ -53,7 +68,8 @@ public async Task Stop() await Task.WhenAll(interfaces.Values.Select(s => s.Stop())).ConfigureAwait(false); await routingProtocol.Stop().ConfigureAwait(false); } - + + bool initialized; string name; IModule[] modules; IRoutingProtocol routingProtocol; @@ -61,4 +77,5 @@ public async Task Stop() SettingsHolder extensibilitySettings; Dictionary interfaces; static ILog log = LogManager.GetLogger(); + RootContext rootContext; } \ No newline at end of file