From 5403fa1a6aebb5ba38ed47a566aabd419dcefcf0 Mon Sep 17 00:00:00 2001 From: Oleg Smirnov Date: Tue, 24 Oct 2023 10:47:48 +0400 Subject: [PATCH 1/3] [TH2-5050] Decrease timeout for sending on each attempt (#67) * Descrease timeout for sending on each attempt * Set min timeout in tests * Add event notification when sending successful after failed attempts * Migrate to atomics for holding current state * Use locks under the hood * Move handler to the core part * Use dev release for tcp-dirty-core * Update version and readme --- README.md | 9 +++++++- build.gradle | 2 +- gradle.properties | 2 +- .../java/com/exactpro/th2/FixHandler.java | 23 ++++++++++++------- .../com/exactpro/th2/FixHandlerSettings.java | 10 ++++++++ .../th2/FixHandlerSendTimeoutTest.java | 2 ++ 6 files changed, 37 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 8d43f35..b023e73 100644 --- a/README.md +++ b/README.md @@ -52,8 +52,10 @@ This microservice allows sending and receiving messages via FIX protocol + *logoutOnIncorrectServerSequence* - whether to logout session when server send message with sequence number less than expected. If `false` then internal conn sequence will be reset to sequence number from server message. + *connectionTimeoutOnSend* - timeout in milliseconds for sending message from queue thread (please read about [acknowledgment timeout](https://www.rabbitmq.com/consumers.html#acknowledgement-timeout) to understand the problem). - _Default, 30000 mls._ + _Default, 30000 mls._ Each failed sending attempt decreases the timeout in half (but not less than _minConnectionTimeoutOnSend_). + The timeout is reset to the original value after a successful sending attempt. If connection is not established within the specified timeout an error will be reported. ++ *minConnectionTimeoutOnSend* - minimum value for the sending message timeout in milliseconds. _Default value is 1000 mls._ ### Security settings @@ -333,6 +335,11 @@ spec: # Changelog +## 1.5.0 + +* `minConnectionTimeoutOnSend` parameter is added. +* Sending timeout now decreases in half on each failed attempt (but not less than `minConnectionTimeoutOnSend`). + ## 1.4.2 * Ungraceful session disconnect support. * Removed NPE when session is reset by schedule. diff --git a/build.gradle b/build.gradle index 2cd4ecb..4d1d377 100644 --- a/build.gradle +++ b/build.gradle @@ -52,7 +52,7 @@ dependencies { } implementation "com.exactpro.th2:common-utils:2.2.0-dev" implementation 'com.exactpro.th2:netty-bytebuf-utils:0.0.1' - implementation 'com.exactpro.th2:conn-dirty-tcp-core:3.2.1-dev' + implementation 'com.exactpro.th2:conn-dirty-tcp-core:3.3.0-dev' implementation 'com.exactpro.th2:grpc-lw-data-provider:2.2.0-dev' implementation 'org.slf4j:slf4j-api' diff --git a/gradle.properties b/gradle.properties index 66a2be3..e60bbc1 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -release_version=1.4.2 \ No newline at end of file +release_version=1.5.0 \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index 40e4edc..21261bb 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -24,6 +24,7 @@ import com.exactpro.th2.common.utils.event.transport.EventUtilsKt; import com.exactpro.th2.conn.dirty.fix.FixField; import com.exactpro.th2.conn.dirty.fix.MessageLoader; +import com.exactpro.th2.conn.dirty.tcp.core.SendingTimeoutHandler; import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel; import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel.SendMode; import com.exactpro.th2.conn.dirty.tcp.core.api.IHandler; @@ -57,6 +58,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; + import kotlin.jvm.functions.Function1; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -165,6 +167,8 @@ public class FixHandler implements AutoCloseable, IHandler { private final AtomicReference> heartbeatTimer = new AtomicReference<>(CompletableFuture.completedFuture(null)); private final AtomicReference> testRequestTimer = new AtomicReference<>(CompletableFuture.completedFuture(null)); + + private final SendingTimeoutHandler sendingTimeoutHandler; private Future reconnectRequestTimer = CompletableFuture.completedFuture(null); private volatile IChannel channel; protected FixHandlerSettings settings; @@ -235,9 +239,11 @@ public FixHandler(IHandlerContext context) { if (settings.getHeartBtInt() <= 0) throw new IllegalArgumentException("HeartBtInt cannot be negative or zero"); if (settings.getTestRequestDelay() <= 0) throw new IllegalArgumentException("TestRequestDelay cannot be negative or zero"); if (settings.getDisconnectRequestDelay() <= 0) throw new IllegalArgumentException("DisconnectRequestDelay cannot be negative or zero"); - if (settings.getConnectionTimeoutOnSend() <= 0) { - throw new IllegalArgumentException("connectionTimeoutOnSend must be greater than zero"); - } + this.sendingTimeoutHandler = SendingTimeoutHandler.create( + settings.getMinConnectionTimeoutOnSend(), + settings.getConnectionTimeoutOnSend(), + context::send + ); } @Override @@ -276,7 +282,7 @@ private CompletableFuture send(@NotNull ByteBuf body, @NotNull Map send(@NotNull ByteBuf body, @NotNull Map send(@NotNull ByteBuf body, @NotNull Map deadline) { // The method should have checked exception in signature... ExceptionUtils.rethrow(new TimeoutException(String.format("session was not established within %d mls", - settings.getConnectionTimeoutOnSend()))); + currentTimeout))); } } diff --git a/src/main/java/com/exactpro/th2/FixHandlerSettings.java b/src/main/java/com/exactpro/th2/FixHandlerSettings.java index 99a21ce..4e7e0a2 100644 --- a/src/main/java/com/exactpro/th2/FixHandlerSettings.java +++ b/src/main/java/com/exactpro/th2/FixHandlerSettings.java @@ -75,6 +75,8 @@ public class FixHandlerSettings implements IHandlerSettings { */ private long connectionTimeoutOnSend = DEFAULT_CONNECTION_TIMEOUT_ON_SEND; + private long minConnectionTimeoutOnSend = 1_000; + @JsonDeserialize(using = DateTimeFormatterDeserializer.class) private DateTimeFormatter sendingDateTimeFormat = DateTimeFormatter.ofPattern("yyyyMMdd-HH:mm:ss.SSSSSSSSS"); @@ -317,4 +319,12 @@ public long getConnectionTimeoutOnSend() { public void setConnectionTimeoutOnSend(long connectionTimeoutOnSend) { this.connectionTimeoutOnSend = connectionTimeoutOnSend; } + + public long getMinConnectionTimeoutOnSend() { + return minConnectionTimeoutOnSend; + } + + public void setMinConnectionTimeoutOnSend(long minConnectionTimeoutOnSend) { + this.minConnectionTimeoutOnSend = minConnectionTimeoutOnSend; + } } diff --git a/src/test/java/com/exactpro/th2/FixHandlerSendTimeoutTest.java b/src/test/java/com/exactpro/th2/FixHandlerSendTimeoutTest.java index 4353258..5e81642 100644 --- a/src/test/java/com/exactpro/th2/FixHandlerSendTimeoutTest.java +++ b/src/test/java/com/exactpro/th2/FixHandlerSendTimeoutTest.java @@ -59,6 +59,7 @@ void sendTimeoutOnConnectionOpen() { settings.setPort(42); settings.setHost("localhost"); settings.setConnectionTimeoutOnSend(300); // 300 millis + settings.setMinConnectionTimeoutOnSend(100); Mockito.when(contextMock.getSettings()) .thenReturn(settings); var fixHandler = new FixHandler(contextMock); @@ -100,6 +101,7 @@ void sendTimeoutOnSessionEnabled() { settings.setPort(42); settings.setHost("localhost"); settings.setConnectionTimeoutOnSend(300); // 300 millis + settings.setMinConnectionTimeoutOnSend(100); LocalTime currentTime = LocalTime.now(ZoneOffset.UTC); int deltaMinutes = currentTime.isAfter(LocalTime.NOON) ? -1 From f79f830cde9f9e38960a4f38fcf92f219a675c2d Mon Sep 17 00:00:00 2001 From: Oleg Smirnov Date: Mon, 30 Oct 2023 11:54:05 +0400 Subject: [PATCH 2/3] [TH2-5102] Add message property with operation time (#66) * Use mutable map when sending message * Update version and readme * Update changelog --- README.md | 8 +++++- .../java/com/exactpro/th2/FixHandler.java | 28 +++++++++++-------- 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index b023e73..911da04 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2-conn-dirty-fix (1.4.2) +# th2-conn-dirty-fix (1.5.1) This microservice allows sending and receiving messages via FIX protocol @@ -335,6 +335,12 @@ spec: # Changelog +## 1.5.1 + +* Property `th2.operation_timestamp` is added to metadata to each message +* Use mutable map for metadata when sending a messages from the handler + * Fix error when new property with operation timestamp added to the immutable map + ## 1.5.0 * `minConnectionTimeoutOnSend` parameter is added. diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index 21261bb..cb3b06e 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -586,7 +586,7 @@ public void sendResendRequest(int beginSeqNo, int endSeqNo) { //do private resendRequest.append(BEGIN_SEQ_NO).append(beginSeqNo).append(SOH); resendRequest.append(END_SEQ_NO).append(endSeqNo).append(SOH); setChecksumAndBodyLength(resendRequest); - channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, IChannel.SendMode.MANGLE); + channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, IChannel.SendMode.MANGLE); resetHeartbeatTask(); } @@ -598,7 +598,7 @@ void sendResendRequest(int beginSeqNo) { //do private setChecksumAndBodyLength(resendRequest); if (enabled.get()) { - channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, IChannel.SendMode.MANGLE); + channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, IChannel.SendMode.MANGLE); resetHeartbeatTask(); } else { sendLogon(); @@ -655,7 +655,7 @@ private void recovery(int beginSeqNo, int endSeqNo) { if(sequence - 1 != lastProcessedSequence.get() ) { StringBuilder sequenceReset = createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), sequence); - channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, SendMode.MANGLE); + channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, SendMode.MANGLE); resetHeartbeatTask(); } @@ -663,7 +663,7 @@ private void recovery(int beginSeqNo, int endSeqNo) { setPossDup(buf); updateLength(buf); updateChecksum(buf); - channel.send(buf, Collections.emptyMap(), null, SendMode.MANGLE); + channel.send(buf, createMetadataMap(), null, SendMode.MANGLE); resetHeartbeatTask(); @@ -681,7 +681,7 @@ private void recovery(int beginSeqNo, int endSeqNo) { String seqReset = createSequenceReset(Math.max(lastProcessedSequence.get() + 1, beginSeqNo), msgSeqNum.get() + 1).toString(); channel.send( Unpooled.wrappedBuffer(seqReset.getBytes(StandardCharsets.UTF_8)), - Collections.emptyMap(), null, SendMode.MANGLE + createMetadataMap(), null, SendMode.MANGLE ); } } else { @@ -689,7 +689,7 @@ private void recovery(int beginSeqNo, int endSeqNo) { createSequenceReset(beginSeqNo, msgSeqNum.get() + 1).toString(); channel.send( Unpooled.wrappedBuffer(seqReset.getBytes(StandardCharsets.UTF_8)), - Collections.emptyMap(), null, SendMode.MANGLE + createMetadataMap(), null, SendMode.MANGLE ); } resetHeartbeatTask(); @@ -700,7 +700,7 @@ private void recovery(int beginSeqNo, int endSeqNo) { createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), msgSeqNum.get() + 1).toString(); channel.send( Unpooled.buffer().writeBytes(seqReset.getBytes(StandardCharsets.UTF_8)), - Collections.emptyMap(), null, SendMode.MANGLE + createMetadataMap(), null, SendMode.MANGLE ); } finally { recoveryLock.unlock(); @@ -716,7 +716,7 @@ private void sendSequenceReset() { setChecksumAndBodyLength(sequenceReset); if (enabled.get()) { - channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, IChannel.SendMode.MANGLE); + channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, IChannel.SendMode.MANGLE); resetHeartbeatTask(); } else { sendLogon(); @@ -880,7 +880,7 @@ private void sendHeartbeatTestReqId(String testReqId) { if (enabled.get()) { info("Send Heartbeat to server - %s", heartbeat); - channel.send(Unpooled.wrappedBuffer(heartbeat.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, IChannel.SendMode.MANGLE); + channel.send(Unpooled.wrappedBuffer(heartbeat.toString().getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, IChannel.SendMode.MANGLE); resetHeartbeatTask(); } else { @@ -894,7 +894,7 @@ public void sendTestRequest() { //do private testRequest.append(TEST_REQ_ID).append(testReqID.incrementAndGet()); setChecksumAndBodyLength(testRequest); if (enabled.get()) { - channel.send(Unpooled.wrappedBuffer(testRequest.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, IChannel.SendMode.MANGLE); + channel.send(Unpooled.wrappedBuffer(testRequest.toString().getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, IChannel.SendMode.MANGLE); info("Send TestRequest to server - %s", testRequest); resetTestRequestTask(); resetHeartbeatTask(); @@ -942,7 +942,7 @@ public void sendLogon() { setChecksumAndBodyLength(logon); info("Send logon - %s", logon); - channel.send(Unpooled.wrappedBuffer(logon.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, IChannel.SendMode.MANGLE); + channel.send(Unpooled.wrappedBuffer(logon.toString().getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, IChannel.SendMode.MANGLE); } private void sendLogout() { @@ -963,7 +963,7 @@ private void sendLogout(String text) { try { channel.send( Unpooled.wrappedBuffer(logout.toString().getBytes(StandardCharsets.UTF_8)), - Collections.emptyMap(), + createMetadataMap(), null, IChannel.SendMode.MANGLE ).get(); @@ -1174,4 +1174,8 @@ private void debug(String message, Object... args) { LOGGER.debug("{} - {}: {}", channel.getSessionGroup(), channel.getSessionAlias(), String.format(message, args)); } } + + private Map createMetadataMap() { + return new HashMap<>(2); + } } \ No newline at end of file From 65e20329e6576351e418973f4b7d6ed0ae5ac03d Mon Sep 17 00:00:00 2001 From: Oleg Smirnov Date: Mon, 30 Oct 2023 13:24:42 +0400 Subject: [PATCH 3/3] Increment version (#68) --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index e60bbc1..08c693d 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -release_version=1.5.0 \ No newline at end of file +release_version=1.5.1 \ No newline at end of file