From 137b8b9d216096392c1321795af8337cedeaf061 Mon Sep 17 00:00:00 2001 From: SzymonPobiega Date: Wed, 18 Mar 2020 09:47:43 +0100 Subject: [PATCH] Make SendOnlyInterface a separate concept entirely --- src/AcceptanceTesting/SubscriptionHelper.cs | 6 ++ ...orwarding_a_message_via_sendonly_router.cs | 83 +++++++++++++++++++ src/NServiceBus.Router/Router.cs | 6 +- src/NServiceBus.Router/RouterConfiguration.cs | 5 +- src/NServiceBus.Router/RouterImpl.cs | 25 ++++-- src/NServiceBus.Router/SendOnlyInterface.cs | 40 ++++----- .../SendOnlyInterfaceConfiguration.cs | 2 +- 7 files changed, 128 insertions(+), 39 deletions(-) create mode 100644 src/NServiceBus.Router.AcceptanceTests/SingleRouter/When_forwarding_a_message_via_sendonly_router.cs diff --git a/src/AcceptanceTesting/SubscriptionHelper.cs b/src/AcceptanceTesting/SubscriptionHelper.cs index 9d379cd..8359263 100644 --- a/src/AcceptanceTesting/SubscriptionHelper.cs +++ b/src/AcceptanceTesting/SubscriptionHelper.cs @@ -8,4 +8,10 @@ public static class SubscriptionHelper interfaceConfig.EnableMessageDrivenPublishSubscribe(new InMemorySubscriptionStorage()); return interfaceConfig; } + + public static SendOnlyInterfaceConfiguration InMemorySubscriptions(this SendOnlyInterfaceConfiguration interfaceConfig) where T : TransportDefinition, new() + { + interfaceConfig.EnableMessageDrivenPublishSubscribe(new InMemorySubscriptionStorage()); + return interfaceConfig; + } } \ No newline at end of file diff --git a/src/NServiceBus.Router.AcceptanceTests/SingleRouter/When_forwarding_a_message_via_sendonly_router.cs b/src/NServiceBus.Router.AcceptanceTests/SingleRouter/When_forwarding_a_message_via_sendonly_router.cs new file mode 100644 index 0000000..3a52528 --- /dev/null +++ b/src/NServiceBus.Router.AcceptanceTests/SingleRouter/When_forwarding_a_message_via_sendonly_router.cs @@ -0,0 +1,83 @@ +using System.Threading.Tasks; +using NServiceBus.AcceptanceTesting; +using NServiceBus.AcceptanceTests; +using NServiceBus.AcceptanceTests.EndpointTemplates; +using NUnit.Framework; + +namespace NServiceBus.Router.AcceptanceTests.SingleRouter +{ + using AcceptanceTesting.Customization; + + [TestFixture] + public class When_forwarding_a_message_via_sendonly_router : NServiceBusAcceptanceTest + { + [Test] + public async Task Should_deliver_the_message() + { + var result = await Scenario.Define() + .WithRouter("Router", cfg => + { + cfg.AddInterface("Left", t => t.BrokerAlpha()).InMemorySubscriptions(); + cfg.AddSendOnlyInterface("Right", t => t.BrokerBravo()).InMemorySubscriptions(); + + cfg.UseStaticRoutingProtocol().AddForwardRoute("Left", "Right"); + }) + .WithEndpoint(c => c.When(s => s.Send(new MyRequest()))) + .WithEndpoint() + .Done(c => c.RequestReceived) + .Run(); + + Assert.IsTrue(result.RequestReceived); + } + + class Context : ScenarioContext + { + public bool RequestReceived { get; set; } + } + + class Sender : EndpointConfigurationBuilder + { + public Sender() + { + EndpointSetup(c => + { + var routing = c.UseTransport().BrokerAlpha().Routing(); + var bridge = routing.ConnectToRouter("Router"); + bridge.RouteToEndpoint(typeof(MyRequest), Conventions.EndpointNamingConvention(typeof(Receiver))); + }); + } + } + + class Receiver : EndpointConfigurationBuilder + { + public Receiver() + { + EndpointSetup(c => + { + //No bridge configuration needed for reply + c.UseTransport().BrokerBravo(); + }); + } + + class MyRequestHandler : IHandleMessages + { + Context scenarioContext; + + public MyRequestHandler(Context scenarioContext) + { + this.scenarioContext = scenarioContext; + } + + public Task Handle(MyRequest request, IMessageHandlerContext context) + { + scenarioContext.RequestReceived = true; + return Task.CompletedTask; + } + } + } + + class MyRequest : IMessage + { + } + } +} diff --git a/src/NServiceBus.Router/Router.cs b/src/NServiceBus.Router/Router.cs index 2af442d..7a1b810 100644 --- a/src/NServiceBus.Router/Router.cs +++ b/src/NServiceBus.Router/Router.cs @@ -28,7 +28,9 @@ public static IRouter Create(RouterConfiguration config) feature.Configure(config); } + var sendOnlyInterfaces = config.SendOnlyInterfaceFactories.Select(x => x()).ToArray(); var interfaces = config.InterfaceFactories.Select(x => x()).ToArray(); + var allInterfaceNames = sendOnlyInterfaces.Select(i => i.Name).Concat(interfaces.Select(i => i.Name)).ToArray(); var chains = config.Chains; @@ -61,7 +63,7 @@ public static IRouter Create(RouterConfiguration config) chains.AddRule(_ => new PreroutingToPublishPreroutingFork()); chains.AddChain(cb => cb.Begin().Terminate()); - chains.AddRule(c => new PublishPreroutingTerminator(interfaces.Select(i => i.Name).ToArray(), c.TypeGenerator)); + chains.AddRule(c => new PublishPreroutingTerminator(allInterfaceNames, c.TypeGenerator)); chains.AddChain(cb => cb.Begin().Terminate()); chains.AddRule(_ => new PreroutingToReplyPreroutingFork()); @@ -80,7 +82,7 @@ public static IRouter Create(RouterConfiguration config) chains.AddChain(cb => cb.Begin().Terminate()); chains.AddRule(c => new PostroutingTerminator(c.Endpoint)); - return new RouterImpl(config.Name, interfaces, config.Modules.ToArray(), config.RoutingProtocol, chains, config.Settings); + return new RouterImpl(config.Name, interfaces, sendOnlyInterfaces, config.Modules.ToArray(), config.RoutingProtocol, chains, config.Settings); } } } \ No newline at end of file diff --git a/src/NServiceBus.Router/RouterConfiguration.cs b/src/NServiceBus.Router/RouterConfiguration.cs index f266e5d..083e59b 100644 --- a/src/NServiceBus.Router/RouterConfiguration.cs +++ b/src/NServiceBus.Router/RouterConfiguration.cs @@ -53,11 +53,11 @@ public SendOnlyInterfaceConfiguration AddSendOnlyInterface(string name, Ac where T : TransportDefinition, new() { var ifaceConfig = new SendOnlyInterfaceConfiguration(name, customization, this); - InterfaceFactories.Add(() => CreateSendOnlyInterface(ifaceConfig)); + SendOnlyInterfaceFactories.Add(() => CreateSendOnlyInterface(ifaceConfig)); return ifaceConfig; } - Interface CreateSendOnlyInterface(SendOnlyInterfaceConfiguration ifaceConfig) + SendOnlyInterface CreateSendOnlyInterface(SendOnlyInterfaceConfiguration ifaceConfig) where T : TransportDefinition, new() { return ifaceConfig.Create(Name, typeGenerator, Settings); @@ -148,6 +148,7 @@ public void DefineChain(Func> chainDefiniti bool? autoCreateQueues; string autoCreateQueuesIdentity; internal List> InterfaceFactories = new List>(); + internal List> SendOnlyInterfaceFactories = new List>(); internal List Modules = new List(); internal HashSet Features = new HashSet(); internal IRoutingProtocol RoutingProtocol; diff --git a/src/NServiceBus.Router/RouterImpl.cs b/src/NServiceBus.Router/RouterImpl.cs index b064446..2207128 100644 --- a/src/NServiceBus.Router/RouterImpl.cs +++ b/src/NServiceBus.Router/RouterImpl.cs @@ -1,5 +1,4 @@ using System; -using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using NServiceBus.Logging; @@ -7,14 +6,15 @@ class RouterImpl : IRouter { - public RouterImpl(string name, Interface[] interfaces, IModule[] modules, IRoutingProtocol routingProtocol, InterfaceChains interfaceChains, SettingsHolder extensibilitySettings) + public RouterImpl(string name, Interface[] interfaces, SendOnlyInterface[] sendOnlyInterfaces, IModule[] modules, IRoutingProtocol routingProtocol, InterfaceChains interfaceChains, SettingsHolder extensibilitySettings) { this.name = name; + this.sendOnlyInterfaces = sendOnlyInterfaces; this.modules = modules; this.routingProtocol = routingProtocol; this.interfaceChains = interfaceChains; this.extensibilitySettings = extensibilitySettings; - this.interfaces = interfaces.ToDictionary(x => x.Name, x => x); + this.interfaces = interfaces; } public async Task Initialize() @@ -25,9 +25,14 @@ public async Task Initialize() } rootContext = new RootContext(interfaceChains, name); - await routingProtocol.Start(new RouterMetadata(name, interfaces.Keys.ToList())).ConfigureAwait(false); + await routingProtocol.Start(new RouterMetadata(name, interfaces.Select(i => i.Name).ToList())).ConfigureAwait(false); - foreach (var iface in interfaces.Values) + foreach (var iface in sendOnlyInterfaces) + { + await iface.Initialize(interfaceChains).ConfigureAwait(false); + } + + foreach (var iface in interfaces) { await iface.Initialize(interfaceChains, rootContext).ConfigureAwait(false); } @@ -50,12 +55,12 @@ public async Task Start() log.Debug($"Started module {module}"); } - await Task.WhenAll(interfaces.Values.Select(p => p.StartReceiving())).ConfigureAwait(false); + await Task.WhenAll(interfaces.Select(p => p.StartReceiving())).ConfigureAwait(false); } public async Task Stop() { - await Task.WhenAll(interfaces.Values.Select(s => s.StopReceiving())).ConfigureAwait(false); + await Task.WhenAll(interfaces.Select(s => s.StopReceiving())).ConfigureAwait(false); //Stop modules in reverse order foreach (var module in modules.Reverse()) @@ -65,17 +70,19 @@ public async Task Stop() log.Debug($"Stopped module {module}"); } - await Task.WhenAll(interfaces.Values.Select(s => s.Stop())).ConfigureAwait(false); + await Task.WhenAll(interfaces.Select(s => s.Stop())).ConfigureAwait(false); + await Task.WhenAll(sendOnlyInterfaces.Select(s => s.Stop())).ConfigureAwait(false); await routingProtocol.Stop().ConfigureAwait(false); } bool initialized; string name; + Interface[] interfaces; + SendOnlyInterface[] sendOnlyInterfaces; IModule[] modules; IRoutingProtocol routingProtocol; InterfaceChains interfaceChains; SettingsHolder extensibilitySettings; - Dictionary interfaces; static ILog log = LogManager.GetLogger(); RootContext rootContext; } \ No newline at end of file diff --git a/src/NServiceBus.Router/SendOnlyInterface.cs b/src/NServiceBus.Router/SendOnlyInterface.cs index ee20e39..df1f762 100644 --- a/src/NServiceBus.Router/SendOnlyInterface.cs +++ b/src/NServiceBus.Router/SendOnlyInterface.cs @@ -6,7 +6,14 @@ using NServiceBus.Raw; using NServiceBus.Transport; -class SendOnlyInterface : Interface where T : TransportDefinition, new() +interface SendOnlyInterface +{ + string Name { get; } + Task Initialize(InterfaceChains interfaces); + Task Stop(); +} + +class SendOnlyInterface : SendOnlyInterface where T : TransportDefinition, new() { public SendOnlyInterface(string endpointName, string interfaceName, Action> transportCustomization, Func ruleCreationContextFactory) { @@ -26,44 +33,27 @@ static void SetTransportSpecificFlags(NServiceBus.Settings.SettingsHolder settin settings.Set("RabbitMQ.RoutingTopologySupportsDelayedDelivery", true); } - public async Task Initialize(InterfaceChains interfaces, RootContext rootContext) + public async Task Initialize(InterfaceChains interfaces) { - startable = await RawEndpoint.Create(config).ConfigureAwait(false); + var startable = await RawEndpoint.Create(config).ConfigureAwait(false); config = null; var ruleCreationContext = ruleCreationContextFactory(startable); interfaces.InitializeInterface(Name, ruleCreationContext); - } - - public async Task StartReceiving() - { - receiver = await startable.Start().ConfigureAwait(false); - } - public async Task StopReceiving() - { - if (receiver != null) - { - stoppable = await receiver.StopReceiving().ConfigureAwait(false); - } - else - { - stoppable = null; - } + endpoint = await startable.Start().ConfigureAwait(false); } public async Task Stop() { - if (stoppable != null) + if (endpoint != null) { - await stoppable.Stop().ConfigureAwait(false); - stoppable = null; + await endpoint.Stop().ConfigureAwait(false); + endpoint = null; } } RawEndpointConfiguration config; - IStartableRawEndpoint startable; - IReceivingRawEndpoint receiver; - IStoppableRawEndpoint stoppable; + IStoppableRawEndpoint endpoint; Func ruleCreationContextFactory; } diff --git a/src/NServiceBus.Router/SendOnlyInterfaceConfiguration.cs b/src/NServiceBus.Router/SendOnlyInterfaceConfiguration.cs index 86873bd..8dc0634 100644 --- a/src/NServiceBus.Router/SendOnlyInterfaceConfiguration.cs +++ b/src/NServiceBus.Router/SendOnlyInterfaceConfiguration.cs @@ -85,7 +85,7 @@ public void OverrideEndpointName(string interfaceEndpointName) /// public EndpointInstances EndpointInstances { get; } = new EndpointInstances(); - internal Interface Create(string endpointName, RuntimeTypeGenerator typeGenerator, SettingsHolder routerSettings) + internal SendOnlyInterface Create(string endpointName, RuntimeTypeGenerator typeGenerator, SettingsHolder routerSettings) { IRuleCreationContext ContextFactory(IRawEndpoint e) {