Skip to content

Commit

Permalink
Merge branch 'release-6.2' into encrypt-column
Browse files Browse the repository at this point in the history
  • Loading branch information
mauroservienti authored Nov 9, 2020
2 parents c351d04 + 17fa109 commit d85153c
Showing 1 changed file with 53 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace NServiceBus.Transport.SqlServer
using System.Threading.Tasks;
using System.Transactions;
using DelayedDelivery;
using NServiceBus.Logging;
using Performance.TimeToBeReceived;
using Routing;
using Settings;
Expand Down Expand Up @@ -197,7 +198,7 @@ public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure()
return new TransportReceiveInfrastructure(
() => new MessagePump(receiveStrategyFactory, queueFactory, queuePurger, expiredMessagesPurger, queuePeeker, schemaVerification, waitTimeCircuitBreaker),
() => new QueueCreator(connectionFactory, addressTranslator, delayedQueueCanonicalAddress, createMessageBodyComputedColumn),
() => CheckForAmbientTransactionEnlistmentSupport(scopeOptions.TransactionOptions));
() => ValidateDatabaseAccess(scopeOptions.TransactionOptions));
}

CanonicalQueueAddress GetDelayedTableAddress(string suffix)
Expand Down Expand Up @@ -249,33 +250,77 @@ IExpiredMessagesPurger CreateExpiredMessagesPurger()
return new ExpiredMessagesPurger(_ => connectionFactory.OpenNewConnection(), purgeBatchSize);
}

async Task<StartupCheckResult> CheckForAmbientTransactionEnlistmentSupport(TransactionOptions transactionOptions)
async Task<StartupCheckResult> ValidateDatabaseAccess(TransactionOptions transactionOptions)
{
var isDatabaseAccessible = await TryOpenDatabaseConnection().ConfigureAwait(false);

if (!isDatabaseAccessible.Succeeded)
{
return isDatabaseAccessible;
}

return await TryEscalateToDistributedTransactions(transactionOptions).ConfigureAwait(false);
}

async Task<StartupCheckResult> TryOpenDatabaseConnection()
{
try
{
using (await connectionFactory.OpenNewConnection().ConfigureAwait(false))
{
}

return StartupCheckResult.Success;
}
catch (Exception ex)
{
var message = "Could not open connection to the SQL instance. Check the original error message for details. Original error message: " + ex.Message;
return StartupCheckResult.Failed(message);
}
}

async Task<StartupCheckResult> TryEscalateToDistributedTransactions(TransactionOptions transactionOptions)
{
if (!settings.TryGet(out TransportTransactionMode requestedTransportTransactionMode))
{
requestedTransportTransactionMode = TransactionMode;
}


if (requestedTransportTransactionMode == TransportTransactionMode.TransactionScope)
{
var message = string.Empty;

try
{
using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew, transactionOptions, TransactionScopeAsyncFlowOption.Enabled))
using (await connectionFactory.OpenNewConnection().ConfigureAwait(false))
using (await connectionFactory.OpenNewConnection().ConfigureAwait(false))
{
scope.Complete();
}
}
catch (NotSupportedException ex)
{
var message = "The version of System.Data.SqlClient in use does not support one of the selected connection string options or " +
"enlisting SQL connections in distributed transactions. Check original error message for details. " +
message = "The version of the SqlClient in use does not support enlisting SQL connections in distributed transactions. " +
"Check original error message for details. " +
"In case the problem is related to distributed transactions you can still use SQL Server transport but " +
"specify a different transaction mode via `EndpointConfiguration.UseTransport<SqlServerTransport>().Transactions`. " +
"should specify a different transaction mode via `EndpointConfiguration.UseTransport<SqlServerTransport>().Transactions`. " +
"Note that different transaction modes may affect consistency guarantees as you can't rely on distributed " +
"transactions to atomically update the database and consume a message. Original error message: " + ex.Message;
}
catch (SqlException sqlException)
{
message = "Could not escalate to a distributed transaction while configured to use TransactionScope. Check original error message for details. " +
"In case the problem is related to distributed transactions you can still use SQL Server transport but " +
"should specify a different transaction mode via `EndpointConfiguration.UseTransport<SqlServerTransport>().Transactions`. " +
"Note that different transaction modes may affect consistency guarantees as you can't rely on distributed " +
"transactions to atomically update the database and consume a message. Original error message: " + sqlException.Message;
}

return StartupCheckResult.Failed(message);
if (!string.IsNullOrWhiteSpace(message))
{
Logger.Warn(message);
}
}

Expand Down Expand Up @@ -355,5 +400,7 @@ public override string MakeCanonicalForm(string transportAddress)
IDelayedMessageStore delayedMessageStore = new SendOnlyDelayedMessageStore();
TableBasedQueueCache tableBasedQueueCache;
bool isEncrypted;

static ILog Logger = LogManager.GetLogger<SqlServerTransportInfrastructure>();
}
}

0 comments on commit d85153c

Please sign in to comment.