Skip to content

Commit

Permalink
Fixes #22
Browse files Browse the repository at this point in the history
- Initialize router separate from Starting
- Do not drop unrecognized messages silently
  • Loading branch information
SzymonPobiega committed Jan 16, 2020
1 parent 1c82320 commit 8a7951f
Show file tree
Hide file tree
Showing 13 changed files with 214 additions and 37 deletions.
17 changes: 11 additions & 6 deletions src/AcceptanceTesting/Infrastructure/RouterComponent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,28 @@ Interface NewFactory()

class Runner : ComponentRunner
{
IRouter @switch;
IRouter router;

public Runner(IRouter @switch, string name)
public Runner(IRouter router, string name)
{
this.@switch = @switch;
this.router = router;
Name = name;
}

public override Task Start(CancellationToken token)
{
return @switch.Start();
return router.Initialize();
}

public override Task ComponentsStarted(CancellationToken token)
{
return router.Start();
}

public override Task Stop()
{
return @switch != null
? @switch.Stop()
return router != null
? router.Stop()
: Task.CompletedTask;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ string FindSolutionRoot()

public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure()
{
return new TransportReceiveInfrastructure(() => new TestTransportMessagePump(storagePath), () => new TestTransportQueueCreator(), () => Task.FromResult(StartupCheckResult.Success));
return new TransportReceiveInfrastructure(() => new TestTransportMessagePump(storagePath),
() => new TestTransportQueueCreator(storagePath), () => Task.FromResult(StartupCheckResult.Success));
}

public override TransportSendInfrastructure ConfigureSendInfrastructure()
Expand Down
25 changes: 24 additions & 1 deletion src/AcceptanceTesting/TestTransport/TestTransportQueueCreator.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,33 @@
namespace NServiceBus
{
using System.IO;
using System.Threading.Tasks;
using Transport;

class TestTransportQueueCreator : ICreateQueues
{
public Task CreateQueueIfNecessary(QueueBindings queueBindings, string identity) => Task.CompletedTask;
readonly string storagePath;

public TestTransportQueueCreator(string storagePath)
{
this.storagePath = storagePath;
}

public Task CreateQueueIfNecessary(QueueBindings queueBindings, string identity)
{
foreach (var queueBinding in queueBindings.ReceivingAddresses)
{
var queuePath = Path.Combine(storagePath, queueBinding);
Directory.CreateDirectory(queuePath);
}

foreach (var queueBinding in queueBindings.SendingAddresses)
{
var queuePath = Path.Combine(storagePath, queueBinding);
Directory.CreateDirectory(queuePath);
}

return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
using System.Threading.Tasks;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTests;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NUnit.Framework;

namespace NServiceBus.Router.AcceptanceTests.SingleRouter
{
using System;
using Pipeline;

[TestFixture]
public class When_intentionally_dropping_messages_in_prerouting : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_not_complain()
{
var result = await Scenario.Define<Context>()
.WithRouter("Router", (ctx, cfg) =>
{
cfg.AddInterface<TestTransport>("Left", t => t.BrokerAlpha()).InMemorySubscriptions();
cfg.AddInterface<TestTransport>("Right", t => t.BrokerBravo()).InMemorySubscriptions();

cfg.UseStaticRoutingProtocol().AddForwardRoute("Left", "Right");
cfg.AddRule(_ => new DropMessagesRule(ctx));
})
.WithEndpoint<Sender>(c => c.When(s => s.Send(new MyRequest())))
.Done(c => c.Dropped)
.Run(TimeSpan.FromSeconds(20));

Assert.IsTrue(result.Dropped);
}

class DropMessagesRule : IRule<PreroutingContext, PreroutingContext>
{
Context scenarioContext;

public DropMessagesRule(Context scenarioContext)
{
this.scenarioContext = scenarioContext;
}

public async Task Invoke(PreroutingContext context, Func<PreroutingContext, Task> next)
{
context.DoNotRequireThisMessageToBeForwarded();
await next(context);
scenarioContext.Dropped = true;
}
}

class Context : ScenarioContext
{
public bool Dropped { get; set; }
}

class Sender : EndpointConfigurationBuilder
{
public Sender()
{
EndpointSetup<DefaultServer>(c =>
{
var routing = c.UseTransport<TestTransport>().BrokerAlpha().Routing();
var router = routing.ConnectToRouter("Router");
router.DelegateRouting(typeof(MyRequest));
c.Pipeline.Register(new RemoveIntentBehavior(), "Remove message intent header");
});
}

class RemoveIntentBehavior : Behavior<IDispatchContext>
{
public override Task Invoke(IDispatchContext context, Func<Task> next)
{
foreach (var operation in context.Operations)
{
operation.Message.Headers.Remove(Headers.MessageIntent);
}

return next();
}
}
}

class MyRequest : IMessage
{
}
}
}
8 changes: 7 additions & 1 deletion src/NServiceBus.Router/IRouter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@ namespace NServiceBus.Router
public interface IRouter
{
/// <summary>
/// Starts the router.
/// Initializes the router.
/// </summary>
/// <returns></returns>
Task Initialize();

/// <summary>
/// Initializes and starts the router.
/// </summary>
Task Start();

Expand Down
30 changes: 25 additions & 5 deletions src/NServiceBus.Router/Pipeline/Prerouting/2_PreroutingContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,22 @@
/// </summary>
public class PreroutingContext : BasePreroutingContext
{
internal bool Forwarded;
internal bool Dropped;

internal PreroutingContext(RawContext parent) : base(parent)
{
Body = parent.Body;
Intent = GetMesssageIntent(parent.Headers);
Intent = GetMessageIntent(parent.Headers);
}
static MessageIntentEnum? GetMesssageIntent(IReadOnlyDictionary<string, string> headers)
static MessageIntentEnum? GetMessageIntent(IReadOnlyDictionary<string, string> headers)
{
var messageIntent = default(MessageIntentEnum);
if (headers.TryGetValue(NServiceBus.Headers.MessageIntent, out var messageIntentString))
{
Enum.TryParse(messageIntentString, true, out messageIntent);
Enum.TryParse<MessageIntentEnum>(messageIntentString, true, out var messageIntent);
return messageIntent;
}
return messageIntent;
return null;
}

/// <summary>
Expand All @@ -34,5 +37,22 @@ internal PreroutingContext(RawContext parent) : base(parent)
/// The body of the received message.
/// </summary>
public byte[] Body { get; set; }

/// <summary>
/// Mark this message as forwarded.
/// </summary>
/// <returns></returns>
public void MarkForwarded()
{
Forwarded = true;
}

/// <summary>
/// Marks this message as OK to be dropped if no chain terminator forwards it.
/// </summary>
public void DoNotRequireThisMessageToBeForwarded()
{
Dropped = true;
}
}
}
14 changes: 14 additions & 0 deletions src/NServiceBus.Router/Pipeline/Prerouting/PreroutingTerminator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System.Threading.Tasks;
using NServiceBus.Router;

class PreroutingTerminator : ChainTerminator<PreroutingContext>
{
protected override Task<bool> Terminate(PreroutingContext context)
{
if (!context.Dropped && !context.Forwarded)
{
throw new UnforwardableMessageException($"The incoming message {context.MessageId} has not been forwarded. This might indicate a configuration problem.");
}
return Task.FromResult(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
using NServiceBus;
using NServiceBus.Router;

class PreroutingToPublishPreroutingFork : ChainTerminator<PreroutingContext>
class PreroutingToPublishPreroutingFork : IRule<PreroutingContext, PreroutingContext>
{
protected override async Task<bool> Terminate(PreroutingContext context)
public async Task Invoke(PreroutingContext context, Func<PreroutingContext, Task> next)
{
if (context.Intent == MessageIntentEnum.Publish)
{
Expand All @@ -20,10 +20,10 @@ await context.Chains.Get<PublishPreroutingContext>()
.Invoke(new PublishPreroutingContext(types, context))
.ConfigureAwait(false);

return true;
context.MarkForwarded();
}

return false;
await next(context).ConfigureAwait(false);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
using System.Threading.Tasks;
using System;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Router;

class PreroutingToReplyPreroutingFork : ChainTerminator<PreroutingContext>
class PreroutingToReplyPreroutingFork : IRule<PreroutingContext, PreroutingContext>
{
protected override async Task<bool> Terminate(PreroutingContext context)
public async Task Invoke(PreroutingContext context, Func<PreroutingContext, Task> next)
{
if (context.Intent == MessageIntentEnum.Reply)
{
await context.Chains.Get<ReplyPreroutingContext>()
.Invoke(new ReplyPreroutingContext(context))
.ConfigureAwait(false);

return true;
context.MarkForwarded();
}

return false;
await next(context).ConfigureAwait(false);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
using System.Threading.Tasks;
using System;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Router;

class PreroutingToSendPreroutingFork : ChainTerminator<PreroutingContext>
class PreroutingToSendPreroutingFork : IRule<PreroutingContext, PreroutingContext>
{
protected override async Task<bool> Terminate(PreroutingContext context)
public async Task Invoke(PreroutingContext context, Func<PreroutingContext, Task> next)
{
if (context.Intent == MessageIntentEnum.Send)
{
await context.Chains.Get<SendPreroutingContext>()
.Invoke(new SendPreroutingContext(context))
.ConfigureAwait(false);

return true;
context.MarkForwarded();
}

return false;
await next(context).ConfigureAwait(false);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
using System.Threading.Tasks;
using System;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Router;

class PreroutingToSubscribePreroutingFork : ChainTerminator<PreroutingContext>
class PreroutingToSubscribePreroutingFork : IRule<PreroutingContext, PreroutingContext>
{
protected override async Task<bool> Terminate(PreroutingContext context)
public async Task Invoke(PreroutingContext context, Func<PreroutingContext, Task> next)
{
if (context.Intent == MessageIntentEnum.Subscribe
|| context.Intent == MessageIntentEnum.Unsubscribe)
Expand Down Expand Up @@ -41,10 +42,10 @@ await context.Chains.Get<UnsubscribePreroutingContext>()
.ConfigureAwait(false);
}

return true;
context.MarkForwarded();
}

return false;
await next(context).ConfigureAwait(false);
}

static string GetReplyToAddress(PreroutingContext message)
Expand Down
1 change: 1 addition & 0 deletions src/NServiceBus.Router/Router.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public static IRouter Create(RouterConfiguration config)
chains.AddChain(cb => cb.Begin<RawContext>().AddSection<PreroutingContext>().Terminate());
chains.AddRule(_ => new RawToPreroutingConnector());
chains.AddRule(c => new DetectCyclesRule(c.Endpoint.EndpointName));
chains.AddRule(_ => new PreroutingTerminator());

chains.AddRule(_ => new PreroutingToSubscribePreroutingFork());

Expand Down
Loading

0 comments on commit 8a7951f

Please sign in to comment.