Skip to content

Commit

Permalink
[TH2-5146] Reconnect if logon ack is not received in user defined tim…
Browse files Browse the repository at this point in the history
…eout
  • Loading branch information
Denis Plotnikov committed Dec 20, 2023
1 parent 65e2032 commit cbc3275
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 8 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# th2-conn-dirty-fix (1.5.1)
# th2-conn-dirty-fix (1.6.0)

This microservice allows sending and receiving messages via FIX protocol

Expand Down Expand Up @@ -56,6 +56,7 @@ This microservice allows sending and receiving messages via FIX protocol
The timeout is reset to the original value after a successful sending attempt.
If connection is not established within the specified timeout an error will be reported.
+ *minConnectionTimeoutOnSend* - minimum value for the sending message timeout in milliseconds. _Default value is 1000 mls._
+ *logonAckTimeout* - timeout before reconnect will be issued if no logon ack is received.

### Security settings

Expand Down Expand Up @@ -335,6 +336,9 @@ spec:

# Changelog

## 1.6.0
* Session will be reconnected if logon ack isn't received during user defined timeout.

## 1.5.1

* Property `th2.operation_timestamp` is added to metadata to each message
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
release_version=1.5.1
release_version=1.6.0
24 changes: 18 additions & 6 deletions src/main/java/com/exactpro/th2/FixHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -169,7 +168,7 @@ public class FixHandler implements AutoCloseable, IHandler {
private final AtomicReference<Future<?>> testRequestTimer = new AtomicReference<>(CompletableFuture.completedFuture(null));

private final SendingTimeoutHandler sendingTimeoutHandler;
private Future<?> reconnectRequestTimer = CompletableFuture.completedFuture(null);
private AtomicReference<Future<?>> reconnectRequestTimer = new AtomicReference<>(CompletableFuture.completedFuture(null));
private volatile IChannel channel;
protected FixHandlerSettings settings;

Expand Down Expand Up @@ -430,7 +429,7 @@ public Map<String, String> onIncoming(@NotNull IChannel channel, @NotNull ByteBu
if(settings.isLogoutOnIncorrectServerSequence()) {
context.send(CommonUtil.toEvent(String.format("Received server sequence %d but expected %d. Sending logout with text: MsgSeqNum is too low...", receivedMsgSeqNum, serverMsgSeqNum.get())));
sendLogout(String.format("MsgSeqNum too low, expecting %d but received %d", serverMsgSeqNum.get() + 1, receivedMsgSeqNum));
reconnectRequestTimer = executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS);
reconnectRequestTimer.set(executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS));
if (LOGGER.isErrorEnabled()) error("Invalid message. SeqNum is less than expected %d: %s", null, serverMsgSeqNum.get(), message.toString(US_ASCII));
} else {
context.send(CommonUtil.toEvent(String.format("Received server sequence %d but expected %d. Correcting server sequence.", receivedMsgSeqNum, serverMsgSeqNum.get() + 1)));
Expand All @@ -456,6 +455,7 @@ public Map<String, String> onIncoming(@NotNull IChannel channel, @NotNull ByteBu
if(LOGGER.isInfoEnabled()) info("Logon received - %s", message.toString(US_ASCII));
boolean connectionSuccessful = checkLogon(message);
if (connectionSuccessful) {
cancelFuture(reconnectRequestTimer);
if(settings.useNextExpectedSeqNum()) {
FixField nextExpectedSeqField = findField(message, NEXT_EXPECTED_SEQ_NUMBER_TAG);
if(nextExpectedSeqField == null) {
Expand Down Expand Up @@ -491,7 +491,7 @@ public Map<String, String> onIncoming(@NotNull IChannel channel, @NotNull ByteBu
resetTestRequestTask();
} else {
enabled.set(false);
reconnectRequestTimer = executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS);
reconnectRequestTimer.set(executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS));
}
break;
//extract logout reason
Expand Down Expand Up @@ -729,7 +729,7 @@ private void checkHeartbeat(ByteBuf message) {

if (receivedTestReqID != null) {
if (Objects.equals(receivedTestReqID.getValue(), Integer.toString(testReqID.get()))) {
reconnectRequestTimer.cancel(false);
cancelFuture(reconnectRequestTimer);
}
}
}
Expand Down Expand Up @@ -901,7 +901,7 @@ public void sendTestRequest() { //do private
} else {
sendLogon();
}
reconnectRequestTimer = executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS);
reconnectRequestTimer.set(executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS));
}

public void sendLogon() {
Expand Down Expand Up @@ -943,6 +943,7 @@ public void sendLogon() {
setChecksumAndBodyLength(logon);
info("Send logon - %s", logon);
channel.send(Unpooled.wrappedBuffer(logon.toString().getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, IChannel.SendMode.MANGLE);
reconnectRequestTimer.set(executorService.schedule(this::onMissingLogonAck, settings.getLogonAckTimeout(), TimeUnit.MILLISECONDS));
}

private void sendLogout() {
Expand Down Expand Up @@ -989,6 +990,7 @@ public void onClose(@NotNull IChannel channel) {
enabled.set(false);
cancelFuture(heartbeatTimer);
cancelFuture(testRequestTimer);
cancelFuture(reconnectRequestTimer);
}

@Override
Expand Down Expand Up @@ -1019,6 +1021,16 @@ private void disconnect(Boolean graceful) throws ExecutionException, Interrupted
channel.close().get();
}

private void onMissingLogonAck() {
info("Logon was not acknowledged. Reconnecting.");
try {
disconnect(false);
channel.open();
} catch (Exception e) {
error("Error while disconnecting on missing logon ack", e);
}
}

private void setHeader(StringBuilder stringBuilder, String msgType, Integer seqNum, String time) {
stringBuilder.append(BEGIN_STRING_TAG).append("=").append(settings.getBeginString());
stringBuilder.append(MSG_TYPE).append(msgType);
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/com/exactpro/th2/FixHandlerSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ public class FixHandlerSettings implements IHandlerSettings {
private int reconnectDelay = 5;
private int disconnectRequestDelay = 5;

/**
* Timeout in milliseconds during which the connection should receive logon acknowledgement message.
* Otherwise reconnect will be done.
*/
private long logonAckTimeout = 5_000;

/**
* Timeout in milliseconds during which the connection should be opened and session is logged in.
* Otherwise, the send operation will be interrupted
Expand Down Expand Up @@ -327,4 +333,12 @@ public long getMinConnectionTimeoutOnSend() {
public void setMinConnectionTimeoutOnSend(long minConnectionTimeoutOnSend) {
this.minConnectionTimeoutOnSend = minConnectionTimeoutOnSend;
}

public long getLogonAckTimeout() {
return logonAckTimeout;
}

public void setLogonAckTimeout(long logonTimeout) {
this.logonAckTimeout = logonTimeout;
}
}
37 changes: 37 additions & 0 deletions src/test/java/com/exactpro/th2/FixHandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,41 @@ void onConnectionTest() {
new String(channel.getQueue().get(0).array()));
}

@Test
void reconnectOnMissingLogonAck() throws InterruptedException {
FixHandlerSettings settings = createHandlerSettings();
Channel channel = new Channel(settings, null);
FixHandler fixHandler = channel.getFixHandler();
fixHandler.onOpen(channel);
assertEquals(channel.getQueue().size(), 1);
assertEquals("8=FIXT.1.1\u00019=105\u000135=A\u000134=1\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u000198=0\u0001108=30\u00011137=9\u0001553=username\u0001554=pass\u000110=203\u0001",
new String(channel.getQueue().get(0).array()));
channel.getQueue().clear();
Thread.sleep(settings.getLogonAckTimeout() + 2000);
assertEquals(channel.getQueue().size(), 1);
assertEquals("8=FIXT.1.1\u00019=105\u000135=A\u000134=1\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u000198=0\u0001108=30\u00011137=9\u0001553=username\u0001554=pass\u000110=203\u0001",
new String(channel.getQueue().get(0).array()));
channel.close();
}

@Test
void noReconnectIfAckReceived() throws InterruptedException {
FixHandlerSettings settings = createHandlerSettings();
settings.setHeartBtInt(1000);
settings.setTestRequestDelay(1000);
Channel channel = new Channel(settings, null);
FixHandler fixHandler = channel.getFixHandler();
fixHandler.onOpen(channel);
fixHandler.onIncoming(channel, logonResponse);
assertEquals(channel.getQueue().size(), 1);
assertEquals("8=FIXT.1.1\u00019=107\u000135=A\u000134=1\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u000198=0\u0001108=1000\u00011137=9\u0001553=username\u0001554=pass\u000110=043\u0001",
new String(channel.getQueue().get(0).array()));
channel.getQueue().clear();
Thread.sleep(settings.getLogonAckTimeout() * 2);
assertEquals(channel.getQueue().size(), 0);
channel.close();
}

@Test
void logoutDisconnectTest() {
channel.clearQueue();
Expand Down Expand Up @@ -427,6 +462,7 @@ class Channel implements IChannel {

@Override
public CompletableFuture<Unit> open() {
fixHandler.onOpen(this);
return CompletableFuture.completedFuture(Unit.INSTANCE);
}

Expand All @@ -444,6 +480,7 @@ public boolean isOpen() {

@Override
public CompletableFuture<Unit> close() {
fixHandler.onClose(this);
return CompletableFuture.completedFuture(Unit.INSTANCE);
}

Expand Down

0 comments on commit cbc3275

Please sign in to comment.