From cbc327535a8a11ebe7d47df5bdf3d2180ee4dc90 Mon Sep 17 00:00:00 2001 From: Denis Plotnikov Date: Fri, 15 Dec 2023 13:01:32 +0400 Subject: [PATCH] [TH2-5146] Reconnect if logon ack is not received in user defined timeout --- README.md | 6 ++- gradle.properties | 2 +- .../java/com/exactpro/th2/FixHandler.java | 24 +++++++++--- .../com/exactpro/th2/FixHandlerSettings.java | 14 +++++++ .../java/com/exactpro/th2/FixHandlerTest.java | 37 +++++++++++++++++++ 5 files changed, 75 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 911da04..6b160b6 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2-conn-dirty-fix (1.5.1) +# th2-conn-dirty-fix (1.6.0) This microservice allows sending and receiving messages via FIX protocol @@ -56,6 +56,7 @@ This microservice allows sending and receiving messages via FIX protocol The timeout is reset to the original value after a successful sending attempt. If connection is not established within the specified timeout an error will be reported. + *minConnectionTimeoutOnSend* - minimum value for the sending message timeout in milliseconds. _Default value is 1000 mls._ ++ *logonAckTimeout* - timeout before reconnect will be issued if no logon ack is received. ### Security settings @@ -335,6 +336,9 @@ spec: # Changelog +## 1.6.0 +* Session will be reconnected if logon ack isn't received during user defined timeout. + ## 1.5.1 * Property `th2.operation_timestamp` is added to metadata to each message diff --git a/gradle.properties b/gradle.properties index 08c693d..c603e2f 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -release_version=1.5.1 \ No newline at end of file +release_version=1.6.0 \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index cb3b06e..0fe9c29 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -43,7 +43,6 @@ import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -169,7 +168,7 @@ public class FixHandler implements AutoCloseable, IHandler { private final AtomicReference> testRequestTimer = new AtomicReference<>(CompletableFuture.completedFuture(null)); private final SendingTimeoutHandler sendingTimeoutHandler; - private Future reconnectRequestTimer = CompletableFuture.completedFuture(null); + private AtomicReference> reconnectRequestTimer = new AtomicReference<>(CompletableFuture.completedFuture(null)); private volatile IChannel channel; protected FixHandlerSettings settings; @@ -430,7 +429,7 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu if(settings.isLogoutOnIncorrectServerSequence()) { context.send(CommonUtil.toEvent(String.format("Received server sequence %d but expected %d. Sending logout with text: MsgSeqNum is too low...", receivedMsgSeqNum, serverMsgSeqNum.get()))); sendLogout(String.format("MsgSeqNum too low, expecting %d but received %d", serverMsgSeqNum.get() + 1, receivedMsgSeqNum)); - reconnectRequestTimer = executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS); + reconnectRequestTimer.set(executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS)); if (LOGGER.isErrorEnabled()) error("Invalid message. SeqNum is less than expected %d: %s", null, serverMsgSeqNum.get(), message.toString(US_ASCII)); } else { context.send(CommonUtil.toEvent(String.format("Received server sequence %d but expected %d. Correcting server sequence.", receivedMsgSeqNum, serverMsgSeqNum.get() + 1))); @@ -456,6 +455,7 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu if(LOGGER.isInfoEnabled()) info("Logon received - %s", message.toString(US_ASCII)); boolean connectionSuccessful = checkLogon(message); if (connectionSuccessful) { + cancelFuture(reconnectRequestTimer); if(settings.useNextExpectedSeqNum()) { FixField nextExpectedSeqField = findField(message, NEXT_EXPECTED_SEQ_NUMBER_TAG); if(nextExpectedSeqField == null) { @@ -491,7 +491,7 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu resetTestRequestTask(); } else { enabled.set(false); - reconnectRequestTimer = executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS); + reconnectRequestTimer.set(executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS)); } break; //extract logout reason @@ -729,7 +729,7 @@ private void checkHeartbeat(ByteBuf message) { if (receivedTestReqID != null) { if (Objects.equals(receivedTestReqID.getValue(), Integer.toString(testReqID.get()))) { - reconnectRequestTimer.cancel(false); + cancelFuture(reconnectRequestTimer); } } } @@ -901,7 +901,7 @@ public void sendTestRequest() { //do private } else { sendLogon(); } - reconnectRequestTimer = executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS); + reconnectRequestTimer.set(executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS)); } public void sendLogon() { @@ -943,6 +943,7 @@ public void sendLogon() { setChecksumAndBodyLength(logon); info("Send logon - %s", logon); channel.send(Unpooled.wrappedBuffer(logon.toString().getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, IChannel.SendMode.MANGLE); + reconnectRequestTimer.set(executorService.schedule(this::onMissingLogonAck, settings.getLogonAckTimeout(), TimeUnit.MILLISECONDS)); } private void sendLogout() { @@ -989,6 +990,7 @@ public void onClose(@NotNull IChannel channel) { enabled.set(false); cancelFuture(heartbeatTimer); cancelFuture(testRequestTimer); + cancelFuture(reconnectRequestTimer); } @Override @@ -1019,6 +1021,16 @@ private void disconnect(Boolean graceful) throws ExecutionException, Interrupted channel.close().get(); } + private void onMissingLogonAck() { + info("Logon was not acknowledged. Reconnecting."); + try { + disconnect(false); + channel.open(); + } catch (Exception e) { + error("Error while disconnecting on missing logon ack", e); + } + } + private void setHeader(StringBuilder stringBuilder, String msgType, Integer seqNum, String time) { stringBuilder.append(BEGIN_STRING_TAG).append("=").append(settings.getBeginString()); stringBuilder.append(MSG_TYPE).append(msgType); diff --git a/src/main/java/com/exactpro/th2/FixHandlerSettings.java b/src/main/java/com/exactpro/th2/FixHandlerSettings.java index 4e7e0a2..2a1f48c 100644 --- a/src/main/java/com/exactpro/th2/FixHandlerSettings.java +++ b/src/main/java/com/exactpro/th2/FixHandlerSettings.java @@ -69,6 +69,12 @@ public class FixHandlerSettings implements IHandlerSettings { private int reconnectDelay = 5; private int disconnectRequestDelay = 5; + /** + * Timeout in milliseconds during which the connection should receive logon acknowledgement message. + * Otherwise reconnect will be done. + */ + private long logonAckTimeout = 5_000; + /** * Timeout in milliseconds during which the connection should be opened and session is logged in. * Otherwise, the send operation will be interrupted @@ -327,4 +333,12 @@ public long getMinConnectionTimeoutOnSend() { public void setMinConnectionTimeoutOnSend(long minConnectionTimeoutOnSend) { this.minConnectionTimeoutOnSend = minConnectionTimeoutOnSend; } + + public long getLogonAckTimeout() { + return logonAckTimeout; + } + + public void setLogonAckTimeout(long logonTimeout) { + this.logonAckTimeout = logonTimeout; + } } diff --git a/src/test/java/com/exactpro/th2/FixHandlerTest.java b/src/test/java/com/exactpro/th2/FixHandlerTest.java index 0a61f8a..6527a0f 100644 --- a/src/test/java/com/exactpro/th2/FixHandlerTest.java +++ b/src/test/java/com/exactpro/th2/FixHandlerTest.java @@ -225,6 +225,41 @@ void onConnectionTest() { new String(channel.getQueue().get(0).array())); } + @Test + void reconnectOnMissingLogonAck() throws InterruptedException { + FixHandlerSettings settings = createHandlerSettings(); + Channel channel = new Channel(settings, null); + FixHandler fixHandler = channel.getFixHandler(); + fixHandler.onOpen(channel); + assertEquals(channel.getQueue().size(), 1); + assertEquals("8=FIXT.1.1\u00019=105\u000135=A\u000134=1\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u000198=0\u0001108=30\u00011137=9\u0001553=username\u0001554=pass\u000110=203\u0001", + new String(channel.getQueue().get(0).array())); + channel.getQueue().clear(); + Thread.sleep(settings.getLogonAckTimeout() + 2000); + assertEquals(channel.getQueue().size(), 1); + assertEquals("8=FIXT.1.1\u00019=105\u000135=A\u000134=1\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u000198=0\u0001108=30\u00011137=9\u0001553=username\u0001554=pass\u000110=203\u0001", + new String(channel.getQueue().get(0).array())); + channel.close(); + } + + @Test + void noReconnectIfAckReceived() throws InterruptedException { + FixHandlerSettings settings = createHandlerSettings(); + settings.setHeartBtInt(1000); + settings.setTestRequestDelay(1000); + Channel channel = new Channel(settings, null); + FixHandler fixHandler = channel.getFixHandler(); + fixHandler.onOpen(channel); + fixHandler.onIncoming(channel, logonResponse); + assertEquals(channel.getQueue().size(), 1); + assertEquals("8=FIXT.1.1\u00019=107\u000135=A\u000134=1\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u000198=0\u0001108=1000\u00011137=9\u0001553=username\u0001554=pass\u000110=043\u0001", + new String(channel.getQueue().get(0).array())); + channel.getQueue().clear(); + Thread.sleep(settings.getLogonAckTimeout() * 2); + assertEquals(channel.getQueue().size(), 0); + channel.close(); + } + @Test void logoutDisconnectTest() { channel.clearQueue(); @@ -427,6 +462,7 @@ class Channel implements IChannel { @Override public CompletableFuture open() { + fixHandler.onOpen(this); return CompletableFuture.completedFuture(Unit.INSTANCE); } @@ -444,6 +480,7 @@ public boolean isOpen() { @Override public CompletableFuture close() { + fixHandler.onClose(this); return CompletableFuture.completedFuture(Unit.INSTANCE); }