Mariano Gonzalez edited this page Nov 10, 2013 · 5 revisions

Right now, when a poll needs to deal with updated objects, we use watermarks to filter out elements that were already processed. A watermark requires a value to be persisted across flow invocations. Typically, this value has a default for the first run and is used as part of a query or outbound request. Depending on how the flow processes the results, the value may then be updated.

This can be done using several steps:

  • Fetch a value from a persistent object store and save it as a flow variable
  • Refer to the flow variable when doing a query / outbound request
  • Update the variable, either once (e.g. the watermark is the date of the query) or per record (e.g. the watermark is the highest id among the returned objects)
  • At the end of the processing, persist the value to the persistent object store
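The four manual steps above can be sketched in Java, Mule's host language. This is a minimal illustration under stated assumptions: a plain HashMap stands in for the persistent object store, the "query" is a filter over a list of numeric ids, and the watermark is the highest id processed so far (the per-record variant). ManualWatermarkPoll and pollOnce are hypothetical names, not Mule API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the manual watermark pattern; not Mule API.
// Assumption: a HashMap stands in for the persistent object store.
class ManualWatermarkPoll {
    static final String KEY = "lastId";

    // Stand-in for the persistent object store; survives across poll cycles.
    final Map<String, Long> objectStore = new HashMap<>();

    // Runs one poll cycle and returns the ids that passed the watermark filter.
    List<Long> pollOnce(List<Long> sourceIds) {
        Map<String, Long> flowVars = new HashMap<>();

        // 1. Fetch the watermark (default 0 on the first run) into a flow variable.
        flowVars.put(KEY, objectStore.getOrDefault(KEY, 0L));

        // 2. Refer to the flow variable in the "query": keep ids above the watermark.
        List<Long> results = new ArrayList<>();
        for (long id : sourceIds) {
            if (id > flowVars.get(KEY)) {
                results.add(id);
            }
        }

        // 3. Update the variable per record: the watermark becomes the highest id seen.
        for (long id : results) {
            flowVars.put(KEY, Math.max(flowVars.get(KEY), id));
        }

        // 4. Persist the value at the end of processing.
        objectStore.put(KEY, flowVars.get(KEY));
        return results;
    }
}
```

Calling pollOnce twice on the same instance shows the effect: the second cycle skips every id at or below the value persisted by the first.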

This pattern is very common in synchronization flows, common enough to deserve better support. To make it easier, we will add a new optional watermark parameter to the poll that provides watermark variable persistence out of the box.

Example:

<flow name="pollSalesforce">
    <poll frequency="10m">
        <watermark variable="lastModifiedDate"
                   default-expression="#[lastYear()]"
                   update-expression="#[flowVars['accountLastModified']]"
                   object-store-ref="myOS" />
        <sfdc:query query="Select Id,Name,LastModifiedDate from Account where LastModifiedDate >= #[flowVars['lastModifiedDate']]"/>
    </poll>
    <set-variable variableName="accountLastModified" value="#[flowVars['lastModifiedDate']]"/>
    <foreach>
        <set-variable variableName="accountLastModified" value="#[max(payload['LastModifiedDate'], flowVars['accountLastModified'])]"/>
    </foreach>
    .....
</flow>

Properties

  • variable (required): The object store key under which this watermark is stored, and also the name of the flowVar through which its value is exposed to the user.
  • default-expression (required): If the key above cannot be found in the object store, this expression is evaluated to generate a value. This is useful for the first run of the flow.
  • update-expression (optional): The result of this expression is used to update the watermark once the execution has finished. It defaults to the value of the flowVar named by variable.
  • object-store-ref (optional): A reference to the object store in which to keep the watermarks. If not provided, the default user object store is used.
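One way to read the property list above is as a small configuration object that resolves the optional attributes to their documented defaults. The Java sketch below is hypothetical: WatermarkConfig is not a Mule class, MEL expressions are modelled as plain Java lambdas, and a static HashMap stands in for the default user object store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical model of the watermark attributes; not Mule's actual classes.
class WatermarkConfig {
    // Stand-in for the default user object store used when object-store-ref is omitted.
    static final Map<String, Object> DEFAULT_USER_STORE = new HashMap<>();

    final String variable;                                        // object store key and flowVar name
    final Supplier<Object> defaultExpression;                     // evaluated when the key is missing
    final Function<Map<String, Object>, Object> updateExpression; // evaluated over flowVars at the end
    final Map<String, Object> objectStore;                        // where the watermark is kept

    WatermarkConfig(String variable,
                    Supplier<Object> defaultExpression,
                    Function<Map<String, Object>, Object> updateExpression,
                    Map<String, Object> objectStore) {
        // Required attributes: reject missing values up front.
        this.variable = Objects.requireNonNull(variable, "variable is required");
        this.defaultExpression = Objects.requireNonNull(defaultExpression, "default-expression is required");
        // update-expression defaults to the current value of the flowVar named by variable.
        this.updateExpression = updateExpression != null
                ? updateExpression
                : flowVars -> flowVars.get(variable);
        // object-store-ref defaults to the default user object store.
        this.objectStore = objectStore != null ? objectStore : DEFAULT_USER_STORE;
    }
}
```

Resolving the defaults once, when the configuration is built, keeps the runtime path free of null checks.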

Behaviour

  • It is bound to the poll
  • It works with synchronous flows only; if the flow is not synchronous, execution must fail.
  • When executed, it goes to the configured object store looking for a value of the given key. If found, that value is exposed through a flowVar using the same key.
  • If no value is found for that key in the object store, then the default-expression is executed and that value is exposed through the flowVar.
  • The watermark also fires a notification that can be intercepted by the event tracking module. The underlying tracking event is enriched with the watermark value.
  • The message processors are executed.
  • If the flow executes successfully, the watermark is updated.
  • After the object store is updated, another notification is fired to the event tracking system so that the new value can also be recorded there.
  • If the intercepted processors throw an exception, the watermark is not updated. Since the watermark is kept in the object store, the user can always use an exception strategy to store a custom value in case of failure.
  • If the watermark value is not serializable, storing it in the object store must fail.
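The behaviour above can be condensed into a single run method. In this hypothetical Java sketch, message processors and expressions are plain lambdas, a HashMap stands in for the object store, and a list of strings stands in for the event tracking notifications; WatermarkLifecycle and run are illustrative names, not Mule API.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch of the watermark lifecycle around a poll; not Mule API.
class WatermarkLifecycle {
    // Stand-in for the configured object store.
    final Map<String, Object> objectStore = new HashMap<>();
    // Stand-in for the event tracking module: records every notification fired.
    final List<String> notifications = new ArrayList<>();

    void run(String variable,
             boolean synchronousFlow,
             Supplier<Object> defaultExpression,
             Function<Map<String, Object>, Object> updateExpression,
             Consumer<Map<String, Object>> processors) {
        // Only synchronous flows are supported.
        if (!synchronousFlow) {
            throw new IllegalStateException("watermark requires a synchronous flow");
        }
        // Look up the key; fall back to the default expression when it is missing,
        // then expose the value as a flowVar with the same name.
        Map<String, Object> flowVars = new HashMap<>();
        Object current = objectStore.containsKey(variable)
                ? objectStore.get(variable)
                : defaultExpression.get();
        flowVars.put(variable, current);
        notifications.add("watermark read: " + current);

        // Execute the message processors. If they throw, the exception propagates
        // and the object store is left untouched.
        processors.accept(flowVars);

        // Compute the new value; refuse to store null or non-serializable values.
        Object updated = updateExpression.apply(flowVars);
        if (!(updated instanceof Serializable)) {
            throw new IllegalStateException("watermark value is not serializable: " + updated);
        }
        objectStore.put(variable, updated);
        notifications.add("watermark updated: " + updated);
    }
}
```

Because a failing processor propagates its exception before the update step, the stored value stays as it was; putting a custom value in place is then left to the exception strategy.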

Watermark update strategies

There will be two ways of obtaining the new watermark value:

Update expression

The watermark will have an update-expression parameter, which takes a MEL expression. The result of evaluating that expression once the flow has finished is used as the new watermark value.

Selectors

Selectors are built-in aggregation functions that operate over payloads of type Collection, Iterator and Iterable. The chosen function is evaluated against every value in the payload and selects one of them as the new watermark value. The available functions are MIN, MAX, FIRST and LAST.
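The four selectors can be sketched as a Java enum that walks the payload once. This is an illustration under assumptions: WatermarkSelector is a hypothetical name, payload elements are assumed to be non-null and Comparable, an empty payload yields null, and a Collection is handled through the Iterable overload.

```java
import java.util.Iterator;

// Hypothetical sketch of the MIN, MAX, FIRST and LAST selectors; not Mule's implementation.
enum WatermarkSelector {
    MIN, MAX, FIRST, LAST;

    // Collections are Iterables, so they go through this overload.
    <T extends Comparable<? super T>> T select(Iterable<T> payload) {
        return select(payload.iterator());
    }

    // Returns the selected element, or null when the payload is empty.
    <T extends Comparable<? super T>> T select(Iterator<T> values) {
        if (!values.hasNext()) {
            return null;
        }
        T selected = values.next();
        if (this == FIRST) {
            return selected; // no need to look further
        }
        while (values.hasNext()) {
            T value = values.next();
            switch (this) {
                case MIN:
                    if (value.compareTo(selected) < 0) selected = value;
                    break;
                case MAX:
                    if (value.compareTo(selected) > 0) selected = value;
                    break;
                default:
                    selected = value; // LAST: keep overwriting until the end
            }
        }
        return selected;
    }
}
```

For example, on a payload of 3, 1, 4, 1, 5 the selectors return 1 (MIN), 5 (MAX), 3 (FIRST) and 5 (LAST).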