From 9e5f2844b86c32fde4d20150e34bf5a3cac4b1e2 Mon Sep 17 00:00:00 2001 From: Tomasz Masternak Date: Thu, 28 May 2020 10:55:54 +0200 Subject: [PATCH] Non-clustered index on `RowVersion` (#621) * Non-clustered index on `RowVersion` * Update src/NServiceBus.SqlServer/Queuing/TableBasedQueue.cs Co-authored-by: Szymon Pobiega * review fixes Co-authored-by: Szymon Pobiega --- .../APIApprovals.Approve.approved.txt | 3 ++- .../Queuing/SqlConstants.cs | 11 ++++++-- .../Queuing/TableBasedQueue.cs | 19 +++++++++++--- .../Receiving/SchemaVerification.cs | 25 ++++++++++++++++--- 4 files changed, 48 insertions(+), 10 deletions(-) diff --git a/src/NServiceBus.SqlServer.UnitTests/ApprovalFiles/APIApprovals.Approve.approved.txt b/src/NServiceBus.SqlServer.UnitTests/ApprovalFiles/APIApprovals.Approve.approved.txt index 28dbd638d..371c6388e 100644 --- a/src/NServiceBus.SqlServer.UnitTests/ApprovalFiles/APIApprovals.Approve.approved.txt +++ b/src/NServiceBus.SqlServer.UnitTests/ApprovalFiles/APIApprovals.Approve.approved.txt @@ -1,4 +1,4 @@ -[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute(@"NServiceBus.SqlServer.IntegrationTests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100dde965e6172e019ac82c2639ffe494dd2e7dd16347c34762a05732b492e110f2e4e2e1b5ef2d85c848ccfb671ee20a47c8d1376276708dc30a90ff1121b647ba3b7259a6bc383b2034938ef0e275b58b920375ac605076178123693c6c4f1331661a62eba28c249386855637780e3ff5f23a6d854700eaa6803ef48907513b92")] +[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute(@"NServiceBus.SqlServer.IntegrationTests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100dde965e6172e019ac82c2639ffe494dd2e7dd16347c34762a05732b492e110f2e4e2e1b5ef2d85c848ccfb671ee20a47c8d1376276708dc30a90ff1121b647ba3b7259a6bc383b2034938ef0e275b58b920375ac605076178123693c6c4f1331661a62eba28c249386855637780e3ff5f23a6d854700eaa6803ef48907513b92")] [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute(@"NServiceBus.SqlServer.UnitTests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100dde965e6172e019ac82c2639ffe494dd2e7dd16347c34762a05732b492e110f2e4e2e1b5ef2d85c848ccfb671ee20a47c8d1376276708dc30a90ff1121b647ba3b7259a6bc383b2034938ef0e275b58b920375ac605076178123693c6c4f1331661a62eba28c249386855637780e3ff5f23a6d854700eaa6803ef48907513b92")] [assembly: System.Runtime.InteropServices.ComVisibleAttribute(false)] namespace NServiceBus @@ -35,6 +35,7 @@ namespace NServiceBus.Transport.SQLServer public static readonly string AddMessageBodyStringColumn; public static readonly string CheckHeadersColumnType; public static readonly string CheckIfExpiresIndexIsPresent; + public static readonly string CheckIfNonClusteredRowVersionIndexIsPresent; public const string CreateDelayedMessageStoreText = @" IF EXISTS ( SELECT * diff --git a/src/NServiceBus.SqlServer/Queuing/SqlConstants.cs b/src/NServiceBus.SqlServer/Queuing/SqlConstants.cs index e2686b91e..37abbb401 100644 --- a/src/NServiceBus.SqlServer/Queuing/SqlConstants.cs +++ b/src/NServiceBus.SqlServer/Queuing/SqlConstants.cs @@ -182,9 +182,9 @@ Body varbinary(max), RowVersion bigint IDENTITY(1,1) NOT NULL ); -CREATE CLUSTERED INDEX Index_RowVersion ON {0} +CREATE NONCLUSTERED INDEX Index_RowVersion ON {0} ( - RowVersion + [RowVersion] ASC ) CREATE NONCLUSTERED INDEX Index_Expires ON {0} @@ -248,6 +248,13 @@ FROM sys.indexes WHERE name = 'Index_Expires' AND object_id = OBJECT_ID('{0}')"; + public static readonly string CheckIfNonClusteredRowVersionIndexIsPresent = @" +SELECT COUNT(*) +FROM sys.indexes +WHERE name = 'Index_RowVersion' + AND object_id = OBJECT_ID('{0}') + AND type = 2"; // 2 = non-clustered index + public static readonly string CheckHeadersColumnType = @" SELECT t.name FROM sys.columns c diff --git a/src/NServiceBus.SqlServer/Queuing/TableBasedQueue.cs b/src/NServiceBus.SqlServer/Queuing/TableBasedQueue.cs index f9ea15c3a..e0633ef49 100644 --- a/src/NServiceBus.SqlServer/Queuing/TableBasedQueue.cs +++ b/src/NServiceBus.SqlServer/Queuing/TableBasedQueue.cs @@ -21,7 +21,8 @@ public TableBasedQueue(string qualifiedTableName, string queueName) sendCommand = Format(SqlConstants.SendText, this.qualifiedTableName); purgeCommand = Format(SqlConstants.PurgeText, this.qualifiedTableName); purgeExpiredCommand = Format(SqlConstants.PurgeBatchOfExpiredMessagesText, this.qualifiedTableName); - checkIndexCommand = Format(SqlConstants.CheckIfExpiresIndexIsPresent, this.qualifiedTableName); + checkExpiresIndexCommand = Format(SqlConstants.CheckIfExpiresIndexIsPresent, this.qualifiedTableName); + checkNonClusteredRowVersionIndexCommand = Format(SqlConstants.CheckIfNonClusteredRowVersionIndexIsPresent, this.qualifiedTableName); checkHeadersColumnTypeCommand = Format(SqlConstants.CheckHeadersColumnType, this.qualifiedTableName); #pragma warning restore 618 } @@ -134,7 +135,16 @@ public async Task PurgeBatchOfExpiredMessages(SqlConnection connection, int public async Task CheckExpiresIndexPresence(SqlConnection connection) { - using (var command = new SqlCommand(checkIndexCommand, connection)) + using (var command = new SqlCommand(checkExpiresIndexCommand, connection)) + { + var rowsCount = (int) await command.ExecuteScalarAsync().ConfigureAwait(false); + return rowsCount > 0; + } + } + + public async Task CheckNonClusteredRowVersionIndexPresence(SqlConnection connection) + { + using (var command = new SqlCommand(checkNonClusteredRowVersionIndexCommand, connection)) { var rowsCount = (int) await command.ExecuteScalarAsync().ConfigureAwait(false); return rowsCount > 0; @@ -160,7 +170,8 @@ public override string ToString() string sendCommand; string purgeCommand; string purgeExpiredCommand; - string checkIndexCommand; + string checkExpiresIndexCommand; + string checkNonClusteredRowVersionIndexCommand; string checkHeadersColumnTypeCommand; } -} \ No newline at end of file +} diff --git a/src/NServiceBus.SqlServer/Receiving/SchemaVerification.cs b/src/NServiceBus.SqlServer/Receiving/SchemaVerification.cs index e9cf6c205..0de27eb13 100644 --- a/src/NServiceBus.SqlServer/Receiving/SchemaVerification.cs +++ b/src/NServiceBus.SqlServer/Receiving/SchemaVerification.cs @@ -15,19 +15,21 @@ public SchemaInspector(Func> openConnection public async Task PerformInspection(TableBasedQueue queue) { await VerifyExpiredIndex(queue).ConfigureAwait(false); + await VerifyNonClusteredRowVersionIndex(queue).ConfigureAwait(false); await VerifyHeadersColumnType(queue).ConfigureAwait(false); } - async Task VerifyExpiredIndex(TableBasedQueue queue) + async Task VerifyIndex(TableBasedQueue queue, Func> check, string noIndexMessage) { try { using (var connection = await openConnection(queue).ConfigureAwait(false)) { - var indexExists = await queue.CheckExpiresIndexPresence(connection).ConfigureAwait(false); + var indexExists = await check(queue, connection).ConfigureAwait(false); + if (!indexExists) { - Logger.Warn($@"Table {queue.Name} does not contain index 'Index_Expires'.{Environment.NewLine}Adding this index will speed up the process of purging expired messages from the queue. Please consult the documentation for further information."); + Logger.Warn(noIndexMessage); } } } @@ -37,6 +39,23 @@ async Task VerifyExpiredIndex(TableBasedQueue queue) } } + Task VerifyNonClusteredRowVersionIndex(TableBasedQueue queue) + { + return VerifyIndex( + queue, + (q, c) => q.CheckNonClusteredRowVersionIndexPresence(c), + $"Table {queue.Name} does not contain non-clustered index 'Index_RowVersion'.{Environment.NewLine}Migrating to this non-clustered index improves performance for send and receive operations."); + } + + Task VerifyExpiredIndex(TableBasedQueue queue) + { + return VerifyIndex( + queue, + (q, c) => q.CheckExpiresIndexPresence(c), + $"Table {queue.Name} does not contain index 'Index_Expires'.{Environment.NewLine}Adding this index will speed up the process of purging expired messages from the queue. Please consult the documentation for further information." + ); + } + async Task VerifyHeadersColumnType(TableBasedQueue queue) { try