Skip to content

Commit

Permalink
Merge pull request #482 from Particular/release-4.2
Browse files Browse the repository at this point in the history
Release 4.2
  • Loading branch information
SzymonPobiega authored Mar 20, 2019
2 parents f354822 + 06c5d01 commit 009340d
Show file tree
Hide file tree
Showing 16 changed files with 75 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
using Routing;
using static AcceptanceTesting.Customization.Conventions;

//HINT: Message mappings specifed in app.config added to routing table using UnicastRoute.CreateFromPhysicalAddress.
//HINT: Message mappings specified in app.config added to routing table using UnicastRoute.CreateFromPhysicalAddress.
// As a result this test covers also an app.config message mappings scenario
public class When_custom_schema_configured_for_endpoint_inside_physical_address : When_custom_schema_configured_for_endpoint
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,33 +27,33 @@ public async Task Should_be_used_by_send_operations()
{
var options = new SendOptions();

options.UseCustomSqlConnectionAndTransaction(connection, rolledbackTransaction);
options.UseCustomSqlTransaction(rolledbackTransaction);

await bus.Send(new FromRolledbackTransaction(), options);

rolledbackTransaction.Rollback();
}

using (var commitedTransaction = connection.BeginTransaction())
using (var committedTransaction = connection.BeginTransaction())
{
var options = new SendOptions();

options.UseCustomSqlConnectionAndTransaction(connection, commitedTransaction);
options.UseCustomSqlTransaction(committedTransaction);

await bus.Send(new FromCommitedTransaction(), options);
await bus.Send(new FromCommittedTransaction(), options);

commitedTransaction.Commit();
committedTransaction.Commit();
}
}

}))
.Done(c => c.ReceivedFromCommitedTransaction)
.Done(c => c.ReceivedFromCommittedTransaction)
.Run(TimeSpan.FromMinutes(1));

Assert.IsFalse(context.ReceivedFromRolledbackTransaction);
}

class FromCommitedTransaction : IMessage
class FromCommittedTransaction : IMessage
{
}

Expand All @@ -63,7 +63,7 @@ class FromRolledbackTransaction : IMessage

class MyContext : ScenarioContext
{
public bool ReceivedFromCommitedTransaction { get; set; }
public bool ReceivedFromCommittedTransaction { get; set; }
public bool ReceivedFromRolledbackTransaction { get; set; }
}

Expand All @@ -78,13 +78,13 @@ public AnEndpoint()
var routing = c.ConfigureTransport().Routing();
var anEndpointName = AcceptanceTesting.Customization.Conventions.EndpointNamingConvention(typeof(AnEndpoint));

routing.RouteToEndpoint(typeof(FromCommitedTransaction), anEndpointName);
routing.RouteToEndpoint(typeof(FromCommittedTransaction), anEndpointName);
routing.RouteToEndpoint(typeof(FromRolledbackTransaction), anEndpointName);
});
}

class ReplyHandler : IHandleMessages<FromRolledbackTransaction>,
IHandleMessages<FromCommitedTransaction>
class ReplyHandler : IHandleMessages<FromRolledbackTransaction>,
IHandleMessages<FromCommittedTransaction>
{
public MyContext Context { get; set; }

Expand All @@ -95,15 +95,15 @@ public Task Handle(FromRolledbackTransaction message, IMessageHandlerContext con
return Task.FromResult(0);
}

public Task Handle(FromCommitedTransaction message, IMessageHandlerContext context)
public Task Handle(FromCommittedTransaction message, IMessageHandlerContext context)
{
Context.ReceivedFromCommitedTransaction = true;
Context.ReceivedFromCommittedTransaction = true;

return Task.FromResult(0);
}
}
}


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ static string GetEndpointVersion<T>()
return "3.1";
}

throw new Exception("Unknow endpoint version.");
throw new Exception("Unknown endpoint version.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public void It_reads_catalog_from_open_connection()
var definition = new SqlServerTransport();
Func<Task<SqlConnection>> factory = async () =>
{

var connection = new SqlConnection(connectionString);
await connection.OpenAsync().ConfigureAwait(false);
return connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ static async Task TryPeekQueueSize(TableBasedQueue tableBasedQueue, SqlConnectio
using (var connection = await sqlConnectionFactory.OpenNewConnection())
using (var tx = connection.BeginTransaction())
{
tableBasedQueue.FormatPeekCommand(100);
await tableBasedQueue.TryPeek(connection, tx, CancellationToken.None, PeekTimeoutInSeconds);
scope.Complete();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ namespace NServiceBus.Transport.SQLServer
}
public class static SendOptionsExtensions
{
[System.ObsoleteAttribute("The connection parameter is no longer required. Use the alternate overload with o" +
"nly the transaction parameter. Will be removed in version 5.0.0.", true)]
public static void UseCustomSqlConnectionAndTransaction(this NServiceBus.SendOptions options, System.Data.SqlClient.SqlConnection connection, System.Data.SqlClient.SqlTransaction transaction) { }
public static void UseCustomSqlTransaction(this NServiceBus.SendOptions options, System.Data.SqlClient.SqlTransaction transaction) { }
}
[System.ObsoleteAttribute("Not for public use.")]
public class static SqlConstants
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ public void Can_round_trip()

void AssertDictionariesAreTheSame(Dictionary<string, string> before, Dictionary<string, string> after)
{
foreach (var beforeitem in before)
foreach (var beforeItem in before)
{
Assert.AreEqual(beforeitem.Value, after[beforeitem.Key]);
Assert.AreEqual(beforeItem.Value, after[beforeItem.Key]);
}
Assert.AreEqual(before.Count, after.Count);
}
Expand Down
2 changes: 2 additions & 0 deletions src/NServiceBus.SqlServer.sln.DotSettings
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,8 @@ II.2.12 &lt;HandlesEvent /&gt;&#xD;
<s:String x:Key="/Default/CodeStyle/Naming/XamlNaming/UserRules/=XAML_005FFIELD/@EntryIndexedValue">&lt;Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" /&gt;</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/XamlNaming/UserRules/=XAML_005FRESOURCE/@EntryIndexedValue">&lt;Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" /&gt;</s:String>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpFileLayoutPatternsUpgrade/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpKeepExistingMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpPlaceEmbeddedOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EAddAccessorOwnerDeclarationBracesMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EAlwaysTreatStructAsNotReorderableMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean>
Expand Down
14 changes: 7 additions & 7 deletions src/NServiceBus.SqlServer/Queuing/SqlConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ DELETE FROM message

public static readonly string PeekText = @"
SELECT count(*) Id
FROM {0} WITH (READPAST);";
FROM (SELECT TOP {1} * FROM {0} WITH (READPAST)) as count_table;";

public static readonly string AddMessageBodyStringColumn = @"
IF NOT EXISTS (
Expand All @@ -119,19 +119,19 @@ AND type in (N'U'))
RETURN
IF EXISTS (
SELECT *
FROM {1}.sys.columns
WHERE object_id = OBJECT_ID(N'{0}')
SELECT *
FROM {1}.sys.columns
WHERE object_id = OBJECT_ID(N'{0}')
AND name = 'BodyString'
)
RETURN
EXEC sp_getapplock @Resource = '{0}_lock', @LockMode = 'Exclusive'
IF EXISTS (
SELECT *
FROM {1}.sys.columns
WHERE object_id = OBJECT_ID(N'{0}')
SELECT *
FROM {1}.sys.columns
WHERE object_id = OBJECT_ID(N'{0}')
AND name = 'BodyString'
)
BEGIN
Expand Down
8 changes: 7 additions & 1 deletion src/NServiceBus.SqlServer/Queuing/TableBasedQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ public TableBasedQueue(string qualifiedTableName, string queueName)
#pragma warning disable 618
this.qualifiedTableName = qualifiedTableName;
Name = queueName;
peekCommand = Format(SqlConstants.PeekText, this.qualifiedTableName);
receiveCommand = Format(SqlConstants.ReceiveText, this.qualifiedTableName);
sendCommand = Format(SqlConstants.SendText, this.qualifiedTableName);
purgeCommand = Format(SqlConstants.PurgeText, this.qualifiedTableName);
Expand All @@ -39,6 +38,13 @@ public virtual async Task<int> TryPeek(SqlConnection connection, SqlTransaction
}
}

public void FormatPeekCommand(int maxRecordsToPeek)
{
#pragma warning disable 618
peekCommand = Format(SqlConstants.PeekText, qualifiedTableName, maxRecordsToPeek);
#pragma warning restore 618
}

public virtual async Task<MessageReadResult> TryReceive(SqlConnection connection, SqlTransaction transaction)
{
using (var command = new SqlCommand(receiveCommand, connection, transaction))
Expand Down
3 changes: 2 additions & 1 deletion src/NServiceBus.SqlServer/Receiving/MessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public async Task Init(Func<MessageContext, Task> onMessage, Func<ErrorContext,

public void Start(PushRuntimeSettings limitations)
{
inputQueue.FormatPeekCommand(Math.Min(100, 10 * limitations.MaxConcurrency));
runningReceiveTasks = new ConcurrentDictionary<Task, Task>();
concurrencyLimiter = new SemaphoreSlim(limitations.MaxConcurrency);
cancellationTokenSource = new CancellationTokenSource();
Expand Down Expand Up @@ -202,7 +203,7 @@ async Task PurgeExpiredMessages()
}
catch (SqlException e) when (cancellationToken.IsCancellationRequested)
{
Logger.Debug("Exception thown while performing cancellation", e);
Logger.Debug("Exception thrown while performing cancellation", e);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/NServiceBus.SqlServer/Receiving/QueuePeeker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public async Task<int> Peek(TableBasedQueue inputQueue, RepeatedFailuresOverTime
}
catch (SqlException e) when (cancellationToken.IsCancellationRequested)
{
Logger.Debug("Exception thown while performing cancellation", e);
Logger.Debug("Exception thrown while performing cancellation", e);
}
catch (Exception ex)
{
Expand Down
2 changes: 1 addition & 1 deletion src/NServiceBus.SqlServer/Receiving/ReceiveStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ protected async Task<ErrorHandleResult> HandleError(Exception exception, Message
}
catch (Exception ex)
{
criticalError.Raise($"Failed to execute reverability actions for message `{message.TransportId}`", ex);
criticalError.Raise($"Failed to execute recoverability actions for message `{message.TransportId}`", ex);

return ErrorHandleResult.RetryRequired;
}
Expand Down
25 changes: 22 additions & 3 deletions src/NServiceBus.SqlServer/SendOptionsExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@
public static class SendOptionsExtensions
{
/// <summary>
/// Enables providing SqlConnection and SqlTransaction instances that will be used by send operations. The same connection and transaction
/// Enables providing <see cref="SqlConnection"/> and <see cref="SqlTransaction"/> instances that will be used by send operations. The same connection and transaction
/// can be used in more than one send operation.
/// </summary>
/// <param name="options">The <see cref="SendOptions" /> to extend.</param>
/// <param name="connection">Open SqlConnection instance to be used by send operations.</param>
/// <param name="transaction">SqlTransaction instance that will be used by any operations perfromed by the transport.</param>
/// <param name="connection">Open <see cref="SqlConnection"/> instance to be used by send operations.</param>
/// <param name="transaction"><see cref="SqlTransaction"/> instance that will be used by any operations performed by the transport.</param>
[ObsoleteEx(
RemoveInVersion = "5.0",
TreatAsErrorFromVersion = "4.0",
Message = "The connection parameter is no longer required. Use the alternate overload with only the transaction parameter.")]
public static void UseCustomSqlConnectionAndTransaction(this SendOptions options, SqlConnection connection, SqlTransaction transaction)
{
var transportTransaction = new TransportTransaction();
Expand All @@ -23,5 +27,20 @@ public static void UseCustomSqlConnectionAndTransaction(this SendOptions options

options.GetExtensions().Set(transportTransaction);
}

/// <summary>
/// Enables providing SqlConnection and SqlTransaction instances that will be used by send operations. The same connection and transaction
/// can be used in more than one send operation.
/// </summary>
/// <param name="options">The <see cref="SendOptions" /> to extend.</param>
/// <param name="transaction">SqlTransaction instance that will be used by any operations perfromed by the transport.</param>
public static void UseCustomSqlTransaction(this SendOptions options, SqlTransaction transaction)
{
var transportTransaction = new TransportTransaction();
transportTransaction.Set(transaction.Connection);
transportTransaction.Set(transaction);

options.GetExtensions().Set(transportTransaction);
}
}
}
8 changes: 4 additions & 4 deletions src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,10 @@ public override TransportSendInfrastructure ConfigureSendInfrastructure()

DelayedMessageTable CreateDelayedMessageTable()
{
var delatedQueueTableName = GetDelayedQueueTableName();
var deletedQueueTableName = GetDelayedQueueTableName();

var inputQueueTable = addressTranslator.Parse(ToTransportAddress(GetLogicalAddress())).QualifiedTableName;
return new DelayedMessageTable(delatedQueueTableName.QualifiedTableName, inputQueueTable);
return new DelayedMessageTable(deletedQueueTableName.QualifiedTableName, inputQueueTable);
}

/// <summary>
Expand All @@ -259,8 +259,8 @@ CanonicalQueueAddress GetDelayedQueueTableName()
{
throw new Exception("Native delayed delivery feature requires configuring a table suffix.");
}
var delayedQueueLogialAddress = GetLogicalAddress().CreateQualifiedAddress(delayedDeliverySettings.Suffix);
var delayedQueueAddress = addressTranslator.Generate(delayedQueueLogialAddress);
var delayedQueueLogicalAddress = GetLogicalAddress().CreateQualifiedAddress(delayedDeliverySettings.Suffix);
var delayedQueueAddress = addressTranslator.Generate(delayedQueueLogicalAddress);
return addressTranslator.GetCanonicalForm(delayedQueueAddress);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public static TransportExtensions<SqlServerTransport> TimeToWaitBeforeTriggering
Guard.AgainstNegativeAndZero(nameof(waitTime), waitTime);

transportExtensions.GetSettings().Set(SettingsKeys.TimeToWaitBeforeTriggering, waitTime);

return transportExtensions;
}

Expand All @@ -144,19 +144,19 @@ public static TransportExtensions<SqlServerTransport> UseCustomSqlConnectionFact
}

/// <summary>
/// Allows the IsolationLevel and transaction timeout to be changed for the TransactionScope used to receive messages.
/// Allows the <see cref="IsolationLevel"/> and transaction timeout to be changed for the <see cref="TransactionScope"/> used to receive messages.
/// </summary>
/// <remarks>
/// If not specified the default transaction timeout of the machine will be used and the isolation level will be set to
/// `ReadCommited`.
/// <see cref="IsolationLevel.ReadCommitted"/>.
/// </remarks>
public static TransportExtensions<SqlServerTransport> TransactionScopeOptions(this TransportExtensions<SqlServerTransport> transportExtensions, TimeSpan? timeout = null, IsolationLevel? isolationLevel = null)
{
Guard.AgainstNull(nameof(transportExtensions), transportExtensions);

if (isolationLevel != IsolationLevel.ReadCommitted && isolationLevel != IsolationLevel.RepeatableRead)
{
Logger.Warn("TransactionScope should be only used with either the ReadCommited or the RepeatableRead isolation level.");
Logger.Warn("TransactionScope should be only used with either the ReadCommitted or the RepeatableRead isolation level.");
}

transportExtensions.GetSettings().Set(new SqlScopeOptions(timeout, isolationLevel));
Expand Down Expand Up @@ -224,8 +224,8 @@ public static TransportExtensions<SqlServerTransport> CreateMessageBodyComputedC
/// <param name="transportExtensions">The <see cref="TransportExtensions{T}" /> to extend.</param>
/// <param name="sqlConnectionFactory">Function that returns opened sql connection based on destination queue name.</param>
[ObsoleteEx(
RemoveInVersion = "5.0",
TreatAsErrorFromVersion = "4.0",
RemoveInVersion = "5.0",
TreatAsErrorFromVersion = "4.0",
Message = "Multi-instance mode has been deprecated. Use Transport Bridge and/or multi-catalog addressing instead.")]
public static TransportExtensions<SqlServerTransport> EnableLegacyMultiInstanceMode(this TransportExtensions<SqlServerTransport> transportExtensions, Func<string, Task<SqlConnection>> sqlConnectionFactory)
{
Expand Down

0 comments on commit 009340d

Please sign in to comment.