diff --git a/README.md b/README.md index 030fc5f..63f319e 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2-conn-dirty-fix (1.2.0) +# th2-conn-dirty-fix (1.3.0) This microservice allows sending and receiving messages via FIX protocol @@ -334,7 +334,64 @@ spec: cpu: 20m ``` -# Changelog +# th2-conn-dirty-broken-fix (1.3.0) + +## Strategies (Rules) + +### Set Rate Limit `SET_RATE_LIMIT` + +This strategy ability to temporary change `RateLimit` value for the session. + +#### Workflow +1. Stops current session according to `gracefulDisconnect` option +2. Closes and destroys the current channel. This operation is required because `RateLimit` is set on channel level. +3. Creates new channel with rate limit is equal the `setRateLimitConfiguration.rateLimit` option in the rule configuration. +4. Opens the new channel and waits for Logon +5. Does nothing special operations during the strategy activity +6. Stops the new channel and recreates channel with general rate limit + +#### Snipped configuration +```yaml +apiVersion: th2.exactpro.com/v1 +kind: Th2Box +metadata: + name: fix-client +spec: + image-name: ghcr.io/th2-net/th2-conn-dirty-broken-fix + image-version: 1.3.0 + type: th2-conn + custom-config: + maxBatchSize: 1000 + maxFlushTime: 1000 + batchByGroup: true + publishSentEvents: true + publishConnectEvents: true + sessions: + - sessionAlias: client + handler: + rateLimit: 2147483647 + brokenConnConfiguration: + rules: + - ruleType: SET_RATE_LIMIT + cleanUpDuration: 20S + setRateLimitConfiguration: + rateLimit: 10 +``` + +# Changelog (th2-conn-dirty-broken-fix) + +## 1.3.0 + +* Added `SET_RATE_LIMIT` rule. +* Updated common: `5.8.0-dev` +* Updated common-utils: `2.2.2-dev` + +## 1.2.1 + +* Property `th2.broken.strategy` is added to metadata to each message when a strategy is active +* Updated conn-dirty-tcp-core: `3.4.0-dev` + +# Changelog (th2-conn-dirty-fix) ## 1.5.1 @@ -404,18 +461,9 @@ spec: ## 1.0.0 * Bump `conn-dirty-tcp-core` to `3.0.0` for books and pages support -<<<<<<< HEAD -<<<<<<< HEAD -======= -## 0.3.0 -* Ability to recover messages from cradle. - ->>>>>>> original/dev-version-1 -======= ## 0.3.0 * Ability to recover messages from cradle. ->>>>>>> original/dev-version-1 ## 0.2.0 * optional state reset on silent server reset. diff --git a/build.gradle b/build.gradle index bb334cf..f733701 100644 --- a/build.gradle +++ b/build.gradle @@ -47,10 +47,10 @@ repositories { dependencies { api platform('com.exactpro.th2:bom:4.5.0') - implementation("com.exactpro.th2:common:5.4.0-dev") { + implementation("com.exactpro.th2:common:5.8.0-dev") { exclude group: 'com.exactpro.th2', module: 'task-utils' } - implementation group: 'com.exactpro.th2', name: 'common-utils', version: '2.2.1-dev' + implementation group: 'com.exactpro.th2', name: 'common-utils', version: '2.2.2-dev' implementation 'com.exactpro.th2:netty-bytebuf-utils:0.0.1' implementation 'net.lingala.zip4j:zip4j:2.11.5' @@ -58,7 +58,7 @@ dependencies { implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.2' - implementation 'com.exactpro.th2:conn-dirty-tcp-core:3.3.0-TH2-5001+' + implementation 'com.exactpro.th2:conn-dirty-tcp-core:3.4.0-dev' implementation 'com.exactpro.th2:grpc-lw-data-provider:2.2.0-dev' implementation 'org.slf4j:slf4j-api' @@ -77,6 +77,7 @@ dependencies { testImplementation 'org.jetbrains.kotlin:kotlin-test-junit5:1.8.10' testImplementation 'org.junit.jupiter:junit-jupiter-params:5.9.3' testImplementation 'org.mockito.kotlin:mockito-kotlin:4.1.0' + testImplementation 'org.awaitility:awaitility:4.2.0' annotationProcessor 'com.google.auto.service:auto-service:1.0.1' kapt 'com.google.auto.service:auto-service:1.0.1' diff --git a/gradle.properties b/gradle.properties index ed95b7e..c3996f9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -release_version=1.2.0 \ No newline at end of file +release_version=1.3.0 \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index f4e4224..12883a4 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,2 +1,2 @@ -rootProject.name = 'conn-dirty-fix' +rootProject.name = 'conn-dirty-broken-fix' diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index ca80f1e..e3e87d5 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2022-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -76,7 +76,6 @@ import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -96,12 +95,11 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.stream.Collectors; -import javax.net.SocketFactory; + import kotlin.Unit; import kotlin.jvm.functions.Function1; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; -import org.checkerframework.checker.units.qual.A; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; @@ -498,11 +496,13 @@ public ByteBuf onReceive(@NotNull IChannel channel, @NotNull ByteBuf buffer) { @NotNull @Override - public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBuf message, MessageID messageId) { + public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBuf message, @NotNull MessageID messageId) { Map metadata = new HashMap<>(); + StrategyState state = strategy.getState(); + state.enrichProperties(metadata); if(strategy.getIncomingMessageStrategy(IncomingMessagesStrategy::getIncomingMessagesPreprocessor).process(message, metadata) != null) { - strategy.getState().addMessageID(messageId); + state.addMessageID(messageId); return metadata; } @@ -536,7 +536,7 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu String msgTypeValue = requireNonNull(msgType.getValue()); if(msgTypeValue.equals(MSG_TYPE_LOGOUT)) { serverMsgSeqNum.incrementAndGet(); - strategy.getState().addMessageID(messageId); + state.addMessageID(messageId); strategy.getIncomingMessageStrategy(x -> x.getLogoutStrategy()).process(message, metadata); return metadata; } @@ -579,7 +579,7 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu handleHeartbeat(message); break; case MSG_TYPE_LOGON: - strategy.getState().addMessageID(messageId);strategy.getState().addMessageID(messageId); + state.addMessageID(messageId); Map logonMetadata = strategy.getIncomingMessageStrategy(IncomingMessagesStrategy::getLogonStrategy).process(message, metadata); if (logonMetadata != null) return logonMetadata; if(serverMsgSeqNum.get() < receivedMsgSeqNum && !isDup && !enabled.get()) { @@ -589,17 +589,17 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu } break; case MSG_TYPE_RESEND_REQUEST: - strategy.getState().addMessageID(messageId); + state.addMessageID(messageId); if (LOGGER.isInfoEnabled()) LOGGER.info("Resend request received - {}", message.toString(US_ASCII)); handleResendRequest(message); break; case MSG_TYPE_SEQUENCE_RESET: //gap fill - strategy.getState().addMessageID(messageId); + state.addMessageID(messageId); if (LOGGER.isInfoEnabled()) LOGGER.info("Sequence reset received - {}", message.toString(US_ASCII)); resetSequence(message); break; case MSG_TYPE_TEST_REQUEST: - strategy.getState().addMessageID(messageId); + state.addMessageID(messageId); if(LOGGER.isInfoEnabled()) LOGGER.info("Test request received - {}", message.toString(US_ASCII)); if(strategy.getIncomingMessageStrategy(IncomingMessagesStrategy::getTestRequestProcessor).process(message, metadata) != null) { return metadata; @@ -607,7 +607,7 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu break; default: if(isDup) { - strategy.getState().addMessageID(messageId); + state.addMessageID(messageId); } if(LOGGER.isInfoEnabled()) LOGGER.info("Received message - {}", message.toString(US_ASCII)); } @@ -789,7 +789,10 @@ public void sendResendRequest(int beginSeqNo, int endSeqNo, boolean isPossDup) { resendRequest.append(BEGIN_SEQ_NO).append(beginSeqNo); resendRequest.append(END_SEQ_NO).append(endSeqNo); setChecksumAndBodyLength(resendRequest); - channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), new HashMap(), null, SendMode.HANDLE_AND_MANGLE) + channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), + strategy.getState().enrichProperties(), + null, + SendMode.HANDLE_AND_MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); resetHeartbeatTask(); } @@ -802,7 +805,10 @@ void sendResendRequest(int beginSeqNo) { //do private setChecksumAndBodyLength(resendRequest); if (enabled.get()) { - channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), new HashMap(), null, SendMode.HANDLE_AND_MANGLE) + channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), + strategy.getState().enrichProperties(), + null, + SendMode.HANDLE_AND_MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); resetHeartbeatTask(); } @@ -869,7 +875,10 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi if(sequence - 1 != lastProcessedSequence.get() ) { StringBuilder sequenceReset = createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), sequence); - channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), new HashMap(), null, SendMode.MANGLE); + channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), + strategy.getState().enrichProperties(), + null, + SendMode.MANGLE); resetHeartbeatTask(); } @@ -878,7 +887,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi updateLength(buf); updateChecksum(buf); if(!skip.get()) { - channel.send(buf, new HashMap(), null, SendMode.MANGLE) + channel.send(buf, strategy.getState().enrichProperties(), null, SendMode.MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); try { Thread.sleep(settings.getRecoverySendIntervalMs()); @@ -894,7 +903,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi if(!skip.get() && recoveryConfig.getOutOfOrder()) { skip.set(true); - channel.send(skipped.get(), new HashMap(), null, SendMode.MANGLE) + channel.send(skipped.get(), strategy.getState().enrichProperties(), null, SendMode.MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); try { Thread.sleep(settings.getRecoverySendIntervalMs()); @@ -920,7 +929,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi String seqReset = createSequenceReset(Math.max(lastProcessedSequence.get() + 1, beginSeqNo), msgSeqNum.get() + 1).toString(); channel.send( Unpooled.wrappedBuffer(seqReset.getBytes(StandardCharsets.UTF_8)), - new HashMap(), null, SendMode.MANGLE + strategy.getState().enrichProperties(), null, SendMode.MANGLE ).thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); } } else { @@ -928,7 +937,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi createSequenceReset(beginSeqNo, msgSeqNum.get() + 1).toString(); channel.send( Unpooled.wrappedBuffer(seqReset.getBytes(StandardCharsets.UTF_8)), - new HashMap(), null, SendMode.MANGLE + strategy.getState().enrichProperties(), null, SendMode.MANGLE ); } resetHeartbeatTask(); @@ -938,8 +947,10 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi String seqReset = createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), msgSeqNum.get() + 1).toString(); channel.send( - Unpooled.buffer().writeBytes(seqReset.getBytes(StandardCharsets.UTF_8)), - new HashMap(), null, SendMode.MANGLE + Unpooled.buffer().writeBytes(seqReset.getBytes(StandardCharsets.UTF_8)), + strategy.getState().enrichProperties(), + null, + SendMode.MANGLE ); } } @@ -978,6 +989,7 @@ private boolean checkLogon(ByteBuf message) { @Override public void onOutgoing(@NotNull IChannel channel, @NotNull ByteBuf message, @NotNull Map metadata) { + strategy.getState().enrichProperties(metadata); strategy.getOutgoingMessageStrategy(OutgoingMessagesStrategy::getOutgoingMessageProcessor).process(message, metadata); if (LOGGER.isInfoEnabled()) LOGGER.info("Outgoing message: {}", message.toString(US_ASCII)); @@ -1123,7 +1135,10 @@ private void sendHeartbeatWithTestRequest(String testRequestId, boolean possDup) if (enabled.get()) { LOGGER.info("Send Heartbeat to server - {}", heartbeat); - channel.send(Unpooled.wrappedBuffer(heartbeat.toString().getBytes(StandardCharsets.UTF_8)), new HashMap(), null, SendMode.HANDLE_AND_MANGLE); + channel.send(Unpooled.wrappedBuffer(heartbeat.toString().getBytes(StandardCharsets.UTF_8)), + strategy.getState().enrichProperties(), + null, + SendMode.HANDLE_AND_MANGLE); resetHeartbeatTask(); } else { @@ -1141,7 +1156,10 @@ public void sendTestRequestWithPossDup(boolean isPossDup) { //do private testRequest.append(TEST_REQ_ID).append(testReqID.incrementAndGet()); setChecksumAndBodyLength(testRequest); if (enabled.get()) { - channel.send(Unpooled.wrappedBuffer(testRequest.toString().getBytes(StandardCharsets.UTF_8)), new HashMap(), null, SendMode.HANDLE_AND_MANGLE) + channel.send(Unpooled.wrappedBuffer(testRequest.toString().getBytes(StandardCharsets.UTF_8)), + strategy.getState().enrichProperties(), + null, + SendMode.HANDLE_AND_MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); LOGGER.info("Send TestRequest to server - {}", testRequest); resetTestRequestTask(); @@ -1176,7 +1194,10 @@ public void sendLogon() { StringBuilder logon = buildLogon(props); LOGGER.info("Send logon - {}", logon); - channel.send(Unpooled.wrappedBuffer(logon.toString().getBytes(StandardCharsets.UTF_8)), props, null, SendMode.HANDLE_AND_MANGLE) + channel.send(Unpooled.wrappedBuffer(logon.toString().getBytes(StandardCharsets.UTF_8)), + strategy.getState().enrichProperties(props), + null, + SendMode.HANDLE_AND_MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); } @@ -1250,7 +1271,7 @@ private void sendLogout(String text, boolean isPossDup) { try { MessageID messageID = channel.send( Unpooled.wrappedBuffer(logout.toString().getBytes(StandardCharsets.UTF_8)), - new HashMap(), + strategy.getState().enrichProperties(), null, SendMode.HANDLE_AND_MANGLE ).get(); @@ -1302,9 +1323,11 @@ public void close() { // defaultSend(IChannel channel, ByteBuf message, Map properties, EventID eventID) { - CompletableFuture messageId = channel.send(message, properties, eventID, SendMode.HANDLE_AND_MANGLE); - return messageId; + private CompletableFuture defaultSend(IChannel channel, + ByteBuf message, + Map properties, + EventID eventID) { + return channel.send(message, strategy.getState().enrichProperties(properties), eventID, SendMode.HANDLE_AND_MANGLE); } private CompletableFuture bulkSend(IChannel channel, ByteBuf message, Map properties, EventID eventID) { @@ -1316,7 +1339,10 @@ private CompletableFuture bulkSend(IChannel channel, ByteBuf message, strategyState.updateCacheAndRunOnCondition(message, x -> x >= config.getBatchSize(), buffer -> { try { LOGGER.info("Sending batch of size: {}", config.getBatchSize()); - channel.send(asExpandable(buffer), properties, eventID, SendMode.DIRECT) + channel.send(asExpandable(buffer), + strategy.getState().enrichProperties(properties), + eventID, + SendMode.DIRECT) .thenAcceptAsync(strategyState::addMessageID, executorService); } catch (Exception e) { LOGGER.error("Error while sending batch.", e); @@ -1349,7 +1375,10 @@ private CompletableFuture splitSend(IChannel channel, ByteBuf message } catch (InterruptedException e) { LOGGER.error("Error while sending messages in different tcp packets."); } - channel.send(asExpandable(slice), metadata, eventID, SendMode.DIRECT_SOCKET); + channel.send(asExpandable(slice), + strategy.getState().enrichProperties(metadata), + eventID, + SendMode.DIRECT_SOCKET); resetHeartbeatTask(); sendingTimes.add(Instant.now()); } @@ -1358,7 +1387,10 @@ private CompletableFuture splitSend(IChannel channel, ByteBuf message String slicesTimestamps = sendingTimes.stream().map(formatter::format).collect(Collectors.joining(",")); metadata.put(SPLIT_SEND_TIMESTAMPS_PROPERTY, slicesTimestamps); LOGGER.info("Sent message by slices: {}", slicesTimestamps); - CompletableFuture messageID = channel.send(asExpandable(message), metadata, eventID, SendMode.DIRECT_MSTORE); + CompletableFuture messageID = channel.send(asExpandable(message), + strategy.getState().enrichProperties(metadata), + eventID, + SendMode.DIRECT_MQ); messageID.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); return messageID; } @@ -1739,7 +1771,7 @@ private void runLogonFromAnotherConnection(RuleConfiguration configuration) { props.put("sentUsingAnotherSocket", "True"); ByteBuf logonBuf = Unpooled.wrappedBuffer(logon.toString().getBytes(StandardCharsets.UTF_8)); - channel.send(logonBuf, props, null, SendMode.DIRECT_MSTORE) + channel.send(logonBuf, strategy.getState().enrichProperties(props), null, SendMode.DIRECT_MQ) .thenAcceptAsync(x -> { strategy.getState().addMessageID(x); }, executorService); @@ -1797,9 +1829,40 @@ private void runLogonFromAnotherConnection(RuleConfiguration configuration) { ruleEndEvent(configuration.getRuleType(), start, strategy.getState().getMessageIDs(), additionalDetails); } + private void setupSetRateLimit(RuleConfiguration configuration) { + strategy.resetStrategyAndState(configuration); + strategy.setCleanupHandler(this::cleanupSetRateLimit); + ruleStartEvent(configuration.getRuleType(), strategy.getStartTime()); + try { + disconnect(configuration.getGracefulDisconnect()); + context.destroyChannel(channel); + channel = context.createChannel(address, settings.getSecurity(), Map.of(), true, settings.getReconnectDelay() * 1000L, configuration.getSetRateLimitConfiguration().getRateLimit()); + openChannelAndWaitForLogon(); + } catch (Exception e) { + String message = String.format("Error while setting up %s", strategy.getType()); + LOGGER.error(message, e); + context.send(CommonUtil.toErrorEvent(message, e), strategyRootEvent); + } + } + + private void cleanupSetRateLimit() { + var state = strategy.getState(); + try { + disconnect(strategy.getGracefulDisconnect()); + context.destroyChannel(channel); + channel = context.createChannel(address, settings.getSecurity(), Map.of(), true, settings.getReconnectDelay() * 1000L, settings.getRateLimit()); + openChannelAndWaitForLogon(); + } catch (Exception e) { + String message = String.format("Error while cleaning up %s", strategy.getType()); + LOGGER.error(message, e); + context.send(CommonUtil.toErrorEvent(message, e), strategyRootEvent); + } + ruleEndEvent(strategy.getType(), state.getStartTime(), strategy.getState().getMessageIDs()); + } + private void setupDisconnectStrategy(RuleConfiguration configuration) { strategy.resetStrategyAndState(configuration); - strategy.updateSendStrategy(x -> {x.setSendPreprocessor(this::blockSend); return Unit.INSTANCE; }); + strategy.updateSendStrategy(x -> { x.setSendPreprocessor(this::blockSend); return Unit.INSTANCE; }); strategy.setCleanupHandler(this::cleanupDisconnectStrategy); ruleStartEvent(configuration.getRuleType(), strategy.getStartTime()); try { @@ -2099,7 +2162,10 @@ private void sendSequenceReset(RuleConfiguration configuration) { } setChecksumAndBodyLength(sequenceReset); - channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), new HashMap(), null, SendMode.HANDLE_AND_MANGLE) + channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), + strategy.getState().enrichProperties(), + null, + SendMode.HANDLE_AND_MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); resetHeartbeatTask(); strategy.cleanupStrategy(); @@ -2118,7 +2184,7 @@ private void cleanupBatchSendStrategy() { strategy.updateSendStrategy(x -> { state.executeOnBatchCacheIfCondition(size -> size > 0, message -> { try { - channel.send(message, new HashMap(), null, SendMode.DIRECT) + channel.send(message, strategy.getState().enrichProperties(), null, SendMode.DIRECT) .thenAcceptAsync(messageID -> strategy.getState().addMessageID(messageID), executorService); } catch (Exception e) { LOGGER.error("Error while sending batch.", e); @@ -2218,6 +2284,7 @@ private Consumer getSetupFunction(RuleConfiguration config) { case LOGON_AFTER_LOGON: return this::runLogonAfterLogonStrategy; case POSS_DUP_SESSION_MESSAGES: return this::runPossDupSessionMessages; case LOGON_FROM_ANOTHER_CONNECTION: return this::runLogonFromAnotherConnection; + case SET_RATE_LIMIT: return this::setupSetRateLimit; case DEFAULT: return configuration -> strategy.cleanupStrategy(); default: throw new IllegalStateException(String.format("Unknown strategy type %s.", config.getRuleType())); } @@ -2252,7 +2319,7 @@ private void recoveryFromState(Integer beginSeqNo, Integer endSeqNo) { channel.send( Unpooled.wrappedBuffer(seqReset.toString().getBytes(StandardCharsets.UTF_8)), - new HashMap(), null, SendMode.MANGLE + strategy.getState().enrichProperties(), null, SendMode.MANGLE ).thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); } else { FixField possDup = findField(missedMessage, POSS_DUP_TAG); @@ -2271,7 +2338,7 @@ private void recoveryFromState(Integer beginSeqNo, Integer endSeqNo) { LOGGER.info("Sending recovery message from state: {}", missedMessage.toString(US_ASCII)); if(!skip) { - channel.send(missedMessage, new HashMap(), null, SendMode.MANGLE) + channel.send(missedMessage, strategy.getState().enrichProperties(), null, SendMode.MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); try { Thread.sleep(settings.getRecoverySendIntervalMs()); @@ -2286,7 +2353,7 @@ private void recoveryFromState(Integer beginSeqNo, Integer endSeqNo) { } if(!skip && recoveryConfig.getOutOfOrder()) { - channel.send(skipped, new HashMap(), null, SendMode.MANGLE) + channel.send(skipped, strategy.getState().enrichProperties(), null, SendMode.MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); try { Thread.sleep(settings.getRecoverySendIntervalMs()); diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/RecoveryConfig.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/RecoveryConfig.kt index 8f44e8f..62008a4 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/RecoveryConfig.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/RecoveryConfig.kt @@ -1,3 +1,18 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.exactpro.th2.conn.dirty.fix.brokenconn.configuration data class RecoveryConfig( diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/RuleConfiguration.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/RuleConfiguration.kt index ca8b0d4..89dd0f9 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/RuleConfiguration.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/RuleConfiguration.kt @@ -40,51 +40,55 @@ data class RuleConfiguration( val splitSendConfiguration: SplitSendConfiguration? = null, val changeSequenceConfiguration: ChangeSequenceConfiguration? = null, val resendRequestConfiguration: ResendRequestConfiguration? = null, - val sendSequenceResetConfiguration: SendSequenceResetConfiguration? = null + val sendSequenceResetConfiguration: SendSequenceResetConfiguration? = null, + val setRateLimitConfiguration: SetRateLimitConfiguration? = null, ) { init { when(ruleType) { RuleType.DISCONNECT_WITH_RECONNECT -> {} RuleType.IGNORE_INCOMING_MESSAGES -> { - require(missIncomingMessagesConfiguration != null) { "`blockIncomingMessagesConfiguration` is required for $ruleType" } + requireNotNull(missIncomingMessagesConfiguration) { "`blockIncomingMessagesConfiguration` is required for $ruleType" } } RuleType.SEND_SEQUENCE_RESET -> {} RuleType.TRANSFORM_LOGON -> { - require(transformMessageConfiguration != null) { "`transformMessageConfiguration` is required for $ruleType"} + requireNotNull(transformMessageConfiguration) { "`transformMessageConfiguration` is required for $ruleType"} } RuleType.TRANSFORM_MESSAGE_STRATEGY -> { - require(transformMessageConfiguration != null) { "`transformMessageConfiguration` is required for $ruleType"} + requireNotNull(transformMessageConfiguration) { "`transformMessageConfiguration` is required for $ruleType"} } RuleType.INVALID_CHECKSUM -> { - require(transformMessageConfiguration != null) { "`transformMessageConfiguration` is required for $ruleType" } + requireNotNull(transformMessageConfiguration) { "`transformMessageConfiguration` is required for $ruleType" } } RuleType.BI_DIRECTIONAL_RESEND_REQUEST -> { - require(missIncomingMessagesConfiguration != null) { "`blockIncomingMessagesConfiguration` is required for $ruleType" } - require(missOutgoingMessagesConfiguration != null) { "`blockOutgoingMessagesConfiguration` is required for $ruleType" } + requireNotNull(missIncomingMessagesConfiguration) { "`blockIncomingMessagesConfiguration` is required for $ruleType" } + requireNotNull(missOutgoingMessagesConfiguration) { "`blockOutgoingMessagesConfiguration` is required for $ruleType" } } RuleType.CREATE_OUTGOING_GAP -> { - require(missOutgoingMessagesConfiguration != null) { "`blockOutgoingMessagesConfiguration` is required for $ruleType" } + requireNotNull(missOutgoingMessagesConfiguration) { "`blockOutgoingMessagesConfiguration` is required for $ruleType" } } RuleType.CLIENT_OUTAGE -> {} RuleType.PARTIAL_CLIENT_OUTAGE -> {} RuleType.RESEND_REQUEST -> { - require(resendRequestConfiguration != null) { "`resendRequestConfiguration` is required for $ruleType" } + requireNotNull(resendRequestConfiguration) { "`resendRequestConfiguration` is required for $ruleType" } } RuleType.SLOW_CONSUMER -> {} RuleType.SEQUENCE_RESET -> { - require(changeSequenceConfiguration != null) { "`changeSequenceConfiguration` is required for $ruleType" } + requireNotNull(changeSequenceConfiguration) { "`changeSequenceConfiguration` is required for $ruleType" } } RuleType.BATCH_SEND -> { - require(batchSendConfiguration != null) { "`batchSendConfiguration` is required for $ruleType" } + requireNotNull(batchSendConfiguration) { "`batchSendConfiguration` is required for $ruleType" } } RuleType.SPLIT_SEND -> { - require(splitSendConfiguration != null) { "`splitSendConfiguration` is required for $ruleType" } + requireNotNull(splitSendConfiguration) { "`splitSendConfiguration` is required for $ruleType" } } RuleType.DEFAULT -> {} RuleType.FAKE_RETRANSMISSION -> {} RuleType.LOGON_AFTER_LOGON -> {} RuleType.POSS_DUP_SESSION_MESSAGES -> {} RuleType.LOGON_FROM_ANOTHER_CONNECTION -> {} + RuleType.SET_RATE_LIMIT -> { + requireNotNull(setRateLimitConfiguration) { "`setRateLimitConfiguration` is required for $ruleType" } + } } } diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/SendSequenceResetConfiguration.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/SendSequenceResetConfiguration.kt index 5b8565d..9e925ba 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/SendSequenceResetConfiguration.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/SendSequenceResetConfiguration.kt @@ -1,3 +1,18 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.exactpro.th2.conn.dirty.fix.brokenconn.configuration data class SendSequenceResetConfiguration(val changeUp: Boolean) \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/SetRateLimitConfiguration.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/SetRateLimitConfiguration.kt new file mode 100644 index 0000000..1294f14 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/SetRateLimitConfiguration.kt @@ -0,0 +1,24 @@ +/* + * Copyright 2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.conn.dirty.fix.brokenconn.configuration + +data class SetRateLimitConfiguration(val rateLimit: Int) { + init { + require(rateLimit > 0) { + "Rate limit '$rateLimit' must be positive" + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/RuleType.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/RuleType.kt index 5b33e87..6295614 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/RuleType.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/RuleType.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023 Exactpro (Exactpro Systems Limited) + * Copyright 2023-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,5 +35,6 @@ enum class RuleType { INVALID_CHECKSUM, POSS_DUP_SESSION_MESSAGES, LOGON_FROM_ANOTHER_CONNECTION, + SET_RATE_LIMIT, DEFAULT } \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/StrategyState.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/StrategyState.kt index 129f9ae..f28ccf4 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/StrategyState.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/StrategyState.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023 Exactpro (Exactpro Systems Limited) + * Copyright 2023-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,19 +16,18 @@ package com.exactpro.th2.conn.dirty.fix.brokenconn.strategy import com.exactpro.th2.common.grpc.MessageID +import com.exactpro.th2.common.message.toJson import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.RuleConfiguration import com.exactpro.th2.netty.bytebuf.util.asExpandable -import com.google.protobuf.TextFormat.shortDebugString import io.netty.buffer.ByteBuf import io.netty.buffer.CompositeByteBuf import io.netty.buffer.Unpooled +import mu.KotlinLogging import java.time.Instant -import java.util.Collections import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.locks.ReentrantReadWriteLock import kotlin.concurrent.read import kotlin.concurrent.write -import mu.KotlinLogging class StrategyState(val config: RuleConfiguration? = null, private val missedMessagesCache: ConcurrentHashMap = ConcurrentHashMap() @@ -36,7 +35,7 @@ class StrategyState(val config: RuleConfiguration? = null, val startTime: Instant = Instant.now() val type = config?.ruleType ?: RuleType.DEFAULT private val batchMessageCache: CompositeByteBuf = Unpooled.compositeBuffer() - private val messageIDs: MutableList = ArrayList() + private val messageIDs: MutableList = ArrayList() private val lock = ReentrantReadWriteLock() private var batchMessageCacheSize = 0 @@ -102,9 +101,24 @@ class StrategyState(val config: RuleConfiguration? = null, } } + @JvmOverloads + fun enrichProperties(properties: MutableMap? = null): MutableMap { + if (type != RuleType.DEFAULT) { + return properties?.apply { + val previous = put(STRATEGY_PROPERTY, type.name) + when (previous) { + null -> { /* do noting */ } + type.name -> K_LOGGER.debug { "Strategy name $type is already set" } + else -> K_LOGGER.warn { "Strategy name $properties has been replaced to $type" } + } + } ?: hashMapOf(STRATEGY_PROPERTY to type.name) + } + return properties ?: hashMapOf() + } + fun addMessageID(messageID: MessageID?) = lock.write { if (messageIDs.size + 1 >= TOO_BIG_MESSAGE_IDS_LIST) { - K_LOGGER.warn { "Strategy ${type} messageIDs list is too big. Skiping messageID: ${shortDebugString(messageID)}" } + K_LOGGER.warn { "Strategy $type messageIDs list is too big. Skipping messageID: ${messageID?.toJson()}" } } messageID?.let { messageIDs.add(it) } } @@ -114,7 +128,8 @@ class StrategyState(val config: RuleConfiguration? = null, } companion object { - private const val TOO_BIG_MESSAGE_IDS_LIST = 300; + private const val STRATEGY_PROPERTY: String = "th2.broken.strategy" + private const val TOO_BIG_MESSAGE_IDS_LIST = 300 private val K_LOGGER = KotlinLogging.logger { } fun StrategyState.resetAndCopyMissedMessages(ruleConfiguration: RuleConfiguration? = null): StrategyState = StrategyState(ruleConfiguration, this.missedMessagesCache) diff --git a/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/TestStrategies.kt b/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/TestStrategies.kt index 1387d10..80fb687 100644 --- a/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/TestStrategies.kt +++ b/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/TestStrategies.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023 Exactpro (Exactpro Systems Limited) + * Copyright 2023-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,18 +22,19 @@ import com.exactpro.th2.common.grpc.Direction import com.exactpro.th2.common.grpc.MessageID import com.exactpro.th2.common.utils.message.toTimestamp import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.BatchSendConfiguration -import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.BlockMessageConfiguration import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.BrokenConnConfiguration import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.ChangeSequenceConfiguration import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.MissMessageConfiguration import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.ResendRequestConfiguration import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.RuleConfiguration +import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.SetRateLimitConfiguration import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.SplitSendConfiguration import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.TransformMessageConfiguration import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.TransformationConfiguration import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.RuleType import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.SchedulerType import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel +import com.exactpro.th2.conn.dirty.tcp.core.api.IHandler import com.exactpro.th2.conn.dirty.tcp.core.api.IHandlerContext import com.exactpro.th2.constants.Constants import com.exactpro.th2.netty.bytebuf.util.asExpandable @@ -41,30 +42,37 @@ import com.exactpro.th2.netty.bytebuf.util.contains import com.exactpro.th2.netty.bytebuf.util.isEmpty import io.netty.buffer.ByteBuf import io.netty.buffer.Unpooled -import java.time.Duration -import java.time.Instant -import java.time.temporal.ChronoUnit -import java.util.Collections -import java.util.concurrent.CompletableFuture -import java.util.concurrent.atomic.AtomicInteger -import java.util.regex.Pattern -import kotlin.test.assertEquals -import kotlin.test.assertTrue -import mu.KotlinLogging +import org.awaitility.Awaitility.await import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test +import org.junit.jupiter.api.Timeout import org.mockito.kotlin.any import org.mockito.kotlin.anyOrNull import org.mockito.kotlin.argumentCaptor import org.mockito.kotlin.clearInvocations import org.mockito.kotlin.doAnswer +import org.mockito.kotlin.eq +import org.mockito.kotlin.inOrder import org.mockito.kotlin.mock +import org.mockito.kotlin.same import org.mockito.kotlin.timeout -import org.mockito.kotlin.times import org.mockito.kotlin.verify import org.mockito.kotlin.whenever +import java.time.Duration +import java.time.Instant +import java.time.temporal.ChronoUnit +import java.time.temporal.ChronoUnit.SECONDS +import java.util.Collections +import java.util.Queue +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CompletableFuture.completedFuture +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger +import java.util.regex.Pattern +import kotlin.test.assertEquals +import kotlin.test.assertTrue -@Disabled class TestStrategies { private class TestContext( @@ -74,10 +82,11 @@ class TestStrategies { ) @Test + @Disabled("update is needed") fun testDisconnectStrategy() { - val defaultRuleDuration = Duration.of(2, ChronoUnit.SECONDS) - val businessRuleDuration = Duration.of(5, ChronoUnit.SECONDS) - val businessRuleCleanupDuration = Duration.of(2, ChronoUnit.SECONDS) + val defaultRuleDuration = Duration.of(2, SECONDS) + val businessRuleDuration = Duration.of(5, SECONDS) + val businessRuleCleanupDuration = Duration.of(2, SECONDS) val testContext = createTestContext(BrokenConnConfiguration( SchedulerType.CONSECUTIVE, listOf( @@ -114,14 +123,14 @@ class TestStrategies { @Test fun testIgnoreIncomingMessagesStrategyWithNextExpectedSequenceNumber() { - val defaultRuleDuration = Duration.of(2, ChronoUnit.SECONDS) - val businessRuleDuration = Duration.of(6, ChronoUnit.SECONDS) - val businessRuleCleanupDuration = Duration.of(2, ChronoUnit.SECONDS) + val defaultRuleDuration = Duration.of(2, SECONDS) + val businessRuleDuration = Duration.of(6, SECONDS) + val businessRuleCleanupDuration = Duration.of(2, SECONDS) val messages = mutableListOf, IChannel.SendMode>>() val testContext = createTestContext(BrokenConnConfiguration( SchedulerType.CONSECUTIVE, listOf( - RuleConfiguration(RuleType.DEFAULT, duration = Duration.of(2, ChronoUnit.SECONDS), cleanUpDuration = Duration.of(0, ChronoUnit.SECONDS)), + RuleConfiguration(RuleType.DEFAULT, duration = Duration.of(2, SECONDS), cleanUpDuration = Duration.of(0, SECONDS)), RuleConfiguration( RuleType.IGNORE_INCOMING_MESSAGES, duration = businessRuleDuration, @@ -162,16 +171,17 @@ class TestStrategies { } @Test + @Disabled("update is needed") fun testIgnoreIncomingMessagesStrategyResendRequest() { - val defaultRuleDuration = Duration.of(2, ChronoUnit.SECONDS) - val businessRuleDuration = Duration.of(6, ChronoUnit.SECONDS) - val businessRuleCleanupDuration = Duration.of(2, ChronoUnit.SECONDS) + val defaultRuleDuration = Duration.of(2, SECONDS) + val businessRuleDuration = Duration.of(6, SECONDS) + val businessRuleCleanupDuration = Duration.of(2, SECONDS) val messages = mutableListOf, IChannel.SendMode>>() val testContext = createTestContext(BrokenConnConfiguration( SchedulerType.CONSECUTIVE, listOf( - RuleConfiguration(RuleType.DEFAULT, duration = Duration.of(2, ChronoUnit.SECONDS), cleanUpDuration = Duration.of(0, ChronoUnit.SECONDS)), + RuleConfiguration(RuleType.DEFAULT, duration = Duration.of(2, SECONDS), cleanUpDuration = Duration.of(0, SECONDS)), RuleConfiguration( RuleType.IGNORE_INCOMING_MESSAGES, duration = businessRuleDuration, @@ -214,18 +224,19 @@ class TestStrategies { } @Test + @Disabled("update is needed") fun testTransformLogonMessagesStrategy() { - val defaultRuleDuration = Duration.of(2, ChronoUnit.SECONDS) - val businessRuleDuration = Duration.of(6, ChronoUnit.SECONDS) + val defaultRuleDuration = Duration.of(2, SECONDS) + val businessRuleDuration = Duration.of(6, SECONDS) val messages = mutableListOf, IChannel.SendMode>>() val testContext = createTestContext(BrokenConnConfiguration( SchedulerType.CONSECUTIVE, listOf( - RuleConfiguration(RuleType.DEFAULT, duration = Duration.of(2, ChronoUnit.SECONDS), cleanUpDuration = Duration.of(0, ChronoUnit.SECONDS)), + RuleConfiguration(RuleType.DEFAULT, duration = Duration.of(2, SECONDS), cleanUpDuration = Duration.of(0, SECONDS)), RuleConfiguration( RuleType.TRANSFORM_LOGON, duration = businessRuleDuration, - cleanUpDuration = Duration.of(2, ChronoUnit.SECONDS), + cleanUpDuration = Duration.of(2, SECONDS), transformMessageConfiguration = TransformMessageConfiguration( listOf( TransformationConfiguration( @@ -292,17 +303,18 @@ class TestStrategies { } @Test + @Disabled("update is needed") fun testBidirectionalResendRequest() { - val defaultRuleDuration = Duration.of(2, ChronoUnit.SECONDS) + val defaultRuleDuration = Duration.of(2, SECONDS) val messages = mutableListOf, IChannel.SendMode>>() val testContext = createTestContext(BrokenConnConfiguration( SchedulerType.CONSECUTIVE, listOf( - RuleConfiguration(RuleType.DEFAULT, duration = Duration.of(2, ChronoUnit.SECONDS), cleanUpDuration = Duration.of(0, ChronoUnit.SECONDS)), + RuleConfiguration(RuleType.DEFAULT, duration = Duration.of(2, SECONDS), cleanUpDuration = Duration.of(0, SECONDS)), RuleConfiguration( RuleType.BI_DIRECTIONAL_RESEND_REQUEST, - duration = Duration.of(6, ChronoUnit.SECONDS), - cleanUpDuration = Duration.of(2, ChronoUnit.SECONDS), + duration = Duration.of(6, SECONDS), + cleanUpDuration = Duration.of(2, SECONDS), missIncomingMessagesConfiguration = MissMessageConfiguration(2), missOutgoingMessagesConfiguration = MissMessageConfiguration(2) ), @@ -357,15 +369,16 @@ class TestStrategies { } @Test + @Disabled("update is needed") fun testOutgoingGap() { - val defaultRuleDuration = Duration.of(2, ChronoUnit.SECONDS) - val businessRuleDuration = Duration.of(6, ChronoUnit.SECONDS) - val businessRuleCleanupDuration = Duration.of(3, ChronoUnit.SECONDS) + val defaultRuleDuration = Duration.of(2, SECONDS) + val businessRuleDuration = Duration.of(6, SECONDS) + val businessRuleCleanupDuration = Duration.of(3, SECONDS) val messages = mutableListOf, IChannel.SendMode>>() val testContext = createTestContext(BrokenConnConfiguration( SchedulerType.CONSECUTIVE, listOf( - RuleConfiguration(RuleType.DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(2, ChronoUnit.SECONDS)), + RuleConfiguration(RuleType.DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(2, SECONDS)), RuleConfiguration( RuleType.CREATE_OUTGOING_GAP, duration = businessRuleDuration, @@ -419,15 +432,15 @@ class TestStrategies { @Test fun testClientOutage() { - val defaultRuleDuration = Duration.of(2, ChronoUnit.SECONDS) - val businessRuleDuration = Duration.of(6, ChronoUnit.SECONDS) - val businessRuleCleanupDuration = Duration.of(2, ChronoUnit.SECONDS) + val defaultRuleDuration = Duration.of(2, SECONDS) + val businessRuleDuration = Duration.of(6, SECONDS) + val businessRuleCleanupDuration = Duration.of(2, SECONDS) val messages = mutableListOf, IChannel.SendMode>>() val testContext = createTestContext( BrokenConnConfiguration( SchedulerType.CONSECUTIVE, listOf( - RuleConfiguration(RuleType.DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(0, ChronoUnit.SECONDS)), + RuleConfiguration(RuleType.DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(0, SECONDS)), RuleConfiguration( RuleType.CLIENT_OUTAGE, duration = businessRuleDuration, @@ -467,15 +480,15 @@ class TestStrategies { @Test fun testPartialClientOutage() { - val defaultRuleDuration = Duration.of(2, ChronoUnit.SECONDS) - val businessRuleDuration = Duration.of(6, ChronoUnit.SECONDS) - val businessRuleCleanupDuration = Duration.of(2, ChronoUnit.SECONDS) + val defaultRuleDuration = Duration.of(2, SECONDS) + val businessRuleDuration = Duration.of(6, SECONDS) + val businessRuleCleanupDuration = Duration.of(2, SECONDS) val messages = mutableListOf, IChannel.SendMode>>() val testContext = createTestContext( BrokenConnConfiguration( SchedulerType.CONSECUTIVE, listOf( - RuleConfiguration(RuleType.DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(0, ChronoUnit.SECONDS)), + RuleConfiguration(RuleType.DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(0, SECONDS)), RuleConfiguration( RuleType.PARTIAL_CLIENT_OUTAGE, duration = businessRuleDuration, @@ -516,16 +529,17 @@ class TestStrategies { } @Test + @Disabled("update is needed") fun testSequenceResetStrategyOutgoing() { - val defaultRuleDuration = Duration.of(2, ChronoUnit.SECONDS) - val businessRuleDuration = Duration.of(6, ChronoUnit.SECONDS) - val businessRuleCleanupDuration = Duration.of(2, ChronoUnit.SECONDS) + val defaultRuleDuration = Duration.of(2, SECONDS) + val businessRuleDuration = Duration.of(6, SECONDS) + val businessRuleCleanupDuration = Duration.of(2, SECONDS) val messages = mutableListOf, IChannel.SendMode>>() val testContext = createTestContext( BrokenConnConfiguration( SchedulerType.CONSECUTIVE, listOf( - RuleConfiguration(RuleType.DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(0, ChronoUnit.SECONDS)), + RuleConfiguration(RuleType.DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(0, SECONDS)), RuleConfiguration( RuleType.SEQUENCE_RESET, duration = businessRuleDuration, @@ -560,15 +574,15 @@ class TestStrategies { @Test fun testResendRequestStrategy() { - val defaultRuleDuration = Duration.of(2, ChronoUnit.SECONDS) - val businessRuleDuration = Duration.of(6, ChronoUnit.SECONDS) - val businessRuleCleanupDuration = Duration.of(2, ChronoUnit.SECONDS) + val defaultRuleDuration = Duration.of(2, SECONDS) + val businessRuleDuration = Duration.of(6, SECONDS) + val businessRuleCleanupDuration = Duration.of(2, SECONDS) val messages = mutableListOf, IChannel.SendMode>>() val testContext = createTestContext( BrokenConnConfiguration( SchedulerType.CONSECUTIVE, listOf( - RuleConfiguration(RuleType.DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(0, ChronoUnit.SECONDS)), + RuleConfiguration(RuleType.DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(0, SECONDS)), RuleConfiguration( RuleType.RESEND_REQUEST, duration = businessRuleDuration, @@ -605,15 +619,15 @@ class TestStrategies { @Test fun testBatchSend() { - val defaultRuleDuration = Duration.of(2, ChronoUnit.SECONDS) - val businessRuleDuration = Duration.of(6, ChronoUnit.SECONDS) - val businessRuleCleanupDuration = Duration.of(2, ChronoUnit.SECONDS) + val defaultRuleDuration = Duration.of(2, SECONDS) + val businessRuleDuration = Duration.of(6, SECONDS) + val businessRuleCleanupDuration = Duration.of(2, SECONDS) val messages = mutableListOf, IChannel.SendMode>>() val testContext = createTestContext( BrokenConnConfiguration( SchedulerType.CONSECUTIVE, listOf( - RuleConfiguration(RuleType.DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(0, ChronoUnit.SECONDS)), + RuleConfiguration(RuleType.DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(0, SECONDS)), RuleConfiguration( RuleType.BATCH_SEND, duration = businessRuleDuration, @@ -659,15 +673,15 @@ class TestStrategies { @Test fun testSplitSend() { - val defaultRuleDuration = Duration.of(2, ChronoUnit.SECONDS) - val businessRuleDuration = Duration.of(6, ChronoUnit.SECONDS) - val businessRuleCleanupDuration = Duration.of(2, ChronoUnit.SECONDS) + val defaultRuleDuration = Duration.of(2, SECONDS) + val businessRuleDuration = Duration.of(6, SECONDS) + val businessRuleCleanupDuration = Duration.of(2, SECONDS) val messages = mutableListOf, IChannel.SendMode>>() val testContext = createTestContext( BrokenConnConfiguration( SchedulerType.CONSECUTIVE, listOf( - RuleConfiguration(RuleType.DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(0, ChronoUnit.SECONDS)), + RuleConfiguration(RuleType.DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(0, SECONDS)), RuleConfiguration( RuleType.SPLIT_SEND, duration = businessRuleDuration, @@ -713,6 +727,89 @@ class TestStrategies { channel.close() } + @Test + @Timeout(10) + fun testSetRateLimitStrategy() { + val settings = createHandlerSettings( + BrokenConnConfiguration( + rules = listOf( + RuleConfiguration( + RuleType.SET_RATE_LIMIT, + duration = Duration.of(1, SECONDS), + cleanUpDuration = Duration.of(1, SECONDS), + setRateLimitConfiguration = SetRateLimitConfiguration(1), + ), + ) + ), + Int.MAX_VALUE, + false, + ) + val context: IHandlerContext = mock { + on { this.settings }.thenReturn(settings) + } + + val handler: IHandler = FixHandler(context) + val channelQueue: Queue = ArrayBlockingQueue(5) + + whenever(context.createChannel(any(), any(), any(), any(), any(), any())).doAnswer { + val isOpen = AtomicBoolean() + mock { + on { this.isOpen }.doAnswer { isOpen.get() } + on { open() }.doAnswer { + check(isOpen.compareAndSet(false, true)) { + "session isn't closed" + } + handler.onOpen(it.mock as IChannel) + completedFuture(Unit) + } + on { close() }.doAnswer { + check(isOpen.compareAndSet(true, false)) { + "session isn't opened" + } + handler.onClose(it.mock as IChannel) + completedFuture(Unit) + } + on { send(any(), any(), anyOrNull(), any()) }.doAnswer { + check(isOpen.get()) { + "session must be open" + } + println("send ${it.mock}") + val byteBuf = it.arguments[0] as ByteBuf + when { + byteBuf.contains("35=A\u0001") -> { + handler.onIncoming(it.mock as IChannel, logonResponse(1), getMessageId()) + } + } + completedFuture(getMessageId()) + } + }.also(channelQueue::add) + .also(::println) + } + + handler.onStart() + + verify(context, timeout(10_000).times(3)).createChannel(any(), any(), any(), any(), any(), any()) + await().until { channelQueue.size == 3 } + val channel1 = channelQueue.poll() + val channel2 = channelQueue.poll() + val channel3 = channelQueue.poll() + inOrder(context, channel1, channel2, channel3).apply { + verify(context).createChannel(any(), any(), any(), any(), any(), eq(Int.MAX_VALUE)) + verify(channel1).open() + verify(channel1).send(any(), any(), anyOrNull(), any()) + verify(channel1).close() + verify(context).destroyChannel(same(channel1)) + verify(context).createChannel(any(), any(), any(), any(), any(), eq(1)) + verify(channel2).open() + verify(channel2).send(any(), any(), anyOrNull(), any()) + verify(channel2).close() + verify(context).destroyChannel(same(channel2)) + verify(context).createChannel(any(), any(), any(), any(), any(), eq(Int.MAX_VALUE)) + verify(channel3).open() + verify(channel3).send(any(), any(), anyOrNull(), any()) + } + } + private fun createTestContext( strategyConfig: BrokenConnConfiguration, hbtInt: Int = 30, @@ -726,7 +823,7 @@ class TestStrategies { on { createChannel(any(), any(), any(), any(), any(), any(), any()) }.thenReturn(channel) } - var incomingSequence = AtomicInteger(0) + val incomingSequence = AtomicInteger(0) var outgoingSequence = 0 val handler = FixHandler(context) @@ -747,7 +844,7 @@ class TestStrategies { handler.onIncoming(channel, logonResponse(incomingSequence.incrementAndGet()), getMessageId()) } } - CompletableFuture.completedFuture(getMessageId()) + completedFuture(getMessageId()) } var isOpen = false whenever(channel.send(any(), any(), anyOrNull(), any())).doAnswer {