diff --git a/README.md b/README.md index 8c986e7..542c012 100644 --- a/README.md +++ b/README.md @@ -333,6 +333,18 @@ spec: memory: 100Mi cpu: 20m ``` +## 1.3.0 + +* Migrated to th2 gradle plugin `0.1.1` +* Updated: + * bom: `4.6.1` + * common: `5.13.1-dev` + * common-utils: `2.2.3-dev` + * conn-dirty-tcp-core: `3.6.0-dev` + * grpc-lw-data-provider: `2.3.1-dev` + * httpclient5: `5.3.1` + * auto-service: `1.1.1` + * kotlin-logging: `3.0.5` ## 1.3.0 diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index a84a3cf..9d32b50 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -59,6 +59,15 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import kotlin.Unit; +import kotlin.jvm.functions.Function1; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -97,15 +106,6 @@ import java.util.function.Consumer; import java.util.stream.Collectors; -import kotlin.Unit; -import kotlin.jvm.functions.Function1; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import static com.exactpro.th2.common.event.EventUtils.createMessageBean; import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.findField; import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.findLastField; @@ -182,7 +182,6 @@ import static com.exactpro.th2.netty.bytebuf.util.ByteBufUtil.asExpandable; import static com.exactpro.th2.netty.bytebuf.util.ByteBufUtil.indexOf; import static com.exactpro.th2.netty.bytebuf.util.ByteBufUtil.isEmpty; -import static com.exactpro.th2.netty.bytebuf.util.ByteBufUtil.startsWith; import static com.exactpro.th2.util.MessageUtil.findByte; import static com.exactpro.th2.util.MessageUtil.getBodyLength; import static com.exactpro.th2.util.MessageUtil.getChecksum; @@ -393,11 +392,11 @@ public CompletableFuture send(@NotNull ByteBuf body, @NotNull Map send(@NotNull ByteBuf body, @NotNull Map deadline) { // The method should have checked exception in signature... - ExceptionUtils.rethrow(new TimeoutException(String.format("session was not established within %d mls", + ExceptionUtils.asRuntimeException(new TimeoutException(String.format("session was not established within %d mls", settings.getConnectionTimeoutOnSend()))); } } @@ -427,7 +426,7 @@ public CompletableFuture send(@NotNull ByteBuf body, @NotNull Map deadline) { // The method should have checked exception in signature... - ExceptionUtils.rethrow(new TimeoutException(String.format("session was not established within %d mls", + ExceptionUtils.asRuntimeException(new TimeoutException(String.format("session was not established within %d mls", settings.getConnectionTimeoutOnSend()))); } } @@ -497,7 +496,7 @@ public ByteBuf onReceive(@NotNull IChannel channel, @NotNull ByteBuf buffer) { @NotNull @Override - public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBuf message, MessageID messageId) { + public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBuf message, @NotNull MessageID messageId) { Map metadata = new HashMap<>(); StrategyState state = strategy.getState(); @@ -538,7 +537,7 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu if(msgTypeValue.equals(MSG_TYPE_LOGOUT)) { serverMsgSeqNum.incrementAndGet(); state.addMessageID(messageId); - strategy.getIncomingMessageStrategy(x -> x.getLogoutStrategy()).process(message, metadata); + strategy.getIncomingMessageStrategy(IncomingMessagesStrategy::getLogoutStrategy).process(message, metadata); return metadata; } @@ -760,11 +759,7 @@ private void resetSequence(ByteBuf message) { } else { int newSeqNo = Integer.parseInt(requireNonNull(seqNumValue.getValue())); serverMsgSeqNum.updateAndGet(sequence -> { - if(sequence < newSeqNo - 1) { - return newSeqNo - 1; - } else { - return sequence; - } + return Math.max(sequence, newSeqNo - 1); }); } } @@ -851,7 +846,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi } AtomicBoolean skip = new AtomicBoolean(recoveryConfig.getOutOfOrder()); - AtomicReference skipped = new AtomicReference(null); + AtomicReference skipped = new AtomicReference<>(null); int endSeq = endSeqNo; LOGGER.info("Loading messages from {} to {}", beginSeqNo, endSeqNo); @@ -1608,7 +1603,7 @@ private Map gapFillSequenceReset(ByteBuf message, Map missOutgoingMessages(ByteBuf message, Map x <= countToMiss)) { - return null; + if(strategyState.addMissedMessageToCacheIfCondition(msgSeqNum.get(), message.copy(), x -> x <= countToMiss)) { + message.clear(); + } + if(strategy.getAllowMessagesBeforeRetransmissionFinishes() + && Duration.between(strategy.getStartTime(), Instant.now()).compareTo(strategy.getConfig().getDuration()) > 0 ) { + strategy.disableAllowMessagesBeforeRetransmissionFinishes("after " + strategy.getConfig().getDuration() + " strategy duration"); } - - message.clear(); return null; } @@ -1784,9 +1781,7 @@ private void runLogonFromAnotherConnection(RuleConfiguration configuration) { ByteBuf logonBuf = Unpooled.wrappedBuffer(logon.toString().getBytes(StandardCharsets.UTF_8)); channel.send(logonBuf, strategy.getState().enrichProperties(props), null, SendMode.DIRECT_MQ) - .thenAcceptAsync(x -> { - strategy.getState().addMessageID(x); - }, executorService); + .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); boolean logonSent = false; boolean responseReceived = true; @@ -1795,7 +1790,7 @@ private void runLogonFromAnotherConnection(RuleConfiguration configuration) { try( Socket socket = new Socket(address.getAddress(), address.getPort()); DataOutputStream dOut = new DataOutputStream(socket.getOutputStream()); - DataInputStream dIn = new DataInputStream(socket.getInputStream()); + DataInputStream dIn = new DataInputStream(socket.getInputStream()) ){ socket.setSoTimeout(5000); @@ -1829,7 +1824,8 @@ private void runLogonFromAnotherConnection(RuleConfiguration configuration) { LOGGER.info("Waiting for 5 seconds to check if main session will be disconnected."); Thread.sleep(5000); } catch (InterruptedException e) { - e.printStackTrace(); + LOGGER.error("Interrupted", e); + Thread.currentThread().interrupt(); } HashMap additionalDetails = new HashMap<>(); @@ -2240,7 +2236,7 @@ private void applyNextStrategy() { ruleErrorEvent(nextStrategyConfig.getRuleType(), null, e); } - LOGGER.info("Next strategy applied: {}", nextStrategyConfig.getRuleType()); + LOGGER.info("Next strategy applied: {}, duration: {}", nextStrategyConfig.getRuleType(), nextStrategyConfig.getDuration()); executorService.schedule(this::applyNextStrategy, nextStrategyConfig.getDuration().toMillis(), TimeUnit.MILLISECONDS); } diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/PasswordManager.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/PasswordManager.kt index 9681ba0..4e01d45 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/PasswordManager.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/PasswordManager.kt @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2022-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package com.exactpro.th2.conn.dirty.fix import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue import java.io.BufferedReader import java.io.ByteArrayInputStream import java.io.InputStreamReader @@ -65,15 +66,14 @@ class PasswordManager( K_LOGGER.error { "Error while pulling passwords: ${response.code}" } return@execute } - val responseMap: Map = - OBJECT_MAPPER.readValue(response.entity.content, Map::class.java) as Map + val responseMap: Map = OBJECT_MAPPER.readValue(response.entity.content) - val content = responseMap[CONTENT_PROPERTY] + val propContent = responseMap[CONTENT_PROPERTY] ?: error("Error while polling new passwords. No $CONTENT_PROPERTY in response.") val zipPassword = responseMap[PASSWORD_PROPERTY]?.toCharArray() ?: error("Error while polling new passwords. No $PASSWORD_PROPERTY in response.") - val zipContent: ByteArray = Base64.getDecoder().decode(content.toByteArray()) + val zipContent: ByteArray = Base64.getDecoder().decode(propContent.toByteArray()) val zipInputStream = ZipInputStream(ByteArrayInputStream(zipContent), zipPassword) val reader = BufferedReader(InputStreamReader(zipInputStream)) @@ -83,12 +83,12 @@ class PasswordManager( K_LOGGER.info { "Archive entry name: $entryName" } K_LOGGER.info { "Secret file name: $secretFileName" } if (entryName.contains(secretFileName)) { - val content = reader.readLine() - if (content.isNotBlank()) { - runCatching { OBJECT_MAPPER.readValue(content, Map::class.java) as Map } + val lineContent = reader.readLine() + if (lineContent.isNotBlank()) { + runCatching { OBJECT_MAPPER.readValue>(lineContent) } .onFailure { K_LOGGER.error(it) { "Error while getting secrets" } } .onSuccess { secrets -> - K_LOGGER.info { "Decoded secrets: ${secrets}" } + K_LOGGER.info { "Decoded secrets: $secrets" } secrets[newPasswordSecretName]?.let { newPassword = Base64.getDecoder().decode(it).decodeToString().ifBlank { null } } @@ -97,13 +97,13 @@ class PasswordManager( password = Base64.getDecoder().decode(it).decodeToString().ifBlank { null } } - secrets[previousPasswordSecretName]?.let { - val json = Base64.getDecoder().decode(it).decodeToString().ifBlank { null } + secrets[previousPasswordSecretName]?.let { secret -> + val json = Base64.getDecoder().decode(secret).decodeToString().ifBlank { null } if(json == null) { previouslyUsedPasswords.clear() } else { - runCatching { OBJECT_MAPPER.readValue(json, List::class.java) as List } + runCatching { OBJECT_MAPPER.readValue>(json) } .onFailure { K_LOGGER.error(it) { "Error while getting $previousPasswordSecretName." } } .onSuccess { previouslyUsedPasswords.clear() diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/StatefulStrategy.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/StatefulStrategy.kt index 32c54f0..1c32850 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/StatefulStrategy.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/StatefulStrategy.kt @@ -25,6 +25,7 @@ import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.StrategyState.Compani import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.api.CleanupHandler import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.api.OnCloseHandler import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.api.RecoveryHandler +import mu.KotlinLogging import java.time.Instant import java.util.concurrent.locks.ReentrantReadWriteLock import kotlin.concurrent.read @@ -43,55 +44,43 @@ class StatefulStrategy( private val lock = ReentrantReadWriteLock() // configurations - var config: RuleConfiguration? = null - get() = state.config ?: error("Rule configuration isn't present.") - private set - var missIncomingMessagesConfig: MissMessageConfiguration? = null - get() = state.config?.missIncomingMessagesConfiguration ?: error("Miss incoming messages config isn't present.") - private set - var missOutgoingMessagesConfiguration: MissMessageConfiguration? = null - get() = state.config?.missOutgoingMessagesConfiguration ?: error("Miss outgoing messages config isn't present.") - private set - var disableForMessageTypes: Set = emptySet() - get() = state.config?.disableForMessageTypes ?: error("Disable for message types isn't present.") - private set - var transformMessageConfiguration: TransformMessageConfiguration? = null - get() = state.config?.transformMessageConfiguration ?: error("Transform message config isn't present.") - private set - var batchSendConfiguration: BatchSendConfiguration? = null - get() = state.config?.batchSendConfiguration ?: error("batch send config isn't present.") - private set - var splitSendConfiguration: SplitSendConfiguration? = null - get() = state.config?.splitSendConfiguration ?: error("split send configuration isn't present.") - private set - - var allowMessagesBeforeLogon: Boolean = false - get() = state.config?.allowMessagesBeforeLogonReply ?: false - private set - - var sendResendRequestOnLogonGap: Boolean = false - get() = state.config?.sendResendRequestOnLogonGap ?: false - private set - - var allowMessagesBeforeRetransmissionFinishes: Boolean = false - get() = state.config?.allowMessagesBeforeRetransmissionFinishes ?: false - private set - - var sendResendRequestOnLogoutReply: Boolean = false - get() = state.config?.sendResendRequestOnLogoutReply ?: false - private set - - var increaseNextExpectedSequenceNumber: Boolean = false - get() = state.config?.increaseNextExpectedSequenceNumber ?: false - private set - - var decreaseNextExpectedSequenceNumber: Boolean = false - get() = state.config?.decreaseNextExpectedSequenceNumber ?: false - private set + val config: RuleConfiguration + get() = lock.read { state.config ?: error("Rule configuration isn't present.") } + val missIncomingMessagesConfig: MissMessageConfiguration + get() = lock.read { state.config?.missIncomingMessagesConfiguration ?: error("Miss incoming messages config isn't present.") } + val missOutgoingMessagesConfiguration: MissMessageConfiguration + get() = lock.read { state.config?.missOutgoingMessagesConfiguration ?: error("Miss outgoing messages config isn't present.") } + val disableForMessageTypes: Set + get() = lock.read { state.config?.disableForMessageTypes ?: emptySet() } + val transformMessageConfiguration: TransformMessageConfiguration + get() = lock.read { state.config?.transformMessageConfiguration ?: error("Transform message config isn't present.") } + val batchSendConfiguration: BatchSendConfiguration + get() = lock.read { state.config?.batchSendConfiguration ?: error("batch send config isn't present.") } + val splitSendConfiguration: SplitSendConfiguration + get() = lock.read { state.config?.splitSendConfiguration ?: error("split send configuration isn't present.") } + + val allowMessagesBeforeLogon: Boolean + get() = lock.read { state.config?.allowMessagesBeforeLogonReply ?: false } + + val sendResendRequestOnLogonGap: Boolean + get() = lock.read { state.config?.sendResendRequestOnLogonGap ?: false } + + private var _allowMessagesBeforeRetransmissionFinishes: Boolean = false + val allowMessagesBeforeRetransmissionFinishes: Boolean + get() = lock.read { _allowMessagesBeforeRetransmissionFinishes } + + val sendResendRequestOnLogoutReply: Boolean + get() = lock.read {state.config?.sendResendRequestOnLogoutReply ?: false } + + val increaseNextExpectedSequenceNumber: Boolean + get() = lock.read {state.config?.increaseNextExpectedSequenceNumber ?: false } + + val decreaseNextExpectedSequenceNumber: Boolean + get() = lock.read {state.config?.decreaseNextExpectedSequenceNumber ?: false } + + val recoveryConfig: RecoveryConfig + get() = lock.read { state.config?.recoveryConfig ?: RecoveryConfig() } - var recoveryConfig: RecoveryConfig = RecoveryConfig() - get() = state.config?.recoveryConfig ?: RecoveryConfig() - private set // strategies fun updateSendStrategy(func: SendStrategy.() -> Unit) = lock.write { sendStrategy.func() @@ -121,6 +110,11 @@ class StatefulStrategy( receiveStrategy.func() } + fun disableAllowMessagesBeforeRetransmissionFinishes(reason: String) = lock.write { + _allowMessagesBeforeRetransmissionFinishes = false + LOGGER.info("Disabled allow messages before retransmission finishes by the '$reason' reason") + } + fun getReceiveMessageStrategy(func: ReceiveStrategy.() -> T) = lock.read { receiveStrategy.func() } @@ -164,6 +158,7 @@ class StatefulStrategy( fun resetStrategyAndState(config: RuleConfiguration) { lock.write { state = state.resetAndCopyMissedMessages(config) + _allowMessagesBeforeRetransmissionFinishes = state.config?.allowMessagesBeforeRetransmissionFinishes ?: false sendStrategy.sendHandler = defaultStrategy.sendStrategy.sendHandler sendStrategy.sendPreprocessor = defaultStrategy.sendStrategy.sendPreprocessor receiveStrategy.receivePreprocessor = defaultStrategy.receiveStrategy.receivePreprocessor @@ -180,6 +175,7 @@ class StatefulStrategy( fun cleanupStrategy() { lock.write { state = state.resetAndCopyMissedMessages() + _allowMessagesBeforeRetransmissionFinishes = state.config?.allowMessagesBeforeRetransmissionFinishes ?: false sendStrategy.sendHandler = defaultStrategy.sendStrategy.sendHandler sendStrategy.sendPreprocessor = defaultStrategy.sendStrategy.sendPreprocessor receiveStrategy.receivePreprocessor = defaultStrategy.receiveStrategy.receivePreprocessor @@ -192,4 +188,8 @@ class StatefulStrategy( onCloseHandler = defaultStrategy.closeHandler } } + + companion object { + private val LOGGER = KotlinLogging.logger {} + } } \ No newline at end of file diff --git a/src/test/java/com/exactpro/th2/RecoveryTest.java b/src/test/java/com/exactpro/th2/RecoveryTest.java index ddfd567..1b12109 100644 --- a/src/test/java/com/exactpro/th2/RecoveryTest.java +++ b/src/test/java/com/exactpro/th2/RecoveryTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 Exactpro (Exactpro Systems Limited) + * Copyright 2023-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,7 @@ package com.exactpro.th2; import com.exactpro.th2.common.grpc.MessageID; -import com.exactpro.th2.conn.dirty.fix.MessageSearcher; +import com.exactpro.th2.conn.dirty.fix.TestMessageSearcher; import com.exactpro.th2.dataprovider.lw.grpc.DataProviderService; import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupResponse; import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupsSearchRequest; @@ -54,7 +54,7 @@ void testSequenceResetInRange() { FixHandlerSettings settings = createHandlerSettings(); settings.setLoadMissedMessagesFromCradle(true); DataProviderService dataProviderService = Mockito.mock(DataProviderService.class); - MessageSearcher ms = new MessageSearcher( + TestMessageSearcher ms = new TestMessageSearcher( List.of( messageSearchResponse(2), messageSearchResponse(3), @@ -87,7 +87,7 @@ void testSequenceResetInsideRange() { FixHandlerSettings settings = createHandlerSettings(); settings.setLoadMissedMessagesFromCradle(true); DataProviderService dataProviderService = Mockito.mock(DataProviderService.class); - MessageSearcher ms = new MessageSearcher( + TestMessageSearcher ms = new TestMessageSearcher( List.of( messageSearchResponse(4), messageSearchResponse(5) @@ -143,7 +143,7 @@ void testSequenceResetOutOfRange() { FixHandlerSettings settings = createHandlerSettings(); settings.setLoadMissedMessagesFromCradle(true); DataProviderService dataProviderService = Mockito.mock(DataProviderService.class); - MessageSearcher ms = new MessageSearcher( + TestMessageSearcher ms = new TestMessageSearcher( List.of( messageSearchResponse(1), messageSearchResponse(2), @@ -177,7 +177,7 @@ void testSequenceResetAdminMessages() { FixHandlerSettings settings = createHandlerSettings(); settings.setLoadMissedMessagesFromCradle(true); DataProviderService dataProviderService = Mockito.mock(DataProviderService.class); - MessageSearcher ms = new MessageSearcher( + TestMessageSearcher ms = new TestMessageSearcher( List.of( messageSearchResponseAdmin(2), messageSearchResponse(4), diff --git a/src/test/java/com/exactpro/th2/TestUtils.java b/src/test/java/com/exactpro/th2/TestUtils.java index adcb4be..89c8adb 100644 --- a/src/test/java/com/exactpro/th2/TestUtils.java +++ b/src/test/java/com/exactpro/th2/TestUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 Exactpro (Exactpro Systems Limited) + * Copyright 2023-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,18 +15,21 @@ */ package com.exactpro.th2; -import com.exactpro.th2.conn.dirty.fix.FixProtocolManglerSettings; import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.BrokenConnConfiguration; -import java.nio.charset.StandardCharsets; -import java.util.Base64; import org.jetbrains.annotations.NotNull; +import static java.util.Objects.requireNonNullElse; + public class TestUtils { @NotNull public static FixHandlerSettings createHandlerSettings() { return createHandlerSettings(null, null, false); } + public static FixHandlerSettings createHandlerSettings(BrokenConnConfiguration brokenConfig) { + return createHandlerSettings(brokenConfig, 30, false); + } + public static FixHandlerSettings createHandlerSettings( BrokenConnConfiguration brokenConfig, Integer hbtInt, @@ -36,11 +39,8 @@ public static FixHandlerSettings createHandlerSettings( fixHandlerSettings.setHost("127.0.0.1"); fixHandlerSettings.setPort(8080); fixHandlerSettings.setBeginString("FIXT.1.1"); - if(hbtInt == null) { - fixHandlerSettings.setHeartBtInt(30); - } else { - fixHandlerSettings.setHeartBtInt(hbtInt); - } + fixHandlerSettings.setHeartBtInt(requireNonNullElse(hbtInt, 30)); + fixHandlerSettings.setDisconnectCleanUpTimeoutMs(100); fixHandlerSettings.setUseNextExpectedSeqNum(useNextExpectedSeqNumber); fixHandlerSettings.setSenderCompID("client"); fixHandlerSettings.setTargetCompID("server"); @@ -54,6 +54,7 @@ public static FixHandlerSettings createHandlerSettings( fixHandlerSettings.setResetOnLogon(false); fixHandlerSettings.setDefaultApplVerID("9"); fixHandlerSettings.setSenderSubID("trader"); + fixHandlerSettings.setCradleSaveTimeoutMs(100); fixHandlerSettings.setBrokenConnConfiguration(brokenConfig); return fixHandlerSettings; } diff --git a/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/TestMessageTransformer.kt b/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/MessageTransformerTest.kt similarity index 98% rename from src/test/kotlin/com/exactpro/th2/conn/dirty/fix/TestMessageTransformer.kt rename to src/test/kotlin/com/exactpro/th2/conn/dirty/fix/MessageTransformerTest.kt index 8732e90..972532f 100644 --- a/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/TestMessageTransformer.kt +++ b/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/MessageTransformerTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2022-2022 Exactpro (Exactpro Systems Limited) + * Copyright 2022-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,7 @@ import org.junit.jupiter.api.Test import java.lang.System.lineSeparator import kotlin.text.Charsets.UTF_8 -class TestMessageTransformer { +class MessageTransformerTest { @Test fun `set field`() { val buffer = MESSAGE.toBuffer() val transform = set(49 to "abc") onlyIf (35 matches "A") diff --git a/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/StrategiesTest.kt b/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/StrategiesTest.kt new file mode 100644 index 0000000..95aa95b --- /dev/null +++ b/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/StrategiesTest.kt @@ -0,0 +1,1296 @@ +/* + * Copyright 2023-2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.conn.dirty.fix + +import com.exactpro.th2.FixHandler +import com.exactpro.th2.FixHandlerSettings +import com.exactpro.th2.TestUtils.createHandlerSettings +import com.exactpro.th2.common.event.Event +import com.exactpro.th2.common.grpc.ConnectionID +import com.exactpro.th2.common.grpc.Direction +import com.exactpro.th2.common.grpc.MessageID +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.toByteArray +import com.exactpro.th2.common.utils.message.toTimestamp +import com.exactpro.th2.common.utils.shutdownGracefully +import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.BatchSendConfiguration +import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.BrokenConnConfiguration +import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.ChangeSequenceConfiguration +import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.MissMessageConfiguration +import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.RecoveryConfig +import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.ResendRequestConfiguration +import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.RuleConfiguration +import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.SplitSendConfiguration +import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.TransformMessageConfiguration +import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.TransformationConfiguration +import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.RuleType +import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.RuleType.CREATE_OUTGOING_GAP +import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.RuleType.DEFAULT +import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.SchedulerType +import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel +import com.exactpro.th2.conn.dirty.tcp.core.api.IHandlerContext +import com.exactpro.th2.constants.Constants +import com.exactpro.th2.constants.Constants.MSG_SEQ_NUM_TAG +import com.exactpro.th2.dataprovider.lw.grpc.DataProviderService +import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupsSearchRequest +import com.exactpro.th2.dataprovider.lw.grpc.MessageSearchResponse +import com.exactpro.th2.dataprovider.lw.grpc.TimeRelation +import com.exactpro.th2.netty.bytebuf.util.asExpandable +import com.exactpro.th2.netty.bytebuf.util.contains +import com.exactpro.th2.netty.bytebuf.util.isEmpty +import com.google.protobuf.UnsafeByteOperations +import com.google.protobuf.util.Timestamps +import io.netty.buffer.ByteBuf +import io.netty.buffer.Unpooled +import mu.KotlinLogging +import org.junit.jupiter.api.Disabled +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.mockito.kotlin.any +import org.mockito.kotlin.anyOrNull +import org.mockito.kotlin.anyVararg +import org.mockito.kotlin.argThat +import org.mockito.kotlin.argumentCaptor +import org.mockito.kotlin.clearInvocations +import org.mockito.kotlin.description +import org.mockito.kotlin.doAnswer +import org.mockito.kotlin.eq +import org.mockito.kotlin.mock +import org.mockito.kotlin.timeout +import org.mockito.kotlin.times +import org.mockito.kotlin.verify +import org.mockito.kotlin.verifyNoMoreInteractions +import org.mockito.kotlin.whenever +import java.time.Duration +import java.time.Instant +import java.time.temporal.ChronoUnit.MILLIS +import java.time.temporal.ChronoUnit.SECONDS +import java.util.Collections +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.Executors +import java.util.concurrent.Future +import java.util.concurrent.atomic.AtomicInteger +import java.util.regex.Pattern +import kotlin.test.assertContains +import kotlin.test.assertEquals +import kotlin.test.assertNotNull +import kotlin.test.assertTrue +import kotlin.text.Charsets.US_ASCII +import com.exactpro.th2.common.grpc.Event as ProtoEvent + +private const val TEST_BOOK = "test-book" +private const val TEST_SCOPE = "test-scope" +private const val TEST_SESSION_GROUP = "test-session-group" +private const val TEST_SESSION_ALIAS = "test-session-alias" + +class StrategiesTest { + + private class TestContext( + val context: IHandlerContext, + val channel: IChannel, + val fixHandler: FixHandler, + val incomingSequence: AtomicInteger + ) + + @Test + @Disabled + fun testDisconnectStrategy() { + val defaultRuleDuration = Duration.of(2, SECONDS) + val businessRuleDuration = Duration.of(5, SECONDS) + val businessRuleCleanupDuration = Duration.of(2, SECONDS) + val handlerSettings: FixHandlerSettings = createHandlerSettings(BrokenConnConfiguration( + SchedulerType.CONSECUTIVE, + listOf( + RuleConfiguration(DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(0, MILLIS)), + RuleConfiguration( + RuleType.DISCONNECT_WITH_RECONNECT, + duration = businessRuleDuration, + cleanUpDuration = businessRuleCleanupDuration + ), + ) + )) + val testContext = createTestContext(handlerSettings, enableAdditionalHandling = false) + val channel = testContext.channel + val handler = testContext.fixHandler + + verify(channel, timeout(defaultRuleDuration.millis() + 300)).close() + + handler.send(businessMessage(2), Collections.emptyMap(), null) + + verify(channel, timeout(businessRuleDuration.millis() + 300)).open() + + val captor = argumentCaptor { } + verify(channel, timeout(businessRuleCleanupDuration.millis() + 300).times(2)).send(captor.capture(), any(), anyOrNull(), any()) + + captor.firstValue.apply { + this.assertContains(mapOf(35 to "A")) + } + captor.secondValue.apply { + this.assertContains(mapOf(35 to "AE")) + } + + handler.close() + channel.close() + } + + @Test + @Disabled + fun testIgnoreIncomingMessagesStrategyWithNextExpectedSequenceNumber() { + val defaultRuleDuration = Duration.of(2, SECONDS) + val businessRuleDuration = Duration.of(6, SECONDS) + val businessRuleCleanupDuration = Duration.of(2, SECONDS) + val messages = mutableListOf, IChannel.SendMode>>() + val handlerSettings: FixHandlerSettings = createHandlerSettings( + BrokenConnConfiguration( + SchedulerType.CONSECUTIVE, + listOf( + RuleConfiguration(DEFAULT, duration = Duration.of(2, SECONDS), cleanUpDuration = Duration.of(0, SECONDS)), + RuleConfiguration( + RuleType.IGNORE_INCOMING_MESSAGES, + duration = businessRuleDuration, + cleanUpDuration = businessRuleCleanupDuration, + missIncomingMessagesConfiguration = MissMessageConfiguration(3) + ), + ), + ), 30, true + ) + val testContext = createTestContext(handlerSettings) { msg, mtd, mode -> + messages.add(Triple(msg, mtd, mode)) + } + + val channel = testContext.channel + val handler = testContext.fixHandler + val incomingSequence = testContext.incomingSequence + + val captor = argumentCaptor { } + + verify(channel, timeout(defaultRuleDuration.millis() + 1300)).open() + verify(channel, timeout(300)).send(any(), any(), anyOrNull(), any()) // Logon + clearInvocations(channel) + + Thread.sleep(200) // wait for strategies to apply + + handler.onIncoming(channel, businessMessage(incomingSequence.incrementAndGet()), getMessageId()) + handler.onIncoming(channel, businessMessage(incomingSequence.incrementAndGet()), getMessageId()) + handler.onIncoming(channel, businessMessage(incomingSequence.incrementAndGet()), getMessageId()) + + verify(channel, timeout(businessRuleDuration.millis() + businessRuleCleanupDuration.millis() + 1300)).open() + verify(channel, timeout(300)).send(captor.capture(), any(), anyOrNull(), any()) // Logon + clearInvocations(channel) + + captor.firstValue.apply { + this.assertContains(mapOf(35 to "A", 789 to "3")) + } + + handler.close() + channel.close() + } + + @Test + @Disabled + fun testIgnoreIncomingMessagesStrategyResendRequest() { + val defaultRuleDuration = Duration.of(2, SECONDS) + val businessRuleDuration = Duration.of(6, SECONDS) + val businessRuleCleanupDuration = Duration.of(2, SECONDS) + val messages = mutableListOf, IChannel.SendMode>>() + val handlerSettings: FixHandlerSettings = createHandlerSettings(BrokenConnConfiguration( + SchedulerType.CONSECUTIVE, + listOf( + RuleConfiguration(DEFAULT, duration = Duration.of(2, SECONDS), cleanUpDuration = Duration.of(0, SECONDS)), + RuleConfiguration( + RuleType.IGNORE_INCOMING_MESSAGES, + duration = businessRuleDuration, + cleanUpDuration = businessRuleCleanupDuration, + missIncomingMessagesConfiguration = MissMessageConfiguration(3) + ), + ), + )) + val testContext = createTestContext(handlerSettings) { msg, mtd, mode -> + messages.add(Triple(msg, mtd, mode)) + } + + val channel = testContext.channel + val handler = testContext.fixHandler + val incomingSequence = testContext.incomingSequence + + val captor = argumentCaptor { } + + clearInvocations(channel) + verify(channel, timeout(defaultRuleDuration.millis() + 300)).open() + verify(channel, timeout(300)).send(any(), any(), anyOrNull(), any()) // Logon // 2 + clearInvocations(channel) + + Thread.sleep(200) // wait for strategies to apply + + handler.onIncoming(channel, businessMessage(incomingSequence.incrementAndGet()), getMessageId()) // 3 + handler.onIncoming(channel, businessMessage(incomingSequence.incrementAndGet()), getMessageId()) // 4 + handler.onIncoming(channel, businessMessage(incomingSequence.incrementAndGet()), getMessageId()) // 5 + handler.onIncoming(channel, businessMessage(incomingSequence.incrementAndGet()), getMessageId()) // 6 + + verify(channel, timeout(businessRuleDuration.millis() + businessRuleCleanupDuration.millis() + 300)).open() + verify(channel, timeout(600).times(2)).send(captor.capture(), any(), anyOrNull(), any()) // Logon + clearInvocations(channel) + + captor.firstValue.apply { + this.assertContains(mapOf(35 to "2", 7 to "3", 16 to "5")) + } + + handler.close() + channel.close() + } + + @Test + @Disabled + fun testTransformLogonMessagesStrategy() { + val defaultRuleDuration = Duration.of(2, SECONDS) + val businessRuleDuration = Duration.of(6, SECONDS) + val messages = mutableListOf, IChannel.SendMode>>() + val handlerSettings: FixHandlerSettings = createHandlerSettings( + BrokenConnConfiguration( + SchedulerType.CONSECUTIVE, + listOf( + RuleConfiguration(DEFAULT, duration = Duration.of(2, SECONDS), cleanUpDuration = Duration.of(0, SECONDS)), + RuleConfiguration( + RuleType.TRANSFORM_LOGON, + duration = businessRuleDuration, + cleanUpDuration = Duration.of(2, SECONDS), + transformMessageConfiguration = TransformMessageConfiguration( + listOf( + TransformationConfiguration( + listOf( + Action( + replace = FieldSelector( + tag = Constants.PASSWORD_TAG, + matches = Pattern.compile("pass"), + tagOneOf = null + ), + with = FieldDefinition( + tag = Constants.PASSWORD_TAG, + value = "mangledPassword", + tagOneOf = null, + valueOneOf = null + ) + ) + ), false, "A"), + TransformationConfiguration( + listOf( + Action( + replace = FieldSelector( + tag = Constants.PASSWORD_TAG, + matches = Pattern.compile("pass"), + tagOneOf = null + ), + with = FieldDefinition( + tag = Constants.PASSWORD_TAG, + value = "mangledPassword", + tagOneOf = null, + valueOneOf = null + ) + ) + ), false, "A") + ) + ), + ), + ) + ) + ) + val testContext = createTestContext(handlerSettings) { msg, mode, mtd -> + messages.add(Triple(msg, mode, mtd)) + } + + val channel = testContext.channel + val handler = testContext.fixHandler + + messages.clear() + verify(channel, timeout(defaultRuleDuration.millis() + businessRuleDuration.millis() + 300).times(3)).open() + + // start + verify(channel, timeout(800).times(3)).send(any(), any(), anyOrNull(), any()) + + messages[0].apply { + this.first.assertContains(mapOf(35 to "A", Constants.PASSWORD_TAG to "mangledPassword")) + } + messages[1].apply { + this.first.assertContains(mapOf(35 to "A", Constants.PASSWORD_TAG to "mangledPassword")) + } + messages[2].apply { + this.first.assertContains(mapOf(35 to "A", Constants.PASSWORD_TAG to "pass")) + } + + handler.close() + channel.close() + } + + @Test + @Disabled + fun testBidirectionalResendRequest() { + val defaultRuleDuration = Duration.of(2, SECONDS) + val messages = mutableListOf, IChannel.SendMode>>() + val handlerSettings: FixHandlerSettings = createHandlerSettings(BrokenConnConfiguration( + SchedulerType.CONSECUTIVE, + listOf( + RuleConfiguration(DEFAULT, duration = Duration.of(2, SECONDS), cleanUpDuration = Duration.of(0, SECONDS)), + RuleConfiguration( + RuleType.BI_DIRECTIONAL_RESEND_REQUEST, + duration = Duration.of(6, SECONDS), + cleanUpDuration = Duration.of(2, SECONDS), + missIncomingMessagesConfiguration = MissMessageConfiguration(2), + missOutgoingMessagesConfiguration = MissMessageConfiguration(2) + ), + ) + )) + val testContext = createTestContext(handlerSettings) { msg, mode, mtd -> + messages.add(Triple(msg, mode, mtd)) + } + + val channel = testContext.channel + val handler = testContext.fixHandler + clearInvocations(channel) + verify(channel, timeout(defaultRuleDuration.millis() + 300)).open() + verify(channel, timeout(600).times(1)).send(any(), any(), anyOrNull(), any()) + + Thread.sleep(200) // wait for strategies to apply + messages.clear() + + handler.onIncoming(channel, businessMessage(3), getMessageId()) + handler.onIncoming(channel, businessMessage(4), getMessageId()) + + handler.onOutgoing(channel, businessMessage(3).asExpandable(), Collections.emptyMap()) + handler.onOutgoing(channel, businessMessage(4).asExpandable(), Collections.emptyMap()) + + // Trigger resend request + handler.onIncoming(channel, businessMessage(5), getMessageId()) + + handler.onIncoming(channel, businessMessage(3, true), getMessageId()) + handler.onIncoming(channel, businessMessage(4, true), getMessageId()) + // end + + // Trigger recovery + handler.onIncoming(channel, resendRequest(6, 3, 4), getMessageId()) + // end + + messages[0].apply { + val buff = first + buff.assertContains(mapOf(35 to "2", 7 to "3", 16 to "4")) + } + + messages[1].apply { + val buff = first + buff.assertContains(mapOf(35 to "AE", 43 to "Y", 34 to "3")) + } + + messages[2].apply { + val buff = first + buff.assertContains(mapOf(35 to "AE", 43 to "Y", 34 to "4")) + } + + handler.close() + channel.close() + } + + @Test + fun `outgoing gap strategy test`() { + val ruleDuration = Duration.of(500, MILLIS) + val correction = ruleDuration.millis() / 2 + val handlerSettings: FixHandlerSettings = createHandlerSettings( + BrokenConnConfiguration( + SchedulerType.CONSECUTIVE, + listOf( + RuleConfiguration(DEFAULT, duration = ruleDuration, cleanUpDuration = Duration.ZERO), + RuleConfiguration( + CREATE_OUTGOING_GAP, + duration = ruleDuration, + cleanUpDuration = Duration.ZERO, + missOutgoingMessagesConfiguration = MissMessageConfiguration(3) + ), + ) + ) + ) + val testContext = createTestContext(handlerSettings) + + val context = testContext.context + val channel = testContext.channel + + testContext.fixHandler.use { handler -> + handler.onStart() + context.verifyInvocationsAndClean { + verify(this).createChannel(any(), any(), any(), any(), any(), any(), anyVararg()) + channel.verifyInvocationsAndClean { + verify( + this, + times(2).description("Check channel state in the onStart and sendLogon methods"), + ).isOpen + verify(this, description("open channel with default strategy first")).open() + verifySend(mapOf(35 to "A", 34 to "1"), correction) + } + verifySendEvent("successful login", "Info", correction) + } + + verifyChangeStrategy(channel, ruleDuration, DEFAULT, CREATE_OUTGOING_GAP, 2) + context.verifyInvocationsAndClean { + val events = captureSendEvents(2, correction) + assertAll( + { assertEquals("successful login", events[0].name) }, + { assertContains(events[1].name, "$CREATE_OUTGOING_GAP strategy started") }, + ) + } + + handler.onOutgoing(channel, businessMessage(3).asExpandable(), mutableMapOf()) + handler.onOutgoing(channel, businessMessage(4).asExpandable(), mutableMapOf()) + handler.onOutgoing(channel, businessMessage(5).asExpandable(), mutableMapOf()) + + handler.onIncoming(channel, resendRequest(3, 3, 5), getMessageId()) + channel.verifyInvocationsAndClean { + verify(this, timeout(correction).description("check status during handleResendRequest method")).isOpen + val byteBufs = captureSend(3, correction) + assertAll( + { byteBufs[0].assertContains(mapOf(35 to "AE", 43 to "Y", 34 to "3")) }, + { byteBufs[1].assertContains(mapOf(35 to "AE", 43 to "Y", 34 to "4")) }, + { byteBufs[2].assertContains(mapOf(35 to "AE", 43 to "Y", 34 to "5")) }, + ) + } + + verifyChangeStrategy(channel, ruleDuration, CREATE_OUTGOING_GAP, DEFAULT, 6) + context.verifyInvocationsAndClean { + val events = captureSendEvents(2, correction) + assertAll( + { assertEquals("successful login", events[0].name) }, + { assertContains(events[1].name, "$CREATE_OUTGOING_GAP strategy finished") }, + ) + } + } + } + + @Test + fun `outgoing gap strategy - long recovery in case of mixing recovery message with non-recovery messages test`() { + val ruleDuration = Duration.of(500, MILLIS) + val correction = ruleDuration.millis() / 2 + val messages = CopyOnWriteArrayList() + val handlerSettings: FixHandlerSettings = + createHandlerSettings( + BrokenConnConfiguration( + SchedulerType.CONSECUTIVE, + listOf( + RuleConfiguration(DEFAULT, duration = ruleDuration, cleanUpDuration = Duration.ZERO), + RuleConfiguration( + CREATE_OUTGOING_GAP, + duration = ruleDuration, + cleanUpDuration = Duration.ZERO, + allowMessagesBeforeRetransmissionFinishes = true, + missOutgoingMessagesConfiguration = MissMessageConfiguration(3), + ), + ), + ), + ).apply { + isLoadMissedMessagesFromCradle = true + cradleSaveTimeoutMs = 500 + } + val testContext = + createTestContext( + handlerSettings, + searchMessageGroups = { request -> searchMessageGroups(messages, request) }, + ) + + val context = testContext.context + val channel = testContext.channel + val handler = testContext.fixHandler + val executor = Executors.newCachedThreadPool() + + var msgSender: Future<*>? = null + try { + TestServerEmulator( + executor, + handler, + channel, + ::getMessageId, + ::logonResponse, + ::resendRequest, + ).use { server -> + whenever(channel.send(any(), any(), anyOrNull(), any())).doAnswer { + val sendMode = it.arguments[3] as IChannel.SendMode + val byteBuf = it.arguments[0] as ByteBuf + handleOutgoingMessages(handler, channel, messages, server, sendMode, byteBuf) + } + + handler.use { + handler.onStart() + verify(context, timeout(ruleDuration.toMillis() + correction)).send( + argThat { + toProto(TEST_BOOK, TEST_SCOPE).name.contains("$CREATE_OUTGOING_GAP strategy started") + }, + anyOrNull(), + ) + + msgSender = executor.submit { sendBusinessMessages(handler) } + + verify(context, timeout(ruleDuration.toMillis() * 10)).send( + argThat { + toProto(TEST_BOOK, TEST_SCOPE).name.contains("$CREATE_OUTGOING_GAP strategy finished") + }, + anyOrNull()) + + } + } + } finally { + msgSender?.cancel(true) + executor.shutdownGracefully() + } + } + + private fun handleOutgoingMessages( + handler: FixHandler, + channel: IChannel, + messages: MutableList, + server: TestServerEmulator, + sendMode: IChannel.SendMode, + byteBuf: ByteBuf, + ): CompletableFuture = try { + + LOGGER.info { "put $sendMode ${byteBuf.toString(US_ASCII)}" } + + if (sendMode.handle) { + handler.onOutgoing(channel, byteBuf, mutableMapOf()) + } + + if (!byteBuf.isEmpty()) { + if (sendMode.mqPublish) { + messages.add( + MessageSearchResponse + .newBuilder() + .apply { + messageBuilder.apply { + messageIdBuilder.apply { + timestamp = Timestamps.now() + sequence = byteBuf.findField(MSG_SEQ_NUM_TAG)?.value?.toLong() ?: 0L + } + bodyRaw = UnsafeByteOperations.unsafeWrap(byteBuf.toByteArray()) + } + }.build(), + ) + LOGGER.info { "publish to mq ${byteBuf.findField(MSG_SEQ_NUM_TAG)?.value}" } + } + + if (sendMode.socketSend) { + server.consume(byteBuf) + } + } + + CompletableFuture.completedFuture(getMessageId()) + } catch (e: InterruptedException) { + Thread.currentThread().interrupt() + throw e + } + + @Test + fun `outgoing gap strategy - long recovery in case of sequence reset for admin is false test`() { + val ruleDuration = Duration.of(500, MILLIS) + val correction = ruleDuration.millis() / 2 + val messages = CopyOnWriteArrayList() + val handlerSettings: FixHandlerSettings = + createHandlerSettings( + BrokenConnConfiguration( + SchedulerType.CONSECUTIVE, + listOf( + RuleConfiguration(DEFAULT, duration = ruleDuration, cleanUpDuration = Duration.ZERO), + RuleConfiguration( + CREATE_OUTGOING_GAP, + duration = ruleDuration, + cleanUpDuration = Duration.ZERO, + missOutgoingMessagesConfiguration = MissMessageConfiguration(3), + recoveryConfig = RecoveryConfig( + sequenceResetForAdmin = false + ) + ), + ), + ), + ).apply { + isLoadMissedMessagesFromCradle = true + cradleSaveTimeoutMs = 500 + } + val testContext = + createTestContext( + handlerSettings, + searchMessageGroups = { request -> searchMessageGroups(messages, request) }, + ) + + val context = testContext.context + val channel = testContext.channel + val handler = testContext.fixHandler + val executor = Executors.newCachedThreadPool() + + var msgSender: Future<*>? = null + try { + TestServerEmulator( + executor, + handler, + channel, + ::getMessageId, + ::logonResponse, + ::resendRequest, + ).use { server -> + whenever(channel.send(any(), any(), anyOrNull(), any())).doAnswer { + val sendMode = it.arguments[3] as IChannel.SendMode + val byteBuf = it.arguments[0] as ByteBuf + handleOutgoingMessages(handler, channel, messages, server, sendMode, byteBuf) + } + + handler.use { + handler.onStart() + verify(context, timeout(ruleDuration.toMillis() + correction)).send( + argThat { + toProto(TEST_BOOK, TEST_SCOPE).name.contains("$CREATE_OUTGOING_GAP strategy started") + }, + anyOrNull(), + ) + + msgSender = executor.submit { sendBusinessMessages(handler) } + + verify(context, timeout(ruleDuration.toMillis() * 10)).send( + argThat { + toProto(TEST_BOOK, TEST_SCOPE).name.contains("$CREATE_OUTGOING_GAP strategy finished") + }, + anyOrNull()) + + } + } + } finally { + msgSender?.cancel(true) + executor.shutdownGracefully() + } + } + + @Test + fun `outgoing gap strategy - long recovery in case of out of order is true test`() { + val ruleDuration = Duration.of(500, MILLIS) + val correction = ruleDuration.millis() / 2 + val messages = CopyOnWriteArrayList() + val handlerSettings: FixHandlerSettings = + createHandlerSettings( + BrokenConnConfiguration( + SchedulerType.CONSECUTIVE, + listOf( + RuleConfiguration(DEFAULT, duration = ruleDuration, cleanUpDuration = Duration.ZERO), + RuleConfiguration( + CREATE_OUTGOING_GAP, + duration = ruleDuration, + cleanUpDuration = Duration.ZERO, + missOutgoingMessagesConfiguration = MissMessageConfiguration(3), + recoveryConfig = RecoveryConfig( + outOfOrder = true + ) + ), + ), + ), + ).apply { + isLoadMissedMessagesFromCradle = true + cradleSaveTimeoutMs = 500 + } + val testContext = + createTestContext( + handlerSettings, + searchMessageGroups = { request -> searchMessageGroups(messages, request) }, + ) + + val context = testContext.context + val channel = testContext.channel + val handler = testContext.fixHandler + val executor = Executors.newCachedThreadPool() + + var msgSender: Future<*>? = null + try { + TestServerEmulator( + executor, + handler, + channel, + ::getMessageId, + ::logonResponse, + ::resendRequest, + ).use { server -> + whenever(channel.send(any(), any(), anyOrNull(), any())).doAnswer { + val sendMode = it.arguments[3] as IChannel.SendMode + val byteBuf = it.arguments[0] as ByteBuf + handleOutgoingMessages(handler, channel, messages, server, sendMode, byteBuf) + } + + handler.use { + handler.onStart() + verify(context, timeout(ruleDuration.toMillis() + correction)).send( + argThat { + toProto(TEST_BOOK, TEST_SCOPE).name.contains("$CREATE_OUTGOING_GAP strategy started") + }, + anyOrNull(), + ) + + msgSender = executor.submit { sendBusinessMessages(handler) } + + verify(context, timeout(ruleDuration.toMillis() * 10)).send( + argThat { + toProto(TEST_BOOK, TEST_SCOPE).name.contains("$CREATE_OUTGOING_GAP strategy finished") + }, + anyOrNull()) + + } + } + } finally { + msgSender?.cancel(true) + executor.shutdownGracefully() + } + } + + private fun searchMessageGroups( + messages: List, + request: MessageGroupsSearchRequest + ): Iterator { + val from = if (request.hasStartTimestamp()) request.startTimestamp else null + val to = if (request.hasEndTimestamp()) request.endTimestamp else null + return when (request.searchDirection) { + TimeRelation.NEXT -> + messages + .toMutableList() + .asSequence() + .filter { response -> + (from == null || Timestamps.compare(from, response.message.messageId.timestamp) <= 0) && + (to == null || Timestamps.compare(to, response.message.messageId.timestamp) >= 0) + } + + TimeRelation.PREVIOUS -> + messages + .reversed() + .asSequence() + .filter { response -> + (from == null || Timestamps.compare(from, response.message.messageId.timestamp) >= 0) && + (to == null || Timestamps.compare(to, response.message.messageId.timestamp) <= 0) + } + + else -> error("Unsupported search direction") + }.iterator() + } + + private fun sendBusinessMessages(handler: FixHandler) { + try { + var seq = 2 + while (!Thread.currentThread().isInterrupted) { + seq += 1 + handler.send(businessMessage(seq).asExpandable(), mutableMapOf(), null) + Thread.sleep(1) + } + } catch (e: Exception) { + LOGGER.error(e) { "Send message problem" } + } finally { + LOGGER.info { "Stopped sending" } + } + } + + private fun verifyChangeStrategy( + channel: IChannel, + ruleDuration: Duration, + oldRule: RuleType, + newRule: RuleType, + logonSeq: Int, + ) { + val correction = ruleDuration.millis() / 2 + channel.verifyInvocationsAndClean { + verify( + this, + timeout(ruleDuration.millis() + correction) + .description("close in disconnect method at the end of $oldRule strategy"), + ).close() + verify( + this, + timeout(correction).description("open channel with $newRule strategy"), + ).open() + verify( + this, + times(2).description("check status in sendLogon and openChannelAndWaitForLogon"), + ).isOpen + verifySend(mapOf(35 to "A", 34 to logonSeq.toString()), correction) + } + } + + @Test + @Disabled + fun testClientOutage() { + val defaultRuleDuration = Duration.of(2, SECONDS) + val businessRuleDuration = Duration.of(6, SECONDS) + val businessRuleCleanupDuration = Duration.of(2, SECONDS) + val messages = mutableListOf, IChannel.SendMode>>() + val handlerSettings: FixHandlerSettings = createHandlerSettings(BrokenConnConfiguration( + SchedulerType.CONSECUTIVE, + listOf( + RuleConfiguration(DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(0, SECONDS)), + RuleConfiguration( + RuleType.CLIENT_OUTAGE, + duration = businessRuleDuration, + cleanUpDuration = businessRuleCleanupDuration, + ), + ) + ), 1, false) + val testContext = createTestContext(handlerSettings) { msg, mode, mtd -> + messages.add(Triple(msg, mode, mtd)) + } + + val channel = testContext.channel + val handler = testContext.fixHandler + val seq = testContext.incomingSequence + + Thread.sleep(defaultRuleDuration.millis() + 100) // Waiting for strategy to apply + messages.clear() + + handler.onIncoming(channel, testRequest(seq.incrementAndGet()).asExpandable(), getMessageId()) + handler.onIncoming(channel, testRequest(seq.incrementAndGet()).asExpandable(), getMessageId()) + handler.onIncoming(channel, testRequest(seq.incrementAndGet()).asExpandable(), getMessageId()) + + handler.onClose(channel) + handler.onOpen(channel) + + clearInvocations(channel) + + for (message in messages) { + val buff = message.first + assertTrue("Message shouldn't be heartbeat or AE: ${buff.toString(US_ASCII)}") { !(buff.contains("35=AE") || buff.contains("35=0")) } + } + + handler.close() + channel.close() + } + + @Test + @Disabled + fun testPartialClientOutage() { + val defaultRuleDuration = Duration.of(2, SECONDS) + val businessRuleDuration = Duration.of(6, SECONDS) + val businessRuleCleanupDuration = Duration.of(2, SECONDS) + val messages = mutableListOf, IChannel.SendMode>>() + val handlerSettings: FixHandlerSettings = createHandlerSettings( + BrokenConnConfiguration( + SchedulerType.CONSECUTIVE, + listOf( + RuleConfiguration(DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(0, SECONDS)), + RuleConfiguration( + RuleType.PARTIAL_CLIENT_OUTAGE, + duration = businessRuleDuration, + cleanUpDuration = businessRuleCleanupDuration, + ), + ) + ), 1, false + ) + val testContext = createTestContext(handlerSettings) { msg, mode, mtd -> + messages.add(Triple(msg, mode, mtd)) + } + + val channel = testContext.channel + val handler = testContext.fixHandler + val seq = testContext.incomingSequence + + Thread.sleep(defaultRuleDuration.millis() + 100) // Waiting for strategy to apply + messages.clear() + + handler.onIncoming(channel, testRequest(seq.incrementAndGet()).asExpandable(), getMessageId()) + handler.onIncoming(channel, testRequest(seq.incrementAndGet()).asExpandable(), getMessageId()) + handler.onIncoming(channel, testRequest(seq.incrementAndGet()).asExpandable(), getMessageId()) + + handler.onClose(channel) + handler.onOpen(channel) + + clearInvocations(channel) + + for (message in messages) { + val buff = message.first + if(buff.isEmpty()) continue + if(!buff.contains("35=0")) continue + buff.assertContains(mapOf(35 to "0", 112 to "test")) + } + + handler.close() + channel.close() + } + + @Test + @Disabled + fun testSequenceResetStrategyOutgoing() { + val defaultRuleDuration = Duration.of(2, SECONDS) + val businessRuleDuration = Duration.of(6, SECONDS) + val businessRuleCleanupDuration = Duration.of(2, SECONDS) + val messages = mutableListOf, IChannel.SendMode>>() + val handlerSettings: FixHandlerSettings = createHandlerSettings( + BrokenConnConfiguration( + SchedulerType.CONSECUTIVE, + listOf( + RuleConfiguration(DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(0, SECONDS)), + RuleConfiguration( + RuleType.SEQUENCE_RESET, + duration = businessRuleDuration, + cleanUpDuration = businessRuleCleanupDuration, + changeSequenceConfiguration = ChangeSequenceConfiguration(5, false) + ), + ) + ) + ) + val testContext = createTestContext(handlerSettings) { msg, mode, mtd -> + messages.add(Triple(msg, mode, mtd)) + } + + val channel = testContext.channel + val handler = testContext.fixHandler + + verify(channel, timeout(defaultRuleDuration.millis() + businessRuleCleanupDuration.millis() + 300)).open() + messages.clear() + verify(channel, timeout(500)).send(any(), any(), anyOrNull(), any()) // Logon + + handler.onIncoming(channel, resendRequest(3, 3, 8), getMessageId()) + + clearInvocations(channel) + + for(message in messages) { + if(!message.first.contains("35=4")) continue + message.first.assertContains(mapOf(35 to "4", 34 to "3", 36 to "8")) + } + + handler.close() + channel.close() + } + + @Test + @Disabled + fun testResendRequestStrategy() { + val defaultRuleDuration = Duration.of(2, SECONDS) + val businessRuleDuration = Duration.of(6, SECONDS) + val businessRuleCleanupDuration = Duration.of(2, SECONDS) + val messages = mutableListOf, IChannel.SendMode>>() + val handlerSettings: FixHandlerSettings = createHandlerSettings( + BrokenConnConfiguration( + SchedulerType.CONSECUTIVE, + listOf( + RuleConfiguration(DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(0, SECONDS)), + RuleConfiguration( + RuleType.RESEND_REQUEST, + duration = businessRuleDuration, + cleanUpDuration = businessRuleCleanupDuration, + resendRequestConfiguration = ResendRequestConfiguration(5) + ), + ) + ) + ) + val testContext = createTestContext(handlerSettings) { msg, mode, mtd -> + messages.add(Triple(msg, mode, mtd)) + } + + val channel = testContext.channel + val handler = testContext.fixHandler + + val captor = argumentCaptor {} + + handler.onIncoming(channel, businessMessage(2).asExpandable(), getMessageId()) + handler.onIncoming(channel, businessMessage(3).asExpandable(), getMessageId()) + handler.onIncoming(channel, businessMessage(4).asExpandable(), getMessageId()) + handler.onIncoming(channel, businessMessage(5).asExpandable(), getMessageId()) + handler.onIncoming(channel, businessMessage(6).asExpandable(), getMessageId()) + verify(channel, timeout(defaultRuleDuration.millis() + 300).times(1)).send(captor.capture(), any(), anyOrNull(), any()) + + captor.firstValue.apply { + this.assertContains(mapOf(35 to "2", 7 to "1", 16 to "6")) + } + + clearInvocations(channel) + + handler.close() + channel.close() + } + + @Test + @Disabled + fun testBatchSend() { + val defaultRuleDuration = Duration.of(2, SECONDS) + val businessRuleDuration = Duration.of(6, SECONDS) + val businessRuleCleanupDuration = Duration.of(2, SECONDS) + val messages = mutableListOf, IChannel.SendMode>>() + val handlerSettings: FixHandlerSettings = createHandlerSettings( + BrokenConnConfiguration( + SchedulerType.CONSECUTIVE, + listOf( + RuleConfiguration(DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(0, SECONDS)), + RuleConfiguration( + RuleType.BATCH_SEND, + duration = businessRuleDuration, + cleanUpDuration = businessRuleCleanupDuration, + batchSendConfiguration = BatchSendConfiguration(3) + ), + ) + ) + ) + val testContext = createTestContext(handlerSettings) { msg, mode, mtd -> + messages.add(Triple(msg, mode, mtd)) + } + + val channel = testContext.channel + val handler = testContext.fixHandler + Thread.sleep(defaultRuleDuration.millis() + 300) + clearInvocations(channel) + + handler.send(businessMessage(2).asExpandable(), mutableMapOf(), null) + handler.send(businessMessage(3).asExpandable(), mutableMapOf(), null) + handler.send(businessMessage(4).asExpandable(), mutableMapOf(), null) + handler.send(businessMessage(5).asExpandable(), mutableMapOf(), null) + + val captor = argumentCaptor {} + verify(channel, timeout(businessRuleDuration.millis() + 600).times(2)).send(captor.capture(), any(), anyOrNull(), any()) + + val sizeOfOneMessage = businessMessage(2).asExpandable().also { + handler.onOutgoingUpdateTag(it, mutableMapOf()) + }.readableBytes() + + captor.firstValue.apply { + println(readableBytes()) + println(sizeOfOneMessage * 3) + assertTrue { this.readableBytes() >= sizeOfOneMessage * 3 } + } + + captor.secondValue.apply { + assertEquals(this.readableBytes(), sizeOfOneMessage) + } + + handler.close() + channel.close() + } + + @Test + @Disabled + fun testSplitSend() { + val defaultRuleDuration = Duration.of(2, SECONDS) + val businessRuleDuration = Duration.of(6, SECONDS) + val businessRuleCleanupDuration = Duration.of(2, SECONDS) + val messages = mutableListOf, IChannel.SendMode>>() + val handlerSettings: FixHandlerSettings = createHandlerSettings( + BrokenConnConfiguration( + SchedulerType.CONSECUTIVE, + listOf( + RuleConfiguration(DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(0, SECONDS)), + RuleConfiguration( + RuleType.SPLIT_SEND, + duration = businessRuleDuration, + cleanUpDuration = businessRuleCleanupDuration, + splitSendConfiguration = SplitSendConfiguration(3, 100) + ), + ) + ) + ) + val testContext = createTestContext(handlerSettings) { msg, mode, mtd -> + messages.add(Triple(msg, mode, mtd)) + } + + val channel = testContext.channel + val handler = testContext.fixHandler + Thread.sleep(defaultRuleDuration.millis() + 300) + clearInvocations(channel) + val businessMessage = businessMessage(2).asExpandable() + + handler.send(businessMessage, mutableMapOf(), null) + + val captor = argumentCaptor {} + verify(channel, timeout(businessRuleDuration.millis() + 300).times(4)).send(captor.capture(), any(), anyOrNull(), any()) + + val partSize = businessMessage.readableBytes() / 3 + + captor.firstValue.apply { + assertEquals(partSize, this.readableBytes()) + } + + captor.secondValue.apply { + assertEquals(partSize, this.readableBytes()) + } + + captor.thirdValue.apply { + assertTrue { this.readableBytes() >= partSize } + } + + captor.allValues[3].apply { + assertEquals(businessMessage.readableBytes(), this.readableBytes()) + } + + handler.close() + channel.close() + } + + private fun createTestContext( + handlerSettings: FixHandlerSettings, + enableAdditionalHandling: Boolean = true, + searchMessageGroups: ( + MessageGroupsSearchRequest, + ) -> Iterator = { emptyList().iterator() }, + sendHandlerExtension: (ByteBuf, Map, IChannel.SendMode) -> Unit = {_, _, _ ->} + ): TestContext { + val channel: IChannel = mock { + on { sessionGroup }.thenReturn(TEST_SESSION_GROUP) + on { sessionAlias }.thenReturn(TEST_SESSION_ALIAS) + } + val dataProviderService: DataProviderService = mock { + on { this.searchMessageGroups(any()) }.doAnswer { + searchMessageGroups(it.arguments.first() as MessageGroupsSearchRequest) + } + } + val context: IHandlerContext = mock { + on { bookName }.thenReturn(TEST_BOOK) + on { settings }.thenReturn(handlerSettings) + on { createChannel(any(), any(), any(), any(), any(), any(), anyVararg()) }.thenReturn(channel) + on { getGrpcService(eq(DataProviderService::class.java)) }.thenReturn(dataProviderService) + } + + val incomingSequence = AtomicInteger(0) + var outgoingSequence = 0 + + val handler = FixHandler(context) + context.verifyInvocationsAndClean { + verifySendEvent("Strategy root event", "Info", 100) + verify(this).settings + if (handlerSettings.isLoadMissedMessagesFromCradle) { + verify(this).bookName + verify(this).getGrpcService(eq(DataProviderService::class.java)) + + } + } + + val onSendHandler: (ByteBuf, IChannel.SendMode) -> CompletableFuture = { msg, mode -> + LOGGER.info { "OnSendHandler ${msg.toString(US_ASCII)}" } + outgoingSequence += 1 + if(enableAdditionalHandling) { + val expanded = msg.copy().asExpandable() + val metadata = mutableMapOf() + if(mode == IChannel.SendMode.HANDLE || mode == IChannel.SendMode.HANDLE_AND_MANGLE) { + handler.onOutgoing(channel, expanded, metadata) + } + sendHandlerExtension(expanded, metadata, mode) + } + if(msg.contains("35=A\u0001")) { + if(handlerSettings.useNextExpectedSeqNum()) { + handler.onIncoming(channel, logonResponseWithNextExpectedSeq(incomingSequence.incrementAndGet(), outgoingSequence + 1), getMessageId()) + } else { + handler.onIncoming(channel, logonResponse(incomingSequence.incrementAndGet()), getMessageId()) + } + } + CompletableFuture.completedFuture(getMessageId()) + } + var isOpen = false + whenever(channel.send(any(), any(), anyOrNull(), any())).doAnswer { + onSendHandler(it.arguments[0] as ByteBuf, it.arguments[3] as IChannel.SendMode) + } + whenever(channel.isOpen).doAnswer { + isOpen + } + whenever(channel.open()).doAnswer { + isOpen = true + handler.onOpen(channel) + mock { } + } + whenever(channel.close()).doAnswer { + isOpen = false + handler.onClose(channel) + mock { } + } + + return TestContext(context, channel, handler, incomingSequence) + } + + private inline fun T.verifyInvocationsAndClean(func: T.() -> Unit) { + this.func() + verifyNoMoreInteractions(this) + clearInvocations(this) + } + + private fun IChannel.verifySend( + valueByTag: Map, + timeout: Long = 0, + ): IChannel = + this.also { + val captor = argumentCaptor { } + verify(this, timeout(timeout)).send(captor.capture(), any(), anyOrNull(), any()) + captor.allValues.single().assertContains(valueByTag) + } + + private fun IChannel.captureSend( + times: Int, + timeout: Long, + ): List { + val captor = argumentCaptor { } + verify(this, timeout(timeout).times(times)).send(captor.capture(), any(), anyOrNull(), any()) + return captor.allValues + } + + private fun IHandlerContext.verifySendEvent( + name: String? = null, + type: String? = null, + timeout: Long = 0, + ): IHandlerContext = + this.also { + val captor = argumentCaptor { } + verify(this, timeout(timeout)).send(captor.capture(), anyOrNull()) + val event = captor.allValues.single().toProto(TEST_BOOK, TEST_SCOPE) + assertAll( + { name?.let { assertContains(event.name, name) } }, + { type?.let { assertContains(event.type, type) } }, + ) + } + + private fun IHandlerContext.captureSendEvents( + times: Int, + timeout: Long = 0, + ): List { + val captor = argumentCaptor { } + verify(this, timeout(timeout).times(times)).send(captor.capture(), anyOrNull()) + return captor.allValues.map { it.toProto(TEST_BOOK, TEST_SCOPE) } + } + + private fun ByteBuf.assertContains(values: Map) { + assertAll( + values.map { (tag, value) -> + { + val field = assertNotNull(findField(tag), "filed for tag: $tag") + assertEquals(value, field.value, "field value for tag: $tag") + } + } + ) + } + + companion object { + private val LOGGER = KotlinLogging.logger {} + + private fun logonResponse(seq: Int): ByteBuf = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=105\u000135=A\u000134=${seq}\u000149=server\u000156=client\u000150=system\u000152=2014-12-22T10:15:30Z\u000198=0\u0001108=30\u00011137=9\u00011409=0\u000110=203\u0001".toByteArray( + US_ASCII + )) + private fun businessMessage(seq: Int?, possDup: Boolean = false): ByteBuf = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=13\u000135=AE${if(seq != null) "\u000134=${seq}" else ""}${if(possDup) "\u000143=Y" else ""}\u0001552=1\u000110=169\u0001".toByteArray( + US_ASCII + )) + private fun resendRequest(seq: Int, begin: Int, end: Int): ByteBuf = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=13\u000135=2\u000134=${seq}\u00017=${begin}\u000116=${end}\u0001552=1\u000110=169".toByteArray( + US_ASCII + )) + private fun testRequest(seq: Int) = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=13\u000135=1\u000134=${seq}\u0001112=test\u00011552=1\u000110=169".toByteArray( + US_ASCII + )) + private fun logonResponseWithNextExpectedSeq(seq: Int, nextExpected: Int) = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=105\u000135=A\u000134=${seq}\u0001789=${nextExpected}\u000149=server\u000156=client\u000150=system\u000152=2014-12-22T10:15:30Z\u000198=0\u0001108=30\u00011137=9\u00011409=0\u000110=203\u0001".toByteArray( + US_ASCII + )) + private fun Duration.millis() = toMillis() + private fun getMessageId() = MessageID.newBuilder().apply { + sequence = System.nanoTime() + bookName = "Test" + direction = Direction.FIRST + timestamp = Instant.now().toTimestamp() + connectionId = ConnectionID.newBuilder().apply { + sessionAlias = "SA" + sessionGroup = "SG" + }.build() + }.build() + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/MessageSearcher.kt b/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/TestMessageSearcher.kt similarity index 91% rename from src/test/kotlin/com/exactpro/th2/conn/dirty/fix/MessageSearcher.kt rename to src/test/kotlin/com/exactpro/th2/conn/dirty/fix/TestMessageSearcher.kt index 44210da..6a71743 100644 --- a/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/MessageSearcher.kt +++ b/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/TestMessageSearcher.kt @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2022-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +20,7 @@ import com.exactpro.th2.dataprovider.lw.grpc.MessageSearchResponse import com.exactpro.th2.dataprovider.lw.grpc.TimeRelation import com.google.protobuf.util.Timestamps -class MessageSearcher(private val messages: List) { +class TestMessageSearcher(private val messages: List) { fun searchMessages(request: MessageGroupsSearchRequest): Iterator { val startTimestamp = request.startTimestamp diff --git a/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/TestServerEmulator.kt b/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/TestServerEmulator.kt new file mode 100644 index 0000000..a209016 --- /dev/null +++ b/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/TestServerEmulator.kt @@ -0,0 +1,176 @@ +/* + * Copyright 2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.conn.dirty.fix + +import com.exactpro.th2.FixHandler +import com.exactpro.th2.common.grpc.MessageID +import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel +import com.exactpro.th2.constants.Constants +import io.netty.buffer.ByteBuf +import mu.KotlinLogging +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.BlockingQueue +import java.util.concurrent.ExecutorService +import java.util.concurrent.Future +import java.util.concurrent.TimeUnit + +class TestServerEmulator( + executor: ExecutorService, + private val handler: FixHandler, + private val channel: IChannel, + private val generateMessageId: () -> MessageID, + private val generateLogon: (seq: Int) -> ByteBuf, + private val generateResendRequest: (seq: Int, begin: Int, end: Int) -> ByteBuf, +): AutoCloseable { + + private val consumerQueue: BlockingQueue = ArrayBlockingQueue(100) + private val senderQueue: BlockingQueue = ArrayBlockingQueue(100) + private val senderFuture: Future<*> + private val consumerFuture: Future<*> + + @Volatile + private var active = true + + init { + senderFuture = executor.submit(::runSender) + consumerFuture = executor.submit(::runConsumer) + } + + fun consume(byteBuf: ByteBuf) { + consumerQueue.place(byteBuf, "consumer queue overflow") + } + + private fun runSender() { + LOGGER.info { "Server sender emulator started" } + try { + while (active) { + val message = senderQueue.take() + handler.onIncoming(channel, message, generateMessageId()) + } + } catch (e: Exception) { + LOGGER.error(e) { "Server sender failure" } + } finally { + LOGGER.info { "Server sender emulator stopped" } + } + } + + private fun runConsumer() { + LOGGER.info { "Server consumer emulator started" } + try { + var clientSeq = 0 + var serverSeq = 0 + + var state = State.GENERAL + + while (active) { + val message = consumerQueue.take() + val msgType = requireNotNull(message.findField(35)?.value) + val seq = requireNotNull(message.findField(Constants.MSG_SEQ_NUM_TAG)?.value?.toInt()) + val posDup = message.findField(Constants.POSS_DUP_TAG)?.value == "Y" + LOGGER.info { "received msg seq: $seq, msg posDup: $posDup, clientSeq: $clientSeq, state: $state" } + + when (state) { + State.GENERAL -> { + if (posDup) { + // ignore + } else { + if (msgType == "A") { + serverSeq += 1 + senderQueue.place(generateLogon(serverSeq), "sender queue overflow") + LOGGER.info { "sent logon" } + } + + if (clientSeq + 1 == seq) { + clientSeq = seq + LOGGER.info { "incremented client seq to $clientSeq" } + } else { + val begin = clientSeq + 1 + senderQueue.place(generateResendRequest(serverSeq, begin, 0), "sender queue overflow") + state = State.WAIT_RESEND + LOGGER.info { "sent resend request from $begin" } + } + } + } + + State.WAIT_RESEND -> { + if (posDup) { + if (clientSeq + 1 == seq) { + clientSeq = seq + state = State.PROCESS_RESEND + LOGGER.info { "incremented client seq to $clientSeq after resend" } + } else { + // ignore it + } + } else { + // ignore it + } + } + + State.PROCESS_RESEND -> { + if (posDup) { + if (clientSeq + 1 == seq) { + clientSeq = seq + LOGGER.info { "incremented client seq to $clientSeq after resend" } + } else { + val begin = clientSeq + 1 + senderQueue.place(generateResendRequest(serverSeq, begin, 0), "sender queue overflow") + state = State.WAIT_RESEND + LOGGER.info { "sent resend request from $begin" } + } + } else { + if (clientSeq + 1 == seq) { + clientSeq = seq + state = State.GENERAL + LOGGER.info { "incremented client seq to $clientSeq" } + } else { + val begin = clientSeq + 1 + senderQueue.place(generateResendRequest(serverSeq, begin, 0), "sender queue overflow") + state = State.WAIT_RESEND + LOGGER.info { "sent resend request from $begin" } + } + } + } + } + } + } catch (e: Exception) { + LOGGER.error(e) { "Server consumer failure" } + } finally { + LOGGER.info { "Server consumer emulator stopped" } + } + } + + companion object { + private val LOGGER = KotlinLogging.logger {} + + private fun BlockingQueue.place(item: T, msg: String) { + check(offer(item, 100, TimeUnit.MILLISECONDS)) { + "$msg, size: $size" + } + } + + private enum class State { + GENERAL, + WAIT_RESEND, + PROCESS_RESEND, + } + } + + override fun close() { + active = false + consumerFuture.cancel(true) + senderFuture.cancel(true) + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/TestStrategies.kt b/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/TestStrategies.kt deleted file mode 100644 index 1387d10..0000000 --- a/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/TestStrategies.kt +++ /dev/null @@ -1,801 +0,0 @@ -/* - * Copyright 2023 Exactpro (Exactpro Systems Limited) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.exactpro.th2.conn.dirty.fix - -import com.exactpro.th2.FixHandler -import com.exactpro.th2.TestUtils.createHandlerSettings -import com.exactpro.th2.common.grpc.ConnectionID -import com.exactpro.th2.common.grpc.Direction -import com.exactpro.th2.common.grpc.MessageID -import com.exactpro.th2.common.utils.message.toTimestamp -import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.BatchSendConfiguration -import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.BlockMessageConfiguration -import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.BrokenConnConfiguration -import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.ChangeSequenceConfiguration -import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.MissMessageConfiguration -import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.ResendRequestConfiguration -import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.RuleConfiguration -import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.SplitSendConfiguration -import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.TransformMessageConfiguration -import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.TransformationConfiguration -import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.RuleType -import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.SchedulerType -import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel -import com.exactpro.th2.conn.dirty.tcp.core.api.IHandlerContext -import com.exactpro.th2.constants.Constants -import com.exactpro.th2.netty.bytebuf.util.asExpandable -import com.exactpro.th2.netty.bytebuf.util.contains -import com.exactpro.th2.netty.bytebuf.util.isEmpty -import io.netty.buffer.ByteBuf -import io.netty.buffer.Unpooled -import java.time.Duration -import java.time.Instant -import java.time.temporal.ChronoUnit -import java.util.Collections -import java.util.concurrent.CompletableFuture -import java.util.concurrent.atomic.AtomicInteger -import java.util.regex.Pattern -import kotlin.test.assertEquals -import kotlin.test.assertTrue -import mu.KotlinLogging -import org.junit.jupiter.api.Disabled -import org.junit.jupiter.api.Test -import org.mockito.kotlin.any -import org.mockito.kotlin.anyOrNull -import org.mockito.kotlin.argumentCaptor -import org.mockito.kotlin.clearInvocations -import org.mockito.kotlin.doAnswer -import org.mockito.kotlin.mock -import org.mockito.kotlin.timeout -import org.mockito.kotlin.times -import org.mockito.kotlin.verify -import org.mockito.kotlin.whenever - -@Disabled -class TestStrategies { - - private class TestContext( - val channel: IChannel, - val fixHandler: FixHandler, - val incomingSequence: AtomicInteger - ) - - @Test - fun testDisconnectStrategy() { - val defaultRuleDuration = Duration.of(2, ChronoUnit.SECONDS) - val businessRuleDuration = Duration.of(5, ChronoUnit.SECONDS) - val businessRuleCleanupDuration = Duration.of(2, ChronoUnit.SECONDS) - val testContext = createTestContext(BrokenConnConfiguration( - SchedulerType.CONSECUTIVE, - listOf( - RuleConfiguration(RuleType.DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(0, ChronoUnit.MILLIS)), - RuleConfiguration( - RuleType.DISCONNECT_WITH_RECONNECT, - duration = businessRuleDuration, - cleanUpDuration = businessRuleCleanupDuration - ), - ) - ), enableAdditionalHandling = false) - val channel = testContext.channel - val handler = testContext.fixHandler - - verify(channel, timeout(defaultRuleDuration.millis() + 300)).close() - - handler.send(businessMessage(2), Collections.emptyMap(), null) - - verify(channel, timeout(businessRuleDuration.millis() + 300)).open() - - val captor = argumentCaptor { } - verify(channel, timeout(businessRuleCleanupDuration.millis() + 300).times(2)).send(captor.capture(), any(), anyOrNull(), any()) - - captor.firstValue.apply { - assertContains(mapOf(35 to "A"), this) - } - captor.secondValue.apply { - assertContains(mapOf(35 to "AE"), this) - } - - handler.close() - channel.close() - } - - @Test - fun testIgnoreIncomingMessagesStrategyWithNextExpectedSequenceNumber() { - val defaultRuleDuration = Duration.of(2, ChronoUnit.SECONDS) - val businessRuleDuration = Duration.of(6, ChronoUnit.SECONDS) - val businessRuleCleanupDuration = Duration.of(2, ChronoUnit.SECONDS) - val messages = mutableListOf, IChannel.SendMode>>() - val testContext = createTestContext(BrokenConnConfiguration( - SchedulerType.CONSECUTIVE, - listOf( - RuleConfiguration(RuleType.DEFAULT, duration = Duration.of(2, ChronoUnit.SECONDS), cleanUpDuration = Duration.of(0, ChronoUnit.SECONDS)), - RuleConfiguration( - RuleType.IGNORE_INCOMING_MESSAGES, - duration = businessRuleDuration, - cleanUpDuration = businessRuleCleanupDuration, - missIncomingMessagesConfiguration = MissMessageConfiguration(3) - ), - ), - ), useNextExpectedSeqNum = true) { msg, mtd, mode -> - messages.add(Triple(msg, mtd, mode)) - } - - val channel = testContext.channel - val handler = testContext.fixHandler - val incomingSequence = testContext.incomingSequence - - val captor = argumentCaptor { } - - verify(channel, timeout(defaultRuleDuration.millis() + 1300)).open() - verify(channel, timeout(300)).send(any(), any(), anyOrNull(), any()) // Logon - clearInvocations(channel) - - Thread.sleep(200) // wait for strategies to apply - - handler.onIncoming(channel, businessMessage(incomingSequence.incrementAndGet()), getMessageId()) - handler.onIncoming(channel, businessMessage(incomingSequence.incrementAndGet()), getMessageId()) - handler.onIncoming(channel, businessMessage(incomingSequence.incrementAndGet()), getMessageId()) - - verify(channel, timeout(businessRuleDuration.millis() + businessRuleCleanupDuration.millis() + 1300)).open() - verify(channel, timeout(300)).send(captor.capture(), any(), anyOrNull(), any()) // Logon - clearInvocations(channel) - - captor.firstValue.apply { - assertContains(mapOf(35 to "A", 789 to "3"), this) - } - - handler.close() - channel.close() - } - - @Test - fun testIgnoreIncomingMessagesStrategyResendRequest() { - val defaultRuleDuration = Duration.of(2, ChronoUnit.SECONDS) - val businessRuleDuration = Duration.of(6, ChronoUnit.SECONDS) - val businessRuleCleanupDuration = Duration.of(2, ChronoUnit.SECONDS) - val messages = mutableListOf, IChannel.SendMode>>() - - val testContext = createTestContext(BrokenConnConfiguration( - SchedulerType.CONSECUTIVE, - listOf( - RuleConfiguration(RuleType.DEFAULT, duration = Duration.of(2, ChronoUnit.SECONDS), cleanUpDuration = Duration.of(0, ChronoUnit.SECONDS)), - RuleConfiguration( - RuleType.IGNORE_INCOMING_MESSAGES, - duration = businessRuleDuration, - cleanUpDuration = businessRuleCleanupDuration, - missIncomingMessagesConfiguration = MissMessageConfiguration(3) - ), - ), - )) { msg, mtd, mode -> - messages.add(Triple(msg, mtd, mode)) - } - - val channel = testContext.channel - val handler = testContext.fixHandler - val incomingSequence = testContext.incomingSequence - - val captor = argumentCaptor { } - - clearInvocations(channel) - verify(channel, timeout(defaultRuleDuration.millis() + 300)).open() - verify(channel, timeout(300)).send(any(), any(), anyOrNull(), any()) // Logon // 2 - clearInvocations(channel) - - Thread.sleep(200) // wait for strategies to apply - - handler.onIncoming(channel, businessMessage(incomingSequence.incrementAndGet()), getMessageId()) // 3 - handler.onIncoming(channel, businessMessage(incomingSequence.incrementAndGet()), getMessageId()) // 4 - handler.onIncoming(channel, businessMessage(incomingSequence.incrementAndGet()), getMessageId()) // 5 - handler.onIncoming(channel, businessMessage(incomingSequence.incrementAndGet()), getMessageId()) // 6 - - verify(channel, timeout(businessRuleDuration.millis() + businessRuleCleanupDuration.millis() + 300)).open() - verify(channel, timeout(600).times(2)).send(captor.capture(), any(), anyOrNull(), any()) // Logon - clearInvocations(channel) - - captor.firstValue.apply { - assertContains(mapOf(35 to "2", 7 to "3", 16 to "5"), this) - } - - handler.close() - channel.close() - } - - @Test - fun testTransformLogonMessagesStrategy() { - val defaultRuleDuration = Duration.of(2, ChronoUnit.SECONDS) - val businessRuleDuration = Duration.of(6, ChronoUnit.SECONDS) - val messages = mutableListOf, IChannel.SendMode>>() - val testContext = createTestContext(BrokenConnConfiguration( - SchedulerType.CONSECUTIVE, - listOf( - RuleConfiguration(RuleType.DEFAULT, duration = Duration.of(2, ChronoUnit.SECONDS), cleanUpDuration = Duration.of(0, ChronoUnit.SECONDS)), - RuleConfiguration( - RuleType.TRANSFORM_LOGON, - duration = businessRuleDuration, - cleanUpDuration = Duration.of(2, ChronoUnit.SECONDS), - transformMessageConfiguration = TransformMessageConfiguration( - listOf( - TransformationConfiguration( - listOf( - Action( - replace = FieldSelector( - tag = Constants.PASSWORD_TAG, - matches = Pattern.compile("pass"), - tagOneOf = null - ), - with = FieldDefinition( - tag = Constants.PASSWORD_TAG, - value = "mangledPassword", - tagOneOf = null, - valueOneOf = null - ) - ) - ), false, "A"), - TransformationConfiguration( - listOf( - Action( - replace = FieldSelector( - tag = Constants.PASSWORD_TAG, - matches = Pattern.compile("pass"), - tagOneOf = null - ), - with = FieldDefinition( - tag = Constants.PASSWORD_TAG, - value = "mangledPassword", - tagOneOf = null, - valueOneOf = null - ) - ) - ), false, "A") - ) - ), - ), - ) - )) { msg, mode, mtd -> - messages.add(Triple(msg, mode, mtd)) - } - - val channel = testContext.channel - val handler = testContext.fixHandler - - messages.clear() - verify(channel, timeout(defaultRuleDuration.millis() + businessRuleDuration.millis() + 300).times(3)).open() - - // start - verify(channel, timeout(800).times(3)).send(any(), any(), anyOrNull(), any()) - - messages[0].apply { - assertContains(mapOf(35 to "A", Constants.PASSWORD_TAG to "mangledPassword"), this.first) - } - messages[1].apply { - assertContains(mapOf(35 to "A", Constants.PASSWORD_TAG to "mangledPassword"), this.first) - } - messages[2].apply { - assertContains(mapOf(35 to "A", Constants.PASSWORD_TAG to "pass"), this.first) - } - - handler.close() - channel.close() - } - - @Test - fun testBidirectionalResendRequest() { - val defaultRuleDuration = Duration.of(2, ChronoUnit.SECONDS) - val messages = mutableListOf, IChannel.SendMode>>() - val testContext = createTestContext(BrokenConnConfiguration( - SchedulerType.CONSECUTIVE, - listOf( - RuleConfiguration(RuleType.DEFAULT, duration = Duration.of(2, ChronoUnit.SECONDS), cleanUpDuration = Duration.of(0, ChronoUnit.SECONDS)), - RuleConfiguration( - RuleType.BI_DIRECTIONAL_RESEND_REQUEST, - duration = Duration.of(6, ChronoUnit.SECONDS), - cleanUpDuration = Duration.of(2, ChronoUnit.SECONDS), - missIncomingMessagesConfiguration = MissMessageConfiguration(2), - missOutgoingMessagesConfiguration = MissMessageConfiguration(2) - ), - ) - )) { msg, mode, mtd -> - messages.add(Triple(msg, mode, mtd)) - } - - val channel = testContext.channel - val handler = testContext.fixHandler - clearInvocations(channel) - verify(channel, timeout(defaultRuleDuration.millis() + 300)).open() - verify(channel, timeout(600).times(1)).send(any(), any(), anyOrNull(), any()) - - Thread.sleep(200) // wait for strategies to apply - messages.clear() - - handler.onIncoming(channel, businessMessage(3), getMessageId()) - handler.onIncoming(channel, businessMessage(4), getMessageId()) - - handler.onOutgoing(channel, businessMessage(3).asExpandable(), Collections.emptyMap()) - handler.onOutgoing(channel, businessMessage(4).asExpandable(), Collections.emptyMap()) - - // Trigger resend request - handler.onIncoming(channel, businessMessage(5), getMessageId()) - - handler.onIncoming(channel, businessMessage(3, true), getMessageId()) - handler.onIncoming(channel, businessMessage(4, true), getMessageId()) - // end - - // Trigger recovery - handler.onIncoming(channel, resendRequest(6, 3, 4), getMessageId()) - // end - - messages[0].apply { - val buff = first - assertContains(mapOf(35 to "2", 7 to "3", 16 to "4"), buff) - } - - messages[1].apply { - val buff = first - assertContains(mapOf(35 to "AE", 43 to "Y", 34 to "3"), buff) - } - - messages[2].apply { - val buff = first - assertContains(mapOf(35 to "AE", 43 to "Y", 34 to "4"), buff) - } - - handler.close() - channel.close() - } - - @Test - fun testOutgoingGap() { - val defaultRuleDuration = Duration.of(2, ChronoUnit.SECONDS) - val businessRuleDuration = Duration.of(6, ChronoUnit.SECONDS) - val businessRuleCleanupDuration = Duration.of(3, ChronoUnit.SECONDS) - val messages = mutableListOf, IChannel.SendMode>>() - val testContext = createTestContext(BrokenConnConfiguration( - SchedulerType.CONSECUTIVE, - listOf( - RuleConfiguration(RuleType.DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(2, ChronoUnit.SECONDS)), - RuleConfiguration( - RuleType.CREATE_OUTGOING_GAP, - duration = businessRuleDuration, - cleanUpDuration = businessRuleCleanupDuration, - missOutgoingMessagesConfiguration = MissMessageConfiguration(3) - ), - ) - )) { msg, mode, mtd -> - messages.add(Triple(msg, mode, mtd)) - } - - val channel = testContext.channel - val handler = testContext.fixHandler - - verify(channel, timeout(defaultRuleDuration.millis() + 300)).open() - verify(channel).send(any(), any(), anyOrNull(), any()) // Logon - clearInvocations(channel) - - Thread.sleep(500) // Waiting for strategy to apply - - handler.onOutgoing(channel, businessMessage(3).asExpandable(), Collections.emptyMap()) - handler.onOutgoing(channel, businessMessage(4).asExpandable(), Collections.emptyMap()) - handler.onOutgoing(channel, businessMessage(5).asExpandable(), Collections.emptyMap()) - clearInvocations(channel) - val captor = argumentCaptor { } - - handler.onIncoming(channel, resendRequest(3, 3, 5), getMessageId()) - verify(channel, timeout(500).times(3)).send(captor.capture(), any(), anyOrNull(), any()) // recovery - clearInvocations(channel) - - verify(channel, timeout(businessRuleDuration.millis() + 300)).open() - verify(channel, timeout(300)).send(any(), any(), anyOrNull(), any()) // Logon - clearInvocations(channel) - messages.clear() - - captor.firstValue.apply { - assertContains(mapOf(35 to "AE", 43 to "Y", 34 to "3"), this) - } - - captor.secondValue.apply { - assertContains(mapOf(35 to "AE", 43 to "Y", 34 to "4"), this) - } - - captor.thirdValue.apply { - assertContains(mapOf(35 to "AE", 43 to "Y", 34 to "5"), this) - } - - handler.close() - channel.close() - } - - @Test - fun testClientOutage() { - val defaultRuleDuration = Duration.of(2, ChronoUnit.SECONDS) - val businessRuleDuration = Duration.of(6, ChronoUnit.SECONDS) - val businessRuleCleanupDuration = Duration.of(2, ChronoUnit.SECONDS) - val messages = mutableListOf, IChannel.SendMode>>() - val testContext = createTestContext( - BrokenConnConfiguration( - SchedulerType.CONSECUTIVE, - listOf( - RuleConfiguration(RuleType.DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(0, ChronoUnit.SECONDS)), - RuleConfiguration( - RuleType.CLIENT_OUTAGE, - duration = businessRuleDuration, - cleanUpDuration = businessRuleCleanupDuration, - ), - ) - ), - 1 - ) { msg, mode, mtd -> - messages.add(Triple(msg, mode, mtd)) - } - - val channel = testContext.channel - val handler = testContext.fixHandler - val seq = testContext.incomingSequence - - Thread.sleep(defaultRuleDuration.millis() + 100) // Waiting for strategy to apply - messages.clear() - - handler.onIncoming(channel, testRequest(seq.incrementAndGet()).asExpandable(), getMessageId()) - handler.onIncoming(channel, testRequest(seq.incrementAndGet()).asExpandable(), getMessageId()) - handler.onIncoming(channel, testRequest(seq.incrementAndGet()).asExpandable(), getMessageId()) - - handler.onClose(channel) - handler.onOpen(channel) - - clearInvocations(channel) - - for (message in messages) { - val buff = message.first - assertTrue("Message shouldn't be heartbeat or AE: ${buff.toString(Charsets.US_ASCII)}") { !(buff.contains("35=AE") || buff.contains("35=0")) } - } - - handler.close() - channel.close() - } - - @Test - fun testPartialClientOutage() { - val defaultRuleDuration = Duration.of(2, ChronoUnit.SECONDS) - val businessRuleDuration = Duration.of(6, ChronoUnit.SECONDS) - val businessRuleCleanupDuration = Duration.of(2, ChronoUnit.SECONDS) - val messages = mutableListOf, IChannel.SendMode>>() - val testContext = createTestContext( - BrokenConnConfiguration( - SchedulerType.CONSECUTIVE, - listOf( - RuleConfiguration(RuleType.DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(0, ChronoUnit.SECONDS)), - RuleConfiguration( - RuleType.PARTIAL_CLIENT_OUTAGE, - duration = businessRuleDuration, - cleanUpDuration = businessRuleCleanupDuration, - ), - ) - ), - 1 - ) { msg, mode, mtd -> - messages.add(Triple(msg, mode, mtd)) - } - - val channel = testContext.channel - val handler = testContext.fixHandler - val seq = testContext.incomingSequence - - Thread.sleep(defaultRuleDuration.millis() + 100) // Waiting for strategy to apply - messages.clear() - - handler.onIncoming(channel, testRequest(seq.incrementAndGet()).asExpandable(), getMessageId()) - handler.onIncoming(channel, testRequest(seq.incrementAndGet()).asExpandable(), getMessageId()) - handler.onIncoming(channel, testRequest(seq.incrementAndGet()).asExpandable(), getMessageId()) - - handler.onClose(channel) - handler.onOpen(channel) - - clearInvocations(channel) - - for (message in messages) { - val buff = message.first - if(buff.isEmpty()) continue - if(!buff.contains("35=0")) continue - assertContains(mapOf(35 to "0", 112 to "test"), buff) - } - - handler.close() - channel.close() - } - - @Test - fun testSequenceResetStrategyOutgoing() { - val defaultRuleDuration = Duration.of(2, ChronoUnit.SECONDS) - val businessRuleDuration = Duration.of(6, ChronoUnit.SECONDS) - val businessRuleCleanupDuration = Duration.of(2, ChronoUnit.SECONDS) - val messages = mutableListOf, IChannel.SendMode>>() - val testContext = createTestContext( - BrokenConnConfiguration( - SchedulerType.CONSECUTIVE, - listOf( - RuleConfiguration(RuleType.DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(0, ChronoUnit.SECONDS)), - RuleConfiguration( - RuleType.SEQUENCE_RESET, - duration = businessRuleDuration, - cleanUpDuration = businessRuleCleanupDuration, - changeSequenceConfiguration = ChangeSequenceConfiguration(5, false) - ), - ) - ) - ) { msg, mode, mtd -> - messages.add(Triple(msg, mode, mtd)) - } - - val channel = testContext.channel - val handler = testContext.fixHandler - - verify(channel, timeout(defaultRuleDuration.millis() + businessRuleCleanupDuration.millis() + 300)).open() - messages.clear() - verify(channel, timeout(500)).send(any(), any(), anyOrNull(), any()) // Logon - - handler.onIncoming(channel, resendRequest(3, 3, 8), getMessageId()) - - clearInvocations(channel) - - for(message in messages) { - if(!message.first.contains("35=4")) continue - assertContains(mapOf(35 to "4", 34 to "3", 36 to "8"), message.first) - } - - handler.close() - channel.close() - } - - @Test - fun testResendRequestStrategy() { - val defaultRuleDuration = Duration.of(2, ChronoUnit.SECONDS) - val businessRuleDuration = Duration.of(6, ChronoUnit.SECONDS) - val businessRuleCleanupDuration = Duration.of(2, ChronoUnit.SECONDS) - val messages = mutableListOf, IChannel.SendMode>>() - val testContext = createTestContext( - BrokenConnConfiguration( - SchedulerType.CONSECUTIVE, - listOf( - RuleConfiguration(RuleType.DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(0, ChronoUnit.SECONDS)), - RuleConfiguration( - RuleType.RESEND_REQUEST, - duration = businessRuleDuration, - cleanUpDuration = businessRuleCleanupDuration, - resendRequestConfiguration = ResendRequestConfiguration(5) - ), - ) - ) - ) { msg, mode, mtd -> - messages.add(Triple(msg, mode, mtd)) - } - - val channel = testContext.channel - val handler = testContext.fixHandler - - val captor = argumentCaptor {} - - handler.onIncoming(channel, businessMessage(2).asExpandable(), getMessageId()) - handler.onIncoming(channel, businessMessage(3).asExpandable(), getMessageId()) - handler.onIncoming(channel, businessMessage(4).asExpandable(), getMessageId()) - handler.onIncoming(channel, businessMessage(5).asExpandable(), getMessageId()) - handler.onIncoming(channel, businessMessage(6).asExpandable(), getMessageId()) - verify(channel, timeout(defaultRuleDuration.millis() + 300).times(1)).send(captor.capture(), any(), anyOrNull(), any()) - - captor.firstValue.apply { - assertContains(mapOf(35 to "2", 7 to "1", 16 to "6"), this) - } - - clearInvocations(channel) - - handler.close() - channel.close() - } - - @Test - fun testBatchSend() { - val defaultRuleDuration = Duration.of(2, ChronoUnit.SECONDS) - val businessRuleDuration = Duration.of(6, ChronoUnit.SECONDS) - val businessRuleCleanupDuration = Duration.of(2, ChronoUnit.SECONDS) - val messages = mutableListOf, IChannel.SendMode>>() - val testContext = createTestContext( - BrokenConnConfiguration( - SchedulerType.CONSECUTIVE, - listOf( - RuleConfiguration(RuleType.DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(0, ChronoUnit.SECONDS)), - RuleConfiguration( - RuleType.BATCH_SEND, - duration = businessRuleDuration, - cleanUpDuration = businessRuleCleanupDuration, - batchSendConfiguration = BatchSendConfiguration(3) - ), - ) - ) - ) { msg, mode, mtd -> - messages.add(Triple(msg, mode, mtd)) - } - - val channel = testContext.channel - val handler = testContext.fixHandler - Thread.sleep(defaultRuleDuration.millis() + 300) - clearInvocations(channel) - - handler.send(businessMessage(2).asExpandable(), mutableMapOf(), null) - handler.send(businessMessage(3).asExpandable(), mutableMapOf(), null) - handler.send(businessMessage(4).asExpandable(), mutableMapOf(), null) - handler.send(businessMessage(5).asExpandable(), mutableMapOf(), null) - - val captor = argumentCaptor {} - verify(channel, timeout(businessRuleDuration.millis() + 600).times(2)).send(captor.capture(), any(), anyOrNull(), any()) - - val sizeOfOneMessage = businessMessage(2).asExpandable().also { - handler.onOutgoingUpdateTag(it, mutableMapOf()) - }.readableBytes() - - captor.firstValue.apply { - println(readableBytes()) - println(sizeOfOneMessage * 3) - assertTrue { this.readableBytes() >= sizeOfOneMessage * 3 } - } - - captor.secondValue.apply { - assertEquals(this.readableBytes(), sizeOfOneMessage) - } - - handler.close() - channel.close() - } - - @Test - fun testSplitSend() { - val defaultRuleDuration = Duration.of(2, ChronoUnit.SECONDS) - val businessRuleDuration = Duration.of(6, ChronoUnit.SECONDS) - val businessRuleCleanupDuration = Duration.of(2, ChronoUnit.SECONDS) - val messages = mutableListOf, IChannel.SendMode>>() - val testContext = createTestContext( - BrokenConnConfiguration( - SchedulerType.CONSECUTIVE, - listOf( - RuleConfiguration(RuleType.DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(0, ChronoUnit.SECONDS)), - RuleConfiguration( - RuleType.SPLIT_SEND, - duration = businessRuleDuration, - cleanUpDuration = businessRuleCleanupDuration, - splitSendConfiguration = SplitSendConfiguration(3, 100) - ), - ) - ) - ) { msg, mode, mtd -> - messages.add(Triple(msg, mode, mtd)) - } - - val channel = testContext.channel - val handler = testContext.fixHandler - Thread.sleep(defaultRuleDuration.millis() + 300) - clearInvocations(channel) - val businessMessage = businessMessage(2).asExpandable() - - handler.send(businessMessage, mutableMapOf(), null) - - val captor = argumentCaptor {} - verify(channel, timeout(businessRuleDuration.millis() + 300).times(4)).send(captor.capture(), any(), anyOrNull(), any()) - - val partSize = businessMessage.readableBytes() / 3 - - captor.firstValue.apply { - assertEquals(partSize, this.readableBytes()) - } - - captor.secondValue.apply { - assertEquals(partSize, this.readableBytes()) - } - - captor.thirdValue.apply { - assertTrue { this.readableBytes() >= partSize } - } - - captor.allValues[3].apply { - assertEquals(businessMessage.readableBytes(), this.readableBytes()) - } - - handler.close() - channel.close() - } - - private fun createTestContext( - strategyConfig: BrokenConnConfiguration, - hbtInt: Int = 30, - enableAdditionalHandling: Boolean = true, - useNextExpectedSeqNum: Boolean = false, - sendHandlerExtension: (ByteBuf, Map, IChannel.SendMode) -> Unit = {_, _, _ ->}): TestContext { - val handlerSettings = createHandlerSettings(strategyConfig, hbtInt, useNextExpectedSeqNum) - val channel: IChannel = mock {} - val context: IHandlerContext = mock { - on { settings }.thenReturn(handlerSettings) - on { createChannel(any(), any(), any(), any(), any(), any(), any()) }.thenReturn(channel) - } - - var incomingSequence = AtomicInteger(0) - var outgoingSequence = 0 - - val handler = FixHandler(context) - val onSendHandler: (ByteBuf, IChannel.SendMode) -> CompletableFuture = { msg, mode -> - outgoingSequence += 1 - if(enableAdditionalHandling) { - val expanded = msg.copy().asExpandable() - val metadata = mutableMapOf() - if(mode == IChannel.SendMode.HANDLE || mode == IChannel.SendMode.HANDLE_AND_MANGLE) { - handler.onOutgoing(channel, expanded, metadata) - } - sendHandlerExtension(expanded, metadata, mode) - } - if(msg.contains("35=A\u0001")) { - if(useNextExpectedSeqNum) { - handler.onIncoming(channel, logonResponseWithNextExpectedSeq(incomingSequence.incrementAndGet(), outgoingSequence + 1), getMessageId()) - } else { - handler.onIncoming(channel, logonResponse(incomingSequence.incrementAndGet()), getMessageId()) - } - } - CompletableFuture.completedFuture(getMessageId()) - } - var isOpen = false - whenever(channel.send(any(), any(), anyOrNull(), any())).doAnswer { - onSendHandler(it.arguments[0] as ByteBuf, it.arguments[3] as IChannel.SendMode) - } - whenever(channel.isOpen).doAnswer { - isOpen - } - whenever(channel.open()).doAnswer { - isOpen = true - handler.onOpen(channel) - mock { } - } - whenever(channel.close()).doAnswer { - isOpen = false - handler.onClose(channel) - mock { } - } - - channel.open() - clearInvocations(channel) - - return TestContext(channel, handler, incomingSequence) - } - - private fun assertContains(values: Map, message: ByteBuf) { - val expected = values.map { "${it.key}=${it.value}" }.joinToString(",") - assertTrue("Expected message to have $expected tags: ${message.toString(Charsets.US_ASCII)}") { - values.all { message.contains("${it.key}=${it.value}") } - } - } - - companion object { - private fun logonResponse(seq: Int): ByteBuf = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=105\u000135=A\u000134=${seq}\u000149=server\u000156=client\u000150=system\u000152=2014-12-22T10:15:30Z\u000198=0\u0001108=30\u00011137=9\u00011409=0\u000110=203\u0001".toByteArray(Charsets.US_ASCII)) - private fun businessMessage(seq: Int?, possDup: Boolean = false): ByteBuf = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=13\u000135=AE${if(seq != null) "\u000134=${seq}" else ""}${if(possDup) "\u000143=Y" else ""}\u0001552=1\u000110=169\u0001".toByteArray(Charsets.US_ASCII)) - private fun resendRequest(seq: Int, begin: Int, end: Int): ByteBuf = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=13\u000135=2\u000134=${seq}\u00017=${begin}\u000116=${end}\u0001552=1\u000110=169".toByteArray(Charsets.US_ASCII)) - private fun testRequest(seq: Int) = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=13\u000135=1\u000134=${seq}\u0001112=test\u00011552=1\u000110=169".toByteArray(Charsets.US_ASCII)) - private fun logonResponseWithNextExpectedSeq(seq: Int, nextExpected: Int) = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=105\u000135=A\u000134=${seq}\u0001789=${nextExpected}\u000149=server\u000156=client\u000150=system\u000152=2014-12-22T10:15:30Z\u000198=0\u0001108=30\u00011137=9\u00011409=0\u000110=203\u0001".toByteArray(Charsets.US_ASCII)) - private fun Duration.millis() = toMillis() - private fun getMessageId() = MessageID.newBuilder().apply { - sequence = System.nanoTime() - bookName = "Test" - direction = Direction.FIRST - timestamp = Instant.now().toTimestamp() - connectionId = ConnectionID.newBuilder().apply { - sessionAlias = "SA" - sessionGroup = "SG" - }.build() - }.build() - } -} \ No newline at end of file