From ed4b4ccfd69d1b7fe513807ed460db1eaf77d7e1 Mon Sep 17 00:00:00 2001 From: Denis Plotnikov Date: Fri, 2 Feb 2024 12:55:07 +0400 Subject: [PATCH] Logon to the same user from another socket strategy --- .../java/com/exactpro/th2/FixHandler.java | 108 +++++++++++++++++- .../configuration/RuleConfiguration.kt | 1 + .../dirty/fix/brokenconn/strategy/RuleType.kt | 1 + 3 files changed, 105 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index 96c26f5..f5ffc2e 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -48,6 +48,7 @@ import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.StrategyScheduler; import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.StrategyState; import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.api.CleanupHandler; +import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.api.OnCloseHandler; import com.exactpro.th2.conn.dirty.tcp.core.SendingTimeoutHandler; import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel; import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel.SendMode; @@ -58,7 +59,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketTimeoutException; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.Instant; @@ -90,6 +96,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.stream.Collectors; +import javax.net.SocketFactory; import kotlin.Unit; import kotlin.jvm.functions.Function1; import org.apache.commons.lang3.StringUtils; @@ -1166,6 +1173,14 @@ public void sendLogon() { return; } + StringBuilder logon = buildLogon(props); + + LOGGER.info("Send logon - {}", logon); + channel.send(Unpooled.wrappedBuffer(logon.toString().getBytes(StandardCharsets.UTF_8)), props, null, SendMode.HANDLE_AND_MANGLE) + .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); + } + + private StringBuilder buildLogon(Map props) { StringBuilder logon = new StringBuilder(); Boolean reset; if (!connStarted.get()) { @@ -1205,9 +1220,8 @@ public void sendLogon() { }); setChecksumAndBodyLength(logon); - LOGGER.info("Send logon - {}", logon); - channel.send(Unpooled.wrappedBuffer(logon.toString().getBytes(StandardCharsets.UTF_8)), props, null, SendMode.HANDLE_AND_MANGLE) - .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); + + return logon; } private void sendLogout(boolean isPossDup) { @@ -1705,6 +1719,84 @@ private void runPossDupSessionMessages(RuleConfiguration configuration) { ruleEndEvent(configuration.getRuleType(), start, strategy.getState().getMessageIDs()); } + private OnCloseHandler getRunLogonFromAnotherConnectionOnCloseHandler(AtomicBoolean sessionDisconnected) { + return () -> sessionDisconnected.set(true); + } + + private void runLogonFromAnotherConnection(RuleConfiguration configuration) { + Instant start = Instant.now(); + strategy.resetStrategyAndState(configuration); + ruleStartEvent(configuration.getRuleType(), strategy.getStartTime()); + if(!enabled.get()) { + ruleErrorEvent(strategy.getType(), String.format("Session %s isn't logged in.", channel.getSessionAlias()), null); + return; + } + AtomicBoolean isMainSessionDisconnected = new AtomicBoolean(false); + strategy.setOnCloseHandler(getRunLogonFromAnotherConnectionOnCloseHandler(isMainSessionDisconnected)); + + Map props = new HashMap<>(); + StringBuilder logon = buildLogon(props); + props.put("sentUsingAnotherSocket", "True"); + ByteBuf logonBuf = Unpooled.wrappedBuffer(logon.toString().getBytes(StandardCharsets.UTF_8)); + + channel.send(logonBuf, props, null, SendMode.DIRECT_MSTORE) + .thenAcceptAsync(x -> { + strategy.getState().addMessageID(x); + }, executorService); + + boolean logonSent = false; + boolean responseReceived = true; + boolean sessionDisconnected = false; + + try( + Socket socket = new Socket(address.getAddress(), address.getPort()); + DataOutputStream dOut = new DataOutputStream(socket.getOutputStream()); + DataInputStream dIn = new DataInputStream(socket.getInputStream()); + ){ + socket.setSoTimeout(5000); + + byte[] logonByteArray = new byte[logonBuf.readableBytes()]; + logonBuf.readBytes(logonByteArray); + dOut.write(logonByteArray); + logonSent = true; + + + try { + byte[] buffer = new byte[1024]; + int read = dIn.read(buffer); + + if (read == -1) { + responseReceived = false; + sessionDisconnected = true; + } else { + responseReceived = true; + LOGGER.warn("Received response while connecting with the same compId and there is live session for this compId. {}", new String(buffer, StandardCharsets.UTF_8)); + } + } catch (SocketTimeoutException e) { + responseReceived = false; + } + + } catch (IOException e) { + LOGGER.error("Error while connecting from another socket to the same user.", e); + responseReceived = false; + } + + try { + LOGGER.info("Waiting for 5 seconds to check if main session will be disconnected."); + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + HashMap additionalDetails = new HashMap<>(); + additionalDetails.put("logonFromAnotherSocketSent", logonSent); + additionalDetails.put("responseForLogonInAnotherSessionReceived", responseReceived); + additionalDetails.put("anotherSocketSessionDisconnectedAfterLogon", sessionDisconnected); + additionalDetails.put("isMainSessionDisconnected", isMainSessionDisconnected.get()); + additionalDetails.put("type", "logon_from_another_socket"); + ruleEndEvent(configuration.getRuleType(), start, Collections.emptyList(), additionalDetails); + } + private void setupDisconnectStrategy(RuleConfiguration configuration) { strategy.resetStrategyAndState(configuration); strategy.updateSendStrategy(x -> {x.setSendPreprocessor(this::blockSend); return Unit.INSTANCE; }); @@ -2125,6 +2217,7 @@ private Consumer getSetupFunction(RuleConfiguration config) { case INVALID_CHECKSUM: return this::setupTransformMessageStrategy; case LOGON_AFTER_LOGON: return this::runLogonAfterLogonStrategy; case POSS_DUP_SESSION_MESSAGES: return this::runPossDupSessionMessages; + case LOGON_FROM_ANOTHER_CONNECTION: return this::runLogonFromAnotherConnection; case DEFAULT: return configuration -> strategy.cleanupStrategy(); default: throw new IllegalStateException(String.format("Unknown strategy type %s.", config.getRuleType())); } @@ -2418,14 +2511,15 @@ private void ruleStartEvent(RuleType type, Instant start) { ); } - private void ruleEndEvent(RuleType type, Instant start, List messageIDS) { + private void ruleEndEvent(RuleType type, Instant start, List messageIDS, Map additionalDetails) { Instant end = Instant.now(); String message = String.format("%s strategy finished: %s - %s", type.name(), start.toString(), end.toString()); LOGGER.info(message); try { Message jsonBody = createMessageBean(mapper.writeValueAsString(Map.of( "StartTime", start.toString(), "EndTime", end.toString(), - "Type", type.toString(), "AffectedMessages", messageIDS.stream().map(UtilKt::logId).collect(Collectors.toList()) + "Type", type.toString(), "AffectedMessages", messageIDS.stream().map(UtilKt::logId).collect(Collectors.toList()), + "AdditionalDetails", additionalDetails ))); Event event = Event .start() @@ -2443,6 +2537,10 @@ private void ruleEndEvent(RuleType type, Instant start, List messageI } } + private void ruleEndEvent(RuleType type, Instant start, List messageIDS) { + ruleEndEvent(type, start, messageIDS, Collections.emptyMap()); + } + private void ruleErrorEvent(RuleType type, String message, Throwable error) { String errorLog = String.format("Rule %s error event: message - %s, error - %s", type, message, error); LOGGER.error(errorLog, error); diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/RuleConfiguration.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/RuleConfiguration.kt index 1e188bf..ca8b0d4 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/RuleConfiguration.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/configuration/RuleConfiguration.kt @@ -84,6 +84,7 @@ data class RuleConfiguration( RuleType.FAKE_RETRANSMISSION -> {} RuleType.LOGON_AFTER_LOGON -> {} RuleType.POSS_DUP_SESSION_MESSAGES -> {} + RuleType.LOGON_FROM_ANOTHER_CONNECTION -> {} } } diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/RuleType.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/RuleType.kt index 5f18782..5b33e87 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/RuleType.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/RuleType.kt @@ -34,5 +34,6 @@ enum class RuleType { FAKE_RETRANSMISSION, INVALID_CHECKSUM, POSS_DUP_SESSION_MESSAGES, + LOGON_FROM_ANOTHER_CONNECTION, DEFAULT } \ No newline at end of file