diff --git a/README.md b/README.md index 923fa67..0a6c606 100644 --- a/README.md +++ b/README.md @@ -341,6 +341,7 @@ spec: ## 1.4.1 * Use keep open gRPC query to recover messages for Resend Request +* Update `com.exactpro.th2.gradle` plugin to `0.1.3` ## 1.4.0 diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index 1879d8e..dbc6314 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -887,23 +887,36 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi Function1 processMessage = (buf) -> { FixField seqNum = findField(buf, MSG_SEQ_NUM_TAG); FixField msgTypeField = findField(buf, MSG_TYPE_TAG); + + LOGGER.info("Processing cradle recovery message {}", buf.toString(US_ASCII)); + if(seqNum == null || seqNum.getValue() == null || msgTypeField == null || msgTypeField.getValue() == null) { + LOGGER.info("Dropping recovery message. Missing SeqNum tag: {}", buf.toString(US_ASCII)); return true; } int sequence = Integer.parseInt(seqNum.getValue()); String msgType = msgTypeField.getValue(); - if(sequence < beginSeqNo) return true; - if(sequence > endSeq) return false; + if(sequence < beginSeqNo) { + LOGGER.info("Dropping recovery message. SeqNum is less than BeginSeqNo: {}", buf.toString(US_ASCII)); + return true; + } + if(sequence > endSeq) { + LOGGER.info("Finishing recovery. SeqNum > EndSeq: {}", buf.toString(US_ASCII)); + return false; + } - if(recoveryConfig.getSequenceResetForAdmin() && ADMIN_MESSAGES.contains(msgType)) return true; + if(recoveryConfig.getSequenceResetForAdmin() && ADMIN_MESSAGES.contains(msgType)) { + LOGGER.info("Dropping recovery message. Admin message sequence reset: {}", buf.toString(US_ASCII)); + return true; + } FixField possDup = findField(buf, POSS_DUP_TAG); if(possDup != null && Objects.equals(possDup.getValue(), IS_POSS_DUP)) return true; if(sequence - 1 != lastProcessedSequence.get() ) { int seqNo = Math.max(beginSeqNo, lastProcessedSequence.get() + 1); - LOGGER.error("Messages [{}, {}] couldn't be recovered", seqNo, sequence); + LOGGER.error("Messages [{}, {}] couldn't be recovered in the middle of recovery", seqNo, sequence); StringBuilder sequenceReset = createSequenceReset(seqNo, sequence); channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), strategy.getState().enrichProperties(), @@ -917,6 +930,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi updateLength(buf); updateChecksum(buf); if(!skip.get()) { + LOGGER.info("Sending recovery message: {}", buf.toString(US_ASCII)); channel.send(buf, strategy.getState().enrichProperties(), null, SendMode.MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); try { @@ -927,11 +941,13 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi } if(skip.get() && recoveryConfig.getOutOfOrder()) { + LOGGER.info("Skipping recovery message. OutOfOrder: {}", buf.toString(US_ASCII)); skipped.set(buf); skip.set(false); } if(!skip.get() && recoveryConfig.getOutOfOrder()) { + LOGGER.info("Sending recovery message. OutOfOrder: {}", skipped.get().toString(US_ASCII)); skip.set(true); channel.send(skipped.get(), strategy.getState().enrichProperties(), null, SendMode.MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); @@ -959,7 +975,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi if(lastProcessedSequence.get() < endSeq && msgSeqNum.get() + 1 != lastProcessedSequence.get() + 1) { int seqNo = Math.max(lastProcessedSequence.get() + 1, beginSeqNo); int newSeqNo = msgSeqNum.get() + 1; - LOGGER.error("Messages [{}, {}] couldn't be recovered", seqNo, newSeqNo); + LOGGER.error("Messages [{}, {}] couldn't be recovered in the end of recovery", seqNo, newSeqNo); String seqReset = createSequenceReset(seqNo, newSeqNo).toString(); channel.send( Unpooled.wrappedBuffer(seqReset.getBytes(StandardCharsets.UTF_8)), @@ -2387,8 +2403,8 @@ private void recoveryFromState(Integer beginSeqNo, Integer endSeqNo) { updateLength(missedMessage); updateChecksum(missedMessage); - LOGGER.info("Sending recovery message from state: {}", missedMessage.toString(US_ASCII)); if(!skip) { + LOGGER.info("Sending recovery message from state: {}", missedMessage.toString(US_ASCII)); channel.send(missedMessage, strategy.getState().enrichProperties(), null, SendMode.MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); try { @@ -2399,11 +2415,13 @@ private void recoveryFromState(Integer beginSeqNo, Integer endSeqNo) { } if(skip && recoveryConfig.getOutOfOrder()) { + LOGGER.info("Skip recovery message out of order: {}", missedMessage.toString(US_ASCII)); skip = false; skipped = missedMessage; } - if(!skip && recoveryConfig.getOutOfOrder()) { + if(!skip && recoveryConfig.getOutOfOrder() && skipped != null) { + LOGGER.info("Sending recovery message from state out of order: {}", skipped.toString(US_ASCII)); channel.send(skipped, strategy.getState().enrichProperties(), null, SendMode.MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); try { diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt index 23992a5..406b44d 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt @@ -47,6 +47,7 @@ import kotlin.concurrent.withLock import mu.KotlinLogging import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit +import kotlin.text.Charsets.US_ASCII class MessageLoader( private val executor: ScheduledExecutorService, @@ -142,10 +143,20 @@ class MessageLoader( sessionGroup = sessionGroup, sessionAlias = sessionAlias, direction = direction - ) + ).also { + K_LOGGER.info { "Backward iterator params: sessionAlias - $sessionAlias from - ${it.startTimestamp} to - ${it.endTimestamp}" } + } ) - val firstValidMessage = firstValidMessageDetails(backwardIterator) ?: return@withCancellation + val firstValidMessage = firstValidMessageDetails(backwardIterator) + + if (firstValidMessage == null) { + K_LOGGER.info { "Not found valid messages to recover." } + return@withCancellation + } + firstValidMessage.let { + K_LOGGER.info { "Backward search. First valid message seq num: ${it.payloadSequence} timestamp: ${it.timestamp} cradle sequence: ${it.messageSequence}" } + } var messagesToSkip = firstValidMessage.payloadSequence - fromSequence @@ -157,14 +168,19 @@ class MessageLoader( continue } timestamp = message.messageId.timestamp + val buf = Unpooled.copiedBuffer(message.bodyRaw.toByteArray()) + val sequence = buf.findField(MSG_SEQ_NUM_TAG)?.value?.toInt() + + K_LOGGER.debug { "Backward search: Skip message with sequence - $sequence" } + messagesToSkip -= 1 if(messagesToSkip == 0L) { - val buf = Unpooled.copiedBuffer(message.bodyRaw.toByteArray()) - val sequence = buf.findField(MSG_SEQ_NUM_TAG)?.value?.toInt() ?: continue + sequence ?: continue if(sequence > 1 && lastProcessedSequence == 1 || sequence > 2 && lastProcessedSequence == 2) { skipRetransmission = true + K_LOGGER.info { "Retransmission will be skipped. Not found valid message with sequence more than 1." } return@withCancellation } @@ -175,17 +191,21 @@ class MessageLoader( timestamp = validMessage.timestamp if(validMessage.payloadSequence <= fromSequence) { + K_LOGGER.info { "Found valid message with start recovery sequence: ${buf.toString(US_ASCII)}" } break } else { messagesToSkip = validMessage.payloadSequence - fromSequence + K_LOGGER.info { "Adjusted number of messages to skip: $messagesToSkip using ${validMessage.payloadSequence} - $fromSequence" } } } else { if(sequence <= fromSequence) { + K_LOGGER.info { "Found valid message with start recovery sequence: ${buf.toString(US_ASCII)}" } break } else { messagesToSkip = sequence - fromSequence + K_LOGGER.info { "Adjusted number of messages to skip: $messagesToSkip using $sequence - $fromSequence" } } } } @@ -208,11 +228,14 @@ class MessageLoader( direction = direction, timeRelation = TimeRelation.NEXT, keepOpen = true, - ), + ).also { + K_LOGGER.info { "Forward iterator params: sessionAlias - $sessionAlias from - ${it.startTimestamp} to - ${it.endTimestamp}" } + } ) while (iterator.hasNext()) { val message = Unpooled.buffer().writeBytes(iterator.next().message.bodyRaw.toByteArray()) + K_LOGGER.info { "Sending message to recovery processor: ${message.toString(US_ASCII)}" } if (!processMessage(message)) break } }.onFailure {