Skip to content

Commit

Permalink
Merge pull request #468 from Particular/outbox-txscope-fix
Browse files Browse the repository at this point in the history
Moving Outbox TransactionScope fix to the release branch
  • Loading branch information
danielmarbach authored Aug 20, 2020
2 parents 384acde + f915836 commit 19fcada
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
namespace NServiceBus.AcceptanceTests.Outbox
{
using System.Threading.Tasks;
using System.Transactions;
using AcceptanceTesting;
using EndpointTemplates;
using NUnit.Framework;

[TestFixture]
public class When_using_outbox_with_transaction_scope : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_float_transaction_scope_into_handler()
{
var context = await Scenario.Define<Context>()
.WithEndpoint<Endpoint>(b => b.When(s => s.SendLocal(new MyMessage())))
.Done(c => c.Done)
.Run()
.ConfigureAwait(false);

Assert.NotNull(context.Transaction);
}

public class Context : ScenarioContext
{
public bool Done { get; set; }
public Transaction Transaction { get; set; }
}

public class Endpoint : EndpointConfigurationBuilder
{
public Endpoint()
{
EndpointSetup<DefaultServer>(c => { c.EnableOutbox().UseTransactionScope(); });
}

public class MyMessageHandler : IHandleMessages<MyMessage>
{
Context context;

public MyMessageHandler(Context context)
{
this.context = context;
}


public Task Handle(MyMessage message, IMessageHandlerContext handlerContext)
{
context.Transaction = Transaction.Current;
context.Done = true;
return Task.CompletedTask;
}
}
}

public class MyMessage : IMessage
{
public string Property { get; set; }
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Transactions;
using AcceptanceTesting;
using EndpointTemplates;
using Logging;
using NServiceBus.Pipeline;
using NUnit.Framework;

public class When_using_transaction_scope_outbox_and_wrapping_handlers : NServiceBusAcceptanceTest
Expand Down Expand Up @@ -46,12 +48,24 @@ public OutboxTransactionScopeSagaEndpoint()
{
EndpointSetup<DefaultServer>(b =>
{
b.Pipeline.Register(new BehaviorThatCreatesACustomScope(), "Creates a custom transaction scope");

var outbox = b.EnableOutbox();
outbox.UseTransactionScope();
b.UnitOfWork().WrapHandlersInATransactionScope();
});
}


class BehaviorThatCreatesACustomScope : Behavior<IIncomingLogicalMessageContext>
{
public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
{
using (new TransactionScope(TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled))
{
await next().ConfigureAwait(false);
}
}
}

class OutboxTransactionScopeSaga : Saga<OutboxTransactionScopeSagaData>,
IAmStartedByMessages<StartSagaMessage>,
IAmStartedByMessages<CheckSagaMessage>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public async Task Setup()
public async Task TearDown()
{
await sessionFactory.CloseAsync();
//await schema.DropAsync(false, true);
await schema.DropAsync(false, true);
}

[Test]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
namespace NServiceBus.NHibernate.Tests.Outbox
{
using System;
using System.Threading.Tasks;
using global::NHibernate;
using global::NHibernate.Cfg;
using global::NHibernate.Mapping.ByCode;
using global::NHibernate.Tool.hbm2ddl;
using Extensibility;
using global::NHibernate.Dialect;
using NHibernate.Outbox;
using NServiceBus.Outbox.NHibernate;
using NUnit.Framework;

[TestFixture(false)]
[TestFixture(true)]
class When_using_transaction_scope
{
bool pessimistic;
INHibernateOutboxStorage persister;
ISessionFactory sessionFactory;
SchemaExport schema;
OutboxPersisterFactory<OutboxRecord> outboxPersisterFactory;

public When_using_transaction_scope(bool pessimistic)
{
this.pessimistic = pessimistic;
}

[SetUp]
public async Task Setup()
{
var mapper = new ModelMapper();
mapper.AddMapping(typeof(OutboxRecordMapping));

var cfg = new Configuration()
.DataBaseIntegration(x =>
{
x.Dialect<MsSql2012Dialect>();
x.ConnectionString = Consts.SqlConnectionString;
});

cfg.AddMapping(mapper.CompileMappingForAllExplicitlyAddedEntities());

schema = new SchemaExport(cfg);
await schema.DropAsync(false, true);
await schema.CreateAsync(false, true);

sessionFactory = cfg.BuildSessionFactory();
outboxPersisterFactory = new OutboxPersisterFactory<OutboxRecord>();
persister = outboxPersisterFactory.Create(sessionFactory, "TestEndpoint", pessimistic, true);
}

[TearDown]
public async Task TearDown()
{
await sessionFactory.CloseAsync();
await schema.DropAsync(false, true);
}

[Test]
public async Task Should_allow_the_transaction_to_flow_to_handling_code()
{
var messageId = Guid.NewGuid().ToString("N");

var contextBag = new ContextBag();
await persister.Get(messageId, contextBag);

using (var transaction = await persister.BeginTransaction(contextBag))
{
var ambientTransaction = System.Transactions.Transaction.Current;

Assert.IsNotNull(ambientTransaction);

await transaction.Commit();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ interface INHibernateOutboxTransaction : OutboxTransaction
{
ISession Session { get; }
void OnSaveChanges(Func<Task> callback);
// Prepare is deliberately kept sync to allow floating of TxScope where needed
void Prepare();
Task Begin(string endpointQualifiedMessageId);
Task Complete(string endpointQualifiedMessageId, OutboxMessage outboxMessage, ContextBag context);
void BeginSynchronizedSession(ContextBag context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public void OnSaveChanges(Func<Task> callback)
};
}


public async Task Commit()
{
await onSaveChangesCallback().ConfigureAwait(false);
Expand All @@ -57,12 +58,17 @@ public void Dispose()
Func<Task> onSaveChangesCallback = () => Task.CompletedTask;
ITransaction transaction;

public Task Begin(string endpointQualifiedMessageId)
public void Prepare()
{
//NOOP
}

public async Task Begin(string endpointQualifiedMessageId)
{
Session = sessionFactory.OpenSession();
transaction = Session.BeginTransaction();

return concurrencyControlStrategy.Begin(endpointQualifiedMessageId, Session);
await concurrencyControlStrategy.Begin(endpointQualifiedMessageId, Session).ConfigureAwait(false);
}

public Task Complete(string endpointQualifiedMessageId, OutboxMessage outboxMessage, ContextBag context)
Expand All @@ -77,7 +83,7 @@ public void BeginSynchronizedSession(ContextBag context)
Log.Warn("The endpoint is configured to use Outbox but a TransactionScope has been detected. " +
"In order to make the Outbox compatible with TransactionScope, use " +
"config.EnableOutbox().UseTransactionScope(). " +
"Do not use config.UnitOfWork().WrapHandlersInATransactionScope().");
"Remove any custom TransactionScope added to the pipeline.");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,15 @@ public void Dispose()
}
}

public async Task Begin(string endpointQualifiedMessageId)
// Prepare is deliberately kept sync to allow floating of TxScope where needed
public void Prepare()
{
transactionScope = new TransactionScope(TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled);
ambientTransaction = Transaction.Current;
}

public async Task Begin(string endpointQualifiedMessageId)
{
Session = OpenSession();
await concurrencyControlStrategy.Begin(endpointQualifiedMessageId, Session).ConfigureAwait(false);
await Session.FlushAsync().ConfigureAwait(false);
Expand All @@ -95,7 +100,7 @@ public void BeginSynchronizedSession(ContextBag context)
{
Log.Warn("The endpoint is configured to use Outbox with TransactionScope but a different TransactionScope " +
"has been detected in the current context. " +
"Do not use config.UnitOfWork().WrapHandlersInATransactionScope().");
"Remove any custom TransactionScope added to the pipeline.");
}
}

Expand Down
20 changes: 15 additions & 5 deletions src/NServiceBus.NHibernate/Outbox/OutboxPersister.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,20 +101,30 @@ await tx.CommitAsync()
}
}

public async Task<OutboxTransaction> BeginTransaction(ContextBag context)
public Task<OutboxTransaction> BeginTransaction(ContextBag context)
{
//Provided by Get
var endpointQualifiedMessageId = context.Get<string>(EndpointQualifiedMessageIdContextKey);

var result = outboxTransactionFactory();
result.Prepare();
// we always need to avoid using async/await in here so that the transaction scope can float!
return BeginTransactionInternal(result, endpointQualifiedMessageId);
}

private static async Task<OutboxTransaction> BeginTransactionInternal(INHibernateOutboxTransaction transaction, string endpointQualifiedMessageId)
{
try
{
await result.Begin(endpointQualifiedMessageId).ConfigureAwait(false);
return result;
await transaction.Begin(endpointQualifiedMessageId).ConfigureAwait(false);
return transaction;
}
catch (Exception e)
{
result.Dispose();
// A method that returns something that is disposable should not throw during the creation
// of the disposable resource. If it does the compiler generated code will not dispose anything
// therefore we need to dispose here to prevent the connection being returned to the pool being
// in a zombie state.
transaction.Dispose();
throw new Exception("Error while opening outbox transaction", e);
}
}
Expand Down

0 comments on commit 19fcada

Please sign in to comment.