Skip to content

Commit

Permalink
Added recovery logs
Browse files Browse the repository at this point in the history
  • Loading branch information
denis.plotnikov committed Sep 30, 2024
1 parent 10d5315 commit fdb3a35
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 11 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ spec:
## 1.4.1

* Use keep open gRPC query to recover messages for Resend Request
* Update `com.exactpro.th2.gradle` plugin to `0.1.3`

## 1.4.0

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
plugins {
id "application"
id "com.exactpro.th2.gradle.component" version "0.1.1"
id "com.exactpro.th2.gradle.component" version "0.1.3"
id 'org.jetbrains.kotlin.jvm' version '1.8.22'
id "org.jetbrains.kotlin.kapt" version "1.8.22"
}
Expand Down
32 changes: 25 additions & 7 deletions src/main/java/com/exactpro/th2/FixHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -854,23 +854,36 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi
Function1<ByteBuf, Boolean> processMessage = (buf) -> {
FixField seqNum = findField(buf, MSG_SEQ_NUM_TAG);
FixField msgTypeField = findField(buf, MSG_TYPE_TAG);

LOGGER.info("Processing cradle recovery message {}", buf.toString(US_ASCII));

if(seqNum == null || seqNum.getValue() == null
|| msgTypeField == null || msgTypeField.getValue() == null) {
LOGGER.info("Dropping recovery message. Missing SeqNum tag: {}", buf.toString(US_ASCII));
return true;
}
int sequence = Integer.parseInt(seqNum.getValue());
String msgType = msgTypeField.getValue();

if(sequence < beginSeqNo) return true;
if(sequence > endSeq) return false;
if(sequence < beginSeqNo) {
LOGGER.info("Dropping recovery message. SeqNum is less than BeginSeqNo: {}", buf.toString(US_ASCII));
return true;
}
if(sequence > endSeq) {
LOGGER.info("Finishing recovery. SeqNum > EndSeq: {}", buf.toString(US_ASCII));
return false;
}

if(recoveryConfig.getSequenceResetForAdmin() && ADMIN_MESSAGES.contains(msgType)) return true;
if(recoveryConfig.getSequenceResetForAdmin() && ADMIN_MESSAGES.contains(msgType)) {
LOGGER.info("Dropping recovery message. Admin message sequence reset: {}", buf.toString(US_ASCII));
return true;
}
FixField possDup = findField(buf, POSS_DUP_TAG);
if(possDup != null && Objects.equals(possDup.getValue(), IS_POSS_DUP)) return true;

if(sequence - 1 != lastProcessedSequence.get() ) {
int seqNo = Math.max(beginSeqNo, lastProcessedSequence.get() + 1);
LOGGER.error("Messages [{}, {}] couldn't be recovered", seqNo, sequence);
LOGGER.error("Messages [{}, {}] couldn't be recovered in the middle of recovery", seqNo, sequence);
StringBuilder sequenceReset = createSequenceReset(seqNo, sequence);
channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)),
strategy.getState().enrichProperties(),
Expand All @@ -884,6 +897,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi
updateLength(buf);
updateChecksum(buf);
if(!skip.get()) {
LOGGER.info("Sending recovery message: {}", buf.toString(US_ASCII));
channel.send(buf, strategy.getState().enrichProperties(), null, SendMode.MANGLE)
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);
try {
Expand All @@ -894,11 +908,13 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi
}

if(skip.get() && recoveryConfig.getOutOfOrder()) {
LOGGER.info("Skipping recovery message. OutOfOrder: {}", buf.toString(US_ASCII));
skipped.set(buf);
skip.set(false);
}

if(!skip.get() && recoveryConfig.getOutOfOrder()) {
LOGGER.info("Sending recovery message. OutOfOrder: {}", skipped.get().toString(US_ASCII));
skip.set(true);
channel.send(skipped.get(), strategy.getState().enrichProperties(), null, SendMode.MANGLE)
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);
Expand Down Expand Up @@ -926,7 +942,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi
if(lastProcessedSequence.get() < endSeq && msgSeqNum.get() + 1 != lastProcessedSequence.get() + 1) {
int seqNo = Math.max(lastProcessedSequence.get() + 1, beginSeqNo);
int newSeqNo = msgSeqNum.get() + 1;
LOGGER.error("Messages [{}, {}] couldn't be recovered", seqNo, newSeqNo);
LOGGER.error("Messages [{}, {}] couldn't be recovered in the end of recovery", seqNo, newSeqNo);
String seqReset = createSequenceReset(seqNo, newSeqNo).toString();
channel.send(
Unpooled.wrappedBuffer(seqReset.getBytes(StandardCharsets.UTF_8)),
Expand Down Expand Up @@ -2323,8 +2339,8 @@ private void recoveryFromState(Integer beginSeqNo, Integer endSeqNo) {
updateLength(missedMessage);
updateChecksum(missedMessage);

LOGGER.info("Sending recovery message from state: {}", missedMessage.toString(US_ASCII));
if(!skip) {
LOGGER.info("Sending recovery message from state: {}", missedMessage.toString(US_ASCII));
channel.send(missedMessage, strategy.getState().enrichProperties(), null, SendMode.MANGLE)
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);
try {
Expand All @@ -2335,11 +2351,13 @@ private void recoveryFromState(Integer beginSeqNo, Integer endSeqNo) {
}

if(skip && recoveryConfig.getOutOfOrder()) {
LOGGER.info("Skip recovery message out of order: {}", missedMessage.toString(US_ASCII));
skip = false;
skipped = missedMessage;
}

if(!skip && recoveryConfig.getOutOfOrder()) {
if(!skip && recoveryConfig.getOutOfOrder() && skipped != null) {
LOGGER.info("Sending recovery message from state out of order: {}", skipped.toString(US_ASCII));
channel.send(skipped, strategy.getState().enrichProperties(), null, SendMode.MANGLE)
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);
try {
Expand Down
25 changes: 22 additions & 3 deletions src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import kotlin.concurrent.withLock
import mu.KotlinLogging
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import kotlin.text.Charsets.US_ASCII

class MessageLoader(
private val executor: ScheduledExecutorService,
Expand Down Expand Up @@ -145,7 +146,15 @@ class MessageLoader(
)
)

val firstValidMessage = firstValidMessageDetails(backwardIterator) ?: return@withCancellation
val firstValidMessage = firstValidMessageDetails(backwardIterator)

if (firstValidMessage == null) {
K_LOGGER.info { "Not found valid messages to recover." }
return@withCancellation
}
firstValidMessage.let {
K_LOGGER.info { "Backward search. First valid message seq num: ${it.payloadSequence} timestamp: ${it.timestamp} cradle sequence: ${it.messageSequence}" }
}

var messagesToSkip = firstValidMessage.payloadSequence - fromSequence

Expand All @@ -157,14 +166,19 @@ class MessageLoader(
continue
}
timestamp = message.messageId.timestamp
val buf = Unpooled.copiedBuffer(message.bodyRaw.toByteArray())
val sequence = buf.findField(MSG_SEQ_NUM_TAG)?.value?.toInt()

K_LOGGER.debug { "Backward search: Skip message with sequence - $sequence" }

messagesToSkip -= 1
if(messagesToSkip == 0L) {

val buf = Unpooled.copiedBuffer(message.bodyRaw.toByteArray())
val sequence = buf.findField(MSG_SEQ_NUM_TAG)?.value?.toInt() ?: continue
sequence ?: continue

if(sequence > 1 && lastProcessedSequence == 1 || sequence > 2 && lastProcessedSequence == 2) {
skipRetransmission = true
K_LOGGER.info { "Retransmission will be skipped. Not found valid message with sequence more than 1." }
return@withCancellation
}

Expand All @@ -175,17 +189,21 @@ class MessageLoader(

timestamp = validMessage.timestamp
if(validMessage.payloadSequence <= fromSequence) {
K_LOGGER.info { "Found valid message with start recovery sequence: ${buf.toString(US_ASCII)}" }
break
} else {
messagesToSkip = validMessage.payloadSequence - fromSequence
K_LOGGER.info { "Adjusted number of messages to skip: $messagesToSkip using ${validMessage.payloadSequence} - $fromSequence" }
}

} else {

if(sequence <= fromSequence) {
K_LOGGER.info { "Found valid message with start recovery sequence: ${buf.toString(US_ASCII)}" }
break
} else {
messagesToSkip = sequence - fromSequence
K_LOGGER.info { "Adjusted number of messages to skip: $messagesToSkip using $sequence - $fromSequence" }
}
}
}
Expand Down Expand Up @@ -213,6 +231,7 @@ class MessageLoader(

while (iterator.hasNext()) {
val message = Unpooled.buffer().writeBytes(iterator.next().message.bodyRaw.toByteArray())
K_LOGGER.info { "Sending message to recovery processor: ${message.toString(US_ASCII)}" }
if (!processMessage(message)) break
}
}.onFailure {
Expand Down

0 comments on commit fdb3a35

Please sign in to comment.