diff --git a/README.md b/README.md index 2dd2fa7..3ac6876 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Connect (3.10.2) +# Connect (3.11.0) The "Connect" component is responsible for the communication with a target system. This component implements the logic of the interaction protocol, receiving and sending messages from and to the system, respectively. @@ -28,6 +28,7 @@ Parameters: + name - the service name that will be displayed in the events inside the report; + settings - the parameters that will be transformed to the actual service's settings specified in the **services.xml** file. + maxMessageBatchSize - the limitation for message batch size which connect sends to the first and to the second publish pins with. The default value is set to 100. ++ maxMessageRate - max outgoing message rate in messages per second + enableMessageSendingEvent - if this option is set to `true`, connect sends a separate event for every message sent which incomes from the pin with the send attribute. The default value is set to true ## Metrics @@ -102,23 +103,28 @@ spec: type: "th2_service:Your_Service_Type" name: "your_service" maxMessageBatchSize: 100 + maxMessageRate: 100000 enableMessageSendingEvent: true settings: param1: "value1" pins: - name: in_raw connection-type: mq - attributes: ["first", "raw", "publish", "store"] + attributes: [ "first", "raw", "publish", "store" ] - name: out_raw connection-type: mq - attributes: ["second", "raw", "publish", "store"] + attributes: [ "second", "raw", "publish", "store" ] - name: to_send connection-type: mq - attributes: ["send", "raw", "subscribe"] + attributes: [ "send", "raw", "subscribe" ] ``` ## Release notes +### 3.11.0 + ++ Outgoing message rate now can be limited via `maxMessageRate` setting + ### 3.10.2 + Events are made more convenient. Added event names and error logs. Error message moved from the name to the body of the event. diff --git a/gradle.properties b/gradle.properties index 36f4a18..332e86d 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -release_version = 3.10.2 \ No newline at end of file +release_version=3.11.0 \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/conn/MessageSender.java b/src/main/java/com/exactpro/th2/conn/MessageSender.java index 044bf70..8905090 100644 --- a/src/main/java/com/exactpro/th2/conn/MessageSender.java +++ b/src/main/java/com/exactpro/th2/conn/MessageSender.java @@ -15,15 +15,6 @@ */ package com.exactpro.th2.conn; -import static java.util.Objects.requireNonNull; - -import java.io.IOException; -import java.util.Base64; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.exactpro.sf.common.messages.IMetadata; import com.exactpro.sf.common.messages.MetadataExtensions; import com.exactpro.sf.common.messages.impl.Metadata; @@ -40,8 +31,15 @@ import com.exactpro.th2.conn.events.EventHolder; import com.exactpro.th2.conn.utility.EventStoreExtensions; import com.exactpro.th2.conn.utility.SailfishMetadataExtensions; - import io.reactivex.rxjava3.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Base64; +import java.util.Map; + +import static java.util.Objects.requireNonNull; public class MessageSender { private static final String SEND_ATTRIBUTE = "send"; @@ -50,16 +48,26 @@ public class MessageSender { private final MessageRouter router; private final EventDispatcher eventDispatcher; private final EventID untrackedMessagesRoot; + private final RateLimiter rateLimiter; private volatile SubscriberMonitor subscriberMonitor; public MessageSender(IServiceProxy serviceProxy, MessageRouter router, EventDispatcher eventDispatcher, EventID untrackedMessagesRoot) { + this(serviceProxy, router, eventDispatcher, untrackedMessagesRoot, Integer.MAX_VALUE); + } + + public MessageSender(IServiceProxy serviceProxy, + MessageRouter router, + EventDispatcher eventDispatcher, + EventID untrackedMessagesRoot, + int maxMessageRate) { this.serviceProxy = requireNonNull(serviceProxy, "Service proxy can't be null"); this.router = requireNonNull(router, "Message router can't be null"); this.eventDispatcher = requireNonNull(eventDispatcher, "'Event dispatcher' can't be null"); this.untrackedMessagesRoot = requireNonNull(untrackedMessagesRoot, "'untrackedMessagesRoot' can't be null"); + this.rateLimiter = new RateLimiter(maxMessageRate); } public void start() { @@ -86,6 +94,7 @@ public void stop() throws IOException { private void handle(String consumerTag, RawMessageBatch messageBatch) { for (RawMessage protoMessage : messageBatch.getMessagesList()) { try { + rateLimiter.acquire(); sendMessage(protoMessage); } catch (InterruptedException e) { logger.error("Send message operation interrupted. Consumer tag {}", consumerTag, e); diff --git a/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java b/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java index 141c3f4..146eff5 100644 --- a/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java +++ b/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java @@ -15,35 +15,6 @@ */ package com.exactpro.th2.conn; -import static com.exactpro.th2.conn.utility.EventStoreExtensions.storeEvent; -import static com.exactpro.th2.conn.utility.MetadataProperty.PARENT_EVENT_ID; -import static com.exactpro.th2.conn.utility.SailfishMetadataExtensions.contains; -import static com.exactpro.th2.conn.utility.SailfishMetadataExtensions.getParentEventID; -import static io.reactivex.rxjava3.plugins.RxJavaPlugins.createSingleScheduler; -import static java.util.Objects.requireNonNull; -import static org.apache.commons.lang.StringUtils.containsIgnoreCase; -import static org.apache.commons.lang.StringUtils.repeat; -import static org.apache.commons.lang3.ClassUtils.primitiveToWrapper; -import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Files; -import java.time.Instant; -import java.util.Deque; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.exactpro.sf.common.messages.IMessage; import com.exactpro.sf.common.services.ServiceName; import com.exactpro.sf.comparison.conversion.ConversionException; @@ -71,7 +42,6 @@ import com.exactpro.th2.conn.events.EventDispatcher; import com.exactpro.th2.conn.events.EventType; import com.google.common.util.concurrent.ThreadFactoryBuilder; - import io.reactivex.rxjava3.annotations.NonNull; import io.reactivex.rxjava3.core.Flowable; import io.reactivex.rxjava3.core.Scheduler; @@ -80,6 +50,34 @@ import io.reactivex.rxjava3.processors.FlowableProcessor; import io.reactivex.rxjava3.processors.UnicastProcessor; import io.reactivex.rxjava3.subscribers.DisposableSubscriber; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.time.Instant; +import java.util.Deque; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static com.exactpro.th2.conn.utility.EventStoreExtensions.storeEvent; +import static com.exactpro.th2.conn.utility.MetadataProperty.PARENT_EVENT_ID; +import static com.exactpro.th2.conn.utility.SailfishMetadataExtensions.contains; +import static com.exactpro.th2.conn.utility.SailfishMetadataExtensions.getParentEventID; +import static io.reactivex.rxjava3.plugins.RxJavaPlugins.createSingleScheduler; +import static java.util.Objects.requireNonNull; +import static org.apache.commons.lang.StringUtils.containsIgnoreCase; +import static org.apache.commons.lang.StringUtils.repeat; +import static org.apache.commons.lang3.ClassUtils.primitiveToWrapper; +import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; public class MicroserviceMain { private static final Logger LOGGER = LoggerFactory.getLogger(MicroserviceMain.class); @@ -174,8 +172,11 @@ public static void main(String[] args) { MessageRouter rawMessageRouter = factory.getMessageRouterRawBatch(); - MessageSender messageSender = new MessageSender(serviceProxy, rawMessageRouter, eventDispatcher, - EventID.newBuilder().setId(untrackedSentMessages.getId()).build() + MessageSender messageSender = new MessageSender(serviceProxy, + rawMessageRouter, + eventDispatcher, + EventID.newBuilder().setId(untrackedSentMessages.getId()).build(), + configuration.getMaxMessageRate() ); disposer.register(() -> { LOGGER.info("Stop 'message send' listener"); diff --git a/src/main/java/com/exactpro/th2/conn/configuration/ConnectivityConfiguration.java b/src/main/java/com/exactpro/th2/conn/configuration/ConnectivityConfiguration.java index 691017b..558ac94 100644 --- a/src/main/java/com/exactpro/th2/conn/configuration/ConnectivityConfiguration.java +++ b/src/main/java/com/exactpro/th2/conn/configuration/ConnectivityConfiguration.java @@ -13,19 +13,21 @@ package com.exactpro.th2.conn.configuration; -import java.util.Map; - import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Map; + public class ConnectivityConfiguration { private boolean enableMessageSendingEvent = true; private int maxMessageBatchSize = 100; + private int maxMessageRate = Integer.MAX_VALUE; + @JsonProperty(value = "session-alias", required = true) private String sessionAlias; - @JsonProperty(value = "workspace",required = true) + @JsonProperty(value = "workspace", required = true) private String workspaceFolder; @JsonProperty(required = true) @@ -45,6 +47,10 @@ public int getMaxMessageBatchSize() { return maxMessageBatchSize; } + public int getMaxMessageRate() { + return maxMessageRate; + } + public String getSessionAlias() { return sessionAlias; } diff --git a/src/main/kotlin/com/exactpro/th2/conn/RateLimiter.kt b/src/main/kotlin/com/exactpro/th2/conn/RateLimiter.kt new file mode 100644 index 0000000..802f954 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/conn/RateLimiter.kt @@ -0,0 +1,64 @@ +/* + * Copyright 2022-2022 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.conn + +import java.lang.Double.min +import java.util.concurrent.TimeUnit.SECONDS +import java.util.concurrent.locks.LockSupport +import javax.annotation.concurrent.ThreadSafe + +@ThreadSafe +class RateLimiter(rate: Int) { + init { + require(rate > 0) { "rate must be positive" } + } + + private val maxPermits = rate.toDouble() + private val permitDuration = SECONDS.toNanos(1) / maxPermits + private var freePermits = 0.0 + private var syncTime = 0L + + fun acquire() = acquire(1) + + fun acquire(permits: Int) { + var currentTime = System.nanoTime() + val waitUntilTime = getWaitUntilTime(permits, currentTime) + + while (waitUntilTime > currentTime) { + LockSupport.parkNanos(waitUntilTime - currentTime) + currentTime = System.nanoTime() + } + } + + private fun getWaitUntilTime(permits: Int, currentTime: Long): Long = synchronized(this) { + if (currentTime > syncTime) { + val newPermits = (currentTime - syncTime) / permitDuration + freePermits = min(maxPermits, freePermits + newPermits) + syncTime = currentTime + } + + val waitUntilTime = syncTime + val stalePermits = min(permits.toDouble(), freePermits) + val freshPermits = permits - stalePermits + val syncTimeOffset = (freshPermits * permitDuration).toLong() + + syncTime += syncTimeOffset + freePermits -= stalePermits + + return waitUntilTime + } +} \ No newline at end of file