From 847b3ade73c89f334705666a8d0a58d3e25db804 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kato=20St=C3=B8len?= Date: Fri, 6 Mar 2020 11:42:48 +0100 Subject: [PATCH 1/5] Add APIs to configure send-only interfaces Fixes #23 --- .../InterfaceConfiguration.cs | 17 +++- .../MessageDrivenSubscriptionExtensions.cs | 8 +- .../NativeSubscriptionExtensions.cs | 5 +- src/NServiceBus.Router/RouterConfiguration.cs | 20 ++++ src/NServiceBus.Router/SendOnlyInterface.cs | 68 +++++++++++++ .../SendOnlyInterfaceConfiguration.cs | 97 +++++++++++++++++++ .../SendOnlyRawEndpointConfig.cs | 76 +++++++++++++++ 7 files changed, 277 insertions(+), 14 deletions(-) create mode 100644 src/NServiceBus.Router/SendOnlyInterface.cs create mode 100644 src/NServiceBus.Router/SendOnlyInterfaceConfiguration.cs create mode 100644 src/NServiceBus.Router/SendOnlyRawEndpointConfig.cs diff --git a/src/NServiceBus.Router/InterfaceConfiguration.cs b/src/NServiceBus.Router/InterfaceConfiguration.cs index b7899a9..e15b873 100644 --- a/src/NServiceBus.Router/InterfaceConfiguration.cs +++ b/src/NServiceBus.Router/InterfaceConfiguration.cs @@ -6,11 +6,22 @@ using Transport; using Unicast.Subscriptions.MessageDrivenSubscriptions; + /// + /// Represents an interface configuration, + /// + public interface IInterfaceConfiguration + { + /// + /// Interface's extensibility settings. + /// + SettingsHolder Settings { get; } + } + /// /// Configures the switch port. /// /// Type of transport. - public class InterfaceConfiguration + public class InterfaceConfiguration : IInterfaceConfiguration where T : TransportDefinition, new() { Action> customization; @@ -19,9 +30,7 @@ public class InterfaceConfiguration int? maximumConcurrency; string overriddenEndpointName; - /// - /// Interface's extensibility settings. - /// + /// public SettingsHolder Settings { get; } = new SettingsHolder(); /// diff --git a/src/NServiceBus.Router/MessageDrivenSubscriptionExtensions.cs b/src/NServiceBus.Router/MessageDrivenSubscriptionExtensions.cs index 3f7533d..5b19f7b 100644 --- a/src/NServiceBus.Router/MessageDrivenSubscriptionExtensions.cs +++ b/src/NServiceBus.Router/MessageDrivenSubscriptionExtensions.cs @@ -1,6 +1,5 @@ namespace NServiceBus.Router { - using Transport; using Unicast.Subscriptions.MessageDrivenSubscriptions; /// @@ -13,8 +12,7 @@ public static class MessageDrivenSubscriptionExtensions /// /// Interface configuration. /// Subscription storage. - public static void EnableMessageDrivenPublishSubscribe(this InterfaceConfiguration interfaceConfig, ISubscriptionStorage subscriptionStorage) - where T : TransportDefinition, new() + public static void EnableMessageDrivenPublishSubscribe(this IInterfaceConfiguration interfaceConfig, ISubscriptionStorage subscriptionStorage) { interfaceConfig.Settings.Set("EnableMessageDrivenPubSub", true); interfaceConfig.Settings.Set(subscriptionStorage); @@ -23,10 +21,8 @@ public static void EnableMessageDrivenPublishSubscribe(this InterfaceConfigur /// /// Disables message-driven storage-based publish/subscribe for a given interface. /// - /// /// - public static void DisableMessageDrivenPublishSubscribe(this InterfaceConfiguration interfaceConfig) - where T : TransportDefinition, new() + public static void DisableMessageDrivenPublishSubscribe(this IInterfaceConfiguration interfaceConfig) { interfaceConfig.Settings.Set("EnableMessageDrivenPubSub", false); } diff --git a/src/NServiceBus.Router/NativeSubscriptionExtensions.cs b/src/NServiceBus.Router/NativeSubscriptionExtensions.cs index 9736736..bd03f38 100644 --- a/src/NServiceBus.Router/NativeSubscriptionExtensions.cs +++ b/src/NServiceBus.Router/NativeSubscriptionExtensions.cs @@ -1,7 +1,5 @@ namespace NServiceBus.Router { - using Transport; - /// /// Configures native pub/sub /// @@ -10,8 +8,7 @@ public static class NativeSubscriptionExtensions /// /// Disables native publish/subscribe handling for a given interface. /// - public static void DisableNativePubSub(this InterfaceConfiguration interfaceConfig) - where T : TransportDefinition, new() + public static void DisableNativePubSub(this IInterfaceConfiguration interfaceConfig) { interfaceConfig.Settings.Set("NativePubSubDisabled", true); } diff --git a/src/NServiceBus.Router/RouterConfiguration.cs b/src/NServiceBus.Router/RouterConfiguration.cs index 4f116a8..f266e5d 100644 --- a/src/NServiceBus.Router/RouterConfiguration.cs +++ b/src/NServiceBus.Router/RouterConfiguration.cs @@ -43,6 +43,26 @@ public InterfaceConfiguration AddInterface(string name, Action + /// Adds a new send-only interface to the router. + /// + /// Transport to use for this interface. + /// Name of the interface. + /// A callback for customizing the transport settings. + public SendOnlyInterfaceConfiguration AddSendOnlyInterface(string name, Action> customization) + where T : TransportDefinition, new() + { + var ifaceConfig = new SendOnlyInterfaceConfiguration(name, customization, this); + InterfaceFactories.Add(() => CreateSendOnlyInterface(ifaceConfig)); + return ifaceConfig; + } + + Interface CreateSendOnlyInterface(SendOnlyInterfaceConfiguration ifaceConfig) + where T : TransportDefinition, new() + { + return ifaceConfig.Create(Name, typeGenerator, Settings); + } + Interface CreateInterface(InterfaceConfiguration ifaceConfig) where T : TransportDefinition, new() { return ifaceConfig.Create(Name, PoisonQueueName, autoCreateQueues, autoCreateQueuesIdentity, ImmediateRetries, DelayedRetries, CircuitBreakerThreshold, typeGenerator, Settings); diff --git a/src/NServiceBus.Router/SendOnlyInterface.cs b/src/NServiceBus.Router/SendOnlyInterface.cs new file mode 100644 index 0000000..bbf0d8f --- /dev/null +++ b/src/NServiceBus.Router/SendOnlyInterface.cs @@ -0,0 +1,68 @@ +using System; +using System.Threading.Tasks; +using NServiceBus; +using NServiceBus.Router; +using NServiceBus.Configuration.AdvancedExtensibility; +using NServiceBus.Raw; +using NServiceBus.Transport; + +class SendOnlyInterface : Interface where T : TransportDefinition, new() +{ + public SendOnlyInterface(string endpointName, string interfaceName, Action> transportCustomization, Func ruleCreationContextFactory) + { + this.ruleCreationContextFactory = ruleCreationContextFactory; + Name = interfaceName; + rawConfig = new SendOnlyRawEndpointConfig(endpointName, ext => + { + SetTransportSpecificFlags(ext.GetSettings()); + transportCustomization?.Invoke(ext); + }); + } + + public string Name { get; } + + static void SetTransportSpecificFlags(NServiceBus.Settings.SettingsHolder settings) + { + settings.Set("RabbitMQ.RoutingTopologySupportsDelayedDelivery", true); + } + + public async Task Initialize(InterfaceChains interfaces, RootContext rootContext) + { + sender = await rawConfig.Create().ConfigureAwait(false); + var ruleCreationContext = ruleCreationContextFactory(sender); + interfaces.InitializeInterface(Name, ruleCreationContext); + } + + public async Task StartReceiving() + { + receiver = await sender.Start().ConfigureAwait(false); + } + + public async Task StopReceiving() + { + if (receiver != null) + { + stoppable = await receiver.StopReceiving().ConfigureAwait(false); + } + else + { + stoppable = null; + } + } + + public async Task Stop() + { + if (stoppable != null) + { + await stoppable.Stop().ConfigureAwait(false); + stoppable = null; + } + } + + IReceivingRawEndpoint receiver; + IStartableRawEndpoint sender; + IStoppableRawEndpoint stoppable; + + SendOnlyRawEndpointConfig rawConfig; + Func ruleCreationContextFactory; +} diff --git a/src/NServiceBus.Router/SendOnlyInterfaceConfiguration.cs b/src/NServiceBus.Router/SendOnlyInterfaceConfiguration.cs new file mode 100644 index 0000000..83fb674 --- /dev/null +++ b/src/NServiceBus.Router/SendOnlyInterfaceConfiguration.cs @@ -0,0 +1,97 @@ +namespace NServiceBus.Router +{ + using System; + using Raw; + using Routing; + using Transport; + using Unicast.Subscriptions.MessageDrivenSubscriptions; + + /// + /// Configures the switch port. + /// + /// Type of transport. + public class SendOnlyInterfaceConfiguration : IInterfaceConfiguration + where T : TransportDefinition, new() + { + Action> customization; + string overriddenEndpointName; + + /// + public SettingsHolder Settings { get; } = new SettingsHolder(); + + /// + /// Name of the interface. + /// + public string Name { get; } + + /// + /// Router's configuration. + /// + public RouterConfiguration RouterConfiguration { get; } + + internal SendOnlyInterfaceConfiguration(string name, Action> customization, RouterConfiguration routerConfiguration) + { + Name = name; + this.customization = customization; + RouterConfiguration = routerConfiguration; + } + + /// + /// Adds a global (applicable to all interfaces) routing rule. + /// + /// Type of the rule. + /// Delegate that constructs a new instance of the rule. + /// Condition which must be true for the rule to be added to the chain. + public void AddRule(Func constructor, Func condition = null) + where TRule : IRule + { + RouterConfiguration.AddRule(constructor, context => + { + if (condition == null) + { + return context.InterfaceName == Name; + } + return condition(context) && context.InterfaceName == Name; + }); + } + + /// + /// Configures the port to use specified subscription persistence. + /// + [Obsolete("Use EnableMessageDrivenPublishSubscribe instead.")] + public void UseSubscriptionPersistence(ISubscriptionStorage subscriptionStorage) + { + this.EnableMessageDrivenPublishSubscribe(subscriptionStorage); + } + + /// + /// Overrides the interface endpoint name. + /// + /// Endpoint name to use for this interface instead of Router's name + public void OverrideEndpointName(string interfaceEndpointName) + { + overriddenEndpointName = interfaceEndpointName; + } + + /// + /// Distribution policy of the port. + /// + public RawDistributionPolicy DistributionPolicy { get; } = new RawDistributionPolicy(); + + /// + /// Physical routing settings of the port. + /// + public EndpointInstances EndpointInstances { get; } = new EndpointInstances(); + + internal Interface Create(string endpointName, RuntimeTypeGenerator typeGenerator, SettingsHolder routerSettings) + { + IRuleCreationContext ContextFactory(IRawEndpoint e) + { + Settings.Merge(routerSettings); + return new RuleCreationContext(Name, EndpointInstances, DistributionPolicy, e, typeGenerator, Settings); + } + + return new SendOnlyInterface(overriddenEndpointName ?? endpointName, Name, customization, ContextFactory); + } + } +} diff --git a/src/NServiceBus.Router/SendOnlyRawEndpointConfig.cs b/src/NServiceBus.Router/SendOnlyRawEndpointConfig.cs new file mode 100644 index 0000000..c839741 --- /dev/null +++ b/src/NServiceBus.Router/SendOnlyRawEndpointConfig.cs @@ -0,0 +1,76 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using NServiceBus; +using NServiceBus.Extensibility; +using NServiceBus.Logging; +using NServiceBus.Raw; +using NServiceBus.Settings; +using NServiceBus.Transport; + +class SendOnlyRawEndpointConfig : IStartableRawEndpoint, IReceivingRawEndpoint + where T : TransportDefinition, new() +{ + public SendOnlyRawEndpointConfig(string endpointName, Action> transportCustomization) + { + config = RawEndpointConfiguration.CreateSendOnly(endpointName); + var transport = config.UseTransport(); + transportCustomization(transport); + } + + public async Task Create() + { + startable = await RawEndpoint.Create(config); + config = null; + return this; + } + + async Task IStartableRawEndpoint.Start() + { + endpoint = await startable.Start().ConfigureAwait(false); + startable = null; + return this; + } + + async Task IReceivingRawEndpoint.StopReceiving() + { + await transitionSemaphore.WaitAsync().ConfigureAwait(false); + if (endpoint != null) + { + stoppable = await endpoint.StopReceiving().ConfigureAwait(false); + endpoint = null; + } + return this; + } + + async Task IStoppableRawEndpoint.Stop() + { + if (stoppable != null) + { + await stoppable.Stop().ConfigureAwait(false); + stoppable = null; + } + } + + string IRawEndpoint.ToTransportAddress(LogicalAddress logicalAddress) => startable?.ToTransportAddress(logicalAddress) ?? endpoint.ToTransportAddress(logicalAddress); + + Task IDispatchMessages.Dispatch(TransportOperations outgoingMessages, TransportTransaction transaction, ContextBag context) + { + return endpoint != null + ? endpoint.Dispatch(outgoingMessages, transaction, context) + : startable.Dispatch(outgoingMessages, transaction, context); + } + + string IRawEndpoint.TransportAddress => startable?.TransportAddress ?? endpoint.TransportAddress; + string IRawEndpoint.EndpointName => startable?.EndpointName ?? endpoint.EndpointName; + ReadOnlySettings IRawEndpoint.Settings => startable?.Settings ?? endpoint.Settings; + public IManageSubscriptions SubscriptionManager => startable?.SubscriptionManager ?? endpoint.SubscriptionManager; + + RawEndpointConfiguration config; + IReceivingRawEndpoint endpoint; + IStartableRawEndpoint startable; + SemaphoreSlim transitionSemaphore = new SemaphoreSlim(1); + + static ILog logger = LogManager.GetLogger(typeof(SendOnlyRawEndpointConfig)); + IStoppableRawEndpoint stoppable; +} \ No newline at end of file From d817c839bf39fdeaf1c91e1c8ec239cac1d0b92c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kato=20St=C3=B8len?= Date: Fri, 6 Mar 2020 14:57:27 +0100 Subject: [PATCH 2/5] Disable N/A rules for send-only interfaces The ForwardSubscribeNativeRule and ForwardUnsubscribeNativeRule are not applicable for send-only interfaces. --- src/NServiceBus.Router/NativePubSubFeature.cs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/NServiceBus.Router/NativePubSubFeature.cs b/src/NServiceBus.Router/NativePubSubFeature.cs index bb10e2f..8a26792 100644 --- a/src/NServiceBus.Router/NativePubSubFeature.cs +++ b/src/NServiceBus.Router/NativePubSubFeature.cs @@ -8,8 +8,8 @@ public void Configure(RouterConfiguration routerConfig) { routerConfig.AddRule(c => new ForwardPublishNativeRule(), c => EnableNativePubSub(c)); routerConfig.AddRule(c => new ForwardPublishNullRule(), c => c.Settings.HasExplicitValue("NativePubSubDisabled")); - routerConfig.AddRule(c => new ForwardSubscribeNativeRule(c.Endpoint.SubscriptionManager), c => EnableNativePubSub(c)); - routerConfig.AddRule(c => new ForwardUnsubscribeNativeRule(c.Endpoint.SubscriptionManager), c => EnableNativePubSub(c)); + routerConfig.AddRule(c => new ForwardSubscribeNativeRule(c.Endpoint.SubscriptionManager), c => EnableNativePubSub(c) && !SendOnly(c)); + routerConfig.AddRule(c => new ForwardUnsubscribeNativeRule(c.Endpoint.SubscriptionManager), c => EnableNativePubSub(c) && !SendOnly(c)); } static bool EnableNativePubSub(IRuleCreationContext context) @@ -22,6 +22,11 @@ static bool EnableNativePubSub(IRuleCreationContext context) return transport.OutboundRoutingPolicy.Publishes == OutboundRoutingType.Multicast; } + static bool SendOnly(IRuleCreationContext context) + { + return context.Endpoint.Settings.GetOrDefault("Endpoint.SendOnly"); + } + class ForwardPublishNullRule : ChainTerminator { static Task falseResult = Task.FromResult(false); From ed306de90a9b9074a9123164d617c077a1b1d3eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kato=20St=C3=B8len?= Date: Mon, 16 Mar 2020 10:10:13 +0100 Subject: [PATCH 3/5] Remove SendOnlyRawEndpointConfig SedOnlyRawEndpointConfig ended up being an unnecessary abstraction of RawEndpointConfiguration (NServiceBus.Raw). Using RawEndpointConfiguration directly, instead. --- src/NServiceBus.Router/SendOnlyInterface.cs | 21 ++--- .../SendOnlyRawEndpointConfig.cs | 76 ------------------- 2 files changed, 11 insertions(+), 86 deletions(-) delete mode 100644 src/NServiceBus.Router/SendOnlyRawEndpointConfig.cs diff --git a/src/NServiceBus.Router/SendOnlyInterface.cs b/src/NServiceBus.Router/SendOnlyInterface.cs index bbf0d8f..ee20e39 100644 --- a/src/NServiceBus.Router/SendOnlyInterface.cs +++ b/src/NServiceBus.Router/SendOnlyInterface.cs @@ -12,11 +12,11 @@ public SendOnlyInterface(string endpointName, string interfaceName, Action(endpointName, ext => - { - SetTransportSpecificFlags(ext.GetSettings()); - transportCustomization?.Invoke(ext); - }); + + config = RawEndpointConfiguration.CreateSendOnly(endpointName); + var transport = config.UseTransport(); + SetTransportSpecificFlags(transport.GetSettings()); + transportCustomization?.Invoke(transport); } public string Name { get; } @@ -28,14 +28,15 @@ static void SetTransportSpecificFlags(NServiceBus.Settings.SettingsHolder settin public async Task Initialize(InterfaceChains interfaces, RootContext rootContext) { - sender = await rawConfig.Create().ConfigureAwait(false); - var ruleCreationContext = ruleCreationContextFactory(sender); + startable = await RawEndpoint.Create(config).ConfigureAwait(false); + config = null; + var ruleCreationContext = ruleCreationContextFactory(startable); interfaces.InitializeInterface(Name, ruleCreationContext); } public async Task StartReceiving() { - receiver = await sender.Start().ConfigureAwait(false); + receiver = await startable.Start().ConfigureAwait(false); } public async Task StopReceiving() @@ -59,10 +60,10 @@ public async Task Stop() } } + RawEndpointConfiguration config; + IStartableRawEndpoint startable; IReceivingRawEndpoint receiver; - IStartableRawEndpoint sender; IStoppableRawEndpoint stoppable; - SendOnlyRawEndpointConfig rawConfig; Func ruleCreationContextFactory; } diff --git a/src/NServiceBus.Router/SendOnlyRawEndpointConfig.cs b/src/NServiceBus.Router/SendOnlyRawEndpointConfig.cs deleted file mode 100644 index c839741..0000000 --- a/src/NServiceBus.Router/SendOnlyRawEndpointConfig.cs +++ /dev/null @@ -1,76 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using NServiceBus; -using NServiceBus.Extensibility; -using NServiceBus.Logging; -using NServiceBus.Raw; -using NServiceBus.Settings; -using NServiceBus.Transport; - -class SendOnlyRawEndpointConfig : IStartableRawEndpoint, IReceivingRawEndpoint - where T : TransportDefinition, new() -{ - public SendOnlyRawEndpointConfig(string endpointName, Action> transportCustomization) - { - config = RawEndpointConfiguration.CreateSendOnly(endpointName); - var transport = config.UseTransport(); - transportCustomization(transport); - } - - public async Task Create() - { - startable = await RawEndpoint.Create(config); - config = null; - return this; - } - - async Task IStartableRawEndpoint.Start() - { - endpoint = await startable.Start().ConfigureAwait(false); - startable = null; - return this; - } - - async Task IReceivingRawEndpoint.StopReceiving() - { - await transitionSemaphore.WaitAsync().ConfigureAwait(false); - if (endpoint != null) - { - stoppable = await endpoint.StopReceiving().ConfigureAwait(false); - endpoint = null; - } - return this; - } - - async Task IStoppableRawEndpoint.Stop() - { - if (stoppable != null) - { - await stoppable.Stop().ConfigureAwait(false); - stoppable = null; - } - } - - string IRawEndpoint.ToTransportAddress(LogicalAddress logicalAddress) => startable?.ToTransportAddress(logicalAddress) ?? endpoint.ToTransportAddress(logicalAddress); - - Task IDispatchMessages.Dispatch(TransportOperations outgoingMessages, TransportTransaction transaction, ContextBag context) - { - return endpoint != null - ? endpoint.Dispatch(outgoingMessages, transaction, context) - : startable.Dispatch(outgoingMessages, transaction, context); - } - - string IRawEndpoint.TransportAddress => startable?.TransportAddress ?? endpoint.TransportAddress; - string IRawEndpoint.EndpointName => startable?.EndpointName ?? endpoint.EndpointName; - ReadOnlySettings IRawEndpoint.Settings => startable?.Settings ?? endpoint.Settings; - public IManageSubscriptions SubscriptionManager => startable?.SubscriptionManager ?? endpoint.SubscriptionManager; - - RawEndpointConfiguration config; - IReceivingRawEndpoint endpoint; - IStartableRawEndpoint startable; - SemaphoreSlim transitionSemaphore = new SemaphoreSlim(1); - - static ILog logger = LogManager.GetLogger(typeof(SendOnlyRawEndpointConfig)); - IStoppableRawEndpoint stoppable; -} \ No newline at end of file From 7fa912b65f38bba730234a60edc8f56eb2ce576d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kato=20St=C3=B8len?= Date: Mon, 16 Mar 2020 11:31:19 +0100 Subject: [PATCH 4/5] Use explicit extension methods instead of abstraction Removing the IInterfaceConfiguration abstraction and adding extension methods for SendOnlyInterfaceConfiguration instead. --- .../InterfaceConfiguration.cs | 17 +++-------- .../MessageDrivenSubscriptionExtensions.cs | 30 +++++++++++++++++-- .../NativeSubscriptionExtensions.cs | 14 ++++++++- .../SendOnlyInterfaceConfiguration.cs | 6 ++-- 4 files changed, 49 insertions(+), 18 deletions(-) diff --git a/src/NServiceBus.Router/InterfaceConfiguration.cs b/src/NServiceBus.Router/InterfaceConfiguration.cs index e15b873..b7899a9 100644 --- a/src/NServiceBus.Router/InterfaceConfiguration.cs +++ b/src/NServiceBus.Router/InterfaceConfiguration.cs @@ -6,22 +6,11 @@ using Transport; using Unicast.Subscriptions.MessageDrivenSubscriptions; - /// - /// Represents an interface configuration, - /// - public interface IInterfaceConfiguration - { - /// - /// Interface's extensibility settings. - /// - SettingsHolder Settings { get; } - } - /// /// Configures the switch port. /// /// Type of transport. - public class InterfaceConfiguration : IInterfaceConfiguration + public class InterfaceConfiguration where T : TransportDefinition, new() { Action> customization; @@ -30,7 +19,9 @@ public class InterfaceConfiguration : IInterfaceConfiguration int? maximumConcurrency; string overriddenEndpointName; - /// + /// + /// Interface's extensibility settings. + /// public SettingsHolder Settings { get; } = new SettingsHolder(); /// diff --git a/src/NServiceBus.Router/MessageDrivenSubscriptionExtensions.cs b/src/NServiceBus.Router/MessageDrivenSubscriptionExtensions.cs index 5b19f7b..ff25f15 100644 --- a/src/NServiceBus.Router/MessageDrivenSubscriptionExtensions.cs +++ b/src/NServiceBus.Router/MessageDrivenSubscriptionExtensions.cs @@ -1,5 +1,6 @@ namespace NServiceBus.Router { + using Transport; using Unicast.Subscriptions.MessageDrivenSubscriptions; /// @@ -12,7 +13,8 @@ public static class MessageDrivenSubscriptionExtensions /// /// Interface configuration. /// Subscription storage. - public static void EnableMessageDrivenPublishSubscribe(this IInterfaceConfiguration interfaceConfig, ISubscriptionStorage subscriptionStorage) + public static void EnableMessageDrivenPublishSubscribe(this InterfaceConfiguration interfaceConfig, ISubscriptionStorage subscriptionStorage) + where T : TransportDefinition, new() { interfaceConfig.Settings.Set("EnableMessageDrivenPubSub", true); interfaceConfig.Settings.Set(subscriptionStorage); @@ -21,8 +23,32 @@ public static void EnableMessageDrivenPublishSubscribe(this IInterfaceConfigurat /// /// Disables message-driven storage-based publish/subscribe for a given interface. /// + /// /// - public static void DisableMessageDrivenPublishSubscribe(this IInterfaceConfiguration interfaceConfig) + public static void DisableMessageDrivenPublishSubscribe(this InterfaceConfiguration interfaceConfig) + where T : TransportDefinition, new() + { + interfaceConfig.Settings.Set("EnableMessageDrivenPubSub", false); + } + + /// + /// Enables message-driven storage-based publish/subscribe for a given send-only interface. + /// + /// Send-only interface configuration. + /// Subscription storage. + public static void EnableMessageDrivenPublishSubscribe(this SendOnlyInterfaceConfiguration interfaceConfig, ISubscriptionStorage subscriptionStorage) + where T : TransportDefinition, new() + { + interfaceConfig.Settings.Set("EnableMessageDrivenPubSub", true); + interfaceConfig.Settings.Set(subscriptionStorage); + } + + /// + /// Disables message-driven storage-based publish/subscribe for a given send-only interface. + /// + /// Send-only interface configuration. + public static void DisableMessageDrivenPublishSubscribe(this SendOnlyInterfaceConfiguration interfaceConfig) + where T : TransportDefinition, new() { interfaceConfig.Settings.Set("EnableMessageDrivenPubSub", false); } diff --git a/src/NServiceBus.Router/NativeSubscriptionExtensions.cs b/src/NServiceBus.Router/NativeSubscriptionExtensions.cs index bd03f38..294df01 100644 --- a/src/NServiceBus.Router/NativeSubscriptionExtensions.cs +++ b/src/NServiceBus.Router/NativeSubscriptionExtensions.cs @@ -1,5 +1,7 @@ namespace NServiceBus.Router { + using Transport; + /// /// Configures native pub/sub /// @@ -8,7 +10,17 @@ public static class NativeSubscriptionExtensions /// /// Disables native publish/subscribe handling for a given interface. /// - public static void DisableNativePubSub(this IInterfaceConfiguration interfaceConfig) + public static void DisableNativePubSub(this InterfaceConfiguration interfaceConfig) + where T : TransportDefinition, new() + { + interfaceConfig.Settings.Set("NativePubSubDisabled", true); + } + + /// + /// Disables native publish/subscribe handling for a given send-only interface. + /// + public static void DisableNativePubSub(this SendOnlyInterfaceConfiguration interfaceConfig) + where T : TransportDefinition, new() { interfaceConfig.Settings.Set("NativePubSubDisabled", true); } diff --git a/src/NServiceBus.Router/SendOnlyInterfaceConfiguration.cs b/src/NServiceBus.Router/SendOnlyInterfaceConfiguration.cs index 83fb674..86873bd 100644 --- a/src/NServiceBus.Router/SendOnlyInterfaceConfiguration.cs +++ b/src/NServiceBus.Router/SendOnlyInterfaceConfiguration.cs @@ -10,13 +10,15 @@ /// Configures the switch port. /// /// Type of transport. - public class SendOnlyInterfaceConfiguration : IInterfaceConfiguration + public class SendOnlyInterfaceConfiguration where T : TransportDefinition, new() { Action> customization; string overriddenEndpointName; - /// + /// + /// Interface's extensibility settings. + /// public SettingsHolder Settings { get; } = new SettingsHolder(); /// From bbd88b467380b236683d4758b9e3107ac06940e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kato=20St=C3=B8len?= Date: Mon, 16 Mar 2020 11:48:19 +0100 Subject: [PATCH 5/5] Disable N/A rules for send-only endpoints --- src/NServiceBus.Router/MessageDrivenPubSubFeature.cs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/NServiceBus.Router/MessageDrivenPubSubFeature.cs b/src/NServiceBus.Router/MessageDrivenPubSubFeature.cs index 003cfe8..8d05957 100644 --- a/src/NServiceBus.Router/MessageDrivenPubSubFeature.cs +++ b/src/NServiceBus.Router/MessageDrivenPubSubFeature.cs @@ -12,9 +12,9 @@ public void Configure(RouterConfiguration routerConfig) { routerConfig.AddRule(c => new ForwardPublishStorageDrivenRule(GetSubscriptionStorage(c), c.DistributionPolicy), c => IsEnabled(c)); routerConfig.AddRule(c => new ForwardPublishNullRule(), c => IsExplicitlyDisabled(c)); - routerConfig.AddRule(c => new ForwardSubscribeMessageDrivenRule(c.Endpoint.TransportAddress, c.Endpoint.EndpointName), c => IsEnabled(c)); - routerConfig.AddRule(c => new ForwardUnsubscribeMessageDrivenRule(c.Endpoint.TransportAddress, c.Endpoint.EndpointName), c => IsEnabled(c)); - routerConfig.AddRule(c => new StorageDrivenSubscriptionRule(GetSubscriptionStorage(c)), c => IsEnabled(c)); + routerConfig.AddRule(c => new ForwardSubscribeMessageDrivenRule(c.Endpoint.TransportAddress, c.Endpoint.EndpointName), c => IsEnabled(c) && !SendOnly(c)); + routerConfig.AddRule(c => new ForwardUnsubscribeMessageDrivenRule(c.Endpoint.TransportAddress, c.Endpoint.EndpointName), c => IsEnabled(c) && !SendOnly(c)); + routerConfig.AddRule(c => new StorageDrivenSubscriptionRule(GetSubscriptionStorage(c)), c => IsEnabled(c) && !SendOnly(c)); } static ISubscriptionStorage GetSubscriptionStorage(IRuleCreationContext c) @@ -45,6 +45,11 @@ static bool IsExplicitlyDisabled(IRuleCreationContext context) && false == context.Settings.Get(SettingsKey); } + static bool SendOnly(IRuleCreationContext context) + { + return context.Endpoint.Settings.GetOrDefault("Endpoint.SendOnly"); + } + class ForwardPublishNullRule : ChainTerminator { static Task falseResult = Task.FromResult(false);