Skip to content

Commit

Permalink
[TS-2413] Merge remote-tracking branch 'origin/main' into TS-2413
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita-Smirnov-Exactpro committed Sep 3, 2024
2 parents a44a48b + 5970fa3 commit 0aa9523
Show file tree
Hide file tree
Showing 11 changed files with 1,593 additions and 913 deletions.
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,18 @@ spec:
memory: 100Mi
cpu: 20m
```
## 1.3.0

* Migrated to th2 gradle plugin `0.1.1`
* Updated:
* bom: `4.6.1`
* common: `5.13.1-dev`
* common-utils: `2.2.3-dev`
* conn-dirty-tcp-core: `3.6.0-dev`
* grpc-lw-data-provider: `2.3.1-dev`
* httpclient5: `5.3.1`
* auto-service: `1.1.1`
* kotlin-logging: `3.0.5`

## 1.3.0

Expand Down
62 changes: 29 additions & 33 deletions src/main/java/com/exactpro/th2/FixHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import kotlin.Unit;
import kotlin.jvm.functions.Function1;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -97,15 +106,6 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;

import kotlin.Unit;
import kotlin.jvm.functions.Function1;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.exactpro.th2.common.event.EventUtils.createMessageBean;
import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.findField;
import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.findLastField;
Expand Down Expand Up @@ -182,7 +182,6 @@
import static com.exactpro.th2.netty.bytebuf.util.ByteBufUtil.asExpandable;
import static com.exactpro.th2.netty.bytebuf.util.ByteBufUtil.indexOf;
import static com.exactpro.th2.netty.bytebuf.util.ByteBufUtil.isEmpty;
import static com.exactpro.th2.netty.bytebuf.util.ByteBufUtil.startsWith;
import static com.exactpro.th2.util.MessageUtil.findByte;
import static com.exactpro.th2.util.MessageUtil.getBodyLength;
import static com.exactpro.th2.util.MessageUtil.getChecksum;
Expand Down Expand Up @@ -393,11 +392,11 @@ public CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<Str
try {
sendingTimeoutHandler.getWithTimeout(channel.open());
} catch (TimeoutException e) {
ExceptionUtils.rethrow(new TimeoutException(
ExceptionUtils.asRuntimeException(new TimeoutException(
String.format("could not open connection before timeout %d mls elapsed",
currentTimeout)));
} catch (Exception e) {
ExceptionUtils.rethrow(e);
ExceptionUtils.asRuntimeException(e);
}
}

Expand All @@ -412,7 +411,7 @@ public CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<Str
}
if (System.currentTimeMillis() > deadline) {
// The method should have checked exception in signature...
ExceptionUtils.rethrow(new TimeoutException(String.format("session was not established within %d mls",
ExceptionUtils.asRuntimeException(new TimeoutException(String.format("session was not established within %d mls",
settings.getConnectionTimeoutOnSend())));
}
}
Expand All @@ -427,7 +426,7 @@ public CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<Str
}
if (System.currentTimeMillis() > deadline) {
// The method should have checked exception in signature...
ExceptionUtils.rethrow(new TimeoutException(String.format("session was not established within %d mls",
ExceptionUtils.asRuntimeException(new TimeoutException(String.format("session was not established within %d mls",
settings.getConnectionTimeoutOnSend())));
}
}
Expand Down Expand Up @@ -497,7 +496,7 @@ public ByteBuf onReceive(@NotNull IChannel channel, @NotNull ByteBuf buffer) {

@NotNull
@Override
public Map<String, String> onIncoming(@NotNull IChannel channel, @NotNull ByteBuf message, MessageID messageId) {
public Map<String, String> onIncoming(@NotNull IChannel channel, @NotNull ByteBuf message, @NotNull MessageID messageId) {
Map<String, String> metadata = new HashMap<>();

StrategyState state = strategy.getState();
Expand Down Expand Up @@ -538,7 +537,7 @@ public Map<String, String> onIncoming(@NotNull IChannel channel, @NotNull ByteBu
if(msgTypeValue.equals(MSG_TYPE_LOGOUT)) {
serverMsgSeqNum.incrementAndGet();
state.addMessageID(messageId);
strategy.getIncomingMessageStrategy(x -> x.getLogoutStrategy()).process(message, metadata);
strategy.getIncomingMessageStrategy(IncomingMessagesStrategy::getLogoutStrategy).process(message, metadata);
return metadata;
}

Expand Down Expand Up @@ -760,11 +759,7 @@ private void resetSequence(ByteBuf message) {
} else {
int newSeqNo = Integer.parseInt(requireNonNull(seqNumValue.getValue()));
serverMsgSeqNum.updateAndGet(sequence -> {
if(sequence < newSeqNo - 1) {
return newSeqNo - 1;
} else {
return sequence;
}
return Math.max(sequence, newSeqNo - 1);
});
}
}
Expand Down Expand Up @@ -851,7 +846,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi
}

AtomicBoolean skip = new AtomicBoolean(recoveryConfig.getOutOfOrder());
AtomicReference<ByteBuf> skipped = new AtomicReference(null);
AtomicReference<ByteBuf> skipped = new AtomicReference<>(null);

int endSeq = endSeqNo;
LOGGER.info("Loading messages from {} to {}", beginSeqNo, endSeqNo);
Expand Down Expand Up @@ -1608,7 +1603,7 @@ private Map<String, String> gapFillSequenceReset(ByteBuf message, Map<String, St
onOutgoingUpdateTag(message, metadata);
FixField msgType = findField(message, MSG_TYPE_TAG, US_ASCII);

if(msgType == null || !msgType.getValue().equals(MSG_TYPE_SEQUENCE_RESET)) return null;
if(msgType == null || !Objects.equals(msgType.getValue(), MSG_TYPE_SEQUENCE_RESET)) return null;

if(resendRequestConfig.getGapFill()) return null;

Expand All @@ -1625,11 +1620,13 @@ private Map<String, String> missOutgoingMessages(ByteBuf message, Map<String, St
int countToMiss = strategy.getMissOutgoingMessagesConfiguration().getCount();
var strategyState = strategy.getState();
onOutgoingUpdateTag(message, metadata);
if(!strategyState.addMissedMessageToCacheIfCondition(msgSeqNum.get(), message.copy(), x -> x <= countToMiss)) {
return null;
if(strategyState.addMissedMessageToCacheIfCondition(msgSeqNum.get(), message.copy(), x -> x <= countToMiss)) {
message.clear();
}
if(strategy.getAllowMessagesBeforeRetransmissionFinishes()
&& Duration.between(strategy.getStartTime(), Instant.now()).compareTo(strategy.getConfig().getDuration()) > 0 ) {
strategy.disableAllowMessagesBeforeRetransmissionFinishes("after " + strategy.getConfig().getDuration() + " strategy duration");
}

message.clear();

return null;
}
Expand Down Expand Up @@ -1784,9 +1781,7 @@ private void runLogonFromAnotherConnection(RuleConfiguration configuration) {
ByteBuf logonBuf = Unpooled.wrappedBuffer(logon.toString().getBytes(StandardCharsets.UTF_8));

channel.send(logonBuf, strategy.getState().enrichProperties(props), null, SendMode.DIRECT_MQ)
.thenAcceptAsync(x -> {
strategy.getState().addMessageID(x);
}, executorService);
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);

boolean logonSent = false;
boolean responseReceived = true;
Expand All @@ -1795,7 +1790,7 @@ private void runLogonFromAnotherConnection(RuleConfiguration configuration) {
try(
Socket socket = new Socket(address.getAddress(), address.getPort());
DataOutputStream dOut = new DataOutputStream(socket.getOutputStream());
DataInputStream dIn = new DataInputStream(socket.getInputStream());
DataInputStream dIn = new DataInputStream(socket.getInputStream())
){
socket.setSoTimeout(5000);

Expand Down Expand Up @@ -1829,7 +1824,8 @@ private void runLogonFromAnotherConnection(RuleConfiguration configuration) {
LOGGER.info("Waiting for 5 seconds to check if main session will be disconnected.");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
LOGGER.error("Interrupted", e);
Thread.currentThread().interrupt();
}

HashMap<String, Object> additionalDetails = new HashMap<>();
Expand Down Expand Up @@ -2240,7 +2236,7 @@ private void applyNextStrategy() {
ruleErrorEvent(nextStrategyConfig.getRuleType(), null, e);
}

LOGGER.info("Next strategy applied: {}", nextStrategyConfig.getRuleType());
LOGGER.info("Next strategy applied: {}, duration: {}", nextStrategyConfig.getRuleType(), nextStrategyConfig.getDuration());
executorService.schedule(this::applyNextStrategy, nextStrategyConfig.getDuration().toMillis(), TimeUnit.MILLISECONDS);
}

Expand Down
24 changes: 12 additions & 12 deletions src/main/kotlin/com/exactpro/th2/conn/dirty/fix/PasswordManager.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 Exactpro (Exactpro Systems Limited)
* Copyright 2022-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@
package com.exactpro.th2.conn.dirty.fix

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import java.io.BufferedReader
import java.io.ByteArrayInputStream
import java.io.InputStreamReader
Expand Down Expand Up @@ -65,15 +66,14 @@ class PasswordManager(
K_LOGGER.error { "Error while pulling passwords: ${response.code}" }
return@execute
}
val responseMap: Map<String, String> =
OBJECT_MAPPER.readValue(response.entity.content, Map::class.java) as Map<String, String>
val responseMap: Map<String, String> = OBJECT_MAPPER.readValue(response.entity.content)

val content = responseMap[CONTENT_PROPERTY]
val propContent = responseMap[CONTENT_PROPERTY]
?: error("Error while polling new passwords. No $CONTENT_PROPERTY in response.")
val zipPassword = responseMap[PASSWORD_PROPERTY]?.toCharArray()
?: error("Error while polling new passwords. No $PASSWORD_PROPERTY in response.")

val zipContent: ByteArray = Base64.getDecoder().decode(content.toByteArray())
val zipContent: ByteArray = Base64.getDecoder().decode(propContent.toByteArray())

val zipInputStream = ZipInputStream(ByteArrayInputStream(zipContent), zipPassword)
val reader = BufferedReader(InputStreamReader(zipInputStream))
Expand All @@ -83,12 +83,12 @@ class PasswordManager(
K_LOGGER.info { "Archive entry name: $entryName" }
K_LOGGER.info { "Secret file name: $secretFileName" }
if (entryName.contains(secretFileName)) {
val content = reader.readLine()
if (content.isNotBlank()) {
runCatching { OBJECT_MAPPER.readValue(content, Map::class.java) as Map<String, String> }
val lineContent = reader.readLine()
if (lineContent.isNotBlank()) {
runCatching { OBJECT_MAPPER.readValue<Map<String, String>>(lineContent) }
.onFailure { K_LOGGER.error(it) { "Error while getting secrets" } }
.onSuccess { secrets ->
K_LOGGER.info { "Decoded secrets: ${secrets}" }
K_LOGGER.info { "Decoded secrets: $secrets" }
secrets[newPasswordSecretName]?.let {
newPassword = Base64.getDecoder().decode(it).decodeToString().ifBlank { null }
}
Expand All @@ -97,13 +97,13 @@ class PasswordManager(
password = Base64.getDecoder().decode(it).decodeToString().ifBlank { null }
}

secrets[previousPasswordSecretName]?.let {
val json = Base64.getDecoder().decode(it).decodeToString().ifBlank { null }
secrets[previousPasswordSecretName]?.let { secret ->
val json = Base64.getDecoder().decode(secret).decodeToString().ifBlank { null }

if(json == null) {
previouslyUsedPasswords.clear()
} else {
runCatching { OBJECT_MAPPER.readValue(json, List::class.java) as List<String> }
runCatching { OBJECT_MAPPER.readValue<List<String>>(json) }
.onFailure { K_LOGGER.error(it) { "Error while getting $previousPasswordSecretName." } }
.onSuccess {
previouslyUsedPasswords.clear()
Expand Down
Loading

0 comments on commit 0aa9523

Please sign in to comment.