diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/NServiceBus.SqlServer.AcceptanceTests.csproj b/src/NServiceBus.SqlServer.AcceptanceTests/NServiceBus.SqlServer.AcceptanceTests.csproj index e6e59f711..35132ad2d 100644 --- a/src/NServiceBus.SqlServer.AcceptanceTests/NServiceBus.SqlServer.AcceptanceTests.csproj +++ b/src/NServiceBus.SqlServer.AcceptanceTests/NServiceBus.SqlServer.AcceptanceTests.csproj @@ -216,6 +216,7 @@ + diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/When_in_native_transaction_mode.cs b/src/NServiceBus.SqlServer.AcceptanceTests/When_in_native_transaction_mode.cs index eee05cf06..210341169 100644 --- a/src/NServiceBus.SqlServer.AcceptanceTests/When_in_native_transaction_mode.cs +++ b/src/NServiceBus.SqlServer.AcceptanceTests/When_in_native_transaction_mode.cs @@ -57,15 +57,16 @@ public void When_multi_db_via_callback_it_fails_to_start_even_when_not_using_oth } [Test] - public void When_multi_schema_via_configuration_it_starts() + public void When_multi_schema_via_configuration_it_fails() { + //Fails because there are multiple connection strings at play var context = new Context(); Scenario.Define(context) .WithEndpoint(b => b.CustomConfig(c => AddConnectionString("NServiceBus/Transport/OtherEndpoint", OtherSchemaConnectionString))) .Done(c => true) .Run(); - Assert.IsNull(context.Exceptions); + StringAssert.Contains(ExceptionText, context.Exceptions); } [Test] diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/When_using_unicode_characters_in_headers.cs b/src/NServiceBus.SqlServer.AcceptanceTests/When_using_unicode_characters_in_headers.cs new file mode 100644 index 000000000..725ece161 --- /dev/null +++ b/src/NServiceBus.SqlServer.AcceptanceTests/When_using_unicode_characters_in_headers.cs @@ -0,0 +1,73 @@ +namespace NServiceBus.AcceptanceTests.Basic +{ + using System.Collections.Generic; + using NServiceBus.AcceptanceTesting; + using NServiceBus.AcceptanceTests.EndpointTemplates; + using NUnit.Framework; + + public class When_using_unicode_characters_in_headers : NServiceBusAcceptanceTest + { + [Test] + public void Should_support_unicode_characters() + { + var context = new Context(); + + var sentHeaders = new Dictionary + { + {"a-B1", "a-B"}, + {"a-B2", "a-ɤϡ֎ᾣ♥-b"}, + {"a-ɤϡ֎ᾣ♥-B3", "a-B"}, + {"a-B4", "a-\U0001F60D-b"}, + {"a-\U0001F605-B5", "a-B"}, + {"a-B6", "a-😍-b"}, + {"a-😅-B7", "a-B"} + }; + + Scenario.Define(context) + .WithEndpoint(b => b.Given(bus => + { + var message = new TestMessage(); + foreach (var header in sentHeaders) + { + bus.SetMessageHeader(message, header.Key, header.Value); + } + bus.SendLocal(message); + })) + .Done(c => c.Done) + .Run(); + + Assert.IsNotEmpty(context.Headers); + CollectionAssert.IsSubsetOf(sentHeaders, context.Headers); + } + + public class Context : ScenarioContext + { + public bool Done { get; set; } + public Dictionary Headers { get; set; } + } + + public class Endpoint : EndpointConfigurationBuilder + { + public Endpoint() + { + EndpointSetup(); + } + + class Handler : IHandleMessages + { + public Context Context { get; set; } + public IBus Bus { get; set; } + + public void Handle(TestMessage message) + { + Context.Headers = new Dictionary(Bus.CurrentMessageContext.Headers); + Context.Done = true; + } + } + } + + public class TestMessage : IMessage + { + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer.sln.DotSettings b/src/NServiceBus.SqlServer.sln.DotSettings index 0529d9fb6..190753cd9 100644 --- a/src/NServiceBus.SqlServer.sln.DotSettings +++ b/src/NServiceBus.SqlServer.sln.DotSettings @@ -560,6 +560,8 @@ II.2.12 <HandlesEvent /> True True True + True + True diff --git a/src/NServiceBus.SqlServer/ExpiredMessagesPurger.cs b/src/NServiceBus.SqlServer/ExpiredMessagesPurger.cs index 7ff8194a0..e1c974357 100644 --- a/src/NServiceBus.SqlServer/ExpiredMessagesPurger.cs +++ b/src/NServiceBus.SqlServer/ExpiredMessagesPurger.cs @@ -7,7 +7,7 @@ class ExpiredMessagesPurger : IExecutor { - static readonly ILog Logger = LogManager.GetLogger(typeof(ExpiredMessagesPurger)); + static readonly ILog Logger = LogManager.GetLogger(typeof(ExpiredMessagesPurger)); readonly TableBasedQueue queue; readonly Func openConnection; @@ -25,8 +25,6 @@ public ExpiredMessagesPurger(TableBasedQueue queue, Func openConn public void Start(int maximumConcurrency, CancellationToken token) { - LogWarningWhenIndexIsMissing(); - this.token = token; purgeTaskTimer = new Timer(PurgeExpiredMessagesCallback, null, TimeSpan.Zero, Timeout.InfiniteTimeSpan); } @@ -40,21 +38,6 @@ public void Stop() } } - void LogWarningWhenIndexIsMissing() - { - try - { - using (var connection = openConnection()) - { - queue.LogWarningWhenIndexIsMissing(connection); - } - } - catch (Exception ex) - { - Logger.WarnFormat("Checking indexes on table {0} failed. Exception: {1}", queue, ex); - } - } - void PurgeExpiredMessagesCallback(object state) { Logger.DebugFormat("Starting a new expired message purge task for table {0}.", queue); diff --git a/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj b/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj index 80c80daba..e036e9530 100644 --- a/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj +++ b/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj @@ -111,6 +111,7 @@ + diff --git a/src/NServiceBus.SqlServer/SchemaInspector.cs b/src/NServiceBus.SqlServer/SchemaInspector.cs new file mode 100644 index 000000000..56b4c3171 --- /dev/null +++ b/src/NServiceBus.SqlServer/SchemaInspector.cs @@ -0,0 +1,63 @@ +namespace NServiceBus.Transport.SQLServer +{ + using System; + using System.Data.SqlClient; + using NServiceBus.Logging; + using NServiceBus.Transports.SQLServer; + + class SchemaInspector + { + static readonly ILog Logger = LogManager.GetLogger(); + + Func openConnection; + + public SchemaInspector(Func openConnection) + { + this.openConnection = openConnection; + } + + public void PerformInspection(TableBasedQueue queue) + { + VerifyExpiredIndex(queue); + VerifyHeadersColumnType(queue); + } + + void VerifyExpiredIndex(TableBasedQueue queue) + { + try + { + using (var connection = openConnection()) + { + var indexExists = queue.CheckExpiresIndexPresence(connection); + if (!indexExists) + { + Logger.Warn($@"Table {queue} does not contain index 'Index_Expires'.{Environment.NewLine}Adding this index will speed up the process of purging expired messages from the queue. Please consult the documentation for further information."); + } + } + } + catch (Exception ex) + { + Logger.WarnFormat("Checking indexes on table {0} failed. Exception: {1}", queue, ex); + } + } + + void VerifyHeadersColumnType(TableBasedQueue queue) + { + try + { + using (var connection = openConnection()) + { + var columnType = queue.CheckHeadersColumnType(connection); + if (string.Equals(columnType, "varchar", StringComparison.OrdinalIgnoreCase)) + { + Logger.Warn($"Table {queue} stores headers in a non Unicode-compatible column (varchar).{Environment.NewLine}This may lead to data loss when sending non-ASCII characters in headers. SQL Server transport 3.1 and newer can take advantage of the nvarchar column type for headers. Please change the column type in the database."); + } + } + } + catch (Exception ex) + { + Logger.WarnFormat("Checking column type on table {0} failed. Exception: {1}", queue, ex); + } + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/SqlServerPollingDequeueStrategy.cs b/src/NServiceBus.SqlServer/SqlServerPollingDequeueStrategy.cs index 2ceb18a64..31dbcdc48 100644 --- a/src/NServiceBus.SqlServer/SqlServerPollingDequeueStrategy.cs +++ b/src/NServiceBus.SqlServer/SqlServerPollingDequeueStrategy.cs @@ -5,6 +5,7 @@ using CircuitBreakers; using Janitor; using NServiceBus.Features; + using NServiceBus.Transport.SQLServer; using Unicast.Transport; /// @@ -55,16 +56,20 @@ public void Init(Address primaryAddress, TransactionSettings transactionSettings var primaryQueue = new TableBasedQueue(primaryAddress, locaConnectionParams.Schema); primaryReceiver = new AdaptivePollingReceiver(receiveStrategy, primaryQueue, endProcessMessage, circuitBreaker, transportNotifications); + var schemaInspector = new SchemaInspector(() => connectionFactory.OpenNewConnection(locaConnectionParams.ConnectionString)); + schemaInspector.PerformInspection(primaryQueue); + if (secondaryReceiveSettings.IsEnabled) { var secondaryQueue = new TableBasedQueue(SecondaryReceiveSettings.ReceiveQueue.GetTableName(), locaConnectionParams.Schema); secondaryReceiver = new AdaptivePollingReceiver(receiveStrategy, secondaryQueue, endProcessMessage, circuitBreaker, transportNotifications); + schemaInspector.PerformInspection(secondaryQueue); } else { secondaryReceiver = new NullExecutor(); } - + expiredMessagesPurger = new ExpiredMessagesPurger(primaryQueue, () => connectionFactory.OpenNewConnection(locaConnectionParams.ConnectionString), purgeExpiredMessagesParams); } diff --git a/src/NServiceBus.SqlServer/SqlServerQueueCreator.cs b/src/NServiceBus.SqlServer/SqlServerQueueCreator.cs index 13612dba5..6d7e2b891 100644 --- a/src/NServiceBus.SqlServer/SqlServerQueueCreator.cs +++ b/src/NServiceBus.SqlServer/SqlServerQueueCreator.cs @@ -14,7 +14,7 @@ [CorrelationId] [varchar](255) NULL, [ReplyToAddress] [varchar](255) NULL, [Recoverable] [bit] NOT NULL, [Expires] [datetime] NULL, - [Headers] [varchar](max) NOT NULL, + [Headers] [nvarchar](max) NOT NULL, [Body] [varbinary](max) NULL, [RowVersion] [bigint] IDENTITY(1,1) NOT NULL ) ON [PRIMARY]; diff --git a/src/NServiceBus.SqlServer/TableBasedQueue.cs b/src/NServiceBus.SqlServer/TableBasedQueue.cs index 84694af01..0e26d4316 100644 --- a/src/NServiceBus.SqlServer/TableBasedQueue.cs +++ b/src/NServiceBus.SqlServer/TableBasedQueue.cs @@ -203,19 +203,24 @@ public int PurgeBatchOfExpiredMessages(SqlConnection connection, int purgeBatchS } } - public void LogWarningWhenIndexIsMissing(SqlConnection connection) + public bool CheckExpiresIndexPresence(SqlConnection connection) { var commandText = string.Format(SqlCheckIfExpiresIndexIsPresent, ExpiresIndexName, this.schema, this.tableName); using (var command = new SqlCommand(commandText, connection)) { var rowsCount = (int) command.ExecuteScalar(); + return rowsCount > 0; + } + } - if (rowsCount == 0) - { - Logger.Warn($@"Table {schema}.{tableName} does not contain index '{ExpiresIndexName}'. -Adding this index will speed up the process of purging expired messages from the queue. Please consult the documentation for further information."); - } + public string CheckHeadersColumnType(SqlConnection connection) + { + var commandText = string.Format(SqlCheckHeaderColumnType, this.schema, this.tableName); + + using (var command = new SqlCommand(commandText, connection)) + { + return (string)command.ExecuteScalar(); } } @@ -239,7 +244,7 @@ public override string ToString() SqlDbType.VarChar, SqlDbType.Bit, SqlDbType.Int, - SqlDbType.VarChar, + SqlDbType.NVarChar, SqlDbType.VarBinary }; @@ -261,6 +266,13 @@ DELETE FROM message const string ExpiresIndexName = "Index_Expires"; + const string SqlCheckHeaderColumnType = +@"SELECT t.name +FROM sys.columns c +INNER JOIN sys.types t ON c.system_type_id = t.system_type_id +WHERE c.object_id = OBJECT_ID('{0}.{1}') + AND c.name = 'Headers'"; + const int IdColumn = 0; const int CorrelationIdColumn = 1; const int ReplyToAddressColumn = 2;