Skip to content

Commit

Permalink
Logon to the same user from another socket strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
Denis Plotnikov committed Feb 2, 2024
1 parent 6f50b6e commit ed4b4cc
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 5 deletions.
108 changes: 103 additions & 5 deletions src/main/java/com/exactpro/th2/FixHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.StrategyScheduler;
import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.StrategyState;
import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.api.CleanupHandler;
import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.api.OnCloseHandler;
import com.exactpro.th2.conn.dirty.tcp.core.SendingTimeoutHandler;
import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel;
import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel.SendMode;
Expand All @@ -58,7 +59,12 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -90,6 +96,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.net.SocketFactory;
import kotlin.Unit;
import kotlin.jvm.functions.Function1;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -1166,6 +1173,14 @@ public void sendLogon() {
return;
}

StringBuilder logon = buildLogon(props);

LOGGER.info("Send logon - {}", logon);
channel.send(Unpooled.wrappedBuffer(logon.toString().getBytes(StandardCharsets.UTF_8)), props, null, SendMode.HANDLE_AND_MANGLE)
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);
}

private StringBuilder buildLogon(Map<String, String> props) {
StringBuilder logon = new StringBuilder();
Boolean reset;
if (!connStarted.get()) {
Expand Down Expand Up @@ -1205,9 +1220,8 @@ public void sendLogon() {
});

setChecksumAndBodyLength(logon);
LOGGER.info("Send logon - {}", logon);
channel.send(Unpooled.wrappedBuffer(logon.toString().getBytes(StandardCharsets.UTF_8)), props, null, SendMode.HANDLE_AND_MANGLE)
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);

return logon;
}

private void sendLogout(boolean isPossDup) {
Expand Down Expand Up @@ -1705,6 +1719,84 @@ private void runPossDupSessionMessages(RuleConfiguration configuration) {
ruleEndEvent(configuration.getRuleType(), start, strategy.getState().getMessageIDs());
}

private OnCloseHandler getRunLogonFromAnotherConnectionOnCloseHandler(AtomicBoolean sessionDisconnected) {
return () -> sessionDisconnected.set(true);
}

private void runLogonFromAnotherConnection(RuleConfiguration configuration) {
Instant start = Instant.now();
strategy.resetStrategyAndState(configuration);
ruleStartEvent(configuration.getRuleType(), strategy.getStartTime());
if(!enabled.get()) {
ruleErrorEvent(strategy.getType(), String.format("Session %s isn't logged in.", channel.getSessionAlias()), null);
return;
}
AtomicBoolean isMainSessionDisconnected = new AtomicBoolean(false);
strategy.setOnCloseHandler(getRunLogonFromAnotherConnectionOnCloseHandler(isMainSessionDisconnected));

Map<String, String> props = new HashMap<>();
StringBuilder logon = buildLogon(props);
props.put("sentUsingAnotherSocket", "True");
ByteBuf logonBuf = Unpooled.wrappedBuffer(logon.toString().getBytes(StandardCharsets.UTF_8));

channel.send(logonBuf, props, null, SendMode.DIRECT_MSTORE)
.thenAcceptAsync(x -> {
strategy.getState().addMessageID(x);
}, executorService);

boolean logonSent = false;
boolean responseReceived = true;
boolean sessionDisconnected = false;

try(
Socket socket = new Socket(address.getAddress(), address.getPort());
DataOutputStream dOut = new DataOutputStream(socket.getOutputStream());
DataInputStream dIn = new DataInputStream(socket.getInputStream());
){
socket.setSoTimeout(5000);

byte[] logonByteArray = new byte[logonBuf.readableBytes()];
logonBuf.readBytes(logonByteArray);
dOut.write(logonByteArray);
logonSent = true;


try {
byte[] buffer = new byte[1024];
int read = dIn.read(buffer);

if (read == -1) {
responseReceived = false;
sessionDisconnected = true;
} else {
responseReceived = true;
LOGGER.warn("Received response while connecting with the same compId and there is live session for this compId. {}", new String(buffer, StandardCharsets.UTF_8));
}
} catch (SocketTimeoutException e) {
responseReceived = false;
}

} catch (IOException e) {
LOGGER.error("Error while connecting from another socket to the same user.", e);
responseReceived = false;
}

try {
LOGGER.info("Waiting for 5 seconds to check if main session will be disconnected.");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}

HashMap<String, Object> additionalDetails = new HashMap<>();
additionalDetails.put("logonFromAnotherSocketSent", logonSent);
additionalDetails.put("responseForLogonInAnotherSessionReceived", responseReceived);
additionalDetails.put("anotherSocketSessionDisconnectedAfterLogon", sessionDisconnected);
additionalDetails.put("isMainSessionDisconnected", isMainSessionDisconnected.get());
additionalDetails.put("type", "logon_from_another_socket");
ruleEndEvent(configuration.getRuleType(), start, Collections.emptyList(), additionalDetails);
}

private void setupDisconnectStrategy(RuleConfiguration configuration) {
strategy.resetStrategyAndState(configuration);
strategy.updateSendStrategy(x -> {x.setSendPreprocessor(this::blockSend); return Unit.INSTANCE; });
Expand Down Expand Up @@ -2125,6 +2217,7 @@ private Consumer<RuleConfiguration> getSetupFunction(RuleConfiguration config) {
case INVALID_CHECKSUM: return this::setupTransformMessageStrategy;
case LOGON_AFTER_LOGON: return this::runLogonAfterLogonStrategy;
case POSS_DUP_SESSION_MESSAGES: return this::runPossDupSessionMessages;
case LOGON_FROM_ANOTHER_CONNECTION: return this::runLogonFromAnotherConnection;
case DEFAULT: return configuration -> strategy.cleanupStrategy();
default: throw new IllegalStateException(String.format("Unknown strategy type %s.", config.getRuleType()));
}
Expand Down Expand Up @@ -2418,14 +2511,15 @@ private void ruleStartEvent(RuleType type, Instant start) {
);
}

private void ruleEndEvent(RuleType type, Instant start, List<MessageID> messageIDS) {
private void ruleEndEvent(RuleType type, Instant start, List<MessageID> messageIDS, Map<String, Object> additionalDetails) {
Instant end = Instant.now();
String message = String.format("%s strategy finished: %s - %s", type.name(), start.toString(), end.toString());
LOGGER.info(message);
try {
Message jsonBody = createMessageBean(mapper.writeValueAsString(Map.of(
"StartTime", start.toString(), "EndTime", end.toString(),
"Type", type.toString(), "AffectedMessages", messageIDS.stream().map(UtilKt::logId).collect(Collectors.toList())
"Type", type.toString(), "AffectedMessages", messageIDS.stream().map(UtilKt::logId).collect(Collectors.toList()),
"AdditionalDetails", additionalDetails
)));
Event event = Event
.start()
Expand All @@ -2443,6 +2537,10 @@ private void ruleEndEvent(RuleType type, Instant start, List<MessageID> messageI
}
}

private void ruleEndEvent(RuleType type, Instant start, List<MessageID> messageIDS) {
ruleEndEvent(type, start, messageIDS, Collections.emptyMap());
}

private void ruleErrorEvent(RuleType type, String message, Throwable error) {
String errorLog = String.format("Rule %s error event: message - %s, error - %s", type, message, error);
LOGGER.error(errorLog, error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ data class RuleConfiguration(
RuleType.FAKE_RETRANSMISSION -> {}
RuleType.LOGON_AFTER_LOGON -> {}
RuleType.POSS_DUP_SESSION_MESSAGES -> {}
RuleType.LOGON_FROM_ANOTHER_CONNECTION -> {}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,6 @@ enum class RuleType {
FAKE_RETRANSMISSION,
INVALID_CHECKSUM,
POSS_DUP_SESSION_MESSAGES,
LOGON_FROM_ANOTHER_CONNECTION,
DEFAULT
}

0 comments on commit ed4b4cc

Please sign in to comment.