diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index 844247c..1cb05af 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -920,7 +920,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi endSeqNo = msgSeqNum.get() + 1; } - AtomicBoolean skip = new AtomicBoolean(recoveryConfig.getOutOfOrder()); + AtomicBoolean skip = new AtomicBoolean(strategy.getOutOfOrder()); AtomicReference skipped = new AtomicReference<>(null); int endSeq = endSeqNo; @@ -983,13 +983,15 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi } } - if(skip.get() && recoveryConfig.getOutOfOrder()) { + if(skip.get() && strategy.getOutOfOrder()) { LOGGER.info("Skipping recovery message. OutOfOrder: {}", buf.toString(US_ASCII)); skipped.set(buf); skip.set(false); + lastProcessedSequence.set(sequence); + return true; } - if(!skip.get() && recoveryConfig.getOutOfOrder()) { + if(!skip.get() && strategy.getOutOfOrder()) { LOGGER.info("Sending recovery message. OutOfOrder: {}", skipped.get().toString(US_ASCII)); skip.set(true); channel.send(skipped.get(), strategy.getState().enrichProperties(), null, SendMode.MANGLE) @@ -1857,6 +1859,10 @@ private Map missOutgoingMessages(ByteBuf message, Map 0 ) { strategy.disableAllowMessagesBeforeRetransmissionFinishes("after " + strategy.getConfig().getDuration() + " strategy duration"); } + if(strategy.getOutOfOrder() + && Duration.between(strategy.getStartTime(), Instant.now()).compareTo(strategy.getConfig().getDuration()) > 0 ) { + strategy.disableOutOfOrder("after " + strategy.getConfig().getDuration() + " strategy duration"); + } return null; } @@ -2624,7 +2630,7 @@ private void recoveryFromState(Integer beginSeqNo, Integer endSeqNo) { LOGGER.info("Making recovery from state: {} - {}.", beginSeqNo, endSeqNo); - boolean skip = recoveryConfig.getOutOfOrder(); + boolean skip = strategy.getOutOfOrder(); ByteBuf skipped = null; for(int i = beginSeqNo; i <= endSeqNo; i++) { @@ -2668,13 +2674,14 @@ private void recoveryFromState(Integer beginSeqNo, Integer endSeqNo) { } } - if(skip && recoveryConfig.getOutOfOrder()) { + if(skip && strategy.getOutOfOrder()) { LOGGER.info("Skip recovery message out of order: {}", missedMessage.toString(US_ASCII)); skip = false; skipped = missedMessage; + continue; } - if(!skip && recoveryConfig.getOutOfOrder() && skipped != null) { + if(!skip && strategy.getOutOfOrder() && skipped != null) { LOGGER.info("Sending recovery message from state out of order: {}", skipped.toString(US_ASCII)); channel.send(skipped, strategy.getState().enrichProperties(), null, SendMode.MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/StatefulStrategy.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/StatefulStrategy.kt index 3bc8f57..8dde551 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/StatefulStrategy.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/StatefulStrategy.kt @@ -64,6 +64,10 @@ class StatefulStrategy( val allowMessagesBeforeRetransmissionFinishes: Boolean get() = lock.read { _allowMessagesBeforeRetransmissionFinishes } + private var _outOfOrder: Boolean = false + val outOfOrder: Boolean + get() = lock.read { _outOfOrder } + val sendResendRequestOnLogoutReply: Boolean get() = lock.read {state.config?.sendResendRequestOnLogoutReply ?: false } @@ -116,6 +120,11 @@ class StatefulStrategy( LOGGER.info("Disabled allow messages before retransmission finishes by the '$reason' reason") } + fun disableOutOfOrder(reason: String) = lock.write { + _outOfOrder = false + LOGGER.info("Disabled outOfOrder retransmission '$reason' reason") + } + fun getReceiveMessageStrategy(func: ReceiveStrategy.() -> T) = lock.read { receiveStrategy.func() } @@ -160,6 +169,7 @@ class StatefulStrategy( lock.write { state = state.resetAndCopyMissedMessages(config) _allowMessagesBeforeRetransmissionFinishes = state.config?.allowMessagesBeforeRetransmissionFinishes ?: false + _outOfOrder = state.config?.recoveryConfig?.outOfOrder ?: false sendStrategy.sendHandler = defaultStrategy.sendStrategy.sendHandler sendStrategy.sendPreprocessor = defaultStrategy.sendStrategy.sendPreprocessor receiveStrategy.receivePreprocessor = defaultStrategy.receiveStrategy.receivePreprocessor @@ -177,6 +187,7 @@ class StatefulStrategy( lock.write { state = state.resetAndCopyMissedMessages() _allowMessagesBeforeRetransmissionFinishes = state.config?.allowMessagesBeforeRetransmissionFinishes ?: false + _outOfOrder = state.config?.recoveryConfig?.outOfOrder ?: false sendStrategy.sendHandler = defaultStrategy.sendStrategy.sendHandler sendStrategy.sendPreprocessor = defaultStrategy.sendStrategy.sendPreprocessor receiveStrategy.receivePreprocessor = defaultStrategy.receiveStrategy.receivePreprocessor