From 5403fa1a6aebb5ba38ed47a566aabd419dcefcf0 Mon Sep 17 00:00:00 2001 From: Oleg Smirnov Date: Tue, 24 Oct 2023 10:47:48 +0400 Subject: [PATCH] [TH2-5050] Decrease timeout for sending on each attempt (#67) * Descrease timeout for sending on each attempt * Set min timeout in tests * Add event notification when sending successful after failed attempts * Migrate to atomics for holding current state * Use locks under the hood * Move handler to the core part * Use dev release for tcp-dirty-core * Update version and readme --- README.md | 9 +++++++- build.gradle | 2 +- gradle.properties | 2 +- .../java/com/exactpro/th2/FixHandler.java | 23 ++++++++++++------- .../com/exactpro/th2/FixHandlerSettings.java | 10 ++++++++ .../th2/FixHandlerSendTimeoutTest.java | 2 ++ 6 files changed, 37 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 8d43f35..b023e73 100644 --- a/README.md +++ b/README.md @@ -52,8 +52,10 @@ This microservice allows sending and receiving messages via FIX protocol + *logoutOnIncorrectServerSequence* - whether to logout session when server send message with sequence number less than expected. If `false` then internal conn sequence will be reset to sequence number from server message. + *connectionTimeoutOnSend* - timeout in milliseconds for sending message from queue thread (please read about [acknowledgment timeout](https://www.rabbitmq.com/consumers.html#acknowledgement-timeout) to understand the problem). - _Default, 30000 mls._ + _Default, 30000 mls._ Each failed sending attempt decreases the timeout in half (but not less than _minConnectionTimeoutOnSend_). + The timeout is reset to the original value after a successful sending attempt. If connection is not established within the specified timeout an error will be reported. ++ *minConnectionTimeoutOnSend* - minimum value for the sending message timeout in milliseconds. _Default value is 1000 mls._ ### Security settings @@ -333,6 +335,11 @@ spec: # Changelog +## 1.5.0 + +* `minConnectionTimeoutOnSend` parameter is added. +* Sending timeout now decreases in half on each failed attempt (but not less than `minConnectionTimeoutOnSend`). + ## 1.4.2 * Ungraceful session disconnect support. * Removed NPE when session is reset by schedule. diff --git a/build.gradle b/build.gradle index 2cd4ecb..4d1d377 100644 --- a/build.gradle +++ b/build.gradle @@ -52,7 +52,7 @@ dependencies { } implementation "com.exactpro.th2:common-utils:2.2.0-dev" implementation 'com.exactpro.th2:netty-bytebuf-utils:0.0.1' - implementation 'com.exactpro.th2:conn-dirty-tcp-core:3.2.1-dev' + implementation 'com.exactpro.th2:conn-dirty-tcp-core:3.3.0-dev' implementation 'com.exactpro.th2:grpc-lw-data-provider:2.2.0-dev' implementation 'org.slf4j:slf4j-api' diff --git a/gradle.properties b/gradle.properties index 66a2be3..e60bbc1 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -release_version=1.4.2 \ No newline at end of file +release_version=1.5.0 \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index 40e4edc..21261bb 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -24,6 +24,7 @@ import com.exactpro.th2.common.utils.event.transport.EventUtilsKt; import com.exactpro.th2.conn.dirty.fix.FixField; import com.exactpro.th2.conn.dirty.fix.MessageLoader; +import com.exactpro.th2.conn.dirty.tcp.core.SendingTimeoutHandler; import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel; import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel.SendMode; import com.exactpro.th2.conn.dirty.tcp.core.api.IHandler; @@ -57,6 +58,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; + import kotlin.jvm.functions.Function1; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -165,6 +167,8 @@ public class FixHandler implements AutoCloseable, IHandler { private final AtomicReference> heartbeatTimer = new AtomicReference<>(CompletableFuture.completedFuture(null)); private final AtomicReference> testRequestTimer = new AtomicReference<>(CompletableFuture.completedFuture(null)); + + private final SendingTimeoutHandler sendingTimeoutHandler; private Future reconnectRequestTimer = CompletableFuture.completedFuture(null); private volatile IChannel channel; protected FixHandlerSettings settings; @@ -235,9 +239,11 @@ public FixHandler(IHandlerContext context) { if (settings.getHeartBtInt() <= 0) throw new IllegalArgumentException("HeartBtInt cannot be negative or zero"); if (settings.getTestRequestDelay() <= 0) throw new IllegalArgumentException("TestRequestDelay cannot be negative or zero"); if (settings.getDisconnectRequestDelay() <= 0) throw new IllegalArgumentException("DisconnectRequestDelay cannot be negative or zero"); - if (settings.getConnectionTimeoutOnSend() <= 0) { - throw new IllegalArgumentException("connectionTimeoutOnSend must be greater than zero"); - } + this.sendingTimeoutHandler = SendingTimeoutHandler.create( + settings.getMinConnectionTimeoutOnSend(), + settings.getConnectionTimeoutOnSend(), + context::send + ); } @Override @@ -276,7 +282,7 @@ private CompletableFuture send(@NotNull ByteBuf body, @NotNull Map send(@NotNull ByteBuf body, @NotNull Map send(@NotNull ByteBuf body, @NotNull Map deadline) { // The method should have checked exception in signature... ExceptionUtils.rethrow(new TimeoutException(String.format("session was not established within %d mls", - settings.getConnectionTimeoutOnSend()))); + currentTimeout))); } } diff --git a/src/main/java/com/exactpro/th2/FixHandlerSettings.java b/src/main/java/com/exactpro/th2/FixHandlerSettings.java index 99a21ce..4e7e0a2 100644 --- a/src/main/java/com/exactpro/th2/FixHandlerSettings.java +++ b/src/main/java/com/exactpro/th2/FixHandlerSettings.java @@ -75,6 +75,8 @@ public class FixHandlerSettings implements IHandlerSettings { */ private long connectionTimeoutOnSend = DEFAULT_CONNECTION_TIMEOUT_ON_SEND; + private long minConnectionTimeoutOnSend = 1_000; + @JsonDeserialize(using = DateTimeFormatterDeserializer.class) private DateTimeFormatter sendingDateTimeFormat = DateTimeFormatter.ofPattern("yyyyMMdd-HH:mm:ss.SSSSSSSSS"); @@ -317,4 +319,12 @@ public long getConnectionTimeoutOnSend() { public void setConnectionTimeoutOnSend(long connectionTimeoutOnSend) { this.connectionTimeoutOnSend = connectionTimeoutOnSend; } + + public long getMinConnectionTimeoutOnSend() { + return minConnectionTimeoutOnSend; + } + + public void setMinConnectionTimeoutOnSend(long minConnectionTimeoutOnSend) { + this.minConnectionTimeoutOnSend = minConnectionTimeoutOnSend; + } } diff --git a/src/test/java/com/exactpro/th2/FixHandlerSendTimeoutTest.java b/src/test/java/com/exactpro/th2/FixHandlerSendTimeoutTest.java index 4353258..5e81642 100644 --- a/src/test/java/com/exactpro/th2/FixHandlerSendTimeoutTest.java +++ b/src/test/java/com/exactpro/th2/FixHandlerSendTimeoutTest.java @@ -59,6 +59,7 @@ void sendTimeoutOnConnectionOpen() { settings.setPort(42); settings.setHost("localhost"); settings.setConnectionTimeoutOnSend(300); // 300 millis + settings.setMinConnectionTimeoutOnSend(100); Mockito.when(contextMock.getSettings()) .thenReturn(settings); var fixHandler = new FixHandler(contextMock); @@ -100,6 +101,7 @@ void sendTimeoutOnSessionEnabled() { settings.setPort(42); settings.setHost("localhost"); settings.setConnectionTimeoutOnSend(300); // 300 millis + settings.setMinConnectionTimeoutOnSend(100); LocalTime currentTime = LocalTime.now(ZoneOffset.UTC); int deltaMinutes = currentTime.isAfter(LocalTime.NOON) ? -1