Skip to content

Commit

Permalink
Fix broken forward verification rule
Browse files Browse the repository at this point in the history
  • Loading branch information
SzymonPobiega committed Nov 15, 2019
1 parent b2300eb commit e2ce1cb
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
/// </summary>
public class ForwardPublishContext : BaseForwardRuleContext
{
internal bool Forwarded;
internal bool Dropped;

/// <summary>
/// Creates new instance.
/// </summary>
Expand Down Expand Up @@ -38,5 +41,22 @@ public ForwardPublishContext(string outgoingInterface, Type rootType, PublishPre
/// The headers associated with the received message.
/// </summary>
public byte[] ReceivedBody { get; }

/// <summary>
/// Mark this message as forwarded.
/// </summary>
/// <returns></returns>
public void MarkForwarded()
{
Forwarded = true;
}

/// <summary>
/// Marks this message as OK to be dropped if no chain terminator forwards it.
/// </summary>
public void DoNotRequireThisMessageToBeForwarded()
{
Dropped = true;
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,23 @@ protected override async Task<bool> Terminate(PublishPreroutingContext context)
{
var outgoingInterfaces = allInterfaces.Where(i => i != context.IncomingInterface);

var verificationState = new PostroutingVerificationRule.State(context.MessageId);

var interfaces = context.Extensions.Get<IInterfaceChains>();
var forkTasks = outgoingInterfaces
.Select(iface =>
.Select(async iface =>
{
var chains = interfaces.GetChainsFor(iface);
var chain = chains.Get<ForwardPublishContext>();
var forwardPublishContext = new ForwardPublishContext(iface, typeGenerator.GetType(context.Types.First()), context);
forwardPublishContext.Extensions.Set(verificationState);
return chain.Invoke(forwardPublishContext);
});
await chain.Invoke(forwardPublishContext).ConfigureAwait(false);

await Task.WhenAll(forkTasks).ConfigureAwait(false);
return forwardPublishContext.Dropped || forwardPublishContext.Forwarded;
});

if (!verificationState.HasBeenForwarded)
{
var results = await Task.WhenAll(forkTasks).ConfigureAwait(false);
if (!results.Any())
{
throw new UnforwardableMessageException($"The incoming message {context.MessageId} has not been forwarded. This might indicate a configuration problem.");
}

return true;
}
}
1 change: 0 additions & 1 deletion src/NServiceBus.Router/Router.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ public static IRouter Create(RouterConfiguration config)
chains.AddChain(cb => cb.Begin<PublishPreroutingContext>().Terminate());
chains.AddRule(c => new PublishPreroutingTerminator(interfaces.Select(i => i.Name).ToArray(), c.TypeGenerator));
chains.AddChain(cb => cb.Begin<ForwardPublishContext>().Terminate());
chains.AddRule(c => new PostroutingVerificationRule());

chains.AddRule(_ => new PreroutingToReplyPreroutingFork());
chains.AddChain(cb => cb.Begin<ReplyPreroutingContext>().Terminate());
Expand Down

0 comments on commit e2ce1cb

Please sign in to comment.