Skip to content

Commit

Permalink
[TS-2459] Long recovery in case of mixing recovery message with non-r…
Browse files Browse the repository at this point in the history
…ecovery messages (#6)

* Updated libraries
  • Loading branch information
Nikita-Smirnov-Exactpro authored Aug 20, 2024
1 parent 8c03151 commit 5970fa3
Show file tree
Hide file tree
Showing 15 changed files with 1,610 additions and 1,013 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM gradle:7.6-jdk11 AS build
FROM gradle:8.7-jdk11 AS build
ARG release_version
COPY ./ .
RUN gradle --no-daemon clean build dockerPrepare -Prelease_version=${release_version}
Expand Down
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# th2-conn-dirty-fix (1.2.1)
# th2-conn-dirty-fix (1.3.0)

This microservice allows sending and receiving messages via FIX protocol

Expand Down Expand Up @@ -333,6 +333,18 @@ spec:
memory: 100Mi
cpu: 20m
```
## 1.3.0

* Migrated to th2 gradle plugin `0.1.1`
* Updated:
* bom: `4.6.1`
* common: `5.13.1-dev`
* common-utils: `2.2.3-dev`
* conn-dirty-tcp-core: `3.6.0-dev`
* grpc-lw-data-provider: `2.3.1-dev`
* httpclient5: `5.3.1`
* auto-service: `1.1.1`
* kotlin-logging: `3.0.5`

## 1.2.1

Expand Down
118 changes: 19 additions & 99 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,27 +1,16 @@
import org.jetbrains.kotlin.gradle.dsl.JvmTarget
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
import com.github.jk1.license.filter.LicenseBundleNormalizer
import com.github.jk1.license.render.JsonReportRenderer

plugins {
id 'java'
id "application"
id "com.exactpro.th2.gradle.component" version "0.1.1"
id 'org.jetbrains.kotlin.jvm' version '1.8.22'
id 'com.palantir.docker' version '0.25.0'
id "org.owasp.dependencycheck" version "8.3.1"
id "com.gorylenko.gradle-git-properties" version "2.4.1"
id 'com.github.jk1.dependency-license-report' version '2.5'
id "de.undercouch.download" version "5.4.0"
id "org.jetbrains.kotlin.kapt" version "1.8.22"
}

apply plugin: 'application'
apply plugin: 'com.palantir.docker'
apply plugin: 'kotlin-kapt'

group 'com.exactpro.th2'
version release_version

sourceCompatibility = 11
targetCompatibility = 11
kotlin {
jvmToolchain(11)
}

repositories {
mavenCentral()
Expand All @@ -45,41 +34,34 @@ repositories {
}

dependencies {
api platform('com.exactpro.th2:bom:4.5.0')

implementation("com.exactpro.th2:common:5.4.0-dev") {
implementation("com.exactpro.th2:common:5.13.1-dev") {
exclude group: 'com.exactpro.th2', module: 'task-utils'
}
implementation group: 'com.exactpro.th2', name: 'common-utils', version: '2.2.1-dev'

implementation "com.exactpro.th2:common-utils:2.2.3-dev"
implementation 'com.exactpro.th2:netty-bytebuf-utils:0.0.1'
implementation 'net.lingala.zip4j:zip4j:2.11.5'
implementation 'org.apache.httpcomponents.client5:httpclient5:5.2.1'

implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.2'

implementation 'com.exactpro.th2:conn-dirty-tcp-core:3.4.0-dev'
implementation 'com.exactpro.th2:grpc-lw-data-provider:2.2.0-dev'
implementation 'org.apache.httpcomponents.client5:httpclient5:5.3.1'
implementation'com.exactpro.th2:conn-dirty-tcp-core:3.6.0-dev'
implementation 'com.exactpro.th2:grpc-lw-data-provider:2.3.1-dev'

implementation 'org.slf4j:slf4j-api'
implementation 'io.github.microutils:kotlin-logging:3.0.0' // The last version bases on kotlin 1.6.0
implementation 'io.github.microutils:kotlin-logging:3.0.5'
implementation 'org.apache.commons:commons-lang3'

implementation 'io.netty:netty-all'
implementation 'com.google.auto.service:auto-service:1.0.1'
implementation 'com.google.auto.service:auto-service:1.1.1'

implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.module:jackson-module-kotlin'

testImplementation 'org.mockito:mockito-core:5.4.0'
testImplementation 'org.mockito:mockito-all:1.10.19'
testImplementation 'org.jetbrains.kotlin:kotlin-test-junit5:1.8.10'
testImplementation 'org.junit.jupiter:junit-jupiter-params:5.9.3'
testImplementation 'org.mockito.kotlin:mockito-kotlin:4.1.0'
testImplementation 'org.mockito:mockito-core:5.12.0'
testImplementation 'org.jetbrains.kotlin:kotlin-test-junit5:1.8.22'
testImplementation 'org.junit.jupiter:junit-jupiter-params:5.10.3'
testImplementation 'org.mockito.kotlin:mockito-kotlin:5.3.1'

annotationProcessor 'com.google.auto.service:auto-service:1.0.1'
kapt 'com.google.auto.service:auto-service:1.0.1'
annotationProcessor 'com.google.auto.service:auto-service:1.1.1'
kapt 'com.google.auto.service:auto-service:1.1.1'
}

test {
Expand All @@ -90,70 +72,8 @@ application {
mainClass.set('com.exactpro.th2.conn.dirty.tcp.core.Main')
}

applicationName = 'service'

distTar {
archiveFileName.set("${applicationName}.tar")
}

dockerPrepare {
dependsOn distTar
}

docker {
copySpec.from(tarTree("$buildDir/distributions/${applicationName}.tar"))
}

tasks.withType(KotlinCompile).configureEach {
compilerOptions {
jvmTarget.set(JvmTarget.JVM_11)
}
}

test {
testLogging {
events "passed", "skipped", "failed"
// You can adjust the logging level as needed
exceptionFormat "full"
showStandardStreams = true
}
}

dependencyCheck {
formats=['SARIF', 'JSON', 'HTML']
failBuildOnCVSS=5
suppressionFile="suppressions.xml"

//FIXME: we should check all used dependencies
skipConfigurations = ['kapt', 'kaptClasspath_kaptKotlin', 'kaptTest', 'kaptTestFixtures', 'annotationProcessor']
analyzers {
assemblyEnabled = false
nugetconfEnabled = false
nodeEnabled = false
}
}

dependencyLocking {
lockAllConfigurations()
}

licenseReport {
def licenseNormalizerBundlePath = "$buildDir/license-normalizer-bundle.json"

if (!file(licenseNormalizerBundlePath).exists()) {
download.run {
src 'https://raw.githubusercontent.com/th2-net/.github/main/license-compliance/gradle-license-report/license-normalizer-bundle.json'
dest "$buildDir/license-normalizer-bundle.json"
overwrite false
}
}

filters = [
new LicenseBundleNormalizer(licenseNormalizerBundlePath, false)
]
renderers = [
new JsonReportRenderer('licenses.json', false),
]
excludeOwnGroup = false
allowedLicensesFile = new URL("https://raw.githubusercontent.com/th2-net/.github/main/license-compliance/gradle-license-report/allowed-licenses.json")
}
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
release_version=1.2.1
release_version=1.3.0
4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.6-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
zipStorePath=wrapper/dists
62 changes: 29 additions & 33 deletions src/main/java/com/exactpro/th2/FixHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import kotlin.Unit;
import kotlin.jvm.functions.Function1;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -96,15 +105,6 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;

import kotlin.Unit;
import kotlin.jvm.functions.Function1;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.exactpro.th2.common.event.EventUtils.createMessageBean;
import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.findField;
import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.findLastField;
Expand Down Expand Up @@ -181,7 +181,6 @@
import static com.exactpro.th2.netty.bytebuf.util.ByteBufUtil.asExpandable;
import static com.exactpro.th2.netty.bytebuf.util.ByteBufUtil.indexOf;
import static com.exactpro.th2.netty.bytebuf.util.ByteBufUtil.isEmpty;
import static com.exactpro.th2.netty.bytebuf.util.ByteBufUtil.startsWith;
import static com.exactpro.th2.util.MessageUtil.findByte;
import static com.exactpro.th2.util.MessageUtil.getBodyLength;
import static com.exactpro.th2.util.MessageUtil.getChecksum;
Expand Down Expand Up @@ -392,11 +391,11 @@ public CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<Str
try {
sendingTimeoutHandler.getWithTimeout(channel.open());
} catch (TimeoutException e) {
ExceptionUtils.rethrow(new TimeoutException(
ExceptionUtils.asRuntimeException(new TimeoutException(
String.format("could not open connection before timeout %d mls elapsed",
currentTimeout)));
} catch (Exception e) {
ExceptionUtils.rethrow(e);
ExceptionUtils.asRuntimeException(e);
}
}

Expand All @@ -411,7 +410,7 @@ public CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<Str
}
if (System.currentTimeMillis() > deadline) {
// The method should have checked exception in signature...
ExceptionUtils.rethrow(new TimeoutException(String.format("session was not established within %d mls",
ExceptionUtils.asRuntimeException(new TimeoutException(String.format("session was not established within %d mls",
settings.getConnectionTimeoutOnSend())));
}
}
Expand All @@ -426,7 +425,7 @@ public CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<Str
}
if (System.currentTimeMillis() > deadline) {
// The method should have checked exception in signature...
ExceptionUtils.rethrow(new TimeoutException(String.format("session was not established within %d mls",
ExceptionUtils.asRuntimeException(new TimeoutException(String.format("session was not established within %d mls",
settings.getConnectionTimeoutOnSend())));
}
}
Expand Down Expand Up @@ -496,7 +495,7 @@ public ByteBuf onReceive(@NotNull IChannel channel, @NotNull ByteBuf buffer) {

@NotNull
@Override
public Map<String, String> onIncoming(@NotNull IChannel channel, @NotNull ByteBuf message, MessageID messageId) {
public Map<String, String> onIncoming(@NotNull IChannel channel, @NotNull ByteBuf message, @NotNull MessageID messageId) {
Map<String, String> metadata = new HashMap<>();

StrategyState state = strategy.getState();
Expand Down Expand Up @@ -537,7 +536,7 @@ public Map<String, String> onIncoming(@NotNull IChannel channel, @NotNull ByteBu
if(msgTypeValue.equals(MSG_TYPE_LOGOUT)) {
serverMsgSeqNum.incrementAndGet();
state.addMessageID(messageId);
strategy.getIncomingMessageStrategy(x -> x.getLogoutStrategy()).process(message, metadata);
strategy.getIncomingMessageStrategy(IncomingMessagesStrategy::getLogoutStrategy).process(message, metadata);
return metadata;
}

Expand Down Expand Up @@ -759,11 +758,7 @@ private void resetSequence(ByteBuf message) {
} else {
int newSeqNo = Integer.parseInt(requireNonNull(seqNumValue.getValue()));
serverMsgSeqNum.updateAndGet(sequence -> {
if(sequence < newSeqNo - 1) {
return newSeqNo - 1;
} else {
return sequence;
}
return Math.max(sequence, newSeqNo - 1);
});
}
}
Expand Down Expand Up @@ -850,7 +845,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi
}

AtomicBoolean skip = new AtomicBoolean(recoveryConfig.getOutOfOrder());
AtomicReference<ByteBuf> skipped = new AtomicReference(null);
AtomicReference<ByteBuf> skipped = new AtomicReference<>(null);

int endSeq = endSeqNo;
LOGGER.info("Loading messages from {} to {}", beginSeqNo, endSeqNo);
Expand Down Expand Up @@ -1596,7 +1591,7 @@ private Map<String, String> gapFillSequenceReset(ByteBuf message, Map<String, St
onOutgoingUpdateTag(message, metadata);
FixField msgType = findField(message, MSG_TYPE_TAG, US_ASCII);

if(msgType == null || !msgType.getValue().equals(MSG_TYPE_SEQUENCE_RESET)) return null;
if(msgType == null || !Objects.equals(msgType.getValue(), MSG_TYPE_SEQUENCE_RESET)) return null;

if(resendRequestConfig.getGapFill()) return null;

Expand All @@ -1613,11 +1608,13 @@ private Map<String, String> missOutgoingMessages(ByteBuf message, Map<String, St
int countToMiss = strategy.getMissOutgoingMessagesConfiguration().getCount();
var strategyState = strategy.getState();
onOutgoingUpdateTag(message, metadata);
if(!strategyState.addMissedMessageToCacheIfCondition(msgSeqNum.get(), message.copy(), x -> x <= countToMiss)) {
return null;
if(strategyState.addMissedMessageToCacheIfCondition(msgSeqNum.get(), message.copy(), x -> x <= countToMiss)) {
message.clear();
}
if(strategy.getAllowMessagesBeforeRetransmissionFinishes()
&& Duration.between(strategy.getStartTime(), Instant.now()).compareTo(strategy.getConfig().getDuration()) > 0 ) {
strategy.disableAllowMessagesBeforeRetransmissionFinishes("after " + strategy.getConfig().getDuration() + " strategy duration");
}

message.clear();

return null;
}
Expand Down Expand Up @@ -1772,9 +1769,7 @@ private void runLogonFromAnotherConnection(RuleConfiguration configuration) {
ByteBuf logonBuf = Unpooled.wrappedBuffer(logon.toString().getBytes(StandardCharsets.UTF_8));

channel.send(logonBuf, strategy.getState().enrichProperties(props), null, SendMode.DIRECT_MQ)
.thenAcceptAsync(x -> {
strategy.getState().addMessageID(x);
}, executorService);
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);

boolean logonSent = false;
boolean responseReceived = true;
Expand All @@ -1783,7 +1778,7 @@ private void runLogonFromAnotherConnection(RuleConfiguration configuration) {
try(
Socket socket = new Socket(address.getAddress(), address.getPort());
DataOutputStream dOut = new DataOutputStream(socket.getOutputStream());
DataInputStream dIn = new DataInputStream(socket.getInputStream());
DataInputStream dIn = new DataInputStream(socket.getInputStream())
){
socket.setSoTimeout(5000);

Expand Down Expand Up @@ -1817,7 +1812,8 @@ private void runLogonFromAnotherConnection(RuleConfiguration configuration) {
LOGGER.info("Waiting for 5 seconds to check if main session will be disconnected.");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
LOGGER.error("Interrupted", e);
Thread.currentThread().interrupt();
}

HashMap<String, Object> additionalDetails = new HashMap<>();
Expand Down Expand Up @@ -2228,7 +2224,7 @@ private void applyNextStrategy() {
ruleErrorEvent(nextStrategyConfig.getRuleType(), null, e);
}

LOGGER.info("Next strategy applied: {}", nextStrategyConfig.getRuleType());
LOGGER.info("Next strategy applied: {}, duration: {}", nextStrategyConfig.getRuleType(), nextStrategyConfig.getDuration());
executorService.schedule(this::applyNextStrategy, nextStrategyConfig.getDuration().toMillis(), TimeUnit.MILLISECONDS);
}

Expand Down
Loading

0 comments on commit 5970fa3

Please sign in to comment.