From 60d2768c38a916236b885be297ccea1e72dc159d Mon Sep 17 00:00:00 2001 From: WilliamBZA Date: Thu, 5 Nov 2020 14:08:12 +0200 Subject: [PATCH 1/5] Read entire byte array --- .../Queuing/MessageRow.cs | 11 ++--------- .../Queuing/TableBasedQueue.cs | 3 +-- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/src/NServiceBus.Transport.SqlServer/Queuing/MessageRow.cs b/src/NServiceBus.Transport.SqlServer/Queuing/MessageRow.cs index f27a4cc8a..e4eabc096 100644 --- a/src/NServiceBus.Transport.SqlServer/Queuing/MessageRow.cs +++ b/src/NServiceBus.Transport.SqlServer/Queuing/MessageRow.cs @@ -48,7 +48,6 @@ public void PrepareSendCommand(SqlCommand command) static async Task ReadRow(SqlDataReader dataReader) { - //HINT: we are assuming that dataReader is sequential. Order or reads is important ! return new MessageRow { id = await dataReader.GetFieldValueAsync(0).ConfigureAwait(false), @@ -95,15 +94,9 @@ static async Task GetHeaders(SqlDataReader dataReader, int headersIndex) } } - static async Task GetBody(SqlDataReader dataReader, int bodyIndex) + static Task GetBody(SqlDataReader dataReader, int bodyIndex) { - // Null values will be returned as an empty (zero bytes) Stream. - using (var outStream = new MemoryStream()) - using (var stream = dataReader.GetStream(bodyIndex)) - { - await stream.CopyToAsync(outStream).ConfigureAwait(false); - return outStream.ToArray(); - } + return Task.FromResult((byte[])dataReader[bodyIndex]); } static async Task GetNullableAsync(SqlDataReader dataReader, int index) where T : class diff --git a/src/NServiceBus.Transport.SqlServer/Queuing/TableBasedQueue.cs b/src/NServiceBus.Transport.SqlServer/Queuing/TableBasedQueue.cs index 6c3652394..76414d532 100644 --- a/src/NServiceBus.Transport.SqlServer/Queuing/TableBasedQueue.cs +++ b/src/NServiceBus.Transport.SqlServer/Queuing/TableBasedQueue.cs @@ -72,8 +72,7 @@ public Task Send(OutgoingMessage message, TimeSpan timeToBeReceived, SqlConnecti static async Task ReadMessage(SqlCommand command) { - // We need sequential access to not buffer everything into memory - using (var dataReader = await command.ExecuteReaderAsync(CommandBehavior.SingleRow | CommandBehavior.SequentialAccess).ConfigureAwait(false)) + using (var dataReader = await command.ExecuteReaderAsync(CommandBehavior.SingleRow).ConfigureAwait(false)) { if (!await dataReader.ReadAsync().ConfigureAwait(false)) { From 725eb13ddda456d9eedd35293cf12b576f0666d3 Mon Sep 17 00:00:00 2001 From: WilliamBZA Date: Fri, 6 Nov 2020 09:54:46 +0200 Subject: [PATCH 2/5] Change behaviour based on connection string --- .../When_checking_schema.cs | 2 +- .../When_dispatching_messages.cs | 6 +++--- .../When_message_receive_takes_long.cs | 2 +- .../When_receiving_messages.cs | 4 ++-- .../When_using_ttbr.cs | 4 ++-- ...nfigureSqlServerTransportInfrastructure.cs | 2 +- .../Queuing/MessageRow.cs | 20 ++++++++++++++----- .../Queuing/TableBasedQueue.cs | 18 ++++++++++++----- .../Queuing/TableBasedQueueCache.cs | 6 ++++-- .../SqlServerTransport.cs | 18 ++++++++++++++++- .../SqlServerTransportInfrastructure.cs | 8 +++++--- 11 files changed, 64 insertions(+), 26 deletions(-) diff --git a/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_checking_schema.cs b/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_checking_schema.cs index d4723a930..e98cfda1e 100644 --- a/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_checking_schema.cs +++ b/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_checking_schema.cs @@ -27,7 +27,7 @@ public async Task SetUp() await ResetQueue(addressParser, sqlConnectionFactory); - queue = new TableBasedQueue(addressParser.Parse(QueueTableName).QualifiedTableName, QueueTableName); + queue = new TableBasedQueue(addressParser.Parse(QueueTableName).QualifiedTableName, QueueTableName, false); } [Test] diff --git a/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_dispatching_messages.cs b/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_dispatching_messages.cs index 1b6e7c57b..378cb53ed 100644 --- a/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_dispatching_messages.cs +++ b/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_dispatching_messages.cs @@ -106,7 +106,7 @@ public void Prepare() async Task PrepareAsync() { var addressParser = new QueueAddressTranslator("nservicebus", "dbo", null, null); - var tableCache = new TableBasedQueueCache(addressParser); + var tableCache = new TableBasedQueueCache(addressParser, true); await CreateOutputQueueIfNecessary(addressParser, sqlConnectionFactory); @@ -119,8 +119,8 @@ Task PurgeOutputQueue(QueueAddressTranslator addressTranslator) { purger = new QueuePurger(sqlConnectionFactory); var queueAddress = addressTranslator.Parse(validAddress).QualifiedTableName; - queue = new TableBasedQueue(queueAddress, validAddress); - + queue = new TableBasedQueue(queueAddress, validAddress, true); + return purger.Purge(queue); } diff --git a/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_message_receive_takes_long.cs b/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_message_receive_takes_long.cs index c92d942dc..0540c03e6 100644 --- a/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_message_receive_takes_long.cs +++ b/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_message_receive_takes_long.cs @@ -32,7 +32,7 @@ public async Task SetUp() await CreateQueueIfNotExists(addressParser, sqlConnectionFactory); - queue = new TableBasedQueue(addressParser.Parse(QueueTableName).QualifiedTableName, QueueTableName); + queue = new TableBasedQueue(addressParser.Parse(QueueTableName).QualifiedTableName, QueueTableName, true); } [Test] diff --git a/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_receiving_messages.cs b/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_receiving_messages.cs index f5936a3d0..c69fc3621 100644 --- a/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_receiving_messages.cs +++ b/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_receiving_messages.cs @@ -34,7 +34,7 @@ public async Task Should_stop_pumping_messages_after_first_unsuccessful_receive( var pump = new MessagePump( m => new ProcessWithNoTransaction(sqlConnectionFactory, null), - qa => qa == "input" ? (TableBasedQueue)inputQueue : new TableBasedQueue(parser.Parse(qa).QualifiedTableName, qa), + qa => qa == "input" ? (TableBasedQueue)inputQueue : new TableBasedQueue(parser.Parse(qa).QualifiedTableName, qa, true), new QueuePurger(sqlConnectionFactory), new NoOpExpiredMessagesPurger(), new QueuePeeker(sqlConnectionFactory, new QueuePeekerOptions()), @@ -81,7 +81,7 @@ class FakeTableBasedQueue : TableBasedQueue int queueSize; int successfulReceives; - public FakeTableBasedQueue(string address, int queueSize, int successfulReceives) : base(address, "") + public FakeTableBasedQueue(string address, int queueSize, int successfulReceives) : base(address, "", true) { this.queueSize = queueSize; this.successfulReceives = successfulReceives; diff --git a/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_using_ttbr.cs b/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_using_ttbr.cs index ae49b56a9..1df1fa2c8 100644 --- a/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_using_ttbr.cs +++ b/src/NServiceBus.Transport.SqlServer.IntegrationTests/When_using_ttbr.cs @@ -121,7 +121,7 @@ public void Prepare() async Task PrepareAsync() { var addressParser = new QueueAddressTranslator("nservicebus", "dbo", null, new QueueSchemaAndCatalogSettings()); - var tableCache = new TableBasedQueueCache(addressParser); + var tableCache = new TableBasedQueueCache(addressParser, true); var connectionString = Environment.GetEnvironmentVariable("SqlServerTransportConnectionString"); if (string.IsNullOrEmpty(connectionString)) @@ -142,7 +142,7 @@ Task PurgeOutputQueue(QueueAddressTranslator addressParser) { purger = new QueuePurger(sqlConnectionFactory); var queueAddress = addressParser.Parse(validAddress); - queue = new TableBasedQueue(queueAddress.QualifiedTableName, queueAddress.Address); + queue = new TableBasedQueue(queueAddress.QualifiedTableName, queueAddress.Address, true); return purger.Purge(queue); } diff --git a/src/NServiceBus.Transport.SqlServer.TransportTests/ConfigureSqlServerTransportInfrastructure.cs b/src/NServiceBus.Transport.SqlServer.TransportTests/ConfigureSqlServerTransportInfrastructure.cs index 9dca360ee..c625ab454 100644 --- a/src/NServiceBus.Transport.SqlServer.TransportTests/ConfigureSqlServerTransportInfrastructure.cs +++ b/src/NServiceBus.Transport.SqlServer.TransportTests/ConfigureSqlServerTransportInfrastructure.cs @@ -43,7 +43,7 @@ public TransportConfigurationResult Configure(SettingsHolder settings, Transport var localAddress = settings.EndpointName(); return new TransportConfigurationResult { - TransportInfrastructure = new SqlServerTransportInfrastructure("nservicebus", settings, connectionString, () => localAddress, () => logicalAddress) + TransportInfrastructure = new SqlServerTransportInfrastructure("nservicebus", settings, connectionString, () => localAddress, () => logicalAddress, false) }; } diff --git a/src/NServiceBus.Transport.SqlServer/Queuing/MessageRow.cs b/src/NServiceBus.Transport.SqlServer/Queuing/MessageRow.cs index e4eabc096..bbd114fa8 100644 --- a/src/NServiceBus.Transport.SqlServer/Queuing/MessageRow.cs +++ b/src/NServiceBus.Transport.SqlServer/Queuing/MessageRow.cs @@ -16,9 +16,9 @@ class MessageRow { MessageRow() { } - public static async Task Read(SqlDataReader dataReader) + public static async Task Read(SqlDataReader dataReader, bool isStreamSupported) { - var row = await ReadRow(dataReader).ConfigureAwait(false); + var row = await ReadRow(dataReader, isStreamSupported).ConfigureAwait(false); return row.TryParse(); } @@ -46,7 +46,7 @@ public void PrepareSendCommand(SqlCommand command) AddParameter(command, "Body", SqlDbType.VarBinary, bodyBytes, -1); } - static async Task ReadRow(SqlDataReader dataReader) + static async Task ReadRow(SqlDataReader dataReader, bool isStreamSupported) { return new MessageRow { @@ -55,7 +55,7 @@ static async Task ReadRow(SqlDataReader dataReader) replyToAddress = await GetNullableAsync(dataReader, 2).ConfigureAwait(false), expired = await dataReader.GetFieldValueAsync(3).ConfigureAwait(false) == 1, headers = await GetHeaders(dataReader, 4).ConfigureAwait(false), - bodyBytes = await GetBody(dataReader, 5).ConfigureAwait(false) + bodyBytes = isStreamSupported ? await GetBody(dataReader, 5).ConfigureAwait(false) : await GetNonStreamBody(dataReader, 5).ConfigureAwait(false) }; } @@ -94,7 +94,17 @@ static async Task GetHeaders(SqlDataReader dataReader, int headersIndex) } } - static Task GetBody(SqlDataReader dataReader, int bodyIndex) + static async Task GetBody(SqlDataReader dataReader, int bodyIndex) + { + using (var outStream = new MemoryStream()) + using (var stream = dataReader.GetStream(bodyIndex)) + { + await stream.CopyToAsync(outStream).ConfigureAwait(false); + return outStream.ToArray(); + } + } + + static Task GetNonStreamBody(SqlDataReader dataReader, int bodyIndex) { return Task.FromResult((byte[])dataReader[bodyIndex]); } diff --git a/src/NServiceBus.Transport.SqlServer/Queuing/TableBasedQueue.cs b/src/NServiceBus.Transport.SqlServer/Queuing/TableBasedQueue.cs index 76414d532..114431d9a 100644 --- a/src/NServiceBus.Transport.SqlServer/Queuing/TableBasedQueue.cs +++ b/src/NServiceBus.Transport.SqlServer/Queuing/TableBasedQueue.cs @@ -16,7 +16,7 @@ class TableBasedQueue { public string Name { get; } - public TableBasedQueue(string qualifiedTableName, string queueName) + public TableBasedQueue(string qualifiedTableName, string queueName, bool isStreamSupported) { #pragma warning disable 618 this.qualifiedTableName = qualifiedTableName; @@ -28,6 +28,7 @@ public TableBasedQueue(string qualifiedTableName, string queueName) checkExpiresIndexCommand = Format(SqlConstants.CheckIfExpiresIndexIsPresent, this.qualifiedTableName); checkNonClusteredRowVersionIndexCommand = Format(SqlConstants.CheckIfNonClusteredRowVersionIndexIsPresent, this.qualifiedTableName); checkHeadersColumnTypeCommand = Format(SqlConstants.CheckHeadersColumnType, this.qualifiedTableName); + this.isStreamSupported = isStreamSupported; #pragma warning restore 618 } @@ -54,7 +55,7 @@ public virtual async Task TryReceive(SqlConnection connection { using (var command = new SqlCommand(receiveCommand, connection, transaction)) { - return await ReadMessage(command).ConfigureAwait(false); + return await ReadMessage(command, isStreamSupported).ConfigureAwait(false); } } @@ -70,16 +71,22 @@ public Task Send(OutgoingMessage message, TimeSpan timeToBeReceived, SqlConnecti return SendRawMessage(messageRow, connection, transaction); } - static async Task ReadMessage(SqlCommand command) + static async Task ReadMessage(SqlCommand command, bool isStreamSupported) { - using (var dataReader = await command.ExecuteReaderAsync(CommandBehavior.SingleRow).ConfigureAwait(false)) + var behavior = CommandBehavior.SingleRow; + if (isStreamSupported) + { + behavior |= CommandBehavior.SequentialAccess; + } + + using (var dataReader = await command.ExecuteReaderAsync(behavior).ConfigureAwait(false)) { if (!await dataReader.ReadAsync().ConfigureAwait(false)) { return MessageReadResult.NoMessage; } - return await MessageRow.Read(dataReader).ConfigureAwait(false); + return await MessageRow.Read(dataReader, isStreamSupported).ConfigureAwait(false); } } @@ -176,5 +183,6 @@ public override string ToString() string checkExpiresIndexCommand; string checkNonClusteredRowVersionIndexCommand; string checkHeadersColumnTypeCommand; + bool isStreamSupported; } } \ No newline at end of file diff --git a/src/NServiceBus.Transport.SqlServer/Queuing/TableBasedQueueCache.cs b/src/NServiceBus.Transport.SqlServer/Queuing/TableBasedQueueCache.cs index cf78165b2..832aa231e 100644 --- a/src/NServiceBus.Transport.SqlServer/Queuing/TableBasedQueueCache.cs +++ b/src/NServiceBus.Transport.SqlServer/Queuing/TableBasedQueueCache.cs @@ -5,21 +5,23 @@ namespace NServiceBus.Transport.SqlServer class TableBasedQueueCache { - public TableBasedQueueCache(QueueAddressTranslator addressTranslator) + public TableBasedQueueCache(QueueAddressTranslator addressTranslator, bool isStreamSupported) { this.addressTranslator = addressTranslator; + this.isStreamSupported = isStreamSupported; } public TableBasedQueue Get(string destination) { var address = addressTranslator.Parse(destination); var key = Tuple.Create(address.QualifiedTableName, address.Address); - var queue = cache.GetOrAdd(key, x => new TableBasedQueue(x.Item1, x.Item2)); + var queue = cache.GetOrAdd(key, x => new TableBasedQueue(x.Item1, x.Item2, isStreamSupported)); return queue; } QueueAddressTranslator addressTranslator; ConcurrentDictionary, TableBasedQueue> cache = new ConcurrentDictionary, TableBasedQueue>(); + bool isStreamSupported; } } \ No newline at end of file diff --git a/src/NServiceBus.Transport.SqlServer/SqlServerTransport.cs b/src/NServiceBus.Transport.SqlServer/SqlServerTransport.cs index ea8b63e80..f26ce5369 100644 --- a/src/NServiceBus.Transport.SqlServer/SqlServerTransport.cs +++ b/src/NServiceBus.Transport.SqlServer/SqlServerTransport.cs @@ -43,8 +43,9 @@ static bool LegacyMultiInstanceModeTurnedOn(SettingsHolder settings) public override TransportInfrastructure Initialize(SettingsHolder settings, string connectionString) { var catalog = GetDefaultCatalog(settings, connectionString); + var isEncrypted = IsEncrypted(connectionString); - return new SqlServerTransportInfrastructure(catalog, settings, connectionString, settings.LocalAddress, settings.LogicalAddress); + return new SqlServerTransportInfrastructure(catalog, settings, connectionString, settings.LocalAddress, settings.LogicalAddress, isEncrypted); } static string GetDefaultCatalog(SettingsHolder settings, string connectionString) @@ -71,5 +72,20 @@ static string GetDefaultCatalog(SettingsHolder settings, string connectionString } throw new Exception("Initial Catalog property is mandatory in the connection string."); } + + static bool IsEncrypted(string connectionString) + { + var parser = new DbConnectionStringBuilder + { + ConnectionString = connectionString + }; + + if (parser.TryGetValue("Column Encryption Setting", out var enabled)) + { + return ((string)enabled).Equals("enabled", StringComparison.InvariantCultureIgnoreCase); + } + + return false; + } } } \ No newline at end of file diff --git a/src/NServiceBus.Transport.SqlServer/SqlServerTransportInfrastructure.cs b/src/NServiceBus.Transport.SqlServer/SqlServerTransportInfrastructure.cs index 7e9bd76a5..591a6f41f 100644 --- a/src/NServiceBus.Transport.SqlServer/SqlServerTransportInfrastructure.cs +++ b/src/NServiceBus.Transport.SqlServer/SqlServerTransportInfrastructure.cs @@ -22,12 +22,13 @@ namespace NServiceBus.Transport.SqlServer /// class SqlServerTransportInfrastructure : TransportInfrastructure { - internal SqlServerTransportInfrastructure(string catalog, SettingsHolder settings, string connectionString, Func localAddress, Func logicalAddress) + internal SqlServerTransportInfrastructure(string catalog, SettingsHolder settings, string connectionString, Func localAddress, Func logicalAddress, bool isEncrypted) { this.settings = settings; this.connectionString = connectionString; this.localAddress = localAddress; this.logicalAddress = logicalAddress; + this.isEncrypted = isEncrypted; if (settings.HasSetting(SettingsKeys.DisableNativePubSub)) { @@ -42,7 +43,7 @@ internal SqlServerTransportInfrastructure(string catalog, SettingsHolder setting var queueSchemaSettings = settings.GetOrDefault(); addressTranslator = new QueueAddressTranslator(catalog, "dbo", defaultSchemaOverride, queueSchemaSettings); - tableBasedQueueCache = new TableBasedQueueCache(addressTranslator); + tableBasedQueueCache = new TableBasedQueueCache(addressTranslator, !isEncrypted); connectionFactory = CreateConnectionFactory(); //Configure the schema and catalog for logical endpoint-based routing @@ -155,7 +156,7 @@ public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure() var schemaVerification = new SchemaInspector(queue => connectionFactory.OpenNewConnection(), validateExpiredIndex); - Func queueFactory = queueName => new TableBasedQueue(addressTranslator.Parse(queueName).QualifiedTableName, queueName); + Func queueFactory = queueName => new TableBasedQueue(addressTranslator.Parse(queueName).QualifiedTableName, queueName, !isEncrypted); //Create delayed delivery infrastructure CanonicalQueueAddress delayedQueueCanonicalAddress = null; @@ -353,5 +354,6 @@ public override string MakeCanonicalForm(string transportAddress) ISubscriptionStore subscriptionStore; IDelayedMessageStore delayedMessageStore = new SendOnlyDelayedMessageStore(); TableBasedQueueCache tableBasedQueueCache; + bool isEncrypted; } } \ No newline at end of file From 096c0cbe18139a1a9baa40ae693c90b64e1f28df Mon Sep 17 00:00:00 2001 From: WilliamBZA Date: Fri, 6 Nov 2020 09:56:00 +0200 Subject: [PATCH 3/5] Revert comment removal --- src/NServiceBus.Transport.SqlServer/Queuing/MessageRow.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/NServiceBus.Transport.SqlServer/Queuing/MessageRow.cs b/src/NServiceBus.Transport.SqlServer/Queuing/MessageRow.cs index bbd114fa8..86ff2b1ca 100644 --- a/src/NServiceBus.Transport.SqlServer/Queuing/MessageRow.cs +++ b/src/NServiceBus.Transport.SqlServer/Queuing/MessageRow.cs @@ -96,6 +96,7 @@ static async Task GetHeaders(SqlDataReader dataReader, int headersIndex) static async Task GetBody(SqlDataReader dataReader, int bodyIndex) { + // Null values will be returned as an empty (zero bytes) Stream. using (var outStream = new MemoryStream()) using (var stream = dataReader.GetStream(bodyIndex)) { From 9c4be9e8fa380e74ccb33d0357ee53859d1a8e75 Mon Sep 17 00:00:00 2001 From: WilliamBZA Date: Fri, 6 Nov 2020 10:45:02 +0200 Subject: [PATCH 4/5] Fix connection factory override --- .../SqlServerTransport.cs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/NServiceBus.Transport.SqlServer/SqlServerTransport.cs b/src/NServiceBus.Transport.SqlServer/SqlServerTransport.cs index f26ce5369..6638b6301 100644 --- a/src/NServiceBus.Transport.SqlServer/SqlServerTransport.cs +++ b/src/NServiceBus.Transport.SqlServer/SqlServerTransport.cs @@ -43,7 +43,7 @@ static bool LegacyMultiInstanceModeTurnedOn(SettingsHolder settings) public override TransportInfrastructure Initialize(SettingsHolder settings, string connectionString) { var catalog = GetDefaultCatalog(settings, connectionString); - var isEncrypted = IsEncrypted(connectionString); + var isEncrypted = IsEncrypted(settings, connectionString); return new SqlServerTransportInfrastructure(catalog, settings, connectionString, settings.LocalAddress, settings.LogicalAddress, isEncrypted); } @@ -73,8 +73,16 @@ static string GetDefaultCatalog(SettingsHolder settings, string connectionString throw new Exception("Initial Catalog property is mandatory in the connection string."); } - static bool IsEncrypted(string connectionString) + static bool IsEncrypted(SettingsHolder settings, string connectionString) { + if (settings.TryGet(SettingsKeys.ConnectionFactoryOverride, out Func> factoryOverride)) + { + using (var connection = factoryOverride().GetAwaiter().GetResult()) + { + connectionString = connection.ConnectionString; + } + } + var parser = new DbConnectionStringBuilder { ConnectionString = connectionString From c351d04fc954d93ad3a6f3e2dcfc60c087a2eca2 Mon Sep 17 00:00:00 2001 From: WilliamBZA Date: Fri, 6 Nov 2020 12:38:51 +0200 Subject: [PATCH 5/5] Change ReadMessage to be instance method --- .../Queuing/TableBasedQueue.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/NServiceBus.Transport.SqlServer/Queuing/TableBasedQueue.cs b/src/NServiceBus.Transport.SqlServer/Queuing/TableBasedQueue.cs index 114431d9a..5a812e397 100644 --- a/src/NServiceBus.Transport.SqlServer/Queuing/TableBasedQueue.cs +++ b/src/NServiceBus.Transport.SqlServer/Queuing/TableBasedQueue.cs @@ -55,7 +55,7 @@ public virtual async Task TryReceive(SqlConnection connection { using (var command = new SqlCommand(receiveCommand, connection, transaction)) { - return await ReadMessage(command, isStreamSupported).ConfigureAwait(false); + return await ReadMessage(command).ConfigureAwait(false); } } @@ -71,7 +71,7 @@ public Task Send(OutgoingMessage message, TimeSpan timeToBeReceived, SqlConnecti return SendRawMessage(messageRow, connection, transaction); } - static async Task ReadMessage(SqlCommand command, bool isStreamSupported) + async Task ReadMessage(SqlCommand command) { var behavior = CommandBehavior.SingleRow; if (isStreamSupported)