Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[th2-2552] backpressure: added check for queue size limit #184

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
65 changes: 45 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# th2 common library (Java) (3.33.0)
# th2 common library (Java) (3.34.0)

## Usage

Expand Down Expand Up @@ -35,8 +35,9 @@ Then you will create an instance of imported class, by choosing one of the follo
1. rabbitMq.json - configuration for RabbitMQ
2. mq.json - configuration for MessageRouter
3. grpc.json - configuration for GrpcRouter
4. cradle.json - configuration for cradle
5. custom.json - custom configuration
4. cradle.json - confidential configuration for cradle
5. cradle_manager.json - non confidential configuration for cradle
6. custom.json - custom configuration

The second group:
* --namespace - the namespace in Kubernetes to search config maps
Expand All @@ -62,7 +63,7 @@ Then you will create an instance of imported class, by choosing one of the follo

### Configuration formats

The `CommonFactory` reads a RabbitMQ configuration from the rabbitMQ.json file.
The `CommonFactory` reads a RabbitMQ configuration from the `rabbitMQ.json` file.
* host - the required setting defines the RabbitMQ host.
* vHost - the required setting defines the virtual host that will be used for connecting to RabbitMQ.
Please see more details about the virtual host in RabbitMQ via [link](https://www.rabbitmq.com/vhosts.html)
Expand All @@ -72,14 +73,6 @@ The `CommonFactory` reads a RabbitMQ configuration from the rabbitMQ.json file.
* password - the required setting defines the password that will be used for connecting to RabbitMQ.
* exchangeName - the required setting defines the exchange that will be used for sending/subscribing operation in MQ routers.
Please see more details about the exchanges in RabbitMQ via [link](https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchanges)
* connectionTimeout - the connection TCP establishment timeout in milliseconds with its default value set to 60000. Use zero for infinite waiting.
* connectionCloseTimeout - the timeout in milliseconds for completing all the close-related operations, use -1 for infinity, the default value is set to 10000.
* maxRecoveryAttempts - this option defines the number of reconnection attempts to RabbitMQ, with its default value set to 5.
The `th2_readiness` probe is set to false and publishers are blocked after a lost connection to RabbitMQ. The `th2_readiness` probe is reverted to true if the connection will be recovered during specified attempts otherwise the `th2_liveness` probe will be set to false.
* minConnectionRecoveryTimeout - this option defines a minimal interval in milliseconds between reconnect attempts, with its default value set to 10000. Common factory increases the reconnect interval values from minConnectionRecoveryTimeout to maxConnectionRecoveryTimeout.
* maxConnectionRecoveryTimeout - this option defines a maximum interval in milliseconds between reconnect attempts, with its default value set to 60000. Common factory increases the reconnect interval values from minConnectionRecoveryTimeout to maxConnectionRecoveryTimeout.
* prefetchCount - this option is the maximum number of messages that the server will deliver, with its value set to 0 if unlimited, the default value is set to 10.
* messageRecursionLimit - an integer number denotes how deep nested protobuf message might be, default = 100

```json
{
Expand All @@ -88,14 +81,34 @@ The `CommonFactory` reads a RabbitMQ configuration from the rabbitMQ.json file.
"port": 5672,
"username": "<user name>",
"password": "<password>",
"exchangeName": "<exchange name>",
"exchangeName": "<exchange name>"
}
```

The `CommonFactory` reads a RabbitMQ connection configuration from the `mq_router.json` file.
* subscriberName - the client-generated consumer tag to establish context, default = (`rabbit_mq_subscriber.` + current time in milliseconds)
* connectionTimeout - the connection TCP establishment timeout in milliseconds with its default value set to 60000. Use zero for infinite waiting.
* connectionCloseTimeout - the timeout in milliseconds for completing all the close-related operations, use -1 for infinity, the default value is set to 10000.
* maxRecoveryAttempts - this option defines the number of reconnection attempts to RabbitMQ, with its default value set to 5.
The `th2_readiness` probe is set to false and publishers are blocked after a lost connection to RabbitMQ. The `th2_readiness` probe is reverted to true if the connection will be recovered during specified attempts otherwise the `th2_liveness` probe will be set to false.
* minConnectionRecoveryTimeout - this option defines a minimal interval in milliseconds between reconnect attempts, with its default value set to 10000. Common factory increases the reconnect interval values from minConnectionRecoveryTimeout to maxConnectionRecoveryTimeout.
* maxConnectionRecoveryTimeout - this option defines a maximum interval in milliseconds between reconnect attempts, with its default value set to 60000. Common factory increases the reconnect interval values from minConnectionRecoveryTimeout to maxConnectionRecoveryTimeout.
* prefetchCount - this option is the maximum number of messages that the server will deliver, with its value set to 0 if unlimited, the default value is set to 10.
* messageRecursionLimit - an integer number denotes how deep nested protobuf message might be, default = 100
* virtualPublishLimit - MQ router requests actual queue set bound to routing key related to publish pin and then get maximum queue size to this value. The router blocks sending via pin where the limit is exceeded and execute regular check of the current state of the bound queues. The block will be lifted when the max size of queues go down less than the virtual publish limit.
* secondsToCheckVirtualPublishLimit - this option defines an interval in seconds between size check attempts, default = 10
```json
{
"subscriberName": "<subscriber name>",
"connectionTimeout": 60000,
"connectionCloseTimeout": 10000,
"maxRecoveryAttempts": 5,
"minConnectionRecoveryTimeout": 10000,
andrew-drobynin marked this conversation as resolved.
Show resolved Hide resolved
"maxConnectionRecoveryTimeout": 60000,
"prefetchCount": 10,
"messageRecursionLimit": 100
"messageRecursionLimit": 100,
"virtualPublishLimit": 10000,
"secondsToCheckVirtualPublishLimit": 10
}
```

Expand Down Expand Up @@ -160,18 +173,14 @@ Filters format:
}
```

The `CommonFactory` reads a Cradle configuration from the cradle.json file.
The `CommonFactory` reads a Cradle configuration from the `cradle.json` file.
* dataCenter - the required setting defines the data center in the Cassandra cluster.
* host - the required setting defines the Cassandra host.
* port - the required setting defines the Cassandra port.
* keyspace - the required setting defines the keyspace (top-level database object) in the Cassandra data center.
* username - the required setting defines the Cassandra username. The user must have permission to write data using a specified keyspace.
* password - the required setting defines the password that will be used for connecting to Cassandra.
* cradleInstanceName - this option defines a special identifier that divides data within one keyspace with infra set as the default value.
* cradleMaxEventBatchSize - this option defines the maximum event batch size in bytes with its default value set to 1048576.
* cradleMaxMessageBatchSize - this option defines the maximum message batch size in bytes with its default value set to 1048576.
* timeout - this option defines connection timeout in milliseconds. If set to 0 or ommited, the default value of 5000 is used.
* pageSize - this option defines the size of the result set to fetch at a time. If set to 0 or ommited, the default value of 5000 is used.

```json
{
Expand All @@ -181,7 +190,18 @@ The `CommonFactory` reads a Cradle configuration from the cradle.json file.
"keyspace": "<keyspace>",
"username": "<username>",
"password": "<password>",
"cradleInstanceName": "<cradle instance name>",
"cradleInstanceName": "<cradle instance name>"
}
```

The `CommonFactory` reads a Cradle configuration from the `cradle_manager.json` file.
* cradleMaxEventBatchSize - this option defines the maximum event batch size in bytes with its default value set to 1048576.
* cradleMaxMessageBatchSize - this option defines the maximum message batch size in bytes with its default value set to 1048576.
* timeout - this option defines connection timeout in milliseconds. If set to 0 or ommited, the default value of 5000 is used.
* pageSize - this option defines the size of the result set to fetch at a time. If set to 0 or ommited, the default value of 5000 is used.

```json
{
"cradleMaxEventBatchSize": 1048576,
"cradleMaxMessageBatchSize": 1048576,
"timeout": 5000,
Expand Down Expand Up @@ -288,6 +308,11 @@ dependencies {

## Release notes

### 3.34.0

+ Added backpressure support: lock sending if queue virtual size limit is exceeded
+ Added parameters `virtualPublishLimit` and `secondsToCheckVirtualPublishLimit` to `mq_router.json`

### 3.33.0

+ Added ability to read dictionaries by aliases and as group of all available aliases
Expand Down
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ dependencies {
implementation "io.grpc:grpc-netty"

implementation "com.rabbitmq:amqp-client"
implementation 'com.rabbitmq:http-client:4.0.0'

implementation "org.jetbrains:annotations"

Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
# Copyright 2020-2022 Exactpro (Exactpro Systems Limited)
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand All @@ -13,7 +13,7 @@
# limitations under the License.
#

release_version=3.33.0
release_version=3.34.0

description = 'th2 common library (Java)'

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2022 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,6 +55,7 @@ public abstract class AbstractRabbitSender<T> implements MessageSender<T> {
private final AtomicReference<String> exchangeName = new AtomicReference<>();
private final AtomicReference<ConnectionManager> connectionManager = new AtomicReference<>();
private final String th2Type;
private long sentBeforeQueueSizeCheck;

public AbstractRabbitSender(
@NotNull ConnectionManager connectionManager,
Expand All @@ -81,15 +82,20 @@ public void send(T value) throws IOException {
requireNonNull(value, "Value for send can not be null");

try {
ConnectionManager connection = this.connectionManager.get();
ConnectionManager connectionManager = this.connectionManager.get();
byte[] bytes = valueToBytes(value);
MESSAGE_SIZE_PUBLISH_BYTES
.labels(th2Pin, th2Type, exchangeName.get(), routingKey.get())
.inc(bytes.length);
MESSAGE_PUBLISH_TOTAL
.labels(th2Pin, th2Type, exchangeName.get(), routingKey.get())
.inc();
connection.basicPublish(exchangeName.get(), routingKey.get(), null, bytes);
sentBeforeQueueSizeCheck++;
if (sentBeforeQueueSizeCheck > connectionManager.getConnectionManagerConfiguration().getVirtualPublishLimit()) {
connectionManager.lockSendingIfSizeLimitExceeded();
sentBeforeQueueSizeCheck = 0;
}
connectionManager.basicPublish(exchangeName.get(), routingKey.get(), null, bytes);

if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Message sent to exchangeName='{}', routing key='{}': '{}'",
Expand Down
Loading