Message store (mstore) is an important th2 component responsible for storing raw messages into Cradle. Please refer to the [Cradle repository](https://github.com/th2-net/cradleapi/blob/master/README.md) for more details. This component has a pin for listening to messages via MQ.
To connect a pin to mstore automatically and collect all its messages into Cradle, users must mark each pin that produces protobuf raw messages in conn, read and hand boxes with the "store" attribute.
The protobuf raw message is a base entity of th2. All incoming and outgoing data is stored in this format. Every protobuf raw message contains the following important parts:
- book - name of the book
- session alias - unique identifier of the business session
- session group - group id for this session
- direction - direction of the message stream
- sequence number - incremental identifier
- data - byte representation of the raw message
Together, book, session alias, direction and sequence number form a compound unique identifier of a protobuf raw message within th2.
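
The compound identifier described above can be modelled as an immutable key. The following is a minimal Python sketch, not th2 code: the names `RawMessageKey`, `Direction` and `find_duplicates`, as well as the direction labels, are hypothetical and only illustrate that two messages sharing the same book, session alias, direction and sequence number are treated as the same message. Session group and data are deliberately left out of the key, since the text does not list them as part of the identifier.

```python
from dataclasses import dataclass
from enum import Enum


class Direction(Enum):
    # Placeholder labels for the two directions of a message stream.
    FIRST = 1
    SECOND = 2


@dataclass(frozen=True)
class RawMessageKey:
    """Hypothetical compound identifier of a raw message."""
    book: str
    session_alias: str
    direction: Direction
    sequence: int


def find_duplicates(keys):
    """Return the keys that repeat an identifier seen earlier in the input."""
    seen = set()
    duplicates = []
    for key in keys:
        if key in seen:
            duplicates.append(key)
        else:
            seen.add(key)
    return duplicates
```

Because the dataclass is frozen, its instances are hashable and compare by value, so they can be used directly as set members or dictionary keys.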
To connect a pin to mstore manually and collect all its messages into Cradle, users must link each pin that produces transport raw messages in conn, read and hand boxes via the general link approach.
The transport raw message is a new entity of th2. You can read more about the th2 transport protocol by following this [link](https://exactpro.atlassian.net/wiki/spaces/TH2/pages/1048838145/TH2+Transport+Protocol). All incoming and outgoing data is stored in this format. Every transport raw message contains the following important parts:
- book - name of the book
- session alias - unique identifier of the business session
- session group - group id for this session
- direction - direction of the message stream
- sequence number - incremental identifier
- data - byte representation of the raw message
Together, book, session alias, direction and sequence number form a compound unique identifier of a transport raw message within th2.
{
  "maxTaskCount" : 256,
  "maxTaskDataSize" : 256000000,
  "maxRetryCount" : 5,
  "retryDelayBase" : 5000,
  "rebatching" : true,
  "drain-interval" : 100,
  "prefetchRatioToDrain" : 0.8,
  "maxBatchSize" : 200000,
  "termination-timeout" : 1000
}
- maxTaskCount - Maximum number of message batches that will be processed simultaneously
- maxTaskDataSize - Maximum total data size of messages during parallel processing
- maxRetryCount - Maximum number of retries that will be done in case of message batch persistence failure
- retryDelayBase - Constant used to calculate the delay before the next retry, in milliseconds: retryDelayBase * retryNumber
- rebatching - Parameter determining if mstore should perform re-batching on incoming batches to avoid writing small batches
- drain-interval - Interval in milliseconds to drain all aggregated batches that are not stored yet. The default value is 1000.
- prefetchRatioToDrain - Threshold ratio of fetched messages to RabbitMQ prefetch size to force aggregated batch draining.
- maxBatchSize - Maximum aggregated batch size in case of re-batching
- termination-timeout - Timeout in milliseconds to wait for the inner drain scheduler to finish all its tasks. The default value is 5000.
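
The retry formula for retryDelayBase can be illustrated with a short Python sketch. It is not mstore code: the function name `retry_delays_ms` is hypothetical, and it assumes that retryNumber starts at 1 and that every retry up to maxRetryCount is attempted.

```python
from typing import List


def retry_delays_ms(retry_delay_base: int, max_retry_count: int) -> List[int]:
    """Delay before each retry in milliseconds: retryDelayBase * retryNumber.

    Assumption: retryNumber counts from 1 up to max_retry_count.
    """
    return [retry_delay_base * retry_number
            for retry_number in range(1, max_retry_count + 1)]


# With the sample values retryDelayBase=5000 and maxRetryCount=5:
print(retry_delays_ms(5000, 5))  # [5000, 10000, 15000, 20000, 25000]
```

Under these assumptions the delay grows linearly, so the sample values give a total of 75 seconds of waiting before the last retry is made.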
If some of these parameters are not provided, mstore will use a default (undocumented) value. If the maxTaskCount or maxTaskDataSize limit is reached during processing, mstore will pause processing new messages until some of the pending message batches are processed.
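
The pausing behaviour tied to maxTaskCount and maxTaskDataSize can be pictured as a simple counter check. The Python sketch below is an illustration only and does not reproduce the mstore implementation: the class `TaskLimiter` and its methods are hypothetical, and it assumes that a limit counts as reached when the current value is equal to or above the configured maximum.

```python
class TaskLimiter:
    """Hypothetical tracker of in-flight batches and their total data size."""

    def __init__(self, max_task_count: int, max_task_data_size: int):
        self.max_task_count = max_task_count
        self.max_task_data_size = max_task_data_size
        self.task_count = 0
        self.data_size = 0

    def can_accept(self) -> bool:
        # New messages are accepted only while both limits are not reached.
        return (self.task_count < self.max_task_count
                and self.data_size < self.max_task_data_size)

    def start(self, batch_size: int) -> None:
        # A batch enters processing and occupies one task slot.
        self.task_count += 1
        self.data_size += batch_size

    def finish(self, batch_size: int) -> None:
        # A batch has been processed and its capacity is released.
        self.task_count -= 1
        self.data_size -= batch_size
```

A caller would check `can_accept()` before taking the next batch and wait for a `finish()` call when it returns `False`.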
The infra schema can contain only one mstore box description. It has one required option: the docker image. Pin configuration is generated and managed by infra-operator.
The general view of the component looks like this:
apiVersion: th2.exactpro.com/v1
kind: Th2Mstore
metadata:
  name: mstore
spec:
  image-name: ghcr.io/th2-net/th2-mstore
  image-version: <image version>
  custom-settings:
    maxTaskCount: 256
    maxTaskDataSize: 256000000
    maxRetryCount: 5
    retryDelayBase: 5000
    rebatching: true
    drain-interval: 100
    prefetchRatioToDrain: 0.8
    maxBatchSize: 200000
    termination-timeout: 5000
  pins:
    mq:
      subscribers:
        - name: transport
          attributes:
            - transport-group
            - subscribe
  extended-settings:
    service:
      enabled: false
    envVariables:
      JAVA_TOOL_OPTIONS: "-XX:+ExitOnOutOfMemoryError -Ddatastax-java-driver.advanced.connection.init-query-timeout=\"5000 milliseconds\""
    resources:
      limits:
        memory: 1024Mi
        cpu: 2000m
      requests:
        memory: 512Mi
        cpu: 1000m
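
Since any of the custom-settings may be omitted, it can help to see how a partial configuration combines with defaults. The Python sketch below is a hedged illustration, not mstore code: `DOCUMENTED_DEFAULTS` and `with_documented_defaults` are hypothetical names, and only the two defaults stated in this document (drain-interval and termination-timeout) are filled in; the remaining defaults are not documented here and are therefore left out.

```python
# Only the defaults that this document states explicitly.
DOCUMENTED_DEFAULTS = {
    "drain-interval": 1000,
    "termination-timeout": 5000,
}


def with_documented_defaults(custom_settings: dict) -> dict:
    """Return the settings with documented defaults applied to missing keys."""
    merged = dict(DOCUMENTED_DEFAULTS)
    merged.update(custom_settings)  # explicit values take precedence
    return merged


print(with_documented_defaults({"maxTaskCount": 256, "drain-interval": 100}))
```

Explicitly configured values always override the defaults, so the call above keeps `drain-interval` at 100 and adds `termination-timeout` with its default of 5000.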
This is a list of supported features provided by libraries. Please see more details about these features via link.
- Updated th2 gradle plugin `0.1.1`
- Updated common: `5.14.0-dev`
- Updated cradle api: `5.4.1-dev`
- Migrated to th2 gradle plugin `0.0.8`
- Updated common: `5.12.0-dev`
- Updated common-utils: `2.2.3-dev`
- Migrated to th2 gradle plugin `0.0.6`
- Updated bom: `4.6.1`
- Updated common: `5.11.0-dev`
- Updated cradle api: `5.3.0-dev`
- Updated bom: `4.6.0`
- Updated common: `5.9.1-dev`
- Updated cradle api: `5.2.0-dev`
- Updated common: `5.8.0-dev`
- Add information about group and book into error message during batch processing
- Mstore publishes event with aggregated statistics about internal errors into event router periodically
- Updated common: `5.6.0-dev`
- Added common-utils: `2.2.2-dev`
- Migrated to the cradle version that fixes the problem of loading pages where the `removed` field is null.
- Updated cradle: `5.1.4-dev`
- Reverted cradle: `5.1.1-dev`, because `5.1.3-dev` can't work with pages where the `removed` field is null.
- Fixed the lost messages problem when mstore is restarted under a load
- Updated common: `5.4.1-dev`
- Updated cradle: `5.1.3-dev`
- Fixed the problem: batches for the same session group and different books are stored separately
- Updated bom: `4.5.0-dev`
- Updated common: `5.4.0-dev`
- Provided ability to process th2 transport messages
- Stores message properties to cradle
- Migration to books/pages cradle 5.0.0
- Added util methods from store-common
- Removed dependency to store-common
- Update common version from `3.18.0` to `3.29.0`
- Update store-common version from `3.1.0` to `3.2.0`
- Disable waiting for connection recovery when closing the `SubscribeMonitor`
- Update Cradle version from `2.9.1` to `2.13.0`
- Rework logging for incoming and outgoing messages
- Resets embedded log4j configuration before configuring it from a file
- Compressed metadata for events
- Async API for storing messages