Skip to content

Commit

Permalink
Fix outOfOrder
Browse files Browse the repository at this point in the history
  • Loading branch information
denis.plotnikov committed Dec 12, 2024
1 parent b7f2c89 commit 373d07d
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 6 deletions.
19 changes: 13 additions & 6 deletions src/main/java/com/exactpro/th2/FixHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi
endSeqNo = msgSeqNum.get() + 1;
}

AtomicBoolean skip = new AtomicBoolean(recoveryConfig.getOutOfOrder());
AtomicBoolean skip = new AtomicBoolean(strategy.getOutOfOrder());
AtomicReference<ByteBuf> skipped = new AtomicReference<>(null);

int endSeq = endSeqNo;
Expand Down Expand Up @@ -983,13 +983,15 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi
}
}

if(skip.get() && recoveryConfig.getOutOfOrder()) {
if(skip.get() && strategy.getOutOfOrder()) {
LOGGER.info("Skipping recovery message. OutOfOrder: {}", buf.toString(US_ASCII));
skipped.set(buf);
skip.set(false);
lastProcessedSequence.set(sequence);
return true;
}

if(!skip.get() && recoveryConfig.getOutOfOrder()) {
if(!skip.get() && strategy.getOutOfOrder()) {
LOGGER.info("Sending recovery message. OutOfOrder: {}", skipped.get().toString(US_ASCII));
skip.set(true);
channel.send(skipped.get(), strategy.getState().enrichProperties(), null, SendMode.MANGLE)
Expand Down Expand Up @@ -1857,6 +1859,10 @@ private Map<String, String> missOutgoingMessages(ByteBuf message, Map<String, St
&& Duration.between(strategy.getStartTime(), Instant.now()).compareTo(strategy.getConfig().getDuration()) > 0 ) {
strategy.disableAllowMessagesBeforeRetransmissionFinishes("after " + strategy.getConfig().getDuration() + " strategy duration");
}
if(strategy.getOutOfOrder()
&& Duration.between(strategy.getStartTime(), Instant.now()).compareTo(strategy.getConfig().getDuration()) > 0 ) {
strategy.disableOutOfOrder("after " + strategy.getConfig().getDuration() + " strategy duration");
}

return null;
}
Expand Down Expand Up @@ -2624,7 +2630,7 @@ private void recoveryFromState(Integer beginSeqNo, Integer endSeqNo) {

LOGGER.info("Making recovery from state: {} - {}.", beginSeqNo, endSeqNo);

boolean skip = recoveryConfig.getOutOfOrder();
boolean skip = strategy.getOutOfOrder();
ByteBuf skipped = null;

for(int i = beginSeqNo; i <= endSeqNo; i++) {
Expand Down Expand Up @@ -2668,13 +2674,14 @@ private void recoveryFromState(Integer beginSeqNo, Integer endSeqNo) {
}
}

if(skip && recoveryConfig.getOutOfOrder()) {
if(skip && strategy.getOutOfOrder()) {
LOGGER.info("Skip recovery message out of order: {}", missedMessage.toString(US_ASCII));
skip = false;
skipped = missedMessage;
continue;
}

if(!skip && recoveryConfig.getOutOfOrder() && skipped != null) {
if(!skip && strategy.getOutOfOrder() && skipped != null) {
LOGGER.info("Sending recovery message from state out of order: {}", skipped.toString(US_ASCII));
channel.send(skipped, strategy.getState().enrichProperties(), null, SendMode.MANGLE)
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ class StatefulStrategy(
val allowMessagesBeforeRetransmissionFinishes: Boolean
get() = lock.read { _allowMessagesBeforeRetransmissionFinishes }

private var _outOfOrder: Boolean = false
val outOfOrder: Boolean
get() = lock.read { _outOfOrder }

val sendResendRequestOnLogoutReply: Boolean
get() = lock.read {state.config?.sendResendRequestOnLogoutReply ?: false }

Expand Down Expand Up @@ -116,6 +120,11 @@ class StatefulStrategy(
LOGGER.info("Disabled allow messages before retransmission finishes by the '$reason' reason")
}

fun disableOutOfOrder(reason: String) = lock.write {
_outOfOrder = false
LOGGER.info("Disabled outOfOrder retransmission '$reason' reason")
}

fun <T> getReceiveMessageStrategy(func: ReceiveStrategy.() -> T) = lock.read {
receiveStrategy.func()
}
Expand Down Expand Up @@ -160,6 +169,7 @@ class StatefulStrategy(
lock.write {
state = state.resetAndCopyMissedMessages(config)
_allowMessagesBeforeRetransmissionFinishes = state.config?.allowMessagesBeforeRetransmissionFinishes ?: false
_outOfOrder = state.config?.recoveryConfig?.outOfOrder ?: false
sendStrategy.sendHandler = defaultStrategy.sendStrategy.sendHandler
sendStrategy.sendPreprocessor = defaultStrategy.sendStrategy.sendPreprocessor
receiveStrategy.receivePreprocessor = defaultStrategy.receiveStrategy.receivePreprocessor
Expand All @@ -177,6 +187,7 @@ class StatefulStrategy(
lock.write {
state = state.resetAndCopyMissedMessages()
_allowMessagesBeforeRetransmissionFinishes = state.config?.allowMessagesBeforeRetransmissionFinishes ?: false
_outOfOrder = state.config?.recoveryConfig?.outOfOrder ?: false
sendStrategy.sendHandler = defaultStrategy.sendStrategy.sendHandler
sendStrategy.sendPreprocessor = defaultStrategy.sendStrategy.sendPreprocessor
receiveStrategy.receivePreprocessor = defaultStrategy.receiveStrategy.receivePreprocessor
Expand Down

0 comments on commit 373d07d

Please sign in to comment.