Skip to content

Commit

Permalink
Merge branch 'hotfix-2.2.4'
Browse files Browse the repository at this point in the history
  • Loading branch information
Marcin Hoppe committed Jul 5, 2016
2 parents 90ed2a8 + ff5bed1 commit a3f27f3
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@
<Compile Include="App_Packages\NSB.AcceptanceTests.5.2.6\Volatile\When_sending_to_non_durable_endpoint.cs" />
<Compile Include="When_in_native_transaction_mode.cs" />
<Compile Include="When_processing_messages.cs" />
<Compile Include="When_ReplyTo_address_does_not_exist.cs" />
<Compile Include="When_using_non_standard_schema.cs" />
<Compile Include="When_callback_receiver_is_disabled.cs" />
<Compile Include="When_using_different_connection_strings_for_each_endpoint.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
namespace NServiceBus.SqlServer.AcceptanceTests
{
using System;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NServiceBus.Config;
using NServiceBus.Faults;
using NServiceBus.Features;
using NUnit.Framework;

public class When_ReplyTo_address_does_not_exist
{
[Test]
public void Should_throw()
{
var context = Scenario.Define<Context>()
.WithEndpoint<Attacker>(b => b.When(bus => bus.SendLocal(new StartCommand())))
.WithEndpoint<Victim>()
.AllowExceptions()
.Done(c => c.ExceptionReceived)
.Run();

Assert.That(context.ExceptionMessage, Contains.Substring("The destination queue") & Contains.Substring("could not be found"));
Assert.That(context.ExceptionMessage, Contains.Substring("error] VALUES(NEWID(), NULL, NULL, 1, NULL, '', NULL); DROP TABLE [Victim]; INSERT INTO [error"));
}

class Context : ScenarioContext
{
public bool ExceptionReceived { get; set; }
public string ExceptionMessage { get; set; }
}

class Attacker : EndpointConfigurationBuilder
{
public Attacker()
{
EndpointSetup<DefaultServer>(c =>
{
c.UseTransport<SqlServerTransport>()
.DisableCallbackReceiver();
c.OverridePublicReturnAddress(Address.Parse("error] VALUES(NEWID(), NULL, NULL, 1, NULL, '', NULL); DROP TABLE [Victim]; INSERT INTO [error"));
})
.AddMapping<AttackCommand>(typeof(Victim));
}

class StartHandler : IHandleMessages<StartCommand>
{
public IBus Bus { get; set; }

public void Handle(StartCommand message)
{
Bus.Send(new AttackCommand());
}
}

class AttackResponseHandler : IHandleMessages<AttackResponse>
{
public void Handle(AttackResponse message)
{
}
}
}

class Victim : EndpointConfigurationBuilder
{
public Victim()
{
EndpointSetup<DefaultServer>(b =>
{
b.RegisterComponents(c => c.ConfigureComponent<CustomFaultManager>(DependencyLifecycle.SingleInstance));
b.DisableFeature<TimeoutManager>();
})
.WithConfig<TransportConfig>(c => c.MaxRetries = 0);
}

class AttackHandler : IHandleMessages<AttackCommand>
{
public IBus Bus { get; set; }

public void Handle(AttackCommand message)
{
Bus.Reply(new AttackResponse());
}
}

class CustomFaultManager : IManageMessageFailures
{
public Context Context { get; set; }

public void SerializationFailedForMessage(TransportMessage message, Exception e)
{
}

public void ProcessingAlwaysFailsForMessage(TransportMessage message, Exception e)
{
Context.ExceptionMessage = e.Message;
Context.ExceptionReceived = true;
}

public void Init(Address address)
{
}
}
}

class StartCommand : ICommand
{
}

class AttackCommand : ICommand
{
}

class AttackResponse : IMessage
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="AdaptiveExecutorSimulator\Simulations.cs" />
<Compile Include="SqlConnectionFactoryConfigTests.cs" />
<Compile Include="TableBasedQueueTests.cs" />
</ItemGroup>
<ItemGroup>
<None Include="NServiceBus.snk" />
Expand Down
15 changes: 15 additions & 0 deletions src/NServiceBus.SqlServer.UnitTests/TableBasedQueueTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace NServiceBus.SqlServer.UnitTests
{
using NServiceBus.Transports.SQLServer;
using NUnit.Framework;

class TableBasedQueueTests
{
[Test]
public void Table_name_and_schema_should_be_quoted()
{
Assert.AreEqual("[nsb].[MyEndpoint]", new TableBasedQueue("MyEndpoint", "nsb").ToString());
Assert.AreEqual("[nsb].[MyEndoint]]; SOME OTHER SQL;--]", new TableBasedQueue("MyEndoint]; SOME OTHER SQL;--", "nsb").ToString());
}
}
}
19 changes: 11 additions & 8 deletions src/NServiceBus.SqlServer/TableBasedQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ public TableBasedQueue(Address address, string schema)

public TableBasedQueue(string tableName, string schema)
{
this.tableName = tableName;
this.schema = schema;
using (var sanitizer = new SqlCommandBuilder())
{
this.tableName = sanitizer.QuoteIdentifier(tableName);
this.schema = sanitizer.QuoteIdentifier(schema);
}
}

public void Send(TransportMessage message, SendOptions sendOptions, SqlConnection connection, SqlTransaction transaction = null)
Expand Down Expand Up @@ -209,15 +212,15 @@ public void LogWarningWhenIndexIsMissing(SqlConnection connection)

if (rowsCount == 0)
{
Logger.Warn($@"Table [{schema}].[{tableName}] does not contain index '{ExpiresIndexName}'.
Logger.Warn($@"Table {schema}.{tableName} does not contain index '{ExpiresIndexName}'.
Adding this index will speed up the process of purging expired messages from the queue. Please consult the documentation for further information.");
}
}
}

public override string ToString()
{
return tableName;
return $"{schema}.{tableName}";
}

static readonly ILog Logger = LogManager.GetLogger(typeof(TableBasedQueue));
Expand All @@ -240,20 +243,20 @@ public override string ToString()
};

const string SqlSend =
@"INSERT INTO [{0}].[{1}] ([Id],[CorrelationId],[ReplyToAddress],[Recoverable],[Expires],[Headers],[Body])
@"INSERT INTO {0}.{1} ([Id],[CorrelationId],[ReplyToAddress],[Recoverable],[Expires],[Headers],[Body])
VALUES (@Id,@CorrelationId,@ReplyToAddress,@Recoverable,CASE WHEN @TimeToBeReceivedMs IS NOT NULL THEN DATEADD(ms, @TimeToBeReceivedMs, GETUTCDATE()) END,@Headers,@Body)";

const string SqlReceive =
@"WITH message AS (SELECT TOP(1) * FROM [{0}].[{1}] WITH (UPDLOCK, READPAST, ROWLOCK) ORDER BY [RowVersion] ASC)
@"WITH message AS (SELECT TOP(1) * FROM {0}.{1} WITH (UPDLOCK, READPAST, ROWLOCK) ORDER BY [RowVersion] ASC)
DELETE FROM message
OUTPUT deleted.Id, deleted.CorrelationId, deleted.ReplyToAddress,
deleted.Recoverable, CASE WHEN deleted.Expires IS NOT NULL THEN DATEDIFF(ms, GETUTCDATE(), deleted.Expires) END, deleted.Headers, deleted.Body;";

const string SqlPurgeBatchOfExpiredMessages =
@"DELETE FROM [{1}].[{2}] WHERE [Id] IN (SELECT TOP ({0}) [Id] FROM [{1}].[{2}] WITH (UPDLOCK, READPAST, ROWLOCK) WHERE [Expires] < GETUTCDATE() ORDER BY [RowVersion])";
@"DELETE FROM {1}.{2} WHERE [Id] IN (SELECT TOP ({0}) [Id] FROM {1}.{2} WITH (UPDLOCK, READPAST, ROWLOCK) WHERE [Expires] < GETUTCDATE() ORDER BY [RowVersion])";

const string SqlCheckIfExpiresIndexIsPresent =
@"SELECT COUNT(*) FROM [sys].[indexes] WHERE [name] = '{0}' AND [object_id] = OBJECT_ID('[{1}].[{2}]')";
@"SELECT COUNT(*) FROM [sys].[indexes] WHERE [name] = '{0}' AND [object_id] = OBJECT_ID('{1}.{2}')";

const string ExpiresIndexName = "Index_Expires";

Expand Down

0 comments on commit a3f27f3

Please sign in to comment.