Skip to content

Commit

Permalink
Transport transaction not committed when no message received (#961) (#…
Browse files Browse the repository at this point in the history
…963)

* transport transaction not committed when no message received

* read all results from the SqlDataReader not to miss any following errors

* fixing equality

* all SqlDataReader operations in one place

* TryGetMesage inlined into receive strategies

* initialize message property in the tx and native receive strategies

Co-authored-by: Tomek Masternak <[email protected]>

Co-authored-by: Tomek Masternak <[email protected]>
  • Loading branch information
tmasternak and Tomek Masternak authored Feb 10, 2022
1 parent ca40c86 commit 6571fec
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 47 deletions.
10 changes: 10 additions & 0 deletions src/NServiceBus.Transport.SqlServer/Queuing/MessageReadResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,15 @@ public static MessageReadResult Success(Message message)
{
return new MessageReadResult(message, null);
}

bool Equals(MessageReadResult other) => Equals(Message, other.Message) && Equals(PoisonMessage, other.PoisonMessage);

public override bool Equals(object obj) => obj is MessageReadResult other && Equals(other);

public override int GetHashCode() => Message.GetHashCode() ^ PoisonMessage.GetHashCode();

public static bool operator ==(MessageReadResult a, MessageReadResult b) => a.Equals(b);

public static bool operator !=(MessageReadResult a, MessageReadResult b) => !(a == b);
}
}
13 changes: 12 additions & 1 deletion src/NServiceBus.Transport.SqlServer/Queuing/TableBasedQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,18 @@ async Task<MessageReadResult> ReadMessage(SqlCommand command)
return MessageReadResult.NoMessage;
}

return await MessageRow.Read(dataReader, isStreamSupported).ConfigureAwait(false);
var readResult = await MessageRow.Read(dataReader, isStreamSupported).ConfigureAwait(false);

//HINT: Reading all pending results makes sure that any query execution error,
// sent after the first result, are thrown by the SqlDataReader as SqlExceptions.
// More details in: https://github.com/DapperLib/Dapper/issues/1210
while (await dataReader.ReadAsync().ConfigureAwait(false))
{ }

while (await dataReader.NextResultAsync().ConfigureAwait(false))
{ }

return readResult;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,30 @@ public override async Task ReceiveMessage(CancellationTokenSource receiveCancell
using (var connection = await connectionFactory.OpenNewConnection().ConfigureAwait(false))
using (var transaction = connection.BeginTransaction(isolationLevel))
{
message = await TryReceive(connection, transaction, receiveCancellationTokenSource).ConfigureAwait(false);
var receiveResult = await InputQueue.TryReceive(connection, transaction).ConfigureAwait(false);

if (message == null)
if (receiveResult == MessageReadResult.NoMessage)
{
receiveCancellationTokenSource.Cancel();
return;
}

if (receiveResult.IsPoison)
{
await ErrorQueue.DeadLetter(receiveResult.PoisonMessage, connection, transaction).ConfigureAwait(false);
transaction.Commit();
return;
}

if (await TryHandleDelayedMessage(receiveResult.Message, connection, transaction).ConfigureAwait(false))
{
// The message was received but is not fit for processing (e.g. was DLQd).
// In such a case we still need to commit the transport tx to remove message
// from the queue table.
transaction.Commit();
return;
}

if (!await TryProcess(message, PrepareTransportTransaction(connection, transaction)).ConfigureAwait(false))
message = receiveResult.Message;

if (!await TryProcess(receiveResult.Message, PrepareTransportTransaction(connection, transaction)).ConfigureAwait(false))
{
transaction.Rollback();
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,44 @@ public override async Task ReceiveMessage(CancellationTokenSource receiveCancell
{
using (var connection = await connectionFactory.OpenNewConnection().ConfigureAwait(false))
{
Message message;
MessageReadResult receiveResult;
using (var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted))
{
message = await TryReceive(connection, transaction, receiveCancellationTokenSource).ConfigureAwait(false);
transaction.Commit();
}
receiveResult = await InputQueue.TryReceive(connection, transaction).ConfigureAwait(false);

if (message == null)
{
return;
if (receiveResult == MessageReadResult.NoMessage)
{
receiveCancellationTokenSource.Cancel();
return;
}

if (receiveResult.IsPoison)
{
await ErrorQueue.DeadLetter(receiveResult.PoisonMessage, connection, transaction).ConfigureAwait(false);
transaction.Commit();
return;
}

if (await TryHandleDelayedMessage(receiveResult.Message, connection, transaction).ConfigureAwait(false))
{
transaction.Commit();
return;
}

transaction.Commit();
}

var transportTransaction = new TransportTransaction();
transportTransaction.Set(SettingsKeys.TransportTransactionSqlConnectionKey, connection);

try
{
await TryProcessingMessage(message, transportTransaction).ConfigureAwait(false);
await TryProcessingMessage(receiveResult.Message, transportTransaction).ConfigureAwait(false);
}
catch (Exception exception)
catch (Exception ex)
{
await HandleError(exception, message, transportTransaction, 1).ConfigureAwait(false);
// Since this is TransactionMode.None, we don't care whether error handling says handled or retry. Message is gone either way.
_ = await HandleError(ex, receiveResult.Message, transportTransaction, 1).ConfigureAwait(false);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,32 @@ public override async Task ReceiveMessage(CancellationTokenSource receiveCancell
using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew, transactionOptions, TransactionScopeAsyncFlowOption.Enabled))
using (var connection = await connectionFactory.OpenNewConnection().ConfigureAwait(false))
{
message = await TryReceive(connection, null, receiveCancellationTokenSource).ConfigureAwait(false);
var receiveResult = await InputQueue.TryReceive(connection, null).ConfigureAwait(false);

if (message == null)
if (receiveResult == MessageReadResult.NoMessage)
{
receiveCancellationTokenSource.Cancel();
return;
}

if (receiveResult.IsPoison)
{
await ErrorQueue.DeadLetter(receiveResult.PoisonMessage, connection, null).ConfigureAwait(false);
scope.Complete();
return;
}

if (await TryHandleDelayedMessage(receiveResult.Message, connection, null).ConfigureAwait(false))
{
// The message was received but is not fit for processing (e.g. was DLQd).
// In such a case we still need to commit the transport tx to remove message
// from the queue table.
scope.Complete();
return;
}

message = receiveResult.Message;

connection.Close();

if (!await TryProcess(message, PrepareTransportTransaction()).ConfigureAwait(false))
if (!await TryProcess(receiveResult.Message, PrepareTransportTransaction()).ConfigureAwait(false))
{
return;
}
Expand Down
25 changes: 1 addition & 24 deletions src/NServiceBus.Transport.SqlServer/Receiving/ReceiveStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,29 +35,6 @@ public void Init(TableBasedQueue inputQueue, TableBasedQueue errorQueue, Func<Me

public abstract Task ReceiveMessage(CancellationTokenSource receiveCancellationTokenSource);

protected async Task<Message> TryReceive(SqlConnection connection, SqlTransaction transaction, CancellationTokenSource receiveCancellationTokenSource)
{
var receiveResult = await InputQueue.TryReceive(connection, transaction).ConfigureAwait(false);

if (receiveResult.IsPoison)
{
await ErrorQueue.DeadLetter(receiveResult.PoisonMessage, connection, transaction).ConfigureAwait(false);
return null;
}

if (receiveResult.Successful)
{
if (await TryHandleDelayedMessage(receiveResult.Message, connection, transaction).ConfigureAwait(false))
{
return null;
}

return receiveResult.Message;
}
receiveCancellationTokenSource.Cancel();
return null;
}

protected async Task<bool> TryProcessingMessage(Message message, TransportTransaction transportTransaction)
{
if (message.Expired) //Do not process expired messages
Expand Down Expand Up @@ -99,7 +76,7 @@ protected async Task<ErrorHandleResult> HandleError(Exception exception, Message
}
}

async Task<bool> TryHandleDelayedMessage(Message message, SqlConnection connection, SqlTransaction transaction)
protected async Task<bool> TryHandleDelayedMessage(Message message, SqlConnection connection, SqlTransaction transaction)
{
if (message.Headers.TryGetValue(ForwardHeader, out var forwardDestination))
{
Expand Down

0 comments on commit 6571fec

Please sign in to comment.