diff --git a/README.md b/README.md index c7a9cf9..e40e41f 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2-conn-dirty-fix (1.4.1) +# th2-conn-dirty-fix (1.4.2) This microservice allows sending and receiving messages via FIX protocol @@ -49,6 +49,7 @@ This microservice allows sending and receiving messages via FIX protocol + *useNextExpectedSeqNum* - session management based on next expected sequence number. (`false` by default) + *saveAdminMessages* - defines if admin messages will be saved to internal outgoing buffer. (`false` by default) + *resetStateOnServerReset* - whether to reset the server sequence after receiving logout with text `Next Expected MSN too high, MSN to be sent is x but received y`. ++ *logoutOnIncorrectServerSequence* - whether to logout session when server send message with sequence number less than expected. If `false` then internal conn sequence will be reset to sequence number from server message. + *connectionTimeoutOnSend* - timeout in milliseconds for sending message from queue thread (please read about [acknowledgment timeout](https://www.rabbitmq.com/consumers.html#acknowledgement-timeout) to understand the problem). _Default, 30000 mls._ @@ -332,21 +333,34 @@ spec: # Changelog -# 1.4.1 +## 1.4.2 +* Ungraceful session disconnect support. +* Removed NPE when session is reset by schedule. + +## 1.4.1 * Timeout on send from queue thread * Parameter `connectionTimeoutOnSend` was added -# 1.4.0 +## 1.4.0 * Updated bom: `4.5.0-dev` * Updated common: `5.4.0-dev` * Updated common-utils: `2.2.0-dev` * Updated grpc-lw-data-provider: `2.1.0-dev` * Updated kotlin: `1.8.22` - -# 1.3.0 * Added support for th2 transport protocol -# 1.2.1 +## 1.3.2 +* Improve logging: log session group and session alias for each log message. + +## 1.3.1 +* fix multiple consequent SOH characters + +## 1.3.0 +* Added handling for incoming test request messages +* Fixed resetSeqNum flag handling on incoming logon messages. +* Added option to automatically reset server sequence when internal conn sequence doesn't match with sequence that server sent. + +## 1.2.1 * fix multiple consequent SOH characters ## 1.2.0 diff --git a/build.gradle b/build.gradle index c08b3f9..61b30a0 100644 --- a/build.gradle +++ b/build.gradle @@ -51,9 +51,8 @@ dependencies { exclude group: 'com.exactpro.th2', module: 'task-utils' } implementation "com.exactpro.th2:common-utils:2.2.0-dev" - implementation 'com.exactpro.th2:netty-bytebuf-utils:0.0.1' - implementation 'com.exactpro.th2:conn-dirty-tcp-core:3.2.0-dev' + implementation 'com.exactpro.th2:conn-dirty-tcp-core:3.2.1-mangler_fix_3.2.1+' implementation 'com.exactpro.th2:grpc-lw-data-provider:2.2.0-separate-executor-reverse-orders-917aa0b-SNAPSHOT' implementation 'org.slf4j:slf4j-api' diff --git a/gradle.properties b/gradle.properties index b74b0fc..66a2be3 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1 @@ -release_version=1.4.1 - +release_version=1.4.2 \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index ad874a4..df821be 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -47,6 +47,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; @@ -90,6 +91,7 @@ import static com.exactpro.th2.constants.Constants.GAP_FILL_FLAG_TAG; import static com.exactpro.th2.constants.Constants.HEART_BT_INT; import static com.exactpro.th2.constants.Constants.IS_POSS_DUP; +import static com.exactpro.th2.constants.Constants.IS_SEQUENCE_RESET_FLAG; import static com.exactpro.th2.constants.Constants.MSG_SEQ_NUM; import static com.exactpro.th2.constants.Constants.MSG_SEQ_NUM_TAG; import static com.exactpro.th2.constants.Constants.MSG_TYPE; @@ -112,6 +114,7 @@ import static com.exactpro.th2.constants.Constants.POSS_DUP; import static com.exactpro.th2.constants.Constants.POSS_DUP_TAG; import static com.exactpro.th2.constants.Constants.RESET_SEQ_NUM; +import static com.exactpro.th2.constants.Constants.RESET_SEQ_NUM_TAG; import static com.exactpro.th2.constants.Constants.SENDER_COMP_ID; import static com.exactpro.th2.constants.Constants.SENDER_COMP_ID_TAG; import static com.exactpro.th2.constants.Constants.SENDER_SUB_ID; @@ -119,11 +122,11 @@ import static com.exactpro.th2.constants.Constants.SENDING_TIME; import static com.exactpro.th2.constants.Constants.SENDING_TIME_TAG; import static com.exactpro.th2.constants.Constants.SESSION_STATUS_TAG; -import static com.exactpro.th2.constants.Constants.SUCCESSFUL_LOGOUT_CODE; import static com.exactpro.th2.constants.Constants.TARGET_COMP_ID; import static com.exactpro.th2.constants.Constants.TARGET_COMP_ID_TAG; import static com.exactpro.th2.constants.Constants.TEST_REQ_ID; import static com.exactpro.th2.constants.Constants.TEST_REQ_ID_TAG; +import static com.exactpro.th2.constants.Constants.TEXT; import static com.exactpro.th2.constants.Constants.TEXT_TAG; import static com.exactpro.th2.constants.Constants.USERNAME; import static com.exactpro.th2.netty.bytebuf.util.ByteBufUtil.indexOf; @@ -145,6 +148,7 @@ public class FixHandler implements AutoCloseable, IHandler { private static final byte BYTE_SOH = 1; private static final String STRING_MSG_TYPE = "MsgType"; private static final String REJECT_REASON = "Reject reason"; + private static final String UNGRACEFUL_DISCONNECT_PROPERTY = "ungracefulDisconnect"; private static final String STUBBING_VALUE = "XXX"; private final AtomicInteger msgSeqNum = new AtomicInteger(0); @@ -241,7 +245,7 @@ public void onStart() { channel = context.createChannel(address, settings.getSecurity(), Map.of(), true, settings.getReconnectDelay() * 1000L, Integer.MAX_VALUE); if(settings.isLoadSequencesFromCradle()) { SequenceHolder sequences = messageLoader.loadInitialSequences(channel.getSessionGroup(), channel.getSessionAlias()); - LOGGER.info("Loaded sequences are: client - {}, server - {}", sequences.getClientSeq(), sequences.getServerSeq()); + info("Loaded sequences are: client - %d, server - %d", sequences.getClientSeq(), sequences.getServerSeq()); msgSeqNum.set(sequences.getClientSeq()); serverMsgSeqNum.set(sequences.getServerSeq()); } @@ -257,6 +261,28 @@ private CompletableFuture send(@NotNull ByteBuf body, @NotNull Map send(@NotNull ByteBuf body, @NotNull Map deadline) { // The method should have checked exception in signature... @@ -362,14 +386,14 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu FixField msgSeqNumValue = findField(message, MSG_SEQ_NUM_TAG); if (msgSeqNumValue == null) { metadata.put(REJECT_REASON, "No msgSeqNum Field"); - if (LOGGER.isErrorEnabled()) LOGGER.error("Invalid message. No MsgSeqNum in message: {}", message.toString(US_ASCII)); + if(LOGGER.isErrorEnabled()) error("Invalid message. No MsgSeqNum in message: %s", null, message.toString(US_ASCII)); return metadata; } FixField msgType = findField(message, MSG_TYPE_TAG); if (msgType == null) { metadata.put(REJECT_REASON, "No msgType Field"); - if (LOGGER.isErrorEnabled()) LOGGER.error("Invalid message. No MsgType in message: {}", message.toString(US_ASCII)); + if(LOGGER.isErrorEnabled()) error("Invalid message. No MsgType in message: %s", null, message.toString(US_ASCII)); return metadata; } @@ -388,11 +412,24 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu int receivedMsgSeqNum = Integer.parseInt(requireNonNull(msgSeqNumValue.getValue())); + if(msgTypeValue.equals(MSG_TYPE_LOGON) && receivedMsgSeqNum < serverMsgSeqNum.get()) { + FixField resetSeqNumFlagField = findField(message, RESET_SEQ_NUM_TAG); + if(resetSeqNumFlagField != null && Objects.equals(resetSeqNumFlagField.getValue(), IS_SEQUENCE_RESET_FLAG)) { + serverMsgSeqNum.set(0); + } + } + if(receivedMsgSeqNum < serverMsgSeqNum.get() && !isDup) { - sendLogout(); - reconnectRequestTimer = executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS); + if(settings.isLogoutOnIncorrectServerSequence()) { + context.send(CommonUtil.toEvent(String.format("Received server sequence %d but expected %d. Sending logout with text: MsgSeqNum is too low...", receivedMsgSeqNum, serverMsgSeqNum.get()))); + sendLogout(String.format("MsgSeqNum too low, expecting %d but received %d", serverMsgSeqNum.get() + 1, receivedMsgSeqNum)); + reconnectRequestTimer = executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS); + if (LOGGER.isErrorEnabled()) error("Invalid message. SeqNum is less than expected %d: %s", null, serverMsgSeqNum.get(), message.toString(US_ASCII)); + } else { + context.send(CommonUtil.toEvent(String.format("Received server sequence %d but expected %d. Correcting server sequence.", receivedMsgSeqNum, serverMsgSeqNum.get() + 1))); + serverMsgSeqNum.set(receivedMsgSeqNum - 1); + } metadata.put(REJECT_REASON, "SeqNum is less than expected."); - if (LOGGER.isErrorEnabled()) LOGGER.error("Invalid message. SeqNum is less than expected {}: {}", serverMsgSeqNum.get(), message.toString(US_ASCII)); return metadata; } @@ -405,18 +442,18 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu switch (msgTypeValue) { case MSG_TYPE_HEARTBEAT: - if (LOGGER.isInfoEnabled()) LOGGER.info("Heartbeat received - {}", message.toString(US_ASCII)); + if(LOGGER.isInfoEnabled()) info("Heartbeat received - %s", message.toString(US_ASCII)); checkHeartbeat(message); break; case MSG_TYPE_LOGON: - if (LOGGER.isInfoEnabled()) LOGGER.info("Logon received - {}", message.toString(US_ASCII)); + if(LOGGER.isInfoEnabled()) info("Logon received - %s", message.toString(US_ASCII)); boolean connectionSuccessful = checkLogon(message); if (connectionSuccessful) { if(settings.useNextExpectedSeqNum()) { FixField nextExpectedSeqField = findField(message, NEXT_EXPECTED_SEQ_NUMBER_TAG); if(nextExpectedSeqField == null) { metadata.put(REJECT_REASON, "No NextExpectedSeqNum field"); - if (LOGGER.isErrorEnabled()) LOGGER.error("Invalid message. No NextExpectedSeqNum in message: {}", message.toString(US_ASCII)); + if(LOGGER.isErrorEnabled()) error("Invalid message. No NextExpectedSeqNum in message: %s", null, message.toString(US_ASCII)); return metadata; } @@ -450,17 +487,19 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu reconnectRequestTimer = executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS); } break; - case MSG_TYPE_LOGOUT: //extract logout reason - handleLogout(message); - break; + //extract logout reason case MSG_TYPE_RESEND_REQUEST: - if (LOGGER.isInfoEnabled()) LOGGER.info("Resend request received - {}", message.toString(US_ASCII)); + if(LOGGER.isInfoEnabled()) info("Resend request received - %s", message.toString(US_ASCII)); handleResendRequest(message); break; case MSG_TYPE_SEQUENCE_RESET: //gap fill - if (LOGGER.isInfoEnabled()) LOGGER.info("Sequence reset received - {}", message.toString(US_ASCII)); + if(LOGGER.isInfoEnabled()) info("Sequence reset received - %s", message.toString(US_ASCII)); resetSequence(message); break; + case MSG_TYPE_TEST_REQUEST: + if (LOGGER.isInfoEnabled()) LOGGER.info("Test request received - {}", message.toString(US_ASCII)); + handleTestRequest(message, metadata); + break; } resetTestRequestTask(); @@ -470,26 +509,32 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu return metadata; } + private Map handleTestRequest(ByteBuf message, Map metadata) { + FixField testReqId = findField(message, TEST_REQ_ID_TAG); + if(testReqId == null || testReqId.getValue() == null) { + metadata.put(REJECT_REASON, "Test Request message hasn't got TestReqId field."); + return metadata; + } + + sendHeartbeatTestReqId(testReqId.getValue()); + + return null; + } + private void handleLogout(@NotNull ByteBuf message) { - if (LOGGER.isInfoEnabled()) LOGGER.info("Logout received - {}", message.toString(US_ASCII)); - FixField sessionStatus = findField(message, SESSION_STATUS_TAG); + if(LOGGER.isInfoEnabled()) info("Logout received - %s", message.toString(US_ASCII)); boolean isSequenceChanged = false; - if(sessionStatus != null) { - int statusCode = Integer.parseInt(Objects.requireNonNull(sessionStatus.getValue())); - if(statusCode != SUCCESSFUL_LOGOUT_CODE) { - FixField text = findField(message, TEXT_TAG); - if (text != null) { - LOGGER.warn("Received Logout has text (58) tag: {}", text.getValue()); - String wrongClientSequence = StringUtils.substringBetween(text.getValue(), "expecting ", " but received"); - if (wrongClientSequence != null) { - msgSeqNum.set(Integer.parseInt(wrongClientSequence) - 1); - isSequenceChanged = true; - } - String wrongClientNextExpectedSequence = StringUtils.substringBetween(text.getValue(), "MSN to be sent is ", " but received"); - if(wrongClientNextExpectedSequence != null && settings.getResetStateOnServerReset()) { - serverMsgSeqNum.set(Integer.parseInt(wrongClientNextExpectedSequence)); - } - } + FixField text = findField(message, TEXT_TAG); + if (text != null) { + LOGGER.warn("Received Logout has text (58) tag: {}", text.getValue()); + String wrongClientSequence = StringUtils.substringBetween(text.getValue(), "expecting ", " but received"); + if (wrongClientSequence != null) { + msgSeqNum.set(Integer.parseInt(wrongClientSequence) - 1); + isSequenceChanged = true; + } + String wrongClientNextExpectedSequence = StringUtils.substringBetween(text.getValue(), "MSN to be sent is ", " but received"); + if(wrongClientNextExpectedSequence != null && settings.getResetStateOnServerReset()) { + serverMsgSeqNum.set(Integer.parseInt(wrongClientNextExpectedSequence)); } } @@ -514,7 +559,7 @@ private void resetSequence(ByteBuf message) { serverMsgSeqNum.set(Integer.parseInt(requireNonNull(seqNumValue.getValue())) - 1); } } else { - LOGGER.trace("Failed to reset servers MsgSeqNum. No such tag in message: {}", message.toString(US_ASCII)); + if(LOGGER.isWarnEnabled()) warn("Failed to reset servers MsgSeqNum. No such tag in message: %s", message.toString(US_ASCII)); } } @@ -522,7 +567,9 @@ private void reset() { msgSeqNum.set(0); serverMsgSeqNum.set(0); sessionActive.set(true); - messageLoader.updateTime(); + if(messageLoader != null) { + messageLoader.updateTime(); + } channel.open(); } @@ -579,7 +626,7 @@ private void recovery(int beginSeqNo, int endSeqNo) { } int endSeq = endSeqNo; - LOGGER.info("Loading messages from {} to {}", beginSeqNo, endSeqNo); + info("Loading messages from %d to %d", beginSeqNo, endSeqNo); if(settings.isLoadMissedMessagesFromCradle()) { Function1 processMessage = (buf) -> { FixField seqNum = findField(buf, MSG_SEQ_NUM_TAG); @@ -641,7 +688,7 @@ private void recovery(int beginSeqNo, int endSeqNo) { resetHeartbeatTask(); } catch (Exception e) { - LOGGER.error("Error while loading messages for recovery", e); + error("Error while loading messages for recovery", e); String seqReset = createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), msgSeqNum.get() + 1).toString(); channel.send( @@ -698,9 +745,9 @@ private boolean checkLogon(ByteBuf message) { public void onOutgoing(@NotNull IChannel channel, @NotNull ByteBuf message, @NotNull Map metadata) { onOutgoingUpdateTag(message, metadata); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Outgoing message: {}", message.toString(US_ASCII)); - } + if(LOGGER.isDebugEnabled()) debug("Outgoing message: %s", message.toString(US_ASCII)); + + if(enabled.get()) resetHeartbeatTask(); } public void onOutgoingUpdateTag(@NotNull ByteBuf message, @NotNull Map metadata) { @@ -723,9 +770,7 @@ public void onOutgoingUpdateTag(@NotNull ByteBuf message, @NotNull Map