Skip to content

Commit

Permalink
fix recovery after sequence reset
Browse files Browse the repository at this point in the history
  • Loading branch information
Denis Plotnikov committed Jan 8, 2024
1 parent c33d5fb commit 58e88d1
Showing 1 changed file with 11 additions and 1 deletion.
12 changes: 11 additions & 1 deletion src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class MessageLoader(
processMessage: (ByteBuf) -> Boolean
) {
var timestamp: Timestamp? = null
var skipRetransmission = false
ProviderCall.withCancellation {
val backwardIterator = dataProvider.searchMessageGroups(
createSearchGroupRequest(
Expand All @@ -142,7 +143,7 @@ class MessageLoader(
var messagesToSkip = firstValidMessage.payloadSequence - fromSequence

timestamp = firstValidMessage.timestamp

var lastProcessedSequence = -1
while (backwardIterator.hasNext() && messagesToSkip > 0) {
val message = backwardIterator.next().message
if(compare(message.messageId.timestamp, previousDaySessionStart) <= 0) {
Expand All @@ -155,6 +156,13 @@ class MessageLoader(
val buf = Unpooled.copiedBuffer(message.bodyRaw.toByteArray())
val sequence = buf.findField(MSG_SEQ_NUM_TAG)?.value?.toInt() ?: continue

if(sequence == 1 && lastProcessedSequence > 1 || sequence == 2 && lastProcessedSequence > 2) {
skipRetransmission = true
return@withCancellation
}

lastProcessedSequence = sequence

if(checkPossDup(buf)) {
val validMessage = firstValidMessageDetails(backwardIterator) ?: break

Expand All @@ -177,6 +185,8 @@ class MessageLoader(
}
}

if(skipRetransmission) return

val startSearchTimestamp = timestamp ?: return

K_LOGGER.info { "Loading retransmission messages from ${startSearchTimestamp.toInstant()}" }
Expand Down

0 comments on commit 58e88d1

Please sign in to comment.