Skip to content

Commit

Permalink
Implement TransactionalSession as part of the persistence (#353)
Browse files Browse the repository at this point in the history
* Update to Core 7.8 beta

* Make the session a first class citizen

* Drop no longer required target framework

* Make sure the persistence assembly is loaded

* Remove dotsettings

* Tweak readme

* prevent exceptions when calling dispose multiple times (#347)

(cherry picked from commit 357e7ae)

* Add test to verify synchronized storage integration works

* Switch to the stable version

* backport transaction session support

* add missing di test for tx session acceptance tests

* Align the namespace (#359)

* Align session option name

* update to tx session rtm package

Co-authored-by: Tim Bussmann <[email protected]>
  • Loading branch information
danielmarbach and timbussmann authored Sep 13, 2022
1 parent bf4a903 commit 49c6951
Show file tree
Hide file tree
Showing 50 changed files with 1,139 additions and 661 deletions.
4 changes: 1 addition & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ jobs:
- name: Setup .NET SDK
uses: actions/[email protected]
with:
dotnet-version: |
6.0.x
3.1.x
dotnet-version: 6.0.x
- name: Build
run: dotnet build src --configuration Release
- name: Upload packages
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,4 @@ For developers using Docker containers, the following docker command will quickl

Once started, initialize the replication set (required for transaction support) by connecting to the database using a mongo shell. You can connect directly from your local machine using `mongo.exe` or use the following docker command to start a mongo shell inside the container and initialize the replication set:

`docker exec -it TestMongoDB mongo --eval 'rs.initiate()'`
`docker exec -it TestMongoDB mongosh --eval 'rs.initiate()'`
Original file line number Diff line number Diff line change
@@ -1,21 +1,39 @@
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using MongoDB.Driver;
using MongoDB.Driver.Core.Events;
using NServiceBus;
using NServiceBus.AcceptanceTesting.Support;
using NServiceBus.Configuration.AdvancedExtensibility;

class ConfigureEndpointMongoPersistence : IConfigureEndpointTestExecution
{
const string databaseName = "AcceptanceTests";
public const string DatabaseName = "AcceptanceTests";
public const string InterceptedCommands = "MongoDB.AcceptanceTests.InterceptedCommands";
IMongoClient client;

public Task Configure(string endpointName, EndpointConfiguration configuration, RunSettings settings, PublisherMetadata publisherMetadata)
{
var containerConnectionString = Environment.GetEnvironmentVariable("NServiceBusStorageMongoDB_ConnectionString");

client = string.IsNullOrWhiteSpace(containerConnectionString) ? new MongoClient() : new MongoClient(containerConnectionString);
var commands = new ConcurrentQueue<string>();
configuration.GetSettings().Set(InterceptedCommands, commands);

configuration.UsePersistence<MongoPersistence>().MongoClient(client).DatabaseName(databaseName);
var mongoClientSettings = string.IsNullOrWhiteSpace(containerConnectionString)
? new MongoClientSettings()
: MongoClientSettings.FromConnectionString(containerConnectionString);
mongoClientSettings.ClusterConfigurator = cb =>
{
cb.Subscribe<CommandSucceededEvent>(commandSucceededEvent =>
{
commands.Enqueue($"{commandSucceededEvent.RequestId}-{commandSucceededEvent.CommandName.ToUpper()}");
});
};

client = new MongoClient(mongoClientSettings);

configuration.UsePersistence<MongoPersistence>().MongoClient(client).DatabaseName(DatabaseName);

return Task.FromResult(0);
}
Expand All @@ -24,7 +42,7 @@ public async Task Cleanup()
{
try
{
await client.DropDatabaseAsync(databaseName);
await client.DropDatabaseAsync(DatabaseName);
}
// ReSharper disable once EmptyGeneralCatchClause
catch (Exception)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net461;netcoreapp3.1;net6.0</TargetFrameworks>
<TargetFrameworks>net461;net6.0</TargetFrameworks>
<CopyLocalLockFileAssemblies>true</CopyLocalLockFileAssemblies>
<NoWarn>$(NoWarn);SYSLIB0021</NoWarn>
</PropertyGroup>
Expand All @@ -12,7 +12,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.5.0" />
<PackageReference Include="NServiceBus.AcceptanceTests.Sources" Version="7.5.0" />
<PackageReference Include="NServiceBus.AcceptanceTests.Sources" Version="7.8.0" />
<PackageReference Include="NUnit" Version="3.12.0" />
<PackageReference Include="NUnit3TestAdapter" Version="3.16.1" />
<PackageReference Include="GitHubActionsTestLogger" Version="1.2.0" />
Expand Down
11 changes: 11 additions & 0 deletions src/NServiceBus.Storage.MongoDB.AcceptanceTests/TestSetup.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using NServiceBus;
using NUnit.Framework;

[SetUpFixture]
public class TestSetup
{
[OneTimeSetUp]
public void SetUp() =>
// ensure the persistence assembly is loaded into the AppDomain because it needs its features to be scanned to work properly.
typeof(MongoPersistence).ToString();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
namespace NServiceBus.AcceptanceTests
{
using System.Threading.Tasks;
using AcceptanceTesting;
using EndpointTemplates;
using NUnit.Framework;
using Storage.MongoDB;

[TestFixture]
public partial class When_using_outbox_synchronized_session_via_container : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_inject_synchronized_session_into_handler()
{
var context = await Scenario.Define<Context>()
.WithEndpoint<Endpoint>(b => b.When(s => s.SendLocal(new MyMessage())))
.Done(c => c.Done)
.Run()
.ConfigureAwait(false);

Assert.True(context.RepositoryHasMongoSession);
AssertPartitionPart(context);
}

partial void AssertPartitionPart(Context scenarioContext);

public class Context : ScenarioContext
{
public bool Done { get; set; }
public bool RepositoryHasMongoSession { get; set; }
}

public class Endpoint : EndpointConfigurationBuilder
{
public Endpoint()
{
EndpointSetup<DefaultServer>(config =>
{
config.ConfigureTransport().Transactions(TransportTransactionMode.ReceiveOnly);

config.EnableOutbox();
config.RegisterComponents(c =>
{
c.ConfigureComponent<MyRepository>(DependencyLifecycle.InstancePerUnitOfWork);
});
});
}

public class MyMessageHandler : IHandleMessages<MyMessage>
{
public MyMessageHandler(MyRepository repository, Context context)
{
this.context = context;
this.repository = repository;
}


public Task Handle(MyMessage message, IMessageHandlerContext handlerContext)
{
repository.DoSomething();
context.Done = true;
return Task.CompletedTask;
}

Context context;
MyRepository repository;
}
}

public class MyRepository
{
public MyRepository(IMongoSynchronizedStorageSession storageSession, Context context)
{
this.storageSession = storageSession;
this.context = context;
}

public void DoSomething() => context.RepositoryHasMongoSession = storageSession.MongoSession != null;

IMongoSynchronizedStorageSession storageSession;
Context context;
}

public class MyMessage : IMessage
{
public string Property { get; set; }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
namespace NServiceBus.AcceptanceTests
{
using System.Threading.Tasks;
using AcceptanceTesting;
using EndpointTemplates;
using NUnit.Framework;
using Storage.MongoDB;

[TestFixture]
public class When_using_synchronized_session_via_container : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_inject_synchronized_session_into_handler()
{
var context = await Scenario.Define<Context>()
.WithEndpoint<Endpoint>(b => b.When(s => s.SendLocal(new MyMessage())))
.Done(c => c.Done)
.Run()
.ConfigureAwait(false);

Assert.True(context.HandlerHasMongoSession);
}

public class Context : ScenarioContext
{
public bool Done { get; set; }
public bool HandlerHasMongoSession { get; set; }
}

public class Endpoint : EndpointConfigurationBuilder
{
public Endpoint() => EndpointSetup<DefaultServer>();

public class MyHandler : IHandleMessages<MyMessage>
{
public MyHandler(IMongoSynchronizedStorageSession session, Context context)
{
this.session = session;
this.context = context;
}

public Task Handle(MyMessage message, IMessageHandlerContext handlerContext)
{
context.Done = true;
context.HandlerHasMongoSession = session.MongoSession != null;

return Task.CompletedTask;
}

Context context;
IMongoSynchronizedStorageSession session;
}
}

public class MyMessage : IMessage
{
public string Property { get; set; }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
namespace NServiceBus.AcceptanceTests
{
using System.Collections.Concurrent;
using System.Threading.Tasks;
using AcceptanceTesting;
using Configuration.AdvancedExtensibility;
using EndpointTemplates;
using NUnit.Framework;
using MongoDB.Bson;
using MongoDB.Bson.Serialization.Attributes;
using Storage.MongoDB;

[TestFixture]
public class When_using_synchronized_session_via_container_and_storage_session_extension : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_commit_all_operations_using_the_same_batch()
{
var context = await Scenario.Define<Context>()
.WithEndpoint<Endpoint>(b =>
{
b.CustomConfig((cfg, ctx) =>
{
ctx.InterceptedCommands = cfg.GetSettings()
.Get<ConcurrentQueue<string>>(ConfigureEndpointMongoPersistence.InterceptedCommands);
});
b.When(s => s.SendLocal(new MyMessage()));
})
.Done(c => c.FirstHandlerIsDone && c.SecondHandlerIsDone)
.Run()
.ConfigureAwait(false);

Assert.That(context.InterceptedCommands, Has.One.Items.Match("COMMITTRANSACTION"));
}

public class Context : ScenarioContext
{
public bool FirstHandlerIsDone { get; set; }
public bool SecondHandlerIsDone { get; set; }
public ConcurrentQueue<string> InterceptedCommands { get; set; }
}

public class Endpoint : EndpointConfigurationBuilder
{
public Endpoint() => EndpointSetup<DefaultServer>();

public class MyHandlerUsingStorageSession : IHandleMessages<MyMessage>
{
public MyHandlerUsingStorageSession(IMongoSynchronizedStorageSession session, Context context)
{
this.session = session;
this.context = context;
}

public async Task Handle(MyMessage message, IMessageHandlerContext handlerContext)
{
var entity = new MyEntity
{
Id = ObjectId.GenerateNewId(),
Data = "MyCustomData"
};
var collection = session.MongoSession.Client.GetDatabase(ConfigureEndpointMongoPersistence.DatabaseName).GetCollection<MyEntity>("myentity");
await collection.InsertOneAsync(session.MongoSession, entity);
context.FirstHandlerIsDone = true;
}

Context context;
IMongoSynchronizedStorageSession session;
}

public class MyHandlerUsingExtensionMethod : IHandleMessages<MyMessage>
{
public MyHandlerUsingExtensionMethod(Context context)
{
this.context = context;
}

public async Task Handle(MyMessage message, IMessageHandlerContext handlerContext)
{
var session = handlerContext.SynchronizedStorageSession.MongoPersistenceSession();

var entity = new MyEntity
{
Id = ObjectId.GenerateNewId(),
Data = "MyCustomData"
};
var collection = session.MongoSession.Client.GetDatabase(ConfigureEndpointMongoPersistence.DatabaseName).GetCollection<MyEntity>("myentity");
await collection.InsertOneAsync(session.MongoSession, entity);
context.SecondHandlerIsDone = true;
}

Context context;
}
}

public class MyEntity
{
[BsonId]
public ObjectId Id { get; set; }
public string Data { get; set; }
}

public class MyMessage : IMessage
{
public string Property { get; set; }
}
}
}
Loading

0 comments on commit 49c6951

Please sign in to comment.