From 6571fec83b19bc064354b14d1f2d38b2d7504d63 Mon Sep 17 00:00:00 2001 From: Tomasz Masternak Date: Thu, 10 Feb 2022 14:06:17 +0100 Subject: [PATCH] Transport transaction not committed when no message received (#961) (#963) * transport transaction not committed when no message received * read all results from the SqlDataReader not to miss any following errors * fixing equality * all SqlDataReader operations in one place * TryGetMesage inlined into receive strategies * initialize message property in the tx and native receive strategies Co-authored-by: Tomek Masternak Co-authored-by: Tomek Masternak --- .../Queuing/MessageReadResult.cs | 10 ++++++ .../Queuing/TableBasedQueue.cs | 13 ++++++- .../Receiving/ProcessWithNativeTransaction.cs | 24 +++++++++---- .../Receiving/ProcessWithNoTransaction.cs | 36 +++++++++++++------ .../Receiving/ProcessWithTransactionScope.cs | 24 +++++++++---- .../Receiving/ReceiveStrategy.cs | 25 +------------ 6 files changed, 85 insertions(+), 47 deletions(-) diff --git a/src/NServiceBus.Transport.SqlServer/Queuing/MessageReadResult.cs b/src/NServiceBus.Transport.SqlServer/Queuing/MessageReadResult.cs index 875957a94..e33740ba9 100644 --- a/src/NServiceBus.Transport.SqlServer/Queuing/MessageReadResult.cs +++ b/src/NServiceBus.Transport.SqlServer/Queuing/MessageReadResult.cs @@ -27,5 +27,15 @@ public static MessageReadResult Success(Message message) { return new MessageReadResult(message, null); } + + bool Equals(MessageReadResult other) => Equals(Message, other.Message) && Equals(PoisonMessage, other.PoisonMessage); + + public override bool Equals(object obj) => obj is MessageReadResult other && Equals(other); + + public override int GetHashCode() => Message.GetHashCode() ^ PoisonMessage.GetHashCode(); + + public static bool operator ==(MessageReadResult a, MessageReadResult b) => a.Equals(b); + + public static bool operator !=(MessageReadResult a, MessageReadResult b) => !(a == b); } } \ No newline at end of file diff --git a/src/NServiceBus.Transport.SqlServer/Queuing/TableBasedQueue.cs b/src/NServiceBus.Transport.SqlServer/Queuing/TableBasedQueue.cs index 061597e9b..82849ff66 100644 --- a/src/NServiceBus.Transport.SqlServer/Queuing/TableBasedQueue.cs +++ b/src/NServiceBus.Transport.SqlServer/Queuing/TableBasedQueue.cs @@ -87,7 +87,18 @@ async Task ReadMessage(SqlCommand command) return MessageReadResult.NoMessage; } - return await MessageRow.Read(dataReader, isStreamSupported).ConfigureAwait(false); + var readResult = await MessageRow.Read(dataReader, isStreamSupported).ConfigureAwait(false); + + //HINT: Reading all pending results makes sure that any query execution error, + // sent after the first result, are thrown by the SqlDataReader as SqlExceptions. + // More details in: https://github.com/DapperLib/Dapper/issues/1210 + while (await dataReader.ReadAsync().ConfigureAwait(false)) + { } + + while (await dataReader.NextResultAsync().ConfigureAwait(false)) + { } + + return readResult; } } diff --git a/src/NServiceBus.Transport.SqlServer/Receiving/ProcessWithNativeTransaction.cs b/src/NServiceBus.Transport.SqlServer/Receiving/ProcessWithNativeTransaction.cs index aa0cecedb..ff095bc72 100644 --- a/src/NServiceBus.Transport.SqlServer/Receiving/ProcessWithNativeTransaction.cs +++ b/src/NServiceBus.Transport.SqlServer/Receiving/ProcessWithNativeTransaction.cs @@ -31,18 +31,30 @@ public override async Task ReceiveMessage(CancellationTokenSource receiveCancell using (var connection = await connectionFactory.OpenNewConnection().ConfigureAwait(false)) using (var transaction = connection.BeginTransaction(isolationLevel)) { - message = await TryReceive(connection, transaction, receiveCancellationTokenSource).ConfigureAwait(false); + var receiveResult = await InputQueue.TryReceive(connection, transaction).ConfigureAwait(false); - if (message == null) + if (receiveResult == MessageReadResult.NoMessage) + { + receiveCancellationTokenSource.Cancel(); + return; + } + + if (receiveResult.IsPoison) + { + await ErrorQueue.DeadLetter(receiveResult.PoisonMessage, connection, transaction).ConfigureAwait(false); + transaction.Commit(); + return; + } + + if (await TryHandleDelayedMessage(receiveResult.Message, connection, transaction).ConfigureAwait(false)) { - // The message was received but is not fit for processing (e.g. was DLQd). - // In such a case we still need to commit the transport tx to remove message - // from the queue table. transaction.Commit(); return; } - if (!await TryProcess(message, PrepareTransportTransaction(connection, transaction)).ConfigureAwait(false)) + message = receiveResult.Message; + + if (!await TryProcess(receiveResult.Message, PrepareTransportTransaction(connection, transaction)).ConfigureAwait(false)) { transaction.Rollback(); return; diff --git a/src/NServiceBus.Transport.SqlServer/Receiving/ProcessWithNoTransaction.cs b/src/NServiceBus.Transport.SqlServer/Receiving/ProcessWithNoTransaction.cs index 6d3d7ceca..c84a55eae 100644 --- a/src/NServiceBus.Transport.SqlServer/Receiving/ProcessWithNoTransaction.cs +++ b/src/NServiceBus.Transport.SqlServer/Receiving/ProcessWithNoTransaction.cs @@ -17,16 +17,31 @@ public override async Task ReceiveMessage(CancellationTokenSource receiveCancell { using (var connection = await connectionFactory.OpenNewConnection().ConfigureAwait(false)) { - Message message; + MessageReadResult receiveResult; using (var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted)) { - message = await TryReceive(connection, transaction, receiveCancellationTokenSource).ConfigureAwait(false); - transaction.Commit(); - } + receiveResult = await InputQueue.TryReceive(connection, transaction).ConfigureAwait(false); - if (message == null) - { - return; + if (receiveResult == MessageReadResult.NoMessage) + { + receiveCancellationTokenSource.Cancel(); + return; + } + + if (receiveResult.IsPoison) + { + await ErrorQueue.DeadLetter(receiveResult.PoisonMessage, connection, transaction).ConfigureAwait(false); + transaction.Commit(); + return; + } + + if (await TryHandleDelayedMessage(receiveResult.Message, connection, transaction).ConfigureAwait(false)) + { + transaction.Commit(); + return; + } + + transaction.Commit(); } var transportTransaction = new TransportTransaction(); @@ -34,11 +49,12 @@ public override async Task ReceiveMessage(CancellationTokenSource receiveCancell try { - await TryProcessingMessage(message, transportTransaction).ConfigureAwait(false); + await TryProcessingMessage(receiveResult.Message, transportTransaction).ConfigureAwait(false); } - catch (Exception exception) + catch (Exception ex) { - await HandleError(exception, message, transportTransaction, 1).ConfigureAwait(false); + // Since this is TransactionMode.None, we don't care whether error handling says handled or retry. Message is gone either way. + _ = await HandleError(ex, receiveResult.Message, transportTransaction, 1).ConfigureAwait(false); } } } diff --git a/src/NServiceBus.Transport.SqlServer/Receiving/ProcessWithTransactionScope.cs b/src/NServiceBus.Transport.SqlServer/Receiving/ProcessWithTransactionScope.cs index 41bb04781..497facfa9 100644 --- a/src/NServiceBus.Transport.SqlServer/Receiving/ProcessWithTransactionScope.cs +++ b/src/NServiceBus.Transport.SqlServer/Receiving/ProcessWithTransactionScope.cs @@ -23,20 +23,32 @@ public override async Task ReceiveMessage(CancellationTokenSource receiveCancell using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew, transactionOptions, TransactionScopeAsyncFlowOption.Enabled)) using (var connection = await connectionFactory.OpenNewConnection().ConfigureAwait(false)) { - message = await TryReceive(connection, null, receiveCancellationTokenSource).ConfigureAwait(false); + var receiveResult = await InputQueue.TryReceive(connection, null).ConfigureAwait(false); - if (message == null) + if (receiveResult == MessageReadResult.NoMessage) + { + receiveCancellationTokenSource.Cancel(); + return; + } + + if (receiveResult.IsPoison) + { + await ErrorQueue.DeadLetter(receiveResult.PoisonMessage, connection, null).ConfigureAwait(false); + scope.Complete(); + return; + } + + if (await TryHandleDelayedMessage(receiveResult.Message, connection, null).ConfigureAwait(false)) { - // The message was received but is not fit for processing (e.g. was DLQd). - // In such a case we still need to commit the transport tx to remove message - // from the queue table. scope.Complete(); return; } + message = receiveResult.Message; + connection.Close(); - if (!await TryProcess(message, PrepareTransportTransaction()).ConfigureAwait(false)) + if (!await TryProcess(receiveResult.Message, PrepareTransportTransaction()).ConfigureAwait(false)) { return; } diff --git a/src/NServiceBus.Transport.SqlServer/Receiving/ReceiveStrategy.cs b/src/NServiceBus.Transport.SqlServer/Receiving/ReceiveStrategy.cs index 3e9b9d9c6..282f58d96 100644 --- a/src/NServiceBus.Transport.SqlServer/Receiving/ReceiveStrategy.cs +++ b/src/NServiceBus.Transport.SqlServer/Receiving/ReceiveStrategy.cs @@ -35,29 +35,6 @@ public void Init(TableBasedQueue inputQueue, TableBasedQueue errorQueue, Func TryReceive(SqlConnection connection, SqlTransaction transaction, CancellationTokenSource receiveCancellationTokenSource) - { - var receiveResult = await InputQueue.TryReceive(connection, transaction).ConfigureAwait(false); - - if (receiveResult.IsPoison) - { - await ErrorQueue.DeadLetter(receiveResult.PoisonMessage, connection, transaction).ConfigureAwait(false); - return null; - } - - if (receiveResult.Successful) - { - if (await TryHandleDelayedMessage(receiveResult.Message, connection, transaction).ConfigureAwait(false)) - { - return null; - } - - return receiveResult.Message; - } - receiveCancellationTokenSource.Cancel(); - return null; - } - protected async Task TryProcessingMessage(Message message, TransportTransaction transportTransaction) { if (message.Expired) //Do not process expired messages @@ -99,7 +76,7 @@ protected async Task HandleError(Exception exception, Message } } - async Task TryHandleDelayedMessage(Message message, SqlConnection connection, SqlTransaction transaction) + protected async Task TryHandleDelayedMessage(Message message, SqlConnection connection, SqlTransaction transaction) { if (message.Headers.TryGetValue(ForwardHeader, out var forwardDestination)) {