Skip to content

Commit

Permalink
Remove duplication from QueueCreator
Browse files Browse the repository at this point in the history
  • Loading branch information
SzymonPobiega committed Nov 14, 2019
1 parent 6b2d133 commit 6d01d7a
Showing 1 changed file with 6 additions and 39 deletions.
45 changes: 6 additions & 39 deletions src/NServiceBus.SqlServer/Receiving/QueueCreator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,22 @@ public async Task CreateQueueIfNecessary(QueueBindings queueBindings, string ide
{
foreach (var receivingAddress in queueBindings.ReceivingAddresses)
{
await CreateQueue(addressTranslator.Parse(receivingAddress), connection, transaction, createMessageBodyColumn).ConfigureAwait(false);
await CreateQueue(SqlConstants.CreateQueueText, addressTranslator.Parse(receivingAddress), connection, transaction, createMessageBodyColumn).ConfigureAwait(false);
}

foreach (var sendingAddress in queueBindings.SendingAddresses)
{
await CreateQueue(addressTranslator.Parse(sendingAddress), connection, transaction, createMessageBodyColumn).ConfigureAwait(false);
await CreateQueue(SqlConstants.CreateQueueText, addressTranslator.Parse(sendingAddress), connection, transaction, createMessageBodyColumn).ConfigureAwait(false);
}
transaction.Commit();
}

using (var connection = await connectionFactory.OpenNewConnection().ConfigureAwait(false))
using (var transaction = connection.BeginTransaction())
{
await CreateDelayedMessageQueue(connection, transaction, createMessageBodyColumn).ConfigureAwait(false);

await CreateQueue(SqlConstants.CreateDelayedMessageStoreText, delayedQueueAddress, connection, transaction, createMessageBodyColumn).ConfigureAwait(false);
transaction.Commit();
}
}

static async Task CreateQueue(CanonicalQueueAddress canonicalQueueAddress, SqlConnection connection, SqlTransaction transaction, bool createMessageBodyColumn)
static async Task CreateQueue(string creationScript, CanonicalQueueAddress canonicalQueueAddress, SqlConnection connection, SqlTransaction transaction, bool createMessageBodyColumn)
{
var sql = string.Format(SqlConstants.CreateQueueText, canonicalQueueAddress.QualifiedTableName, canonicalQueueAddress.QuotedCatalogName);
var sql = string.Format(creationScript, canonicalQueueAddress.QualifiedTableName, canonicalQueueAddress.QuotedCatalogName);
using (var command = new SqlCommand(sql, connection, transaction)
{
CommandType = CommandType.Text
Expand All @@ -66,34 +61,6 @@ static async Task CreateQueue(CanonicalQueueAddress canonicalQueueAddress, SqlCo
}
}

async Task CreateDelayedMessageQueue(SqlConnection connection, SqlTransaction transaction, bool createMessageBodyComputedColumn)
{
#pragma warning disable 618
var sql = string.Format(SqlConstants.CreateDelayedMessageStoreText, delayedQueueAddress.QualifiedTableName, delayedQueueAddress.QuotedCatalogName);
#pragma warning restore 618
using (var command = new SqlCommand(sql, connection, transaction)
{
CommandType = CommandType.Text
})
{
await command.ExecuteNonQueryAsync().ConfigureAwait(false);
}
if (createMessageBodyComputedColumn)
{
#pragma warning disable 618
var bodyStringSql = string.Format(SqlConstants.AddMessageBodyStringColumn, delayedQueueAddress.QualifiedTableName, delayedQueueAddress.QuotedCatalogName);
#pragma warning restore 618
using (var command = new SqlCommand(bodyStringSql, connection, transaction)
{
CommandType = CommandType.Text
})
{
await command.ExecuteNonQueryAsync().ConfigureAwait(false);
}

}
}

SqlConnectionFactory connectionFactory;
QueueAddressTranslator addressTranslator;
CanonicalQueueAddress delayedQueueAddress;
Expand Down

0 comments on commit 6d01d7a

Please sign in to comment.