Skip to content

Commit

Permalink
resumeExtract test
Browse files Browse the repository at this point in the history
resume extracting fixed
  • Loading branch information
lumber1000 committed Nov 13, 2024
1 parent fdc69dd commit 7a879a6
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ class MessageExtractor<B, G, RM, PM>(
streamSequence = streamSequence.dropWhile(
when {
resumeFromId != null -> {
{ msg -> msg.direction == resumeFromId.stream.direction && sequenceComparator(msg.sequence, resumeFromId.sequence).also { firstNotFound = it } }
{ msg -> (msg.direction != resumeFromId.stream.direction || sequenceComparator(msg.sequence, resumeFromId.sequence)).also { firstNotFound = it } }
}
else /* start != null */ -> {
{ msg -> timestampComparator(msg.timestamp, start).also { firstNotFound = it } }
Expand Down
57 changes: 54 additions & 3 deletions src/test/kotlin/handlers/messages/ExtractorTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class ExtractorTest {
return SseMessageSearchRequest(parameters, FilterPredicate(emptyList()))
}

private fun getMessage(timestamp: Instant, globalIndex: AtomicLong? = null): StoredMessage {
private fun getMessage(timestamp: Instant, globalIndex: AtomicLong? = null, direction: Direction = Direction.FIRST): StoredMessage {
val msg = mockk<StoredMessage>()

every { msg.timestamp } answers { timestamp }
Expand All @@ -97,13 +97,13 @@ class ExtractorTest {
every { msg.sessionAlias } answers { STREAM_NAME }
every { msg.pageId } answers { PageId(BOOK_ID, Instant.MIN, "page_1") }
every { msg.protocol } answers { "protocol" }
every { msg.direction } answers { Direction.FIRST }
every { msg.direction } answers { direction }
every { msg.content } answers { byteArrayOf(1, 1, 1) }
every { msg.id } answers {
StoredMessageId(
BOOK_ID,
STREAM_NAME,
Direction.FIRST,
direction,
timestamp,
index
)
Expand All @@ -114,6 +114,22 @@ class ExtractorTest {
return msg
}

private fun getMessages(
sequencePattern: List<Direction>,
startTimestamp: Instant
): List<StoredMessage> {
val indexFirst = AtomicLong(1)
val indexSecond = AtomicLong(1)

return sequencePattern.map { direction ->
getMessage(
startTimestamp,
if (direction == Direction.FIRST) indexFirst else indexSecond,
direction
)
}
}

@OptIn(DelicateCoroutinesApi::class)
private fun mockContextWithCradleService(batch: StoredGroupedMessageBatch): ProtoContext {
val context: ProtoContext = mockk()
Expand Down Expand Up @@ -222,6 +238,41 @@ class ExtractorTest {
}
}

@Test
fun resumeExtract() {
val resumeIndex = 4
val startTimestamp = Instant.parse("2022-04-21T10:00:00Z")
val messages = getMessages(
listOf(Direction.SECOND, Direction.SECOND, Direction.FIRST, Direction.SECOND, Direction.FIRST, Direction.FIRST),
startTimestamp
)
val batch = StoredGroupedMessageBatch(SESSION_GROUP, messages, PageId(BookId("1"), startTimestamp, "1"), Instant.now())
val context = mockContextWithCradleService(batch)

val request = getSearchRequest(
startTimestamp,
startTimestamp.plus(1, ChronoUnit.MINUTES),
messages[resumeIndex].id
)

val extractedMessages: List<StoredMessage>
runBlocking {
val extractor = MessageExtractor(context, request, STREAM_NAME_OBJECT, this, 1, PipelineStatus())
var extractedMessagesCollection: Collection<StoredMessage> = emptyList()
do {
val message = extractor.pollMessage()
if (message is PipelineRawBatch)
extractedMessagesCollection = message.storedBatchWrapper.trimmedMessages
} while (!message.streamEmpty)
extractedMessages = ArrayList(extractedMessagesCollection)
coroutineContext.cancelChildren()
}

assertEquals(messages.size - resumeIndex, extractedMessages.size)
assertEquals(messages[resumeIndex].id, extractedMessages[0].id)
assertEquals(messages[resumeIndex + 1].id, extractedMessages[1].id)
}

private fun getOutOfStartBatch(startTimestamp: Instant, endTimestamp: Instant): StoredGroupedMessageBatch {
val allMessages = mutableListOf<StoredMessage>()
val index = AtomicLong(1L)
Expand Down

0 comments on commit 7a879a6

Please sign in to comment.