From 7a879a6c144b3f34583f0adc61d8c12ed3f6baa8 Mon Sep 17 00:00:00 2001 From: lumber1000 <45400511+lumber1000@users.noreply.github.com> Date: Thu, 14 Nov 2024 03:26:43 +0400 Subject: [PATCH] resumeExtract test resume extracting fixed --- .../handlers/messages/MessageExtractor.kt | 2 +- .../kotlin/handlers/messages/ExtractorTest.kt | 57 ++++++++++++++++++- 2 files changed, 55 insertions(+), 4 deletions(-) diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageExtractor.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageExtractor.kt index 4a9f49ec..369f218a 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageExtractor.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageExtractor.kt @@ -185,7 +185,7 @@ class MessageExtractor( streamSequence = streamSequence.dropWhile( when { resumeFromId != null -> { - { msg -> msg.direction == resumeFromId.stream.direction && sequenceComparator(msg.sequence, resumeFromId.sequence).also { firstNotFound = it } } + { msg -> (msg.direction != resumeFromId.stream.direction || sequenceComparator(msg.sequence, resumeFromId.sequence)).also { firstNotFound = it } } } else /* start != null */ -> { { msg -> timestampComparator(msg.timestamp, start).also { firstNotFound = it } } diff --git a/src/test/kotlin/handlers/messages/ExtractorTest.kt b/src/test/kotlin/handlers/messages/ExtractorTest.kt index f540992d..67124dee 100644 --- a/src/test/kotlin/handlers/messages/ExtractorTest.kt +++ b/src/test/kotlin/handlers/messages/ExtractorTest.kt @@ -85,7 +85,7 @@ class ExtractorTest { return SseMessageSearchRequest(parameters, FilterPredicate(emptyList())) } - private fun getMessage(timestamp: Instant, globalIndex: AtomicLong? = null): StoredMessage { + private fun getMessage(timestamp: Instant, globalIndex: AtomicLong? = null, direction: Direction = Direction.FIRST): StoredMessage { val msg = mockk() every { msg.timestamp } answers { timestamp } @@ -97,13 +97,13 @@ class ExtractorTest { every { msg.sessionAlias } answers { STREAM_NAME } every { msg.pageId } answers { PageId(BOOK_ID, Instant.MIN, "page_1") } every { msg.protocol } answers { "protocol" } - every { msg.direction } answers { Direction.FIRST } + every { msg.direction } answers { direction } every { msg.content } answers { byteArrayOf(1, 1, 1) } every { msg.id } answers { StoredMessageId( BOOK_ID, STREAM_NAME, - Direction.FIRST, + direction, timestamp, index ) @@ -114,6 +114,22 @@ class ExtractorTest { return msg } + private fun getMessages( + sequencePattern: List, + startTimestamp: Instant + ): List { + val indexFirst = AtomicLong(1) + val indexSecond = AtomicLong(1) + + return sequencePattern.map { direction -> + getMessage( + startTimestamp, + if (direction == Direction.FIRST) indexFirst else indexSecond, + direction + ) + } + } + @OptIn(DelicateCoroutinesApi::class) private fun mockContextWithCradleService(batch: StoredGroupedMessageBatch): ProtoContext { val context: ProtoContext = mockk() @@ -222,6 +238,41 @@ class ExtractorTest { } } + @Test + fun resumeExtract() { + val resumeIndex = 4 + val startTimestamp = Instant.parse("2022-04-21T10:00:00Z") + val messages = getMessages( + listOf(Direction.SECOND, Direction.SECOND, Direction.FIRST, Direction.SECOND, Direction.FIRST, Direction.FIRST), + startTimestamp + ) + val batch = StoredGroupedMessageBatch(SESSION_GROUP, messages, PageId(BookId("1"), startTimestamp, "1"), Instant.now()) + val context = mockContextWithCradleService(batch) + + val request = getSearchRequest( + startTimestamp, + startTimestamp.plus(1, ChronoUnit.MINUTES), + messages[resumeIndex].id + ) + + val extractedMessages: List + runBlocking { + val extractor = MessageExtractor(context, request, STREAM_NAME_OBJECT, this, 1, PipelineStatus()) + var extractedMessagesCollection: Collection = emptyList() + do { + val message = extractor.pollMessage() + if (message is PipelineRawBatch) + extractedMessagesCollection = message.storedBatchWrapper.trimmedMessages + } while (!message.streamEmpty) + extractedMessages = ArrayList(extractedMessagesCollection) + coroutineContext.cancelChildren() + } + + assertEquals(messages.size - resumeIndex, extractedMessages.size) + assertEquals(messages[resumeIndex].id, extractedMessages[0].id) + assertEquals(messages[resumeIndex + 1].id, extractedMessages[1].id) + } + private fun getOutOfStartBatch(startTimestamp: Instant, endTimestamp: Instant): StoredGroupedMessageBatch { val allMessages = mutableListOf() val index = AtomicLong(1L)