Skip to content

Commit

Permalink
Open channel only if session is active
Browse files Browse the repository at this point in the history
  • Loading branch information
denis.plotnikov committed Dec 8, 2024
1 parent 8360cab commit a78d69e
Showing 1 changed file with 26 additions and 19 deletions.
45 changes: 26 additions & 19 deletions src/main/java/com/exactpro/th2/FixHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -358,19 +358,11 @@ public void onStart() {
msgSeqNum.set(sequences.getClientSeq());
serverMsgSeqNum.set(sequences.getServerSeq());
}
if(!channel.isOpen()) channel.open();
if(!channel.isOpen()) openChannel();
}

@NotNull
public CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<String, String> properties, @Nullable EventID eventID) {
try {
disconnectStrategyLock.lock();
strategy.getSendStrategy(SendStrategy::getSendPreprocessor).process(body, properties);
}
finally {
disconnectStrategyLock.unlock();
}

FixField msgType = findField(body, MSG_TYPE_TAG);
boolean isLogout = msgType != null && Objects.equals(msgType.getValue(), MSG_TYPE_LOGOUT);
boolean isLogon = msgType != null && Objects.equals(msgType.getValue(), MSG_TYPE_LOGON);
Expand All @@ -380,7 +372,7 @@ public CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<Str
context.send(CommonUtil.toEvent(String.format("Enabling session reconnects: %b", channel.getSessionAlias())));
sessionActive.set(true);
try {
sendingTimeoutHandler.getWithTimeout(channel.open());
sendingTimeoutHandler.getWithTimeout(openChannel());
} catch (Exception e) {
context.send(CommonUtil.toErrorEvent(String.format("Error while ending session %s by user logout. Is graceful disconnect: %b", channel.getSessionAlias()), e));
}
Expand Down Expand Up @@ -410,7 +402,7 @@ public CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<Str
return CompletableFuture.completedFuture(null);
}

boolean isUngracefulDisconnect = Boolean.getBoolean(properties.get(UNGRACEFUL_DISCONNECT_PROPERTY));
boolean isUngracefulDisconnect = Objects.equals(properties.getOrDefault(UNGRACEFUL_DISCONNECT_PROPERTY, "N"), "Y");

boolean disableReconnect = properties.containsKey(DISABLE_RECONNECT_PROPERTY);

Expand All @@ -425,15 +417,23 @@ public CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<Str
enabled.set(false);
activeLogonExchange.set(false);
if(!disableReconnect) {
sendingTimeoutHandler.getWithTimeout(channel.open());
sendingTimeoutHandler.getWithTimeout(openChannel());
} else {
channel.close().get();
}
} catch (Exception e) {
context.send(CommonUtil.toErrorEvent(String.format("Error while ending session %s by user logout. Is graceful disconnect: %b", channel.getSessionAlias(), !isUngracefulDisconnect), e));
}
return CompletableFuture.completedFuture(null);
}


try {
disconnectStrategyLock.lock();
strategy.getSendStrategy(SendStrategy::getSendPreprocessor).process(body, properties);
}
finally {
disconnectStrategyLock.unlock();
}

// TODO: probably, this should be moved to the core part
// But those changes will break API
Expand All @@ -443,7 +443,7 @@ public CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<Str

if (!channel.isOpen()) {
try {
sendingTimeoutHandler.getWithTimeout(channel.open());
sendingTimeoutHandler.getWithTimeout(openChannel());
} catch (TimeoutException e) {
ExceptionUtils.asRuntimeException(new TimeoutException(
String.format("could not open connection before timeout %d mls elapsed",
Expand Down Expand Up @@ -795,7 +795,7 @@ private Map<String, String> handleLogout(@NotNull ByteBuf message, Map<String, S
context.send(CommonUtil.toEvent("logout for sender - " + settings.getSenderCompID()), null);//make more useful
try {
disconnect(false);
channel.open();
openChannel();
} catch (Exception e) {
LOGGER.error("Error while disconnecting in handle logout.");
}
Expand Down Expand Up @@ -826,7 +826,7 @@ private void reset() {
if(messageLoader != null) {
messageLoader.updateTime();
}
if(!channel.isOpen()) channel.open();
if(!channel.isOpen()) openChannel();
}

public void sendResendRequest(int beginSeqNo, int endSeqNo) {
Expand Down Expand Up @@ -1659,7 +1659,7 @@ private Map<String, String> logoutOnLogon(ByteBuf message, Map<String, String> m
handleLogon(message, metadata);
try {
disconnect(strategy.getGracefulDisconnect());
if(!channel.isOpen()) channel.open().get();
if(!channel.isOpen()) openChannel().get();
} catch (Exception e) {
LOGGER.error("Error while reconnecting.", e);
}
Expand Down Expand Up @@ -2096,7 +2096,7 @@ private void setupTransformStrategy(RuleConfiguration configuration) {
strategy.setCleanupHandler(this::cleanupTransformStrategy);
try {
disconnect(configuration.getGracefulDisconnect());
if(!channel.isOpen()) channel.open().get();
if(!channel.isOpen()) openChannel().get();
} catch (Exception e) {
String message = String.format("Error while setting up %s", strategy.getType());
LOGGER.error(message, e);
Expand Down Expand Up @@ -2670,10 +2670,17 @@ private void disconnect(boolean graceful) throws ExecutionException, Interrupted
}

private void openChannelAndWaitForLogon() throws ExecutionException, InterruptedException {
if(!channel.isOpen()) channel.open().get();
if(!channel.isOpen()) openChannel().get();
waitUntilLoggedIn();
}

private CompletableFuture<Unit> openChannel() {
if(channel != null && sessionActive.get()) {
return channel.open();
}
return CompletableFuture.completedFuture(null);
}

private void waitUntilLoggedIn() {
long start = System.currentTimeMillis();
while (!enabled.get() && System.currentTimeMillis() - start < 2000) {
Expand Down

0 comments on commit a78d69e

Please sign in to comment.