From 58e88d1aa5f0f69a071c91f573ddff70e2abd514 Mon Sep 17 00:00:00 2001 From: Denis Plotnikov Date: Mon, 8 Jan 2024 12:20:04 +0400 Subject: [PATCH] fix recovery after sequence reset --- .../com/exactpro/th2/conn/dirty/fix/MessageLoader.kt | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt index 2a4977d6..75e5ba1c 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt @@ -127,6 +127,7 @@ class MessageLoader( processMessage: (ByteBuf) -> Boolean ) { var timestamp: Timestamp? = null + var skipRetransmission = false ProviderCall.withCancellation { val backwardIterator = dataProvider.searchMessageGroups( createSearchGroupRequest( @@ -142,7 +143,7 @@ class MessageLoader( var messagesToSkip = firstValidMessage.payloadSequence - fromSequence timestamp = firstValidMessage.timestamp - + var lastProcessedSequence = -1 while (backwardIterator.hasNext() && messagesToSkip > 0) { val message = backwardIterator.next().message if(compare(message.messageId.timestamp, previousDaySessionStart) <= 0) { @@ -155,6 +156,13 @@ class MessageLoader( val buf = Unpooled.copiedBuffer(message.bodyRaw.toByteArray()) val sequence = buf.findField(MSG_SEQ_NUM_TAG)?.value?.toInt() ?: continue + if(sequence == 1 && lastProcessedSequence > 1 || sequence == 2 && lastProcessedSequence > 2) { + skipRetransmission = true + return@withCancellation + } + + lastProcessedSequence = sequence + if(checkPossDup(buf)) { val validMessage = firstValidMessageDetails(backwardIterator) ?: break @@ -177,6 +185,8 @@ class MessageLoader( } } + if(skipRetransmission) return + val startSearchTimestamp = timestamp ?: return K_LOGGER.info { "Loading retransmission messages from ${startSearchTimestamp.toInstant()}" }