Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Denis Plotnikov committed Nov 29, 2023
1 parent 136baff commit 9bfc571
Showing 1 changed file with 44 additions and 27 deletions.
71 changes: 44 additions & 27 deletions src/main/java/com/exactpro/th2/FixHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import kotlin.jvm.functions.Function1;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.checkerframework.checker.units.qual.A;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -201,6 +202,7 @@ public class FixHandler implements AutoCloseable, IHandler {
private final AtomicInteger msgSeqNum = new AtomicInteger(0);
private final AtomicInteger serverMsgSeqNum = new AtomicInteger(0);
private final AtomicInteger testReqID = new AtomicInteger(0);
private final AtomicBoolean activeRecovery = new AtomicBoolean(true);
private final AtomicBoolean sessionActive = new AtomicBoolean(true);
private final AtomicBoolean enabled = new AtomicBoolean(false);
private final AtomicBoolean connStarted = new AtomicBoolean(false);
Expand Down Expand Up @@ -630,12 +632,14 @@ private Map<String, String> handleLogon(@NotNull ByteBuf message, Map<String, St
if(nextExpectedSeqNumber < seqNum) {
try {
recoveryLock.lock();
activeRecovery.set(true);
Thread.sleep(settings.getCradleSaveTimeoutMs());
strategy.getRecoveryHandler().recovery(nextExpectedSeqNumber, seqNum);
} catch (InterruptedException e) {
LOGGER.error("Error while waiting for cradle save timeout.", e);
} finally {
recoveryLock.unlock();
activeRecovery.set(false);
}
} else if (nextExpectedSeqNumber > seqNum) {
context.send(
Expand Down Expand Up @@ -781,12 +785,14 @@ private void handleResendRequest(ByteBuf message) {

try {
recoveryLock.lock();
activeRecovery.set(true);
Thread.sleep(settings.getCradleSaveTimeoutMs());
strategy.getRecoveryHandler().recovery(beginSeqNo, endSeqNo);
} catch (InterruptedException e) {
LOGGER.error("Error while waiting for cradle save timeout.", e);
} finally {
recoveryLock.unlock();
activeRecovery.set(false);
}
}
}
Expand Down Expand Up @@ -1991,38 +1997,41 @@ private void defaultCleanupHandler() {}

// <editor-fold desc="strategies scheduling and cleanup">
private void applyNextStrategy() {
try {
recoveryLock.lock();
LOGGER.info("Cleaning up current strategy {}", strategy.getState().getType());
LOGGER.info("Cleaning up current strategy {}", strategy.getState().getType());
while (activeRecovery.get()) {
LOGGER.info("Waiting for recovery to finish.");
try {
strategy.getCleanupHandler().cleanup();
} catch (Exception e) {
String message = String.format("Error while cleaning up strategy: %s", strategy.getState().getType());
LOGGER.error(message, e);
ruleErrorEvent(strategy.getState().getType(), null, e);
}

if(!sessionActive.get()) {
strategy.resetStrategyAndState(RuleConfiguration.Companion.defaultConfiguration());
executorService.schedule(this::applyNextStrategy, Duration.of(10, ChronoUnit.MINUTES).toMinutes(), TimeUnit.MINUTES);
return;
Thread.sleep(100);
} catch (InterruptedException e) {
LOGGER.error("Error while waiting for recovery to finish", e);
}
}
try {
strategy.getCleanupHandler().cleanup();
} catch (Exception e) {
String message = String.format("Error while cleaning up strategy: %s", strategy.getState().getType());
LOGGER.error(message, e);
ruleErrorEvent(strategy.getState().getType(), null, e);
}

RuleConfiguration nextStrategyConfig = scheduler.next();
Consumer<RuleConfiguration> nextStrategySetupFunction = getSetupFunction(nextStrategyConfig);
try {
nextStrategySetupFunction.accept(nextStrategyConfig);
} catch (Exception e) {
String message = String.format("Error while setting up strategy: %s", strategy.getState().getType());
LOGGER.error(message, e);
ruleErrorEvent(nextStrategyConfig.getRuleType(), null, e);
}
if(!sessionActive.get()) {
strategy.resetStrategyAndState(RuleConfiguration.Companion.defaultConfiguration());
executorService.schedule(this::applyNextStrategy, Duration.of(10, ChronoUnit.MINUTES).toMinutes(), TimeUnit.MINUTES);
return;
}

LOGGER.info("Next strategy applied: {}", nextStrategyConfig.getRuleType());
executorService.schedule(this::applyNextStrategy, nextStrategyConfig.getDuration().toMillis(), TimeUnit.MILLISECONDS);
} finally {
recoveryLock.unlock();
RuleConfiguration nextStrategyConfig = scheduler.next();
Consumer<RuleConfiguration> nextStrategySetupFunction = getSetupFunction(nextStrategyConfig);
try {
nextStrategySetupFunction.accept(nextStrategyConfig);
} catch (Exception e) {
String message = String.format("Error while setting up strategy: %s", strategy.getState().getType());
LOGGER.error(message, e);
ruleErrorEvent(nextStrategyConfig.getRuleType(), null, e);
}

LOGGER.info("Next strategy applied: {}", nextStrategyConfig.getRuleType());
executorService.schedule(this::applyNextStrategy, nextStrategyConfig.getDuration().toMillis(), TimeUnit.MILLISECONDS);
}

private Consumer<RuleConfiguration> getSetupFunction(RuleConfiguration config) {
Expand Down Expand Up @@ -2185,6 +2194,14 @@ private void setTime(ByteBuf buf) {
}

private void disconnect(boolean graceful) throws ExecutionException, InterruptedException {
while (activeRecovery.get()) {
LOGGER.info("Waiting for recovery to finish.");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOGGER.error("Error while waiting for recovery to finish", e);
}
}
if(graceful) {
sendLogout();
waitLogoutResponse();
Expand Down

0 comments on commit 9bfc571

Please sign in to comment.