From 9bfc5710680d50b823b8e26e731cbd064e56e243 Mon Sep 17 00:00:00 2001 From: Denis Plotnikov Date: Wed, 29 Nov 2023 19:56:26 +0400 Subject: [PATCH] fix --- .../java/com/exactpro/th2/FixHandler.java | 71 ++++++++++++------- 1 file changed, 44 insertions(+), 27 deletions(-) diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index bfa251b..0a78497 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -92,6 +92,7 @@ import kotlin.jvm.functions.Function1; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.checkerframework.checker.units.qual.A; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; @@ -201,6 +202,7 @@ public class FixHandler implements AutoCloseable, IHandler { private final AtomicInteger msgSeqNum = new AtomicInteger(0); private final AtomicInteger serverMsgSeqNum = new AtomicInteger(0); private final AtomicInteger testReqID = new AtomicInteger(0); + private final AtomicBoolean activeRecovery = new AtomicBoolean(true); private final AtomicBoolean sessionActive = new AtomicBoolean(true); private final AtomicBoolean enabled = new AtomicBoolean(false); private final AtomicBoolean connStarted = new AtomicBoolean(false); @@ -630,12 +632,14 @@ private Map handleLogon(@NotNull ByteBuf message, Map seqNum) { context.send( @@ -781,12 +785,14 @@ private void handleResendRequest(ByteBuf message) { try { recoveryLock.lock(); + activeRecovery.set(true); Thread.sleep(settings.getCradleSaveTimeoutMs()); strategy.getRecoveryHandler().recovery(beginSeqNo, endSeqNo); } catch (InterruptedException e) { LOGGER.error("Error while waiting for cradle save timeout.", e); } finally { recoveryLock.unlock(); + activeRecovery.set(false); } } } @@ -1991,38 +1997,41 @@ private void defaultCleanupHandler() {} // private void applyNextStrategy() { - try { - recoveryLock.lock(); - LOGGER.info("Cleaning up current strategy {}", strategy.getState().getType()); + LOGGER.info("Cleaning up current strategy {}", strategy.getState().getType()); + while (activeRecovery.get()) { + LOGGER.info("Waiting for recovery to finish."); try { - strategy.getCleanupHandler().cleanup(); - } catch (Exception e) { - String message = String.format("Error while cleaning up strategy: %s", strategy.getState().getType()); - LOGGER.error(message, e); - ruleErrorEvent(strategy.getState().getType(), null, e); - } - - if(!sessionActive.get()) { - strategy.resetStrategyAndState(RuleConfiguration.Companion.defaultConfiguration()); - executorService.schedule(this::applyNextStrategy, Duration.of(10, ChronoUnit.MINUTES).toMinutes(), TimeUnit.MINUTES); - return; + Thread.sleep(100); + } catch (InterruptedException e) { + LOGGER.error("Error while waiting for recovery to finish", e); } + } + try { + strategy.getCleanupHandler().cleanup(); + } catch (Exception e) { + String message = String.format("Error while cleaning up strategy: %s", strategy.getState().getType()); + LOGGER.error(message, e); + ruleErrorEvent(strategy.getState().getType(), null, e); + } - RuleConfiguration nextStrategyConfig = scheduler.next(); - Consumer nextStrategySetupFunction = getSetupFunction(nextStrategyConfig); - try { - nextStrategySetupFunction.accept(nextStrategyConfig); - } catch (Exception e) { - String message = String.format("Error while setting up strategy: %s", strategy.getState().getType()); - LOGGER.error(message, e); - ruleErrorEvent(nextStrategyConfig.getRuleType(), null, e); - } + if(!sessionActive.get()) { + strategy.resetStrategyAndState(RuleConfiguration.Companion.defaultConfiguration()); + executorService.schedule(this::applyNextStrategy, Duration.of(10, ChronoUnit.MINUTES).toMinutes(), TimeUnit.MINUTES); + return; + } - LOGGER.info("Next strategy applied: {}", nextStrategyConfig.getRuleType()); - executorService.schedule(this::applyNextStrategy, nextStrategyConfig.getDuration().toMillis(), TimeUnit.MILLISECONDS); - } finally { - recoveryLock.unlock(); + RuleConfiguration nextStrategyConfig = scheduler.next(); + Consumer nextStrategySetupFunction = getSetupFunction(nextStrategyConfig); + try { + nextStrategySetupFunction.accept(nextStrategyConfig); + } catch (Exception e) { + String message = String.format("Error while setting up strategy: %s", strategy.getState().getType()); + LOGGER.error(message, e); + ruleErrorEvent(nextStrategyConfig.getRuleType(), null, e); } + + LOGGER.info("Next strategy applied: {}", nextStrategyConfig.getRuleType()); + executorService.schedule(this::applyNextStrategy, nextStrategyConfig.getDuration().toMillis(), TimeUnit.MILLISECONDS); } private Consumer getSetupFunction(RuleConfiguration config) { @@ -2185,6 +2194,14 @@ private void setTime(ByteBuf buf) { } private void disconnect(boolean graceful) throws ExecutionException, InterruptedException { + while (activeRecovery.get()) { + LOGGER.info("Waiting for recovery to finish."); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + LOGGER.error("Error while waiting for recovery to finish", e); + } + } if(graceful) { sendLogout(); waitLogoutResponse();