diff --git a/GitVersion.yml b/GitVersion.yml index 8144ee242..2b3463947 100644 --- a/GitVersion.yml +++ b/GitVersion.yml @@ -1,5 +1,5 @@ assembly-versioning-scheme: Major -next-version: 4.2.1 +next-version: 5.0.0 branches: master: mode: ContinuousDeployment diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/AcceptanceTests.snk b/src/NServiceBus.SqlServer.AcceptanceTests/AcceptanceTests.snk new file mode 100644 index 000000000..9e5b4a21e Binary files /dev/null and b/src/NServiceBus.SqlServer.AcceptanceTests/AcceptanceTests.snk differ diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/ConfigureEndpointSqlServerTransport.cs b/src/NServiceBus.SqlServer.AcceptanceTests/ConfigureEndpointSqlServerTransport.cs index 9ed95dc00..3a508e73b 100644 --- a/src/NServiceBus.SqlServer.AcceptanceTests/ConfigureEndpointSqlServerTransport.cs +++ b/src/NServiceBus.SqlServer.AcceptanceTests/ConfigureEndpointSqlServerTransport.cs @@ -5,15 +5,19 @@ using NServiceBus; using NServiceBus.AcceptanceTesting.Support; using NServiceBus.Configuration.AdvancedExtensibility; +using NServiceBus.Settings; using NServiceBus.Transport; +using NServiceBus.Transport.SQLServer; public class ConfigureEndpointSqlServerTransport : IConfigureEndpointTestExecution { - public Task Configure(string endpointName, EndpointConfiguration configuration, RunSettings settings, PublisherMetadata publisherMetadata) + public Task Configure(string endpointName, EndpointConfiguration configuration, RunSettings runSettings, PublisherMetadata publisherMetadata) { queueBindings = configuration.GetSettings().Get(); - + settings = configuration.GetSettings(); + doNotCleanNativeSubscriptions = runSettings.TryGet("DoNotCleanNativeSubscriptions", out _); connectionString = Environment.GetEnvironmentVariable("SqlServerTransportConnectionString"); + if (string.IsNullOrEmpty(connectionString)) { throw new Exception("The 'SqlServerTransportConnectionString' environment variable is not set."); @@ -21,26 +25,20 @@ public Task Configure(string endpointName, EndpointConfiguration configuration, var transportConfig = configuration.UseTransport(); transportConfig.ConnectionString(connectionString); + transportConfig.SubscriptionSettings().DisableSubscriptionCache(); #if !NET452 transportConfig.Transactions(TransportTransactionMode.SendsAtomicWithReceive); #endif - - var routingConfig = transportConfig.Routing(); - - foreach (var publisher in publisherMetadata.Publishers) - { - foreach (var eventType in publisher.Events) - { - routingConfig.RegisterPublisher(eventType, publisher.PublisherName); - } - } - return Task.FromResult(0); } public Task Cleanup() { + var subscriptionSettings = settings.GetOrDefault() ?? new SubscriptionSettings(); + settings.TryGet(SettingsKeys.DefaultSchemaSettingsKey, out string defaultSchemaOverride); + var subscriptionTable = subscriptionSettings.SubscriptionTable.Qualify(defaultSchemaOverride ?? "dbo", "nservicebus"); + using (var conn = new SqlConnection(connectionString)) { conn.Open(); @@ -48,20 +46,25 @@ public Task Cleanup() var queueAddresses = queueBindings.ReceivingAddresses.Select(QueueAddress.Parse).ToList(); foreach (var address in queueAddresses) { - TryDeleteTable(conn, address); - TryDeleteTable(conn, new QueueAddress(address.Table + ".Delayed", address.Schema, address.Catalog)); + TryDeleteTable(conn, address.QualifiedTableName); + TryDeleteTable(conn, new QueueAddress(address.Table + ".Delayed", address.Schema, address.Catalog).QualifiedTableName); + } + + if (!doNotCleanNativeSubscriptions) + { + TryDeleteTable(conn, subscriptionTable.QuotedQualifiedName); } } return Task.FromResult(0); } - static void TryDeleteTable(SqlConnection conn, QueueAddress address) + static void TryDeleteTable(SqlConnection conn, string address) { try { using (var comm = conn.CreateCommand()) { - comm.CommandText = $"IF OBJECT_ID('{address.QualifiedTableName}', 'U') IS NOT NULL DROP TABLE {address.QualifiedTableName}"; + comm.CommandText = $"IF OBJECT_ID('{address}', 'U') IS NOT NULL DROP TABLE {address}"; comm.ExecuteNonQuery(); } } @@ -74,8 +77,10 @@ static void TryDeleteTable(SqlConnection conn, QueueAddress address) } } + bool doNotCleanNativeSubscriptions; string connectionString; QueueBindings queueBindings; + SettingsHolder settings; class QueueAddress { diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/EndpointConfigurationExtensions.cs b/src/NServiceBus.SqlServer.AcceptanceTests/EndpointConfigurationExtensions.cs new file mode 100644 index 000000000..9a0f43a58 --- /dev/null +++ b/src/NServiceBus.SqlServer.AcceptanceTests/EndpointConfigurationExtensions.cs @@ -0,0 +1,10 @@ +using NServiceBus; +using NServiceBus.Configuration.AdvancedExtensibility; + +public static class EndpointConfigurationExtensions +{ + public static TransportExtensions ConfigureSqlServerTransport(this EndpointConfiguration endpointConfiguration) + { + return new TransportExtensions(endpointConfiguration.GetSettings()); + } +} diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/MultiCatalog/When_custom_catalog_configured_for_legacy_publisher.cs b/src/NServiceBus.SqlServer.AcceptanceTests/MultiCatalog/When_custom_catalog_configured_for_legacy_publisher.cs new file mode 100644 index 000000000..38def54cd --- /dev/null +++ b/src/NServiceBus.SqlServer.AcceptanceTests/MultiCatalog/When_custom_catalog_configured_for_legacy_publisher.cs @@ -0,0 +1,89 @@ +namespace NServiceBus.SqlServer.AcceptanceTests.MultiCatalog +{ + using System.Threading.Tasks; + using AcceptanceTesting; + using AcceptanceTesting.Customization; + using Configuration.AdvancedExtensibility; + using NServiceBus.AcceptanceTests.EndpointTemplates; + using NUnit.Framework; + using Transport.SQLServer; + + public class When_custom_catalog_configured_for_legacy_publisher : MultiCatalogAcceptanceTest + { + static string PublisherConnectionString => WithCustomCatalog(GetDefaultConnectionString(), "nservicebus1"); + static string SubscriberConnectionString => WithCustomCatalog(GetDefaultConnectionString(), "nservicebus2"); + static string PublisherEndpoint => Conventions.EndpointNamingConvention(typeof(LegacyPublisher)); + + [Test] + public Task Should_receive_event() + { + return Scenario.Define() + .WithEndpoint(b => b.When(c => c.Subscribed, session => session.Publish(new Event()))) + .WithEndpoint(b => b.When(c => c.EndpointsStarted, s => s.Subscribe(typeof(Event)))) + .Done(c => c.EventReceived) + .Run(); + } + + class Context : ScenarioContext + { + public bool EventReceived { get; set; } + public bool Subscribed { get; set; } + } + + class LegacyPublisher : EndpointConfigurationBuilder + { + public LegacyPublisher() + { + EndpointSetup(c => + { + var transport = c.UseTransport(); + transport.ConnectionString(PublisherConnectionString); + + transport.SubscriptionSettings().SubscriptionTableName("SubscriptionRouting", "dbo", "nservicebus"); + transport.SubscriptionSettings().DisableSubscriptionCache(); + + c.GetSettings().Set("SqlServer.DisableNativePubSub", true); + c.OnEndpointSubscribed((s, context) => + { + if (s.SubscriberEndpoint.Contains(Conventions.EndpointNamingConvention(typeof(Subscriber)))) + { + context.Subscribed = true; + } + }); + }); + } + } + + class Subscriber : EndpointConfigurationBuilder + { + public Subscriber() + { + EndpointSetup(b => + { + var transport = b.UseTransport(); + transport.ConnectionString(SubscriberConnectionString); + + transport.SubscriptionSettings().SubscriptionTableName("SubscriptionRouting", "dbo", "nservicebus"); + + transport.UseCatalogForEndpoint(PublisherEndpoint, "nservicebus1"); + transport.EnableMessageDrivenPubSubCompatibilityMode().RegisterPublisher(typeof(Event), PublisherEndpoint); + }); + } + + class EventHandler : IHandleMessages + { + public Context Context { get; set; } + + public Task Handle(Event message, IMessageHandlerContext context) + { + Context.EventReceived = true; + return Task.FromResult(0); + } + } + } + + public class Event : IEvent + { + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/MultiCatalog/When_custom_catalog_configured_for_publisher_and_subscriber.cs b/src/NServiceBus.SqlServer.AcceptanceTests/MultiCatalog/When_custom_catalog_configured_for_publisher_and_subscriber.cs new file mode 100644 index 000000000..bc41b72f7 --- /dev/null +++ b/src/NServiceBus.SqlServer.AcceptanceTests/MultiCatalog/When_custom_catalog_configured_for_publisher_and_subscriber.cs @@ -0,0 +1,81 @@ +namespace NServiceBus.SqlServer.AcceptanceTests.MultiCatalog +{ + using System.Threading.Tasks; + using AcceptanceTesting; + using Features; + using NServiceBus.AcceptanceTests.EndpointTemplates; + using NUnit.Framework; + using Transport.SQLServer; + + public class When_custom_catalog_configured_for_publisher_and_subscriber : MultiCatalogAcceptanceTest + { + static string PublisherConnectionString => WithCustomCatalog(GetDefaultConnectionString(), "nservicebus1"); + static string SubscriberConnectionString => WithCustomCatalog(GetDefaultConnectionString(), "nservicebus2"); + + [Test] + public Task Should_receive_event() + { + return Scenario.Define() + .WithEndpoint(b => b.When(c => c.Subscribed, session => session.Publish(new Event()))) + .WithEndpoint(b => b.When(c => c.EndpointsStarted, async (s, ctx) => + { + await s.Subscribe(typeof(Event)).ConfigureAwait(false); + ctx.Subscribed = true; + })) + .Done(c => c.EventReceived) + .Run(); + } + + class Context : ScenarioContext + { + public bool EventReceived { get; set; } + public bool Subscribed { get; set; } + } + + class Publisher : EndpointConfigurationBuilder + { + public Publisher() + { + EndpointSetup(b => + { + var transport = b.UseTransport(); + transport.ConnectionString(PublisherConnectionString); + + transport.SubscriptionSettings().DisableSubscriptionCache(); + transport.SubscriptionSettings().SubscriptionTableName("SubscriptionRouting", "dbo", "nservicebus"); + }); + } + } + + class Subscriber : EndpointConfigurationBuilder + { + public Subscriber() + { + EndpointSetup(c => + { + var transport = c.UseTransport(); + + transport.ConnectionString(SubscriberConnectionString); + transport.SubscriptionSettings().SubscriptionTableName("SubscriptionRouting", "dbo", "nservicebus"); + + c.DisableFeature(); + }); + } + + class EventHandler : IHandleMessages + { + public Context Context { get; set; } + + public Task Handle(Event message, IMessageHandlerContext context) + { + Context.EventReceived = true; + return Task.FromResult(0); + } + } + } + + public class Event : IEvent + { + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/MultiSchema/When_custom_schema_configured_for_legacy_publisher.cs b/src/NServiceBus.SqlServer.AcceptanceTests/MultiSchema/When_custom_schema_configured_for_legacy_publisher.cs new file mode 100644 index 000000000..ccb3f299b --- /dev/null +++ b/src/NServiceBus.SqlServer.AcceptanceTests/MultiSchema/When_custom_schema_configured_for_legacy_publisher.cs @@ -0,0 +1,85 @@ +namespace NServiceBus.SqlServer.AcceptanceTests.MultiSchema +{ + using System.Threading.Tasks; + using AcceptanceTesting; + using AcceptanceTesting.Customization; + using Configuration.AdvancedExtensibility; + using NServiceBus.AcceptanceTests; + using NServiceBus.AcceptanceTests.EndpointTemplates; + using NUnit.Framework; + using Transport.SQLServer; + + public class When_custom_schema_configured_for_legacy_publisher : NServiceBusAcceptanceTest + { + [Test] + public Task Should_receive_event() + { + return Scenario.Define() + .WithEndpoint(b => b.When(c => c.Subscribed, session => session.Publish(new Event()))) + .WithEndpoint(b => b.When(c => c.EndpointsStarted, s => s.Subscribe(typeof(Event)))) + .Done(c => c.EventReceived) + .Run(); + } + + class Context : ScenarioContext + { + public bool EventReceived { get; set; } + public bool Subscribed { get; set; } + } + + class LegacyPublisher : EndpointConfigurationBuilder + { + public LegacyPublisher() + { + EndpointSetup(c => + { + var transport = c.UseTransport(); + transport.DefaultSchema("sender"); + transport.SubscriptionSettings().SubscriptionTableName("SubscriptionRouting", "dbo"); + transport.SubscriptionSettings().DisableSubscriptionCache(); + + c.GetSettings().Set("SqlServer.DisableNativePubSub", true); + c.OnEndpointSubscribed((s, context) => + { + if (s.SubscriberEndpoint.Contains(Conventions.EndpointNamingConvention(typeof(Subscriber)))) + { + context.Subscribed = true; + } + }); + }); + } + } + + class Subscriber : EndpointConfigurationBuilder + { + public Subscriber() + { + EndpointSetup(b => + { + var publisherEndpoint = Conventions.EndpointNamingConvention(typeof(LegacyPublisher)); + + var transport = b.UseTransport(); + transport.DefaultSchema("receiver").UseSchemaForEndpoint(publisherEndpoint, "sender"); + transport.SubscriptionSettings().SubscriptionTableName("SubscriptionRouting", "dbo"); + + transport.EnableMessageDrivenPubSubCompatibilityMode().RegisterPublisher(typeof(Event), Conventions.EndpointNamingConvention(typeof(LegacyPublisher))); + }); + } + + class EventHandler : IHandleMessages + { + public Context Context { get; set; } + + public Task Handle(Event message, IMessageHandlerContext context) + { + Context.EventReceived = true; + return Task.FromResult(0); + } + } + } + + public class Event : IEvent + { + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/MultiSchema/When_custom_schema_configured_for_publisher.cs b/src/NServiceBus.SqlServer.AcceptanceTests/MultiSchema/When_custom_schema_configured_for_publisher_and_subscriber.cs similarity index 60% rename from src/NServiceBus.SqlServer.AcceptanceTests/MultiSchema/When_custom_schema_configured_for_publisher.cs rename to src/NServiceBus.SqlServer.AcceptanceTests/MultiSchema/When_custom_schema_configured_for_publisher_and_subscriber.cs index 78a8366b4..024d91cce 100644 --- a/src/NServiceBus.SqlServer.AcceptanceTests/MultiSchema/When_custom_schema_configured_for_publisher.cs +++ b/src/NServiceBus.SqlServer.AcceptanceTests/MultiSchema/When_custom_schema_configured_for_publisher_and_subscriber.cs @@ -2,28 +2,32 @@ { using System.Threading.Tasks; using AcceptanceTesting; - using AcceptanceTesting.Customization; + using Features; using NServiceBus.AcceptanceTests; using NServiceBus.AcceptanceTests.EndpointTemplates; using NUnit.Framework; using Transport.SQLServer; - public class When_custom_schema_configured_for_publisher : NServiceBusAcceptanceTest + public class When_custom_schema_configured_for_publisher_and_subscriber : NServiceBusAcceptanceTest { [Test] public Task Should_receive_event() { return Scenario.Define() .WithEndpoint(b => b.When(c => c.Subscribed, session => session.Publish(new Event()))) - .WithEndpoint() + .WithEndpoint(b => b.When(c => c.EndpointsStarted, async (s, ctx) => + { + await s.Subscribe(typeof(Event)).ConfigureAwait(false); + ctx.Subscribed = true; + })) .Done(c => c.EventReceived) .Run(); } class Context : ScenarioContext { - public bool Subscribed { get; set; } public bool EventReceived { get; set; } + public bool Subscribed { get; set; } } class Publisher : EndpointConfigurationBuilder @@ -32,10 +36,11 @@ public Publisher() { EndpointSetup(b => { - b.UseTransport() - .DefaultSchema("sender"); + var transport = b.UseTransport(); + transport.DefaultSchema("sender"); - b.OnEndpointSubscribed((args, context) => { context.Subscribed = true; }); + transport.SubscriptionSettings().SubscriptionTableName("SubscriptionRouting", "dbo"); + transport.SubscriptionSettings().DisableSubscriptionCache(); }); } } @@ -44,16 +49,13 @@ class Subscriber : EndpointConfigurationBuilder { public Subscriber() { - EndpointSetup(b => + EndpointSetup(c => { - var publisherEndpoint = Conventions.EndpointNamingConvention(typeof(Publisher)); + var transport = c.UseTransport(); + transport.DefaultSchema("receiver"); - b.UseTransport() - .DefaultSchema("receiver") - .UseSchemaForEndpoint(publisherEndpoint, "sender") - .Routing().RegisterPublisher( - eventType: typeof(Event), - publisherEndpoint: publisherEndpoint); + transport.SubscriptionSettings().SubscriptionTableName("SubscriptionRouting", "dbo"); + c.DisableFeature(); }); } diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/NServiceBus.SqlServer.AcceptanceTests.csproj b/src/NServiceBus.SqlServer.AcceptanceTests/NServiceBus.SqlServer.AcceptanceTests.csproj index ec43dd976..dbf0566ab 100644 --- a/src/NServiceBus.SqlServer.AcceptanceTests/NServiceBus.SqlServer.AcceptanceTests.csproj +++ b/src/NServiceBus.SqlServer.AcceptanceTests/NServiceBus.SqlServer.AcceptanceTests.csproj @@ -3,6 +3,8 @@ net452;netcoreapp2.1 true + true + AcceptanceTests.snk diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/NativePubSub/When_migrating_publisher_first.cs b/src/NServiceBus.SqlServer.AcceptanceTests/NativePubSub/When_migrating_publisher_first.cs new file mode 100644 index 000000000..258638114 --- /dev/null +++ b/src/NServiceBus.SqlServer.AcceptanceTests/NativePubSub/When_migrating_publisher_first.cs @@ -0,0 +1,189 @@ +namespace NServiceBus.AcceptanceTests.NativePubSub +{ + using System; + using System.Collections.Generic; + using System.Threading.Tasks; + using AcceptanceTesting; + using AcceptanceTesting.Support; + using Configuration.AdvancedExtensibility; + using EndpointTemplates; + using Features; + using NServiceBus.Routing.MessageDrivenSubscriptions; + using NUnit.Framework; + using Conventions = AcceptanceTesting.Customization.Conventions; + + public class When_migrating_publisher_first : NServiceBusAcceptanceTest + { + static string PublisherEndpoint => Conventions.EndpointNamingConvention(typeof(Publisher)); + + [Test] + public async Task Should_not_lose_any_events() + { + var subscriptionStorage = new TestingInMemorySubscriptionStorage(); + + //Before migration begins + var beforeMigration = await Scenario.Define() + .WithEndpoint(b => + { + b.CustomConfig(c => + { + c.UsePersistence().UseStorage(subscriptionStorage); + c.GetSettings().Set("SqlServer.DisableNativePubSub", true); + }); + b.When(c => c.SubscribedMessageDriven, (session, ctx) => session.Publish(new MyEvent())); + }) + + .WithEndpoint(b => + { + b.CustomConfig(c => + { + c.GetSettings().Set("SqlServer.DisableNativePubSub", true); + c.GetSettings().GetOrCreate().AddOrReplacePublishers("LegacyConfig", new List + { + new PublisherTableEntry(typeof(MyEvent), PublisherAddress.CreateFromEndpointName(PublisherEndpoint)) + }); + }); + b.When(async (session, ctx) => + { + await session.Subscribe(); + }); + }) + .Done(c => c.GotTheEvent) + .Run(TimeSpan.FromSeconds(30)); + + Assert.True(beforeMigration.GotTheEvent); + + //Publisher migrated and in compatibility mode + var publisherMigrated = await Scenario.Define() + .WithEndpoint(b => + { + b.CustomConfig(c => + { + c.UsePersistence().UseStorage(subscriptionStorage); + c.GetSettings().Set("NServiceBus.Subscriptions.EnableMigrationMode", true); + }); + b.When(c => c.EndpointsStarted, (session, ctx) => session.Publish(new MyEvent())); + }) + + .WithEndpoint(b => + { + b.CustomConfig(c => + { + c.GetSettings().Set("SqlServer.DisableNativePubSub", true); + c.GetSettings().GetOrCreate().AddOrReplacePublishers("LegacyConfig", new List + { + new PublisherTableEntry(typeof(MyEvent), PublisherAddress.CreateFromEndpointName(PublisherEndpoint)) + }); + }); + b.When(async (session, ctx) => + { + await session.Subscribe(); + }); + }) + .Done(c => c.GotTheEvent) + .Run(TimeSpan.FromSeconds(30)); + + Assert.True(publisherMigrated.GotTheEvent); + + //Subscriber migrated and in compatibility mode + var subscriberMigratedRunSettings = new RunSettings + { + TestExecutionTimeout = TimeSpan.FromSeconds(30) + }; + subscriberMigratedRunSettings.Set("DoNotCleanNativeSubscriptions", true); + var subscriberMigrated = await Scenario.Define() + .WithEndpoint(b => + { + b.CustomConfig(c => + { + c.UsePersistence().UseStorage(subscriptionStorage); + c.GetSettings().Set("NServiceBus.Subscriptions.EnableMigrationMode", true); + }); + b.When(c => c.SubscribedMessageDriven && c.SubscribedNative, (session, ctx) => session.Publish(new MyEvent())); + }) + + .WithEndpoint(b => + { + b.CustomConfig(c => + { + c.GetSettings().Set("NServiceBus.Subscriptions.EnableMigrationMode", true); + var compatModeSettings = new SubscriptionMigrationModeSettings(c.GetSettings()); + compatModeSettings.RegisterPublisher(typeof(MyEvent), PublisherEndpoint); + }); + b.When(async (session, ctx) => + { + //Subscribes both using native feature and message-driven + await session.Subscribe(); + ctx.SubscribedNative = true; + }); + }) + .Done(c => c.GotTheEvent) + .Run(subscriberMigratedRunSettings); + + Assert.True(subscriberMigrated.GotTheEvent); + + //Compatibility mode disabled in both publisher and subscriber + var compatModeDisabled = await Scenario.Define() + .WithEndpoint(b => + { + b.When(c => c.EndpointsStarted, (session, ctx) => session.Publish(new MyEvent())); + }) + .WithEndpoint() + .Done(c => c.GotTheEvent) + .Run(TimeSpan.FromSeconds(30)); + + Assert.True(compatModeDisabled.GotTheEvent); + } + + public class Context : ScenarioContext + { + public bool GotTheEvent { get; set; } + public bool SubscribedMessageDriven { get; set; } + public bool SubscribedNative { get; set; } + } + + public class Publisher : EndpointConfigurationBuilder + { + public Publisher() + { + EndpointSetup(c => + { + c.OnEndpointSubscribed((s, context) => + { + if (s.SubscriberEndpoint.Contains(Conventions.EndpointNamingConvention(typeof(Subscriber)))) + { + context.SubscribedMessageDriven = true; + } + }); + }).IncludeType(); + } + } + + public class Subscriber : EndpointConfigurationBuilder + { + public Subscriber() + { + EndpointSetup(c => + { + c.DisableFeature(); + }, + metadata => metadata.RegisterPublisherFor(typeof(Publisher))); + } + + public class MyEventHandler : IHandleMessages + { + public Context Context { get; set; } + + public Task Handle(MyEvent @event, IMessageHandlerContext context) + { + Context.GotTheEvent = true; + return Task.FromResult(0); + } + } + } + + public class MyEvent : IEvent + { + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/NativePubSub/When_migrating_subscriber_first.cs b/src/NServiceBus.SqlServer.AcceptanceTests/NativePubSub/When_migrating_subscriber_first.cs new file mode 100644 index 000000000..4abc4ff4c --- /dev/null +++ b/src/NServiceBus.SqlServer.AcceptanceTests/NativePubSub/When_migrating_subscriber_first.cs @@ -0,0 +1,193 @@ +namespace NServiceBus.AcceptanceTests.NativePubSub +{ + using System; + using System.Collections.Generic; + using System.Threading.Tasks; + using AcceptanceTesting; + using AcceptanceTesting.Support; + using Configuration.AdvancedExtensibility; + using EndpointTemplates; + using Features; + using NServiceBus.Routing.MessageDrivenSubscriptions; + using NUnit.Framework; + using Conventions = AcceptanceTesting.Customization.Conventions; + + public class When_migrating_subscriber_first : NServiceBusAcceptanceTest + { + static string PublisherEndpoint => Conventions.EndpointNamingConvention(typeof(Publisher)); + + [Test] + public async Task Should_not_lose_any_events() + { + var subscriptionStorage = new TestingInMemorySubscriptionStorage(); + + //Before migration begins + var beforeMigration = await Scenario.Define() + .WithEndpoint(b => + { + b.CustomConfig(c => + { + c.UsePersistence().UseStorage(subscriptionStorage); + c.GetSettings().Set("SqlServer.DisableNativePubSub", true); + }); + b.When(c => c.SubscribedMessageDriven, (session, ctx) => session.Publish(new MyEvent())); + }) + + .WithEndpoint(b => + { + b.CustomConfig(c => + { + c.GetSettings().Set("SqlServer.DisableNativePubSub", true); + c.GetSettings().GetOrCreate().AddOrReplacePublishers("LegacyConfig", new List + { + new PublisherTableEntry(typeof(MyEvent), PublisherAddress.CreateFromEndpointName(PublisherEndpoint)) + }); + }); + b.When(async (session, ctx) => + { + await session.Subscribe(); + }); + }) + .Done(c => c.GotTheEvent) + .Run(TimeSpan.FromSeconds(30)); + + Assert.True(beforeMigration.GotTheEvent); + + //Subscriber migrated and in compatibility mode. + var subscriberMigratedRunSettings = new RunSettings + { + TestExecutionTimeout = TimeSpan.FromSeconds(30) + }; + subscriberMigratedRunSettings.Set("DoNotCleanNativeSubscriptions", true); + var subscriberMigrated = await Scenario.Define() + .WithEndpoint(b => + { + b.CustomConfig(c => + { + c.UsePersistence().UseStorage(subscriptionStorage); + c.GetSettings().Set("SqlServer.DisableNativePubSub", true); + }); + b.When(c => c.SubscribedMessageDriven, (session, ctx) => session.Publish(new MyEvent())); + }) + + .WithEndpoint(b => + { + b.CustomConfig(c => + { + c.GetSettings().Set("NServiceBus.Subscriptions.EnableMigrationMode", true); + var compatModeSettings = new SubscriptionMigrationModeSettings(c.GetSettings()); + compatModeSettings.RegisterPublisher(typeof(MyEvent), PublisherEndpoint); + }); + b.When(async (session, ctx) => + { + //Subscribes both using native feature and message-driven + await session.Subscribe(); + ctx.SubscribedNative = true; + }); + }) + .Done(c => c.GotTheEvent && c.SubscribedNative) //we ensure the subscriber did subscriber with the native mechanism + .Run(subscriberMigratedRunSettings); + + Assert.True(subscriberMigrated.GotTheEvent); + + //Publisher migrated and in compatibility mode + var publisherMigratedRunSettings = new RunSettings + { + TestExecutionTimeout = TimeSpan.FromSeconds(30) + }; + publisherMigratedRunSettings.Set("DoNotCleanNativeSubscriptions", true); + var publisherMigrated = await Scenario.Define() + .WithEndpoint(b => + { + b.CustomConfig(c => + { + c.UsePersistence().UseStorage(subscriptionStorage); + c.GetSettings().Set("NServiceBus.Subscriptions.EnableMigrationMode", true); + }); + b.When(c => c.SubscribedMessageDriven && c.SubscribedNative, (session, ctx) => session.Publish(new MyEvent())); + }) + + .WithEndpoint(b => + { + b.CustomConfig(c => + { + c.GetSettings().Set("NServiceBus.Subscriptions.EnableMigrationMode", true); + var compatModeSettings = new SubscriptionMigrationModeSettings(c.GetSettings()); + compatModeSettings.RegisterPublisher(typeof(MyEvent), PublisherEndpoint); + }); + b.When(async (session, ctx) => + { + await session.Subscribe(); + ctx.SubscribedNative = true; + }); + }) + .Done(c => c.GotTheEvent) + .Run(publisherMigratedRunSettings); + + Assert.True(publisherMigrated.GotTheEvent); + + //Compatibility mode disabled in both publisher and subscriber + var compatModeDisabled = await Scenario.Define() + .WithEndpoint(b => + { + b.When(c => c.EndpointsStarted, (session, ctx) => session.Publish(new MyEvent())); + }) + .WithEndpoint() + .Done(c => c.GotTheEvent) + .Run(TimeSpan.FromSeconds(30)); + + Assert.True(compatModeDisabled.GotTheEvent); + } + + public class Context : ScenarioContext + { + public bool GotTheEvent { get; set; } + public bool SubscribedMessageDriven { get; set; } + public bool SubscribedNative { get; set; } + } + + public class Publisher : EndpointConfigurationBuilder + { + public Publisher() + { + EndpointSetup(c => + { + c.OnEndpointSubscribed((s, context) => + { + if (s.SubscriberEndpoint.Contains(Conventions.EndpointNamingConvention(typeof(Subscriber)))) + { + context.SubscribedMessageDriven = true; + } + }); + }).IncludeType(); + } + } + + public class Subscriber : EndpointConfigurationBuilder + { + public Subscriber() + { + EndpointSetup(c => + { + c.DisableFeature(); + }, + metadata => metadata.RegisterPublisherFor(typeof(Publisher))); + } + + public class MyEventHandler : IHandleMessages + { + public Context Context { get; set; } + + public Task Handle(MyEvent @event, IMessageHandlerContext context) + { + Context.GotTheEvent = true; + return Task.FromResult(0); + } + } + } + + public class MyEvent : IEvent + { + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/NativePubSub/When_publisher_runs_in_compat_mode.cs b/src/NServiceBus.SqlServer.AcceptanceTests/NativePubSub/When_publisher_runs_in_compat_mode.cs new file mode 100644 index 000000000..c262ba83e --- /dev/null +++ b/src/NServiceBus.SqlServer.AcceptanceTests/NativePubSub/When_publisher_runs_in_compat_mode.cs @@ -0,0 +1,87 @@ +namespace NServiceBus.AcceptanceTests.NativePubSub +{ + using System; + using System.Collections.Generic; + using System.Threading.Tasks; + using AcceptanceTesting; + using AcceptanceTesting.Customization; + using Configuration.AdvancedExtensibility; + using EndpointTemplates; + using Features; + using NServiceBus.Routing.MessageDrivenSubscriptions; + using NUnit.Framework; + using Transport.SQLServer; + + public class When_publisher_runs_in_compat_mode : NServiceBusAcceptanceTest + { + static string PublisherEndpoint => Conventions.EndpointNamingConvention(typeof(MigratedPublisher)); + + [Test] + public async Task Legacy_subscriber_can_subscribe() + { + var publisherMigrated = await Scenario.Define() + .WithEndpoint(b => b.When(c => c.SubscribedMessageDriven, (session, ctx) => session.Publish(new MyEvent()))) + .WithEndpoint(b => b.When((session, ctx) => session.Subscribe())) + .Done(c => c.GotTheEvent) + .Run(TimeSpan.FromSeconds(30)); + + Assert.True(publisherMigrated.GotTheEvent); + } + + public class Context : ScenarioContext + { + public bool GotTheEvent { get; set; } + public bool SubscribedMessageDriven { get; set; } + } + + public class MigratedPublisher : EndpointConfigurationBuilder + { + public MigratedPublisher() + { + EndpointSetup(c => + { + c.ConfigureSqlServerTransport().EnableMessageDrivenPubSubCompatibilityMode(); + c.OnEndpointSubscribed((s, context) => + { + if (s.SubscriberEndpoint.Contains(Conventions.EndpointNamingConvention(typeof(Subscriber)))) + { + context.SubscribedMessageDriven = true; + } + }); + }).IncludeType(); + } + } + + public class Subscriber : EndpointConfigurationBuilder + { + public Subscriber() + { + EndpointSetup(c => + { + c.GetSettings().Set("SqlServer.DisableNativePubSub", true); + //SqlServerTransport no longer implements message-driven pub sub interface so we need to configure Publishers "manually" + c.GetSettings().GetOrCreate().AddOrReplacePublishers("LegacyConfig", new List + { + new PublisherTableEntry(typeof(MyEvent), PublisherAddress.CreateFromEndpointName(PublisherEndpoint)) + }); + c.DisableFeature(); + }); + } + + public class MyEventHandler : IHandleMessages + { + public Context Context { get; set; } + + public Task Handle(MyEvent @event, IMessageHandlerContext context) + { + Context.GotTheEvent = true; + return Task.FromResult(0); + } + } + } + + public class MyEvent : IEvent + { + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/NativePubSub/When_subscriber_runs_in_compat_mode.cs b/src/NServiceBus.SqlServer.AcceptanceTests/NativePubSub/When_subscriber_runs_in_compat_mode.cs new file mode 100644 index 000000000..86954810f --- /dev/null +++ b/src/NServiceBus.SqlServer.AcceptanceTests/NativePubSub/When_subscriber_runs_in_compat_mode.cs @@ -0,0 +1,81 @@ +namespace NServiceBus.AcceptanceTests.NativePubSub +{ + using System; + using System.Threading.Tasks; + using AcceptanceTesting; + using AcceptanceTesting.Customization; + using Configuration.AdvancedExtensibility; + using EndpointTemplates; + using Features; + using NUnit.Framework; + using Transport.SQLServer; + + public class When_subscriber_runs_in_compat_mode : NServiceBusAcceptanceTest + { + static string PublisherEndpoint => Conventions.EndpointNamingConvention(typeof(LegacyPublisher)); + + [Test] + public async Task It_can_subscribe_for_event_published_by_legacy_publisher() + { + var publisherMigrated = await Scenario.Define() + .WithEndpoint(b => b.When(c => c.SubscribedMessageDriven, (session, ctx) => session.Publish(new MyEvent()))) + .WithEndpoint(b => b.When((session, ctx) => session.Subscribe())) + .Done(c => c.GotTheEvent) + .Run(TimeSpan.FromSeconds(30)); + + Assert.True(publisherMigrated.GotTheEvent); + } + + public class Context : ScenarioContext + { + public bool GotTheEvent { get; set; } + public bool SubscribedMessageDriven { get; set; } + } + + public class LegacyPublisher : EndpointConfigurationBuilder + { + public LegacyPublisher() + { + EndpointSetup(c => + { + c.GetSettings().Set("SqlServer.DisableNativePubSub", true); + c.OnEndpointSubscribed((s, context) => + { + if (s.SubscriberEndpoint.Contains(Conventions.EndpointNamingConvention(typeof(MigratedSubscriber)))) + { + context.SubscribedMessageDriven = true; + } + }); + }).IncludeType(); + } + } + + public class MigratedSubscriber : EndpointConfigurationBuilder + { + public MigratedSubscriber() + { + EndpointSetup(c => + { + var compatMode = c.ConfigureSqlServerTransport().EnableMessageDrivenPubSubCompatibilityMode(); + compatMode.RegisterPublisher(typeof(MyEvent), PublisherEndpoint); + c.DisableFeature(); + }); + } + + public class MyEventHandler : IHandleMessages + { + public Context Context { get; set; } + + public Task Handle(MyEvent @event, IMessageHandlerContext context) + { + Context.GotTheEvent = true; + return Task.FromResult(0); + } + } + } + + public class MyEvent : IEvent + { + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/NativeTimeouts/When_configuring_delayed_delivery.cs b/src/NServiceBus.SqlServer.AcceptanceTests/NativeTimeouts/When_configuring_delayed_delivery.cs index 4b8cefd91..f03009867 100644 --- a/src/NServiceBus.SqlServer.AcceptanceTests/NativeTimeouts/When_configuring_delayed_delivery.cs +++ b/src/NServiceBus.SqlServer.AcceptanceTests/NativeTimeouts/When_configuring_delayed_delivery.cs @@ -87,7 +87,7 @@ public EndpointWithTimeoutManagerAndNative() EndpointSetup(config => { config.EnableFeature(); - config.UseTransport().UseNativeDelayedDelivery(); + config.UseTransport().NativeDelayedDelivery(); }); } } @@ -98,7 +98,7 @@ public EndpointWithOnlyNative() { EndpointSetup(config => { - var settings = config.UseTransport().UseNativeDelayedDelivery(); + var settings = config.UseTransport().NativeDelayedDelivery(); settings.DisableTimeoutManagerCompatibility(); }); } @@ -112,7 +112,7 @@ public EndpointWithTimeoutManagerAndNativeEnabledButCompatibilityDisabled() { config.EnableFeature(); - var settings = config.UseTransport().UseNativeDelayedDelivery(); + var settings = config.UseTransport().NativeDelayedDelivery(); settings.DisableTimeoutManagerCompatibility(); }); } diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/NativeTimeouts/When_deferring_a_message_in_native_mode.cs b/src/NServiceBus.SqlServer.AcceptanceTests/NativeTimeouts/When_deferring_a_message_in_native_mode.cs index 1256c0370..fd4a4302e 100644 --- a/src/NServiceBus.SqlServer.AcceptanceTests/NativeTimeouts/When_deferring_a_message_in_native_mode.cs +++ b/src/NServiceBus.SqlServer.AcceptanceTests/NativeTimeouts/When_deferring_a_message_in_native_mode.cs @@ -43,7 +43,7 @@ public class Endpoint : EndpointConfigurationBuilder { public Endpoint() { - EndpointSetup(config => config.UseTransport().UseNativeDelayedDelivery()); + EndpointSetup(config => config.UseTransport().NativeDelayedDelivery()); } public class MyMessageHandler : IHandleMessages diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/NativeTimeouts/When_deferring_a_message_in_timeout_manager_compatibility_mode.cs b/src/NServiceBus.SqlServer.AcceptanceTests/NativeTimeouts/When_deferring_a_message_in_timeout_manager_compatibility_mode.cs index 77fbf7273..074c4409b 100644 --- a/src/NServiceBus.SqlServer.AcceptanceTests/NativeTimeouts/When_deferring_a_message_in_timeout_manager_compatibility_mode.cs +++ b/src/NServiceBus.SqlServer.AcceptanceTests/NativeTimeouts/When_deferring_a_message_in_timeout_manager_compatibility_mode.cs @@ -71,7 +71,7 @@ public CompatibilityModeEndpoint() EndpointSetup(c => { c.EnableFeature(); //Because the acceptance tests framework disables it by default. - c.UseTransport().UseNativeDelayedDelivery(); + c.UseTransport().NativeDelayedDelivery(); }); } } diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/NativeTimeouts/When_deferring_a_message_to_the_past_in_native_mode.cs b/src/NServiceBus.SqlServer.AcceptanceTests/NativeTimeouts/When_deferring_a_message_to_the_past_in_native_mode.cs index a065a41fa..3e0ca8f77 100644 --- a/src/NServiceBus.SqlServer.AcceptanceTests/NativeTimeouts/When_deferring_a_message_to_the_past_in_native_mode.cs +++ b/src/NServiceBus.SqlServer.AcceptanceTests/NativeTimeouts/When_deferring_a_message_to_the_past_in_native_mode.cs @@ -37,7 +37,7 @@ public class Endpoint : EndpointConfigurationBuilder { public Endpoint() { - EndpointSetup(config => config.UseTransport().UseNativeDelayedDelivery()); + EndpointSetup(config => config.UseTransport().NativeDelayedDelivery()); } public class MyMessageHandler : IHandleMessages diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/NativeTimeouts/When_using_TTBR_for_deferred_message_in_native_mode.cs b/src/NServiceBus.SqlServer.AcceptanceTests/NativeTimeouts/When_using_TTBR_for_deferred_message_in_native_mode.cs index 2e625aaa5..715c40257 100644 --- a/src/NServiceBus.SqlServer.AcceptanceTests/NativeTimeouts/When_using_TTBR_for_deferred_message_in_native_mode.cs +++ b/src/NServiceBus.SqlServer.AcceptanceTests/NativeTimeouts/When_using_TTBR_for_deferred_message_in_native_mode.cs @@ -36,7 +36,7 @@ public class Endpoint : EndpointConfigurationBuilder { public Endpoint() { - EndpointSetup(config => config.UseTransport().UseNativeDelayedDelivery()); + EndpointSetup(config => config.UseTransport().NativeDelayedDelivery()); } public class MyMessageHandler : IHandleMessages diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/TestSuiteConstraints.cs b/src/NServiceBus.SqlServer.AcceptanceTests/TestSuiteConstraints.cs index 9a5c10d99..73b57b1f3 100644 --- a/src/NServiceBus.SqlServer.AcceptanceTests/TestSuiteConstraints.cs +++ b/src/NServiceBus.SqlServer.AcceptanceTests/TestSuiteConstraints.cs @@ -10,7 +10,7 @@ public partial class TestSuiteConstraints public bool SupportsDtc => false; #endif public bool SupportsCrossQueueTransactions => true; - public bool SupportsNativePubSub => false; + public bool SupportsNativePubSub => true; public bool SupportsNativeDeferral => true; public bool SupportsOutbox => true; public IConfigureEndpointTestExecution CreateTransportConfiguration() => new ConfigureEndpointSqlServerTransport(); diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/TestingInMemoryPersistence.cs b/src/NServiceBus.SqlServer.AcceptanceTests/TestingInMemoryPersistence.cs new file mode 100644 index 000000000..67691b7ae --- /dev/null +++ b/src/NServiceBus.SqlServer.AcceptanceTests/TestingInMemoryPersistence.cs @@ -0,0 +1,83 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading.Tasks; +using NServiceBus; +using NServiceBus.Configuration.AdvancedExtensibility; +using NServiceBus.Extensibility; +using NServiceBus.Features; +using NServiceBus.Persistence; +using NServiceBus.Unicast.Subscriptions; +using NServiceBus.Unicast.Subscriptions.MessageDrivenSubscriptions; + +public class TestingInMemoryPersistence : PersistenceDefinition +{ + internal TestingInMemoryPersistence() + { + Supports(s => + { + s.EnableFeatureByDefault(); + }); + } +} + +public static class InMemoryPersistenceExtensions +{ + public static void UseStorage(this PersistenceExtensions extensions, TestingInMemorySubscriptionStorage storageInstance) + { + extensions.GetSettings().Set("InMemoryPersistence.StorageInstance", storageInstance); + } +} + +public class TestingInMemorySubscriptionPersistence : Feature +{ + internal TestingInMemorySubscriptionPersistence() + { + } + + protected override void Setup(FeatureConfigurationContext context) + { + var storageInstance = context.Settings.GetOrDefault("InMemoryPersistence.StorageInstance"); + context.Container.RegisterSingleton(storageInstance ?? new TestingInMemorySubscriptionStorage()); + } +} + +public class TestingInMemorySubscriptionStorage : ISubscriptionStorage +{ + public Task Subscribe(Subscriber subscriber, MessageType messageType, ContextBag context) + { + var dict = storage.GetOrAdd(messageType, type => new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase)); + + dict.AddOrUpdate(BuildKey(subscriber), _ => subscriber, (_, __) => subscriber); + return Task.FromResult(true); + } + + static string BuildKey(Subscriber subscriber) + { + return $"{subscriber.TransportAddress ?? ""}_{subscriber.Endpoint ?? ""}"; + } + + public Task Unsubscribe(Subscriber subscriber, MessageType messageType, ContextBag context) + { + if (storage.TryGetValue(messageType, out var dict)) + { + dict.TryRemove(BuildKey(subscriber), out var _); + } + return Task.FromResult(true); + } + + public Task> GetSubscriberAddressesForMessage(IEnumerable messageTypes, ContextBag context) + { + var result = new HashSet(); + foreach (var m in messageTypes) + { + if (storage.TryGetValue(m, out var list)) + { + result.UnionWith(list.Values); + } + } + return Task.FromResult((IEnumerable)result); + } + + ConcurrentDictionary> storage = new ConcurrentDictionary>(); +} diff --git a/src/NServiceBus.SqlServer.IntegrationTests/SqlServerTransportTests.cs b/src/NServiceBus.SqlServer.IntegrationTests/SqlServerTransportTests.cs index 5157b8303..f41ad0adc 100644 --- a/src/NServiceBus.SqlServer.IntegrationTests/SqlServerTransportTests.cs +++ b/src/NServiceBus.SqlServer.IntegrationTests/SqlServerTransportTests.cs @@ -32,6 +32,9 @@ public void It_reads_catalog_from_open_connection() }; var settings = new SettingsHolder(); settings.Set(SettingsKeys.ConnectionFactoryOverride, factory); + var pubSubSettings = new SubscriptionSettings(); + pubSubSettings.DisableSubscriptionCache(); + settings.Set(pubSubSettings); definition.Initialize(settings, "Invalid-connection-string"); Assert.Pass(); } diff --git a/src/NServiceBus.SqlServer.IntegrationTests/When_checking_schema.cs b/src/NServiceBus.SqlServer.IntegrationTests/When_checking_schema.cs index e2acf1d8f..bc3cd1ebb 100644 --- a/src/NServiceBus.SqlServer.IntegrationTests/When_checking_schema.cs +++ b/src/NServiceBus.SqlServer.IntegrationTests/When_checking_schema.cs @@ -44,7 +44,8 @@ public async Task It_returns_type_for_headers_column() static async Task ResetQueue(QueueAddressTranslator addressTranslator, SqlConnectionFactory sqlConnectionFactory) { - var queueCreator = new QueueCreator(sqlConnectionFactory, addressTranslator); + var queueCreator = new QueueCreator(sqlConnectionFactory, addressTranslator, + new CanonicalQueueAddress("Delayed", "dbo", "nservicebus")); var queueBindings = new QueueBindings(); queueBindings.BindReceiving(QueueTableName); diff --git a/src/NServiceBus.SqlServer.IntegrationTests/When_dispatching_messages.cs b/src/NServiceBus.SqlServer.IntegrationTests/When_dispatching_messages.cs index 8e8755955..dd3584742 100644 --- a/src/NServiceBus.SqlServer.IntegrationTests/When_dispatching_messages.cs +++ b/src/NServiceBus.SqlServer.IntegrationTests/When_dispatching_messages.cs @@ -102,12 +102,13 @@ public void Prepare() async Task PrepareAsync() { var addressParser = new QueueAddressTranslator("nservicebus", "dbo", null, null); + var tableCache = new TableBasedQueueCache(addressParser); await CreateOutputQueueIfNecessary(addressParser, sqlConnectionFactory); await PurgeOutputQueue(addressParser); - dispatcher = new MessageDispatcher(new TableBasedQueueDispatcher(sqlConnectionFactory, new TableBasedQueueOperationsReader(addressParser)), addressParser); + dispatcher = new MessageDispatcher(addressParser, new NoOpMulticastToUnicastConverter(), tableCache, null, sqlConnectionFactory); } Task PurgeOutputQueue(QueueAddressTranslator addressTranslator) @@ -121,7 +122,7 @@ Task PurgeOutputQueue(QueueAddressTranslator addressTranslator) static Task CreateOutputQueueIfNecessary(QueueAddressTranslator addressTranslator, SqlConnectionFactory sqlConnectionFactory) { - var queueCreator = new QueueCreator(sqlConnectionFactory, addressTranslator); + var queueCreator = new QueueCreator(sqlConnectionFactory, addressTranslator, new CanonicalQueueAddress("Delayed", "dbo", "nservicebus")); var queueBindings = new QueueBindings(); queueBindings.BindReceiving(validAddress); @@ -136,6 +137,14 @@ static Task CreateOutputQueueIfNecessary(QueueAddressTranslator addressTranslato SqlConnectionFactory sqlConnectionFactory; + class NoOpMulticastToUnicastConverter : IMulticastToUnicastConverter + { + public Task> Convert(MulticastTransportOperation transportOperation) + { + return Task.FromResult(new List()); + } + } + interface IContextProvider : IDisposable { ContextBag Context { get; } diff --git a/src/NServiceBus.SqlServer.IntegrationTests/When_message_receive_takes_long.cs b/src/NServiceBus.SqlServer.IntegrationTests/When_message_receive_takes_long.cs index 94aa14a63..46f052328 100644 --- a/src/NServiceBus.SqlServer.IntegrationTests/When_message_receive_takes_long.cs +++ b/src/NServiceBus.SqlServer.IntegrationTests/When_message_receive_takes_long.cs @@ -95,7 +95,7 @@ static async Task ReceiveWithLongHandling(TableBasedQueue tableBasedQueue, SqlCo static Task CreateQueueIfNotExists(QueueAddressTranslator addressTranslator, SqlConnectionFactory sqlConnectionFactory) { - var queueCreator = new QueueCreator(sqlConnectionFactory, addressTranslator); + var queueCreator = new QueueCreator(sqlConnectionFactory, addressTranslator, new CanonicalQueueAddress("Delayed", "dbo", "nservicebus")); var queueBindings = new QueueBindings(); queueBindings.BindReceiving(QueueTableName); diff --git a/src/NServiceBus.SqlServer.IntegrationTests/When_receiving_messages.cs b/src/NServiceBus.SqlServer.IntegrationTests/When_receiving_messages.cs index 3c9cd6894..3ac3994aa 100644 --- a/src/NServiceBus.SqlServer.IntegrationTests/When_receiving_messages.cs +++ b/src/NServiceBus.SqlServer.IntegrationTests/When_receiving_messages.cs @@ -30,7 +30,7 @@ public async Task Should_stop_pumping_messages_after_first_unsuccessful_receive( var sqlConnectionFactory = SqlConnectionFactory.Default(connectionString); var pump = new MessagePump( - m => new ProcessWithNoTransaction(sqlConnectionFactory), + m => new ProcessWithNoTransaction(sqlConnectionFactory, null), qa => qa == "input" ? (TableBasedQueue)inputQueue : new TableBasedQueue(parser.Parse(qa).QualifiedTableName, qa), new QueuePurger(sqlConnectionFactory), new ExpiredMessagesPurger(_ => sqlConnectionFactory.OpenNewConnection(), 0, false), diff --git a/src/NServiceBus.SqlServer.IntegrationTests/When_using_ttbr.cs b/src/NServiceBus.SqlServer.IntegrationTests/When_using_ttbr.cs index 4d402af67..f9ea4c44b 100644 --- a/src/NServiceBus.SqlServer.IntegrationTests/When_using_ttbr.cs +++ b/src/NServiceBus.SqlServer.IntegrationTests/When_using_ttbr.cs @@ -121,6 +121,7 @@ public void Prepare() async Task PrepareAsync() { var addressParser = new QueueAddressTranslator("nservicebus", "dbo", null, new QueueSchemaAndCatalogSettings()); + var tableCache = new TableBasedQueueCache(addressParser); var connectionString = Environment.GetEnvironmentVariable("SqlServerTransportConnectionString"); if (string.IsNullOrEmpty(connectionString)) @@ -134,7 +135,7 @@ async Task PrepareAsync() await PurgeOutputQueue(addressParser); - dispatcher = new MessageDispatcher(new TableBasedQueueDispatcher(sqlConnectionFactory, new TableBasedQueueOperationsReader(addressParser)), addressParser); + dispatcher = new MessageDispatcher(addressParser, new NoOpMulticastToUnicastConverter(), tableCache, null, sqlConnectionFactory); } Task PurgeOutputQueue(QueueAddressTranslator addressParser) @@ -148,7 +149,7 @@ Task PurgeOutputQueue(QueueAddressTranslator addressParser) static Task CreateOutputQueueIfNecessary(QueueAddressTranslator addressParser, SqlConnectionFactory sqlConnectionFactory) { - var queueCreator = new QueueCreator(sqlConnectionFactory, addressParser); + var queueCreator = new QueueCreator(sqlConnectionFactory, addressParser, new CanonicalQueueAddress("Delayed", "dbo", "nservicebus")); var queueBindings = new QueueBindings(); queueBindings.BindReceiving(validAddress); @@ -161,5 +162,14 @@ static Task CreateOutputQueueIfNecessary(QueueAddressTranslator addressParser, S SqlConnectionFactory sqlConnectionFactory; const string validAddress = "TTBRTests"; + + class NoOpMulticastToUnicastConverter : IMulticastToUnicastConverter + { + public Task> Convert(MulticastTransportOperation transportOperation) + { + return Task.FromResult(new List()); + } + } + } } \ No newline at end of file diff --git a/src/NServiceBus.SqlServer.TransportTests/ConfigureSqlServerTransportInfrastructure.cs b/src/NServiceBus.SqlServer.TransportTests/ConfigureSqlServerTransportInfrastructure.cs index ae800b41c..46acf79f4 100644 --- a/src/NServiceBus.SqlServer.TransportTests/ConfigureSqlServerTransportInfrastructure.cs +++ b/src/NServiceBus.SqlServer.TransportTests/ConfigureSqlServerTransportInfrastructure.cs @@ -22,18 +22,25 @@ public TransportConfigurationResult Configure(SettingsHolder settings, Transport this.settings = settings; settings.Set(transportTransactionMode); settings.Set("NServiceBus.SharedQueue", settings.EndpointName()); - settings.Set(LogicalAddress.CreateLocalAddress(settings.EndpointName(), new Dictionary())); var delayedDeliverySettings = new DelayedDeliverySettings(); delayedDeliverySettings.TableSuffix("Delayed"); settings.Set(delayedDeliverySettings); + + var pubSubSettings = new SubscriptionSettings(); + pubSubSettings.DisableSubscriptionCache(); + settings.Set(pubSubSettings); + connectionString = Environment.GetEnvironmentVariable("SqlServerTransportConnectionString"); if (string.IsNullOrEmpty(connectionString)) { connectionString = @"Data Source=.\SQLEXPRESS;Initial Catalog=nservicebus;Integrated Security=True"; } + + var logicalAddress = LogicalAddress.CreateLocalAddress(settings.ErrorQueueAddress(), new Dictionary()); + var localAddress = settings.EndpointName(); return new TransportConfigurationResult { - TransportInfrastructure = new SqlServerTransport().Initialize(settings, connectionString) + TransportInfrastructure = new SqlServerTransportInfrastructure("nservicebus", settings, connectionString, () => localAddress, () => logicalAddress) }; } diff --git a/src/NServiceBus.SqlServer.TransportTests/NServiceBus.SqlServer.TransportTests.csproj b/src/NServiceBus.SqlServer.TransportTests/NServiceBus.SqlServer.TransportTests.csproj index 99b448497..0d7083d02 100644 --- a/src/NServiceBus.SqlServer.TransportTests/NServiceBus.SqlServer.TransportTests.csproj +++ b/src/NServiceBus.SqlServer.TransportTests/NServiceBus.SqlServer.TransportTests.csproj @@ -2,6 +2,8 @@ net46;netcoreapp2.1 + true + ..\NServiceBus.snk true diff --git a/src/NServiceBus.SqlServer.UnitTests/ApprovalFiles/APIApprovals.Approve.approved.txt b/src/NServiceBus.SqlServer.UnitTests/ApprovalFiles/APIApprovals.Approve.approved.txt index 28dbd638d..92cc11d77 100644 --- a/src/NServiceBus.SqlServer.UnitTests/ApprovalFiles/APIApprovals.Approve.approved.txt +++ b/src/NServiceBus.SqlServer.UnitTests/ApprovalFiles/APIApprovals.Approve.approved.txt @@ -1,9 +1,27 @@ -[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute(@"NServiceBus.SqlServer.IntegrationTests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100dde965e6172e019ac82c2639ffe494dd2e7dd16347c34762a05732b492e110f2e4e2e1b5ef2d85c848ccfb671ee20a47c8d1376276708dc30a90ff1121b647ba3b7259a6bc383b2034938ef0e275b58b920375ac605076178123693c6c4f1331661a62eba28c249386855637780e3ff5f23a6d854700eaa6803ef48907513b92")] +[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute(@"NServiceBus.SqlServer.AcceptanceTests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100d5a2fc697d5277272662d3154a752010b3de6d598204c10c4b09ebb28b469640efcf04978e95a15f4e0461f02316c96b349083a2e2a4f07fe7dfb713b99189b634473c73c1387149a37dbc836028bc2ca21de196bbd374f4024b920a0da86fe47bf541771352246cd8ef54d48654f39f4073aa114b70dc7d4712c3d9dd83faad")] +[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute(@"NServiceBus.SqlServer.IntegrationTests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100dde965e6172e019ac82c2639ffe494dd2e7dd16347c34762a05732b492e110f2e4e2e1b5ef2d85c848ccfb671ee20a47c8d1376276708dc30a90ff1121b647ba3b7259a6bc383b2034938ef0e275b58b920375ac605076178123693c6c4f1331661a62eba28c249386855637780e3ff5f23a6d854700eaa6803ef48907513b92")] +[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute(@"NServiceBus.SqlServer.TransportTests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100dde965e6172e019ac82c2639ffe494dd2e7dd16347c34762a05732b492e110f2e4e2e1b5ef2d85c848ccfb671ee20a47c8d1376276708dc30a90ff1121b647ba3b7259a6bc383b2034938ef0e275b58b920375ac605076178123693c6c4f1331661a62eba28c249386855637780e3ff5f23a6d854700eaa6803ef48907513b92")] [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute(@"NServiceBus.SqlServer.UnitTests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100dde965e6172e019ac82c2639ffe494dd2e7dd16347c34762a05732b492e110f2e4e2e1b5ef2d85c848ccfb671ee20a47c8d1376276708dc30a90ff1121b647ba3b7259a6bc383b2034938ef0e275b58b920375ac605076178123693c6c4f1331661a62eba28c249386855637780e3ff5f23a6d854700eaa6803ef48907513b92")] [assembly: System.Runtime.InteropServices.ComVisibleAttribute(false)] namespace NServiceBus { - public class SqlServerTransport : NServiceBus.Transport.TransportDefinition, NServiceBus.Routing.IMessageDrivenSubscriptionTransport + public class static MessageDrivenPubSubCompatibility + { + [System.ObsoleteAttribute("Publishing can not be disabled in version 5.0 and above. The transport handles pu" + + "blish-subscribe natively and does not require a separate subscription persistenc" + + "e. The member currently throws a NotImplementedException. Will be removed in ver" + + "sion 6.0.0.", true)] + public static void DisablePublishing(this NServiceBus.TransportExtensions transportExtensions) { } + [System.ObsoleteAttribute(@"Publisher registration has been moved to message-driven pub-sub migration mode.\r\n\r\nvar compatMode = transport.EnableMessageDrivenPubSubCompatibilityMode();\r\ncompatMode.RegisterPublisher(eventType, publisherEndpoint);. Use `SubscriptionMigrationModeSettings.RegisterPublisher(routingSettings, eventType, publisherEndpoint)` instead. The member currently throws a NotImplementedException. Will be removed in version 6.0.0.", true)] + public static void RegisterPublisher(this NServiceBus.RoutingSettings routingSettings, System.Type eventType, string publisherEndpoint) { } + [System.ObsoleteAttribute(@"Publisher registration has been moved to message-driven pub-sub migration mode.\r\n\r\nvar compatMode = transport.EnableMessageDrivenPubSubCompatibilityMode();\r\ncompatMode.RegisterPublisher(assembly, publisherEndpoint);. Use `SubscriptionMigrationModeSettings.RegisterPublisher(routingSettings, assembly, publisherEndpoint)` instead. The member currently throws a NotImplementedException. Will be removed in version 6.0.0.", true)] + public static void RegisterPublisher(this NServiceBus.RoutingSettings routingSettings, System.Reflection.Assembly assembly, string publisherEndpoint) { } + [System.ObsoleteAttribute(@"Publisher registration has been moved to message-driven pub-sub migration mode.\r\n\r\nvar compatMode = transport.EnableMessageDrivenPubSubCompatibilityMode();\r\ncompatMode.RegisterPublisher(assembly, namespace, publisherEndpoint);. Use `SubscriptionMigrationModeSettings.RegisterPublisher(routingSettings, assembly, namespace, publisherEndpoint)` instead. The member currently throws a NotImplementedException. Will be removed in version 6.0.0.", true)] + public static void RegisterPublisher(this NServiceBus.RoutingSettings routingSettings, System.Reflection.Assembly assembly, string @namespace, string publisherEndpoint) { } + [System.ObsoleteAttribute(@"Subscription authorization has been moved to message-driven pub-sub migration mode. \r\n\r\nvar compatMode = transport.EnableMessageDrivenPubSubCompatibilityMode();\r\ncompatMode.SubscriptionAuthorizer(authorizer);. Use `SubscriptionMigrationModeSettings.SubscriptionAuthorizer(transportExtensions, authorizer)` instead. The member currently throws a NotImplementedException. Will be removed in version 6.0.0.", true)] + public static void SubscriptionAuthorizer(this NServiceBus.TransportExtensions transportExtensions, System.Func authorizer) { } + } + public class SqlServerTransport : NServiceBus.Transport.TransportDefinition { public SqlServerTransport() { } public override string ExampleConnectionStringForErrorMessage { get; } @@ -21,11 +39,15 @@ namespace NServiceBus.Transport.SQLServer public void ProcessingInterval(System.TimeSpan interval) { } public void TableSuffix(string suffix) { } } + public class static MessageDrivenPubSubCompatibilityModeConfiguration + { + public static NServiceBus.SubscriptionMigrationModeSettings EnableMessageDrivenPubSubCompatibilityMode(this NServiceBus.TransportExtensions transportExtensions) { } + } public class static SendOptionsExtensions { [System.ObsoleteAttribute("The connection parameter is no longer required. Use `UseCustomSqlTransaction` ins" + - "tead. Will be treated as an error from version 5.0.0. Will be removed in version" + - " 6.0.0.", false)] + "tead. The member currently throws a NotImplementedException. Will be removed in " + + "version 6.0.0.", true)] public static void UseCustomSqlConnectionAndTransaction(this NServiceBus.SendOptions options, System.Data.SqlClient.SqlConnection connection, System.Data.SqlClient.SqlTransaction transaction) { } public static void UseCustomSqlTransaction(this NServiceBus.SendOptions options, System.Data.SqlClient.SqlTransaction transaction) { } } @@ -35,58 +57,45 @@ namespace NServiceBus.Transport.SQLServer public static readonly string AddMessageBodyStringColumn; public static readonly string CheckHeadersColumnType; public static readonly string CheckIfExpiresIndexIsPresent; - public const string CreateDelayedMessageStoreText = @" -IF EXISTS ( - SELECT * - FROM {1}.sys.objects - WHERE object_id = OBJECT_ID(N'{0}') - AND type in (N'U')) -RETURN -EXEC sp_getapplock @Resource = '{0}_lock', @LockMode = 'Exclusive' -IF EXISTS ( - SELECT * - FROM {1}.sys.objects - WHERE object_id = OBJECT_ID(N'{0}') - AND type in (N'U')) -BEGIN - EXEC sp_releaseapplock @Resource = '{0}_lock' - RETURN -END -CREATE TABLE {0} ( - Headers nvarchar(max) NOT NULL, - Body varbinary(max), - Due datetime NOT NULL, - RowVersion bigint IDENTITY(1,1) NOT NULL -); -CREATE NONCLUSTERED INDEX [Index_Due] ON {0} -( - [Due] -) -EXEC sp_releaseapplock @Resource = '{0}_lock'"; + public static readonly string CreateDelayedMessageStoreText; public static readonly string CreateQueueText; + public static readonly string CreateSubscriptionTableText; + public static readonly string GetSubscribersText; + public static readonly string MoveDueDelayedMessageText; public static readonly string PeekText; public static readonly string PurgeBatchOfExpiredMessagesText; public static readonly string PurgeText; public static readonly string ReceiveText; public static readonly string SendText; + public static readonly string StoreDelayedMessageText; + public static readonly string SubscribeText; + public static readonly string UnsubscribeText; } public class static SqlServerTransportSettingsExtensions { public static NServiceBus.TransportExtensions CreateMessageBodyComputedColumn(this NServiceBus.TransportExtensions transportExtensions) { } public static NServiceBus.TransportExtensions DefaultSchema(this NServiceBus.TransportExtensions transportExtensions, string schemaName) { } - [System.ObsoleteAttribute("Multi-instance mode has been deprecated. Use Transport Bridge and/or multi-catalo" + - "g addressing instead. The member currently throws a NotImplementedException. Wil" + - "l be removed in version 5.0.0.", true)] - public static NServiceBus.TransportExtensions EnableLegacyMultiInstanceMode(this NServiceBus.TransportExtensions transportExtensions, System.Func> sqlConnectionFactory) { } + public static NServiceBus.Transport.SQLServer.DelayedDeliverySettings NativeDelayedDelivery(this NServiceBus.TransportExtensions transportExtensions) { } public static NServiceBus.TransportExtensions PurgeExpiredMessagesOnStartup(this NServiceBus.TransportExtensions transportExtensions, System.Nullable purgeBatchSize) { } + public static NServiceBus.Transport.SQLServer.SubscriptionSettings SubscriptionSettings(this NServiceBus.TransportExtensions transportExtensions) { } public static NServiceBus.TransportExtensions TimeToWaitBeforeTriggeringCircuitBreaker(this NServiceBus.TransportExtensions transportExtensions, System.TimeSpan waitTime) { } public static NServiceBus.TransportExtensions TransactionScopeOptions(this NServiceBus.TransportExtensions transportExtensions, System.Nullable timeout = null, System.Nullable isolationLevel = null) { } public static NServiceBus.TransportExtensions UseCatalogForEndpoint(this NServiceBus.TransportExtensions transportExtensions, string endpointName, string catalog) { } public static NServiceBus.TransportExtensions UseCatalogForQueue(this NServiceBus.TransportExtensions transportExtensions, string queueName, string catalog) { } public static NServiceBus.TransportExtensions UseCustomSqlConnectionFactory(this NServiceBus.TransportExtensions transportExtensions, System.Func> sqlConnectionFactory) { } + [System.ObsoleteAttribute("Starting from version 5 native delayed delivery is always enabled. It can be conf" + + "igured via NativeDelayedDelivery. Use `NativeDelayedDelivery` instead. The membe" + + "r currently throws a NotImplementedException. Will be removed in version 6.0.0.", true)] public static NServiceBus.Transport.SQLServer.DelayedDeliverySettings UseNativeDelayedDelivery(this NServiceBus.TransportExtensions transportExtensions) { } public static NServiceBus.TransportExtensions UseSchemaForEndpoint(this NServiceBus.TransportExtensions transportExtensions, string endpointName, string schema) { } public static NServiceBus.TransportExtensions UseSchemaForQueue(this NServiceBus.TransportExtensions transportExtensions, string queueName, string schema) { } public static NServiceBus.TransportExtensions WithPeekDelay(this NServiceBus.TransportExtensions transportExtensions, System.Nullable delay = null) { } } + public class SubscriptionSettings + { + public SubscriptionSettings() { } + public void CacheSubscriptionInformationFor(System.TimeSpan timeSpan) { } + public void DisableSubscriptionCache() { } + public void SubscriptionTableName(string tableName, string schemaName = null, string catalogName = null) { } + } } \ No newline at end of file diff --git a/src/NServiceBus.SqlServer.UnitTests/Sending/MessageDispatcherTests.cs b/src/NServiceBus.SqlServer.UnitTests/Sending/OperationSorterTests.cs similarity index 55% rename from src/NServiceBus.SqlServer.UnitTests/Sending/MessageDispatcherTests.cs rename to src/NServiceBus.SqlServer.UnitTests/Sending/OperationSorterTests.cs index b3e9ec254..2ac03a122 100644 --- a/src/NServiceBus.SqlServer.UnitTests/Sending/MessageDispatcherTests.cs +++ b/src/NServiceBus.SqlServer.UnitTests/Sending/OperationSorterTests.cs @@ -2,26 +2,23 @@ { using System.Collections.Generic; using System.Linq; - using System.Threading.Tasks; - using Extensibility; using NUnit.Framework; using Routing; using Transport; using Transport.SQLServer; [TestFixture] - public class MessageDispatcherTests + public class OperationSorterTests { [TestCaseSource(nameof(TestCases))] - public async Task It_deduplicates_based_on_message_id_and_address(TransportOperations transportOperations, int expectedDispatchedMessageCount) + public void It_deduplicates_based_on_message_id_and_address(TransportOperations transportOperations, int expectedDispatchedMessageCount) { - var queueDispatcher = new FakeTableBasedQueueDispatcher(); + var queueAddressTranslator = new QueueAddressTranslator("nservicebus", "dbo", null, null); - var dispatcher = new MessageDispatcher(queueDispatcher, new QueueAddressTranslator("nservicebus", "dbo", null, null)); + var sortResult = transportOperations.UnicastTransportOperations.SortAndDeduplicate(queueAddressTranslator); - await dispatcher.Dispatch(transportOperations, new TransportTransaction(), new ContextBag()); - - Assert.AreEqual(expectedDispatchedMessageCount, queueDispatcher.DispatchedMessageIds.Count); + Assert.AreEqual(expectedDispatchedMessageCount, sortResult.DefaultDispatch.Count()); + Assert.IsNull(sortResult.IsolatedDispatch); } static object[] TestCases = @@ -61,26 +58,24 @@ public async Task It_deduplicates_based_on_message_id_and_address(TransportOpera } }; - static TransportOperation CreateTransportOperations(string messageId, string destination) + [Test] + public void It_sorts_isolated_and_default_dispatch() { - return new TransportOperation(new OutgoingMessage(messageId, new Dictionary(), new byte[0]), new UnicastAddressTag(destination)); - } + var queueAddressTranslator = new QueueAddressTranslator("nservicebus", "dbo", null, null); - class FakeTableBasedQueueDispatcher : IQueueDispatcher - { - public List DispatchedMessageIds = new List(); + var operations = new TransportOperations( + CreateTransportOperations("1", "dest", DispatchConsistency.Default), + CreateTransportOperations("2", "dest", DispatchConsistency.Isolated)); - public Task DispatchAsNonIsolated(List operations, TransportTransaction transportTransaction) - { - DispatchedMessageIds.AddRange(operations.Select(x => x.Message.MessageId)); - return Task.FromResult(0); - } + var sortResult = operations.UnicastTransportOperations.SortAndDeduplicate(queueAddressTranslator); - public Task DispatchAsIsolated(List operations) - { - DispatchedMessageIds.AddRange(operations.Select(x => x.Message.MessageId)); - return Task.FromResult(0); - } + Assert.AreEqual(1, sortResult.DefaultDispatch.Count()); + Assert.AreEqual(1, sortResult.IsolatedDispatch.Count()); + } + + static TransportOperation CreateTransportOperations(string messageId, string destination, DispatchConsistency dispatchConsistency = DispatchConsistency.Default) + { + return new TransportOperation(new OutgoingMessage(messageId, new Dictionary(), new byte[0]), new UnicastAddressTag(destination), dispatchConsistency); } } } \ No newline at end of file diff --git a/src/NServiceBus.SqlServer.UnitTests/SqlServerTransportTests.cs b/src/NServiceBus.SqlServer.UnitTests/SqlServerTransportTests.cs index a25f1bbbc..2f6cd19c1 100644 --- a/src/NServiceBus.SqlServer.UnitTests/SqlServerTransportTests.cs +++ b/src/NServiceBus.SqlServer.UnitTests/SqlServerTransportTests.cs @@ -2,6 +2,7 @@ { using NUnit.Framework; using Settings; + using Transport.SQLServer; [TestFixture] public class SqlServerTransportTests @@ -10,7 +11,12 @@ public class SqlServerTransportTests public void It_rejects_connection_string_without_catalog_property() { var definition = new SqlServerTransport(); - Assert.That( () => definition.Initialize(new SettingsHolder(), @"Data Source=.\SQLEXPRESS;Integrated Security=True"), + var subscriptionSettings = new SubscriptionSettings(); + subscriptionSettings.DisableSubscriptionCache(); + var settings = new SettingsHolder(); + settings.Set(subscriptionSettings); + + Assert.That( () => definition.Initialize(settings, @"Data Source=.\SQLEXPRESS;Integrated Security=True"), Throws.Exception.Message.Contains("Initial Catalog property is mandatory in the connection string.")); } @@ -22,7 +28,13 @@ public void It_rejects_connection_string_without_catalog_property() public void It_accepts_connection_string_with_catalog_property(string connectionString) { var definition = new SqlServerTransport(); - definition.Initialize(new SettingsHolder(), connectionString); + + var subscriptionSettings = new SubscriptionSettings(); + subscriptionSettings.DisableSubscriptionCache(); + var settings = new SettingsHolder(); + settings.Set(subscriptionSettings); + + definition.Initialize(settings, connectionString); Assert.Pass(); } } diff --git a/src/NServiceBus.SqlServer/Configuration/SettingsKeys.cs b/src/NServiceBus.SqlServer/Configuration/SettingsKeys.cs index 8424c84b5..d1db73726 100644 --- a/src/NServiceBus.SqlServer/Configuration/SettingsKeys.cs +++ b/src/NServiceBus.SqlServer/Configuration/SettingsKeys.cs @@ -17,6 +17,11 @@ class SettingsKeys public const string SchemaPropertyKey = "Schema"; public const string CatalogPropertyKey = "Catalog"; - public const string EnableMigrationMode = "NServiceBus.TimeoutManager.EnableMigrationMode"; + public const string TimeoutManagerMigrationMode = "NServiceBus.TimeoutManager.EnableMigrationMode"; + + /// + /// For testing the migration process only + /// + public const string DisableNativePubSub = "SqlServer.DisableNativePubSub"; } } \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedDeiveryTableBasedQueueFactory.cs b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedDeiveryTableBasedQueueFactory.cs deleted file mode 100644 index 89ee309ad..000000000 --- a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedDeiveryTableBasedQueueFactory.cs +++ /dev/null @@ -1,80 +0,0 @@ -namespace NServiceBus.Transport.SQLServer -{ - using System; - using System.Data.SqlClient; - using System.Linq; - using System.Threading.Tasks; - using DelayedDelivery; - using DeliveryConstraints; - using Performance.TimeToBeReceived; - - class DelayedDeliveryTableBasedQueueOperationsReader : ITableBasedQueueOperationsReader - { - public DelayedDeliveryTableBasedQueueOperationsReader(DelayedMessageTable delayedMessageTable, ITableBasedQueueOperationsReader immediateDeliveryQueueOperationsReader) - { - this.delayedMessageTable = delayedMessageTable; - this.immediateDeliveryQueueOperationsReader = immediateDeliveryQueueOperationsReader; - } - - public Func Get(UnicastTransportOperation operation) - { - var behavior = GetDueTime(operation); - TryGetConstraint(operation, out DiscardIfNotReceivedBefore discardIfNotReceivedBefore); - if (behavior.Defer) - { - // align with TimeoutManager behavior - if (discardIfNotReceivedBefore != null && discardIfNotReceivedBefore.MaxTime < TimeSpan.MaxValue) - { - throw new Exception("Delayed delivery of messages with TimeToBeReceived set is not supported. Remove the TimeToBeReceived attribute to delay messages of this type."); - } - - return (conn, trans) => delayedMessageTable.Store(operation.Message, behavior.DueAfter, behavior.Destination, conn, trans); - } - return immediateDeliveryQueueOperationsReader.Get(operation); - } - - static DispatchBehavior GetDueTime(UnicastTransportOperation operation) - { - if (TryGetConstraint(operation, out DoNotDeliverBefore doNotDeliverBefore)) - { - return DispatchBehavior.Deferred(doNotDeliverBefore.At - DateTime.UtcNow, operation.Destination); - } - if (TryGetConstraint(operation, out DelayDeliveryWith delayDeliveryWith)) - { - return DispatchBehavior.Deferred(delayDeliveryWith.Delay, operation.Destination); - } - return DispatchBehavior.Immediately(); - } - - static bool TryGetConstraint(IOutgoingTransportOperation operation, out T constraint) where T : DeliveryConstraint - { - constraint = operation.DeliveryConstraints.OfType().FirstOrDefault(); - return constraint != null; - } - - DelayedMessageTable delayedMessageTable; - ITableBasedQueueOperationsReader immediateDeliveryQueueOperationsReader; - - struct DispatchBehavior - { - public bool Defer; - public TimeSpan DueAfter; - public string Destination; - - public static DispatchBehavior Immediately() - { - return new DispatchBehavior(); - } - - public static DispatchBehavior Deferred(TimeSpan dueAfter, string destination) - { - return new DispatchBehavior - { - DueAfter = dueAfter < TimeSpan.Zero ? TimeSpan.Zero : dueAfter, - Defer = true, - Destination = destination - }; - } - } - } -} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedDeliveryInfrastructure.cs b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedDeliveryInfrastructure.cs deleted file mode 100644 index 992f7ef39..000000000 --- a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedDeliveryInfrastructure.cs +++ /dev/null @@ -1,34 +0,0 @@ -namespace NServiceBus.Transport.SQLServer -{ - using Features; - using Logging; - using Settings; - - static class DelayedDeliveryInfrastructure - { - public static StartupCheckResult CheckForInvalidSettings(SettingsHolder settings) - { - var delayedDeliverySettings = settings.GetOrDefault(); - if (delayedDeliverySettings != null) - { - var sendOnlyEndpoint = settings.GetOrDefault("Endpoint.SendOnly"); - if (sendOnlyEndpoint) - { - return StartupCheckResult.Failed("Native delayed delivery is only supported for endpoints capable of receiving messages."); - } - } - else - { - var timeoutManagerEnabled = settings.IsFeatureActive(typeof(TimeoutManager)); - if (timeoutManagerEnabled) - { - Logger.Warn("Current configuration of the endpoint uses the TimeoutManager feature for delayed delivery - an option which is not recommended for new deployments. SqlTransport native delayed delivery should be used instead. It can be enabled by calling `UseNativeDelayedDelivery()`."); - } - } - - return StartupCheckResult.Success; - } - - static ILog Logger = LogManager.GetLogger("DelayedDeliveryInfrastructure"); - } -} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedDeliveryMessagePump.cs b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedDeliveryMessagePump.cs deleted file mode 100644 index 26ca365ea..000000000 --- a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedDeliveryMessagePump.cs +++ /dev/null @@ -1,44 +0,0 @@ -namespace NServiceBus.Transport.SQLServer -{ - using System; - using System.Threading.Tasks; - - class DelayedDeliveryMessagePump : IPushMessages - { - public DelayedDeliveryMessagePump(IPushMessages pump, DelayedMessageProcessor delayedMessageProcessor) - { - this.pump = pump; - this.delayedMessageProcessor = delayedMessageProcessor; - } - - public Task Init(Func onMessage, Func> onError, CriticalError criticalError, PushSettings settings) - { - delayedMessageProcessor.Init(settings.InputQueue); - return pump.Init(async context => - { - if (await delayedMessageProcessor.Handle(context).ConfigureAwait(false)) - { - return; - } - await onMessage(context).ConfigureAwait(false); - }, context => - { - delayedMessageProcessor.HandleError(context); - return onError(context); - }, criticalError, settings); - } - - public void Start(PushRuntimeSettings limitations) - { - pump.Start(limitations); - } - - public Task Stop() - { - return pump.Stop(); - } - - IPushMessages pump; - DelayedMessageProcessor delayedMessageProcessor; - } -} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedDeliveryQueueCreator.cs b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedDeliveryQueueCreator.cs deleted file mode 100644 index 2ec9ae79c..000000000 --- a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedDeliveryQueueCreator.cs +++ /dev/null @@ -1,63 +0,0 @@ -namespace NServiceBus.Transport.SQLServer -{ - using System.Data; - using System.Data.SqlClient; - using System.Threading.Tasks; - - class DelayedDeliveryQueueCreator : ICreateQueues - { - public DelayedDeliveryQueueCreator(SqlConnectionFactory connectionFactory, ICreateQueues queueCreator, CanonicalQueueAddress delayedMessageTable, bool createMessageBodyComputedColumn = false) - { - this.connectionFactory = connectionFactory; - this.queueCreator = queueCreator; - this.delayedMessageTable = delayedMessageTable; - this.createMessageBodyComputedColumn = createMessageBodyComputedColumn; - } - - public async Task CreateQueueIfNecessary(QueueBindings queueBindings, string identity) - { - await queueCreator.CreateQueueIfNecessary(queueBindings, identity).ConfigureAwait(false); - using (var connection = await connectionFactory.OpenNewConnection().ConfigureAwait(false)) - using (var transaction = connection.BeginTransaction()) - { - await CreateDelayedMessageQueue(delayedMessageTable, connection, transaction, createMessageBodyComputedColumn).ConfigureAwait(false); - - transaction.Commit(); - } - } - - static async Task CreateDelayedMessageQueue(CanonicalQueueAddress canonicalQueueAddress, SqlConnection connection, SqlTransaction transaction, bool createMessageBodyComputedColumn) - { -#pragma warning disable 618 - var sql = string.Format(SqlConstants.CreateDelayedMessageStoreText, canonicalQueueAddress.QualifiedTableName, canonicalQueueAddress.QuotedCatalogName); -#pragma warning restore 618 - using (var command = new SqlCommand(sql, connection, transaction) - { - CommandType = CommandType.Text - }) - { - await command.ExecuteNonQueryAsync().ConfigureAwait(false); - } - if (createMessageBodyComputedColumn) - { -#pragma warning disable 618 - var bodyStringSql = string.Format(SqlConstants.AddMessageBodyStringColumn, canonicalQueueAddress.QualifiedTableName, canonicalQueueAddress.QuotedCatalogName); -#pragma warning restore 618 - using (var command = new SqlCommand(bodyStringSql, connection, transaction) - { - CommandType = CommandType.Text - }) - { - await command.ExecuteNonQueryAsync().ConfigureAwait(false); - } - - } - } - - - SqlConnectionFactory connectionFactory; - ICreateQueues queueCreator; - CanonicalQueueAddress delayedMessageTable; - bool createMessageBodyComputedColumn; - } -} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageProcessor.cs b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageProcessor.cs deleted file mode 100644 index 3b98d65e8..000000000 --- a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageProcessor.cs +++ /dev/null @@ -1,49 +0,0 @@ -namespace NServiceBus.Transport.SQLServer -{ - using System.Threading.Tasks; - using Routing; - - class DelayedMessageProcessor - { - const string ForwardHeader = "NServiceBus.SqlServer.ForwardDestination"; - - public DelayedMessageProcessor(IDispatchMessages dispatcher) - { - this.dispatcher = dispatcher; - } - - public void Init(string localAddress) - { - this.localAddress = localAddress; - } - - public async Task Handle(MessageContext context) - { - context.Headers.TryGetValue(ForwardHeader, out var forwardDestination); - if (forwardDestination == null) - { - //This is not a delayed message. Process in local endpoint instance. - return false; - } - if (forwardDestination == localAddress) - { - context.Headers.Remove(ForwardHeader); - //Do not forward the message. Process in local endpoint instance. - return false; - } - var outgoingMessage = new OutgoingMessage(context.MessageId, context.Headers, context.Body); - var transportOperation = new TransportOperation(outgoingMessage, new UnicastAddressTag(forwardDestination)); - context.Headers.Remove(ForwardHeader); - await dispatcher.Dispatch(new TransportOperations(transportOperation), context.TransportTransaction, context.Extensions).ConfigureAwait(false); - return true; - } - - public void HandleError(ErrorContext context) - { - context.Message.Headers.Remove(ForwardHeader); - } - - IDispatchMessages dispatcher; - string localAddress; - } -} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageTable.cs b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageTable.cs index ef8112804..5aeaa608e 100644 --- a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageTable.cs +++ b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageTable.cs @@ -6,13 +6,26 @@ namespace NServiceBus.Transport.SQLServer using System.Threading.Tasks; using Transport; - class DelayedMessageTable + interface IDelayedMessageStore + { + Task Store(OutgoingMessage message, TimeSpan dueAfter, string destination, SqlConnection connection, SqlTransaction transaction); + } + + class SendOnlyDelayedMessageStore : IDelayedMessageStore + { + public Task Store(OutgoingMessage message, TimeSpan dueAfter, string destination, SqlConnection connection, SqlTransaction transaction) + { + throw new Exception("Delayed delivery is not supported for send-only endpoints."); + } + } + + class DelayedMessageTable : IDelayedMessageStore { public DelayedMessageTable(string delayedQueueTable, string inputQueueTable) { #pragma warning disable 618 storeCommand = string.Format(SqlConstants.StoreDelayedMessageText, delayedQueueTable); - moveMaturedCommand = string.Format(SqlConstants.MoveMaturedDelayedMessageText, delayedQueueTable, inputQueueTable); + moveDueCommand = string.Format(SqlConstants.MoveDueDelayedMessageText, delayedQueueTable, inputQueueTable); #pragma warning restore 618 } @@ -26,9 +39,9 @@ public async Task Store(OutgoingMessage message, TimeSpan dueAfter, string desti } } - public async Task MoveMaturedMessages(int batchSize, SqlConnection connection, SqlTransaction transaction, CancellationToken cancellationToken) + public async Task MoveDueMessages(int batchSize, SqlConnection connection, SqlTransaction transaction, CancellationToken cancellationToken) { - using (var command = new SqlCommand(moveMaturedCommand, connection, transaction)) + using (var command = new SqlCommand(moveDueCommand, connection, transaction)) { command.Parameters.AddWithValue("BatchSize", batchSize); await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); @@ -36,6 +49,6 @@ public async Task MoveMaturedMessages(int batchSize, SqlConnection connection, S } string storeCommand; - string moveMaturedCommand; + string moveDueCommand; } -} \ No newline at end of file +} diff --git a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs b/src/NServiceBus.SqlServer/DelayedDelivery/DueDelayedMessageProcessor.cs similarity index 86% rename from src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs rename to src/NServiceBus.SqlServer/DelayedDelivery/DueDelayedMessageProcessor.cs index 192e71260..cf9e7a08c 100644 --- a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs +++ b/src/NServiceBus.SqlServer/DelayedDelivery/DueDelayedMessageProcessor.cs @@ -6,9 +6,9 @@ namespace NServiceBus.Transport.SQLServer using System.Threading.Tasks; using Logging; - class DelayedMessageHandler + class DueDelayedMessageProcessor { - public DelayedMessageHandler(DelayedMessageTable table, SqlConnectionFactory connectionFactory, TimeSpan interval, int batchSize) + public DueDelayedMessageProcessor(DelayedMessageTable table, SqlConnectionFactory connectionFactory, TimeSpan interval, int batchSize) { this.table = table; this.connectionFactory = connectionFactory; @@ -44,7 +44,7 @@ async Task MoveMaturedDelayedMessages() { using (var transaction = connection.BeginTransaction()) { - await table.MoveMaturedMessages(batchSize, connection, transaction, cancellationToken).ConfigureAwait(false); + await table.MoveDueMessages(batchSize, connection, transaction, cancellationToken).ConfigureAwait(false); transaction.Commit(); } } @@ -81,6 +81,6 @@ await Task.Delay(interval, cancellationToken).IgnoreCancellation() CancellationTokenSource cancellationTokenSource; Task task; - static ILog Logger = LogManager.GetLogger(); + static ILog Logger = LogManager.GetLogger(); } } \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/InternalsVisibleTo.cs b/src/NServiceBus.SqlServer/InternalsVisibleTo.cs index abf73b519..7b8d4a6e2 100644 --- a/src/NServiceBus.SqlServer/InternalsVisibleTo.cs +++ b/src/NServiceBus.SqlServer/InternalsVisibleTo.cs @@ -1,4 +1,6 @@ using System.Runtime.CompilerServices; [assembly: InternalsVisibleTo("NServiceBus.SqlServer.UnitTests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100dde965e6172e019ac82c2639ffe494dd2e7dd16347c34762a05732b492e110f2e4e2e1b5ef2d85c848ccfb671ee20a47c8d1376276708dc30a90ff1121b647ba3b7259a6bc383b2034938ef0e275b58b920375ac605076178123693c6c4f1331661a62eba28c249386855637780e3ff5f23a6d854700eaa6803ef48907513b92")] -[assembly: InternalsVisibleTo("NServiceBus.SqlServer.IntegrationTests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100dde965e6172e019ac82c2639ffe494dd2e7dd16347c34762a05732b492e110f2e4e2e1b5ef2d85c848ccfb671ee20a47c8d1376276708dc30a90ff1121b647ba3b7259a6bc383b2034938ef0e275b58b920375ac605076178123693c6c4f1331661a62eba28c249386855637780e3ff5f23a6d854700eaa6803ef48907513b92")] \ No newline at end of file +[assembly: InternalsVisibleTo("NServiceBus.SqlServer.IntegrationTests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100dde965e6172e019ac82c2639ffe494dd2e7dd16347c34762a05732b492e110f2e4e2e1b5ef2d85c848ccfb671ee20a47c8d1376276708dc30a90ff1121b647ba3b7259a6bc383b2034938ef0e275b58b920375ac605076178123693c6c4f1331661a62eba28c249386855637780e3ff5f23a6d854700eaa6803ef48907513b92")] +[assembly: InternalsVisibleTo("NServiceBus.SqlServer.TransportTests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100dde965e6172e019ac82c2639ffe494dd2e7dd16347c34762a05732b492e110f2e4e2e1b5ef2d85c848ccfb671ee20a47c8d1376276708dc30a90ff1121b647ba3b7259a6bc383b2034938ef0e275b58b920375ac605076178123693c6c4f1331661a62eba28c249386855637780e3ff5f23a6d854700eaa6803ef48907513b92")] +[assembly: InternalsVisibleTo("NServiceBus.SqlServer.AcceptanceTests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100d5a2fc697d5277272662d3154a752010b3de6d598204c10c4b09ebb28b469640efcf04978e95a15f4e0461f02316c96b349083a2e2a4f07fe7dfb713b99189b634473c73c1387149a37dbc836028bc2ca21de196bbd374f4024b920a0da86fe47bf541771352246cd8ef54d48654f39f4073aa114b70dc7d4712c3d9dd83faad")] \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj b/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj index bf6905715..3b644a119 100644 --- a/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj +++ b/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj @@ -19,7 +19,7 @@ - + diff --git a/src/NServiceBus.SqlServer/PubSub/CachedSubscriptionStore.cs b/src/NServiceBus.SqlServer/PubSub/CachedSubscriptionStore.cs new file mode 100644 index 000000000..304bf6392 --- /dev/null +++ b/src/NServiceBus.SqlServer/PubSub/CachedSubscriptionStore.cs @@ -0,0 +1,68 @@ +namespace NServiceBus.Transport.SQLServer +{ + using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using System.Threading.Tasks; + + + class CachedSubscriptionStore : ISubscriptionStore + { + public CachedSubscriptionStore(ISubscriptionStore inner, TimeSpan cacheFor) + { + this.inner = inner; + this.cacheFor = cacheFor; + } + + public Task> GetSubscribers(Type eventType) + { + var cacheItem = Cache.GetOrAdd(CacheKey(eventType), + _ => new CacheItem + { + Stored = DateTime.UtcNow, + Subscribers = inner.GetSubscribers(eventType) + }); + + var age = DateTime.UtcNow - cacheItem.Stored; + if (age >= cacheFor) + { + cacheItem.Subscribers = inner.GetSubscribers(eventType); + cacheItem.Stored = DateTime.UtcNow; + } + + return cacheItem.Subscribers; + } + + public async Task Subscribe(string endpointName, string endpointAddress, Type eventType) + { + await inner.Subscribe(endpointName, endpointAddress, eventType).ConfigureAwait(false); + ClearForMessageType(CacheKey(eventType)); + } + + public async Task Unsubscribe(string endpointName, Type eventType) + { + await inner.Unsubscribe(endpointName, eventType).ConfigureAwait(false); + ClearForMessageType(CacheKey(eventType)); + } + + void ClearForMessageType(string topic) + { + Cache.TryRemove(topic, out _); + } + + static string CacheKey(Type eventType) + { + return eventType.FullName; + } + + TimeSpan cacheFor; + ISubscriptionStore inner; + ConcurrentDictionary Cache = new ConcurrentDictionary(); + + class CacheItem + { + public DateTime Stored { get; set; } + public Task> Subscribers { get; set; } + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/PubSub/ISubscriptionStore.cs b/src/NServiceBus.SqlServer/PubSub/ISubscriptionStore.cs new file mode 100644 index 000000000..7106494e3 --- /dev/null +++ b/src/NServiceBus.SqlServer/PubSub/ISubscriptionStore.cs @@ -0,0 +1,13 @@ +namespace NServiceBus.Transport.SQLServer +{ + using System; + using System.Collections.Generic; + using System.Threading.Tasks; + + interface ISubscriptionStore + { + Task> GetSubscribers(Type eventType); + Task Subscribe(string endpointName, string endpointAddress, Type eventType); + Task Unsubscribe(string endpointName, Type eventType); + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/PubSub/MessageDrivenPubSubCompatibilityModeConfiguration.cs b/src/NServiceBus.SqlServer/PubSub/MessageDrivenPubSubCompatibilityModeConfiguration.cs new file mode 100644 index 000000000..b6e8fcd02 --- /dev/null +++ b/src/NServiceBus.SqlServer/PubSub/MessageDrivenPubSubCompatibilityModeConfiguration.cs @@ -0,0 +1,21 @@ +namespace NServiceBus.Transport.SQLServer +{ + using Configuration.AdvancedExtensibility; + + /// + /// Configuration extensions for Message-Driven Pub-Sub compatibility mode + /// + public static class MessageDrivenPubSubCompatibilityModeConfiguration + { + /// + /// Enables compatibility with endpoints running on message-driven pub-sub + /// + /// The transport to enable pub-sub compatibility on + public static SubscriptionMigrationModeSettings EnableMessageDrivenPubSubCompatibilityMode(this TransportExtensions transportExtensions) + { + var settings = transportExtensions.GetSettings(); + settings.Set("NServiceBus.Subscriptions.EnableMigrationMode", true); + return new SubscriptionMigrationModeSettings(settings); + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/PubSub/MulticastToUnicastConverter.cs b/src/NServiceBus.SqlServer/PubSub/MulticastToUnicastConverter.cs new file mode 100644 index 000000000..9d14e2851 --- /dev/null +++ b/src/NServiceBus.SqlServer/PubSub/MulticastToUnicastConverter.cs @@ -0,0 +1,29 @@ +namespace NServiceBus.Transport.SQLServer +{ + using System.Collections.Generic; + using System.Linq; + using System.Threading.Tasks; + + class MulticastToUnicastConverter : IMulticastToUnicastConverter + { + ISubscriptionStore subscriptions; + + public MulticastToUnicastConverter(ISubscriptionStore subscriptions) + { + this.subscriptions = subscriptions; + } + + public async Task> Convert(MulticastTransportOperation transportOperation) + { + var subscribers = await subscriptions.GetSubscribers(transportOperation.MessageType).ConfigureAwait(false); + + return (from subscriber in subscribers + select new UnicastTransportOperation( + transportOperation.Message, + subscriber, + transportOperation.RequiredDispatchConsistency, + transportOperation.DeliveryConstraints + )).ToList(); + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/PubSub/PolymorphicSubscriptionStore.cs b/src/NServiceBus.SqlServer/PubSub/PolymorphicSubscriptionStore.cs new file mode 100644 index 000000000..d487f8ae6 --- /dev/null +++ b/src/NServiceBus.SqlServer/PubSub/PolymorphicSubscriptionStore.cs @@ -0,0 +1,61 @@ +namespace NServiceBus.Transport.SQLServer +{ + using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using System.Linq; + using System.Threading.Tasks; + + class PolymorphicSubscriptionStore : ISubscriptionStore + { + public PolymorphicSubscriptionStore(SubscriptionTable subscriptionTable) + { + this.subscriptionTable = subscriptionTable; + } + + public Task> GetSubscribers(Type eventType) + { + var topics = GetTopics(eventType); + return subscriptionTable.GetSubscribers(topics.ToArray()); + } + + public Task Subscribe(string endpointName, string endpointAddress, Type eventType) + { + return subscriptionTable.Subscribe(endpointName, endpointAddress, TopicName.From(eventType)); + } + + public Task Unsubscribe(string endpointName, Type eventType) + { + return subscriptionTable.Unsubscribe(endpointName, TopicName.From(eventType)); + } + + IEnumerable GetTopics(Type messageType) + { + return eventTypeToTopicListMap.GetOrAdd(messageType, GenerateTopics); + } + + static string[] GenerateTopics(Type messageType) + { + return GenerateMessageHierarchy(messageType) + .Select(TopicName.From) + .ToArray(); + } + + static IEnumerable GenerateMessageHierarchy(Type messageType) + { + var t = messageType; + while (t != null) + { + yield return t; + t = t.BaseType; + } + foreach (var iface in messageType.GetInterfaces()) + { + yield return iface; + } + } + + ConcurrentDictionary eventTypeToTopicListMap = new ConcurrentDictionary(); + SubscriptionTable subscriptionTable; + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/PubSub/QualifiedSubscriptionTableName.cs b/src/NServiceBus.SqlServer/PubSub/QualifiedSubscriptionTableName.cs new file mode 100644 index 000000000..bc9637c11 --- /dev/null +++ b/src/NServiceBus.SqlServer/PubSub/QualifiedSubscriptionTableName.cs @@ -0,0 +1,20 @@ +namespace NServiceBus.Transport.SQLServer +{ + using System; + using static NameHelper; + + class QualifiedSubscriptionTableName + { + public string QuotedCatalog; + public string QuotedQualifiedName; + + public QualifiedSubscriptionTableName(string table, string schema, string catalog) + { + if (table == null) throw new ArgumentNullException(nameof(table)); + if (schema == null) throw new ArgumentNullException(nameof(schema)); + if (catalog == null) throw new ArgumentNullException(nameof(catalog)); + QuotedCatalog = Quote(catalog); + QuotedQualifiedName = $"{Quote(catalog)}.{Quote(schema)}.{Quote(table)}"; + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/PubSub/SubscriptionManager.cs b/src/NServiceBus.SqlServer/PubSub/SubscriptionManager.cs new file mode 100644 index 000000000..4a440c65b --- /dev/null +++ b/src/NServiceBus.SqlServer/PubSub/SubscriptionManager.cs @@ -0,0 +1,30 @@ +namespace NServiceBus.Transport.SQLServer +{ + using System; + using System.Threading.Tasks; + using Extensibility; + + class SubscriptionManager : IManageSubscriptions + { + public SubscriptionManager(ISubscriptionStore subscriptionStore, string endpointName, string localAddress) + { + this.subscriptionStore = subscriptionStore; + this.endpointName = endpointName; + this.localAddress = localAddress; + } + + public Task Subscribe(Type eventType, ContextBag context) + { + return subscriptionStore.Subscribe(endpointName, localAddress, eventType); + } + + public Task Unsubscribe(Type eventType, ContextBag context) + { + return subscriptionStore.Unsubscribe(endpointName, eventType); + } + + ISubscriptionStore subscriptionStore; + string endpointName; + string localAddress; + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/PubSub/SubscriptionSettings.cs b/src/NServiceBus.SqlServer/PubSub/SubscriptionSettings.cs new file mode 100644 index 000000000..d10b0a495 --- /dev/null +++ b/src/NServiceBus.SqlServer/PubSub/SubscriptionSettings.cs @@ -0,0 +1,47 @@ +namespace NServiceBus.Transport.SQLServer +{ + using System; + + /// + /// Configures the native pub/sub behavior + /// + public class SubscriptionSettings + { + internal SubscriptionTableName SubscriptionTable = new SubscriptionTableName("SubscriptionRouting", null, null); + /// + /// Default to 5 seconds caching. If a system is under load that prevent doing an extra roundtrip for each Publish operation. If + /// a system is not under load, doing an extra roundtrip every 5 seconds is not a problem and 5 seconds is small enough value that + /// people accepts as we always say that subscription operation is not instantaneous. + /// + internal TimeSpan? TimeToCacheSubscriptions = TimeSpan.FromSeconds(5); + + /// + /// Overrides the default name for the subscription table. All endpoints in a given system need to agree on that name in order for them to be able + /// to subscribe to and publish events. + /// + /// Name of the table. + /// Schema in which the table is defined if different from default schema configured for the transport. + /// Catalog in which the table is defined if different from default catalog configured for the transport. + public void SubscriptionTableName(string tableName, string schemaName = null, string catalogName = null) + { + SubscriptionTable = new SubscriptionTableName(tableName, schemaName, catalogName); + } + + /// + /// Cache subscriptions for a given . + /// + public void CacheSubscriptionInformationFor(TimeSpan timeSpan) + { + Guard.AgainstNegativeAndZero(nameof(timeSpan), timeSpan); + TimeToCacheSubscriptions = timeSpan; + } + + /// + /// Do not cache subscriptions. + /// + public void DisableSubscriptionCache() + { + TimeToCacheSubscriptions = null; + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/PubSub/SubscriptionTable.cs b/src/NServiceBus.SqlServer/PubSub/SubscriptionTable.cs new file mode 100644 index 000000000..e3f91f6fa --- /dev/null +++ b/src/NServiceBus.SqlServer/PubSub/SubscriptionTable.cs @@ -0,0 +1,92 @@ +namespace NServiceBus.Transport.SQLServer +{ + using System.Collections.Generic; + using System.Data; + using System.Linq; + using System.Threading.Tasks; + using System.Transactions; + + class SubscriptionTable + { + string qualifiedTableName; + SqlConnectionFactory connectionFactory; + string subscribeCommand; + string unsubscribeCommand; + + public SubscriptionTable(string qualifiedTableName, SqlConnectionFactory connectionFactory) + { + this.qualifiedTableName = qualifiedTableName; + this.connectionFactory = connectionFactory; +#pragma warning disable 618 + subscribeCommand = string.Format(SqlConstants.SubscribeText, qualifiedTableName); + unsubscribeCommand = string.Format(SqlConstants.UnsubscribeText, qualifiedTableName); +#pragma warning restore 618 + } + + public async Task Subscribe(string endpointName, string queueAddress, string topic) + { + using (new TransactionScope(TransactionScopeOption.Suppress, TransactionScopeAsyncFlowOption.Enabled)) + { + using (var connection = await connectionFactory.OpenNewConnection().ConfigureAwait(false)) + using (var command = connection.CreateCommand()) + { + command.CommandText = subscribeCommand; + command.Parameters.Add("Endpoint", SqlDbType.VarChar).Value = endpointName; + command.Parameters.Add("QueueAddress", SqlDbType.VarChar).Value = queueAddress; + command.Parameters.Add("Topic", SqlDbType.VarChar).Value = topic; + + await command.ExecuteNonQueryAsync().ConfigureAwait(false); + } + } + } + + public async Task Unsubscribe(string endpointName, string topic) + { + using (new TransactionScope(TransactionScopeOption.Suppress, TransactionScopeAsyncFlowOption.Enabled)) + { + using (var connection = await connectionFactory.OpenNewConnection().ConfigureAwait(false)) + using (var command = connection.CreateCommand()) + { + command.CommandText = unsubscribeCommand; + command.Parameters.Add("Endpoint", SqlDbType.VarChar).Value = endpointName; + command.Parameters.Add("Topic", SqlDbType.VarChar).Value = topic; + + await command.ExecuteNonQueryAsync().ConfigureAwait(false); + } + } + } + + public async Task> GetSubscribers(string[] topics) + { + var results = new List(); + + var argumentsList = string.Join(", ", Enumerable.Range(0, topics.Length).Select(i => $"@Topic_{i}")); +#pragma warning disable 618 + var getSubscribersCommand = string.Format(SqlConstants.GetSubscribersText, qualifiedTableName, argumentsList); +#pragma warning restore 618 + + using (new TransactionScope(TransactionScopeOption.Suppress, TransactionScopeAsyncFlowOption.Enabled)) + { + using (var connection = await connectionFactory.OpenNewConnection().ConfigureAwait(false)) + using (var command = connection.CreateCommand()) + { + command.CommandText = getSubscribersCommand; + for (var i = 0; i < topics.Length; i++) + { + command.Parameters.Add($"Topic_{i}", SqlDbType.VarChar).Value = topics[i]; + } + + using (var reader = await command.ExecuteReaderAsync().ConfigureAwait(false)) + { + while (await reader.ReadAsync().ConfigureAwait(false)) + { + results.Add(reader.GetString(0)); + } + } + + return results; + } + } + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/PubSub/SubscriptionTableName.cs b/src/NServiceBus.SqlServer/PubSub/SubscriptionTableName.cs new file mode 100644 index 000000000..14b1a6f21 --- /dev/null +++ b/src/NServiceBus.SqlServer/PubSub/SubscriptionTableName.cs @@ -0,0 +1,23 @@ +namespace NServiceBus.Transport.SQLServer +{ + using System; + + class SubscriptionTableName + { + string table; + string schema; + string catalog; + + public SubscriptionTableName(string table, string schema, string catalog) + { + this.table = table ?? throw new ArgumentNullException(nameof(table)); + this.schema = schema; + this.catalog = catalog; + } + + public QualifiedSubscriptionTableName Qualify(string defaultSchema, string defaultCatalog) + { + return new QualifiedSubscriptionTableName(table, schema ?? defaultSchema, catalog ?? defaultCatalog); + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/PubSub/TopicNames.cs b/src/NServiceBus.SqlServer/PubSub/TopicNames.cs new file mode 100644 index 000000000..f2de4529d --- /dev/null +++ b/src/NServiceBus.SqlServer/PubSub/TopicNames.cs @@ -0,0 +1,9 @@ +namespace NServiceBus.Transport.SQLServer +{ + using System; + + static class TopicName + { + public static string From(Type type) => type.FullName; + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/Queuing/ITableBasedQueueOperationsReader.cs b/src/NServiceBus.SqlServer/Queuing/ITableBasedQueueOperationsReader.cs deleted file mode 100644 index 6fb6ca729..000000000 --- a/src/NServiceBus.SqlServer/Queuing/ITableBasedQueueOperationsReader.cs +++ /dev/null @@ -1,11 +0,0 @@ -namespace NServiceBus.Transport.SQLServer -{ - using System; - using System.Data.SqlClient; - using System.Threading.Tasks; - - interface ITableBasedQueueOperationsReader - { - Func Get(UnicastTransportOperation operation); - } -} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/Queuing/SqlConstants.cs b/src/NServiceBus.SqlServer/Queuing/SqlConstants.cs index e2686b91e..339f132cf 100644 --- a/src/NServiceBus.SqlServer/Queuing/SqlConstants.cs +++ b/src/NServiceBus.SqlServer/Queuing/SqlConstants.cs @@ -38,7 +38,7 @@ THEN DATEADD(ms, @TimeToBeReceivedMs, GETUTCDATE()) END, IF (@NOCOUNT = 'ON') SET NOCOUNT ON; IF (@NOCOUNT = 'OFF') SET NOCOUNT OFF;"; - internal const string StoreDelayedMessageText = + public static readonly string StoreDelayedMessageText = @" DECLARE @NOCOUNT VARCHAR(3) = 'OFF'; IF ( (512 & @@OPTIONS) = 512 ) SET @NOCOUNT = 'ON' @@ -90,7 +90,7 @@ ELSE 1 IF (@NOCOUNT = 'ON') SET NOCOUNT ON; IF (@NOCOUNT = 'OFF') SET NOCOUNT OFF;"; - internal const string MoveMaturedDelayedMessageText = @" + public static readonly string MoveDueDelayedMessageText = @" DECLARE @NOCOUNT VARCHAR(3) = 'OFF'; IF ( (512 & @@OPTIONS) = 512 ) SET @NOCOUNT = 'ON'; SET NOCOUNT ON; @@ -201,7 +201,7 @@ Expires IS NOT NULL EXEC sp_releaseapplock @Resource = '{0}_lock'"; - public const string CreateDelayedMessageStoreText = @" + public static readonly string CreateDelayedMessageStoreText = @" IF EXISTS ( SELECT * FROM {1}.sys.objects @@ -255,5 +255,71 @@ FROM sys.columns c WHERE c.object_id = OBJECT_ID('{0}') AND c.name = 'Headers'"; + public static readonly string CreateSubscriptionTableText = @" + +IF EXISTS ( + SELECT * + FROM {1}.sys.objects + WHERE object_id = OBJECT_ID(N'{0}') + AND type in (N'U')) +RETURN + +EXEC sp_getapplock @Resource = '{0}_lock', @LockMode = 'Exclusive' + +IF EXISTS ( + SELECT * + FROM {1}.sys.objects + WHERE object_id = OBJECT_ID(N'{0}') + AND type in (N'U')) +BEGIN + EXEC sp_releaseapplock @Resource = '{0}_lock' + RETURN +END + +CREATE TABLE {0} ( + QueueAddress NVARCHAR(200) NOT NULL, + Endpoint NVARCHAR(200), + Topic NVARCHAR(200) NOT NULL, + PRIMARY KEY CLUSTERED + ( + Endpoint, + Topic + ) +) +EXEC sp_releaseapplock @Resource = '{0}_lock'"; + + public static readonly string SubscribeText = @" +MERGE {0} WITH (HOLDLOCK, TABLOCK) AS target +USING(SELECT @Endpoint AS Endpoint, @QueueAddress AS QueueAddress, @Topic AS Topic) AS source +ON target.Endpoint = source.Endpoint +AND target.Topic = source.Topic +WHEN MATCHED AND target.QueueAddress <> source.QueueAddress THEN +UPDATE SET QueueAddress = @QueueAddress +WHEN NOT MATCHED THEN +INSERT +( + QueueAddress, + Topic, + Endpoint +) +VALUES +( + @QueueAddress, + @Topic, + @Endpoint +);"; + + public static readonly string GetSubscribersText = @" +SELECT DISTINCT QueueAddress +FROM {0} +WHERE Topic IN ({1}) +"; + + public static readonly string UnsubscribeText = @" +DELETE FROM {0} +WHERE + Endpoint = @Endpoint and + Topic = @Topic"; + } } diff --git a/src/NServiceBus.SqlServer/Queuing/TableBasedQueueCache.cs b/src/NServiceBus.SqlServer/Queuing/TableBasedQueueCache.cs new file mode 100644 index 000000000..c29978507 --- /dev/null +++ b/src/NServiceBus.SqlServer/Queuing/TableBasedQueueCache.cs @@ -0,0 +1,25 @@ +namespace NServiceBus.Transport.SQLServer +{ + using System; + using System.Collections.Concurrent; + + class TableBasedQueueCache + { + public TableBasedQueueCache(QueueAddressTranslator addressTranslator) + { + this.addressTranslator = addressTranslator; + } + + public TableBasedQueue Get(string destination) + { + var address = addressTranslator.Parse(destination); + var key = Tuple.Create(address.QualifiedTableName, address.Address); + var queue = cache.GetOrAdd(key, x => new TableBasedQueue(x.Item1, x.Item2)); + + return queue; + } + + QueueAddressTranslator addressTranslator; + ConcurrentDictionary, TableBasedQueue> cache = new ConcurrentDictionary, TableBasedQueue>(); + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/Queuing/TableBasedQueueOperationsReader.cs b/src/NServiceBus.SqlServer/Queuing/TableBasedQueueOperationsReader.cs deleted file mode 100644 index e68225ceb..000000000 --- a/src/NServiceBus.SqlServer/Queuing/TableBasedQueueOperationsReader.cs +++ /dev/null @@ -1,37 +0,0 @@ -namespace NServiceBus.Transport.SQLServer -{ - using System; - using System.Collections.Concurrent; - using System.Data.SqlClient; - using System.Linq; - using System.Threading.Tasks; - using DeliveryConstraints; - using Performance.TimeToBeReceived; - - class TableBasedQueueOperationsReader : ITableBasedQueueOperationsReader - { - public TableBasedQueueOperationsReader(QueueAddressTranslator addressTranslator) - { - this.addressTranslator = addressTranslator; - } - - public Func Get(UnicastTransportOperation operation) - { - TryGetConstraint(operation, out DiscardIfNotReceivedBefore discardIfNotReceivedBefore); - - var address = addressTranslator.Parse(operation.Destination); - var key = Tuple.Create(address.QualifiedTableName, address.Address); - var queue = cache.GetOrAdd(key, x => new TableBasedQueue(x.Item1, x.Item2)); - - return (conn, trans) => queue.Send(operation.Message, discardIfNotReceivedBefore?.MaxTime ?? TimeSpan.MaxValue, conn, trans); - } - - static void TryGetConstraint(IOutgoingTransportOperation operation, out T constraint) where T : DeliveryConstraint - { - constraint = operation.DeliveryConstraints.OfType().FirstOrDefault(); - } - - QueueAddressTranslator addressTranslator; - ConcurrentDictionary, TableBasedQueue> cache = new ConcurrentDictionary, TableBasedQueue>(); - } -} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/Receiving/ProcessWithNativeTransaction.cs b/src/NServiceBus.SqlServer/Receiving/ProcessWithNativeTransaction.cs index e8af67c15..94d2f18b6 100644 --- a/src/NServiceBus.SqlServer/Receiving/ProcessWithNativeTransaction.cs +++ b/src/NServiceBus.SqlServer/Receiving/ProcessWithNativeTransaction.cs @@ -9,7 +9,8 @@ class ProcessWithNativeTransaction : ReceiveStrategy { - public ProcessWithNativeTransaction(TransactionOptions transactionOptions, SqlConnectionFactory connectionFactory, FailureInfoStorage failureInfoStorage, bool transactionForReceiveOnly = false) + public ProcessWithNativeTransaction(TransactionOptions transactionOptions, SqlConnectionFactory connectionFactory, FailureInfoStorage failureInfoStorage, TableBasedQueueCache tableBasedQueueCache, bool transactionForReceiveOnly = false) + : base(tableBasedQueueCache) { this.connectionFactory = connectionFactory; this.failureInfoStorage = failureInfoStorage; diff --git a/src/NServiceBus.SqlServer/Receiving/ProcessWithNoTransaction.cs b/src/NServiceBus.SqlServer/Receiving/ProcessWithNoTransaction.cs index c70970483..00948a3bd 100644 --- a/src/NServiceBus.SqlServer/Receiving/ProcessWithNoTransaction.cs +++ b/src/NServiceBus.SqlServer/Receiving/ProcessWithNoTransaction.cs @@ -7,7 +7,8 @@ namespace NServiceBus.Transport.SQLServer class ProcessWithNoTransaction : ReceiveStrategy { - public ProcessWithNoTransaction(SqlConnectionFactory connectionFactory) + public ProcessWithNoTransaction(SqlConnectionFactory connectionFactory, TableBasedQueueCache tableBasedQueueCache) + : base(tableBasedQueueCache) { this.connectionFactory = connectionFactory; } diff --git a/src/NServiceBus.SqlServer/Receiving/ProcessWithTransactionScope.cs b/src/NServiceBus.SqlServer/Receiving/ProcessWithTransactionScope.cs index 3b18dafae..d03353c30 100644 --- a/src/NServiceBus.SqlServer/Receiving/ProcessWithTransactionScope.cs +++ b/src/NServiceBus.SqlServer/Receiving/ProcessWithTransactionScope.cs @@ -7,7 +7,8 @@ class ProcessWithTransactionScope : ReceiveStrategy { - public ProcessWithTransactionScope(TransactionOptions transactionOptions, SqlConnectionFactory connectionFactory, FailureInfoStorage failureInfoStorage) + public ProcessWithTransactionScope(TransactionOptions transactionOptions, SqlConnectionFactory connectionFactory, FailureInfoStorage failureInfoStorage, TableBasedQueueCache tableBasedQueueCache) + : base(tableBasedQueueCache) { this.transactionOptions = transactionOptions; this.connectionFactory = connectionFactory; diff --git a/src/NServiceBus.SqlServer/Receiving/QueueCreator.cs b/src/NServiceBus.SqlServer/Receiving/QueueCreator.cs index eddc57eb6..372fa62df 100644 --- a/src/NServiceBus.SqlServer/Receiving/QueueCreator.cs +++ b/src/NServiceBus.SqlServer/Receiving/QueueCreator.cs @@ -8,10 +8,12 @@ namespace NServiceBus.Transport.SQLServer class QueueCreator : ICreateQueues { - public QueueCreator(SqlConnectionFactory connectionFactory, QueueAddressTranslator addressTranslator, bool createMessageBodyColumn = false) + public QueueCreator(SqlConnectionFactory connectionFactory, QueueAddressTranslator addressTranslator, + CanonicalQueueAddress delayedQueueAddress, bool createMessageBodyColumn = false) { this.connectionFactory = connectionFactory; this.addressTranslator = addressTranslator; + this.delayedQueueAddress = delayedQueueAddress; this.createMessageBodyColumn = createMessageBodyColumn; } @@ -22,20 +24,22 @@ public async Task CreateQueueIfNecessary(QueueBindings queueBindings, string ide { foreach (var receivingAddress in queueBindings.ReceivingAddresses) { - await CreateQueue(addressTranslator.Parse(receivingAddress), connection, transaction, createMessageBodyColumn).ConfigureAwait(false); + await CreateQueue(SqlConstants.CreateQueueText, addressTranslator.Parse(receivingAddress), connection, transaction, createMessageBodyColumn).ConfigureAwait(false); } foreach (var sendingAddress in queueBindings.SendingAddresses) { - await CreateQueue(addressTranslator.Parse(sendingAddress), connection, transaction, createMessageBodyColumn).ConfigureAwait(false); + await CreateQueue(SqlConstants.CreateQueueText, addressTranslator.Parse(sendingAddress), connection, transaction, createMessageBodyColumn).ConfigureAwait(false); } + + await CreateQueue(SqlConstants.CreateDelayedMessageStoreText, delayedQueueAddress, connection, transaction, createMessageBodyColumn).ConfigureAwait(false); transaction.Commit(); } } - static async Task CreateQueue(CanonicalQueueAddress canonicalQueueAddress, SqlConnection connection, SqlTransaction transaction, bool createMessageBodyColumn) + static async Task CreateQueue(string creationScript, CanonicalQueueAddress canonicalQueueAddress, SqlConnection connection, SqlTransaction transaction, bool createMessageBodyColumn) { - var sql = string.Format(SqlConstants.CreateQueueText, canonicalQueueAddress.QualifiedTableName, canonicalQueueAddress.QuotedCatalogName); + var sql = string.Format(creationScript, canonicalQueueAddress.QualifiedTableName, canonicalQueueAddress.QuotedCatalogName); using (var command = new SqlCommand(sql, connection, transaction) { CommandType = CommandType.Text @@ -59,6 +63,7 @@ static async Task CreateQueue(CanonicalQueueAddress canonicalQueueAddress, SqlCo SqlConnectionFactory connectionFactory; QueueAddressTranslator addressTranslator; + CanonicalQueueAddress delayedQueueAddress; bool createMessageBodyColumn; } } diff --git a/src/NServiceBus.SqlServer/Receiving/ReceiveStrategy.cs b/src/NServiceBus.SqlServer/Receiving/ReceiveStrategy.cs index 6237a9fd2..d61e61f12 100644 --- a/src/NServiceBus.SqlServer/Receiving/ReceiveStrategy.cs +++ b/src/NServiceBus.SqlServer/Receiving/ReceiveStrategy.cs @@ -14,6 +14,11 @@ abstract class ReceiveStrategy Func onMessage; Func> onError; + protected ReceiveStrategy(TableBasedQueueCache tableBasedQueueCache) + { + this.tableBasedQueueCache = tableBasedQueueCache; + } + public void Init(TableBasedQueue inputQueue, TableBasedQueue errorQueue, Func onMessage, Func> onError, CriticalError criticalError) { InputQueue = inputQueue; @@ -38,6 +43,11 @@ protected async Task TryReceive(SqlConnection connection, SqlTransactio if (receiveResult.Successful) { + if (await TryHandleDelayedMessage(receiveResult.Message, connection, transaction).ConfigureAwait(false)) + { + return null; + } + return receiveResult.Message; } receiveCancellationTokenSource.Cancel(); @@ -53,7 +63,6 @@ protected async Task TryProcessingMessage(Message message, TransportTransa using (var pushCancellationTokenSource = new CancellationTokenSource()) { var messageContext = new MessageContext(message.TransportId, message.Headers, message.Body, transportTransaction, pushCancellationTokenSource, new ContextBag()); - await onMessage(messageContext).ConfigureAwait(false); // Cancellation is requested when message processing is aborted. @@ -69,6 +78,7 @@ protected async Task HandleError(Exception exception, Message try { var errorContext = new ErrorContext(exception, message.Headers, message.TransportId, message.Body, transportTransaction, processingAttempts); + errorContext.Message.Headers.Remove(ForwardHeader); return await onError(errorContext).ConfigureAwait(false); } @@ -80,6 +90,32 @@ protected async Task HandleError(Exception exception, Message } } + async Task TryHandleDelayedMessage(Message message, SqlConnection connection, SqlTransaction transaction) + { + if (message.Headers.TryGetValue(ForwardHeader, out var forwardDestination)) + { + message.Headers.Remove(ForwardHeader); + } + + if (forwardDestination == null) + { + //This is not a delayed message. Process in local endpoint instance. + return false; + } + if (forwardDestination == InputQueue.Name) + { + //Do not forward the message. Process in local endpoint instance. + return false; + } + var outgoingMessage = new OutgoingMessage(message.TransportId, message.Headers, message.Body); + + var destinationQueue = tableBasedQueueCache.Get(forwardDestination); + await destinationQueue.Send(outgoingMessage, TimeSpan.MaxValue, connection, transaction).ConfigureAwait(false); + return true; + } + + const string ForwardHeader = "NServiceBus.SqlServer.ForwardDestination"; + TableBasedQueueCache tableBasedQueueCache; CriticalError criticalError; } } \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/SendOptionsExtensions.cs b/src/NServiceBus.SqlServer/SendOptionsExtensions.cs index 406f3d347..707d19e0c 100644 --- a/src/NServiceBus.SqlServer/SendOptionsExtensions.cs +++ b/src/NServiceBus.SqlServer/SendOptionsExtensions.cs @@ -6,29 +6,8 @@ /// /// Adds transport specific settings to SendOptions /// - public static class SendOptionsExtensions + public static partial class SendOptionsExtensions { - /// - /// Enables providing and instances that will be used by send operations. The same connection and transaction - /// can be used in more than one send operation. - /// - /// The to extend. - /// Open instance to be used by send operations. - /// instance that will be used by any operations performed by the transport. - [ObsoleteEx( - RemoveInVersion = "6.0", - TreatAsErrorFromVersion = "5.0", - ReplacementTypeOrMember = "UseCustomSqlTransaction", - Message = "The connection parameter is no longer required.")] - public static void UseCustomSqlConnectionAndTransaction(this SendOptions options, SqlConnection connection, SqlTransaction transaction) - { - var transportTransaction = new TransportTransaction(); - transportTransaction.Set(connection); - transportTransaction.Set(transaction); - - options.GetExtensions().Set(transportTransaction); - } - /// /// Enables the use of custom SqlTransaction instances for send operations. The same transaction can be used in more than one send operation. /// diff --git a/src/NServiceBus.SqlServer/Sending/IMulticastToUnicastConverter.cs b/src/NServiceBus.SqlServer/Sending/IMulticastToUnicastConverter.cs new file mode 100644 index 000000000..1aabfda37 --- /dev/null +++ b/src/NServiceBus.SqlServer/Sending/IMulticastToUnicastConverter.cs @@ -0,0 +1,10 @@ +namespace NServiceBus.Transport.SQLServer +{ + using System.Collections.Generic; + using System.Threading.Tasks; + + interface IMulticastToUnicastConverter + { + Task> Convert(MulticastTransportOperation transportOperation); + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/Sending/IQueueDispatcher.cs b/src/NServiceBus.SqlServer/Sending/IQueueDispatcher.cs deleted file mode 100644 index 9b1368862..000000000 --- a/src/NServiceBus.SqlServer/Sending/IQueueDispatcher.cs +++ /dev/null @@ -1,12 +0,0 @@ -namespace NServiceBus.Transport.SQLServer -{ - using System.Collections.Generic; - using System.Threading.Tasks; - - interface IQueueDispatcher - { - Task DispatchAsNonIsolated(List operations, TransportTransaction transportTransaction); - - Task DispatchAsIsolated(List operations); - } -} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/Sending/MessageDispatcher.cs b/src/NServiceBus.SqlServer/Sending/MessageDispatcher.cs index 5875c3606..db43f9456 100644 --- a/src/NServiceBus.SqlServer/Sending/MessageDispatcher.cs +++ b/src/NServiceBus.SqlServer/Sending/MessageDispatcher.cs @@ -2,80 +2,184 @@ { using System; using System.Collections.Generic; + using System.Data.SqlClient; using System.Linq; using System.Threading.Tasks; + using System.Transactions; + using DelayedDelivery; + using DeliveryConstraints; using Extensibility; + using Performance.TimeToBeReceived; using Transport; class MessageDispatcher : IDispatchMessages { - public MessageDispatcher(IQueueDispatcher dispatcher, QueueAddressTranslator addressTranslator) + public MessageDispatcher(QueueAddressTranslator addressTranslator, IMulticastToUnicastConverter multicastToUnicastConverter, TableBasedQueueCache tableBasedQueueCache, IDelayedMessageStore delayedMessageTable, SqlConnectionFactory connectionFactory) { - this.dispatcher = dispatcher; this.addressTranslator = addressTranslator; + this.multicastToUnicastConverter = multicastToUnicastConverter; + this.tableBasedQueueCache = tableBasedQueueCache; + this.delayedMessageTable = delayedMessageTable; + this.connectionFactory = connectionFactory; } // We need to check if we can support cancellation in here as well? public async Task Dispatch(TransportOperations operations, TransportTransaction transportTransaction, ContextBag context) { - await DeduplicateAndDispatch(operations, dispatcher.DispatchAsIsolated, DispatchConsistency.Isolated).ConfigureAwait(false); - await DeduplicateAndDispatch(operations, ops => dispatcher.DispatchAsNonIsolated(ops, transportTransaction), DispatchConsistency.Default).ConfigureAwait(false); + var sortedOperations = operations.UnicastTransportOperations + .Concat(await ConvertToUnicastOperations(operations).ConfigureAwait(false)) + .SortAndDeduplicate(addressTranslator); + + await DispatchDefault(sortedOperations, transportTransaction).ConfigureAwait(false); + await DispatchIsolated(sortedOperations).ConfigureAwait(false); } - Task DeduplicateAndDispatch(TransportOperations operations, Func, Task> dispatchMethod, DispatchConsistency dispatchConsistency) + async Task> ConvertToUnicastOperations(TransportOperations operations) { - var operationsToDispatch = operations.UnicastTransportOperations - .Where(o => o.RequiredDispatchConsistency == dispatchConsistency) - .GroupBy(o => new DeduplicationKey(o.Message.MessageId, addressTranslator.Parse(o.Destination).Address)) - .Select(g => g.First()) - .ToList(); + if (operations.MulticastTransportOperations.Count == 0) + { + return emptyUnicastTransportOperationsList; + } - return dispatchMethod(operationsToDispatch); + var tasks = operations.MulticastTransportOperations.Select(multicastToUnicastConverter.Convert); + var result = await Task.WhenAll(tasks).ConfigureAwait(false); + return result.SelectMany(x => x); } - IQueueDispatcher dispatcher; - QueueAddressTranslator addressTranslator; - - class DeduplicationKey + async Task DispatchIsolated(SortingResult sortedOperations) { - string messageId; - string destination; + if (sortedOperations.IsolatedDispatch == null) + { + return; + } - public DeduplicationKey(string messageId, string destination) +#if NET452 + using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled)) + using (var connection = await connectionFactory.OpenNewConnection().ConfigureAwait(false)) { - this.messageId = messageId; - this.destination = destination; + await Dispatch(sortedOperations.IsolatedDispatch, connection, null).ConfigureAwait(false); + + scope.Complete(); + } +#else + using (var scope = new TransactionScope(TransactionScopeOption.Suppress, TransactionScopeAsyncFlowOption.Enabled)) + using (var connection = await connectionFactory.OpenNewConnection().ConfigureAwait(false)) + using (var tx = connection.BeginTransaction()) + { + await Dispatch(sortedOperations.IsolatedDispatch, connection, tx).ConfigureAwait(false); + tx.Commit(); + scope.Complete(); } +#endif - bool Equals(DeduplicationKey other) + } + + async Task DispatchDefault(SortingResult sortedOperations, TransportTransaction transportTransaction) + { + if (sortedOperations.DefaultDispatch == null) { - return string.Equals(messageId, other.messageId) && string.Equals(destination, other.destination); + return; } - public override bool Equals(object obj) + if (InReceiveWithNoTransactionMode(transportTransaction) || InReceiveOnlyTransportTransactionMode(transportTransaction)) { - if (ReferenceEquals(null, obj)) + await DispatchUsingNewConnectionAndTransaction(sortedOperations.DefaultDispatch).ConfigureAwait(false); + return; + } + + await DispatchUsingReceiveTransaction(transportTransaction, sortedOperations.DefaultDispatch).ConfigureAwait(false); + } + + + async Task DispatchUsingNewConnectionAndTransaction(IEnumerable operations) + { + using (var connection = await connectionFactory.OpenNewConnection().ConfigureAwait(false)) + { + using (var transaction = connection.BeginTransaction()) { - return false; + await Dispatch(operations, connection, transaction).ConfigureAwait(false); + transaction.Commit(); } - if (ReferenceEquals(this, obj)) + } + } + + async Task DispatchUsingReceiveTransaction(TransportTransaction transportTransaction, IEnumerable operations) + { + transportTransaction.TryGet(out SqlConnection sqlTransportConnection); + transportTransaction.TryGet(out SqlTransaction sqlTransportTransaction); + transportTransaction.TryGet(out Transaction ambientTransaction); + + if (ambientTransaction != null) + { + using (var connection = await connectionFactory.OpenNewConnection().ConfigureAwait(false)) { - return true; + await Dispatch(operations, connection, null).ConfigureAwait(false); } - if (obj.GetType() != this.GetType()) + } + else + { + await Dispatch(operations, sqlTransportConnection, sqlTransportTransaction).ConfigureAwait(false); + } + } + + async Task Dispatch(IEnumerable operations, SqlConnection connection, SqlTransaction transaction) + { + foreach (var operation in operations) + { + await Dispatch(connection, transaction, operation).ConfigureAwait(false); + } + } + + Task Dispatch(SqlConnection connection, SqlTransaction transaction, UnicastTransportOperation operation) + { + TryGetConstraint(operation, out DiscardIfNotReceivedBefore discardIfNotReceivedBefore); + if (TryGetConstraint(operation, out DoNotDeliverBefore doNotDeliverBefore)) + { + if (discardIfNotReceivedBefore != null && discardIfNotReceivedBefore.MaxTime < TimeSpan.MaxValue) { - return false; + throw new Exception("Delayed delivery of messages with TimeToBeReceived set is not supported. Remove the TimeToBeReceived attribute to delay messages of this type."); } - return Equals((DeduplicationKey) obj); - } - public override int GetHashCode() + return delayedMessageTable.Store(operation.Message, doNotDeliverBefore.At - DateTime.UtcNow, operation.Destination, connection, transaction); + } + if (TryGetConstraint(operation, out DelayDeliveryWith delayDeliveryWith)) { - unchecked + if (discardIfNotReceivedBefore != null && discardIfNotReceivedBefore.MaxTime < TimeSpan.MaxValue) { - return (messageId.GetHashCode()*397) ^ destination.GetHashCode(); + throw new Exception("Delayed delivery of messages with TimeToBeReceived set is not supported. Remove the TimeToBeReceived attribute to delay messages of this type."); } + + return delayedMessageTable.Store(operation.Message, delayDeliveryWith.Delay, operation.Destination, connection, transaction); } + + var queue = tableBasedQueueCache.Get(operation.Destination); + return queue.Send(operation.Message, discardIfNotReceivedBefore?.MaxTime ?? TimeSpan.MaxValue, connection, transaction); } + + static bool InReceiveWithNoTransactionMode(TransportTransaction transportTransaction) + { + transportTransaction.TryGet(out SqlTransaction nativeTransaction); + transportTransaction.TryGet(out Transaction ambientTransaction); + + return nativeTransaction == null && ambientTransaction == null; + } + + static bool InReceiveOnlyTransportTransactionMode(TransportTransaction transportTransaction) + { + return transportTransaction.TryGet(ProcessWithNativeTransaction.ReceiveOnlyTransactionMode, out bool _); + } + + static bool TryGetConstraint(IOutgoingTransportOperation operation, out T constraint) where T : DeliveryConstraint + { + constraint = operation.DeliveryConstraints.OfType().FirstOrDefault(); + return constraint != null; + } + + TableBasedQueueCache tableBasedQueueCache; + IDelayedMessageStore delayedMessageTable; + SqlConnectionFactory connectionFactory; + QueueAddressTranslator addressTranslator; + IMulticastToUnicastConverter multicastToUnicastConverter; + static UnicastTransportOperation[] emptyUnicastTransportOperationsList = new UnicastTransportOperation[0]; } } \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/Sending/OperationSorter.cs b/src/NServiceBus.SqlServer/Sending/OperationSorter.cs new file mode 100644 index 000000000..988fccdc9 --- /dev/null +++ b/src/NServiceBus.SqlServer/Sending/OperationSorter.cs @@ -0,0 +1,93 @@ +namespace NServiceBus.Transport.SQLServer +{ + using System.Collections.Generic; + + struct SortingResult + { + public IEnumerable IsolatedDispatch; + public IEnumerable DefaultDispatch; + + public SortingResult(IEnumerable defaultDispatch, IEnumerable isolatedDispatch) + { + DefaultDispatch = defaultDispatch; + IsolatedDispatch = isolatedDispatch; + } + } + + static class OperationSorter + { + public static SortingResult SortAndDeduplicate(this IEnumerable source, QueueAddressTranslator addressTranslator) + { + Dictionary isolatedDispatch = null; + Dictionary defaultDispatch = null; + + foreach (var operation in source) + { + var destination = addressTranslator.Parse(operation.Destination).Address; + var messageId = operation.Message.MessageId; + var deduplicationKey = new DeduplicationKey(messageId, destination); + + if (operation.RequiredDispatchConsistency == DispatchConsistency.Default) + { + if (defaultDispatch == null) + { + defaultDispatch = new Dictionary(); + } + defaultDispatch[deduplicationKey] = operation; + } + else if (operation.RequiredDispatchConsistency == DispatchConsistency.Isolated) + { + if (isolatedDispatch == null) + { + isolatedDispatch = new Dictionary(); + } + isolatedDispatch[deduplicationKey] = operation; + } + } + + return new SortingResult(defaultDispatch?.Values, isolatedDispatch?.Values); + } + + class DeduplicationKey + { + string messageId; + string destination; + + public DeduplicationKey(string messageId, string destination) + { + this.messageId = messageId; + this.destination = destination; + } + + bool Equals(DeduplicationKey other) + { + return string.Equals(messageId, other.messageId) && string.Equals(destination, other.destination); + } + + public override bool Equals(object obj) + { + if (ReferenceEquals(null, obj)) + { + return false; + } + if (ReferenceEquals(this, obj)) + { + return true; + } + if (obj.GetType() != this.GetType()) + { + return false; + } + return Equals((DeduplicationKey)obj); + } + + public override int GetHashCode() + { + unchecked + { + return (messageId.GetHashCode() * 397) ^ destination.GetHashCode(); + } + } + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/Sending/TableBasedQueueDispatcher.cs b/src/NServiceBus.SqlServer/Sending/TableBasedQueueDispatcher.cs deleted file mode 100644 index 7080ade43..000000000 --- a/src/NServiceBus.SqlServer/Sending/TableBasedQueueDispatcher.cs +++ /dev/null @@ -1,125 +0,0 @@ -namespace NServiceBus.Transport.SQLServer -{ - using System.Collections.Generic; - using System.Data.SqlClient; - using System.Threading.Tasks; - using System.Transactions; - using Transport; - - class TableBasedQueueDispatcher : IQueueDispatcher - { - public TableBasedQueueDispatcher(SqlConnectionFactory connectionFactory, ITableBasedQueueOperationsReader queueOperationsReader) - { - this.connectionFactory = connectionFactory; - this.queueOperationsReader = queueOperationsReader; - } - - public async Task DispatchAsIsolated(List operations) - { - if (operations.Count == 0) - { - return; - } -#if NET452 - using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled)) - using (var connection = await connectionFactory.OpenNewConnection().ConfigureAwait(false)) - { - await Send(operations, connection, null).ConfigureAwait(false); - - scope.Complete(); - } -#else - using (var scope = new TransactionScope(TransactionScopeOption.Suppress, TransactionScopeAsyncFlowOption.Enabled)) - using (var connection = await connectionFactory.OpenNewConnection().ConfigureAwait(false)) - using (var tx = connection.BeginTransaction()) - { - await Send(operations, connection, tx).ConfigureAwait(false); - tx.Commit(); - scope.Complete(); - } -#endif - - } - - public async Task DispatchAsNonIsolated(List operations, TransportTransaction transportTransaction) - { - if (operations.Count == 0) - { - return; - } - - if (InReceiveWithNoTransactionMode(transportTransaction) || InReceiveOnlyTransportTransactionMode(transportTransaction)) - { - await DispatchOperationsWithNewConnectionAndTransaction(operations).ConfigureAwait(false); - return; - } - - await DispatchUsingReceiveTransaction(transportTransaction, operations).ConfigureAwait(false); - } - - - async Task DispatchOperationsWithNewConnectionAndTransaction(List operations) - { - using (var connection = await connectionFactory.OpenNewConnection().ConfigureAwait(false)) - { - if (operations.Count == 1) - { - await Send(operations, connection, null).ConfigureAwait(false); - return; - } - - using (var transaction = connection.BeginTransaction()) - { - await Send(operations, connection, transaction).ConfigureAwait(false); - transaction.Commit(); - } - } - } - - async Task DispatchUsingReceiveTransaction(TransportTransaction transportTransaction, List operations) - { - - transportTransaction.TryGet(out SqlConnection sqlTransportConnection); - transportTransaction.TryGet(out SqlTransaction sqlTransportTransaction); - transportTransaction.TryGet(out Transaction ambientTransaction); - - if (ambientTransaction != null) - { - using (var connection = await connectionFactory.OpenNewConnection().ConfigureAwait(false)) - { - await Send(operations, connection, null).ConfigureAwait(false); - } - } - else - { - await Send(operations, sqlTransportConnection, sqlTransportTransaction).ConfigureAwait(false); - } - } - - async Task Send(List operations, SqlConnection connection, SqlTransaction transaction) - { - foreach (var operation in operations) - { - var queueOperation = queueOperationsReader.Get(operation); - await queueOperation(connection, transaction).ConfigureAwait(false); - } - } - - static bool InReceiveWithNoTransactionMode(TransportTransaction transportTransaction) - { - transportTransaction.TryGet(out SqlTransaction nativeTransaction); - - transportTransaction.TryGet(out Transaction ambientTransaction); - - return nativeTransaction == null && ambientTransaction == null; - } - - static bool InReceiveOnlyTransportTransactionMode(TransportTransaction transportTransaction) - { - return transportTransaction.TryGet(ProcessWithNativeTransaction.ReceiveOnlyTransactionMode, out bool _); - } - - SqlConnectionFactory connectionFactory; - ITableBasedQueueOperationsReader queueOperationsReader; - } -} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/SqlServerTransport.cs b/src/NServiceBus.SqlServer/SqlServerTransport.cs index 87c1f2148..dbd6c6026 100644 --- a/src/NServiceBus.SqlServer/SqlServerTransport.cs +++ b/src/NServiceBus.SqlServer/SqlServerTransport.cs @@ -4,7 +4,6 @@ namespace NServiceBus using System.Data.Common; using System.Data.SqlClient; using System.Threading.Tasks; - using Routing; using Settings; using Transport; using Transport.SQLServer; @@ -12,7 +11,7 @@ namespace NServiceBus /// /// SqlServer Transport /// - public class SqlServerTransport : TransportDefinition, IMessageDrivenSubscriptionTransport + public class SqlServerTransport : TransportDefinition { /// /// @@ -39,12 +38,9 @@ static bool LegacyMultiInstanceModeTurnedOn(SettingsHolder settings) /// public override TransportInfrastructure Initialize(SettingsHolder settings, string connectionString) { - settings.TryGet(SettingsKeys.DefaultSchemaSettingsKey, out string defaultSchemaOverride); - var queueSchemaSettings = settings.GetOrDefault(); - var catalog = GetDefaultCatalog(settings, connectionString); - var addressParser = new QueueAddressTranslator(catalog, "dbo", defaultSchemaOverride, queueSchemaSettings); - return new SqlServerTransportInfrastructure(addressParser, settings, connectionString); + + return new SqlServerTransportInfrastructure(catalog, settings, connectionString, settings.LocalAddress, settings.LogicalAddress); } static string GetDefaultCatalog(SettingsHolder settings, string connectionString) diff --git a/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs b/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs index 5ded99732..bde762d9c 100644 --- a/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs +++ b/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs @@ -12,32 +12,80 @@ namespace NServiceBus.Transport.SQLServer using Settings; using Transport; + /// + /// ConfigureReceiveInfrastructure is called first, before features are started + /// + /// ConfigureSendInfrastructure is called last, when starting + /// class SqlServerTransportInfrastructure : TransportInfrastructure { - internal SqlServerTransportInfrastructure(QueueAddressTranslator addressTranslator, SettingsHolder settings, string connectionString) + internal SqlServerTransportInfrastructure(string catalog, SettingsHolder settings, string connectionString, Func localAddress, Func logicalAddress) { - this.addressTranslator = addressTranslator; this.settings = settings; this.connectionString = connectionString; + this.localAddress = localAddress; + this.logicalAddress = logicalAddress; - schemaAndCatalogSettings = settings.GetOrCreate(); - delayedDeliverySettings = settings.GetOrDefault(); - var timeoutManagerFeatureDisabled = !settings.IsFeatureEnabled(typeof(TimeoutManager)); + if (settings.HasSetting(SettingsKeys.DisableNativePubSub)) + { + OutboundRoutingPolicy = new OutboundRoutingPolicy(OutboundRoutingType.Unicast, OutboundRoutingType.Unicast, OutboundRoutingType.Unicast); + } + else + { + OutboundRoutingPolicy = new OutboundRoutingPolicy(OutboundRoutingType.Unicast, OutboundRoutingType.Multicast, OutboundRoutingType.Unicast); + } + + settings.TryGet(SettingsKeys.DefaultSchemaSettingsKey, out string defaultSchemaOverride); + + var queueSchemaSettings = settings.GetOrDefault(); + addressTranslator = new QueueAddressTranslator(catalog, "dbo", defaultSchemaOverride, queueSchemaSettings); + tableBasedQueueCache = new TableBasedQueueCache(addressTranslator); + connectionFactory = CreateConnectionFactory(); + + //Configure the schema and catalog for logical endpoint-based routing + var schemaAndCatalogSettings = settings.GetOrCreate(); + settings.GetOrCreate().AddOrReplaceInstances("SqlServer", schemaAndCatalogSettings.ToEndpointInstances()); + + //Needs to be invoked here and not when configuring the receiving infrastructure because the EnableMigrationMode flag has to be set up before feature component is initialized. + HandleTimeoutManagerCompatibilityMode(); + + var pubSubSettings = settings.GetOrCreate(); + var subscriptionTableName = pubSubSettings.SubscriptionTable.Qualify(defaultSchemaOverride ?? "dbo", catalog); + subscriptionStore = new PolymorphicSubscriptionStore(new SubscriptionTable(subscriptionTableName.QuotedQualifiedName, connectionFactory)); + var timeToCacheSubscriptions = pubSubSettings.TimeToCacheSubscriptions; + if (timeToCacheSubscriptions.HasValue) + { + subscriptionStore = new CachedSubscriptionStore(subscriptionStore, timeToCacheSubscriptions.Value); + } + var subscriptionTableCreator = new SubscriptionTableCreator(subscriptionTableName, connectionFactory); + settings.Set(subscriptionTableCreator); + } + void HandleTimeoutManagerCompatibilityMode() + { + var delayedDeliverySettings = settings.GetOrCreate(); + var timeoutManagerFeatureDisabled = !settings.IsFeatureEnabled(typeof(TimeoutManager)); diagnostics.Add("NServiceBus.Transport.SqlServer.TimeoutManager", new { FeatureEnabled = !timeoutManagerFeatureDisabled }); - if (delayedDeliverySettings != null) + if (timeoutManagerFeatureDisabled) { - if (timeoutManagerFeatureDisabled) - { - delayedDeliverySettings.DisableTimeoutManagerCompatibility(); - } + delayedDeliverySettings.DisableTimeoutManagerCompatibility(); + } + + settings.Set(SettingsKeys.TimeoutManagerMigrationMode, delayedDeliverySettings.EnableMigrationMode); + } - settings.Set(SettingsKeys.EnableMigrationMode, delayedDeliverySettings.EnableMigrationMode); + SqlConnectionFactory CreateConnectionFactory() + { + if (settings.TryGet(SettingsKeys.ConnectionFactoryOverride, out Func> factoryOverride)) + { + return new SqlConnectionFactory(factoryOverride); } + + return SqlConnectionFactory.Default(connectionString); } public override IEnumerable DeliveryConstraints @@ -45,17 +93,14 @@ public override IEnumerable DeliveryConstraints get { yield return typeof(DiscardIfNotReceivedBefore); - if (delayedDeliverySettings != null) - { - yield return typeof(DoNotDeliverBefore); - yield return typeof(DelayDeliveryWith); - } + yield return typeof(DoNotDeliverBefore); + yield return typeof(DelayDeliveryWith); } } public override TransportTransactionMode TransactionMode { get; } = TransportTransactionMode.TransactionScope; - public override OutboundRoutingPolicy OutboundRoutingPolicy { get; } = new OutboundRoutingPolicy(OutboundRoutingType.Unicast, OutboundRoutingType.Unicast, OutboundRoutingType.Unicast); + public override OutboundRoutingPolicy OutboundRoutingPolicy { get; } public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure() { @@ -88,77 +133,70 @@ public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure() var createMessageBodyComputedColumn = settings.GetOrDefault(SettingsKeys.CreateMessageBodyComputedColumn); - var connectionFactory = CreateConnectionFactory(); - Func receiveStrategyFactory = guarantee => SelectReceiveStrategy(guarantee, scopeOptions.TransactionOptions, connectionFactory); var queuePurger = new QueuePurger(connectionFactory); var queuePeeker = new QueuePeeker(connectionFactory, queuePeekerOptions); - var expiredMessagesPurger = CreateExpiredMessagesPurger(connectionFactory); + var expiredMessagesPurger = CreateExpiredMessagesPurger(); var schemaVerification = new SchemaInspector(queue => connectionFactory.OpenNewConnection()); Func queueFactory = queueName => new TableBasedQueue(addressTranslator.Parse(queueName).QualifiedTableName, queueName); - var delayedMessageStore = GetDelayedQueueTableName(); + //Create delayed delivery infrastructure + var delayedDeliverySettings = settings.GetOrDefault(); + settings.AddStartupDiagnosticsSection("NServiceBus.Transport.SqlServer.DelayedDelivery", new + { + Native = true, + delayedDeliverySettings.Suffix, + delayedDeliverySettings.Interval, + BatchSize = delayedDeliverySettings.MatureBatchSize, + TimoutManager = delayedDeliverySettings.EnableMigrationMode ? "enabled" : "disabled" + }); + + var delayedQueueCanonicalAddress = GetDelayedTableAddress(delayedDeliverySettings); + var inputQueueTable = addressTranslator.Parse(ToTransportAddress(logicalAddress())).QualifiedTableName; + var delayedMessageTable = new DelayedMessageTable(delayedQueueCanonicalAddress.QualifiedTableName, inputQueueTable); + + //Allows dispatcher to store messages in the delayed store + delayedMessageStore = delayedMessageTable; + dueDelayedMessageProcessor = new DueDelayedMessageProcessor(delayedMessageTable, connectionFactory, delayedDeliverySettings.Interval, delayedDeliverySettings.MatureBatchSize); - var sendInfra = ConfigureSendInfrastructure(); return new TransportReceiveInfrastructure( - () => - { - var pump = new MessagePump(receiveStrategyFactory, queueFactory, queuePurger, expiredMessagesPurger, queuePeeker, schemaVerification, waitTimeCircuitBreaker); - if (delayedDeliverySettings == null) - { - return pump; - } - var dispatcher = sendInfra.DispatcherFactory(); - var delayedMessageProcessor = new DelayedMessageProcessor(dispatcher); - return new DelayedDeliveryMessagePump(pump, delayedMessageProcessor); - }, - () => - { - var creator = new QueueCreator(connectionFactory, addressTranslator, createMessageBodyComputedColumn); - if (delayedDeliverySettings == null) - { - return creator; - } - return new DelayedDeliveryQueueCreator(connectionFactory, creator, delayedMessageStore, createMessageBodyComputedColumn); - }, - () => CheckForAmbientTransactionEnlistmentSupport(connectionFactory, scopeOptions.TransactionOptions)); + () => new MessagePump(receiveStrategyFactory, queueFactory, queuePurger, expiredMessagesPurger, queuePeeker, schemaVerification, waitTimeCircuitBreaker), + () => new QueueCreator(connectionFactory, addressTranslator, delayedQueueCanonicalAddress, createMessageBodyComputedColumn), + () => CheckForAmbientTransactionEnlistmentSupport(scopeOptions.TransactionOptions)); } - SqlConnectionFactory CreateConnectionFactory() + CanonicalQueueAddress GetDelayedTableAddress(DelayedDeliverySettings delayedDeliverySettings) { - if (settings.TryGet(SettingsKeys.ConnectionFactoryOverride, out Func> factoryOverride)) - { - return new SqlConnectionFactory(factoryOverride); - } - - return SqlConnectionFactory.Default(connectionString); + var delayedQueueLogicalAddress = logicalAddress().CreateQualifiedAddress(delayedDeliverySettings.Suffix); + var delayedQueueAddress = addressTranslator.Generate(delayedQueueLogicalAddress); + return addressTranslator.GetCanonicalForm(delayedQueueAddress); } - static ReceiveStrategy SelectReceiveStrategy(TransportTransactionMode minimumConsistencyGuarantee, TransactionOptions options, SqlConnectionFactory connectionFactory) + ReceiveStrategy SelectReceiveStrategy(TransportTransactionMode minimumConsistencyGuarantee, TransactionOptions options, SqlConnectionFactory connectionFactory) { if (minimumConsistencyGuarantee == TransportTransactionMode.TransactionScope) { - return new ProcessWithTransactionScope(options, connectionFactory, new FailureInfoStorage(10000)); + return new ProcessWithTransactionScope(options, connectionFactory, new FailureInfoStorage(10000), tableBasedQueueCache); } if (minimumConsistencyGuarantee == TransportTransactionMode.SendsAtomicWithReceive) { - return new ProcessWithNativeTransaction(options, connectionFactory, new FailureInfoStorage(10000)); + return new ProcessWithNativeTransaction(options, connectionFactory, new FailureInfoStorage(10000), tableBasedQueueCache); } if (minimumConsistencyGuarantee == TransportTransactionMode.ReceiveOnly) { - return new ProcessWithNativeTransaction(options, connectionFactory, new FailureInfoStorage(10000), transactionForReceiveOnly: true); + return new ProcessWithNativeTransaction(options, connectionFactory, new FailureInfoStorage(10000), tableBasedQueueCache, transactionForReceiveOnly: true); } - return new ProcessWithNoTransaction(connectionFactory); + return new ProcessWithNoTransaction(connectionFactory, tableBasedQueueCache); } - ExpiredMessagesPurger CreateExpiredMessagesPurger(SqlConnectionFactory connectionFactory) + ExpiredMessagesPurger CreateExpiredMessagesPurger() { var purgeBatchSize = settings.HasSetting(SettingsKeys.PurgeBatchSizeKey) ? settings.Get(SettingsKeys.PurgeBatchSizeKey) : null; var enable = settings.GetOrDefault(SettingsKeys.PurgeEnableKey); @@ -172,7 +210,7 @@ ExpiredMessagesPurger CreateExpiredMessagesPurger(SqlConnectionFactory connectio return new ExpiredMessagesPurger(_ => connectionFactory.OpenNewConnection(), purgeBatchSize, enable); } - async Task CheckForAmbientTransactionEnlistmentSupport(SqlConnectionFactory connectionFactory, TransactionOptions transactionOptions) + async Task CheckForAmbientTransactionEnlistmentSupport(TransactionOptions transactionOptions) { if (!settings.TryGet(out TransportTransactionMode requestedTransportTransactionMode)) { @@ -207,59 +245,14 @@ async Task CheckForAmbientTransactionEnlistmentSupport(SqlCo public override TransportSendInfrastructure ConfigureSendInfrastructure() { - var connectionFactory = CreateConnectionFactory(); - - settings.GetOrCreate().AddOrReplaceInstances("SqlServer", schemaAndCatalogSettings.ToEndpointInstances()); - return new TransportSendInfrastructure( () => { - ITableBasedQueueOperationsReader queueOperationsReader = new TableBasedQueueOperationsReader(addressTranslator); - if (delayedDeliverySettings != null) - { - queueOperationsReader = new DelayedDeliveryTableBasedQueueOperationsReader(CreateDelayedMessageTable(), queueOperationsReader); - } - var dispatcher = new MessageDispatcher(new TableBasedQueueDispatcher(connectionFactory, queueOperationsReader), addressTranslator); + var multicastToUnicastConverter = new MulticastToUnicastConverter(subscriptionStore); + var dispatcher = new MessageDispatcher(addressTranslator, multicastToUnicastConverter, tableBasedQueueCache, delayedMessageStore, connectionFactory); return dispatcher; }, - () => Task.FromResult(DelayedDeliveryInfrastructure.CheckForInvalidSettings(settings))); - } - - DelayedMessageTable CreateDelayedMessageTable() - { - var deletedQueueTableName = GetDelayedQueueTableName(); - - var inputQueueTable = addressTranslator.Parse(ToTransportAddress(GetLogicalAddress())).QualifiedTableName; - return new DelayedMessageTable(deletedQueueTableName.QualifiedTableName, inputQueueTable); - } - - /// - /// This method is copied from the core because there is no other way to reliable get the address of the main input queue. - /// - /// - LogicalAddress GetLogicalAddress() - { - var queueNameBase = settings.GetOrDefault("BaseInputQueueName") ?? settings.EndpointName(); - - //note: This is an old hack, we are passing the endpoint name to bind but we only care about the properties - var mainInstanceProperties = BindToLocalEndpoint(new EndpointInstance(settings.EndpointName())).Properties; - - return LogicalAddress.CreateLocalAddress(queueNameBase, mainInstanceProperties); - } - - CanonicalQueueAddress GetDelayedQueueTableName() - { - if (delayedDeliverySettings == null) - { - return null; - } - if (string.IsNullOrEmpty(delayedDeliverySettings.Suffix)) - { - throw new Exception("Native delayed delivery feature requires configuring a table suffix."); - } - var delayedQueueLogicalAddress = GetLogicalAddress().CreateQualifiedAddress(delayedDeliverySettings.Suffix); - var delayedQueueAddress = addressTranslator.Generate(delayedQueueLogicalAddress); - return addressTranslator.GetCanonicalForm(delayedQueueAddress); + () => Task.FromResult(StartupCheckResult.Success)); } public override Task Start() @@ -269,38 +262,20 @@ public override Task Start() settings.AddStartupDiagnosticsSection(diagnosticSection.Key, diagnosticSection.Value); } - if (delayedDeliverySettings == null) - { - settings.AddStartupDiagnosticsSection("NServiceBus.Transport.SqlServer.DelayedDelivery", new - { - Native = false - }); - return Task.FromResult(0); - } - - settings.AddStartupDiagnosticsSection("NServiceBus.Transport.SqlServer.DelayedDelivery", new - { - Native = true, - delayedDeliverySettings.Suffix, - delayedDeliverySettings.Interval, - BatchSize = delayedDeliverySettings.MatureBatchSize, - TimoutManager = delayedDeliverySettings.EnableMigrationMode ? "enabled" : "disabled" - }); - - var delayedMessageTable = CreateDelayedMessageTable(); - delayedMessageHandler = new DelayedMessageHandler(delayedMessageTable, CreateConnectionFactory(), delayedDeliverySettings.Interval, delayedDeliverySettings.MatureBatchSize); - delayedMessageHandler.Start(); + dueDelayedMessageProcessor?.Start(); return Task.FromResult(0); } public override Task Stop() { - return delayedMessageHandler?.Stop() ?? Task.FromResult(0); + return dueDelayedMessageProcessor?.Stop() ?? Task.FromResult(0); } public override TransportSubscriptionInfrastructure ConfigureSubscriptionInfrastructure() { - throw new NotImplementedException(); + return new TransportSubscriptionInfrastructure(() => new SubscriptionManager(subscriptionStore, + settings.EndpointName(), + localAddress())); } public override EndpointInstance BindToLocalEndpoint(EndpointInstance instance) @@ -331,10 +306,14 @@ public override string MakeCanonicalForm(string transportAddress) QueueAddressTranslator addressTranslator; string connectionString; + Func localAddress; + Func logicalAddress; SettingsHolder settings; - EndpointSchemaAndCatalogSettings schemaAndCatalogSettings; - DelayedMessageHandler delayedMessageHandler; - DelayedDeliverySettings delayedDeliverySettings; + DueDelayedMessageProcessor dueDelayedMessageProcessor; Dictionary diagnostics = new Dictionary(); + SqlConnectionFactory connectionFactory; + ISubscriptionStore subscriptionStore; + IDelayedMessageStore delayedMessageStore = new SendOnlyDelayedMessageStore(); + TableBasedQueueCache tableBasedQueueCache; } } \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/SqlServerTransportSettingsExtensions.cs b/src/NServiceBus.SqlServer/SqlServerTransportSettingsExtensions.cs index b2cca8110..e3bc90bb8 100644 --- a/src/NServiceBus.SqlServer/SqlServerTransportSettingsExtensions.cs +++ b/src/NServiceBus.SqlServer/SqlServerTransportSettingsExtensions.cs @@ -10,7 +10,7 @@ /// /// Adds extra configuration for the Sql Server transport. /// - public static class SqlServerTransportSettingsExtensions + public static partial class SqlServerTransportSettingsExtensions { /// /// Sets a default schema for both input and output queues @@ -178,20 +178,28 @@ public static TransportExtensions WithPeekDelay(this Transpo } /// - /// Enables native delayed delivery. + /// Configures native delayed delivery. /// - public static DelayedDeliverySettings UseNativeDelayedDelivery(this TransportExtensions transportExtensions) + public static DelayedDeliverySettings NativeDelayedDelivery(this TransportExtensions transportExtensions) { var sendOnlyEndpoint = transportExtensions.GetSettings().GetOrDefault("Endpoint.SendOnly"); if (sendOnlyEndpoint) { - throw new Exception("Native delayed delivery is only supported for endpoints capable of receiving messages."); + throw new Exception("Delayed delivery is only supported for endpoints capable of receiving messages."); } var settings = transportExtensions.GetSettings().GetOrCreate(); return settings; } + /// + /// Configures publish/subscribe behavior. + /// + public static SubscriptionSettings SubscriptionSettings(this TransportExtensions transportExtensions) + { + return transportExtensions.GetSettings().GetOrCreate(); + } + /// /// Instructs the transport to purge all expired messages from the input queue before starting the processing. /// @@ -218,20 +226,6 @@ public static TransportExtensions CreateMessageBodyComputedC return transportExtensions; } - /// - /// Enables multi-instance mode. - /// - /// The to extend. - /// Function that returns opened sql connection based on destination queue name. - [ObsoleteEx( - RemoveInVersion = "5.0", - TreatAsErrorFromVersion = "4.0", - Message = "Multi-instance mode has been deprecated. Use Transport Bridge and/or multi-catalog addressing instead.")] - public static TransportExtensions EnableLegacyMultiInstanceMode(this TransportExtensions transportExtensions, Func> sqlConnectionFactory) - { - throw new NotImplementedException(); - } - static ILog Logger = LogManager.GetLogger(); } } \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/SubscriptionTableCreator.cs b/src/NServiceBus.SqlServer/SubscriptionTableCreator.cs new file mode 100644 index 000000000..bfb4c2986 --- /dev/null +++ b/src/NServiceBus.SqlServer/SubscriptionTableCreator.cs @@ -0,0 +1,36 @@ +namespace NServiceBus.Transport.SQLServer +{ + using System.Data; + using System.Data.SqlClient; + using System.Threading.Tasks; + + class SubscriptionTableCreator + { + QualifiedSubscriptionTableName tableName; + SqlConnectionFactory connectionFactory; + + public SubscriptionTableCreator(QualifiedSubscriptionTableName tableName, SqlConnectionFactory connectionFactory) + { + this.tableName = tableName; + this.connectionFactory = connectionFactory; + } + public async Task CreateIfNecessary() + { + using (var connection = await connectionFactory.OpenNewConnection().ConfigureAwait(false)) + using (var transaction = connection.BeginTransaction()) + { +#pragma warning disable 618 + var sql = string.Format(SqlConstants.CreateSubscriptionTableText, tableName.QuotedQualifiedName, tableName.QuotedCatalog); +#pragma warning restore 618 + using (var command = new SqlCommand(sql, connection, transaction) + { + CommandType = CommandType.Text + }) + { + await command.ExecuteNonQueryAsync().ConfigureAwait(false); + } + transaction.Commit(); + } + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/SubscriptionTableInstaller.cs b/src/NServiceBus.SqlServer/SubscriptionTableInstaller.cs new file mode 100644 index 000000000..b20afec2b --- /dev/null +++ b/src/NServiceBus.SqlServer/SubscriptionTableInstaller.cs @@ -0,0 +1,26 @@ +namespace NServiceBus.Transport.SQLServer +{ + using System.Threading.Tasks; + using Installation; + using Settings; + + /// + /// Subscription table cannot be created via ICreateQueues because it needs to be created also for send-only endpoints. Installers are registered + /// in the container via a convention so the only way to pass objects to them is via settings or via container. The transport infrastructure + /// cannot access the container so we're doing it via settings (HACK). + /// + class SubscriptionTableInstaller : INeedToInstallSomething + { + SubscriptionTableCreator creator; + + public SubscriptionTableInstaller(ReadOnlySettings settings) + { + creator = settings.GetOrDefault(); + } + + public Task Install(string identity) + { + return creator?.CreateIfNecessary(); + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/obsoletes.cs b/src/NServiceBus.SqlServer/obsoletes.cs index 58d56b54b..f82f3c046 100644 --- a/src/NServiceBus.SqlServer/obsoletes.cs +++ b/src/NServiceBus.SqlServer/obsoletes.cs @@ -1,4 +1,113 @@ - -#pragma warning disable 1591 +#pragma warning disable 1591 + +namespace NServiceBus +{ + using System; + using System.Reflection; + using Pipeline; + + public static class MessageDrivenPubSubCompatibility + { + [ObsoleteEx( + Message = @"Subscription authorization has been moved to message-driven pub-sub migration mode. + +var compatMode = transport.EnableMessageDrivenPubSubCompatibilityMode(); +compatMode.SubscriptionAuthorizer(authorizer);", + ReplacementTypeOrMember = "SubscriptionMigrationModeSettings.SubscriptionAuthorizer(transportExtensions, authorizer)", + TreatAsErrorFromVersion = "5.0", + RemoveInVersion = "6.0")] + public static void SubscriptionAuthorizer(this TransportExtensions transportExtensions, Func authorizer) + { + throw new NotImplementedException(); + } + + [ObsoleteEx( + Message = "Publishing can not be disabled in version 5.0 and above. The transport handles publish-subscribe natively and does not require a separate subscription persistence.", + TreatAsErrorFromVersion = "5.0", + RemoveInVersion = "6.0")] + public static void DisablePublishing(this TransportExtensions transportExtensions) + { + throw new NotImplementedException(); + } + + [ObsoleteEx( + Message = @"Publisher registration has been moved to message-driven pub-sub migration mode. + +var compatMode = transport.EnableMessageDrivenPubSubCompatibilityMode(); +compatMode.RegisterPublisher(eventType, publisherEndpoint);", + ReplacementTypeOrMember = "SubscriptionMigrationModeSettings.RegisterPublisher(routingSettings, eventType, publisherEndpoint)", + TreatAsErrorFromVersion = "5.0", + RemoveInVersion = "6.0")] + public static void RegisterPublisher(this RoutingSettings routingSettings, Type eventType, string publisherEndpoint) + { + throw new NotImplementedException(); + } + + [ObsoleteEx( + Message = @"Publisher registration has been moved to message-driven pub-sub migration mode. + +var compatMode = transport.EnableMessageDrivenPubSubCompatibilityMode(); +compatMode.RegisterPublisher(assembly, publisherEndpoint);", + ReplacementTypeOrMember = "SubscriptionMigrationModeSettings.RegisterPublisher(routingSettings, assembly, publisherEndpoint)", + TreatAsErrorFromVersion = "5.0", + RemoveInVersion = "6.0")] + public static void RegisterPublisher(this RoutingSettings routingSettings, Assembly assembly, string publisherEndpoint) + { + throw new NotImplementedException(); + } + + [ObsoleteEx( + Message = @"Publisher registration has been moved to message-driven pub-sub migration mode. + +var compatMode = transport.EnableMessageDrivenPubSubCompatibilityMode(); +compatMode.RegisterPublisher(assembly, namespace, publisherEndpoint);", + ReplacementTypeOrMember = "SubscriptionMigrationModeSettings.RegisterPublisher(routingSettings, assembly, namespace, publisherEndpoint)", + TreatAsErrorFromVersion = "5.0", + RemoveInVersion = "6.0")] + public static void RegisterPublisher(this RoutingSettings routingSettings, Assembly assembly, string @namespace, string publisherEndpoint) + { + throw new NotImplementedException(); + } + } +} + +namespace NServiceBus.Transport.SQLServer +{ + using System; + using System.Data.SqlClient; + + public static partial class SendOptionsExtensions + { + [ObsoleteEx( + RemoveInVersion = "6.0", + TreatAsErrorFromVersion = "5.0", + ReplacementTypeOrMember = "UseCustomSqlTransaction", + Message = "The connection parameter is no longer required.")] + public static void UseCustomSqlConnectionAndTransaction(this SendOptions options, SqlConnection connection, SqlTransaction transaction) + { + throw new NotImplementedException(); + } + } +} + +namespace NServiceBus.Transport.SQLServer +{ + using System; + + public static partial class SqlServerTransportSettingsExtensions + { + [ObsoleteEx( + RemoveInVersion = "6.0", + TreatAsErrorFromVersion = "5.0", + ReplacementTypeOrMember = "NativeDelayedDelivery", + Message = "Starting from version 5 native delayed delivery is always enabled. It can be configured via NativeDelayedDelivery")] + public static DelayedDeliverySettings UseNativeDelayedDelivery(this TransportExtensions transportExtensions) + { + throw new NotImplementedException(); + } + } +} + + #pragma warning restore 1591 \ No newline at end of file