Skip to content

Commit

Permalink
[TS-2413] Provided ability to disable transformation strategies: `TRA…
Browse files Browse the repository at this point in the history
…NSFORM_MESSAGE_STRATEGY`, `INVALID_CHECKSUM`, `FAKE_RETRANSMISSION` for raw message types specified in the `disableForMessageTypes` property
  • Loading branch information
Nikita-Smirnov-Exactpro committed Jul 11, 2024
1 parent 8c03151 commit be494c8
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 15 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# th2-conn-dirty-fix (1.2.1)
# th2-conn-dirty-fix (1.3.0)

This microservice allows sending and receiving messages via FIX protocol

Expand Down Expand Up @@ -334,6 +334,10 @@ spec:
cpu: 20m
```

## 1.3.0

* Provided ability to disable transformation strategies: `TRANSFORM_MESSAGE_STRATEGY`, `INVALID_CHECKSUM`, `FAKE_RETRANSMISSION` for raw message types specified in the `disableForMessageTypes` property

## 1.2.1

* Property `th2.broken.strategy` is added to metadata to each message when a strategy is active
Expand Down
23 changes: 16 additions & 7 deletions src/main/java/com/exactpro/th2/FixHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -1396,13 +1397,15 @@ private CompletableFuture<MessageID> splitSend(IChannel channel, ByteBuf message
}

// TODO: Add simplified configuration
private Map<String, String> transformProcessor(
private void transformProcessor(
ByteBuf message,
Map<String, String> metadata
) {
Set<String> disableForMessageTypes = strategy.getDisableForMessageTypes();

FixField msgTypeField = findField(message, MSG_TYPE_TAG, US_ASCII);
if(msgTypeField == null || msgTypeField.getValue() == null) {
return null;
if(msgTypeField == null || msgTypeField.getValue() == null || disableForMessageTypes.contains(msgTypeField.getValue())) {
return;
}

TransformMessageConfiguration config = strategy.getTransformMessageConfiguration();
Expand All @@ -1411,7 +1414,7 @@ private Map<String, String> transformProcessor(
if(!msgTypeField.getValue().equals(transformation.getMessageType())) {
if(!transformation.getAnyMessageType()) {
config.decreaseCounter();
return null;
return;
}
}

Expand Down Expand Up @@ -1498,7 +1501,6 @@ private Map<String, String> transformProcessor(
}
);

return null;
}

private Map<String, String>
Expand Down Expand Up @@ -1578,6 +1580,12 @@ private Map<String, String> transformOutgoingMessageStrategy(ByteBuf message, Ma
private Map<String, String> fakeRetransmissionOutgoingProcessor(ByteBuf message, Map<String, String> metadata) {
onOutgoingUpdateTag(message, metadata);

Set<String> disableForMessageTypes = strategy.getDisableForMessageTypes();
FixField msgTypeField = findField(message, MSG_TYPE_TAG, US_ASCII);
if(msgTypeField != null && msgTypeField.getValue() != null && disableForMessageTypes.contains(msgTypeField.getValue())) {
return null;
}

FixField sendingTime = requireNonNull(findField(message, SENDING_TIME_TAG));
strategy.getState().addMissedMessageToCacheIfCondition(msgSeqNum.get(), message.copy(), x -> true);

Expand Down Expand Up @@ -2243,13 +2251,14 @@ private Consumer<RuleConfiguration> getSetupFunction(RuleConfiguration config) {
case SEQUENCE_RESET: return this::runReconnectWithSequenceResetStrategy;
case SEND_SEQUENCE_RESET: return this::sendSequenceReset;
case TRANSFORM_LOGON: return this::setupTransformStrategy;
case TRANSFORM_MESSAGE_STRATEGY: return this::setupTransformMessageStrategy;
case TRANSFORM_MESSAGE_STRATEGY:
case INVALID_CHECKSUM:
return this::setupTransformMessageStrategy;
case CREATE_OUTGOING_GAP: return this::setupOutgoingGapStrategy;
case PARTIAL_CLIENT_OUTAGE: return this::setupPartialClientOutageStrategy;
case IGNORE_INCOMING_MESSAGES: return this::setupIgnoreIncomingMessagesStrategy;
case DISCONNECT_WITH_RECONNECT: return this::setupDisconnectStrategy;
case FAKE_RETRANSMISSION: return this::setupFakeRetransmissionStrategy;
case INVALID_CHECKSUM: return this::setupTransformMessageStrategy;
case LOGON_AFTER_LOGON: return this::runLogonAfterLogonStrategy;
case POSS_DUP_SESSION_MESSAGES: return this::runPossDupSessionMessages;
case LOGON_FROM_ANOTHER_CONNECTION: return this::runLogonFromAnotherConnection;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 Exactpro (Exactpro Systems Limited)
* Copyright 2023-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -40,7 +40,8 @@ data class RuleConfiguration(
val splitSendConfiguration: SplitSendConfiguration? = null,
val changeSequenceConfiguration: ChangeSequenceConfiguration? = null,
val resendRequestConfiguration: ResendRequestConfiguration? = null,
val sendSequenceResetConfiguration: SendSequenceResetConfiguration? = null
val sendSequenceResetConfiguration: SendSequenceResetConfiguration? = null,
val disableForMessageTypes: Set<String> = setOf("q") // Order Mass Cansel Request (q) message shouldn't be transformed
) {
init {
when(ruleType) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 Exactpro (Exactpro Systems Limited)
* Copyright 2023-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,22 +16,18 @@
package com.exactpro.th2.conn.dirty.fix.brokenconn.strategy

import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.BatchSendConfiguration
import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.BlockMessageConfiguration
import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.MissMessageConfiguration
import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.RecoveryConfig
import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.RuleConfiguration
import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.SplitSendConfiguration
import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.TransformMessageConfiguration
import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.StrategyState.Companion.resetAndCopyMissedMessages
import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.api.CleanupHandler
import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.api.MessageProcessor
import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.api.OnCloseHandler
import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.api.RecoveryHandler
import java.time.Instant
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.read
import kotlin.concurrent.withLock
import kotlin.concurrent.write

class StatefulStrategy(
Expand All @@ -56,6 +52,9 @@ class StatefulStrategy(
var missOutgoingMessagesConfiguration: MissMessageConfiguration? = null
get() = state.config?.missOutgoingMessagesConfiguration ?: error("Miss outgoing messages config isn't present.")
private set
var disableForMessageTypes: Set<String> = emptySet()
get() = state.config?.disableForMessageTypes ?: error("Disable for message types isn't present.")
private set
var transformMessageConfiguration: TransformMessageConfiguration? = null
get() = state.config?.transformMessageConfiguration ?: error("Transform message config isn't present.")
private set
Expand Down

0 comments on commit be494c8

Please sign in to comment.