[3.5 February 2014] Scatter Gather
[DESIGN CLOSED]
- Jira: https://www.mulesoft.org/jira/browse/MULE-7207
- Forum discussion: http://forum.mulesoft.org/mulesoft/topics/_mule_3_5_february_2014_scatter_gather
Mule implements Multicasting through the <all> router. Basically, this router has a list of different routes and sends a different copy of the current mule message through each one of them. This is done sequentially which is fine for use cases in which:
- route (n) depends on side effects generated by route (n-1)
- route (n+1) should be skipped if an exception is thrown by route (n)
The downside of processing sequentially is a lack of efficiency when neither of the above conditions applies. In that case, it is best to execute all routes concurrently.
A new element needs to be added so that all routes can be executed in parallel.
- As a developer, I want to be able to multicast a message in parallel. The thread executing the flow that owns the router should wait until all routes finish.
- As a developer, I want to be able to configure a timeout so that an exception is thrown if a route is not completed in a certain amount of time
- As a developer, I want exceptions to be grouped in case of routes failing
The router will execute all routes concurrently. The thread executing the flow that owns the router will wait until all routes complete or time out.
If there's no failure, results are aggregated into a MessageCollection. A failure in one route will not stop the other routes from being executed, so several routes may fail in the same execution. The behaviour in that case will be to aggregate the resulting messages into a MessageCollection as usual, setting each aggregated message's exception payload accordingly. After this MessageCollection is set as the payload, a CompositeRoutingException will be thrown.
CompositeRoutingException will be a new type of exception extending MessagingException. It will map each exception to a route by using a sequential route id.
For each scattered event, a timeout will be used to make sure that no route takes more than a certain number of milliseconds to complete. If the timeout is exceeded, a ResponseTimeoutException will be attached to that route.
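The behaviour described above can be sketched with plain `java.util.concurrent` primitives. This is only an illustration of the intended semantics, not the router's actual implementation: `ScatterGatherSketch`, `CompositeFailure`, and the use of a `Callable` to stand in for a route are names invented for this example, and a plain `TimeoutException` stands in for ResponseTimeoutException.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for CompositeRoutingException: maps a sequential route id to its failure. */
class CompositeFailure extends Exception {
    final Map<Integer, Throwable> failuresByRoute;
    final List<Object> results; // in route order; null entries mark failed routes

    CompositeFailure(Map<Integer, Throwable> failuresByRoute, List<Object> results) {
        super(failuresByRoute.size() + " route(s) failed: " + failuresByRoute.keySet());
        this.failuresByRoute = failuresByRoute;
        this.results = results;
    }
}

class ScatterGatherSketch {
    /**
     * Runs every route concurrently and blocks the caller until each route
     * completes or the timeout expires (values <= 0 mean no timeout).
     */
    static List<Object> scatterGather(List<Callable<Object>> routes, long timeoutMillis, int maxThreads)
            throws CompositeFailure, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
        try {
            // Scatter: submit all routes before waiting on any of them.
            List<Future<Object>> futures = new ArrayList<>();
            for (Callable<Object> route : routes) {
                futures.add(pool.submit(route));
            }
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);

            // Gather: results keep route order; one failure never stops the others.
            List<Object> results = new ArrayList<>();
            Map<Integer, Throwable> failures = new LinkedHashMap<>();
            for (int i = 0; i < futures.size(); i++) {
                Future<Object> future = futures.get(i);
                try {
                    if (timeoutMillis > 0) {
                        long remaining = Math.max(0L, deadline - System.nanoTime());
                        results.add(future.get(remaining, TimeUnit.NANOSECONDS));
                    } else {
                        results.add(future.get());
                    }
                } catch (ExecutionException e) {
                    results.add(null);
                    failures.put(i, e.getCause());
                } catch (TimeoutException e) {
                    future.cancel(true);
                    results.add(null);
                    failures.put(i, e);
                }
            }
            if (!failures.isEmpty()) {
                throw new CompositeFailure(failures, results);
            }
            return results;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

In this sketch the timeout is measured from the moment the routes are submitted, so when `maxThreads` is smaller than the number of routes, time spent queued counts against a route. Whether the real router should behave that way is left open here.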
There are some scenarios in which a custom gathering strategy might be needed, for example:
- To merge message properties added by different routes.
- To discard failed messages without throwing exception
- To only select one response
- Etc
To do this, the following interface will be provided:
public interface AggregationStrategy {
    public MuleEvent aggregate(MuleEvent originalEvent, List<MuleEvent> events) throws MuleException;
}
The aggregate method will receive the original event that was scattered, along with an ordered list in which each event's position matches the position of its corresponding route.
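As an illustration of the "merge message properties" use case, here is a minimal sketch of what a custom strategy could look like. To keep it runnable without Mule on the classpath, it uses simplified stand-ins: `SketchEvent` replaces MuleEvent and `SketchAggregationStrategy` mirrors the shape of the proposed AggregationStrategy. Both names, and `MergePropertiesStrategy`, are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified stand-in for MuleEvent (hypothetical; the real type carries far more state). */
class SketchEvent {
    final Object payload;
    final Map<String, Object> properties = new LinkedHashMap<>();

    SketchEvent(Object payload) {
        this.payload = payload;
    }
}

/** Same shape as the proposed AggregationStrategy, but over the stand-in event type. */
interface SketchAggregationStrategy {
    SketchEvent aggregate(SketchEvent originalEvent, List<SketchEvent> events) throws Exception;
}

/** Collects route payloads in route order and merges the properties each route added. */
class MergePropertiesStrategy implements SketchAggregationStrategy {
    @Override
    public SketchEvent aggregate(SketchEvent originalEvent, List<SketchEvent> events) {
        List<Object> payloads = new ArrayList<>();
        SketchEvent result = new SketchEvent(payloads);
        // Start from the properties of the event that was scattered.
        result.properties.putAll(originalEvent.properties);
        for (SketchEvent event : events) {
            payloads.add(event.payload);
            // On key clashes the later route wins; this is a choice made for the sketch.
            result.properties.putAll(event.properties);
        }
        return result;
    }
}
```

Because the list is ordered by route, the "later route wins" rule is deterministic even though the routes themselves ran concurrently.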
Additionally, you might want to specify a custom way to handle exceptions. For those cases, this other interface will be provided:
public interface AggregationErrorHandler {
    public MuleEvent handle(MuleEvent originalEvent, List<MuleEvent> successfulResponses, List<MuleEvent> failedResponses) throws MuleException;
}
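A handler for the "discard failed messages without throwing an exception" use case could look roughly like the sketch below. As before, the types are simplified stand-ins so the example runs on its own: `RouteEvent` replaces MuleEvent and `SketchAggregationErrorHandler` mirrors the proposed AggregationErrorHandler. These names and `DiscardFailuresHandler` are hypothetical, and failing when every route failed is a choice made for this example rather than something the spec mandates.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for a MuleEvent produced by one route (hypothetical). */
class RouteEvent {
    final Object payload;
    final Throwable failure; // plays the role of the exception payload; null on success

    RouteEvent(Object payload, Throwable failure) {
        this.payload = payload;
        this.failure = failure;
    }
}

/** Same shape as the proposed AggregationErrorHandler, but over the stand-in event type. */
interface SketchAggregationErrorHandler {
    RouteEvent handle(RouteEvent originalEvent, List<RouteEvent> successfulResponses,
                      List<RouteEvent> failedResponses) throws Exception;
}

/** Drops failed responses and returns the successful payloads instead of throwing. */
class DiscardFailuresHandler implements SketchAggregationErrorHandler {
    @Override
    public RouteEvent handle(RouteEvent originalEvent, List<RouteEvent> successfulResponses,
                             List<RouteEvent> failedResponses) {
        if (successfulResponses.isEmpty()) {
            // Nothing useful to return, so surface the problem to the flow.
            throw new IllegalStateException("all " + failedResponses.size() + " route(s) failed");
        }
        List<Object> payloads = new ArrayList<>();
        for (RouteEvent response : successfulResponses) {
            payloads.add(response.payload);
        }
        return new RouteEvent(payloads, null);
    }
}
```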
NOTE: In case you're wondering, YES! in the future, we will be extending the use of these interfaces to other aggregation components.
At the beginning of this spec we already discussed the difference between doing sequential and parallel multicasting. The question you might be asking is "why a new element over adding a parallel='true' attribute on the <all> router?"
Short answer is behaviour. Parallel processing would force the <all> router to drastically change its behaviour and responses between sequential and parallel modes. We believe that's not a really good practice since it means that:
- Behaviour is not isolated: Changing the router mode will also affect any other MP or exception strategy expecting the router's output
- Backwards compatibility: Maintaining compatibility and feature parity between two modes of one single component is way more difficult than maintaining two separate elements, which might be a limitation in the future
Following is a comprehensive list of behaviour differences between <all> and <scatter-gather>
- When using <all> changes to the payload performed in route n are visible in route (n+1)
- When using <scatter-gather>, each route receives its own shallow copy of the original event
- <all> throws CouldNotRouteOutboundMessageException upon route failure and stops processing. When catching the exception, you'll have no information about the result of any prior routes
- <scatter-gather> will process all routes no matter what. It will aggregate the results of all routes into a MuleMessageCollection in which each entry has its ExceptionPayload set accordingly. It will then throw a CompositeRoutingException, which gives you visibility over the output of the other routes.
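The first pair of differences in the list above (shared event versus per-route shallow copies) can be modelled with a few lines of plain Java. This is a deliberately simplified model: the event is represented as a `Map`, a route as a `Consumer` of that map, and `CopySemanticsSketch` is a hypothetical name. The routes run one after another in both methods so that the output is deterministic; only the copy semantics are being illustrated.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class CopySemanticsSketch {
    /** <all>-style: every route works on the same event, so route n+1 sees what route n changed. */
    static List<Map<String, Object>> sharedEvent(Map<String, Object> event,
                                                 List<Consumer<Map<String, Object>>> routes) {
        List<Map<String, Object>> seenAfterEachRoute = new ArrayList<>();
        for (Consumer<Map<String, Object>> route : routes) {
            route.accept(event);
            seenAfterEachRoute.add(new HashMap<>(event)); // snapshot of the shared state
        }
        return seenAfterEachRoute;
    }

    /** <scatter-gather>-style: every route gets its own shallow copy of the original event. */
    static List<Map<String, Object>> copiedEvent(Map<String, Object> event,
                                                 List<Consumer<Map<String, Object>>> routes) {
        List<Map<String, Object>> copies = new ArrayList<>();
        for (Consumer<Map<String, Object>> route : routes) {
            // Shallow: the map itself is new, but the values it holds are still shared.
            Map<String, Object> copy = new HashMap<>(event);
            route.accept(copy);
            copies.add(copy);
        }
        return copies;
    }
}
```

Note that a shallow copy only isolates the event container. If two routes mutate the same payload object in place, they can still interfere with each other.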
<scatter-gather>
    <flow-ref name="A" />
    <vm:outbound-endpoint path="foo" />
    <sfdc:upsert ..... />
</scatter-gather>
This syntax assumes the default threading profile (the one returned by muleContext.getDefaultThreadingProfile()) and no timeout.
<scatter-gather timeout="5000">
    <flow-ref name="A" />
    <vm:outbound-endpoint path="foo" />
    <sfdc:upsert ..... />
    <custom-aggregation-strategy [class="org.my.CustomAggregationStrategy" | ref="beanReference"] />
    <custom-aggregation-error-handler [class="org.my.CustomAggregationErrorHandler" | ref="beanReference"] />
    <threading-profile maxThreadsActive="5" />
</scatter-gather>
###timeout
This attribute is expressed in milliseconds and defaults to zero. Any value lower than or equal to zero means no timeout.
###threading-profile
This element is optional. It allows configuring the underlying thread pool.
###custom-aggregation-strategy
This optional element allows setting a custom gathering strategy. You can set either of these attributes:
- class: A String with the canonical name of a class that implements AggregationStrategy. That class needs to have a default constructor
- ref: the name of a registered bean that implements AggregationStrategy.
###custom-aggregation-error-handler
This optional element allows setting a custom exception handler. You can set either of these attributes:
- class: A String with the canonical name of a class that implements AggregationErrorHandler. That class needs to have a default constructor
- ref: the name of a registered bean that implements AggregationErrorHandler.
You can't set class and ref at the same time. Doing so will result in an exception when starting the application.
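The class/ref rule can be sketched as a small resolver. `StrategyResolver` and the `Map`-based registry are hypothetical names used only for this example; they are not part of the proposed API. The sketch also rejects the case where neither attribute is set, which is an assumption, since the spec only states that setting both is an error.

```java
import java.util.Map;

class StrategyResolver {
    /**
     * Resolves an object from either a class name or a bean reference.
     * Setting both attributes is rejected, as the spec requires; rejecting
     * the "neither" case as well is an assumption of this sketch.
     */
    static Object resolve(String className, String ref, Map<String, Object> registry)
            throws ReflectiveOperationException {
        boolean hasClass = className != null && !className.isEmpty();
        boolean hasRef = ref != null && !ref.isEmpty();
        if (hasClass && hasRef) {
            throw new IllegalArgumentException("'class' and 'ref' cannot be set at the same time");
        }
        if (!hasClass && !hasRef) {
            throw new IllegalArgumentException("one of 'class' or 'ref' must be set");
        }
        if (hasRef) {
            Object bean = registry.get(ref);
            if (bean == null) {
                throw new IllegalArgumentException("no bean registered as '" + ref + "'");
            }
            return bean;
        }
        // Needs an accessible no-argument constructor, matching the default-constructor requirement.
        return Class.forName(className).getDeclaredConstructor().newInstance();
    }
}
```

A real implementation would additionally check that the resolved object implements AggregationStrategy or AggregationErrorHandler before using it.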
User confusion regarding differences between <all> and <scatter-gather> and when to use which.
Studio needs to update their editors to support this new feature
No impact
No impact
No impact
No impact
No impact since this is a new feature
- Update docs reflecting this new feature
- Update training documents