Skip to content

Commit

Permalink
Quote table and schema name when assembling queue table name
Browse files Browse the repository at this point in the history
  • Loading branch information
Marcin Hoppe committed Jun 20, 2016
1 parent e52db4c commit ec2ca72
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 10 deletions.
15 changes: 12 additions & 3 deletions src/NServiceBus.SqlServer/SqlServerMessageSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
public class SqlServerMessageSender : ISendMessages
{
const string SqlSend =
@"INSERT INTO [{0}].[{1}] ([Id],[CorrelationId],[ReplyToAddress],[Recoverable],[Expires],[Headers],[Body])
@"INSERT INTO {0}.{1} ([Id],[CorrelationId],[ReplyToAddress],[Recoverable],[Expires],[Headers],[Body])
VALUES (@Id,@CorrelationId,@ReplyToAddress,@Recoverable,@Expires,@Headers,@Body)";

static JsonMessageSerializer Serializer = new JsonMessageSerializer(null);
Expand Down Expand Up @@ -71,7 +71,7 @@ public void Send(TransportMessage message, Address address)
//if there is an active transaction for the connection, we can use the same native transaction
var transaction = UnitOfWork.GetTransaction(queueConnectionString);

using (var command = new SqlCommand(string.Format(SqlSend, schemaName, TableNameUtils.GetTableName(address)), transaction.Connection, transaction)
using (var command = new SqlCommand(SendSqlCommandText(schemaName, address), transaction.Connection, transaction)
{
CommandType = CommandType.Text
})
Expand All @@ -85,7 +85,7 @@ public void Send(TransportMessage message, Address address)
using (var connection = new SqlConnection(queueConnectionString))
{
connection.Open();
using (var command = new SqlCommand(string.Format(SqlSend, schemaName, TableNameUtils.GetTableName(address)), connection)
using (var command = new SqlCommand(SendSqlCommandText(schemaName, address), connection)
{
CommandType = CommandType.Text
})
Expand Down Expand Up @@ -114,6 +114,15 @@ public void Send(TransportMessage message, Address address)
}
}

static string SendSqlCommandText(string schemaName, Address address)
{
var sanitizer = new SqlCommandBuilder();

var quotedSchema = sanitizer.QuoteIdentifier(schemaName);
var quotedTable = sanitizer.QuoteIdentifier(TableNameUtils.GetTableName(address));

return string.Format(SqlSend, quotedSchema, quotedTable);
}

private static void ThrowFailedToSendException(Address address, Exception ex)
{
Expand Down
18 changes: 11 additions & 7 deletions src/NServiceBus.SqlServer/SqlServerPollingDequeueStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,12 @@ public void Init(Address address, TransactionSettings transactionSettings,
Timeout = transactionSettings.TransactionTimeout
};

tableName = TableNameUtils.GetTableName(address);
var sanitizer = new SqlCommandBuilder();

sql = string.Format(SqlReceive, SchemaName, tableName);
quotedSchemaName = sanitizer.QuoteIdentifier(SchemaName);
quotedTableName = sanitizer.QuoteIdentifier(TableNameUtils.GetTableName(address));

sql = string.Format(SqlReceive, quotedSchemaName, quotedTableName);

if (PurgeOnStartup)
{
Expand Down Expand Up @@ -110,14 +113,14 @@ void PurgeTable()
{
connection.Open();

using (var command = new SqlCommand(string.Format(SqlPurge, SchemaName, tableName), connection)
using (var command = new SqlCommand(string.Format(SqlPurge, quotedSchemaName, quotedTableName), connection)
{
CommandType = CommandType.Text
})
{
var numberOfPurgedRows = command.ExecuteNonQuery();

Logger.InfoFormat("{0} messages was purged from table {1}", numberOfPurgedRows, tableName);
Logger.InfoFormat("{0} messages was purged from table {1}", numberOfPurgedRows, quotedTableName);
}
}
}
Expand Down Expand Up @@ -405,12 +408,12 @@ IsolationLevel GetSqlIsolationLevel(System.Transactions.IsolationLevel isolation
}

const string SqlReceive =
@"WITH message AS (SELECT TOP(1) * FROM [{0}].[{1}] WITH (UPDLOCK, READPAST, ROWLOCK) ORDER BY [RowVersion] ASC)
@"WITH message AS (SELECT TOP(1) * FROM {0}.{1} WITH (UPDLOCK, READPAST, ROWLOCK) ORDER BY [RowVersion] ASC)
DELETE FROM message
OUTPUT deleted.Id, deleted.CorrelationId, deleted.ReplyToAddress,
deleted.Recoverable, deleted.Expires, deleted.Headers, deleted.Body;";

const string SqlPurge = @"DELETE FROM [{0}].[{1}]";
const string SqlPurge = @"DELETE FROM {0}.{1}";

static readonly JsonMessageSerializer Serializer = new JsonMessageSerializer(null);
static readonly ILog Logger = LogManager.GetLogger(typeof(SqlServerPollingDequeueStrategy));
Expand All @@ -425,7 +428,8 @@ DELETE FROM message
Action<TransportMessage, Exception> endProcessMessage;
TransactionSettings settings;
string sql;
string tableName;
string quotedSchemaName;
string quotedTableName;
CancellationTokenSource tokenSource;
TransactionOptions transactionOptions;
Func<TransportMessage, bool> tryProcessMessage;
Expand Down

0 comments on commit ec2ca72

Please sign in to comment.