Skip to content

Commit

Permalink
TS-2689: Structure mutation strategies
Browse files Browse the repository at this point in the history
  • Loading branch information
denis.plotnikov committed Oct 4, 2024
1 parent b166e3d commit 1e30b69
Show file tree
Hide file tree
Showing 9 changed files with 430 additions and 14 deletions.
126 changes: 120 additions & 6 deletions src/main/java/com/exactpro/th2/FixHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.IncomingMessagesStrategy;
import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.OutgoingMessagesStrategy;
import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.ReceiveStrategy;
import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.CorruptMessageStructureConfiguration;
import com.exactpro.th2.conn.dirty.fix.FIXMessageStructureMutator;
import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.AdjustSendingTimeConfiguration;
import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.RuleType;
import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.SchedulerType;
import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.SendStrategy;
Expand Down Expand Up @@ -425,7 +428,7 @@ public CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<Str
if (LOGGER.isWarnEnabled()) LOGGER.warn("Session is not yet logged in: {}", channel.getSessionAlias());
try {
//noinspection BusyWait
Thread.sleep(10);
Thread.sleep(100);
} catch (InterruptedException e) {
LOGGER.error("Error while sleeping.");
}
Expand All @@ -440,7 +443,7 @@ public CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<Str
if (LOGGER.isWarnEnabled()) LOGGER.warn("Session is not yet logged in: {}", channel.getSessionAlias());
try {
//noinspection BusyWait
Thread.sleep(10);
Thread.sleep(100);
} catch (InterruptedException e) {
LOGGER.error("Error while sleeping.");
}
Expand Down Expand Up @@ -1581,7 +1584,7 @@ private void transformProcessor(
long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime <= timeToBlock) {
try {
Thread.sleep(10);
Thread.sleep(100);
} catch (Exception e) {
LOGGER.error("Error while blocking send.", e);
}
Expand Down Expand Up @@ -1649,6 +1652,77 @@ private Map<String, String> transformOutgoingMessageStrategy(ByteBuf message, Ma
return null;
}

private Map<String, String> corruptMessageStructureProcessor(ByteBuf message, Map<String, String> metadata) {
onOutgoingUpdateTag(message, metadata);
Set <String> disableForMessageTypes = strategy.getDisableForMessageTypes();

FixField msgTypeField = findField(message, MSG_TYPE_TAG, US_ASCII);
if(msgTypeField != null && msgTypeField.getValue() != null && disableForMessageTypes.contains(msgTypeField.getValue())) {
LOGGER.info("Strategy '{}' is disabled for {} message type", strategy.getType(), msgTypeField.getValue());
return null;
}
String msgType = msgTypeField.getValue();
if(ADMIN_MESSAGES.contains(msgType) && !Objects.equals(msgType, MSG_TYPE_HEARTBEAT)) {
return null;
}

StrategyState state = strategy.getState();
if(state.getMessageCorrupted().get()) {
return null;
}

CorruptMessageStructureConfiguration config = strategy.getCorruptMessageStructureConfiguration();
FIXMessageStructureMutator mutator = new FIXMessageStructureMutator(
config.getHeaderTags(),
config.getTrailerTags()
);
if (config.getMoveHeaderConfiguration() != null) {
mutator.moveHeader(config.getMoveHeaderConfiguration().getPosition(), message);
metadata.put("structureMutated", "Y");
metadata.put("structureMutationKind", "header");
metadata.put("structureMutationPosition", config.getMoveHeaderConfiguration().getPosition().name());
}

if (config.getMoveTrailerConfiguration() != null) {
mutator.moveTrailer(config.getMoveTrailerConfiguration().getPosition(), message);
metadata.put("structureMutated", "Y");
metadata.put("structureMutationKind", "trailer");
metadata.put("structureMutationPosition", config.getMoveTrailerConfiguration().getPosition().name());
}

state.getMessageCorrupted().set(true);

return null;
}

private Map<String, String> adjustSendingTimeStrategyProcessor(ByteBuf message, Map<String, String> metadata) {
onOutgoingUpdateTag(message, metadata);
Set <String> disableForMessageTypes = strategy.getDisableForMessageTypes();

FixField msgTypeField = findField(message, MSG_TYPE_TAG, US_ASCII);
if(msgTypeField != null && msgTypeField.getValue() != null && disableForMessageTypes.contains(msgTypeField.getValue())) {
LOGGER.info("Strategy '{}' is disabled for {} message type", strategy.getType(), msgTypeField.getValue());
return null;
}
String msgType = msgTypeField.getValue();
if(ADMIN_MESSAGES.contains(msgType) && !Objects.equals(msgType, MSG_TYPE_HEARTBEAT)) {
return null;
}

StrategyState state = strategy.getState();
if(state.getMessageCorrupted().get()) {
return null;
}

AdjustSendingTimeConfiguration config = strategy.getAdjustSendingTimeConfiguration();
metadata.put("sendingTimeUpdated", "Y");
metadata.put("sendingTimeUpdateSeconds", Long.toString(config.getAdjustDuration().toSeconds()));

updateSendingTime(message, config.getAdjustDuration(), config.getSubstract());

return null;
}

private Map<String, String> fakeRetransmissionOutgoingProcessor(ByteBuf message, Map<String, String> metadata) {
onOutgoingUpdateTag(message, metadata);

Expand Down Expand Up @@ -1744,7 +1818,7 @@ private Map<String, String> blockReceiveQueue(ByteBuf message, Map<String, Strin
long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime <= timeToBlock) {
try {
Thread.sleep(10);
Thread.sleep(100);
} catch (Exception e) {
LOGGER.error("Error while blocking receive.", e);
}
Expand Down Expand Up @@ -2272,6 +2346,32 @@ private void cleanupSplitSendStrategy() {
ruleEndEvent(strategy.getType(), strategy.getStartTime(), strategy.getState().getMessageIDs());
strategy.cleanupStrategy();
}

private void setupCorruptMessageStructureStrategy(RuleConfiguration configuration) {
strategy.resetStrategyAndState(configuration);
strategy.updateOutgoingMessageStrategy(x -> {x.setOutgoingMessageProcessor(this::corruptMessageStructureProcessor); return Unit.INSTANCE;});
strategy.setCleanupHandler(this::cleanUpCorruptMessageStructureStrategy);
ruleStartEvent(strategy.getType(), strategy.getStartTime());
}

private void cleanUpCorruptMessageStructureStrategy() {
strategy.updateOutgoingMessageStrategy(x -> {x.setOutgoingMessageProcessor(this::defaultOutgoingStrategy); return Unit.INSTANCE;});
ruleEndEvent(strategy.getType(), strategy.getStartTime(), strategy.getState().getMessageIDs());
strategy.cleanupStrategy();
}

private void setupAdjustSendingTimeStrategy(RuleConfiguration configuration) {
strategy.resetStrategyAndState(configuration);
strategy.updateOutgoingMessageStrategy(x -> {x.setOutgoingMessageProcessor(this::adjustSendingTimeStrategyProcessor); return Unit.INSTANCE;});
strategy.setCleanupHandler(this::cleanUpAdjustSendingTimeStrategy);
ruleStartEvent(strategy.getType(), strategy.getStartTime());
}

private void cleanUpAdjustSendingTimeStrategy() {
strategy.updateOutgoingMessageStrategy(x -> {x.setOutgoingMessageProcessor(this::defaultOutgoingStrategy); return Unit.INSTANCE;});
ruleEndEvent(strategy.getType(), strategy.getStartTime(), strategy.getState().getMessageIDs());
strategy.cleanupStrategy();
}
// </editor-fold>

private Map<String, String> defaultMessageProcessor(ByteBuf message, Map<String, String> metadata) {return null;}
Expand Down Expand Up @@ -2352,6 +2452,8 @@ private Consumer<RuleConfiguration> getSetupFunction(RuleConfiguration config) {
case LOGON_AFTER_LOGON: return this::runLogonAfterLogonStrategy;
case POSS_DUP_SESSION_MESSAGES: return this::runPossDupSessionMessages;
case LOGON_FROM_ANOTHER_CONNECTION: return this::runLogonFromAnotherConnection;
case CORRUPT_MESSAGE_STRUCTURE: return this::setupCorruptMessageStructureStrategy;
case ADJUST_SENDING_TIME: return this::setupAdjustSendingTimeStrategy;
case DEFAULT: return configuration -> strategy.cleanupStrategy();
default: throw new IllegalStateException(String.format("Unknown strategy type %s.", config.getRuleType()));
}
Expand Down Expand Up @@ -2473,6 +2575,18 @@ private void updateSendingTime(ByteBuf buf) {
sendingTime.setValue(getTime());
}

private void updateSendingTime(ByteBuf buf, Duration adjustDuration, boolean substract) {
FixField sendingTime = Objects.requireNonNull(findField(buf, SENDING_TIME_TAG));
DateTimeFormatter formatter = settings.getSendingDateTimeFormat();
LocalDateTime datetime = LocalDateTime.now(ZoneOffset.UTC);
if(substract) {
datetime.minus(adjustDuration);
} else {
datetime.plus(adjustDuration);
}
sendingTime.setValue(formatter.format(datetime));
}

private void setTime(ByteBuf buf) {
FixField sendingTime = findField(buf, SENDING_TIME_TAG);
FixField seqNum = requireNonNull(findField(buf, MSG_SEQ_NUM_TAG), "SeqNum field was null.");
Expand Down Expand Up @@ -2520,7 +2634,7 @@ private void waitUntilLoggedIn() {
while (!enabled.get() && System.currentTimeMillis() - start < 2000) {
LOGGER.info("Waiting until session will be logged in: {}", channel.getSessionAlias());
try {
Thread.sleep(10);
Thread.sleep(100);
} catch (Exception e) {
LOGGER.error("Error while waiting session login.", e);
}
Expand All @@ -2533,7 +2647,7 @@ private void waitLogoutResponse() {
if (LOGGER.isWarnEnabled()) LOGGER.warn("Waiting session logout: {}", channel.getSessionAlias());
try {
//noinspection BusyWait
Thread.sleep(10);
Thread.sleep(100);
} catch (InterruptedException e) {
LOGGER.error("Error while sleeping.");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Copyright 2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.exactpro.th2.conn.dirty.fix

import io.netty.buffer.ByteBuf
import mu.KotlinLogging
import kotlin.text.Charsets.US_ASCII

enum class HeaderPosition {
INSIDE_BODY,
AFTER_BODY,
INSIDE_TRAILER,
AFTER_TRAILER
}

enum class TrailerPosition {
BEFORE_HEADER,
INSIDE_HEADER,
AFTER_HEADER,
INSIDE_BODY
}

class FIXMessageStructureMutator(
private val headerTags: Set<Int>,
private val trailerTags: Set<Int>
) {

fun moveHeader(dst: HeaderPosition, message: ByteBuf): ByteBuf {
val condition: (FixField) -> Boolean = {headerTags.contains(it.tag)}

val field = when (dst) {
HeaderPosition.INSIDE_BODY -> getBodyFirstTag(message)
HeaderPosition.AFTER_BODY -> getBodyLastTag(message)
HeaderPosition.INSIDE_TRAILER -> getTrailerFirstTag(message)
HeaderPosition.AFTER_TRAILER -> getTrailerLastTag(message)
}

if (field?.tag == null) {
K_LOGGER.warn { "Move header operation failed. Not found position to insert for $dst in message: ${message.toString(
US_ASCII)}" }
return message
}

moveBlockAfter(field.tag!!, message, condition)
return message
}

fun moveTrailer(dst: TrailerPosition, message: ByteBuf): ByteBuf {
val condition: (FixField) -> Boolean = {trailerTags.contains(it.tag)}

val field = when(dst) {
TrailerPosition.BEFORE_HEADER -> getHeaderFirstTag(message)
TrailerPosition.INSIDE_HEADER -> getHeaderFirstTag(message)
TrailerPosition.AFTER_HEADER -> getHeaderLastTag(message)
TrailerPosition.INSIDE_BODY -> getBodyFirstTag(message)
}

if (field?.tag == null) {
K_LOGGER.warn { "Move trailer operation failed. Not found position to insert for $dst in message: ${message.toString(
US_ASCII)}" }
return message
}
if(dst == TrailerPosition.BEFORE_HEADER) {
moveBlockBefore(field.tag!!, message, condition)
} else {
moveBlockAfter(field.tag!!, message, condition)
}
return message
}

private fun moveBlockAfter(afterTag: Int, message: ByteBuf, condition: (FixField) -> Boolean) {
val toMove = mutableListOf<Pair<Int, String>>()
message.forEachField {
if(condition(it)) {
toMove.add(Pair(it.tag!!, it.value!!))
it.clear()
}
}

val afterField = message.findField(afterTag)

if (afterField == null) {
K_LOGGER.warn { "Something went wrong while moving block. Tag $afterTag is missing from message. Impossible to move block." }
return
}

var previous: FixField = afterField
toMove.forEach {
previous = previous.insertNext(it.first, it.second)
}
}

private fun moveBlockBefore(beforeTag: Int, message: ByteBuf, condition: (FixField) -> Boolean) {
val toMove = mutableListOf<Pair<Int, String>>()
message.forEachField {
if(condition(it)) {
toMove.add(0, Pair(it.tag!!, it.value!!))
it.clear()
}
}

val afterField = message.findField(beforeTag)

if (afterField == null) {
K_LOGGER.warn { "Something went wrong while moving block. Tag $beforeTag is missing from message. Impossible to move block." }
return
}

var previous: FixField = afterField
toMove.forEach {
previous = previous.insertPrevious(it.first, it.second)
}
}

private fun getBodyFirstTag(message: ByteBuf): FixField? = findFirstTag(message) { !headerTags.contains(it.tag) }
private fun getBodyLastTag(message: ByteBuf): FixField? = findLastTag(message) { !headerTags.contains(it.tag) && !trailerTags.contains(it.tag) }

private fun getTrailerFirstTag(message: ByteBuf): FixField? = findFirstTag(message) { trailerTags.contains(it.tag) }
private fun getTrailerLastTag(message: ByteBuf): FixField? = findLastTag(message) { trailerTags.contains(it.tag) }

private fun getHeaderFirstTag(message: ByteBuf): FixField? = findFirstTag(message) { headerTags.contains(it.tag) }
private fun getHeaderLastTag(message: ByteBuf): FixField? = findLastTag(message) { headerTags.contains(it.tag) }

private fun findFirstTag(message: ByteBuf, condition: (FixField) -> Boolean): FixField? {
message.forEachField {
if(condition(it)) {
return it
}
}
return null
}

private fun findLastTag(message: ByteBuf, condition: (FixField) -> Boolean): FixField? {
var lastField: FixField? = null
message.forEachField {
if(condition(it)) {
lastField = it
}
}
return lastField
}

companion object {
val K_LOGGER = KotlinLogging.logger { }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.exactpro.th2.conn.dirty.fix.brokenconn.configuration

import java.time.Duration

data class AdjustSendingTimeConfiguration(
val adjustDuration: Duration,
val substract: Boolean = false
)
Loading

0 comments on commit 1e30b69

Please sign in to comment.