Skip to content

Commit

Permalink
Merge branch 'hotfix-2.2.6' into support-2.2
Browse files Browse the repository at this point in the history
  • Loading branch information
SzymonPobiega committed Jul 14, 2017
2 parents fe958ea + 7547005 commit ce7ab32
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@
<Compile Include="When_receiving_a_poison_message.cs" />
<Compile Include="When_sharing_the_receive_connection.cs" />
<Compile Include="When_publishing_an_event_with_the_subscriber_scaled_out.cs" />
<Compile Include="When_using_unicode_characters_in_headers.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\NServiceBus.SqlServer\NServiceBus.SqlServer.csproj">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,16 @@ public void When_multi_db_via_callback_it_fails_to_start_even_when_not_using_oth
}

[Test]
public void When_multi_schema_via_configuration_it_starts()
public void When_multi_schema_via_configuration_it_fails()
{
//Fails because there are multiple connection strings at play
var context = new Context();
Scenario.Define(context)
.WithEndpoint<Receiver>(b => b.CustomConfig(c => AddConnectionString("NServiceBus/Transport/OtherEndpoint", OtherSchemaConnectionString)))
.Done(c => true)
.Run();

Assert.IsNull(context.Exceptions);
StringAssert.Contains(ExceptionText, context.Exceptions);
}

[Test]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
namespace NServiceBus.AcceptanceTests.Basic
{
using System.Collections.Generic;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NUnit.Framework;

public class When_using_unicode_characters_in_headers : NServiceBusAcceptanceTest
{
[Test]
public void Should_support_unicode_characters()
{
var context = new Context();

var sentHeaders = new Dictionary<string, string>
{
{"a-B1", "a-B"},
{"a-B2", "a-ɤϡ֎ᾣ♥-b"},
{"a-ɤϡ֎ᾣ♥-B3", "a-B"},
{"a-B4", "a-\U0001F60D-b"},
{"a-\U0001F605-B5", "a-B"},
{"a-B6", "a-😍-b"},
{"a-😅-B7", "a-B"}
};

Scenario.Define(context)
.WithEndpoint<Endpoint>(b => b.Given(bus =>
{
var message = new TestMessage();
foreach (var header in sentHeaders)
{
bus.SetMessageHeader(message, header.Key, header.Value);
}
bus.SendLocal(message);
}))
.Done(c => c.Done)
.Run();

Assert.IsNotEmpty(context.Headers);
CollectionAssert.IsSubsetOf(sentHeaders, context.Headers);
}

public class Context : ScenarioContext
{
public bool Done { get; set; }
public Dictionary<string, string> Headers { get; set; }
}

public class Endpoint : EndpointConfigurationBuilder
{
public Endpoint()
{
EndpointSetup<DefaultServer>();
}

class Handler : IHandleMessages<TestMessage>
{
public Context Context { get; set; }
public IBus Bus { get; set; }

public void Handle(TestMessage message)
{
Context.Headers = new Dictionary<string, string>(Bus.CurrentMessageContext.Headers);
Context.Done = true;
}
}
}

public class TestMessage : IMessage
{
}
}
}
2 changes: 2 additions & 0 deletions src/NServiceBus.SqlServer.sln.DotSettings
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,8 @@ II.2.12 &lt;HandlesEvent /&gt;&#xD;
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateThisQualifierSettings/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002EJavaScript_002ECodeStyle_002ESettingsUpgrade_002EJsCodeFormatterSettingsUpgrader/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002EJavaScript_002ECodeStyle_002ESettingsUpgrade_002EJsParsFormattingSettingsUpgrader/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002EJavaScript_002ECodeStyle_002ESettingsUpgrade_002EJsWrapperSettingsUpgrader/@EntryIndexedValue">True</s:Boolean>



Expand Down
19 changes: 1 addition & 18 deletions src/NServiceBus.SqlServer/ExpiredMessagesPurger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

class ExpiredMessagesPurger : IExecutor
{
static readonly ILog Logger = LogManager.GetLogger(typeof(ExpiredMessagesPurger));
static readonly ILog Logger = LogManager.GetLogger(typeof(ExpiredMessagesPurger));

readonly TableBasedQueue queue;
readonly Func<SqlConnection> openConnection;
Expand All @@ -25,8 +25,6 @@ public ExpiredMessagesPurger(TableBasedQueue queue, Func<SqlConnection> openConn

public void Start(int maximumConcurrency, CancellationToken token)
{
LogWarningWhenIndexIsMissing();

this.token = token;
purgeTaskTimer = new Timer(PurgeExpiredMessagesCallback, null, TimeSpan.Zero, Timeout.InfiniteTimeSpan);
}
Expand All @@ -40,21 +38,6 @@ public void Stop()
}
}

void LogWarningWhenIndexIsMissing()
{
try
{
using (var connection = openConnection())
{
queue.LogWarningWhenIndexIsMissing(connection);
}
}
catch (Exception ex)
{
Logger.WarnFormat("Checking indexes on table {0} failed. Exception: {1}", queue, ex);
}
}

void PurgeExpiredMessagesCallback(object state)
{
Logger.DebugFormat("Starting a new expired message purge task for table {0}.", queue);
Expand Down
1 change: 1 addition & 0 deletions src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
<Compile Include="ReceiveRampUpController.cs" />
<Compile Include="ReceiveResult.cs" />
<Compile Include="ReceiveStrategyFactory.cs" />
<Compile Include="SchemaInspector.cs" />
<Compile Include="TaskTracker.cs" />
<Compile Include="SecondaryReceiveConfiguration.cs" />
<Compile Include="SecondaryReceiveSettings.cs" />
Expand Down
63 changes: 63 additions & 0 deletions src/NServiceBus.SqlServer/SchemaInspector.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
namespace NServiceBus.Transport.SQLServer
{
using System;
using System.Data.SqlClient;
using NServiceBus.Logging;
using NServiceBus.Transports.SQLServer;

class SchemaInspector
{
static readonly ILog Logger = LogManager.GetLogger<SchemaInspector>();

Func<SqlConnection> openConnection;

public SchemaInspector(Func<SqlConnection> openConnection)
{
this.openConnection = openConnection;
}

public void PerformInspection(TableBasedQueue queue)
{
VerifyExpiredIndex(queue);
VerifyHeadersColumnType(queue);
}

void VerifyExpiredIndex(TableBasedQueue queue)
{
try
{
using (var connection = openConnection())
{
var indexExists = queue.CheckExpiresIndexPresence(connection);
if (!indexExists)
{
Logger.Warn($@"Table {queue} does not contain index 'Index_Expires'.{Environment.NewLine}Adding this index will speed up the process of purging expired messages from the queue. Please consult the documentation for further information.");
}
}
}
catch (Exception ex)
{
Logger.WarnFormat("Checking indexes on table {0} failed. Exception: {1}", queue, ex);
}
}

void VerifyHeadersColumnType(TableBasedQueue queue)
{
try
{
using (var connection = openConnection())
{
var columnType = queue.CheckHeadersColumnType(connection);
if (string.Equals(columnType, "varchar", StringComparison.OrdinalIgnoreCase))
{
Logger.Warn($"Table {queue} stores headers in a non Unicode-compatible column (varchar).{Environment.NewLine}This may lead to data loss when sending non-ASCII characters in headers. SQL Server transport 3.1 and newer can take advantage of the nvarchar column type for headers. Please change the column type in the database.");
}
}
}
catch (Exception ex)
{
Logger.WarnFormat("Checking column type on table {0} failed. Exception: {1}", queue, ex);
}
}
}
}
7 changes: 6 additions & 1 deletion src/NServiceBus.SqlServer/SqlServerPollingDequeueStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using CircuitBreakers;
using Janitor;
using NServiceBus.Features;
using NServiceBus.Transport.SQLServer;
using Unicast.Transport;

/// <summary>
Expand Down Expand Up @@ -55,16 +56,20 @@ public void Init(Address primaryAddress, TransactionSettings transactionSettings
var primaryQueue = new TableBasedQueue(primaryAddress, locaConnectionParams.Schema);
primaryReceiver = new AdaptivePollingReceiver(receiveStrategy, primaryQueue, endProcessMessage, circuitBreaker, transportNotifications);

var schemaInspector = new SchemaInspector(() => connectionFactory.OpenNewConnection(locaConnectionParams.ConnectionString));
schemaInspector.PerformInspection(primaryQueue);

if (secondaryReceiveSettings.IsEnabled)
{
var secondaryQueue = new TableBasedQueue(SecondaryReceiveSettings.ReceiveQueue.GetTableName(), locaConnectionParams.Schema);
secondaryReceiver = new AdaptivePollingReceiver(receiveStrategy, secondaryQueue, endProcessMessage, circuitBreaker, transportNotifications);
schemaInspector.PerformInspection(secondaryQueue);
}
else
{
secondaryReceiver = new NullExecutor();
}

expiredMessagesPurger = new ExpiredMessagesPurger(primaryQueue, () => connectionFactory.OpenNewConnection(locaConnectionParams.ConnectionString), purgeExpiredMessagesParams);
}

Expand Down
2 changes: 1 addition & 1 deletion src/NServiceBus.SqlServer/SqlServerQueueCreator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ [CorrelationId] [varchar](255) NULL,
[ReplyToAddress] [varchar](255) NULL,
[Recoverable] [bit] NOT NULL,
[Expires] [datetime] NULL,
[Headers] [varchar](max) NOT NULL,
[Headers] [nvarchar](max) NOT NULL,
[Body] [varbinary](max) NULL,
[RowVersion] [bigint] IDENTITY(1,1) NOT NULL
) ON [PRIMARY];
Expand Down
26 changes: 19 additions & 7 deletions src/NServiceBus.SqlServer/TableBasedQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -203,19 +203,24 @@ public int PurgeBatchOfExpiredMessages(SqlConnection connection, int purgeBatchS
}
}

public void LogWarningWhenIndexIsMissing(SqlConnection connection)
public bool CheckExpiresIndexPresence(SqlConnection connection)
{
var commandText = string.Format(SqlCheckIfExpiresIndexIsPresent, ExpiresIndexName, this.schema, this.tableName);

using (var command = new SqlCommand(commandText, connection))
{
var rowsCount = (int) command.ExecuteScalar();
return rowsCount > 0;
}
}

if (rowsCount == 0)
{
Logger.Warn($@"Table {schema}.{tableName} does not contain index '{ExpiresIndexName}'.
Adding this index will speed up the process of purging expired messages from the queue. Please consult the documentation for further information.");
}
public string CheckHeadersColumnType(SqlConnection connection)
{
var commandText = string.Format(SqlCheckHeaderColumnType, this.schema, this.tableName);

using (var command = new SqlCommand(commandText, connection))
{
return (string)command.ExecuteScalar();
}
}

Expand All @@ -239,7 +244,7 @@ public override string ToString()
SqlDbType.VarChar,
SqlDbType.Bit,
SqlDbType.Int,
SqlDbType.VarChar,
SqlDbType.NVarChar,
SqlDbType.VarBinary
};

Expand All @@ -261,6 +266,13 @@ DELETE FROM message

const string ExpiresIndexName = "Index_Expires";

const string SqlCheckHeaderColumnType =
@"SELECT t.name
FROM sys.columns c
INNER JOIN sys.types t ON c.system_type_id = t.system_type_id
WHERE c.object_id = OBJECT_ID('{0}.{1}')
AND c.name = 'Headers'";

const int IdColumn = 0;
const int CorrelationIdColumn = 1;
const int ReplyToAddressColumn = 2;
Expand Down

0 comments on commit ce7ab32

Please sign in to comment.