Skip to content

Commit

Permalink
TH2-1077 change get nearest message to get current batch
Browse files Browse the repository at this point in the history
  • Loading branch information
Paleontolog committed Nov 25, 2020
1 parent 92c8578 commit 185e30a
Showing 1 changed file with 29 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*
import mu.KotlinLogging
import java.lang.Integer.min
import java.sql.Time
import java.time.Instant
import java.time.LocalTime
import java.time.ZoneOffset
Expand Down Expand Up @@ -97,6 +98,22 @@ class SearchMessagesHandler(

}

private suspend fun getFirstMessageCurrentDay(
timestamp: Instant,
stream: String,
direction: Direction
): StoredMessageId? {
for (timeRelation in listOf(TimeRelation.BEFORE, TimeRelation.AFTER)) {
cradle.getFirstMessageIdSuspend(
timestamp,
stream,
direction,
timeRelation
)?.let { return it }
}
return null
}

private suspend fun getFirstMessageIdDifferentDays(
startTimestamp: Instant,
stream: String,
Expand All @@ -108,16 +125,17 @@ class SearchMessagesHandler(
var timestamp = startTimestamp
var messageId: StoredMessageId? = null
while (messageId == null && daysChecking >= 0) {
messageId = cradle.getFirstMessageIdSuspend(
timestamp,
stream,
direction,
// with TimeRelation.BEFORE we always select current batch
if (isCurrentDay)
TimeRelation.BEFORE
else
timelineDirection
)
messageId =
if (isCurrentDay) {
getFirstMessageCurrentDay(timestamp, stream, direction)
} else {
cradle.getFirstMessageIdSuspend(
timestamp,
stream,
direction,
timelineDirection
)
}
daysChecking -= 1
isCurrentDay = false
timestamp = nextDay(timestamp, timelineDirection)
Expand All @@ -136,7 +154,7 @@ class SearchMessagesHandler(
getFirstMessageIdDifferentDays(timestamp, stream, direction, request.timelineDirection)
if (storedMessageId != null) {
val messageBatch = cradle.getMessageBatchSuspend(storedMessageId)
.filter { it.direction == direction }

put(
Pair(stream, direction),
getNearestMessage(messageBatch, request, timestamp)?.id
Expand Down

0 comments on commit 185e30a

Please sign in to comment.