Skip to content

Commit

Permalink
Non-clustered index on RowVersion (#621)
Browse files Browse the repository at this point in the history
* Non-clustered index on `RowVersion`

* Update src/NServiceBus.SqlServer/Queuing/TableBasedQueue.cs

Co-authored-by: Szymon Pobiega <[email protected]>

* review fixes

Co-authored-by: Szymon Pobiega <[email protected]>
  • Loading branch information
tmasternak and SzymonPobiega authored May 28, 2020
1 parent 5740b39 commit 9e5f284
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute(@"NServiceBus.SqlServer.IntegrationTests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100dde965e6172e019ac82c2639ffe494dd2e7dd16347c34762a05732b492e110f2e4e2e1b5ef2d85c848ccfb671ee20a47c8d1376276708dc30a90ff1121b647ba3b7259a6bc383b2034938ef0e275b58b920375ac605076178123693c6c4f1331661a62eba28c249386855637780e3ff5f23a6d854700eaa6803ef48907513b92")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute(@"NServiceBus.SqlServer.IntegrationTests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100dde965e6172e019ac82c2639ffe494dd2e7dd16347c34762a05732b492e110f2e4e2e1b5ef2d85c848ccfb671ee20a47c8d1376276708dc30a90ff1121b647ba3b7259a6bc383b2034938ef0e275b58b920375ac605076178123693c6c4f1331661a62eba28c249386855637780e3ff5f23a6d854700eaa6803ef48907513b92")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute(@"NServiceBus.SqlServer.UnitTests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100dde965e6172e019ac82c2639ffe494dd2e7dd16347c34762a05732b492e110f2e4e2e1b5ef2d85c848ccfb671ee20a47c8d1376276708dc30a90ff1121b647ba3b7259a6bc383b2034938ef0e275b58b920375ac605076178123693c6c4f1331661a62eba28c249386855637780e3ff5f23a6d854700eaa6803ef48907513b92")]
[assembly: System.Runtime.InteropServices.ComVisibleAttribute(false)]
namespace NServiceBus
Expand Down Expand Up @@ -35,6 +35,7 @@ namespace NServiceBus.Transport.SQLServer
public static readonly string AddMessageBodyStringColumn;
public static readonly string CheckHeadersColumnType;
public static readonly string CheckIfExpiresIndexIsPresent;
public static readonly string CheckIfNonClusteredRowVersionIndexIsPresent;
public const string CreateDelayedMessageStoreText = @"
IF EXISTS (
SELECT *
Expand Down
11 changes: 9 additions & 2 deletions src/NServiceBus.SqlServer/Queuing/SqlConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,9 @@ Body varbinary(max),
RowVersion bigint IDENTITY(1,1) NOT NULL
);
CREATE CLUSTERED INDEX Index_RowVersion ON {0}
CREATE NONCLUSTERED INDEX Index_RowVersion ON {0}
(
RowVersion
[RowVersion] ASC
)
CREATE NONCLUSTERED INDEX Index_Expires ON {0}
Expand Down Expand Up @@ -248,6 +248,13 @@ FROM sys.indexes
WHERE name = 'Index_Expires'
AND object_id = OBJECT_ID('{0}')";

public static readonly string CheckIfNonClusteredRowVersionIndexIsPresent = @"
SELECT COUNT(*)
FROM sys.indexes
WHERE name = 'Index_RowVersion'
AND object_id = OBJECT_ID('{0}')
AND type = 2"; // 2 = non-clustered index

public static readonly string CheckHeadersColumnType = @"
SELECT t.name
FROM sys.columns c
Expand Down
19 changes: 15 additions & 4 deletions src/NServiceBus.SqlServer/Queuing/TableBasedQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ public TableBasedQueue(string qualifiedTableName, string queueName)
sendCommand = Format(SqlConstants.SendText, this.qualifiedTableName);
purgeCommand = Format(SqlConstants.PurgeText, this.qualifiedTableName);
purgeExpiredCommand = Format(SqlConstants.PurgeBatchOfExpiredMessagesText, this.qualifiedTableName);
checkIndexCommand = Format(SqlConstants.CheckIfExpiresIndexIsPresent, this.qualifiedTableName);
checkExpiresIndexCommand = Format(SqlConstants.CheckIfExpiresIndexIsPresent, this.qualifiedTableName);
checkNonClusteredRowVersionIndexCommand = Format(SqlConstants.CheckIfNonClusteredRowVersionIndexIsPresent, this.qualifiedTableName);
checkHeadersColumnTypeCommand = Format(SqlConstants.CheckHeadersColumnType, this.qualifiedTableName);
#pragma warning restore 618
}
Expand Down Expand Up @@ -134,7 +135,16 @@ public async Task<int> PurgeBatchOfExpiredMessages(SqlConnection connection, int

public async Task<bool> CheckExpiresIndexPresence(SqlConnection connection)
{
using (var command = new SqlCommand(checkIndexCommand, connection))
using (var command = new SqlCommand(checkExpiresIndexCommand, connection))
{
var rowsCount = (int) await command.ExecuteScalarAsync().ConfigureAwait(false);
return rowsCount > 0;
}
}

public async Task<bool> CheckNonClusteredRowVersionIndexPresence(SqlConnection connection)
{
using (var command = new SqlCommand(checkNonClusteredRowVersionIndexCommand, connection))
{
var rowsCount = (int) await command.ExecuteScalarAsync().ConfigureAwait(false);
return rowsCount > 0;
Expand All @@ -160,7 +170,8 @@ public override string ToString()
string sendCommand;
string purgeCommand;
string purgeExpiredCommand;
string checkIndexCommand;
string checkExpiresIndexCommand;
string checkNonClusteredRowVersionIndexCommand;
string checkHeadersColumnTypeCommand;
}
}
}
25 changes: 22 additions & 3 deletions src/NServiceBus.SqlServer/Receiving/SchemaVerification.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,21 @@ public SchemaInspector(Func<TableBasedQueue, Task<SqlConnection>> openConnection
public async Task PerformInspection(TableBasedQueue queue)
{
await VerifyExpiredIndex(queue).ConfigureAwait(false);
await VerifyNonClusteredRowVersionIndex(queue).ConfigureAwait(false);
await VerifyHeadersColumnType(queue).ConfigureAwait(false);
}

async Task VerifyExpiredIndex(TableBasedQueue queue)
async Task VerifyIndex(TableBasedQueue queue, Func<TableBasedQueue, SqlConnection, Task<bool>> check, string noIndexMessage)
{
try
{
using (var connection = await openConnection(queue).ConfigureAwait(false))
{
var indexExists = await queue.CheckExpiresIndexPresence(connection).ConfigureAwait(false);
var indexExists = await check(queue, connection).ConfigureAwait(false);

if (!indexExists)
{
Logger.Warn($@"Table {queue.Name} does not contain index 'Index_Expires'.{Environment.NewLine}Adding this index will speed up the process of purging expired messages from the queue. Please consult the documentation for further information.");
Logger.Warn(noIndexMessage);
}
}
}
Expand All @@ -37,6 +39,23 @@ async Task VerifyExpiredIndex(TableBasedQueue queue)
}
}

Task VerifyNonClusteredRowVersionIndex(TableBasedQueue queue)
{
return VerifyIndex(
queue,
(q, c) => q.CheckNonClusteredRowVersionIndexPresence(c),
$"Table {queue.Name} does not contain non-clustered index 'Index_RowVersion'.{Environment.NewLine}Migrating to this non-clustered index improves performance for send and receive operations.");
}

Task VerifyExpiredIndex(TableBasedQueue queue)
{
return VerifyIndex(
queue,
(q, c) => q.CheckExpiresIndexPresence(c),
$"Table {queue.Name} does not contain index 'Index_Expires'.{Environment.NewLine}Adding this index will speed up the process of purging expired messages from the queue. Please consult the documentation for further information."
);
}

async Task VerifyHeadersColumnType(TableBasedQueue queue)
{
try
Expand Down

0 comments on commit 9e5f284

Please sign in to comment.