Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[th2-2552] backpressure: added check for queue size limit #184

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
18 changes: 15 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# th2 common library (Java) (3.33.0)
# th2 common library (Java) (3.34.0)

## Usage

Expand Down Expand Up @@ -80,6 +80,8 @@ The `CommonFactory` reads a RabbitMQ configuration from the rabbitMQ.json file.
* maxConnectionRecoveryTimeout - this option defines a maximum interval in milliseconds between reconnect attempts, with its default value set to 60000. Common factory increases the reconnect interval values from minConnectionRecoveryTimeout to maxConnectionRecoveryTimeout.
* prefetchCount - this option is the maximum number of messages that the server will deliver, with its value set to 0 if unlimited, the default value is set to 10.
* messageRecursionLimit - an integer number denotes how deep nested protobuf message might be, default = 100
* secondsToCheckVirtualQueueLimit - this option defines an interval in seconds between size check attempts, default = 10
* batchesToCheckVirtualQueueLimit - this option defines the number of batches between size check attempts, default = 10000

```json
{
Expand All @@ -95,7 +97,9 @@ The `CommonFactory` reads a RabbitMQ configuration from the rabbitMQ.json file.
"minConnectionRecoveryTimeout": 10000,
andrew-drobynin marked this conversation as resolved.
Show resolved Hide resolved
"maxConnectionRecoveryTimeout": 60000,
"prefetchCount": 10,
"messageRecursionLimit": 100
"messageRecursionLimit": 100,
"secondsToCheckVirtualQueueLimit": 10,
"batchesToCheckVirtualQueueLimit": 10000
andrew-drobynin marked this conversation as resolved.
Show resolved Hide resolved
}
```

Expand All @@ -117,6 +121,7 @@ The `CommonFactory` reads a message's router configuration from the `mq.json` fi
* filters - pin's message's filters
* metadata - a metadata filters
* message - a message's fields filters
* virtualQueueLimit - MQ router calculates destination queues and compares their current size to this value. The router blocks the current thread to repeat the comparison if the size of any destination queues exceeds the virtual limit

Filters format:
* fieldName - a field's name
Expand Down Expand Up @@ -154,7 +159,8 @@ Filters format:
"operation": "WILDCARD"
}
]
}
},
"virtualQueueLimit": 10000
andrew-drobynin marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down Expand Up @@ -288,6 +294,12 @@ dependencies {

## Release notes

### 3.34.0

+ Added backpressure support: lock sending if queue virtual size limit is exceeded
+ Added parameter `virtualQueueLimit` to `mq.json`
+ Added parameters `secondsToCheckVirtualQueueLimit` and `batchesToCheckVirtualQueueLimit` to `mq_router.json`

### 3.33.0

+ Added ability to read dictionaries by aliases and as group of all available aliases
Expand Down
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ dependencies {
implementation "io.grpc:grpc-netty"

implementation "com.rabbitmq:amqp-client"
implementation 'com.rabbitmq:http-client:4.0.0'

implementation "org.jetbrains:annotations"

Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
# Copyright 2020-2022 Exactpro (Exactpro Systems Limited)
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand All @@ -13,7 +13,7 @@
# limitations under the License.
#

release_version=3.33.0
release_version=3.34.0

description = 'th2 common library (Java)'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,12 @@ protected PrometheusConfiguration loadPrometheusConfiguration() {
}

protected ConnectionManager createRabbitMQConnectionManager() {
return new ConnectionManager(getRabbitMqConfiguration(), getConnectionManagerConfiguration(), livenessMonitor::disable);
return new ConnectionManager(
getRabbitMqConfiguration(),
getConnectionManagerConfiguration(),
getMessageRouterConfiguration(),
livenessMonitor::disable
);
}

protected ConnectionManager getRabbitMqConnectionManager() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2022 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,6 +55,7 @@ public abstract class AbstractRabbitSender<T> implements MessageSender<T> {
private final AtomicReference<String> exchangeName = new AtomicReference<>();
private final AtomicReference<ConnectionManager> connectionManager = new AtomicReference<>();
private final String th2Type;
private long sentBeforeQueueSizeCheck;

public AbstractRabbitSender(
@NotNull ConnectionManager connectionManager,
Expand All @@ -81,15 +82,20 @@ public void send(T value) throws IOException {
requireNonNull(value, "Value for send can not be null");

try {
ConnectionManager connection = this.connectionManager.get();
ConnectionManager connectionManager = this.connectionManager.get();
byte[] bytes = valueToBytes(value);
MESSAGE_SIZE_PUBLISH_BYTES
.labels(th2Pin, th2Type, exchangeName.get(), routingKey.get())
.inc(bytes.length);
MESSAGE_PUBLISH_TOTAL
.labels(th2Pin, th2Type, exchangeName.get(), routingKey.get())
.inc();
connection.basicPublish(exchangeName.get(), routingKey.get(), null, bytes);
sentBeforeQueueSizeCheck++;
if (sentBeforeQueueSizeCheck > connectionManager.getConnectionManagerConfiguration().getBatchesToCheckVirtualQueueLimit()) {
connectionManager.lockSendingIfSizeLimitExceeded(routingKey.get());
sentBeforeQueueSizeCheck = 0;
}
connectionManager.basicPublish(exchangeName.get(), routingKey.get(), null, bytes);

if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Message sent to exchangeName='{}', routing key='{}': '{}'",
Expand Down
Loading