Skip to content

Commit

Permalink
Merge branch 'TS-1928' into TS-2674
Browse files Browse the repository at this point in the history
  • Loading branch information
denis.plotnikov committed Oct 3, 2024
2 parents bf4f626 + 5b69604 commit b166e3d
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 12 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ spec:
## 1.4.1

* Use keep open gRPC query to recover messages for Resend Request
* Update `com.exactpro.th2.gradle` plugin to `0.1.3`

## 1.4.0

Expand Down
32 changes: 25 additions & 7 deletions src/main/java/com/exactpro/th2/FixHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -887,23 +887,36 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi
Function1<ByteBuf, Boolean> processMessage = (buf) -> {
FixField seqNum = findField(buf, MSG_SEQ_NUM_TAG);
FixField msgTypeField = findField(buf, MSG_TYPE_TAG);

LOGGER.info("Processing cradle recovery message {}", buf.toString(US_ASCII));

if(seqNum == null || seqNum.getValue() == null
|| msgTypeField == null || msgTypeField.getValue() == null) {
LOGGER.info("Dropping recovery message. Missing SeqNum tag: {}", buf.toString(US_ASCII));
return true;
}
int sequence = Integer.parseInt(seqNum.getValue());
String msgType = msgTypeField.getValue();

if(sequence < beginSeqNo) return true;
if(sequence > endSeq) return false;
if(sequence < beginSeqNo) {
LOGGER.info("Dropping recovery message. SeqNum is less than BeginSeqNo: {}", buf.toString(US_ASCII));
return true;
}
if(sequence > endSeq) {
LOGGER.info("Finishing recovery. SeqNum > EndSeq: {}", buf.toString(US_ASCII));
return false;
}

if(recoveryConfig.getSequenceResetForAdmin() && ADMIN_MESSAGES.contains(msgType)) return true;
if(recoveryConfig.getSequenceResetForAdmin() && ADMIN_MESSAGES.contains(msgType)) {
LOGGER.info("Dropping recovery message. Admin message sequence reset: {}", buf.toString(US_ASCII));
return true;
}
FixField possDup = findField(buf, POSS_DUP_TAG);
if(possDup != null && Objects.equals(possDup.getValue(), IS_POSS_DUP)) return true;

if(sequence - 1 != lastProcessedSequence.get() ) {
int seqNo = Math.max(beginSeqNo, lastProcessedSequence.get() + 1);
LOGGER.error("Messages [{}, {}] couldn't be recovered", seqNo, sequence);
LOGGER.error("Messages [{}, {}] couldn't be recovered in the middle of recovery", seqNo, sequence);
StringBuilder sequenceReset = createSequenceReset(seqNo, sequence);
channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)),
strategy.getState().enrichProperties(),
Expand All @@ -917,6 +930,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi
updateLength(buf);
updateChecksum(buf);
if(!skip.get()) {
LOGGER.info("Sending recovery message: {}", buf.toString(US_ASCII));
channel.send(buf, strategy.getState().enrichProperties(), null, SendMode.MANGLE)
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);
try {
Expand All @@ -927,11 +941,13 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi
}

if(skip.get() && recoveryConfig.getOutOfOrder()) {
LOGGER.info("Skipping recovery message. OutOfOrder: {}", buf.toString(US_ASCII));
skipped.set(buf);
skip.set(false);
}

if(!skip.get() && recoveryConfig.getOutOfOrder()) {
LOGGER.info("Sending recovery message. OutOfOrder: {}", skipped.get().toString(US_ASCII));
skip.set(true);
channel.send(skipped.get(), strategy.getState().enrichProperties(), null, SendMode.MANGLE)
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);
Expand Down Expand Up @@ -959,7 +975,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi
if(lastProcessedSequence.get() < endSeq && msgSeqNum.get() + 1 != lastProcessedSequence.get() + 1) {
int seqNo = Math.max(lastProcessedSequence.get() + 1, beginSeqNo);
int newSeqNo = msgSeqNum.get() + 1;
LOGGER.error("Messages [{}, {}] couldn't be recovered", seqNo, newSeqNo);
LOGGER.error("Messages [{}, {}] couldn't be recovered in the end of recovery", seqNo, newSeqNo);
String seqReset = createSequenceReset(seqNo, newSeqNo).toString();
channel.send(
Unpooled.wrappedBuffer(seqReset.getBytes(StandardCharsets.UTF_8)),
Expand Down Expand Up @@ -2387,8 +2403,8 @@ private void recoveryFromState(Integer beginSeqNo, Integer endSeqNo) {
updateLength(missedMessage);
updateChecksum(missedMessage);

LOGGER.info("Sending recovery message from state: {}", missedMessage.toString(US_ASCII));
if(!skip) {
LOGGER.info("Sending recovery message from state: {}", missedMessage.toString(US_ASCII));
channel.send(missedMessage, strategy.getState().enrichProperties(), null, SendMode.MANGLE)
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);
try {
Expand All @@ -2399,11 +2415,13 @@ private void recoveryFromState(Integer beginSeqNo, Integer endSeqNo) {
}

if(skip && recoveryConfig.getOutOfOrder()) {
LOGGER.info("Skip recovery message out of order: {}", missedMessage.toString(US_ASCII));
skip = false;
skipped = missedMessage;
}

if(!skip && recoveryConfig.getOutOfOrder()) {
if(!skip && recoveryConfig.getOutOfOrder() && skipped != null) {
LOGGER.info("Sending recovery message from state out of order: {}", skipped.toString(US_ASCII));
channel.send(skipped, strategy.getState().enrichProperties(), null, SendMode.MANGLE)
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);
try {
Expand Down
33 changes: 28 additions & 5 deletions src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import kotlin.concurrent.withLock
import mu.KotlinLogging
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import kotlin.text.Charsets.US_ASCII

class MessageLoader(
private val executor: ScheduledExecutorService,
Expand Down Expand Up @@ -142,10 +143,20 @@ class MessageLoader(
sessionGroup = sessionGroup,
sessionAlias = sessionAlias,
direction = direction
)
).also {
K_LOGGER.info { "Backward iterator params: sessionAlias - $sessionAlias from - ${it.startTimestamp} to - ${it.endTimestamp}" }
}
)

val firstValidMessage = firstValidMessageDetails(backwardIterator) ?: return@withCancellation
val firstValidMessage = firstValidMessageDetails(backwardIterator)

if (firstValidMessage == null) {
K_LOGGER.info { "Not found valid messages to recover." }
return@withCancellation
}
firstValidMessage.let {
K_LOGGER.info { "Backward search. First valid message seq num: ${it.payloadSequence} timestamp: ${it.timestamp} cradle sequence: ${it.messageSequence}" }
}

var messagesToSkip = firstValidMessage.payloadSequence - fromSequence

Expand All @@ -157,14 +168,19 @@ class MessageLoader(
continue
}
timestamp = message.messageId.timestamp
val buf = Unpooled.copiedBuffer(message.bodyRaw.toByteArray())
val sequence = buf.findField(MSG_SEQ_NUM_TAG)?.value?.toInt()

K_LOGGER.debug { "Backward search: Skip message with sequence - $sequence" }

messagesToSkip -= 1
if(messagesToSkip == 0L) {

val buf = Unpooled.copiedBuffer(message.bodyRaw.toByteArray())
val sequence = buf.findField(MSG_SEQ_NUM_TAG)?.value?.toInt() ?: continue
sequence ?: continue

if(sequence > 1 && lastProcessedSequence == 1 || sequence > 2 && lastProcessedSequence == 2) {
skipRetransmission = true
K_LOGGER.info { "Retransmission will be skipped. Not found valid message with sequence more than 1." }
return@withCancellation
}

Expand All @@ -175,17 +191,21 @@ class MessageLoader(

timestamp = validMessage.timestamp
if(validMessage.payloadSequence <= fromSequence) {
K_LOGGER.info { "Found valid message with start recovery sequence: ${buf.toString(US_ASCII)}" }
break
} else {
messagesToSkip = validMessage.payloadSequence - fromSequence
K_LOGGER.info { "Adjusted number of messages to skip: $messagesToSkip using ${validMessage.payloadSequence} - $fromSequence" }
}

} else {

if(sequence <= fromSequence) {
K_LOGGER.info { "Found valid message with start recovery sequence: ${buf.toString(US_ASCII)}" }
break
} else {
messagesToSkip = sequence - fromSequence
K_LOGGER.info { "Adjusted number of messages to skip: $messagesToSkip using $sequence - $fromSequence" }
}
}
}
Expand All @@ -208,11 +228,14 @@ class MessageLoader(
direction = direction,
timeRelation = TimeRelation.NEXT,
keepOpen = true,
),
).also {
K_LOGGER.info { "Forward iterator params: sessionAlias - $sessionAlias from - ${it.startTimestamp} to - ${it.endTimestamp}" }
}
)

while (iterator.hasNext()) {
val message = Unpooled.buffer().writeBytes(iterator.next().message.bodyRaw.toByteArray())
K_LOGGER.info { "Sending message to recovery processor: ${message.toString(US_ASCII)}" }
if (!processMessage(message)) break
}
}.onFailure {
Expand Down

0 comments on commit b166e3d

Please sign in to comment.