From 1f02cee594666ab0827eba189271f62697e06f0c Mon Sep 17 00:00:00 2001 From: isengrims <104489572+isengrims@users.noreply.github.com> Date: Fri, 11 Aug 2023 20:09:54 +0400 Subject: [PATCH 1/8] Test request handler (#56) --- README.md | 8 +- build.gradle | 8 +- gradle.properties | 2 +- .../java/com/exactpro/th2/FixHandler.java | 88 ++++++++++++++----- .../com/exactpro/th2/FixHandlerSettings.java | 10 +-- .../com/exactpro/th2/constants/Constants.java | 2 + 6 files changed, 84 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index 341df80..943996b 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2-conn-dirty-fix (1.2.0) +# th2-conn-dirty-fix (1.3.0) This microservice allows sending and receiving messages via FIX protocol @@ -49,6 +49,7 @@ This microservice allows sending and receiving messages via FIX protocol + *useNextExpectedSeqNum* - session management based on next expected sequence number. (`false` by default) + *saveAdminMessages* - defines if admin messages will be saved to internal outgoing buffer. (`false` by default) + *resetStateOnServerReset* - whether to reset the server sequence after receiving logout with text `Next Expected MSN too high, MSN to be sent is x but received y`. ++ *logoutOnIncorrectServerSequence* - whether to logout session when server send message with sequence number less than expected. If `false` then internal conn sequence will be reset to sequence number from server message. ### Security settings @@ -328,6 +329,11 @@ spec: # Changelog +## 1.3.0 +* Added handling for incoming test request messages +* Fixed resetSeqNum flag handling on incoming logon messages. +* Added option to automatically reset server sequence when internal conn sequence doesn't match with sequence that server sent. + ## 1.2.0 * loading requested messages from cradle. diff --git a/build.gradle b/build.gradle index ab58b29..6d44354 100644 --- a/build.gradle +++ b/build.gradle @@ -2,7 +2,8 @@ plugins { id 'java' id 'org.jetbrains.kotlin.jvm' version '1.6.21' id 'com.palantir.docker' version '0.25.0' - id "org.owasp.dependencycheck" version "8.1.2" + id "org.owasp.dependencycheck" version "8.2.1" + id "com.gorylenko.gradle-git-properties" version "2.4.1" } apply plugin: 'application' @@ -45,9 +46,9 @@ repositories { } dependencies { - api platform('com.exactpro.th2:bom:4.2.0') + api platform('com.exactpro.th2:bom:4.4.0') - implementation 'com.exactpro.th2:common:5.2.0-dev' + implementation 'com.exactpro.th2:common:5.2.2-dev' implementation 'com.exactpro.th2:netty-bytebuf-utils:0.0.1' implementation('com.exactpro.th2:conn-dirty-tcp-core:3.0.0-dev') { exclude group: 'org.slf4j', module: 'slf4j-log4j12' @@ -57,6 +58,7 @@ dependencies { implementation 'org.slf4j:slf4j-api' implementation 'io.github.microutils:kotlin-logging:2.1.23' + implementation 'org.apache.commons:commons-lang3:3.13.0' implementation 'io.netty:netty-all' implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8:1.6.21' diff --git a/gradle.properties b/gradle.properties index 994869d..a192dcd 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -release_version=1.2.0 +release_version=1.3.0 jackson_version=2.11.2 diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index 24765d2..edd1490 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -86,6 +86,7 @@ import static com.exactpro.th2.constants.Constants.GAP_FILL_FLAG_TAG; import static com.exactpro.th2.constants.Constants.HEART_BT_INT; import static com.exactpro.th2.constants.Constants.IS_POSS_DUP; +import static com.exactpro.th2.constants.Constants.IS_SEQUENCE_RESET_FLAG; import static com.exactpro.th2.constants.Constants.MSG_SEQ_NUM; import static com.exactpro.th2.constants.Constants.MSG_SEQ_NUM_TAG; import static com.exactpro.th2.constants.Constants.MSG_TYPE; @@ -103,11 +104,11 @@ import static com.exactpro.th2.constants.Constants.NEXT_EXPECTED_SEQ_NUM; import static com.exactpro.th2.constants.Constants.NEXT_EXPECTED_SEQ_NUMBER_TAG; import static com.exactpro.th2.constants.Constants.ORIG_SENDING_TIME; -import static com.exactpro.th2.constants.Constants.ORIG_SENDING_TIME_TAG; import static com.exactpro.th2.constants.Constants.PASSWORD; import static com.exactpro.th2.constants.Constants.POSS_DUP; import static com.exactpro.th2.constants.Constants.POSS_DUP_TAG; import static com.exactpro.th2.constants.Constants.RESET_SEQ_NUM; +import static com.exactpro.th2.constants.Constants.RESET_SEQ_NUM_TAG; import static com.exactpro.th2.constants.Constants.SENDER_COMP_ID; import static com.exactpro.th2.constants.Constants.SENDER_COMP_ID_TAG; import static com.exactpro.th2.constants.Constants.SENDER_SUB_ID; @@ -120,6 +121,7 @@ import static com.exactpro.th2.constants.Constants.TARGET_COMP_ID_TAG; import static com.exactpro.th2.constants.Constants.TEST_REQ_ID; import static com.exactpro.th2.constants.Constants.TEST_REQ_ID_TAG; +import static com.exactpro.th2.constants.Constants.TEXT; import static com.exactpro.th2.constants.Constants.TEXT_TAG; import static com.exactpro.th2.constants.Constants.USERNAME; import static com.exactpro.th2.netty.bytebuf.util.ByteBufUtil.indexOf; @@ -352,11 +354,24 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu int receivedMsgSeqNum = Integer.parseInt(requireNonNull(msgSeqNumValue.getValue())); + if(msgTypeValue.equals(MSG_TYPE_LOGON) && receivedMsgSeqNum < serverMsgSeqNum.get()) { + FixField resetSeqNumFlagField = findField(message, RESET_SEQ_NUM_TAG); + if(resetSeqNumFlagField != null && Objects.equals(resetSeqNumFlagField.getValue(), IS_SEQUENCE_RESET_FLAG)) { + serverMsgSeqNum.set(0); + } + } + if(receivedMsgSeqNum < serverMsgSeqNum.get() && !isDup) { - sendLogout(); - reconnectRequestTimer = executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS); + if(settings.isLogoutOnIncorrectServerSequence()) { + context.send(CommonUtil.toEvent(String.format("Received server sequence %d but expected %d. Sending logout with text: MsgSeqNum is too low...", receivedMsgSeqNum, serverMsgSeqNum.get()))); + sendLogout(String.format("MsgSeqNum too low, expecting %d but received %d", serverMsgSeqNum.get() + 1, receivedMsgSeqNum)); + reconnectRequestTimer = executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS); + if (LOGGER.isErrorEnabled()) LOGGER.error("Invalid message. SeqNum is less than expected {}: {}", serverMsgSeqNum.get() + 1, message.toString(US_ASCII)); + } else { + context.send(CommonUtil.toEvent(String.format("Received server sequence %d but expected %d. Correcting server sequence.", receivedMsgSeqNum, serverMsgSeqNum.get() + 1))); + serverMsgSeqNum.set(receivedMsgSeqNum - 1); + } metadata.put(REJECT_REASON, "SeqNum is less than expected."); - if (LOGGER.isErrorEnabled()) LOGGER.error("Invalid message. SeqNum is less than expected {}: {}", serverMsgSeqNum.get(), message.toString(US_ASCII)); return metadata; } @@ -425,6 +440,10 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu if (LOGGER.isInfoEnabled()) LOGGER.info("Sequence reset received - {}", message.toString(US_ASCII)); resetSequence(message); break; + case MSG_TYPE_TEST_REQUEST: + if (LOGGER.isInfoEnabled()) LOGGER.info("Test request received - {}", message.toString(US_ASCII)); + handleTestRequest(message, metadata); + break; } resetTestRequestTask(); @@ -434,26 +453,32 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu return metadata; } + private Map handleTestRequest(ByteBuf message, Map metadata) { + FixField testReqId = findField(message, TEST_REQ_ID_TAG); + if(testReqId == null || testReqId.getValue() == null) { + metadata.put(REJECT_REASON, "Test Request message hasn't got TestReqId field."); + return metadata; + } + + sendHeartbeatTestReqId(testReqId.getValue()); + + return null; + } + private void handleLogout(@NotNull ByteBuf message) { if (LOGGER.isInfoEnabled()) LOGGER.info("Logout received - {}", message.toString(US_ASCII)); - FixField sessionStatus = findField(message, SESSION_STATUS_TAG); boolean isSequenceChanged = false; - if(sessionStatus != null) { - int statusCode = Integer.parseInt(Objects.requireNonNull(sessionStatus.getValue())); - if(statusCode != SUCCESSFUL_LOGOUT_CODE) { - FixField text = findField(message, TEXT_TAG); - if (text != null) { - LOGGER.warn("Received Logout has text (58) tag: {}", text.getValue()); - String wrongClientSequence = StringUtils.substringBetween(text.getValue(), "expecting ", " but received"); - if (wrongClientSequence != null) { - msgSeqNum.set(Integer.parseInt(wrongClientSequence) - 1); - isSequenceChanged = true; - } - String wrongClientNextExpectedSequence = StringUtils.substringBetween(text.getValue(), "MSN to be sent is ", " but received"); - if(wrongClientNextExpectedSequence != null && settings.getResetStateOnServerReset()) { - serverMsgSeqNum.set(Integer.parseInt(wrongClientNextExpectedSequence)); - } - } + FixField text = findField(message, TEXT_TAG); + if (text != null) { + LOGGER.warn("Received Logout has text (58) tag: {}", text.getValue()); + String wrongClientSequence = StringUtils.substringBetween(text.getValue(), "expecting ", " but received"); + if (wrongClientSequence != null) { + msgSeqNum.set(Integer.parseInt(wrongClientSequence) - 1); + isSequenceChanged = true; + } + String wrongClientNextExpectedSequence = StringUtils.substringBetween(text.getValue(), "MSN to be sent is ", " but received"); + if(wrongClientNextExpectedSequence != null && settings.getResetStateOnServerReset()) { + serverMsgSeqNum.set(Integer.parseInt(wrongClientNextExpectedSequence)); } } @@ -778,11 +803,16 @@ public void onOpen(@NotNull IChannel channel) { sendLogon(); } - public void sendHeartbeat() { + public void sendHeartbeat() {sendHeartbeatTestReqId(null);} + + private void sendHeartbeatTestReqId(String testReqId) { StringBuilder heartbeat = new StringBuilder(); int seqNum = msgSeqNum.incrementAndGet(); setHeader(heartbeat, MSG_TYPE_HEARTBEAT, seqNum, null); + if(testReqId != null) { + heartbeat.append(TEST_REQ_ID).append(testReqId); + } setChecksumAndBodyLength(heartbeat); if (enabled.get()) { @@ -818,8 +848,11 @@ public void sendLogon() { } StringBuilder logon = new StringBuilder(); Boolean reset; - if (!connStarted.get()) reset = settings.getResetSeqNumFlag(); - else reset = settings.getResetOnLogon(); + if (!connStarted.get()) { + reset = settings.getResetSeqNumFlag(); + } else { + reset = settings.getResetOnLogon(); + } if (reset) msgSeqNum.getAndSet(0); setHeader(logon, MSG_TYPE_LOGON, msgSeqNum.get() + 1, null); @@ -850,9 +883,16 @@ public void sendLogon() { } private void sendLogout() { + sendLogout(null); + } + + private void sendLogout(String text) { if (enabled.get()) { StringBuilder logout = new StringBuilder(); setHeader(logout, MSG_TYPE_LOGOUT, msgSeqNum.incrementAndGet(), null); + if(text != null) { + logout.append(TEXT).append(text); + } setChecksumAndBodyLength(logout); LOGGER.debug("Sending logout - {}", logout); diff --git a/src/main/java/com/exactpro/th2/FixHandlerSettings.java b/src/main/java/com/exactpro/th2/FixHandlerSettings.java index 0aaff99..3417160 100644 --- a/src/main/java/com/exactpro/th2/FixHandlerSettings.java +++ b/src/main/java/com/exactpro/th2/FixHandlerSettings.java @@ -53,10 +53,10 @@ public class FixHandlerSettings implements IHandlerSettings { private Boolean resetSeqNumFlag = false; private Boolean resetOnLogon = false; private Boolean useNextExpectedSeqNum = false; - private Boolean saveAdminMessages = false; private Boolean loadSequencesFromCradle = false; private Boolean loadMissedMessagesFromCradle = false; private Boolean resetStateOnServerReset = false; + private Boolean logoutOnIncorrectServerSequence = false; @JsonDeserialize(using = LocalTimeDeserializer.class) private LocalTime sessionStartTime; @@ -259,12 +259,12 @@ public void setUseNextExpectedSeqNum(Boolean useNextExpectedSeqNum) { this.useNextExpectedSeqNum = useNextExpectedSeqNum; } - public Boolean isSaveAdminMessages() { - return saveAdminMessages; + public Boolean isLogoutOnIncorrectServerSequence() { + return logoutOnIncorrectServerSequence; } - public void setSaveAdminMessages(Boolean saveAdminMessages) { - this.saveAdminMessages = saveAdminMessages; + public void setLogoutOnIncorrectServerSequence(Boolean logoutOnIncorrectServerSequence) { + this.logoutOnIncorrectServerSequence = logoutOnIncorrectServerSequence; } public LocalTime getSessionStartTime() { diff --git a/src/main/java/com/exactpro/th2/constants/Constants.java b/src/main/java/com/exactpro/th2/constants/Constants.java index 35252d7..de4beed 100644 --- a/src/main/java/com/exactpro/th2/constants/Constants.java +++ b/src/main/java/com/exactpro/th2/constants/Constants.java @@ -82,6 +82,7 @@ public class Constants { public static final String NEXT_EXPECTED_SEQ_NUM = SOH + NEXT_EXPECTED_SEQ_NUMBER_TAG + "="; public static final String POSS_DUP = SOH + POSS_DUP_TAG + "="; public static final String ORIG_SENDING_TIME = SOH + ORIG_SENDING_TIME_TAG + "="; + public static final String TEXT = SOH + TEXT_TAG + "="; //message types public static final String MSG_TYPE_LOGON = "A"; @@ -100,5 +101,6 @@ public class Constants { ); public static final String IS_POSS_DUP = "Y"; + public static final String IS_SEQUENCE_RESET_FLAG = "Y"; public static final int SUCCESSFUL_LOGOUT_CODE = 4; } From 5a62f039eec8a7b62a5d678edb7e3eaa4a8fcfe3 Mon Sep 17 00:00:00 2001 From: isengrims <104489572+isengrims@users.noreply.github.com> Date: Tue, 15 Aug 2023 14:58:54 +0400 Subject: [PATCH 2/8] Fix recovery (#57) --- README.md | 4 +++- gradle.properties | 2 +- src/main/java/com/exactpro/th2/FixHandler.java | 9 +++++---- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 943996b..ea5f10e 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2-conn-dirty-fix (1.3.0) +# th2-conn-dirty-fix (1.3.1) This microservice allows sending and receiving messages via FIX protocol @@ -328,6 +328,8 @@ spec: ``` # Changelog +# 1.3.1 +* fix multiple consequent SOH characters ## 1.3.0 * Added handling for incoming test request messages diff --git a/gradle.properties b/gradle.properties index a192dcd..81dbf81 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -release_version=1.3.0 +release_version=1.3.1 jackson_version=2.11.2 diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index edd1490..60c5ffe 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -104,6 +104,7 @@ import static com.exactpro.th2.constants.Constants.NEXT_EXPECTED_SEQ_NUM; import static com.exactpro.th2.constants.Constants.NEXT_EXPECTED_SEQ_NUMBER_TAG; import static com.exactpro.th2.constants.Constants.ORIG_SENDING_TIME; +import static com.exactpro.th2.constants.Constants.ORIG_SENDING_TIME_TAG; import static com.exactpro.th2.constants.Constants.PASSWORD; import static com.exactpro.th2.constants.Constants.POSS_DUP; import static com.exactpro.th2.constants.Constants.POSS_DUP_TAG; @@ -585,7 +586,7 @@ private void recovery(int beginSeqNo, int endSeqNo) { if(ADMIN_MESSAGES.contains(msgType)) return true; FixField possDup = findField(buf, POSS_DUP_TAG); - if(possDup != null && possDup.getValue() == IS_POSS_DUP) return true; + if(possDup != null && Objects.equals(possDup.getValue(), IS_POSS_DUP)) return true; if(sequence - 1 != lastProcessedSequence.get() ) { int newSeqNo = sequence; @@ -1019,16 +1020,16 @@ private void setTime(ByteBuf buf) { String time = getTime(); if (sendingTime == null) { - seqNum.insertNext(SENDING_TIME, time).insertNext(ORIG_SENDING_TIME, time); + seqNum.insertNext(SENDING_TIME_TAG, time).insertNext(SENDING_TIME_TAG, time); } else { String value = sendingTime.getValue(); if (value == null || value.isEmpty() || value.equals("null")) { sendingTime.setValue(time); - sendingTime.insertNext(ORIG_SENDING_TIME, time); + sendingTime.insertNext(ORIG_SENDING_TIME_TAG, time); } else { sendingTime.setValue(time); - sendingTime.insertNext(ORIG_SENDING_TIME, value); + sendingTime.insertNext(ORIG_SENDING_TIME_TAG, value); } } } From 226b57b57a6bfd16f8cf643ea885ec870cd72d71 Mon Sep 17 00:00:00 2001 From: isengrims <104489572+isengrims@users.noreply.github.com> Date: Tue, 29 Aug 2023 18:57:59 +0400 Subject: [PATCH 3/8] [TH2-5026] Improve logging. (#55) --- README.md | 5 +- gradle.properties | 2 +- .../java/com/exactpro/th2/FixHandler.java | 84 +++++++++++-------- 3 files changed, 56 insertions(+), 35 deletions(-) diff --git a/README.md b/README.md index ea5f10e..16490a7 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2-conn-dirty-fix (1.3.1) +# th2-conn-dirty-fix (1.3.2) This microservice allows sending and receiving messages via FIX protocol @@ -328,6 +328,9 @@ spec: ``` # Changelog +### 1.3.2 +* Improve logging: log session group and session alias for each log message. + # 1.3.1 * fix multiple consequent SOH characters diff --git a/gradle.properties b/gradle.properties index 81dbf81..56657b9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -release_version=1.3.1 +release_version=1.3.2 jackson_version=2.11.2 diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index 60c5ffe..23829f0 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -237,7 +237,7 @@ public void onStart() { channel = context.createChannel(address, settings.getSecurity(), Map.of(), true, settings.getReconnectDelay() * 1000L, Integer.MAX_VALUE); if(settings.isLoadSequencesFromCradle()) { SequenceHolder sequences = messageLoader.loadInitialSequences(channel.getSessionAlias()); - LOGGER.info("Loaded sequences are: client - {}, server - {}", sequences.getClientSeq(), sequences.getServerSeq()); + info("Loaded sequences are: client - %d, server - %d", sequences.getClientSeq(), sequences.getServerSeq()); msgSeqNum.set(sequences.getClientSeq()); serverMsgSeqNum.set(sequences.getServerSeq()); } @@ -260,11 +260,11 @@ public CompletableFuture send(@NotNull RawMessage rawMessage) { } while (channel.isOpen() && !enabled.get()) { - if (LOGGER.isWarnEnabled()) LOGGER.warn("Session is not yet logged in: {}", channel.getSessionAlias()); + warn("Session is not yet logged in"); try { Thread.sleep(1000); } catch (InterruptedException e) { - LOGGER.error("Error while sleeping."); + error("Error while sleeping.", null); } } @@ -329,14 +329,14 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu FixField msgSeqNumValue = findField(message, MSG_SEQ_NUM_TAG); if (msgSeqNumValue == null) { metadata.put(REJECT_REASON, "No msgSeqNum Field"); - if (LOGGER.isErrorEnabled()) LOGGER.error("Invalid message. No MsgSeqNum in message: {}", message.toString(US_ASCII)); + if(LOGGER.isErrorEnabled()) error("Invalid message. No MsgSeqNum in message: %s", null, message.toString(US_ASCII)); return metadata; } FixField msgType = findField(message, MSG_TYPE_TAG); if (msgType == null) { metadata.put(REJECT_REASON, "No msgType Field"); - if (LOGGER.isErrorEnabled()) LOGGER.error("Invalid message. No MsgType in message: {}", message.toString(US_ASCII)); + if(LOGGER.isErrorEnabled()) error("Invalid message. No MsgType in message: %s", null, message.toString(US_ASCII)); return metadata; } @@ -367,7 +367,7 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu context.send(CommonUtil.toEvent(String.format("Received server sequence %d but expected %d. Sending logout with text: MsgSeqNum is too low...", receivedMsgSeqNum, serverMsgSeqNum.get()))); sendLogout(String.format("MsgSeqNum too low, expecting %d but received %d", serverMsgSeqNum.get() + 1, receivedMsgSeqNum)); reconnectRequestTimer = executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS); - if (LOGGER.isErrorEnabled()) LOGGER.error("Invalid message. SeqNum is less than expected {}: {}", serverMsgSeqNum.get() + 1, message.toString(US_ASCII)); + if (LOGGER.isErrorEnabled()) error("Invalid message. SeqNum is less than expected %d: %s", null, serverMsgSeqNum.get(), message.toString(US_ASCII)); } else { context.send(CommonUtil.toEvent(String.format("Received server sequence %d but expected %d. Correcting server sequence.", receivedMsgSeqNum, serverMsgSeqNum.get() + 1))); serverMsgSeqNum.set(receivedMsgSeqNum - 1); @@ -385,18 +385,18 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu switch (msgTypeValue) { case MSG_TYPE_HEARTBEAT: - if (LOGGER.isInfoEnabled()) LOGGER.info("Heartbeat received - {}", message.toString(US_ASCII)); + if(LOGGER.isInfoEnabled()) info("Heartbeat received - %s", message.toString(US_ASCII)); checkHeartbeat(message); break; case MSG_TYPE_LOGON: - if (LOGGER.isInfoEnabled()) LOGGER.info("Logon received - {}", message.toString(US_ASCII)); + if(LOGGER.isInfoEnabled()) info("Logon received - %s", message.toString(US_ASCII)); boolean connectionSuccessful = checkLogon(message); if (connectionSuccessful) { if(settings.useNextExpectedSeqNum()) { FixField nextExpectedSeqField = findField(message, NEXT_EXPECTED_SEQ_NUMBER_TAG); if(nextExpectedSeqField == null) { metadata.put(REJECT_REASON, "No NextExpectedSeqNum field"); - if (LOGGER.isErrorEnabled()) LOGGER.error("Invalid message. No NextExpectedSeqNum in message: {}", message.toString(US_ASCII)); + if(LOGGER.isErrorEnabled()) error("Invalid message. No NextExpectedSeqNum in message: %s", null, message.toString(US_ASCII)); return metadata; } @@ -430,15 +430,13 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu reconnectRequestTimer = executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS); } break; - case MSG_TYPE_LOGOUT: //extract logout reason - handleLogout(message); - break; + //extract logout reason case MSG_TYPE_RESEND_REQUEST: - if (LOGGER.isInfoEnabled()) LOGGER.info("Resend request received - {}", message.toString(US_ASCII)); + if(LOGGER.isInfoEnabled()) info("Resend request received - %s", message.toString(US_ASCII)); handleResendRequest(message); break; case MSG_TYPE_SEQUENCE_RESET: //gap fill - if (LOGGER.isInfoEnabled()) LOGGER.info("Sequence reset received - {}", message.toString(US_ASCII)); + if(LOGGER.isInfoEnabled()) info("Sequence reset received - %s", message.toString(US_ASCII)); resetSequence(message); break; case MSG_TYPE_TEST_REQUEST: @@ -467,7 +465,7 @@ private Map handleTestRequest(ByteBuf message, Map processMessage = (buf) -> { FixField seqNum = findField(buf, MSG_SEQ_NUM_TAG); @@ -633,7 +631,7 @@ private void recovery(int beginSeqNo, int endSeqNo) { resetHeartbeatTask(); } catch (Exception e) { - LOGGER.error("Error while loading messages for recovery", e); + error("Error while loading messages for recovery", e); String seqReset = createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), msgSeqNum.get() + 1).toString(); channel.send( @@ -690,9 +688,7 @@ private boolean checkLogon(ByteBuf message) { public void onOutgoing(@NotNull IChannel channel, @NotNull ByteBuf message, @NotNull Map metadata) { onOutgoingUpdateTag(message, metadata); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Outgoing message: {}", message.toString(US_ASCII)); - } + if(LOGGER.isDebugEnabled()) debug("Outgoing message: %s", message.toString(US_ASCII)); } public void onOutgoingUpdateTag(@NotNull ByteBuf message, @NotNull Map metadata) { @@ -715,9 +711,7 @@ public void onOutgoingUpdateTag(@NotNull ByteBuf message, @NotNull Map> future) { future.get().cancel(false); } + + private void info(String message, Object... args) { + if(LOGGER.isInfoEnabled()) { + LOGGER.info("{} - {}: {}", channel.getSessionGroup(), channel.getSessionAlias(), String.format(message, args)); + } + } + + private void error(String message, Throwable throwable, Object... args) { + if(LOGGER.isErrorEnabled()) { + LOGGER.error("{} - {}: {}", channel.getSessionGroup(), channel.getSessionAlias(), String.format(message, args), throwable); + } + } + + private void warn(String message, Object... args) { + if(LOGGER.isWarnEnabled()) { + LOGGER.warn("{} - {}: {}", channel.getSessionGroup(), channel.getSessionAlias(), String.format(message, args)); + } + } + + private void debug(String message, Object... args) { + if(LOGGER.isDebugEnabled()) { + LOGGER.debug("{} - {}: {}", channel.getSessionGroup(), channel.getSessionAlias(), String.format(message, args)); + } + } } \ No newline at end of file From 689bf6c76223df7749163549eb3ee0613267273d Mon Sep 17 00:00:00 2001 From: isengrims <104489572+isengrims@users.noreply.github.com> Date: Fri, 15 Sep 2023 13:29:27 +0400 Subject: [PATCH 4/8] [TH2-5014] Ungracefull disconnect based on message property. (#62) * [TH2-5014] Ungraceful disconnect. --- README.md | 6 ++- gradle.properties | 2 +- .../java/com/exactpro/th2/FixHandler.java | 42 ++++++++++++++++++- 3 files changed, 46 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 16490a7..0e9c4c6 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2-conn-dirty-fix (1.3.2) +# th2-conn-dirty-fix (1.4.0) This microservice allows sending and receiving messages via FIX protocol @@ -328,6 +328,10 @@ spec: ``` # Changelog +### 1.4.0 +* Ungraceful session disconnect support. +* Removed NPE when session is reset by schedule. + ### 1.3.2 * Improve logging: log session group and session alias for each log message. diff --git a/gradle.properties b/gradle.properties index 56657b9..dd536e5 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -release_version=1.3.2 +release_version=1.4.0 jackson_version=2.11.2 diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index 23829f0..d61deed 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -45,6 +45,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; @@ -117,7 +118,6 @@ import static com.exactpro.th2.constants.Constants.SENDING_TIME; import static com.exactpro.th2.constants.Constants.SENDING_TIME_TAG; import static com.exactpro.th2.constants.Constants.SESSION_STATUS_TAG; -import static com.exactpro.th2.constants.Constants.SUCCESSFUL_LOGOUT_CODE; import static com.exactpro.th2.constants.Constants.TARGET_COMP_ID; import static com.exactpro.th2.constants.Constants.TARGET_COMP_ID_TAG; import static com.exactpro.th2.constants.Constants.TEST_REQ_ID; @@ -144,6 +144,7 @@ public class FixHandler implements AutoCloseable, IHandler { private static final byte BYTE_SOH = 1; private static final String STRING_MSG_TYPE = "MsgType"; private static final String REJECT_REASON = "Reject reason"; + private static final String UNGRACEFUL_DISCONNECT_PROPERTY = "ungracefulDisconnect"; private static final String STUBBING_VALUE = "XXX"; private final AtomicInteger msgSeqNum = new AtomicInteger(0); @@ -251,6 +252,31 @@ public CompletableFuture send(@NotNull RawMessage rawMessage) { throw new IllegalStateException("Session is not active. It is not possible to send messages."); } + ByteBuf buf = Unpooled.copiedBuffer(rawMessage.getBody().toByteArray()); + Map props = rawMessage.getMetadata().getPropertiesMap(); + + FixField msgType = findField(buf, MSG_TYPE_TAG); + boolean isLogout = msgType != null && Objects.equals(msgType.getValue(), MSG_TYPE_LOGOUT); + if(isLogout && !channel.isOpen()) { + String message = String.format("%s - %s: Logout ignored as channel is already closed.", channel.getSessionGroup(), channel.getSessionAlias()); + LOGGER.warn(message); + context.send(CommonUtil.toEvent(message)); + return CompletableFuture.completedFuture(null); + } + + boolean isUngracefulDisconnect = Boolean.parseBoolean(props.get(UNGRACEFUL_DISCONNECT_PROPERTY)); + if(isLogout) { + context.send(CommonUtil.toEvent(String.format("Closing session %s. Is graceful disconnect: %b", channel.getSessionAlias(), !isUngracefulDisconnect))); + try { + disconnect(!isUngracefulDisconnect); + enabled.set(false); + channel.open().get(); + } catch (Exception e) { + context.send(CommonUtil.toErrorEvent(String.format("Error while ending session %s by user logout. Is graceful disconnect: %b", channel.getSessionAlias(), !isUngracefulDisconnect), e)); + } + return CompletableFuture.completedFuture(null); + } + if (!channel.isOpen()) { try { channel.open().get(); @@ -510,7 +536,9 @@ private void reset() { msgSeqNum.set(0); serverMsgSeqNum.set(0); sessionActive.set(true); - messageLoader.updateTime(); + if(messageLoader != null) { + messageLoader.updateTime(); + } channel.open(); } @@ -941,6 +969,16 @@ private void waitLogoutResponse() { } } + private void disconnect(Boolean graceful) throws ExecutionException, InterruptedException { + if(graceful) { + sendLogout(); + waitLogoutResponse(); + } + resetHeartbeatTask(); + resetTestRequestTask(); + channel.close().get(); + } + private void setHeader(StringBuilder stringBuilder, String msgType, Integer seqNum, String time) { stringBuilder.append(BEGIN_STRING_TAG).append("=").append(settings.getBeginString()); stringBuilder.append(MSG_TYPE).append(msgType); From 867290cd52f8ae1dd17e5134fa27e236a4f20363 Mon Sep 17 00:00:00 2001 From: Denis Plotnikov Date: Fri, 15 Sep 2023 14:24:16 +0400 Subject: [PATCH 5/8] Reset heartbeats task on business message --- build.gradle | 2 +- src/main/java/com/exactpro/th2/FixHandler.java | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index af0b033..a7807a2 100644 --- a/build.gradle +++ b/build.gradle @@ -52,7 +52,7 @@ dependencies { } implementation "com.exactpro.th2:common-utils:2.2.2-dev" implementation 'com.exactpro.th2:netty-bytebuf-utils:0.0.1' - implementation 'com.exactpro.th2:conn-dirty-tcp-core:3.2.0-dev' + implementation 'com.exactpro.th2:conn-dirty-tcp-core:3.2.1-mangler_fix_3.2.1+' implementation 'com.exactpro.th2:grpc-lw-data-provider:2.2.0-separate-executor-reverse-orders-917aa0b-SNAPSHOT' implementation 'org.slf4j:slf4j-api' diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index f072dc4..ca75e48 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -746,6 +746,8 @@ public void onOutgoing(@NotNull IChannel channel, @NotNull ByteBuf message, @Not onOutgoingUpdateTag(message, metadata); if(LOGGER.isDebugEnabled()) debug("Outgoing message: %s", message.toString(US_ASCII)); + + if(enabled.get()) resetHeartbeatTask(); } public void onOutgoingUpdateTag(@NotNull ByteBuf message, @NotNull Map metadata) { From 992283731184919922f7c4a04d8f118ce9e874f5 Mon Sep 17 00:00:00 2001 From: Oleg Date: Mon, 18 Sep 2023 15:20:09 +0400 Subject: [PATCH 6/8] Correct common util version --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index a7807a2..d799d53 100644 --- a/build.gradle +++ b/build.gradle @@ -50,7 +50,7 @@ dependencies { implementation("com.exactpro.th2:common:5.4.0-dev") { exclude group: 'com.exactpro.th2', module: 'task-utils' } - implementation "com.exactpro.th2:common-utils:2.2.2-dev" + implementation "com.exactpro.th2:common-utils:2.2.0-dev" implementation 'com.exactpro.th2:netty-bytebuf-utils:0.0.1' implementation 'com.exactpro.th2:conn-dirty-tcp-core:3.2.1-mangler_fix_3.2.1+' implementation 'com.exactpro.th2:grpc-lw-data-provider:2.2.0-separate-executor-reverse-orders-917aa0b-SNAPSHOT' From 0cfd35fe8f00f42e246cb5eef5811970e8b6b50e Mon Sep 17 00:00:00 2001 From: Oleg Date: Mon, 18 Sep 2023 16:20:39 +0400 Subject: [PATCH 7/8] Use timeout for opening connection after termination on manual logout send --- src/main/java/com/exactpro/th2/FixHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index ca75e48..df821be 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -276,7 +276,7 @@ private CompletableFuture send(@NotNull ByteBuf body, @NotNull Map Date: Tue, 19 Sep 2023 12:26:36 +0400 Subject: [PATCH 8/8] Remove duplicated dependency --- build.gradle | 1 - 1 file changed, 1 deletion(-) diff --git a/build.gradle b/build.gradle index d799d53..61b30a0 100644 --- a/build.gradle +++ b/build.gradle @@ -58,7 +58,6 @@ dependencies { implementation 'org.slf4j:slf4j-api' implementation 'io.github.microutils:kotlin-logging:3.0.0' // The last version bases on kotlin 1.6.0 implementation 'org.apache.commons:commons-lang3' - implementation 'org.apache.commons:commons-lang3:3.13.0' implementation 'io.netty:netty-all' implementation 'com.google.auto.service:auto-service:1.0.1'