Skip to content

Commit

Permalink
Merge pull request #24 from KatoStoelen/send-only-interfaces
Browse files Browse the repository at this point in the history
Implement send-only interfaces
  • Loading branch information
SzymonPobiega authored Mar 18, 2020
2 parents bea8795 + bbd88b4 commit f70993a
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 5 deletions.
11 changes: 8 additions & 3 deletions src/NServiceBus.Router/MessageDrivenPubSubFeature.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ public void Configure(RouterConfiguration routerConfig)
{
routerConfig.AddRule(c => new ForwardPublishStorageDrivenRule(GetSubscriptionStorage(c), c.DistributionPolicy), c => IsEnabled(c));
routerConfig.AddRule(c => new ForwardPublishNullRule(), c => IsExplicitlyDisabled(c));
routerConfig.AddRule(c => new ForwardSubscribeMessageDrivenRule(c.Endpoint.TransportAddress, c.Endpoint.EndpointName), c => IsEnabled(c));
routerConfig.AddRule(c => new ForwardUnsubscribeMessageDrivenRule(c.Endpoint.TransportAddress, c.Endpoint.EndpointName), c => IsEnabled(c));
routerConfig.AddRule(c => new StorageDrivenSubscriptionRule(GetSubscriptionStorage(c)), c => IsEnabled(c));
routerConfig.AddRule(c => new ForwardSubscribeMessageDrivenRule(c.Endpoint.TransportAddress, c.Endpoint.EndpointName), c => IsEnabled(c) && !SendOnly(c));
routerConfig.AddRule(c => new ForwardUnsubscribeMessageDrivenRule(c.Endpoint.TransportAddress, c.Endpoint.EndpointName), c => IsEnabled(c) && !SendOnly(c));
routerConfig.AddRule(c => new StorageDrivenSubscriptionRule(GetSubscriptionStorage(c)), c => IsEnabled(c) && !SendOnly(c));
}

static ISubscriptionStorage GetSubscriptionStorage(IRuleCreationContext c)
Expand Down Expand Up @@ -45,6 +45,11 @@ static bool IsExplicitlyDisabled(IRuleCreationContext context)
&& false == context.Settings.Get<bool>(SettingsKey);
}

static bool SendOnly(IRuleCreationContext context)
{
return context.Endpoint.Settings.GetOrDefault<bool>("Endpoint.SendOnly");
}

class ForwardPublishNullRule : ChainTerminator<ForwardPublishContext>
{
static Task<bool> falseResult = Task.FromResult(false);
Expand Down
22 changes: 22 additions & 0 deletions src/NServiceBus.Router/MessageDrivenSubscriptionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,27 @@ public static void DisableMessageDrivenPublishSubscribe<T>(this InterfaceConfigu
{
interfaceConfig.Settings.Set("EnableMessageDrivenPubSub", false);
}

/// <summary>
/// Enables message-driven storage-based publish/subscribe for a given send-only interface.
/// </summary>
/// <param name="interfaceConfig">Send-only interface configuration.</param>
/// <param name="subscriptionStorage">Subscription storage.</param>
public static void EnableMessageDrivenPublishSubscribe<T>(this SendOnlyInterfaceConfiguration<T> interfaceConfig, ISubscriptionStorage subscriptionStorage)
where T : TransportDefinition, new()
{
interfaceConfig.Settings.Set("EnableMessageDrivenPubSub", true);
interfaceConfig.Settings.Set<ISubscriptionStorage>(subscriptionStorage);
}

/// <summary>
/// Disables message-driven storage-based publish/subscribe for a given send-only interface.
/// </summary>
/// <param name="interfaceConfig">Send-only interface configuration.</param>
public static void DisableMessageDrivenPublishSubscribe<T>(this SendOnlyInterfaceConfiguration<T> interfaceConfig)
where T : TransportDefinition, new()
{
interfaceConfig.Settings.Set("EnableMessageDrivenPubSub", false);
}
}
}
9 changes: 7 additions & 2 deletions src/NServiceBus.Router/NativePubSubFeature.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ public void Configure(RouterConfiguration routerConfig)
{
routerConfig.AddRule(c => new ForwardPublishNativeRule(), c => EnableNativePubSub(c));
routerConfig.AddRule(c => new ForwardPublishNullRule(), c => c.Settings.HasExplicitValue("NativePubSubDisabled"));
routerConfig.AddRule(c => new ForwardSubscribeNativeRule(c.Endpoint.SubscriptionManager), c => EnableNativePubSub(c));
routerConfig.AddRule(c => new ForwardUnsubscribeNativeRule(c.Endpoint.SubscriptionManager), c => EnableNativePubSub(c));
routerConfig.AddRule(c => new ForwardSubscribeNativeRule(c.Endpoint.SubscriptionManager), c => EnableNativePubSub(c) && !SendOnly(c));
routerConfig.AddRule(c => new ForwardUnsubscribeNativeRule(c.Endpoint.SubscriptionManager), c => EnableNativePubSub(c) && !SendOnly(c));
}

static bool EnableNativePubSub(IRuleCreationContext context)
Expand All @@ -22,6 +22,11 @@ static bool EnableNativePubSub(IRuleCreationContext context)
return transport.OutboundRoutingPolicy.Publishes == OutboundRoutingType.Multicast;
}

static bool SendOnly(IRuleCreationContext context)
{
return context.Endpoint.Settings.GetOrDefault<bool>("Endpoint.SendOnly");
}

class ForwardPublishNullRule : ChainTerminator<ForwardPublishContext>
{
static Task<bool> falseResult = Task.FromResult(false);
Expand Down
9 changes: 9 additions & 0 deletions src/NServiceBus.Router/NativeSubscriptionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,14 @@ public static void DisableNativePubSub<T>(this InterfaceConfiguration<T> interfa
{
interfaceConfig.Settings.Set("NativePubSubDisabled", true);
}

/// <summary>
/// Disables native publish/subscribe handling for a given send-only interface.
/// </summary>
public static void DisableNativePubSub<T>(this SendOnlyInterfaceConfiguration<T> interfaceConfig)
where T : TransportDefinition, new()
{
interfaceConfig.Settings.Set("NativePubSubDisabled", true);
}
}
}
20 changes: 20 additions & 0 deletions src/NServiceBus.Router/RouterConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,26 @@ public InterfaceConfiguration<T> AddInterface<T>(string name, Action<TransportEx
return ifaceConfig;
}

/// <summary>
/// Adds a new send-only interface to the router.
/// </summary>
/// <typeparam name="T">Transport to use for this interface.</typeparam>
/// <param name="name">Name of the interface.</param>
/// <param name="customization">A callback for customizing the transport settings.</param>
public SendOnlyInterfaceConfiguration<T> AddSendOnlyInterface<T>(string name, Action<TransportExtensions<T>> customization)
where T : TransportDefinition, new()
{
var ifaceConfig = new SendOnlyInterfaceConfiguration<T>(name, customization, this);
InterfaceFactories.Add(() => CreateSendOnlyInterface(ifaceConfig));
return ifaceConfig;
}

Interface CreateSendOnlyInterface<T>(SendOnlyInterfaceConfiguration<T> ifaceConfig)
where T : TransportDefinition, new()
{
return ifaceConfig.Create(Name, typeGenerator, Settings);
}

Interface CreateInterface<T>(InterfaceConfiguration<T> ifaceConfig) where T : TransportDefinition, new()
{
return ifaceConfig.Create(Name, PoisonQueueName, autoCreateQueues, autoCreateQueuesIdentity, ImmediateRetries, DelayedRetries, CircuitBreakerThreshold, typeGenerator, Settings);
Expand Down
69 changes: 69 additions & 0 deletions src/NServiceBus.Router/SendOnlyInterface.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
using System;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Router;
using NServiceBus.Configuration.AdvancedExtensibility;
using NServiceBus.Raw;
using NServiceBus.Transport;

class SendOnlyInterface<T> : Interface where T : TransportDefinition, new()
{
public SendOnlyInterface(string endpointName, string interfaceName, Action<TransportExtensions<T>> transportCustomization, Func<IRawEndpoint, IRuleCreationContext> ruleCreationContextFactory)
{
this.ruleCreationContextFactory = ruleCreationContextFactory;
Name = interfaceName;

config = RawEndpointConfiguration.CreateSendOnly(endpointName);
var transport = config.UseTransport<T>();
SetTransportSpecificFlags(transport.GetSettings());
transportCustomization?.Invoke(transport);
}

public string Name { get; }

static void SetTransportSpecificFlags(NServiceBus.Settings.SettingsHolder settings)
{
settings.Set("RabbitMQ.RoutingTopologySupportsDelayedDelivery", true);
}

public async Task Initialize(InterfaceChains interfaces, RootContext rootContext)
{
startable = await RawEndpoint.Create(config).ConfigureAwait(false);
config = null;
var ruleCreationContext = ruleCreationContextFactory(startable);
interfaces.InitializeInterface(Name, ruleCreationContext);
}

public async Task StartReceiving()
{
receiver = await startable.Start().ConfigureAwait(false);
}

public async Task StopReceiving()
{
if (receiver != null)
{
stoppable = await receiver.StopReceiving().ConfigureAwait(false);
}
else
{
stoppable = null;
}
}

public async Task Stop()
{
if (stoppable != null)
{
await stoppable.Stop().ConfigureAwait(false);
stoppable = null;
}
}

RawEndpointConfiguration config;
IStartableRawEndpoint startable;
IReceivingRawEndpoint receiver;
IStoppableRawEndpoint stoppable;

Func<IRawEndpoint, IRuleCreationContext> ruleCreationContextFactory;
}
99 changes: 99 additions & 0 deletions src/NServiceBus.Router/SendOnlyInterfaceConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
namespace NServiceBus.Router
{
using System;
using Raw;
using Routing;
using Transport;
using Unicast.Subscriptions.MessageDrivenSubscriptions;

/// <summary>
/// Configures the switch port.
/// </summary>
/// <typeparam name="T">Type of transport.</typeparam>
public class SendOnlyInterfaceConfiguration<T>
where T : TransportDefinition, new()
{
Action<TransportExtensions<T>> customization;
string overriddenEndpointName;

/// <summary>
/// Interface's extensibility settings.
/// </summary>
public SettingsHolder Settings { get; } = new SettingsHolder();

/// <summary>
/// Name of the interface.
/// </summary>
public string Name { get; }

/// <summary>
/// Router's configuration.
/// </summary>
public RouterConfiguration RouterConfiguration { get; }

internal SendOnlyInterfaceConfiguration(string name, Action<TransportExtensions<T>> customization, RouterConfiguration routerConfiguration)
{
Name = name;
this.customization = customization;
RouterConfiguration = routerConfiguration;
}

/// <summary>
/// Adds a global (applicable to all interfaces) routing rule.
/// </summary>
/// <typeparam name="TRule">Type of the rule.</typeparam>
/// <param name="constructor">Delegate that constructs a new instance of the rule.</param>
/// <param name="condition">Condition which must be true for the rule to be added to the chain.</param>
public void AddRule<TRule>(Func<IRuleCreationContext, TRule> constructor, Func<IRuleCreationContext, bool> condition = null)
where TRule : IRule
{
RouterConfiguration.AddRule(constructor, context =>
{
if (condition == null)
{
return context.InterfaceName == Name;
}
return condition(context) && context.InterfaceName == Name;
});
}

/// <summary>
/// Configures the port to use specified subscription persistence.
/// </summary>
[Obsolete("Use EnableMessageDrivenPublishSubscribe instead.")]
public void UseSubscriptionPersistence(ISubscriptionStorage subscriptionStorage)
{
this.EnableMessageDrivenPublishSubscribe(subscriptionStorage);
}

/// <summary>
/// Overrides the interface endpoint name.
/// </summary>
/// <param name="interfaceEndpointName">Endpoint name to use for this interface instead of Router's name</param>
public void OverrideEndpointName(string interfaceEndpointName)
{
overriddenEndpointName = interfaceEndpointName;
}

/// <summary>
/// Distribution policy of the port.
/// </summary>
public RawDistributionPolicy DistributionPolicy { get; } = new RawDistributionPolicy();

/// <summary>
/// Physical routing settings of the port.
/// </summary>
public EndpointInstances EndpointInstances { get; } = new EndpointInstances();

internal Interface Create(string endpointName, RuntimeTypeGenerator typeGenerator, SettingsHolder routerSettings)
{
IRuleCreationContext ContextFactory(IRawEndpoint e)
{
Settings.Merge(routerSettings);
return new RuleCreationContext(Name, EndpointInstances, DistributionPolicy, e, typeGenerator, Settings);
}

return new SendOnlyInterface<T>(overriddenEndpointName ?? endpointName, Name, customization, ContextFactory);
}
}
}

0 comments on commit f70993a

Please sign in to comment.