Skip to content

Commit

Permalink
Merge branch 'migration-spike'
Browse files Browse the repository at this point in the history
  • Loading branch information
SzymonPobiega committed Jan 15, 2020
2 parents e2ce1cb + 5e1a645 commit 1c82320
Show file tree
Hide file tree
Showing 78 changed files with 3,256 additions and 27 deletions.
2 changes: 1 addition & 1 deletion src/AcceptanceTesting/AcceptanceTesting.csproj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,6 @@ static void Configure(TransportExtensions<TestTransport> transportConfig, string
var storageDir = Path.Combine(tempDir, testRunId, brokerId);

transportConfig.StorageDirectory(storageDir);
transportConfig.BrokerName(brokerId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,7 @@ public static IEnumerable<Type> GetTypesScopedByTestClass(this EndpointCustomiza

var types = assemblies.Assemblies
//exclude all test types by default
.Where(a =>
{
var references = a.GetReferencedAssemblies();

return references.All(an => an.Name != "nunit.framework");
})
.Where(a => !a.GetName().Name.EndsWith("AcceptanceTests"))
.SelectMany(a => a.GetTypes());

types = types.Union(GetNestedTypeRecursive(endpointConfiguration.BuilderType.DeclaringType, endpointConfiguration.BuilderType));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,47 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Configuration.AdvancedExtensibility;
using NServiceBus.Extensibility;
using NServiceBus.Features;
using NServiceBus.Persistence;
using NServiceBus.Unicast.Subscriptions;
using NServiceBus.Unicast.Subscriptions.MessageDrivenSubscriptions;

public class InMemoryPersistence : PersistenceDefinition
{
internal InMemoryPersistence()
{
Supports<StorageType.Subscriptions>(s =>
{
s.EnableFeatureByDefault<InMemorySubscriptionPersistence>();
});
}
}

public static class InMemoryPersistenceExtensions
{
public static void UseStorage(this PersistenceExtensions<InMemoryPersistence> extensions, InMemorySubscriptionStorage storageInstance)
{
extensions.GetSettings().Set("InMemoryPersistence.StorageInstance", storageInstance);
}
}

public class InMemorySubscriptionPersistence : Feature
{
internal InMemorySubscriptionPersistence()
{
DependsOn<MessageDrivenSubscriptions>();
}

protected override void Setup(FeatureConfigurationContext context)
{
var storageInstance = context.Settings.GetOrDefault<InMemorySubscriptionStorage>("InMemoryPersistence.StorageInstance");
context.Container.RegisterSingleton<ISubscriptionStorage>(storageInstance ?? new InMemorySubscriptionStorage());
}
}

public class InMemorySubscriptionStorage : ISubscriptionStorage
{
public Task Subscribe(Subscriber subscriber, MessageType messageType, ContextBag context)
Expand Down
2 changes: 1 addition & 1 deletion src/AcceptanceTesting/Infrastructure/RouterComponent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public RouterComponent(Func<ScenarioContext, RouterConfiguration> config)
public Task<ComponentRunner> CreateRunner(RunDescriptor run)
{
var config = configCallback(run.ScenarioContext);

config.Settings.Set<ScenarioContext>(run.ScenarioContext);
config.AutoCreateQueues();
var newFactories = new List<Func<Interface>>();

Expand Down
2 changes: 1 addition & 1 deletion src/AcceptanceTesting/Infrastructure/SpyComponent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ public Task<ComponentRunner> CreateRunner(RunDescriptor run)
{
var scenarioContext = (T)run.ScenarioContext;

return Task.FromResult<ComponentRunner>(new SpyComponentRunner(endpointName, transportConfiguration, (messageContext, messages) => onMessage(scenarioContext, messageContext, messages)));
return Task.FromResult<ComponentRunner>(new SpyComponentRunner(endpointName, transportConfiguration, (messageContext, messages) => onMessage(scenarioContext, messageContext, messages), scenarioContext));
}
}
6 changes: 5 additions & 1 deletion src/AcceptanceTesting/Infrastructure/SpyComponentRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Threading;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTesting.Support;
using NServiceBus.Raw;
using NServiceBus.Transport;
Expand All @@ -10,14 +11,16 @@ class SpyComponentRunner : ComponentRunner
{
Action<TransportExtensions<TestTransport>> transportConfiguration;
Func<MessageContext, IDispatchMessages, Task> onMessage;
ScenarioContext scenarioContext;
string endpointName;
IReceivingRawEndpoint endpoint;

public SpyComponentRunner(string endpointName, Action<TransportExtensions<TestTransport>> transportConfiguration,
Func<MessageContext, IDispatchMessages, Task> onMessage)
Func<MessageContext, IDispatchMessages, Task> onMessage, ScenarioContext scenarioContext)
{
this.transportConfiguration = transportConfiguration;
this.onMessage = onMessage;
this.scenarioContext = scenarioContext;
this.endpointName = endpointName;
}

Expand All @@ -27,6 +30,7 @@ public override async Task Start(CancellationToken token)
{
var config = RawEndpointConfiguration.Create(endpointName, onMessage, "poison");
config.AutoCreateQueue();
config.Settings.Set<ScenarioContext>(scenarioContext);
config.CustomErrorHandlingPolicy(new IgnoreErrorsPolicy());
var transport = config.UseTransport<TestTransport>();
transportConfiguration(transport);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
/// </summary>
public static class TestTransportConfigurationExtensions
{
public static void BrokerName(this TransportExtensions<TestTransport> transportExtensions, string brokerName)
{
transportExtensions.GetSettings().Set(TestTransportInfrastructure.BrokerNameKey, brokerName);
}

/// <summary>
/// Configures the location where message files are stored.
/// </summary>
Expand Down
51 changes: 50 additions & 1 deletion src/AcceptanceTesting/TestTransport/TestTransportDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@ namespace NServiceBus
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using AcceptanceTesting;
using DelayedDelivery;
using Extensibility;
using Performance.TimeToBeReceived;
using Transport;
using Unicast.Queuing;

class TestTransportDispatcher : IDispatchMessages
{
public TestTransportDispatcher(string basePath, int maxMessageSizeKB)
public TestTransportDispatcher(string basePath, int maxMessageSizeKB, string brokerName, ScenarioContext scenarioContext)
{
if (maxMessageSizeKB > int.MaxValue / 1024)
{
Expand All @@ -22,6 +24,7 @@ public TestTransportDispatcher(string basePath, int maxMessageSizeKB)

this.basePath = basePath;
this.maxMessageSizeKB = maxMessageSizeKB;
this.brokerName = brokerName;
}

public Task Dispatch(TransportOperations outgoingMessages, TransportTransaction transaction, ContextBag context)
Expand All @@ -37,12 +40,23 @@ async Task DispatchMulticast(IEnumerable<MulticastTransportOperation> transportO

foreach (var transportOperation in transportOperations)
{
if (!transportOperation.Message.Headers.TryGetValue(Headers.MessageId, out var messageId))
{
messageId = transportOperation.Message.MessageId;
}

if (!transportOperation.Message.Headers.TryGetValue(Headers.MessageIntent, out var intent))
{
intent = "<Unknown>";
}

var subscribers = await GetSubscribersFor(transportOperation.MessageType)
.ConfigureAwait(false);

foreach (var subscriber in subscribers)
{
tasks.Add(WriteMessage(subscriber, transportOperation, transaction));
Console.WriteLine($"Publishing message {messageId} with intent {intent} sent to {subscriber}");
}
}

Expand All @@ -56,6 +70,17 @@ Task DispatchUnicast(IEnumerable<UnicastTransportOperation> operations, Transpor
{
PathChecker.ThrowForBadPath(operation.Destination, "message destination");

if (!operation.Message.Headers.TryGetValue(Headers.MessageId, out var messageId))
{
messageId = operation.Message.MessageId;
}

if (!operation.Message.Headers.TryGetValue(Headers.MessageIntent, out var intent))
{
intent = "<Unknown>";
}
Console.WriteLine($"Sending message {messageId} with intent {intent} sent to {operation.Destination}");

return WriteMessage(operation.Destination, operation, transaction);
}));
}
Expand All @@ -64,8 +89,31 @@ async Task WriteMessage(string destination, IOutgoingTransportOperation transpor
{
var message = transportOperation.Message;

if (destination.IndexOf("@", StringComparison.Ordinal) != -1)
{
var parts = destination.Split(new[] {'@'}, StringSplitOptions.RemoveEmptyEntries);

var broker = parts[1];

if (broker != brokerName)
{
throw new Exception($"Attempt to send a message to broker {broker} through transport configured for {brokerName}.");
}
}
else
{
//Default to sending to local broker
destination = $"{destination}@{brokerName}";
}

var nativeMessageId = Guid.NewGuid().ToString();
var destinationPath = Path.Combine(basePath, destination);

if (!Directory.Exists(destinationPath))
{
throw new QueueNotFoundException(destination, "Destination queue does not exist.", null);
}

var bodyDir = Path.Combine(destinationPath, TestTransportMessagePump.BodyDirName);

Directory.CreateDirectory(bodyDir);
Expand Down Expand Up @@ -190,6 +238,7 @@ static IEnumerable<Type> GetPotentialEventTypes(Type messageType)
static bool IsCoreMarkerInterface(Type type) => type == typeof(IMessage) || type == typeof(IEvent) || type == typeof(ICommand);

int maxMessageSizeKB;
string brokerName;
string basePath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using AcceptanceTesting;
using DelayedDelivery;
using Performance.TimeToBeReceived;
using Routing;
Expand All @@ -23,6 +24,8 @@ public TestTransportInfrastructure(SettingsHolder settings)
storagePath = Path.Combine(solutionRoot, ".learningtransport");
}

brokerName = settings.GetOrDefault<string>(BrokerNameKey) ?? "";

var errorQueueAddress = settings.ErrorQueueAddress();
PathChecker.ThrowForBadPath(errorQueueAddress, "ErrorQueueAddress");

Expand Down Expand Up @@ -74,7 +77,9 @@ public override TransportSendInfrastructure ConfigureSendInfrastructure()
{
var maxPayloadSize = settings.GetOrDefault<bool>(NoPayloadSizeRestrictionKey) ? int.MaxValue / 1024 : 64; //64 kB is the max size of the ASQ transport

return new TransportSendInfrastructure(() => new TestTransportDispatcher(storagePath, maxPayloadSize), () => Task.FromResult(StartupCheckResult.Success));
//var scenarioContext = settings.Get<ScenarioContext>();

return new TransportSendInfrastructure(() => new TestTransportDispatcher(storagePath, maxPayloadSize, brokerName, null), () => Task.FromResult(StartupCheckResult.Success));
}

public override TransportSubscriptionInfrastructure ConfigureSubscriptionInfrastructure()
Expand Down Expand Up @@ -116,13 +121,15 @@ public override string ToTransportAddress(LogicalAddress logicalAddress)
address += "-" + qualifier;
}

return address;
return $"{address}@{brokerName}";
}

string storagePath;
SettingsHolder settings;
string brokerName;

public const string StorageLocationKey = "TestTransport.StoragePath";
public const string BrokerNameKey = "TestTransport.BrokerNameKey";
public const string NoPayloadSizeRestrictionKey = "TestTransport.NoPayloadSizeRestrictionKey";
public const string NoNativePubSub = "TestTransport.NoNativePubSub";
}
Expand Down
19 changes: 19 additions & 0 deletions src/Billing/AuthorizeTransactionResponseHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Logging;

class AuthorizeTransactionResponseHandler :
IHandleMessages<AuthorizeTransactionResponse>
{
public Task Handle(AuthorizeTransactionResponse message, IMessageHandlerContext context)
{
log.Info($"Payment transaction for order {message.OrderId} authorized.");

return context.Publish(new PaymentCompleted
{
OrderId = message.OrderId
});
}

static ILog log = LogManager.GetLogger<OrderAcceptedHandler>();
}
13 changes: 13 additions & 0 deletions src/Billing/Billing.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net461</TargetFramework>
<OutputType>Exe</OutputType>
<LangVersion>7.1</LangVersion>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\NServiceBus.Router.Migrator\NServiceBus.Router.Migrator.csproj" />
<ProjectReference Include="..\Shared\Shared.csproj" />
<PackageReference Include="NServiceBus.SqlServer" Version="5.*" />
<PackageReference Include="NServiceBus.Transport.Msmq" Version="1.*" />
</ItemGroup>
</Project>
19 changes: 19 additions & 0 deletions src/Billing/OrderAcceptedHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Logging;

class OrderAcceptedHandler :
IHandleMessages<OrderAccepted>
{
public Task Handle(OrderAccepted message, IMessageHandlerContext context)
{
log.Info($"Attempting to authorize the payment transaction for order {message.OrderId}.");

return context.Send(new AuthorizeTransaction
{
OrderId = message.OrderId
});
}

static ILog log = LogManager.GetLogger<OrderAcceptedHandler>();
}
Loading

0 comments on commit 1c82320

Please sign in to comment.