Skip to content

Commit

Permalink
Make SendOnlyInterface a separate concept entirely
Browse files Browse the repository at this point in the history
  • Loading branch information
SzymonPobiega committed Mar 18, 2020
1 parent f70993a commit 137b8b9
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 39 deletions.
6 changes: 6 additions & 0 deletions src/AcceptanceTesting/SubscriptionHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,10 @@ public static class SubscriptionHelper
interfaceConfig.EnableMessageDrivenPublishSubscribe(new InMemorySubscriptionStorage());
return interfaceConfig;
}

public static SendOnlyInterfaceConfiguration<T> InMemorySubscriptions<T>(this SendOnlyInterfaceConfiguration<T> interfaceConfig) where T : TransportDefinition, new()
{
interfaceConfig.EnableMessageDrivenPublishSubscribe(new InMemorySubscriptionStorage());
return interfaceConfig;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
using System.Threading.Tasks;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTests;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NUnit.Framework;

namespace NServiceBus.Router.AcceptanceTests.SingleRouter
{
using AcceptanceTesting.Customization;

[TestFixture]
public class When_forwarding_a_message_via_sendonly_router : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_deliver_the_message()
{
var result = await Scenario.Define<Context>()
.WithRouter("Router", cfg =>
{
cfg.AddInterface<TestTransport>("Left", t => t.BrokerAlpha()).InMemorySubscriptions();
cfg.AddSendOnlyInterface<TestTransport>("Right", t => t.BrokerBravo()).InMemorySubscriptions();

cfg.UseStaticRoutingProtocol().AddForwardRoute("Left", "Right");
})
.WithEndpoint<Sender>(c => c.When(s => s.Send(new MyRequest())))
.WithEndpoint<Receiver>()
.Done(c => c.RequestReceived)
.Run();

Assert.IsTrue(result.RequestReceived);
}

class Context : ScenarioContext
{
public bool RequestReceived { get; set; }
}

class Sender : EndpointConfigurationBuilder
{
public Sender()
{
EndpointSetup<DefaultServer>(c =>
{
var routing = c.UseTransport<TestTransport>().BrokerAlpha().Routing();
var bridge = routing.ConnectToRouter("Router");
bridge.RouteToEndpoint(typeof(MyRequest), Conventions.EndpointNamingConvention(typeof(Receiver)));
});
}
}

class Receiver : EndpointConfigurationBuilder
{
public Receiver()
{
EndpointSetup<DefaultServer>(c =>
{
//No bridge configuration needed for reply
c.UseTransport<TestTransport>().BrokerBravo();
});
}

class MyRequestHandler : IHandleMessages<MyRequest>
{
Context scenarioContext;

public MyRequestHandler(Context scenarioContext)
{
this.scenarioContext = scenarioContext;
}

public Task Handle(MyRequest request, IMessageHandlerContext context)
{
scenarioContext.RequestReceived = true;
return Task.CompletedTask;
}
}
}

class MyRequest : IMessage
{
}
}
}
6 changes: 4 additions & 2 deletions src/NServiceBus.Router/Router.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ public static IRouter Create(RouterConfiguration config)
feature.Configure(config);
}

var sendOnlyInterfaces = config.SendOnlyInterfaceFactories.Select(x => x()).ToArray();
var interfaces = config.InterfaceFactories.Select(x => x()).ToArray();
var allInterfaceNames = sendOnlyInterfaces.Select(i => i.Name).Concat(interfaces.Select(i => i.Name)).ToArray();

var chains = config.Chains;

Expand Down Expand Up @@ -61,7 +63,7 @@ public static IRouter Create(RouterConfiguration config)

chains.AddRule(_ => new PreroutingToPublishPreroutingFork());
chains.AddChain(cb => cb.Begin<PublishPreroutingContext>().Terminate());
chains.AddRule(c => new PublishPreroutingTerminator(interfaces.Select(i => i.Name).ToArray(), c.TypeGenerator));
chains.AddRule(c => new PublishPreroutingTerminator(allInterfaceNames, c.TypeGenerator));
chains.AddChain(cb => cb.Begin<ForwardPublishContext>().Terminate());

chains.AddRule(_ => new PreroutingToReplyPreroutingFork());
Expand All @@ -80,7 +82,7 @@ public static IRouter Create(RouterConfiguration config)
chains.AddChain(cb => cb.Begin<PostroutingContext>().Terminate());
chains.AddRule(c => new PostroutingTerminator(c.Endpoint));

return new RouterImpl(config.Name, interfaces, config.Modules.ToArray(), config.RoutingProtocol, chains, config.Settings);
return new RouterImpl(config.Name, interfaces, sendOnlyInterfaces, config.Modules.ToArray(), config.RoutingProtocol, chains, config.Settings);
}
}
}
5 changes: 3 additions & 2 deletions src/NServiceBus.Router/RouterConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ public SendOnlyInterfaceConfiguration<T> AddSendOnlyInterface<T>(string name, Ac
where T : TransportDefinition, new()
{
var ifaceConfig = new SendOnlyInterfaceConfiguration<T>(name, customization, this);
InterfaceFactories.Add(() => CreateSendOnlyInterface(ifaceConfig));
SendOnlyInterfaceFactories.Add(() => CreateSendOnlyInterface(ifaceConfig));
return ifaceConfig;
}

Interface CreateSendOnlyInterface<T>(SendOnlyInterfaceConfiguration<T> ifaceConfig)
SendOnlyInterface CreateSendOnlyInterface<T>(SendOnlyInterfaceConfiguration<T> ifaceConfig)
where T : TransportDefinition, new()
{
return ifaceConfig.Create(Name, typeGenerator, Settings);
Expand Down Expand Up @@ -148,6 +148,7 @@ public void DefineChain<TInput>(Func<ChainBuilder, IChain<TInput>> chainDefiniti
bool? autoCreateQueues;
string autoCreateQueuesIdentity;
internal List<Func<Interface>> InterfaceFactories = new List<Func<Interface>>();
internal List<Func<SendOnlyInterface>> SendOnlyInterfaceFactories = new List<Func<SendOnlyInterface>>();
internal List<IModule> Modules = new List<IModule>();
internal HashSet<Type> Features = new HashSet<Type>();
internal IRoutingProtocol RoutingProtocol;
Expand Down
25 changes: 16 additions & 9 deletions src/NServiceBus.Router/RouterImpl.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using NServiceBus.Logging;
using NServiceBus.Router;

class RouterImpl : IRouter
{
public RouterImpl(string name, Interface[] interfaces, IModule[] modules, IRoutingProtocol routingProtocol, InterfaceChains interfaceChains, SettingsHolder extensibilitySettings)
public RouterImpl(string name, Interface[] interfaces, SendOnlyInterface[] sendOnlyInterfaces, IModule[] modules, IRoutingProtocol routingProtocol, InterfaceChains interfaceChains, SettingsHolder extensibilitySettings)
{
this.name = name;
this.sendOnlyInterfaces = sendOnlyInterfaces;
this.modules = modules;
this.routingProtocol = routingProtocol;
this.interfaceChains = interfaceChains;
this.extensibilitySettings = extensibilitySettings;
this.interfaces = interfaces.ToDictionary(x => x.Name, x => x);
this.interfaces = interfaces;
}

public async Task Initialize()
Expand All @@ -25,9 +25,14 @@ public async Task Initialize()
}

rootContext = new RootContext(interfaceChains, name);
await routingProtocol.Start(new RouterMetadata(name, interfaces.Keys.ToList())).ConfigureAwait(false);
await routingProtocol.Start(new RouterMetadata(name, interfaces.Select(i => i.Name).ToList())).ConfigureAwait(false);

foreach (var iface in interfaces.Values)
foreach (var iface in sendOnlyInterfaces)
{
await iface.Initialize(interfaceChains).ConfigureAwait(false);
}

foreach (var iface in interfaces)
{
await iface.Initialize(interfaceChains, rootContext).ConfigureAwait(false);
}
Expand All @@ -50,12 +55,12 @@ public async Task Start()
log.Debug($"Started module {module}");
}

await Task.WhenAll(interfaces.Values.Select(p => p.StartReceiving())).ConfigureAwait(false);
await Task.WhenAll(interfaces.Select(p => p.StartReceiving())).ConfigureAwait(false);
}

public async Task Stop()
{
await Task.WhenAll(interfaces.Values.Select(s => s.StopReceiving())).ConfigureAwait(false);
await Task.WhenAll(interfaces.Select(s => s.StopReceiving())).ConfigureAwait(false);

//Stop modules in reverse order
foreach (var module in modules.Reverse())
Expand All @@ -65,17 +70,19 @@ public async Task Stop()
log.Debug($"Stopped module {module}");
}

await Task.WhenAll(interfaces.Values.Select(s => s.Stop())).ConfigureAwait(false);
await Task.WhenAll(interfaces.Select(s => s.Stop())).ConfigureAwait(false);
await Task.WhenAll(sendOnlyInterfaces.Select(s => s.Stop())).ConfigureAwait(false);
await routingProtocol.Stop().ConfigureAwait(false);
}

bool initialized;
string name;
Interface[] interfaces;
SendOnlyInterface[] sendOnlyInterfaces;
IModule[] modules;
IRoutingProtocol routingProtocol;
InterfaceChains interfaceChains;
SettingsHolder extensibilitySettings;
Dictionary<string, Interface> interfaces;
static ILog log = LogManager.GetLogger<RouterImpl>();
RootContext rootContext;
}
40 changes: 15 additions & 25 deletions src/NServiceBus.Router/SendOnlyInterface.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@
using NServiceBus.Raw;
using NServiceBus.Transport;

class SendOnlyInterface<T> : Interface where T : TransportDefinition, new()
interface SendOnlyInterface
{
string Name { get; }
Task Initialize(InterfaceChains interfaces);
Task Stop();
}

class SendOnlyInterface<T> : SendOnlyInterface where T : TransportDefinition, new()
{
public SendOnlyInterface(string endpointName, string interfaceName, Action<TransportExtensions<T>> transportCustomization, Func<IRawEndpoint, IRuleCreationContext> ruleCreationContextFactory)
{
Expand All @@ -26,44 +33,27 @@ static void SetTransportSpecificFlags(NServiceBus.Settings.SettingsHolder settin
settings.Set("RabbitMQ.RoutingTopologySupportsDelayedDelivery", true);
}

public async Task Initialize(InterfaceChains interfaces, RootContext rootContext)
public async Task Initialize(InterfaceChains interfaces)
{
startable = await RawEndpoint.Create(config).ConfigureAwait(false);
var startable = await RawEndpoint.Create(config).ConfigureAwait(false);
config = null;
var ruleCreationContext = ruleCreationContextFactory(startable);
interfaces.InitializeInterface(Name, ruleCreationContext);
}

public async Task StartReceiving()
{
receiver = await startable.Start().ConfigureAwait(false);
}

public async Task StopReceiving()
{
if (receiver != null)
{
stoppable = await receiver.StopReceiving().ConfigureAwait(false);
}
else
{
stoppable = null;
}
endpoint = await startable.Start().ConfigureAwait(false);
}

public async Task Stop()
{
if (stoppable != null)
if (endpoint != null)
{
await stoppable.Stop().ConfigureAwait(false);
stoppable = null;
await endpoint.Stop().ConfigureAwait(false);
endpoint = null;
}
}

RawEndpointConfiguration config;
IStartableRawEndpoint startable;
IReceivingRawEndpoint receiver;
IStoppableRawEndpoint stoppable;
IStoppableRawEndpoint endpoint;

Func<IRawEndpoint, IRuleCreationContext> ruleCreationContextFactory;
}
2 changes: 1 addition & 1 deletion src/NServiceBus.Router/SendOnlyInterfaceConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void OverrideEndpointName(string interfaceEndpointName)
/// </summary>
public EndpointInstances EndpointInstances { get; } = new EndpointInstances();

internal Interface Create(string endpointName, RuntimeTypeGenerator typeGenerator, SettingsHolder routerSettings)
internal SendOnlyInterface Create(string endpointName, RuntimeTypeGenerator typeGenerator, SettingsHolder routerSettings)
{
IRuleCreationContext ContextFactory(IRawEndpoint e)
{
Expand Down

0 comments on commit 137b8b9

Please sign in to comment.