Skip to content

Commit

Permalink
Single router tests working
Browse files Browse the repository at this point in the history
  • Loading branch information
SzymonPobiega committed Oct 18, 2023
1 parent 35064fa commit 3859dd0
Show file tree
Hide file tree
Showing 14 changed files with 76 additions and 135 deletions.
2 changes: 1 addition & 1 deletion src/AcceptanceTesting/AcceptanceTesting.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

<ItemGroup>
<PackageReference Include="NServiceBus.Newtonsoft.Json" Version="3.0.*" />
<PackageReference Include="NServiceBus.AcceptanceTesting" Version="8.0.*" />
<PackageReference Include="NServiceBus.AcceptanceTesting" Version="8.*" />
<PackageReference Include="NUnit" Version="3.13.3" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,48 @@
namespace NServiceBus.AcceptanceTests.EndpointTemplates
{
using Configuration.AdvancedExtensibility;
using Transport;
using System.Threading.Tasks;
using AcceptanceTesting.Support;
using ObjectBuilder;
using NServiceBus.AcceptanceTesting.Support;

public static class ConfigureExtensions
{
public static RoutingSettings ConfigureRouting(this EndpointConfiguration configuration) =>
new RoutingSettings(configuration.GetSettings());

// This is kind of a hack because the acceptance testing framework doesn't give any access to the transport definition to individual tests.
public static TransportDefinition ConfigureTransport(this EndpointConfiguration configuration) =>
configuration.GetSettings().Get<TransportDefinition>();

public static TTransportDefinition ConfigureTransport<TTransportDefinition>(
this EndpointConfiguration configuration)
where TTransportDefinition : TransportDefinition =>
(TTransportDefinition)configuration.GetSettings().Get<TransportDefinition>();

public static async Task DefineTransport(this EndpointConfiguration config, RunDescriptor runDescriptor, EndpointCustomizationConfiguration endpointCustomizationConfiguration)
{
var transportConfiguration = TestSuiteConstraints.Current.CreateTransportConfiguration();
await transportConfiguration.Configure(endpointCustomizationConfiguration.EndpointName, config, runDescriptor.Settings, endpointCustomizationConfiguration.PublisherMetadata);
runDescriptor.OnTestCompleted(_ => transportConfiguration.Cleanup());
}

public static async Task DefineTransport(this EndpointConfiguration config, IConfigureEndpointTestExecution transportConfiguration, RunDescriptor runDescriptor, EndpointCustomizationConfiguration endpointCustomizationConfiguration)
{
await transportConfiguration.Configure(endpointCustomizationConfiguration.EndpointName, config, runDescriptor.Settings, endpointCustomizationConfiguration.PublisherMetadata);
runDescriptor.OnTestCompleted(_ => transportConfiguration.Cleanup());
}

public static async Task DefinePersistence(this EndpointConfiguration config, RunDescriptor runDescriptor, EndpointCustomizationConfiguration endpointCustomizationConfiguration)
{
var persistenceConfiguration = TestSuiteConstraints.Current.CreatePersistenceConfiguration();
await persistenceConfiguration.Configure(endpointCustomizationConfiguration.EndpointName, config, runDescriptor.Settings, endpointCustomizationConfiguration.PublisherMetadata);
runDescriptor.OnTestCompleted(_ => persistenceConfiguration.Cleanup());
}

public static void RegisterComponentsAndInheritanceHierarchy(this EndpointConfiguration builder, RunDescriptor runDescriptor)
public static async Task DefinePersistence(this EndpointConfiguration config, IConfigureEndpointTestExecution persistenceConfiguration, RunDescriptor runDescriptor, EndpointCustomizationConfiguration endpointCustomizationConfiguration)
{
builder.RegisterComponents(r => { RegisterInheritanceHierarchyOfContextOnContainer(runDescriptor, r); });
}

static void RegisterInheritanceHierarchyOfContextOnContainer(RunDescriptor runDescriptor, IConfigureComponents r)
{
var type = runDescriptor.ScenarioContext.GetType();
while (type != typeof(object))
{
r.RegisterSingleton(type, runDescriptor.ScenarioContext);
type = type.BaseType;
}
await persistenceConfiguration.Configure(endpointCustomizationConfiguration.EndpointName, config, runDescriptor.Settings, endpointCustomizationConfiguration.PublisherMetadata);
runDescriptor.OnTestCompleted(_ => persistenceConfiguration.Cleanup());
}
}
}
Original file line number Diff line number Diff line change
@@ -1,58 +1,35 @@
namespace NServiceBus.AcceptanceTests.EndpointTemplates
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using AcceptanceTesting.Customization;
using AcceptanceTesting.Support;
using Configuration.AdvancedExtensibility;
using Features;

public class DefaultServer : IEndpointSetupTemplate
{
public DefaultServer()
{
typesToInclude = new List<Type>();
}

public DefaultServer(List<Type> typesToInclude)
{
this.typesToInclude = typesToInclude;
}
public IConfigureEndpointTestExecution TransportConfiguration { get; set; } = TestSuiteConstraints.Current.CreateTransportConfiguration();

#pragma warning disable CS0618
public async Task<EndpointConfiguration> GetConfiguration(RunDescriptor runDescriptor, EndpointCustomizationConfiguration endpointConfiguration, Action<EndpointConfiguration> configurationBuilderCustomization)
#pragma warning restore CS0618
public async Task<EndpointConfiguration> GetConfiguration(RunDescriptor runDescriptor, EndpointCustomizationConfiguration endpointConfiguration, Func<EndpointConfiguration, Task> configurationBuilderCustomization)
{
var types = endpointConfiguration.GetTypesScopedByTestClass();

typesToInclude.AddRange(types);

var configuration = new EndpointConfiguration(endpointConfiguration.EndpointName);

configuration.TypesToIncludeInScan(typesToInclude);
configuration.EnableInstallers();

configuration.DisableFeature<TimeoutManager>();

var recoverability = configuration.Recoverability();
recoverability.Delayed(delayed => delayed.NumberOfRetries(0));
recoverability.Immediate(immediate => immediate.NumberOfRetries(0));
configuration.SendFailedMessagesTo("error");
configuration.UseSerialization<NewtonsoftSerializer>();
var builder = new EndpointConfiguration(endpointConfiguration.EndpointName);
builder.EnableInstallers();

configuration.RegisterComponentsAndInheritanceHierarchy(runDescriptor);
builder.Recoverability()
.Delayed(delayed => delayed.NumberOfRetries(0))
.Immediate(immediate => immediate.NumberOfRetries(0));
builder.SendFailedMessagesTo("error");
builder.UseSerialization<NewtonsoftJsonSerializer>();

await configuration.DefinePersistence(runDescriptor, endpointConfiguration).ConfigureAwait(false);
await builder.DefineTransport(TransportConfiguration, runDescriptor, endpointConfiguration).ConfigureAwait(false);

configuration.GetSettings().SetDefault("ScaleOut.UseSingleBrokerQueue", true);
configurationBuilderCustomization(configuration);
await configurationBuilderCustomization(builder).ConfigureAwait(false);

configuration.Pipeline.Register(new TestRunMarker(runDescriptor.ScenarioContext.TestRunId.ToString()), "Marks messages with test run ID.");
builder.Pipeline.Register(new TestRunMarker(runDescriptor.ScenarioContext.TestRunId.ToString()), "Marks messages with test run ID.");

return configuration;
}
// scan types at the end so that all types used by the configuration have been loaded into the AppDomain
builder.ScanTypesForTest(endpointConfiguration);

List<Type> typesToInclude;
return builder;
}
}
}
9 changes: 5 additions & 4 deletions src/LoadTests.Receiver/LoadTests.Receiver.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net461</TargetFramework>
<TargetFramework>net48</TargetFramework>
<LangVersion>latest</LangVersion>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\NServiceBus.Router.Connector\NServiceBus.Router.Connector.csproj" />
<ProjectReference Include="..\LoadTests.Shared\LoadTests.Shared.csproj" />

<PackageReference Include="NServiceBus" Version="[7.4.0, 8.0.0)" />
<PackageReference Include="NServiceBus.SqlServer" Version="[4.1.0, 5.0.0)" />
<PackageReference Include="NServiceBus.Newtonsoft.Json" Version="[2.0.0, 3.0.0)" />
<PackageReference Include="NServiceBus" Version="8.*" />
<PackageReference Include="NServiceBus.SqlServer" Version="7.*" />
<PackageReference Include="NServiceBus.Newtonsoft.Json" Version="3.*" />
<PackageReference Include="NServiceBus.Persistence.NonDurable" Version="1.*" />
<PackageReference Include="Metrics.NET" Version="0.5.5" />
</ItemGroup>
</Project>
17 changes: 8 additions & 9 deletions src/LoadTests.Receiver/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
using System;
using System.Threading.Tasks;
using Metrics;
using Microsoft.Extensions.DependencyInjection;
using NServiceBus;
using NServiceBus.Transport.SQLServer;

class Program
{
Expand All @@ -18,26 +18,25 @@ static void Main(string[] args)
static async Task Start()
{
var config = new EndpointConfiguration("Receiver");
config.UseSerialization<NewtonsoftSerializer>();
config.UseSerialization<NewtonsoftJsonSerializer>();
config.SendFailedMessagesTo("error");
config.EnableInstallers();
config.UsePersistence<InMemoryPersistence>();
config.UsePersistence<NonDurablePersistence>();

Metric.Config.WithReporting(r =>
{
r.WithCSVReports(".", TimeSpan.FromSeconds(5));
});

config.RegisterComponents(c => c.RegisterSingleton(new Statistics()));
config.RegisterComponents(c => c.AddSingleton(new Statistics()));

var connectionString = SettingsReader<string>.Read("SqlConnectionString", "data source=(local); initial catalog=loadtest; integrated security=true");

var senderTransport = config.UseTransport<SqlServerTransport>();
senderTransport.UseNativeDelayedDelivery().DisableTimeoutManagerCompatibility();
senderTransport.ConnectionString(connectionString);
senderTransport.Transactions(TransportTransactionMode.SendsAtomicWithReceive);
var transport = new SqlServerTransport(connectionString);
transport.TransportTransactionMode = TransportTransactionMode.SendsAtomicWithReceive;
var routing = config.UseTransport(transport);

senderTransport.Routing().RouteToEndpoint(typeof(ProcessingReport), "Sender");
routing.RouteToEndpoint(typeof(ProcessingReport), "Sender");

endpointInstance = await Endpoint.Start(config);
Console.WriteLine("Press <enter> to exit.");
Expand Down
4 changes: 2 additions & 2 deletions src/LoadTests.Receiver/ReportProcessingStatistics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public ReportProcessingStatistics(Statistics statistics)
this.statistics = statistics;
}

protected override Task OnStart(IMessageSession session)
protected override Task OnStart(IMessageSession session, CancellationToken token)
{
tokenSource = new CancellationTokenSource();
reportTask = Task.Run(async () =>
Expand All @@ -40,7 +40,7 @@ protected override Task OnStart(IMessageSession session)
return Task.CompletedTask;
}

protected override async Task OnStop(IMessageSession session)
protected override async Task OnStop(IMessageSession session, CancellationToken token)
{
tokenSource?.Cancel();
if (reportTask != null)
Expand Down
12 changes: 2 additions & 10 deletions src/LoadTests.Shared/LoadTests.Shared.csproj
Original file line number Diff line number Diff line change
@@ -1,18 +1,10 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net461</TargetFramework>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
<LangVersion>7.1</LangVersion>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<LangVersion>7.1</LangVersion>
<TargetFramework>net48</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="NServiceBus" Version="[7.4.0, 8.0.0)" />
<PackageReference Include="NServiceBus" Version="8.*" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public virtual async Task<EndpointConfiguration> GetConfiguration(RunDescriptor
await configurationBuilderCustomization(builder).ConfigureAwait(false);

// scan types at the end so that all types used by the configuration have been loaded into the AppDomain
builder.TypesToIncludeInScan(endpointConfiguration.GetTypesScopedByTestClass());
builder.ScanTypesForTest(endpointConfiguration);
//builder.TypesToIncludeInScan(endpointConfiguration.GetTypesScopedByTestClass());

return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ public class InMemorySubscriptionPersistence : Feature
{
internal InMemorySubscriptionPersistence()
{
#pragma warning disable 618
DependsOn<MessageDrivenSubscriptions>();
#pragma warning restore 618
}

protected override void Setup(FeatureConfigurationContext context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<PackageReference Include="NServiceBus.Metrics.ServiceControl" Version="4.*" />
<PackageReference Include="NServiceBus.SqlServer" Version="7.*" />
<PackageReference Include="NServiceBus.Newtonsoft.Json" Version="3.*" />
<PackageReference Include="NServiceBus.AcceptanceTesting" Version="8.1.1" />
<PackageReference Include="NServiceBus.AcceptanceTesting" Version="8.*" />
</ItemGroup>
<ItemGroup>
<!--<ProjectReference Include="..\AcceptanceTesting\AcceptanceTesting.csproj" />-->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public async Task It_should_deliver_the_message_to_both_subscribers()
a.Broker().Alpha();

var b = cfg.AddInterface("B", false);
b.EnableMessageDrivenPublishSubscribe(alphaSubscriptionStore);
b.EnableMessageDrivenPublishSubscribe(bravoSubscriptionStore);
b.Broker().Bravo();

cfg.AddInterface("C").Broker().Yankee();
Expand Down Expand Up @@ -73,7 +73,7 @@ public Publisher()
var routing = c.ConfigureRouting();

routing.ConnectToRouter("Router", false, true);
});
}).IncludeType<InMemorySubscriptionPersistence>();
}
}

Expand All @@ -89,7 +89,7 @@ public BaseEventSubscriber()
var routing = c.ConfigureRouting();

routing.ConnectToRouter("Router", true, false);
});
}).IncludeType<InMemorySubscriptionPersistence>();
}

class BaseEventHandler : IHandleMessages<MyBaseEvent2>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ namespace NServiceBus.Router.AcceptanceTests.SingleRouter
using AcceptanceTesting.Customization;

[TestFixture]
public class When_subscribing_from_native_and_message_driven_endpoints : NServiceBusAcceptanceTest
public class When_subscribing_from_native_and_message_driven : NServiceBusAcceptanceTest
{
static string PublisherEndpoint => Conventions.EndpointNamingConvention(typeof(Publisher));

[Test]
public async Task It_should_deliver_the_message_to_both_subscribers()
public async Task It_should_deliver_the_message_to_both()
{
var result = await Scenario.Define<Context>()
.WithRouter("Router", cfg =>
Expand All @@ -25,7 +25,7 @@ public async Task It_should_deliver_the_message_to_both_subscribers()
var a = cfg.AddInterface("A", false);
a.Broker().Alpha();

//DerivedEventSubscriber - Broker C`
//DerivedEventSubscriber - Broker C
cfg.AddInterface("C").Broker().Zulu();

var routeTable = cfg.UseStaticRoutingProtocol();
Expand Down
3 changes: 2 additions & 1 deletion src/NServiceBus.Router.Connector/RouterConnectionFeature.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Linq;
using System.Runtime;
using Microsoft.Extensions.DependencyInjection;
using NServiceBus;
using NServiceBus.Features;
Expand Down Expand Up @@ -30,7 +31,7 @@ protected override void Setup(FeatureConfigurationContext context)
unicastRouteTable.AddOrReplaceRoutes("NServiceBus.Router_"+connection.RouterAddress, routes);
}

if (!nativePubSub)
if (!nativePubSub || (context.Settings.TryGet("NServiceBus.Subscriptions.EnableMigrationMode", out bool enabled) && enabled))
{
//Register the auto-publish-to-router behavior

Expand Down
Loading

0 comments on commit 3859dd0

Please sign in to comment.