Skip to content

Commit

Permalink
Update recovery workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
Denis Plotnikov committed Nov 29, 2023
1 parent 078df4f commit ac4ff4a
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 5 deletions.
37 changes: 35 additions & 2 deletions src/main/java/com/exactpro/th2/FixHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,10 @@ private Map<String, String> handleLogon(@NotNull ByteBuf message, Map<String, St
if(nextExpectedSeqNumber < seqNum) {
try {
recoveryLock.lock();
Thread.sleep(settings.getCradleSaveTimeoutMs());
strategy.getRecoveryHandler().recovery(nextExpectedSeqNumber, seqNum);
} catch (InterruptedException e) {
LOGGER.error("Error while waiting for cradle save timeout.", e);
} finally {
recoveryLock.unlock();
}
Expand Down Expand Up @@ -778,7 +781,10 @@ private void handleResendRequest(ByteBuf message) {

try {
recoveryLock.lock();
Thread.sleep(settings.getCradleSaveTimeoutMs());
strategy.getRecoveryHandler().recovery(beginSeqNo, endSeqNo);
} catch (InterruptedException e) {
LOGGER.error("Error while waiting for cradle save timeout.", e);
} finally {
recoveryLock.unlock();
}
Expand Down Expand Up @@ -830,6 +836,11 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi
if(!skip.get()) {
channel.send(buf, new HashMap<String, String>(), null, SendMode.MANGLE)
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);
try {
Thread.sleep(settings.getRecoverySendIntervalMs());
} catch (InterruptedException e) {
LOGGER.error("Error while waiting send interval during recovery", e);
}
}

if(skip.get() && recoveryConfig.getOutOfOrder()) {
Expand All @@ -841,6 +852,11 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi
skip.set(true);
channel.send(skipped.get(), new HashMap<String, String>(), null, SendMode.MANGLE)
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);
try {
Thread.sleep(settings.getRecoverySendIntervalMs());
} catch (InterruptedException e) {
LOGGER.error("Error while waiting send interval during recovery", e);
}
}

resetHeartbeatTask();
Expand Down Expand Up @@ -1451,7 +1467,10 @@ private Map<String, String> fakeRetransmissionOutgoingProcessor(ByteBuf message,
FixField sendingTime = requireNonNull(findField(message, SENDING_TIME_TAG));
strategy.getState().addMissedMessageToCacheIfCondition(msgSeqNum.get(), Unpooled.copiedBuffer(message), x -> true);

sendingTime.insertNext(POSS_DUP_TAG, IS_POSS_DUP).insertNext(POSS_RESEND_TAG, IS_POSS_DUP);
sendingTime
.insertNext(ORIG_SENDING_TIME_TAG, sendingTime.getValue())
.insertNext(POSS_DUP_TAG, IS_POSS_DUP)
.insertNext(POSS_RESEND_TAG, IS_POSS_DUP);
updateLength(message);
updateChecksum(message);

Expand Down Expand Up @@ -2067,6 +2086,11 @@ private void recoveryFromState(Integer beginSeqNo, Integer endSeqNo) {
if(!skip) {
channel.send(missedMessage, new HashMap<String, String>(), null, SendMode.MANGLE)
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);
try {
Thread.sleep(settings.getRecoverySendIntervalMs());
} catch (InterruptedException e) {
LOGGER.error("Error while waiting send interval during recovery", e);
}
}

if(skip && recoveryConfig.getOutOfOrder()) {
Expand All @@ -2077,6 +2101,11 @@ private void recoveryFromState(Integer beginSeqNo, Integer endSeqNo) {
if(!skip && recoveryConfig.getOutOfOrder()) {
channel.send(skipped, new HashMap<String, String>(), null, SendMode.MANGLE)
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);
try {
Thread.sleep(settings.getRecoverySendIntervalMs());
} catch (InterruptedException e) {
LOGGER.error("Error while waiting send interval during recovery", e);
}
skip = true;
}
}
Expand Down Expand Up @@ -2142,6 +2171,7 @@ private void disconnect(boolean graceful) throws ExecutionException, Interrupted
waitLogoutResponse();
}
enabled.set(false);
Thread.sleep(settings.getDisconnectCleanUpTimeoutMs());
channel.close().get();
}

Expand Down Expand Up @@ -2187,12 +2217,15 @@ private void setHeader(StringBuilder stringBuilder, String msgType, Integer seqN
if (settings.getTargetCompID() != null) stringBuilder.append(TARGET_COMP_ID).append(settings.getTargetCompID());
if (settings.getSenderSubID() != null) stringBuilder.append(SENDER_SUB_ID).append(settings.getSenderSubID());
stringBuilder.append(SENDING_TIME);
String now = getTime();
if(time != null) {
stringBuilder.append(time);
now = time;
} else {
stringBuilder.append(getTime());
stringBuilder.append(now);
}
if(isPossDup) {
stringBuilder.append(ORIG_SENDING_TIME).append(now);
stringBuilder.append(POSS_DUP).append(IS_POSS_DUP);
}
}
Expand Down
27 changes: 27 additions & 0 deletions src/main/java/com/exactpro/th2/FixHandlerSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ public class FixHandlerSettings implements IHandlerSettings {
private int testRequestDelay = 60;
private int reconnectDelay = 5;
private int disconnectRequestDelay = 5;
private long disconnectCleanUpTimeoutMs = 1000;
private long cradleSaveTimeoutMs = 2000;
private long recoverySendIntervalMs = 10;

private BrokenConnConfiguration brokenConnConfiguration;
/**
Expand Down Expand Up @@ -348,4 +351,28 @@ public long getMinConnectionTimeoutOnSend() {
public void setMinConnectionTimeoutOnSend(long minConnectionTimeoutOnSend) {
this.minConnectionTimeoutOnSend = minConnectionTimeoutOnSend;
}

public long getDisconnectCleanUpTimeoutMs() {
return disconnectCleanUpTimeoutMs;
}

public void setDisconnectCleanUpTimeoutMs(long disconnectCleanUpTimeoutMs) {
this.disconnectCleanUpTimeoutMs = disconnectCleanUpTimeoutMs;
}

public long getCradleSaveTimeoutMs() {
return cradleSaveTimeoutMs;
}

public void setCradleSaveTimeoutMs(long cradleSaveTimeoutMs) {
this.cradleSaveTimeoutMs = cradleSaveTimeoutMs;
}

public long getRecoverySendIntervalMs() {
return recoverySendIntervalMs;
}

public void setRecoverySendIntervalMs(long recoverySendIntervalMs) {
this.recoverySendIntervalMs = recoverySendIntervalMs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ import org.mockito.kotlin.times
import org.mockito.kotlin.verify
import org.mockito.kotlin.whenever


@Disabled
class TestStrategies {

private class TestContext(
Expand Down Expand Up @@ -141,7 +141,7 @@ class TestStrategies {

val captor = argumentCaptor<ByteBuf> { }

verify(channel, timeout(defaultRuleDuration.millis() + 300)).open()
verify(channel, timeout(defaultRuleDuration.millis() + 1300)).open()
verify(channel, timeout(300)).send(any(), any(), anyOrNull(), any()) // Logon
clearInvocations(channel)

Expand All @@ -151,7 +151,7 @@ class TestStrategies {
handler.onIncoming(channel, businessMessage(incomingSequence.incrementAndGet()), getMessageId())
handler.onIncoming(channel, businessMessage(incomingSequence.incrementAndGet()), getMessageId())

verify(channel, timeout(businessRuleDuration.millis() + businessRuleCleanupDuration.millis() + 300)).open()
verify(channel, timeout(businessRuleDuration.millis() + businessRuleCleanupDuration.millis() + 1300)).open()
verify(channel, timeout(300)).send(captor.capture(), any(), anyOrNull(), any()) // Logon
clearInvocations(channel)

Expand Down

0 comments on commit ac4ff4a

Please sign in to comment.