From 44324782b1ba20714996fb9de5ea3091aa19ef28 Mon Sep 17 00:00:00 2001 From: Tomasz Masternak Date: Thu, 28 May 2020 10:56:29 +0200 Subject: [PATCH] Non-clustered index on `RowVersion` column (#619) * non-clustered RowVersion index * Update src/NServiceBus.Transport.SqlServer/Queuing/TableBasedQueue.cs Co-authored-by: Szymon Pobiega * typo fix * review fixes Co-authored-by: Szymon Pobiega --- .../Queuing/SqlConstants.cs | 11 ++++++-- .../Queuing/TableBasedQueue.cs | 19 +++++++++++--- .../Receiving/SchemaVerification.cs | 25 ++++++++++++++++--- 3 files changed, 46 insertions(+), 9 deletions(-) diff --git a/src/NServiceBus.Transport.SqlServer/Queuing/SqlConstants.cs b/src/NServiceBus.Transport.SqlServer/Queuing/SqlConstants.cs index 1aabfc4c9..3a885256d 100644 --- a/src/NServiceBus.Transport.SqlServer/Queuing/SqlConstants.cs +++ b/src/NServiceBus.Transport.SqlServer/Queuing/SqlConstants.cs @@ -175,9 +175,9 @@ Body varbinary(max), RowVersion bigint IDENTITY(1,1) NOT NULL ); -CREATE CLUSTERED INDEX Index_RowVersion ON {0} +CREATE NONCLUSTERED INDEX Index_RowVersion ON {0} ( - RowVersion + [RowVersion] ASC ) CREATE NONCLUSTERED INDEX Index_Expires ON {0} @@ -241,6 +241,13 @@ FROM sys.indexes WHERE name = 'Index_Expires' AND object_id = OBJECT_ID('{0}')"; + public static readonly string CheckIfNonClusteredRowVersionIndexIsPresent = @" +SELECT COUNT(*) +FROM sys.indexes +WHERE name = 'Index_RowVersion' + AND object_id = OBJECT_ID('{0}') + AND type = 2"; // 2 = non-clustered index + public static readonly string CheckHeadersColumnType = @" SELECT t.name FROM sys.columns c diff --git a/src/NServiceBus.Transport.SqlServer/Queuing/TableBasedQueue.cs b/src/NServiceBus.Transport.SqlServer/Queuing/TableBasedQueue.cs index 17ebc81e9..49421173f 100644 --- a/src/NServiceBus.Transport.SqlServer/Queuing/TableBasedQueue.cs +++ b/src/NServiceBus.Transport.SqlServer/Queuing/TableBasedQueue.cs @@ -25,7 +25,8 @@ public TableBasedQueue(string qualifiedTableName, string queueName) sendCommand = Format(SqlConstants.SendText, this.qualifiedTableName); purgeCommand = Format(SqlConstants.PurgeText, this.qualifiedTableName); purgeExpiredCommand = Format(SqlConstants.PurgeBatchOfExpiredMessagesText, this.qualifiedTableName); - checkIndexCommand = Format(SqlConstants.CheckIfExpiresIndexIsPresent, this.qualifiedTableName); + checkExpiresIndexCommand = Format(SqlConstants.CheckIfExpiresIndexIsPresent, this.qualifiedTableName); + checkNonClusteredRowVersionIndexCommand = Format(SqlConstants.CheckIfNonClusteredRowVersionIndexIsPresent, this.qualifiedTableName); checkHeadersColumnTypeCommand = Format(SqlConstants.CheckHeadersColumnType, this.qualifiedTableName); #pragma warning restore 618 } @@ -138,7 +139,16 @@ public async Task PurgeBatchOfExpiredMessages(SqlConnection connection, int public async Task CheckExpiresIndexPresence(SqlConnection connection) { - using (var command = new SqlCommand(checkIndexCommand, connection)) + using (var command = new SqlCommand(checkExpiresIndexCommand, connection)) + { + var rowsCount = (int) await command.ExecuteScalarAsync().ConfigureAwait(false); + return rowsCount > 0; + } + } + + public async Task CheckNonClusteredRowVersionIndexPresence(SqlConnection connection) + { + using (var command = new SqlCommand(checkNonClusteredRowVersionIndexCommand, connection)) { var rowsCount = (int) await command.ExecuteScalarAsync().ConfigureAwait(false); return rowsCount > 0; @@ -164,7 +174,8 @@ public override string ToString() string sendCommand; string purgeCommand; string purgeExpiredCommand; - string checkIndexCommand; + string checkExpiresIndexCommand; + string checkNonClusteredRowVersionIndexCommand; string checkHeadersColumnTypeCommand; } -} \ No newline at end of file +} diff --git a/src/NServiceBus.Transport.SqlServer/Receiving/SchemaVerification.cs b/src/NServiceBus.Transport.SqlServer/Receiving/SchemaVerification.cs index 256e9628d..11166a03b 100644 --- a/src/NServiceBus.Transport.SqlServer/Receiving/SchemaVerification.cs +++ b/src/NServiceBus.Transport.SqlServer/Receiving/SchemaVerification.cs @@ -19,19 +19,21 @@ public SchemaInspector(Func> openConnection public async Task PerformInspection(TableBasedQueue queue) { await VerifyExpiredIndex(queue).ConfigureAwait(false); + await VerifyNonClusteredRowVersionIndex(queue).ConfigureAwait(false); await VerifyHeadersColumnType(queue).ConfigureAwait(false); } - async Task VerifyExpiredIndex(TableBasedQueue queue) + async Task VerifyIndex(TableBasedQueue queue, Func> check, string noIndexMessage) { try { using (var connection = await openConnection(queue).ConfigureAwait(false)) { - var indexExists = await queue.CheckExpiresIndexPresence(connection).ConfigureAwait(false); + var indexExists = await check(queue, connection).ConfigureAwait(false); + if (!indexExists) { - Logger.Warn($@"Table {queue.Name} does not contain index 'Index_Expires'.{Environment.NewLine}Adding this index will speed up the process of purging expired messages from the queue. Please consult the documentation for further information."); + Logger.Warn(noIndexMessage); } } } @@ -41,6 +43,23 @@ async Task VerifyExpiredIndex(TableBasedQueue queue) } } + Task VerifyNonClusteredRowVersionIndex(TableBasedQueue queue) + { + return VerifyIndex( + queue, + (q, c) => q.CheckNonClusteredRowVersionIndexPresence(c), + $"Table {queue.Name} does not contain non-clustered index 'Index_RowVersion'.{Environment.NewLine}Migrating to this non-clustered index improves performance for send and receive operations."); + } + + Task VerifyExpiredIndex(TableBasedQueue queue) + { + return VerifyIndex( + queue, + (q, c) => q.CheckExpiresIndexPresence(c), + $"Table {queue.Name} does not contain index 'Index_Expires'.{Environment.NewLine}Adding this index will speed up the process of purging expired messages from the queue. Please consult the documentation for further information." + ); + } + async Task VerifyHeadersColumnType(TableBasedQueue queue) { try