Skip to content

Commit

Permalink
[TS-2459] Refactored
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita-Smirnov-Exactpro committed Jul 9, 2024
1 parent c86a82d commit 9faa0f3
Show file tree
Hide file tree
Showing 11 changed files with 568 additions and 286 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# th2-conn-dirty-fix (1.2.1)
# th2-conn-dirty-fix (1.3.0)

This microservice allows sending and receiving messages via FIX protocol

Expand Down Expand Up @@ -335,7 +335,7 @@ spec:
```
## 1.3.0

* Migrated to th2 gradle plugin `0.0.8`
* Migrated to th2 gradle plugin `0.1.0`
* Updated:
* bom: `4.6.1`
* common: `5.13.1-dev`
Expand Down
3 changes: 1 addition & 2 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
plugins {
id "application"
id "com.exactpro.th2.gradle.component" version "0.0.8"
id "com.exactpro.th2.gradle.component" version "0.1.0"
id 'org.jetbrains.kotlin.jvm' version '1.8.22'
id "org.jetbrains.kotlin.kapt" version "1.8.22"
}
Expand Down Expand Up @@ -41,7 +41,6 @@ dependencies {
implementation 'com.exactpro.th2:netty-bytebuf-utils:0.0.1'
implementation 'net.lingala.zip4j:zip4j:2.11.5'
implementation 'org.apache.httpcomponents.client5:httpclient5:5.3.1'
// implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.2'
implementation'com.exactpro.th2:conn-dirty-tcp-core:3.6.0-dev'
implementation 'com.exactpro.th2:grpc-lw-data-provider:2.3.1-dev'

Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.6-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
41 changes: 19 additions & 22 deletions src/main/java/com/exactpro/th2/FixHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -391,11 +391,11 @@ public CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<Str
try {
sendingTimeoutHandler.getWithTimeout(channel.open());
} catch (TimeoutException e) {
ExceptionUtils.rethrow(new TimeoutException(
ExceptionUtils.asRuntimeException(new TimeoutException(
String.format("could not open connection before timeout %d mls elapsed",
currentTimeout)));
} catch (Exception e) {
ExceptionUtils.rethrow(e);
ExceptionUtils.asRuntimeException(e);
}
}

Expand All @@ -410,7 +410,7 @@ public CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<Str
}
if (System.currentTimeMillis() > deadline) {
// The method should have checked exception in signature...
ExceptionUtils.rethrow(new TimeoutException(String.format("session was not established within %d mls",
ExceptionUtils.asRuntimeException(new TimeoutException(String.format("session was not established within %d mls",
settings.getConnectionTimeoutOnSend())));
}
}
Expand All @@ -425,7 +425,7 @@ public CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<Str
}
if (System.currentTimeMillis() > deadline) {
// The method should have checked exception in signature...
ExceptionUtils.rethrow(new TimeoutException(String.format("session was not established within %d mls",
ExceptionUtils.asRuntimeException(new TimeoutException(String.format("session was not established within %d mls",
settings.getConnectionTimeoutOnSend())));
}
}
Expand Down Expand Up @@ -495,7 +495,7 @@ public ByteBuf onReceive(@NotNull IChannel channel, @NotNull ByteBuf buffer) {

@NotNull
@Override
public Map<String, String> onIncoming(@NotNull IChannel channel, @NotNull ByteBuf message, MessageID messageId) {
public Map<String, String> onIncoming(@NotNull IChannel channel, @NotNull ByteBuf message, @NotNull MessageID messageId) {
Map<String, String> metadata = new HashMap<>();

StrategyState state = strategy.getState();
Expand Down Expand Up @@ -536,7 +536,7 @@ public Map<String, String> onIncoming(@NotNull IChannel channel, @NotNull ByteBu
if(msgTypeValue.equals(MSG_TYPE_LOGOUT)) {
serverMsgSeqNum.incrementAndGet();
state.addMessageID(messageId);
strategy.getIncomingMessageStrategy(x -> x.getLogoutStrategy()).process(message, metadata);
strategy.getIncomingMessageStrategy(IncomingMessagesStrategy::getLogoutStrategy).process(message, metadata);
return metadata;
}

Expand Down Expand Up @@ -758,11 +758,7 @@ private void resetSequence(ByteBuf message) {
} else {
int newSeqNo = Integer.parseInt(requireNonNull(seqNumValue.getValue()));
serverMsgSeqNum.updateAndGet(sequence -> {
if(sequence < newSeqNo - 1) {
return newSeqNo - 1;
} else {
return sequence;
}
return Math.max(sequence, newSeqNo - 1);
});
}
}
Expand Down Expand Up @@ -849,7 +845,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi
}

AtomicBoolean skip = new AtomicBoolean(recoveryConfig.getOutOfOrder());
AtomicReference<ByteBuf> skipped = new AtomicReference(null);
AtomicReference<ByteBuf> skipped = new AtomicReference<>(null);

int endSeq = endSeqNo;
LOGGER.info("Loading messages from {} to {}", beginSeqNo, endSeqNo);
Expand Down Expand Up @@ -1595,7 +1591,7 @@ private Map<String, String> gapFillSequenceReset(ByteBuf message, Map<String, St
onOutgoingUpdateTag(message, metadata);
FixField msgType = findField(message, MSG_TYPE_TAG, US_ASCII);

if(msgType == null || !msgType.getValue().equals(MSG_TYPE_SEQUENCE_RESET)) return null;
if(msgType == null || !Objects.equals(msgType.getValue(), MSG_TYPE_SEQUENCE_RESET)) return null;

if(resendRequestConfig.getGapFill()) return null;

Expand All @@ -1612,11 +1608,13 @@ private Map<String, String> missOutgoingMessages(ByteBuf message, Map<String, St
int countToMiss = strategy.getMissOutgoingMessagesConfiguration().getCount();
var strategyState = strategy.getState();
onOutgoingUpdateTag(message, metadata);
if(!strategyState.addMissedMessageToCacheIfCondition(msgSeqNum.get(), message.copy(), x -> x <= countToMiss)) {
return null;
if(strategyState.addMissedMessageToCacheIfCondition(msgSeqNum.get(), message.copy(), x -> x <= countToMiss)) {
message.clear();
}
if(strategy.getAllowMessagesBeforeRetransmissionFinishes()
&& Duration.between(strategy.getStartTime(), Instant.now()).compareTo(strategy.getConfig().getDuration()) > 0 ) {
strategy.disableAllowMessagesBeforeRetransmissionFinishes("after " + strategy.getConfig().getDuration() + " strategy duration");
}

message.clear();

return null;
}
Expand Down Expand Up @@ -1771,9 +1769,7 @@ private void runLogonFromAnotherConnection(RuleConfiguration configuration) {
ByteBuf logonBuf = Unpooled.wrappedBuffer(logon.toString().getBytes(StandardCharsets.UTF_8));

channel.send(logonBuf, strategy.getState().enrichProperties(props), null, SendMode.DIRECT_MQ)
.thenAcceptAsync(x -> {
strategy.getState().addMessageID(x);
}, executorService);
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);

boolean logonSent = false;
boolean responseReceived = true;
Expand All @@ -1782,7 +1778,7 @@ private void runLogonFromAnotherConnection(RuleConfiguration configuration) {
try(
Socket socket = new Socket(address.getAddress(), address.getPort());
DataOutputStream dOut = new DataOutputStream(socket.getOutputStream());
DataInputStream dIn = new DataInputStream(socket.getInputStream());
DataInputStream dIn = new DataInputStream(socket.getInputStream())
){
socket.setSoTimeout(5000);

Expand Down Expand Up @@ -1816,7 +1812,8 @@ private void runLogonFromAnotherConnection(RuleConfiguration configuration) {
LOGGER.info("Waiting for 5 seconds to check if main session will be disconnected.");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
LOGGER.error("Interrupted", e);
Thread.currentThread().interrupt();
}

HashMap<String, Object> additionalDetails = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 Exactpro (Exactpro Systems Limited)
* Copyright 2023-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,22 +16,19 @@
package com.exactpro.th2.conn.dirty.fix.brokenconn.strategy

import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.BatchSendConfiguration
import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.BlockMessageConfiguration
import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.MissMessageConfiguration
import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.RecoveryConfig
import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.RuleConfiguration
import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.SplitSendConfiguration
import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.TransformMessageConfiguration
import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.StrategyState.Companion.resetAndCopyMissedMessages
import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.api.CleanupHandler
import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.api.MessageProcessor
import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.api.OnCloseHandler
import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.api.RecoveryHandler
import mu.KotlinLogging
import java.time.Instant
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.read
import kotlin.concurrent.withLock
import kotlin.concurrent.write

class StatefulStrategy(
Expand All @@ -47,52 +44,41 @@ class StatefulStrategy(
private val lock = ReentrantReadWriteLock()

// configurations
var config: RuleConfiguration? = null
get() = state.config ?: error("Rule configuration isn't present.")
private set
var missIncomingMessagesConfig: MissMessageConfiguration? = null
get() = state.config?.missIncomingMessagesConfiguration ?: error("Miss incoming messages config isn't present.")
private set
var missOutgoingMessagesConfiguration: MissMessageConfiguration? = null
get() = state.config?.missOutgoingMessagesConfiguration ?: error("Miss outgoing messages config isn't present.")
private set
var transformMessageConfiguration: TransformMessageConfiguration? = null
get() = state.config?.transformMessageConfiguration ?: error("Transform message config isn't present.")
private set
var batchSendConfiguration: BatchSendConfiguration? = null
get() = state.config?.batchSendConfiguration ?: error("batch send config isn't present.")
private set
var splitSendConfiguration: SplitSendConfiguration? = null
get() = state.config?.splitSendConfiguration ?: error("split send configuration isn't present.")
private set
val config: RuleConfiguration
get() = lock.read { state.config ?: error("Rule configuration isn't present.") }
val missIncomingMessagesConfig: MissMessageConfiguration
get() = lock.read { state.config?.missIncomingMessagesConfiguration ?: error("Miss incoming messages config isn't present.") }
val missOutgoingMessagesConfiguration: MissMessageConfiguration
get() = lock.read { state.config?.missOutgoingMessagesConfiguration ?: error("Miss outgoing messages config isn't present.") }
val transformMessageConfiguration: TransformMessageConfiguration
get() = lock.read { state.config?.transformMessageConfiguration ?: error("Transform message config isn't present.") }
val batchSendConfiguration: BatchSendConfiguration
get() = lock.read { state.config?.batchSendConfiguration ?: error("batch send config isn't present.") }
val splitSendConfiguration: SplitSendConfiguration
get() = lock.read { state.config?.splitSendConfiguration ?: error("split send configuration isn't present.") }

var allowMessagesBeforeLogon: Boolean = false
get() = state.config?.allowMessagesBeforeLogonReply ?: false
private set
val allowMessagesBeforeLogon: Boolean
get() = lock.read { state.config?.allowMessagesBeforeLogonReply ?: false }

var sendResendRequestOnLogonGap: Boolean = false
get() = state.config?.sendResendRequestOnLogonGap ?: false
private set
val sendResendRequestOnLogonGap: Boolean
get() = lock.read { state.config?.sendResendRequestOnLogonGap ?: false }

var allowMessagesBeforeRetransmissionFinishes: Boolean = false
get() = state.config?.allowMessagesBeforeRetransmissionFinishes ?: false
private set
private var _allowMessagesBeforeRetransmissionFinishes: Boolean = false
val allowMessagesBeforeRetransmissionFinishes: Boolean
get() = lock.read { _allowMessagesBeforeRetransmissionFinishes }

var sendResendRequestOnLogoutReply: Boolean = false
get() = state.config?.sendResendRequestOnLogoutReply ?: false
private set
val sendResendRequestOnLogoutReply: Boolean
get() = lock.read {state.config?.sendResendRequestOnLogoutReply ?: false }

var increaseNextExpectedSequenceNumber: Boolean = false
get() = state.config?.increaseNextExpectedSequenceNumber ?: false
private set
val increaseNextExpectedSequenceNumber: Boolean
get() = lock.read {state.config?.increaseNextExpectedSequenceNumber ?: false }

var decreaseNextExpectedSequenceNumber: Boolean = false
get() = state.config?.decreaseNextExpectedSequenceNumber ?: false
private set
val decreaseNextExpectedSequenceNumber: Boolean
get() = lock.read {state.config?.decreaseNextExpectedSequenceNumber ?: false }

val recoveryConfig: RecoveryConfig
get() = lock.read { state.config?.recoveryConfig ?: RecoveryConfig() }

var recoveryConfig: RecoveryConfig = RecoveryConfig()
get() = state.config?.recoveryConfig ?: RecoveryConfig()
private set
// strategies
fun updateSendStrategy(func: SendStrategy.() -> Unit) = lock.write {
sendStrategy.func()
Expand Down Expand Up @@ -122,6 +108,11 @@ class StatefulStrategy(
receiveStrategy.func()
}

fun disableAllowMessagesBeforeRetransmissionFinishes(reason: String) = lock.write {
_allowMessagesBeforeRetransmissionFinishes = false
LOGGER.info("Disabled allow messages before retransmission finishes by the '$reason' reason")
}

fun <T> getReceiveMessageStrategy(func: ReceiveStrategy.() -> T) = lock.read {
receiveStrategy.func()
}
Expand Down Expand Up @@ -165,6 +156,7 @@ class StatefulStrategy(
fun resetStrategyAndState(config: RuleConfiguration) {
lock.write {
state = state.resetAndCopyMissedMessages(config)
_allowMessagesBeforeRetransmissionFinishes = state.config?.allowMessagesBeforeRetransmissionFinishes ?: false
sendStrategy.sendHandler = defaultStrategy.sendStrategy.sendHandler
sendStrategy.sendPreprocessor = defaultStrategy.sendStrategy.sendPreprocessor
receiveStrategy.receivePreprocessor = defaultStrategy.receiveStrategy.receivePreprocessor
Expand All @@ -181,6 +173,7 @@ class StatefulStrategy(
fun cleanupStrategy() {
lock.write {
state = state.resetAndCopyMissedMessages()
_allowMessagesBeforeRetransmissionFinishes = state.config?.allowMessagesBeforeRetransmissionFinishes ?: false
sendStrategy.sendHandler = defaultStrategy.sendStrategy.sendHandler
sendStrategy.sendPreprocessor = defaultStrategy.sendStrategy.sendPreprocessor
receiveStrategy.receivePreprocessor = defaultStrategy.receiveStrategy.receivePreprocessor
Expand All @@ -193,4 +186,8 @@ class StatefulStrategy(
onCloseHandler = defaultStrategy.closeHandler
}
}

companion object {
private val LOGGER = KotlinLogging.logger {}
}
}
12 changes: 6 additions & 6 deletions src/test/java/com/exactpro/th2/RecoveryTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 Exactpro (Exactpro Systems Limited)
* Copyright 2023-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,7 @@
package com.exactpro.th2;

import com.exactpro.th2.common.grpc.MessageID;
import com.exactpro.th2.conn.dirty.fix.MessageSearcher;
import com.exactpro.th2.conn.dirty.fix.TestMessageSearcher;
import com.exactpro.th2.dataprovider.lw.grpc.DataProviderService;
import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupResponse;
import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupsSearchRequest;
Expand Down Expand Up @@ -54,7 +54,7 @@ void testSequenceResetInRange() {
FixHandlerSettings settings = createHandlerSettings();
settings.setLoadMissedMessagesFromCradle(true);
DataProviderService dataProviderService = Mockito.mock(DataProviderService.class);
MessageSearcher ms = new MessageSearcher(
TestMessageSearcher ms = new TestMessageSearcher(
List.of(
messageSearchResponse(2),
messageSearchResponse(3),
Expand Down Expand Up @@ -87,7 +87,7 @@ void testSequenceResetInsideRange() {
FixHandlerSettings settings = createHandlerSettings();
settings.setLoadMissedMessagesFromCradle(true);
DataProviderService dataProviderService = Mockito.mock(DataProviderService.class);
MessageSearcher ms = new MessageSearcher(
TestMessageSearcher ms = new TestMessageSearcher(
List.of(
messageSearchResponse(4),
messageSearchResponse(5)
Expand Down Expand Up @@ -143,7 +143,7 @@ void testSequenceResetOutOfRange() {
FixHandlerSettings settings = createHandlerSettings();
settings.setLoadMissedMessagesFromCradle(true);
DataProviderService dataProviderService = Mockito.mock(DataProviderService.class);
MessageSearcher ms = new MessageSearcher(
TestMessageSearcher ms = new TestMessageSearcher(
List.of(
messageSearchResponse(1),
messageSearchResponse(2),
Expand Down Expand Up @@ -177,7 +177,7 @@ void testSequenceResetAdminMessages() {
FixHandlerSettings settings = createHandlerSettings();
settings.setLoadMissedMessagesFromCradle(true);
DataProviderService dataProviderService = Mockito.mock(DataProviderService.class);
MessageSearcher ms = new MessageSearcher(
TestMessageSearcher ms = new TestMessageSearcher(
List.of(
messageSearchResponseAdmin(2),
messageSearchResponse(4),
Expand Down
Loading

0 comments on commit 9faa0f3

Please sign in to comment.