Skip to content

Commit

Permalink
fixed: filtered out next message after resumeId message (second messa…
Browse files Browse the repository at this point in the history
…ge in stream)
  • Loading branch information
lumber1000 committed Nov 27, 2024
1 parent d89f937 commit 11a3cae
Showing 1 changed file with 10 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ class MessageExtractor<B, G, RM, PM>(
BEFORE -> REVERSE
}

private val sequenceComparator = when (order) {
DIRECT -> { l1: Long, l2: Long -> l1 < l2 }
REVERSE -> { l1: Long, l2: Long -> l2 < l1 }
private val sequenceComparator: (Long, Long) -> Boolean = when (order) {
DIRECT -> { resumeSequence, messageSequence -> messageSequence >= resumeSequence }
REVERSE -> { resumeSequence, messageSequence -> messageSequence <= resumeSequence }
}

private val timestampComparator = when (order) {
Expand Down Expand Up @@ -183,8 +183,13 @@ class MessageExtractor<B, G, RM, PM>(

val resumeIndex = if (isResumeIdNotFound) {
resumeFromId!! // if isResumeIdNotFound==true, resumeFromId is always not null
val idx = orderedMessages.indexOfFirst { msg -> (msg.direction == resumeFromId.stream.direction && !sequenceComparator(msg.sequence, resumeFromId.sequence)).also { isResumeIdNotFound = it } }
if (idx == -1) orderedMessages.size else idx
val idx = orderedMessages.indexOfFirst { msg -> (msg.direction == resumeFromId.stream.direction && sequenceComparator(resumeFromId.sequence, msg.sequence)) }
if (idx == -1) {
orderedMessages.size
} else {
isResumeIdNotFound = false
idx
}
} else 0

val startIndex = if (isStartTimestampNotFound) {
Expand Down

0 comments on commit 11a3cae

Please sign in to comment.