From 1e30b694aef0423883866dab5074c1ee92395917 Mon Sep 17 00:00:00 2001 From: "denis.plotnikov" Date: Fri, 4 Oct 2024 17:24:53 +0300 Subject: [PATCH] TS-2689: Structure mutation strategies --- .../java/com/exactpro/th2/FixHandler.java | 126 +++++++++++++- .../dirty/fix/FIXMessageStructureMutator.kt | 159 ++++++++++++++++++ .../AdjustSendingTimeConfiguration.kt | 23 +++ .../CorruptMessageStructureConfiguration.kt | 29 ++++ .../configuration/RuleConfiguration.kt | 10 +- .../dirty/fix/brokenconn/strategy/RuleType.kt | 4 +- .../brokenconn/strategy/StatefulStrategy.kt | 13 +- .../fix/brokenconn/strategy/StrategyState.kt | 3 + .../fix/FIXMessageStructureMutatorTest.kt | 77 +++++++++ 9 files changed, 430 insertions(+), 14 deletions(-) create mode 100644 src/main/kotlin/com/exactpro/th2/conn/dirty/fix/FIXMessageStructureMutator.kt create mode 100644 src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/AdjustSendingTimeConfiguration.kt create mode 100644 src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/CorruptMessageStructureConfiguration.kt create mode 100644 src/test/kotlin/com/exactpro/th2/conn/dirty/fix/FIXMessageStructureMutatorTest.kt diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index dbc6314..07282ed 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -41,6 +41,9 @@ import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.IncomingMessagesStrategy; import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.OutgoingMessagesStrategy; import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.ReceiveStrategy; +import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.CorruptMessageStructureConfiguration; +import com.exactpro.th2.conn.dirty.fix.FIXMessageStructureMutator; +import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.AdjustSendingTimeConfiguration; import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.RuleType; import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.SchedulerType; import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.SendStrategy; @@ -425,7 +428,7 @@ public CompletableFuture send(@NotNull ByteBuf body, @NotNull Map send(@NotNull ByteBuf body, @NotNull Map transformOutgoingMessageStrategy(ByteBuf message, Ma return null; } + private Map corruptMessageStructureProcessor(ByteBuf message, Map metadata) { + onOutgoingUpdateTag(message, metadata); + Set disableForMessageTypes = strategy.getDisableForMessageTypes(); + + FixField msgTypeField = findField(message, MSG_TYPE_TAG, US_ASCII); + if(msgTypeField != null && msgTypeField.getValue() != null && disableForMessageTypes.contains(msgTypeField.getValue())) { + LOGGER.info("Strategy '{}' is disabled for {} message type", strategy.getType(), msgTypeField.getValue()); + return null; + } + String msgType = msgTypeField.getValue(); + if(ADMIN_MESSAGES.contains(msgType) && !Objects.equals(msgType, MSG_TYPE_HEARTBEAT)) { + return null; + } + + StrategyState state = strategy.getState(); + if(state.getMessageCorrupted().get()) { + return null; + } + + CorruptMessageStructureConfiguration config = strategy.getCorruptMessageStructureConfiguration(); + FIXMessageStructureMutator mutator = new FIXMessageStructureMutator( + config.getHeaderTags(), + config.getTrailerTags() + ); + if (config.getMoveHeaderConfiguration() != null) { + mutator.moveHeader(config.getMoveHeaderConfiguration().getPosition(), message); + metadata.put("structureMutated", "Y"); + metadata.put("structureMutationKind", "header"); + metadata.put("structureMutationPosition", config.getMoveHeaderConfiguration().getPosition().name()); + } + + if (config.getMoveTrailerConfiguration() != null) { + mutator.moveTrailer(config.getMoveTrailerConfiguration().getPosition(), message); + metadata.put("structureMutated", "Y"); + metadata.put("structureMutationKind", "trailer"); + metadata.put("structureMutationPosition", config.getMoveTrailerConfiguration().getPosition().name()); + } + + state.getMessageCorrupted().set(true); + + return null; + } + + private Map adjustSendingTimeStrategyProcessor(ByteBuf message, Map metadata) { + onOutgoingUpdateTag(message, metadata); + Set disableForMessageTypes = strategy.getDisableForMessageTypes(); + + FixField msgTypeField = findField(message, MSG_TYPE_TAG, US_ASCII); + if(msgTypeField != null && msgTypeField.getValue() != null && disableForMessageTypes.contains(msgTypeField.getValue())) { + LOGGER.info("Strategy '{}' is disabled for {} message type", strategy.getType(), msgTypeField.getValue()); + return null; + } + String msgType = msgTypeField.getValue(); + if(ADMIN_MESSAGES.contains(msgType) && !Objects.equals(msgType, MSG_TYPE_HEARTBEAT)) { + return null; + } + + StrategyState state = strategy.getState(); + if(state.getMessageCorrupted().get()) { + return null; + } + + AdjustSendingTimeConfiguration config = strategy.getAdjustSendingTimeConfiguration(); + metadata.put("sendingTimeUpdated", "Y"); + metadata.put("sendingTimeUpdateSeconds", Long.toString(config.getAdjustDuration().toSeconds())); + + updateSendingTime(message, config.getAdjustDuration(), config.getSubstract()); + + return null; + } + private Map fakeRetransmissionOutgoingProcessor(ByteBuf message, Map metadata) { onOutgoingUpdateTag(message, metadata); @@ -1744,7 +1818,7 @@ private Map blockReceiveQueue(ByteBuf message, Map {x.setOutgoingMessageProcessor(this::corruptMessageStructureProcessor); return Unit.INSTANCE;}); + strategy.setCleanupHandler(this::cleanUpCorruptMessageStructureStrategy); + ruleStartEvent(strategy.getType(), strategy.getStartTime()); + } + + private void cleanUpCorruptMessageStructureStrategy() { + strategy.updateOutgoingMessageStrategy(x -> {x.setOutgoingMessageProcessor(this::defaultOutgoingStrategy); return Unit.INSTANCE;}); + ruleEndEvent(strategy.getType(), strategy.getStartTime(), strategy.getState().getMessageIDs()); + strategy.cleanupStrategy(); + } + + private void setupAdjustSendingTimeStrategy(RuleConfiguration configuration) { + strategy.resetStrategyAndState(configuration); + strategy.updateOutgoingMessageStrategy(x -> {x.setOutgoingMessageProcessor(this::adjustSendingTimeStrategyProcessor); return Unit.INSTANCE;}); + strategy.setCleanupHandler(this::cleanUpAdjustSendingTimeStrategy); + ruleStartEvent(strategy.getType(), strategy.getStartTime()); + } + + private void cleanUpAdjustSendingTimeStrategy() { + strategy.updateOutgoingMessageStrategy(x -> {x.setOutgoingMessageProcessor(this::defaultOutgoingStrategy); return Unit.INSTANCE;}); + ruleEndEvent(strategy.getType(), strategy.getStartTime(), strategy.getState().getMessageIDs()); + strategy.cleanupStrategy(); + } // private Map defaultMessageProcessor(ByteBuf message, Map metadata) {return null;} @@ -2352,6 +2452,8 @@ private Consumer getSetupFunction(RuleConfiguration config) { case LOGON_AFTER_LOGON: return this::runLogonAfterLogonStrategy; case POSS_DUP_SESSION_MESSAGES: return this::runPossDupSessionMessages; case LOGON_FROM_ANOTHER_CONNECTION: return this::runLogonFromAnotherConnection; + case CORRUPT_MESSAGE_STRUCTURE: return this::setupCorruptMessageStructureStrategy; + case ADJUST_SENDING_TIME: return this::setupAdjustSendingTimeStrategy; case DEFAULT: return configuration -> strategy.cleanupStrategy(); default: throw new IllegalStateException(String.format("Unknown strategy type %s.", config.getRuleType())); } @@ -2473,6 +2575,18 @@ private void updateSendingTime(ByteBuf buf) { sendingTime.setValue(getTime()); } + private void updateSendingTime(ByteBuf buf, Duration adjustDuration, boolean substract) { + FixField sendingTime = Objects.requireNonNull(findField(buf, SENDING_TIME_TAG)); + DateTimeFormatter formatter = settings.getSendingDateTimeFormat(); + LocalDateTime datetime = LocalDateTime.now(ZoneOffset.UTC); + if(substract) { + datetime.minus(adjustDuration); + } else { + datetime.plus(adjustDuration); + } + sendingTime.setValue(formatter.format(datetime)); + } + private void setTime(ByteBuf buf) { FixField sendingTime = findField(buf, SENDING_TIME_TAG); FixField seqNum = requireNonNull(findField(buf, MSG_SEQ_NUM_TAG), "SeqNum field was null."); @@ -2520,7 +2634,7 @@ private void waitUntilLoggedIn() { while (!enabled.get() && System.currentTimeMillis() - start < 2000) { LOGGER.info("Waiting until session will be logged in: {}", channel.getSessionAlias()); try { - Thread.sleep(10); + Thread.sleep(100); } catch (Exception e) { LOGGER.error("Error while waiting session login.", e); } @@ -2533,7 +2647,7 @@ private void waitLogoutResponse() { if (LOGGER.isWarnEnabled()) LOGGER.warn("Waiting session logout: {}", channel.getSessionAlias()); try { //noinspection BusyWait - Thread.sleep(10); + Thread.sleep(100); } catch (InterruptedException e) { LOGGER.error("Error while sleeping."); } diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/FIXMessageStructureMutator.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/FIXMessageStructureMutator.kt new file mode 100644 index 0000000..c5dc823 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/FIXMessageStructureMutator.kt @@ -0,0 +1,159 @@ +/* + * Copyright 2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.conn.dirty.fix + +import io.netty.buffer.ByteBuf +import mu.KotlinLogging +import kotlin.text.Charsets.US_ASCII + +enum class HeaderPosition { + INSIDE_BODY, + AFTER_BODY, + INSIDE_TRAILER, + AFTER_TRAILER +} + +enum class TrailerPosition { + BEFORE_HEADER, + INSIDE_HEADER, + AFTER_HEADER, + INSIDE_BODY +} + +class FIXMessageStructureMutator( + private val headerTags: Set, + private val trailerTags: Set +) { + + fun moveHeader(dst: HeaderPosition, message: ByteBuf): ByteBuf { + val condition: (FixField) -> Boolean = {headerTags.contains(it.tag)} + + val field = when (dst) { + HeaderPosition.INSIDE_BODY -> getBodyFirstTag(message) + HeaderPosition.AFTER_BODY -> getBodyLastTag(message) + HeaderPosition.INSIDE_TRAILER -> getTrailerFirstTag(message) + HeaderPosition.AFTER_TRAILER -> getTrailerLastTag(message) + } + + if (field?.tag == null) { + K_LOGGER.warn { "Move header operation failed. Not found position to insert for $dst in message: ${message.toString( + US_ASCII)}" } + return message + } + + moveBlockAfter(field.tag!!, message, condition) + return message + } + + fun moveTrailer(dst: TrailerPosition, message: ByteBuf): ByteBuf { + val condition: (FixField) -> Boolean = {trailerTags.contains(it.tag)} + + val field = when(dst) { + TrailerPosition.BEFORE_HEADER -> getHeaderFirstTag(message) + TrailerPosition.INSIDE_HEADER -> getHeaderFirstTag(message) + TrailerPosition.AFTER_HEADER -> getHeaderLastTag(message) + TrailerPosition.INSIDE_BODY -> getBodyFirstTag(message) + } + + if (field?.tag == null) { + K_LOGGER.warn { "Move trailer operation failed. Not found position to insert for $dst in message: ${message.toString( + US_ASCII)}" } + return message + } + if(dst == TrailerPosition.BEFORE_HEADER) { + moveBlockBefore(field.tag!!, message, condition) + } else { + moveBlockAfter(field.tag!!, message, condition) + } + return message + } + + private fun moveBlockAfter(afterTag: Int, message: ByteBuf, condition: (FixField) -> Boolean) { + val toMove = mutableListOf>() + message.forEachField { + if(condition(it)) { + toMove.add(Pair(it.tag!!, it.value!!)) + it.clear() + } + } + + val afterField = message.findField(afterTag) + + if (afterField == null) { + K_LOGGER.warn { "Something went wrong while moving block. Tag $afterTag is missing from message. Impossible to move block." } + return + } + + var previous: FixField = afterField + toMove.forEach { + previous = previous.insertNext(it.first, it.second) + } + } + + private fun moveBlockBefore(beforeTag: Int, message: ByteBuf, condition: (FixField) -> Boolean) { + val toMove = mutableListOf>() + message.forEachField { + if(condition(it)) { + toMove.add(0, Pair(it.tag!!, it.value!!)) + it.clear() + } + } + + val afterField = message.findField(beforeTag) + + if (afterField == null) { + K_LOGGER.warn { "Something went wrong while moving block. Tag $beforeTag is missing from message. Impossible to move block." } + return + } + + var previous: FixField = afterField + toMove.forEach { + previous = previous.insertPrevious(it.first, it.second) + } + } + + private fun getBodyFirstTag(message: ByteBuf): FixField? = findFirstTag(message) { !headerTags.contains(it.tag) } + private fun getBodyLastTag(message: ByteBuf): FixField? = findLastTag(message) { !headerTags.contains(it.tag) && !trailerTags.contains(it.tag) } + + private fun getTrailerFirstTag(message: ByteBuf): FixField? = findFirstTag(message) { trailerTags.contains(it.tag) } + private fun getTrailerLastTag(message: ByteBuf): FixField? = findLastTag(message) { trailerTags.contains(it.tag) } + + private fun getHeaderFirstTag(message: ByteBuf): FixField? = findFirstTag(message) { headerTags.contains(it.tag) } + private fun getHeaderLastTag(message: ByteBuf): FixField? = findLastTag(message) { headerTags.contains(it.tag) } + + private fun findFirstTag(message: ByteBuf, condition: (FixField) -> Boolean): FixField? { + message.forEachField { + if(condition(it)) { + return it + } + } + return null + } + + private fun findLastTag(message: ByteBuf, condition: (FixField) -> Boolean): FixField? { + var lastField: FixField? = null + message.forEachField { + if(condition(it)) { + lastField = it + } + } + return lastField + } + + companion object { + val K_LOGGER = KotlinLogging.logger { } + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/AdjustSendingTimeConfiguration.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/AdjustSendingTimeConfiguration.kt new file mode 100644 index 0000000..78869c8 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/AdjustSendingTimeConfiguration.kt @@ -0,0 +1,23 @@ +/* + * Copyright 2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.conn.dirty.fix.brokenconn.configuration + +import java.time.Duration + +data class AdjustSendingTimeConfiguration( + val adjustDuration: Duration, + val substract: Boolean = false +) \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/CorruptMessageStructureConfiguration.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/CorruptMessageStructureConfiguration.kt new file mode 100644 index 0000000..3dfa603 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/CorruptMessageStructureConfiguration.kt @@ -0,0 +1,29 @@ +/* + * Copyright 2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.conn.dirty.fix.brokenconn.configuration + +import com.exactpro.th2.conn.dirty.fix.HeaderPosition +import com.exactpro.th2.conn.dirty.fix.TrailerPosition + +data class MoveTrailerConfiguration(val position: TrailerPosition) +data class MoveHeaderConfiguration(val position: HeaderPosition) + +data class CorruptMessageStructureConfiguration( + val trailerTags: Set = setOf(10), + val headerTags: Set = setOf(8, 9, 35, 1128, 49, 56, 43, 97, 52, 122), + val moveHeaderConfiguration: MoveHeaderConfiguration?, + val moveTrailerConfiguration: MoveTrailerConfiguration? +) \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/RuleConfiguration.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/RuleConfiguration.kt index 1585b70..00cf356 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/RuleConfiguration.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/RuleConfiguration.kt @@ -41,7 +41,9 @@ data class RuleConfiguration( val changeSequenceConfiguration: ChangeSequenceConfiguration? = null, val resendRequestConfiguration: ResendRequestConfiguration? = null, val sendSequenceResetConfiguration: SendSequenceResetConfiguration? = null, - val disableForMessageTypes: Set = setOf("q") // Order Mass Cansel Request (q) message shouldn't be transformed + val disableForMessageTypes: Set = setOf("q"), // Order Mass Cansel Request (q) message shouldn't be transformed + val corruptMessageStructureConfiguration: CorruptMessageStructureConfiguration? = null, + val adjustSendingTimeConfiguration: AdjustSendingTimeConfiguration? = null ) { init { when(ruleType) { @@ -86,6 +88,12 @@ data class RuleConfiguration( RuleType.LOGON_AFTER_LOGON -> {} RuleType.POSS_DUP_SESSION_MESSAGES -> {} RuleType.LOGON_FROM_ANOTHER_CONNECTION -> {} + RuleType.ADJUST_SENDING_TIME -> { + require(adjustSendingTimeConfiguration != null) { "`adjustSendingTimeConfiguration` is required for $ruleType" } + } + RuleType.CORRUPT_MESSAGE_STRUCTURE -> { + require(corruptMessageStructureConfiguration != null) { "`corruptMessageStructureConfiguration` is required for $ruleType"} + } } } diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/RuleType.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/RuleType.kt index 5b33e87..85b160e 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/RuleType.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/RuleType.kt @@ -35,5 +35,7 @@ enum class RuleType { INVALID_CHECKSUM, POSS_DUP_SESSION_MESSAGES, LOGON_FROM_ANOTHER_CONNECTION, - DEFAULT + DEFAULT, + CORRUPT_MESSAGE_STRUCTURE, + ADJUST_SENDING_TIME } \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/StatefulStrategy.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/StatefulStrategy.kt index 1c32850..3bc8f57 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/StatefulStrategy.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/StatefulStrategy.kt @@ -15,12 +15,7 @@ */ package com.exactpro.th2.conn.dirty.fix.brokenconn.strategy -import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.BatchSendConfiguration -import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.MissMessageConfiguration -import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.RecoveryConfig -import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.RuleConfiguration -import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.SplitSendConfiguration -import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.TransformMessageConfiguration +import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.* import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.StrategyState.Companion.resetAndCopyMissedMessages import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.api.CleanupHandler import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.api.OnCloseHandler @@ -81,6 +76,12 @@ class StatefulStrategy( val recoveryConfig: RecoveryConfig get() = lock.read { state.config?.recoveryConfig ?: RecoveryConfig() } + val corruptMessageStructureConfiguration: CorruptMessageStructureConfiguration + get() = lock.read { state.config?.corruptMessageStructureConfiguration ?: error("corruptMessageSturctureConfiguration isn't present.") } + + val adjustSendingTimeConfiguration: AdjustSendingTimeConfiguration + get() = lock.read { state.config?.adjustSendingTimeConfiguration ?: error("adjustSendingTimeConfiguration isn't present.") } + // strategies fun updateSendStrategy(func: SendStrategy.() -> Unit) = lock.write { sendStrategy.func() diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/StrategyState.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/StrategyState.kt index f28ccf4..fbf63be 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/StrategyState.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/StrategyState.kt @@ -25,6 +25,7 @@ import io.netty.buffer.Unpooled import mu.KotlinLogging import java.time.Instant import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.locks.ReentrantReadWriteLock import kotlin.concurrent.read import kotlin.concurrent.write @@ -78,6 +79,8 @@ class StrategyState(val config: RuleConfiguration? = null, } } + var messageCorrupted = AtomicBoolean(false) + fun getMissedMessage(sequence: Long): ByteBuf? = lock.read { missedMessagesCache.remove(sequence) } fun updateCacheAndRunOnCondition(message: ByteBuf, condition: (Int) -> Boolean, function: (ByteBuf) -> Unit) = lock.write { diff --git a/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/FIXMessageStructureMutatorTest.kt b/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/FIXMessageStructureMutatorTest.kt new file mode 100644 index 0000000..4f0b038 --- /dev/null +++ b/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/FIXMessageStructureMutatorTest.kt @@ -0,0 +1,77 @@ +/* + * Copyright 2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.conn.dirty.fix + +import io.netty.buffer.ByteBuf +import io.netty.buffer.Unpooled +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.Assertions.assertEquals +import kotlin.text.Charsets.UTF_8 + +class FIXMessageStructureMutatorTest { + + @Test fun `move header inside body fields`() { + val newMessage = MESSAGE_STRUCTURE_MUTATOR.moveHeader(HeaderPosition.INSIDE_BODY, MESSAGE.toBuffer()) + assertEquals("11=TEST|8=FIX.4.2|9=65|35=A|49=SERVER|56=CLIENT|34=177|52=20090107-18:15:16|98=0|55=TEST|89=10|10=062|", newMessage.asString()) + } + + @Test fun `move header after body fields`() { + val newMessage = MESSAGE_STRUCTURE_MUTATOR.moveHeader(HeaderPosition.AFTER_BODY, MESSAGE.toBuffer()) + assertEquals("11=TEST|55=TEST|8=FIX.4.2|9=65|35=A|49=SERVER|56=CLIENT|34=177|52=20090107-18:15:16|98=0|89=10|10=062|", newMessage.asString()) + } + + @Test fun `move header inside trailer fields`() { + val newMessage = MESSAGE_STRUCTURE_MUTATOR.moveHeader(HeaderPosition.INSIDE_TRAILER, MESSAGE.toBuffer()) + assertEquals("11=TEST|55=TEST|89=10|8=FIX.4.2|9=65|35=A|49=SERVER|56=CLIENT|34=177|52=20090107-18:15:16|98=0|10=062|", newMessage.asString()) + } + + @Test fun `move header after trailer fields`() { + val newMessage = MESSAGE_STRUCTURE_MUTATOR.moveHeader(HeaderPosition.AFTER_TRAILER, MESSAGE.toBuffer()) + assertEquals("11=TEST|55=TEST|89=10|10=062|8=FIX.4.2|9=65|35=A|49=SERVER|56=CLIENT|34=177|52=20090107-18:15:16|98=0|", newMessage.asString()) + } + + @Test fun `move trailer before header`() { + val newMessage = MESSAGE_STRUCTURE_MUTATOR.moveTrailer(TrailerPosition.BEFORE_HEADER, MESSAGE.toBuffer()) + assertEquals("89=10|10=062|8=FIX.4.2|9=65|35=A|49=SERVER|56=CLIENT|34=177|52=20090107-18:15:16|98=0|11=TEST|55=TEST|", newMessage.asString()) + } + + @Test fun `move trailer inside header`() { + val newMessage = MESSAGE_STRUCTURE_MUTATOR.moveTrailer(TrailerPosition.INSIDE_HEADER, MESSAGE.toBuffer()) + assertEquals("8=FIX.4.2|89=10|10=062|9=65|35=A|49=SERVER|56=CLIENT|34=177|52=20090107-18:15:16|98=0|11=TEST|55=TEST|", newMessage.asString()) + } + + @Test fun `move trailer after header`() { + val newMessage = MESSAGE_STRUCTURE_MUTATOR.moveTrailer(TrailerPosition.AFTER_HEADER, MESSAGE.toBuffer()) + assertEquals("8=FIX.4.2|9=65|35=A|49=SERVER|56=CLIENT|34=177|52=20090107-18:15:16|98=0|89=10|10=062|11=TEST|55=TEST|", newMessage.asString()) + } + + @Test fun `move trailer inside body`() { + val newMessage = MESSAGE_STRUCTURE_MUTATOR.moveTrailer(TrailerPosition.INSIDE_BODY, MESSAGE.toBuffer()) + assertEquals("8=FIX.4.2|9=65|35=A|49=SERVER|56=CLIENT|34=177|52=20090107-18:15:16|98=0|11=TEST|89=10|10=062|55=TEST|", newMessage.asString()) + } + + companion object { + private const val MESSAGE = "8=FIX.4.2|9=65|35=A|49=SERVER|56=CLIENT|34=177|52=20090107-18:15:16|98=0|11=TEST|55=TEST|89=10|10=062|" + private val HEADER_TAGS = setOf(8, 9, 35, 49, 56, 34, 52, 98) + private val TRAILER_TAGS = setOf(89, 10) + private val MESSAGE_STRUCTURE_MUTATOR = FIXMessageStructureMutator( + HEADER_TAGS, + TRAILER_TAGS + ) + private fun String.toBuffer() = Unpooled.buffer().writeBytes(replace('|', SOH_CHAR).toByteArray(UTF_8)) + private fun ByteBuf.asString() = toString(UTF_8).replace(SOH_CHAR, '|') + } +} \ No newline at end of file