[3.5] Collection Handling
Jira: http://www.mulesoft.org/jira/browse/MULE-6894, http://www.mulesoft.org/jira/browse/MULE-6896
Forum discussion:
Collection handling in Mule has always been a problem in terms of usability and behaviour. With the introduction of foreach we simplified a lot of use cases, but several other use cases remain unresolved.
We now face the need to handle messages in Mule that contain a collection of items. In some cases the items can be processed independently, but in other cases they must be treated as a whole, for instance to create a summary of the result of processing each record, or to execute operations on the whole initial collection or a subset of it. Sometimes we need to treat a collection both as a whole and item by item in the same flow. Examples of such operations are splitting items into different collections or removing duplicate items.
We also need to simplify error handling for these kinds of data integration use cases. Ideally, when processing items, if one of them fails we can continue processing the remaining items instead of stopping everything.
Another aspect we need to solve is how to handle collections with thousands (or millions) of elements, which potentially won't fit into memory. We therefore need to add the capability to stream collection items during message processing.
As a SaaS developer I need to be able to retrieve a list of items from a provider, probably a really big one (collection streaming), enrich each item and, based on the enrichment, split the items into two different collections (collection splitting). After that, every X items I need to execute a bulk operation (item aggregation). At the end I need a summary of which records were processed correctly and which of them failed (per-item error handling).
In order to solve this problem we want to create a new set of scopes, each of which will execute a different operation over the collection. For all of these scopes:
- Input is a payload with a collection or a streaming collection.
- Output is a streaming collection. A streaming collection will be a regular iterable / iterator (TBD) from the user's perspective. (collection streaming)
- Output contains an Iterator of Record<T>, where T is the type of the actual input collection item. The Record class will keep track of the actual item and the state (recordVars) related to that item.
- All the scopes will execute their child message processors on demand in order to support collection streaming. This means that the next message processor after the scope will iterate over the collection items and, on demand, each item will be executed through the scope. (collection streaming)
- After processing an item through one of these scopes we will have the result as the payload. These scopes will also add state to the record, which can be an ignore flag or a failed flag. The ignore flag indicates that an item was filtered out; the failed flag indicates there was an error processing the item. The idea behind this is to simplify summary creation at the end of the flow, so we know exactly what happened with each of the items. The default behaviour will remain as in any other Mule scope, but we are going to provide a configuration option to enable this behaviour.
- Each scope understands OOTB the state of an item (value, ignore, fail) and will skip or process it accordingly (per-item error handling); see the sketch after this list.
- This functionality will initially go in a separate module whose name is still TBD. The tentative name is collection handling.
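To illustrate the last two points, here is a minimal sketch of two chained scopes (using the tentative element names from this spec): the first scope marks failing items instead of stopping the flow, and the second scope, by default, skips the records marked as failed.
<collection-handling:foreach onFailureContinue="true">
    <!-- an item that throws here is marked with the failed flag; processing continues -->
    <some-operation-that-can-fail/>
</collection-handling:foreach>
<collection-handling:foreach>
    <!-- by default only successful records reach this scope -->
    <logger level="INFO" message="#[payload]"/>
</collection-handling:foreach>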
Collection Operations (names may change):
- Foreach: for each item, do something. Similar to the current foreach, but streams results.
- Split: split collection items into sub-collections.
- Summarise: creates a summary of the result of each record's processing.
Foreach
This component will allow doing something for every item inside the payload collection, e.g.: for each Salesforce account I need to verify whether or not it exists in Google as a contact.
<collection-handling:foreach>
    <google-contacts:insert/>
</collection-handling:foreach>
Input:
- Collection payload (collection, array, iterable, iterator, etc.)
- Collection of Records (collection, array, iterable, iterator, etc.)
Output:
- Iterator of Record.
Behaviour:
- Child message processors will be executed for each item in the collection.
- The result of executing those message processors for each item is the value stored in the output collection.
<collection-handling:foreach onFailureContinue="true">
    <google:contacts-insert/>
</collection-handling:foreach>
onFailureContinue attribute: if true, processing continues regardless of whether there is an error for a particular item. If false (the default value), execution stops on error and the exception strategy is called.
Data associated with each collection item
We need a mechanism to hold data related to each payload item. This is required in order to store the result of operations executed on a per-item basis and then be able to use that data in other Mule components. An example is splitting a collection of items based on whether or not each item exists in an external system.
<collection-handling:foreach>
    <enricher target="recordVars['exists']">
        <google:contacts-insert/>
    </enricher>
</collection-handling:foreach>
recordVars is a variable context created by the collection handling module that allows maintaining state for each item inside a collection. Whenever an item is in context as the payload, users will be able to access that data.
Why not use MuleMessageCollection and store each item's state in flow vars? Using MuleMessageCollection has several disadvantages:
- After a collection operation it would not be clear what the content of the payload is. The use of foreach has shown that having a payload with a collection of items as output is much simpler for the user to understand.
- Message aggregation: if the result payload must be a MuleMessageCollection, then it's not clear what would happen with the flowVars defined inside the scope. Should all of them be maintained? What happens when you define the same var for two different items? What happens with the root message flow vars? Should they be present in the MuleMessages inside the collection operation?
- MuleMessage access inside MuleMessageCollection: it's not possible for some components (i.e. MEL) to access message parts (flowVars, inbound and outbound properties, etc.) of a MuleMessage inside a MuleMessageCollection.
Split
This component will allow splitting a collection into different collections and doing different processing for each of them, e.g.: I want to split Salesforce accounts into those which already exist as Google contacts and those which don't. Then, for those which already exist, I want to group them together in groups of 100 and do a bulk contact update; for those which don't exist, I want to group them together in groups of 100 and do a bulk contact insert.
<collection-handling:split-collection>
    <collection-handling:split-condition when="#[recordVars['exists']]" batchSize="100">
        <google-contacts:bulk-update/>
    </collection-handling:split-condition>
    <collection-handling:default-condition batchSize="100">
        <google-contacts:bulk-insert/>
    </collection-handling:default-condition>
</collection-handling:split-collection>
batchSize attribute: defines the maximum number of elements to process through the child MPs of each condition in a single execution. The default value is infinite. For example, with batchSize="100" and 250 matching items, the child message processors would run twice with 100 items each and once more with the remaining 50.
Input:
- Collection payload (collection, array, iterable, iterator, etc.)
- Collection of Records (collection, array, iterable, iterator, etc.)
Output:
- Iterator of Record.
Behaviour:
- For each item in the input collection, the expression of each split-condition is evaluated. Once a condition matches, the item is stored in the sub-collection of that split-condition and no other condition is evaluated.
- Once an item is stored in one of the sub-collections, if that sub-collection has batchSize items, then it is executed through the child message processors of the split-condition element.
- The result of executing those message processors for each item is the value stored in the output collection.
- The result of the complete execution of the split-collection element has exactly the same number of items as the input collection.
Collection splitting will also allow splitting the result of a collection operation based on its execution state. For that we are going to create specific condition elements inside the collection splitter for successful, ignored and failed items.
<collection-handling:split-collection>
    <collection-handling:ignored-item-condition>
        <custom-processor class="org.mule.example.CreateIgnoredItemsSummary"/>
    </collection-handling:ignored-item-condition>
    <collection-handling:failed-item-condition>
        <custom-processor class="org.mule.example.CreateFailedItemsSummary"/>
    </collection-handling:failed-item-condition>
    <collection-handling:successful-item-condition>
        <custom-processor class="org.mule.example.CreateSuccessfulItemsSummary"/>
    </collection-handling:successful-item-condition>
</collection-handling:split-collection>
Collection record filter
This new element allows you to filter records inside a collection using an expression.
<collection-handling:collection-record-filter expression="#[payload['status'] == 'uptodate']"/>
Why not name it collection-handling:filter? Because calling it just filter would suggest that it has the same behaviour as other Mule filters, when it doesn't. Any ideas for a better name?
Input:
- Collection payload (collection, array, iterable, iterator, etc.)
- Collection of Records (collection, array, iterable, iterator, etc.)
Output:
- Iterator of Record.
Behaviour:
- For each item in the input collection, the expression of the record-filter is evaluated. If the expression is true, the record is accepted by the filter and therefore remains a successful item in the collection. Otherwise it is marked as an ignored record.
For other use cases, in which you need to gather information for each record in order to be able to filter it, you can use existing Mule filters.
Use case: filter all the records that already exist in an external system.
<collection-handling:foreach>
    <enricher target="recordVars['exists']">
        <external-system:record-exists/>
    </enricher>
    <message-filter>
        <expression-filter expression="#[recordVars['exists']]"/>
    </message-filter>
</collection-handling:foreach>
What happens with filtered records
In the same way that records causing an error are marked as failed when using onFailureContinue="true", records that are filtered out are not removed but marked as ignored. This allows you to decide later whether or not to process them in other collection handling elements.
How to process filtered records
To enable processing of filtered records you must use the attribute processIgnoredRecords="true".
<collection-handling:foreach>
    <choice>
        <when expression="#[payload.status == 'complete']">
            <message-filter>
                <expression-filter expression="#[false]"/>
            </message-filter>
        </when>
        <otherwise>
            <logger/>
        </otherwise>
    </choice>
</collection-handling:foreach>
<collection-handling:foreach processIgnoredRecords="true">
    <!-- here you will have filtered records and successful records -->
</collection-handling:foreach>
How to process failed records
To enable processing of failed records you must use the attribute processFailedRecords="true".
<collection-handling:foreach>
    <some-operation-that-can-fail/>
</collection-handling:foreach>
<collection-handling:foreach processFailedRecords="true">
    <!-- here you will have failed records and successful records -->
</collection-handling:foreach>
The collection handling module will include extensions for MEL:
- recordVars: these are variables at the record level.
- function isSuccessfulRecord(): returns true if the current record was not marked as failed or ignored, false otherwise.
- function isFailedRecord(): returns true if the current record is marked as failed, false otherwise.
- function isIgnoredRecord(): returns true if the current record is marked as ignored, false otherwise.
- function failureException(): returns the exception that caused this record to be marked as failed. It will fail if the record was not marked as failed.
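As an illustration, these functions could be combined with the processFailedRecords attribute to build the kind of summary described in the user story. This is only a sketch, under the assumption that the functions are available inside any collection handling scope:
<collection-handling:foreach processFailedRecords="true">
    <choice>
        <when expression="#[isFailedRecord()]">
            <logger level="ERROR" message="Record failed: #[failureException()]"/>
        </when>
        <otherwise>
            <logger level="INFO" message="Record processed successfully: #[payload]"/>
        </otherwise>
    </choice>
</collection-handling:foreach>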
Streaming
This component can wrap a set of collection operations and will cause those operations to be executed using streaming. This prevents memory exhaustion when consuming big collections.
<collection-handling:streaming>
    <collection-handling:foreach>
        <enricher target="recordVars['exists']">
            <choice>
                <when expression="#[payload['id'].contains('e')]">
                    <set-payload value="#[true]"/>
                </when>
                <otherwise>
                    <set-payload value="#[false]"/>
                </otherwise>
            </choice>
        </enricher>
        <expression-component>recordVars['id'] = payload['id']</expression-component>
    </collection-handling:foreach>
    <collection-handling:split-collection>
        <collection-handling:split-condition when="#[recordVars['exists']]" batchSize="100">
            <custom-processor class="org.mule.module.collectionhandling.CollectionSplitTestCase$BulkInsert"/>
            <!-- here the bulk operation returns an object which contains the status of each element that was inserted -->
            <expression-component>recordVars['status'] = 'inserted'</expression-component>
        </collection-handling:split-condition>
        <collection-handling:default-condition batchSize="100">
            <custom-processor class="org.mule.module.collectionhandling.CollectionSplitTestCase$BulkUpdate"/>
            <expression-component>recordVars['status'] = 'updated'</expression-component>
        </collection-handling:default-condition>
    </collection-handling:split-collection>
    <collection-handling:foreach>
        <logger level="ERROR" message="Payload with id #[recordVars['id']] was #[recordVars['status']] and result was #[payload['status'].get(recordVars['id'])]"/>
    </collection-handling:foreach>
</collection-handling:streaming>
Input:
- Collection payload (collection, array, iterable, iterator, etc.)
- Collection of Records (collection, array, iterable, iterator, etc.)
Output:
- Consumed stream
Behaviour:
- Child message processors are executed using streaming.
Why not always do streaming? Because streaming has certain limitations, and in most use cases users won't need streaming, so they shouldn't have to worry about those limitations.
Limitations
- Streaming delays the execution of the MPs inside each collection handling scope (foreach, split, etc.) until the items are consumed from the collection stream. This requires that the collection stream be consumed inside the context of the flow where the collection streaming is happening, in order to avoid losing the execution context (the Java method call stack providing error handling, transaction management, intercepting calls, etc.).
- Because of the limitation described above, flow-ref, VM endpoints and the async scope cannot be included within a streaming scope; see the invalid configuration sketched below.
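For instance, a configuration like the following would be invalid under this limitation, because the flow-ref hands execution over to another flow and the stream would be consumed outside the original flow's context (someOtherFlow is just a placeholder name):
<collection-handling:streaming>
    <collection-handling:foreach>
        <!-- invalid: execution would leave the streaming flow's context -->
        <flow-ref name="someOtherFlow"/>
    </collection-handling:foreach>
</collection-handling:streaming>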
Why not use an attribute in each collection handling scope? Because when streaming is used there will most likely be a set of collection handling scopes that all have to execute using streaming, so that would require adding the attribute to all of them, which is cumbersome and error prone. It also does not provide a clear delimitation of which part of the flow is doing streaming.
How can I stream the collection as the response of the message source?
This is a limitation of using a scope for defining the streaming boundaries. What we can do to overcome this problem is to add an attribute to the streaming scope that allows leaving the stream open. As this won't be the default behaviour, it's unlikely that a user would configure it without knowing the implications.
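A sketch of what that could look like; the attribute name leaveStreamOpen is purely hypothetical, as the spec does not define it yet:
<collection-handling:streaming leaveStreamOpen="true">
    <!-- the resulting stream is returned to the message source without being consumed here -->
    <collection-handling:foreach>
        <logger level="INFO" message="#[payload]"/>
    </collection-handling:foreach>
</collection-handling:streaming>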
- We need to think carefully about how to integrate these changes with foreach and the splitter/aggregator, to avoid breaking backward compatibility and to make them work nicely together.
- Unforeseen issues may appear while implementing the streaming capabilities.
Requires changes in Studio to support this new module. Mockups will be created and attached to the spec.
Initially no impact. May require changes to add monitoring if we want to adapt this approach for batch processing
No impact
No impact yet. Changes may be required after analysing the foreach and splitter/aggregator integration.
Requires a new documentation section for this module.