diff --git a/README.md b/README.md index 389b638..030fc5f 100644 --- a/README.md +++ b/README.md @@ -53,8 +53,10 @@ This microservice allows sending and receiving messages via FIX protocol + *logoutOnIncorrectServerSequence* - whether to logout session when server send message with sequence number less than expected. If `false` then internal conn sequence will be reset to sequence number from server message. + *connectionTimeoutOnSend* - timeout in milliseconds for sending message from queue thread (please read about [acknowledgment timeout](https://www.rabbitmq.com/consumers.html#acknowledgement-timeout) to understand the problem). - _Default, 30000 mls._ + _Default, 30000 mls._ Each failed sending attempt decreases the timeout in half (but not less than _minConnectionTimeoutOnSend_). + The timeout is reset to the original value after a successful sending attempt. If connection is not established within the specified timeout an error will be reported. ++ *minConnectionTimeoutOnSend* - minimum value for the sending message timeout in milliseconds. _Default value is 1000 mls._ ### Security settings @@ -334,6 +336,17 @@ spec: # Changelog +## 1.5.1 + +* Property `th2.operation_timestamp` is added to metadata to each message +* Use mutable map for metadata when sending a messages from the handler + * Fix error when new property with operation timestamp added to the immutable map + +## 1.5.0 + +* `minConnectionTimeoutOnSend` parameter is added. +* Sending timeout now decreases in half on each failed attempt (but not less than `minConnectionTimeoutOnSend`). + ## 1.4.2 * Ungraceful session disconnect support. * Removed NPE when session is reset by schedule. diff --git a/build.gradle b/build.gradle index bbaceca..8063861 100644 --- a/build.gradle +++ b/build.gradle @@ -55,7 +55,7 @@ dependencies { implementation 'com.exactpro.th2:netty-bytebuf-utils:0.0.1' implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.2' - implementation 'com.exactpro.th2:conn-dirty-tcp-core:3.2.1-TH2-5001+' + implementation 'com.exactpro.th2:conn-dirty-tcp-core:3.3.0-TH2-5001+' implementation 'com.exactpro.th2:grpc-lw-data-provider:2.2.0-dev' implementation 'org.slf4j:slf4j-api' diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index fc8e8d8..79c82e3 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -46,6 +46,7 @@ import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.StrategyScheduler; import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.StrategyState; import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.api.CleanupHandler; +import com.exactpro.th2.conn.dirty.tcp.core.SendingTimeoutHandler; import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel; import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel.SendMode; import com.exactpro.th2.conn.dirty.tcp.core.api.IHandler; @@ -217,6 +218,8 @@ public class FixHandler implements AutoCloseable, IHandler { private final AtomicReference> heartbeatTimer = new AtomicReference<>(CompletableFuture.completedFuture(null)); private final AtomicReference> testRequestTimer = new AtomicReference<>(CompletableFuture.completedFuture(null)); + + private final SendingTimeoutHandler sendingTimeoutHandler; private Future reconnectRequestTimer = CompletableFuture.completedFuture(null); private volatile IChannel channel; protected FixHandlerSettings settings; @@ -302,6 +305,12 @@ public FixHandler(IHandlerContext context) { throw new IllegalStateException("`newPassword` contains password that was already used in the past."); } + this.sendingTimeoutHandler = SendingTimeoutHandler.create( + settings.getMinConnectionTimeoutOnSend(), + settings.getConnectionTimeoutOnSend(), + context::send + ); + if (settings.getBrokenConnConfiguration() == null) { scheduler = new StrategyScheduler(SchedulerType.CONSECUTIVE, Collections.emptyList()); return; @@ -349,21 +358,26 @@ public CompletableFuture send(@NotNull ByteBuf body, @NotNull Map(), null, SendMode.HANDLE_AND_MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); resetHeartbeatTask(); - } else { - sendLogon(); } } @@ -1030,15 +1046,23 @@ public void onOpen(@NotNull IChannel channel) { sendLogon(); } + public void sendHeartbeatWithPossDup(boolean isPossDup) { + sendHeartbeatWithTestRequest(null, isPossDup); + } + + private void sendHeartbeatWithTestRequest(String testRequestID) { + sendHeartbeatWithTestRequest(testRequestID, false); + } + public void sendHeartbeat() { - sendHeartbeatWithTestRequest(null); + sendHeartbeatWithTestRequest(null, false); } - private void sendHeartbeatWithTestRequest(String testRequestId) { + private void sendHeartbeatWithTestRequest(String testRequestId, boolean possDup) { StringBuilder heartbeat = new StringBuilder(); int seqNum = msgSeqNum.incrementAndGet(); - setHeader(heartbeat, MSG_TYPE_HEARTBEAT, seqNum, null); + setHeader(heartbeat, MSG_TYPE_HEARTBEAT, seqNum, null, possDup); if(testRequestId != null) { heartbeat.append(TEST_REQ_ID).append(testRequestId); @@ -1057,8 +1081,12 @@ private void sendHeartbeatWithTestRequest(String testRequestId) { } public void sendTestRequest() { //do private + sendTestRequestWithPossDup(false); + } + + public void sendTestRequestWithPossDup(boolean isPossDup) { //do private StringBuilder testRequest = new StringBuilder(); - setHeader(testRequest, MSG_TYPE_TEST_REQUEST, msgSeqNum.incrementAndGet(), null); + setHeader(testRequest, MSG_TYPE_TEST_REQUEST, msgSeqNum.incrementAndGet(), null, isPossDup); testRequest.append(TEST_REQ_ID).append(testReqID.incrementAndGet()); setChecksumAndBodyLength(testRequest); if (enabled.get()) { @@ -1115,14 +1143,22 @@ public void sendLogon() { .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); } - private void sendLogout() { - sendLogout(null); + private void sendLogout(boolean isPossDup) { + sendLogout(null, isPossDup); } private void sendLogout(String text) { + sendLogout(text, false); + } + + private void sendLogout() { + sendLogout(null, false); + } + + private void sendLogout(String text, boolean isPossDup) { if (enabled.get()) { StringBuilder logout = new StringBuilder(); - setHeader(logout, MSG_TYPE_LOGOUT, msgSeqNum.incrementAndGet(), null); + setHeader(logout, MSG_TYPE_LOGOUT, msgSeqNum.incrementAndGet(), null, isPossDup); if(text != null) { logout.append(TEXT).append(text); } @@ -1582,6 +1618,22 @@ private void runLogonAfterLogonStrategy(RuleConfiguration configuration) { ruleEndEvent(configuration.getRuleType(), start, strategy.getState().getMessageIDs()); } + private void runPossDupSessionMessages(RuleConfiguration configuration) { + Instant start = Instant.now(); + strategy.resetStrategyAndState(configuration); + ruleStartEvent(configuration.getRuleType(), strategy.getStartTime()); + if(!enabled.get()) { + ruleErrorEvent(strategy.getType(), String.format("Session %s isn't logged in.", channel.getSessionAlias()), null); + return; + } + + sendResendRequest(serverMsgSeqNum.get() - 2, serverMsgSeqNum.get(), true); + sendHeartbeatWithPossDup(true); + sendTestRequestWithPossDup(true); + sendLogout(true); + ruleEndEvent(configuration.getRuleType(), start, strategy.getState().getMessageIDs()); + } + private void setupDisconnectStrategy(RuleConfiguration configuration) { strategy.resetStrategyAndState(configuration); strategy.updateSendStrategy(x -> {x.setSendPreprocessor(this::blockSend); return Unit.INSTANCE; }); @@ -1975,7 +2027,9 @@ private Consumer getSetupFunction(RuleConfiguration config) { case IGNORE_INCOMING_MESSAGES: return this::setupIgnoreIncomingMessagesStrategy; case DISCONNECT_WITH_RECONNECT: return this::setupDisconnectStrategy; case FAKE_RETRANSMISSION: return this::setupFakeRetransmissionStrategy; + case INVALID_CHECKSUM: return this::setupTransformMessageStrategy; case LOGON_AFTER_LOGON: return this::runLogonAfterLogonStrategy; + case POSS_DUP_SESSION_MESSAGES: return this::runPossDupSessionMessages; case DEFAULT: return configuration -> strategy.cleanupStrategy(); default: throw new IllegalStateException(String.format("Unknown strategy type %s.", config.getRuleType())); } @@ -2130,6 +2184,10 @@ private void waitLogoutResponse() { } private void setHeader(StringBuilder stringBuilder, String msgType, Integer seqNum, String time) { + setHeader(stringBuilder, msgType, seqNum, time, false); + } + + private void setHeader(StringBuilder stringBuilder, String msgType, Integer seqNum, String time, boolean isPossDup) { stringBuilder.append(BEGIN_STRING_TAG).append("=").append(settings.getBeginString()); stringBuilder.append(MSG_TYPE).append(msgType); stringBuilder.append(MSG_SEQ_NUM).append(seqNum); @@ -2142,6 +2200,9 @@ private void setHeader(StringBuilder stringBuilder, String msgType, Integer seqN } else { stringBuilder.append(getTime()); } + if(isPossDup) { + stringBuilder.append(POSS_DUP).append(IS_POSS_DUP); + } } private void setChecksumAndBodyLength(StringBuilder stringBuilder) { diff --git a/src/main/java/com/exactpro/th2/FixHandlerSettings.java b/src/main/java/com/exactpro/th2/FixHandlerSettings.java index 76ea112..b3d6c0a 100644 --- a/src/main/java/com/exactpro/th2/FixHandlerSettings.java +++ b/src/main/java/com/exactpro/th2/FixHandlerSettings.java @@ -80,6 +80,8 @@ public class FixHandlerSettings implements IHandlerSettings { */ private long connectionTimeoutOnSend = DEFAULT_CONNECTION_TIMEOUT_ON_SEND; + private long minConnectionTimeoutOnSend = 1_000; + @JsonDeserialize(using = DateTimeFormatterDeserializer.class) private DateTimeFormatter sendingDateTimeFormat = DateTimeFormatter.ofPattern("yyyyMMdd-HH:mm:ss.SSSSSSSSS"); @@ -338,4 +340,12 @@ public long getConnectionTimeoutOnSend() { public void setConnectionTimeoutOnSend(long connectionTimeoutOnSend) { this.connectionTimeoutOnSend = connectionTimeoutOnSend; } + + public long getMinConnectionTimeoutOnSend() { + return minConnectionTimeoutOnSend; + } + + public void setMinConnectionTimeoutOnSend(long minConnectionTimeoutOnSend) { + this.minConnectionTimeoutOnSend = minConnectionTimeoutOnSend; + } } diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/RuleConfiguration.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/RuleConfiguration.kt index adfc67f..633a605 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/RuleConfiguration.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/RuleConfiguration.kt @@ -54,6 +54,9 @@ data class RuleConfiguration( RuleType.TRANSFORM_MESSAGE_STRATEGY -> { require(transformMessageConfiguration != null) { "`transformMessageConfiguration` is required for $ruleType"} } + RuleType.INVALID_CHECKSUM -> { + require(transformMessageConfiguration != null) { "`transformMessageConfiguration` is required for $ruleType" } + } RuleType.BI_DIRECTIONAL_RESEND_REQUEST -> { require(missIncomingMessagesConfiguration != null) { "`blockIncomingMessagesConfiguration` is required for $ruleType" } require(missOutgoingMessagesConfiguration != null) { "`blockOutgoingMessagesConfiguration` is required for $ruleType" } @@ -79,6 +82,7 @@ data class RuleConfiguration( RuleType.DEFAULT -> {} RuleType.FAKE_RETRANSMISSION -> {} RuleType.LOGON_AFTER_LOGON -> {} + RuleType.POSS_DUP_SESSION_MESSAGES -> {} } } diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/RuleType.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/RuleType.kt index 1ea5311..5f18782 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/RuleType.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/RuleType.kt @@ -32,5 +32,7 @@ enum class RuleType { SPLIT_SEND, LOGON_AFTER_LOGON, FAKE_RETRANSMISSION, + INVALID_CHECKSUM, + POSS_DUP_SESSION_MESSAGES, DEFAULT } \ No newline at end of file diff --git a/src/test/java/com/exactpro/th2/FixHandlerSendTimeoutTest.java b/src/test/java/com/exactpro/th2/FixHandlerSendTimeoutTest.java index 4353258..5e81642 100644 --- a/src/test/java/com/exactpro/th2/FixHandlerSendTimeoutTest.java +++ b/src/test/java/com/exactpro/th2/FixHandlerSendTimeoutTest.java @@ -59,6 +59,7 @@ void sendTimeoutOnConnectionOpen() { settings.setPort(42); settings.setHost("localhost"); settings.setConnectionTimeoutOnSend(300); // 300 millis + settings.setMinConnectionTimeoutOnSend(100); Mockito.when(contextMock.getSettings()) .thenReturn(settings); var fixHandler = new FixHandler(contextMock); @@ -100,6 +101,7 @@ void sendTimeoutOnSessionEnabled() { settings.setPort(42); settings.setHost("localhost"); settings.setConnectionTimeoutOnSend(300); // 300 millis + settings.setMinConnectionTimeoutOnSend(100); LocalTime currentTime = LocalTime.now(ZoneOffset.UTC); int deltaMinutes = currentTime.isAfter(LocalTime.NOON) ? -1