Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TS-2413] Provided ability to disable transformation strategies #7

Merged
merged 5 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# th2-conn-dirty-fix (1.3.0)
# th2-conn-dirty-fix (1.4.0)

This microservice allows sending and receiving messages via FIX protocol

Expand Down Expand Up @@ -333,8 +333,13 @@ spec:
memory: 100Mi
cpu: 20m
```
## 1.4.0

* Provided ability to disable transformation strategies: `TRANSFORM_MESSAGE_STRATEGY`, `INVALID_CHECKSUM`, `FAKE_RETRANSMISSION` for raw message types specified in the `disableForMessageTypes` property

## 1.3.0

* Fixed the problem long recovery in case of mixing recovery message with non-recovery messages
* Migrated to th2 gradle plugin `0.1.1`
* Updated:
* bom: `4.6.1`
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
release_version=1.3.0
release_version=1.4.0
25 changes: 19 additions & 6 deletions src/main/java/com/exactpro/th2/FixHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -1391,13 +1392,18 @@ private CompletableFuture<MessageID> splitSend(IChannel channel, ByteBuf message
}

// TODO: Add simplified configuration
private Map<String, String> transformProcessor(
private void transformProcessor(
ByteBuf message,
Map<String, String> metadata
) {
FixField msgTypeField = findField(message, MSG_TYPE_TAG, US_ASCII);
if(msgTypeField == null || msgTypeField.getValue() == null) {
return null;
return;
}
Set<String> disableForMessageTypes = strategy.getDisableForMessageTypes();
if (disableForMessageTypes.contains(msgTypeField.getValue())) {
LOGGER.info("Strategy '{}' is disabled for {} message type", strategy.getType(), msgTypeField.getValue());
return;
}

TransformMessageConfiguration config = strategy.getTransformMessageConfiguration();
Expand All @@ -1406,7 +1412,7 @@ private Map<String, String> transformProcessor(
if(!msgTypeField.getValue().equals(transformation.getMessageType())) {
if(!transformation.getAnyMessageType()) {
config.decreaseCounter();
return null;
return;
}
}

Expand Down Expand Up @@ -1493,7 +1499,6 @@ private Map<String, String> transformProcessor(
}
);

return null;
}

private Map<String, String>
Expand Down Expand Up @@ -1573,6 +1578,13 @@ private Map<String, String> transformOutgoingMessageStrategy(ByteBuf message, Ma
private Map<String, String> fakeRetransmissionOutgoingProcessor(ByteBuf message, Map<String, String> metadata) {
onOutgoingUpdateTag(message, metadata);

Set<String> disableForMessageTypes = strategy.getDisableForMessageTypes();
FixField msgTypeField = findField(message, MSG_TYPE_TAG, US_ASCII);
if(msgTypeField != null && msgTypeField.getValue() != null && disableForMessageTypes.contains(msgTypeField.getValue())) {
LOGGER.info("Strategy '{}' is disabled for {} message type", strategy.getType(), msgTypeField.getValue());
return null;
}

FixField sendingTime = requireNonNull(findField(message, SENDING_TIME_TAG));
strategy.getState().addMissedMessageToCacheIfCondition(msgSeqNum.get(), message.copy(), x -> true);

Expand Down Expand Up @@ -2239,13 +2251,14 @@ private Consumer<RuleConfiguration> getSetupFunction(RuleConfiguration config) {
case SEQUENCE_RESET: return this::runReconnectWithSequenceResetStrategy;
case SEND_SEQUENCE_RESET: return this::sendSequenceReset;
case TRANSFORM_LOGON: return this::setupTransformStrategy;
case TRANSFORM_MESSAGE_STRATEGY: return this::setupTransformMessageStrategy;
case TRANSFORM_MESSAGE_STRATEGY:
case INVALID_CHECKSUM:
return this::setupTransformMessageStrategy;
case CREATE_OUTGOING_GAP: return this::setupOutgoingGapStrategy;
case PARTIAL_CLIENT_OUTAGE: return this::setupPartialClientOutageStrategy;
case IGNORE_INCOMING_MESSAGES: return this::setupIgnoreIncomingMessagesStrategy;
case DISCONNECT_WITH_RECONNECT: return this::setupDisconnectStrategy;
case FAKE_RETRANSMISSION: return this::setupFakeRetransmissionStrategy;
case INVALID_CHECKSUM: return this::setupTransformMessageStrategy;
case LOGON_AFTER_LOGON: return this::runLogonAfterLogonStrategy;
case POSS_DUP_SESSION_MESSAGES: return this::runPossDupSessionMessages;
case LOGON_FROM_ANOTHER_CONNECTION: return this::runLogonFromAnotherConnection;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 Exactpro (Exactpro Systems Limited)
* Copyright 2023-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -40,7 +40,8 @@ data class RuleConfiguration(
val splitSendConfiguration: SplitSendConfiguration? = null,
val changeSequenceConfiguration: ChangeSequenceConfiguration? = null,
val resendRequestConfiguration: ResendRequestConfiguration? = null,
val sendSequenceResetConfiguration: SendSequenceResetConfiguration? = null
val sendSequenceResetConfiguration: SendSequenceResetConfiguration? = null,
val disableForMessageTypes: Set<String> = setOf("q") // Order Mass Cansel Request (q) message shouldn't be transformed
) {
init {
when(ruleType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class StatefulStrategy(
get() = lock.read { state.config?.missIncomingMessagesConfiguration ?: error("Miss incoming messages config isn't present.") }
val missOutgoingMessagesConfiguration: MissMessageConfiguration
get() = lock.read { state.config?.missOutgoingMessagesConfiguration ?: error("Miss outgoing messages config isn't present.") }
val disableForMessageTypes: Set<String>
get() = lock.read { state.config?.disableForMessageTypes ?: emptySet() }
val transformMessageConfiguration: TransformMessageConfiguration
get() = lock.read { state.config?.transformMessageConfiguration ?: error("Transform message config isn't present.") }
val batchSendConfiguration: BatchSendConfiguration
Expand Down