Services of this group provide tools for runtime analysis of data. For example, we can provide an OLAP-like tool for analyzing data grouped by a multi-level hierarchy (group of meters, district, city) and by different time windows (day, week, month, year).
For example, if there are only two items in each dimension (day, week x group, city), then after receiving each new measurement from a meter we have to increase the values of 4 keys by a delta, calculated as the difference between the meter’s current and previous values:
-
group_<G>_day_<D>
-
city_<C>_day_<D>
-
group_<G>_week_<W>
-
city_<C>_week_<W>
Values of <D> and <W> can be calculated as int(currentTime/timeWindowLength). Because the remainder of the division is discarded, all data that falls inside the same time window is aggregated under the same key.
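The key calculation can be sketched as follows. This is a minimal illustration, assuming timestamps are Unix seconds and window lengths are fixed numbers of seconds; the function names `window_index` and `aggregate_keys` are hypothetical and not part of the described services.

```python
DAY_S = 24 * 60 * 60   # length of the "day" window in seconds
WEEK_S = 7 * DAY_S     # length of the "week" window in seconds


def window_index(timestamp_s: int, window_length_s: int) -> int:
    """Integer division drops the remainder, so every timestamp inside
    one window maps to the same index."""
    return timestamp_s // window_length_s


def aggregate_keys(group: str, city: str, timestamp_s: int) -> list[str]:
    """Build the four aggregate keys (day, week x group, city)."""
    day = window_index(timestamp_s, DAY_S)
    week = window_index(timestamp_s, WEEK_S)
    return [
        f"group_{group}_day_{day}",
        f"city_{city}_day_{day}",
        f"group_{group}_week_{week}",
        f"city_{city}_week_{week}",
    ]


if __name__ == "__main__":
    # Two measurements taken one hour apart on the same day share all keys.
    morning = 8 * DAY_S + 9 * 3600
    print(aggregate_keys("G1", "C1", morning))
    print(aggregate_keys("G1", "C1", morning + 3600))
```

Note that windows computed this way are aligned to the Unix epoch rather than to calendar boundaries, so a "week" here does not necessarily start on a Monday.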
Thus, we have to store in the memory grid:
-
keys with current values for all meters
-
keys for aggregated data
Fortunately, keys are independent of each other, and a memory grid (for example, Infinispan or clustered Redis) usually provides the following features:
-
atomic execution of an increment operation on a key
-
atomic execution of a batch of increment operations
-
automatic key distribution in the grid for reliable storage
The only open question is the atomic execution of the composite operation: "increment the meter’s key and get the old value" + "increment the values of the aggregating keys". Reliable data processing requires this atomicity, because a service instance can die between the first and the second operation.
To achieve this, we can take the following approach:
-
to avoid concurrent operations on the same meter’s key, split the topic into partitions based on the meter’s UUID (the ID is already available as a message property, as an output of the Regrouping Service)
-
read the value of the meter’s key and calculate the difference between the new value and the stored one
-
perform an atomic batch update of several keys at once:
-
update the meter’s key with the new value
-
increment all aggregating keys by the difference
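The approach above can be sketched with a toy in-memory grid. This is only an illustration under stated assumptions: `InMemoryGrid` is a hypothetical stand-in and not an Infinispan or Redis client, the `meter_<id>` key name is invented for the example, and a meter with no stored value is treated as having a previous value of 0.

```python
import threading


class InMemoryGrid:
    """Toy stand-in for a memory grid with an atomic batch operation."""

    def __init__(self):
        self._data = {}
        self._lock = threading.Lock()

    def get(self, key):
        with self._lock:
            return self._data.get(key, 0)  # assumption: missing key reads as 0

    def apply_batch(self, set_ops, incr_ops):
        """Apply all sets and increments as one atomic step."""
        with self._lock:
            for key, value in set_ops.items():
                self._data[key] = value
            for key, delta in incr_ops.items():
                self._data[key] = self._data.get(key, 0) + delta


def process_measurement(grid, meter_id, new_value, aggregate_keys):
    """Read the old value, compute the delta, then update everything at once."""
    meter_key = f"meter_{meter_id}"      # hypothetical key naming
    old_value = grid.get(meter_key)      # safe: one consumer per meter partition
    delta = new_value - old_value
    grid.apply_batch(
        set_ops={meter_key: new_value},
        incr_ops={key: delta for key in aggregate_keys},
    )
    return delta


if __name__ == "__main__":
    grid = InMemoryGrid()
    keys = ["group_G1_day_8", "city_C1_day_8"]
    process_measurement(grid, "m1", 100, keys)
    process_measurement(grid, "m1", 130, keys)
    print(grid.get("group_G1_day_8"))
```

One consequence of this design, at least in the sketch, is that re-processing the same message after a crash is harmless: if the batch was already applied, the stored meter value equals the new value and the delta is 0; if it was not applied, nothing has changed yet.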
Implementing this feature requires additional properties in each message that describe the meter. This is the responsibility of the Enrichment Service.
The implementation of the runtime analysis feature is based on two services:
-
Enrichment Service
-
Runtime Analysis Service
The Enrichment Service is quite simple. It performs only a few operations on each message:
-
receive message (usually from Regrouping Service)
-
read meter’s UUID
-
read description of meter from some dictionary (and cache it locally, of course)
-
enrich the message with properties of the meter (group, district, city and so on)
-
send the message to the Runtime Analysis Service
Use cases related to the meter dictionary are out of the scope of this document.
The Runtime Analysis Service is responsible for reliably updating the Memory Grid. It performs the following operations:
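The enrichment steps can be sketched as follows. The dictionary here is a hard-coded stand-in, and the message field `meter_uuid` and the function names are assumptions made for the example; the real dictionary and message schema are not defined in this document.

```python
from functools import lru_cache

# Hypothetical stand-in for the meter dictionary (a remote store in practice).
METER_DICTIONARY = {
    "meter-1": {"group": "G1", "district": "D1", "city": "C1"},
}


@lru_cache(maxsize=10_000)
def read_meter_description(meter_uuid: str) -> dict:
    """Look up the meter's description; results are cached locally.
    An unknown UUID raises KeyError in this sketch."""
    return METER_DICTIONARY[meter_uuid]


def enrich(message: dict) -> dict:
    """Return a new message that also carries the meter's properties."""
    description = read_meter_description(message["meter_uuid"])
    return {**message, **description}  # the cached dict is never mutated


if __name__ == "__main__":
    print(enrich({"meter_uuid": "meter-1", "value": 42}))
```

A bounded `lru_cache` keeps the sketch short; a real service would probably also need some way to invalidate cached descriptions when the dictionary changes.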
-
receive message (usually from Enrichment Service)
-
read the meter’s current value from Memory Grid
-
create batch update of several keys
-
apply batch update to Memory Grid
The key point of this service’s implementation is partitioning the input topic by the meter’s UUID, which prevents concurrent updates of the same meter’s key.
The following diagram illustrates this process:
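A minimal sketch of such partitioning, assuming a fixed number of partitions and a stable hash of the UUID string. The function name `partition_for` is hypothetical; message brokers typically offer key-based partitioning that serves the same purpose.

```python
import zlib


def partition_for(meter_uuid: str, partition_count: int) -> int:
    """Map a meter UUID to a partition number.

    CRC32 is used because it is stable across processes and restarts;
    Python's built-in hash() of a string is randomized per process by default.
    """
    return zlib.crc32(meter_uuid.encode("utf-8")) % partition_count


if __name__ == "__main__":
    uuid = "3f2b8c1e-0000-4000-8000-000000000001"  # made-up example UUID
    # Every message of this meter lands in the same partition, so only one
    # consumer ever reads and updates this meter's key.
    print(partition_for(uuid, 8))
```

Because all messages of one meter go to a single partition, and each partition is read by a single consumer at a time, the read-then-batch-update sequence for a given meter is never executed concurrently.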
database "Input\nTopic"
control "Enrichment\nservice"
database "Meter's Dictionary"
database "Partitioned\nTopic"
control "Analysis\nService"
database "Dictionary of\ntime dimensions"
database "Memory\nGrid"
"Enrichment\nservice" -> "Input\nTopic": read message
"Enrichment\nservice" -> "Meter's Dictionary": read meter's data
"Enrichment\nservice" -> "Enrichment\nservice": enrich message with meter's data
"Enrichment\nservice" -> "Partitioned\nTopic": send to analysis
"Analysis\nService" -> "Partitioned\nTopic": read enriched message
"Analysis\nService" -> "Memory\nGrid": read current value for meter
"Analysis\nService" -> "Dictionary of\ntime dimensions": read list of dimensions
"Analysis\nService" -> "Analysis\nService": create batch\nupdate of several keys
"Analysis\nService" -> "Memory\nGrid": apply batch update to grid