Skip to content

Commit

Permalink
Add ability to limit outgoing message rate
Browse files Browse the repository at this point in the history
  • Loading branch information
cordwelt committed Jul 12, 2022
1 parent 2916f38 commit 410d9ae
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 50 deletions.
14 changes: 10 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Connect (3.10.2)
# Connect (3.11.0)

The "Connect" component is responsible for the communication with a target system.
This component implements the logic of the interaction protocol, receiving and sending messages from and to the system, respectively.
Expand Down Expand Up @@ -28,6 +28,7 @@ Parameters:
+ name - the service name that will be displayed in the events inside the report;
+ settings - the parameters that will be transformed to the actual service's settings specified in the **services.xml** file.
+ maxMessageBatchSize - the limitation for message batch size which connect sends to the first and to the second publish pins with. The default value is set to 100.
+ maxMessageRate - max outgoing message rate in messages per second
+ enableMessageSendingEvent - if this option is set to `true`, connect sends a separate event for every message sent which incomes from the pin with the send attribute. The default value is set to true

## Metrics
Expand Down Expand Up @@ -102,23 +103,28 @@ spec:
type: "th2_service:Your_Service_Type"
name: "your_service"
maxMessageBatchSize: 100
maxMessageRate: 100000
enableMessageSendingEvent: true
settings:
param1: "value1"
pins:
- name: in_raw
connection-type: mq
attributes: ["first", "raw", "publish", "store"]
attributes: [ "first", "raw", "publish", "store" ]
- name: out_raw
connection-type: mq
attributes: ["second", "raw", "publish", "store"]
attributes: [ "second", "raw", "publish", "store" ]
- name: to_send
connection-type: mq
attributes: ["send", "raw", "subscribe"]
attributes: [ "send", "raw", "subscribe" ]
```

## Release notes

### 3.11.0

+ Outgoing message rate now can be limited via `maxMessageRate` setting

### 3.10.2

+ Events are made more convenient. Added event names and error logs. Error message moved from the name to the body of the event.
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
release_version = 3.10.2
release_version=3.11.0
29 changes: 19 additions & 10 deletions src/main/java/com/exactpro/th2/conn/MessageSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,6 @@
*/
package com.exactpro.th2.conn;

import static java.util.Objects.requireNonNull;

import java.io.IOException;
import java.util.Base64;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.exactpro.sf.common.messages.IMetadata;
import com.exactpro.sf.common.messages.MetadataExtensions;
import com.exactpro.sf.common.messages.impl.Metadata;
Expand All @@ -40,8 +31,15 @@
import com.exactpro.th2.conn.events.EventHolder;
import com.exactpro.th2.conn.utility.EventStoreExtensions;
import com.exactpro.th2.conn.utility.SailfishMetadataExtensions;

import io.reactivex.rxjava3.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Base64;
import java.util.Map;

import static java.util.Objects.requireNonNull;

public class MessageSender {
private static final String SEND_ATTRIBUTE = "send";
Expand All @@ -50,16 +48,26 @@ public class MessageSender {
private final MessageRouter<RawMessageBatch> router;
private final EventDispatcher eventDispatcher;
private final EventID untrackedMessagesRoot;
private final RateLimiter rateLimiter;
private volatile SubscriberMonitor subscriberMonitor;

public MessageSender(IServiceProxy serviceProxy,
MessageRouter<RawMessageBatch> router,
EventDispatcher eventDispatcher,
EventID untrackedMessagesRoot) {
this(serviceProxy, router, eventDispatcher, untrackedMessagesRoot, Integer.MAX_VALUE);
}

public MessageSender(IServiceProxy serviceProxy,
MessageRouter<RawMessageBatch> router,
EventDispatcher eventDispatcher,
EventID untrackedMessagesRoot,
int maxMessageRate) {
this.serviceProxy = requireNonNull(serviceProxy, "Service proxy can't be null");
this.router = requireNonNull(router, "Message router can't be null");
this.eventDispatcher = requireNonNull(eventDispatcher, "'Event dispatcher' can't be null");
this.untrackedMessagesRoot = requireNonNull(untrackedMessagesRoot, "'untrackedMessagesRoot' can't be null");
this.rateLimiter = new RateLimiter(maxMessageRate);
}

public void start() {
Expand All @@ -86,6 +94,7 @@ public void stop() throws IOException {
private void handle(String consumerTag, RawMessageBatch messageBatch) {
for (RawMessage protoMessage : messageBatch.getMessagesList()) {
try {
rateLimiter.acquire();
sendMessage(protoMessage);
} catch (InterruptedException e) {
logger.error("Send message operation interrupted. Consumer tag {}", consumerTag, e);
Expand Down
65 changes: 33 additions & 32 deletions src/main/java/com/exactpro/th2/conn/MicroserviceMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,6 @@
*/
package com.exactpro.th2.conn;

import static com.exactpro.th2.conn.utility.EventStoreExtensions.storeEvent;
import static com.exactpro.th2.conn.utility.MetadataProperty.PARENT_EVENT_ID;
import static com.exactpro.th2.conn.utility.SailfishMetadataExtensions.contains;
import static com.exactpro.th2.conn.utility.SailfishMetadataExtensions.getParentEventID;
import static io.reactivex.rxjava3.plugins.RxJavaPlugins.createSingleScheduler;
import static java.util.Objects.requireNonNull;
import static org.apache.commons.lang.StringUtils.containsIgnoreCase;
import static org.apache.commons.lang.StringUtils.repeat;
import static org.apache.commons.lang3.ClassUtils.primitiveToWrapper;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.time.Instant;
import java.util.Deque;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.exactpro.sf.common.messages.IMessage;
import com.exactpro.sf.common.services.ServiceName;
import com.exactpro.sf.comparison.conversion.ConversionException;
Expand Down Expand Up @@ -71,7 +42,6 @@
import com.exactpro.th2.conn.events.EventDispatcher;
import com.exactpro.th2.conn.events.EventType;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Scheduler;
Expand All @@ -80,6 +50,34 @@
import io.reactivex.rxjava3.processors.FlowableProcessor;
import io.reactivex.rxjava3.processors.UnicastProcessor;
import io.reactivex.rxjava3.subscribers.DisposableSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.time.Instant;
import java.util.Deque;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.exactpro.th2.conn.utility.EventStoreExtensions.storeEvent;
import static com.exactpro.th2.conn.utility.MetadataProperty.PARENT_EVENT_ID;
import static com.exactpro.th2.conn.utility.SailfishMetadataExtensions.contains;
import static com.exactpro.th2.conn.utility.SailfishMetadataExtensions.getParentEventID;
import static io.reactivex.rxjava3.plugins.RxJavaPlugins.createSingleScheduler;
import static java.util.Objects.requireNonNull;
import static org.apache.commons.lang.StringUtils.containsIgnoreCase;
import static org.apache.commons.lang.StringUtils.repeat;
import static org.apache.commons.lang3.ClassUtils.primitiveToWrapper;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;

public class MicroserviceMain {
private static final Logger LOGGER = LoggerFactory.getLogger(MicroserviceMain.class);
Expand Down Expand Up @@ -174,8 +172,11 @@ public static void main(String[] args) {

MessageRouter<RawMessageBatch> rawMessageRouter = factory.getMessageRouterRawBatch();

MessageSender messageSender = new MessageSender(serviceProxy, rawMessageRouter, eventDispatcher,
EventID.newBuilder().setId(untrackedSentMessages.getId()).build()
MessageSender messageSender = new MessageSender(serviceProxy,
rawMessageRouter,
eventDispatcher,
EventID.newBuilder().setId(untrackedSentMessages.getId()).build(),
configuration.getMaxMessageRate()
);
disposer.register(() -> {
LOGGER.info("Stop 'message send' listener");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,21 @@

package com.exactpro.th2.conn.configuration;

import java.util.Map;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Map;

public class ConnectivityConfiguration {
private boolean enableMessageSendingEvent = true;

private int maxMessageBatchSize = 100;

private int maxMessageRate = Integer.MAX_VALUE;

@JsonProperty(value = "session-alias", required = true)
private String sessionAlias;

@JsonProperty(value = "workspace",required = true)
@JsonProperty(value = "workspace", required = true)
private String workspaceFolder;

@JsonProperty(required = true)
Expand All @@ -45,6 +47,10 @@ public int getMaxMessageBatchSize() {
return maxMessageBatchSize;
}

public int getMaxMessageRate() {
return maxMessageRate;
}

public String getSessionAlias() {
return sessionAlias;
}
Expand Down
64 changes: 64 additions & 0 deletions src/main/kotlin/com/exactpro/th2/conn/RateLimiter.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2022-2022 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.exactpro.th2.conn

import java.lang.Double.min
import java.util.concurrent.TimeUnit.SECONDS
import java.util.concurrent.locks.LockSupport
import javax.annotation.concurrent.ThreadSafe

@ThreadSafe
class RateLimiter(rate: Int) {
init {
require(rate > 0) { "rate must be positive" }
}

private val maxPermits = rate.toDouble()
private val permitDuration = SECONDS.toNanos(1) / maxPermits
private var freePermits = 0.0
private var syncTime = 0L

fun acquire() = acquire(1)

fun acquire(permits: Int) {
var currentTime = System.nanoTime()
val waitUntilTime = getWaitUntilTime(permits, currentTime)

while (waitUntilTime > currentTime) {
LockSupport.parkNanos(waitUntilTime - currentTime)
currentTime = System.nanoTime()
}
}

private fun getWaitUntilTime(permits: Int, currentTime: Long): Long = synchronized(this) {
if (currentTime > syncTime) {
val newPermits = (currentTime - syncTime) / permitDuration
freePermits = min(maxPermits, freePermits + newPermits)
syncTime = currentTime
}

val waitUntilTime = syncTime
val stalePermits = min(permits.toDouble(), freePermits)
val freshPermits = permits - stalePermits
val syncTimeOffset = (freshPermits * permitDuration).toLong()

syncTime += syncTimeOffset
freePermits -= stalePermits

return waitUntilTime
}
}

0 comments on commit 410d9ae

Please sign in to comment.