Skip to content

Commit

Permalink
CradleServiceTest
Browse files Browse the repository at this point in the history
reverse search test
`fullName` field removed from serialization in CommonStreamName & StreamName
  • Loading branch information
lumber1000 committed Nov 26, 2024
1 parent 4caebc7 commit d89f937
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 Exactpro (Exactpro Systems Limited)
* Copyright 2022-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,12 +18,14 @@ package com.exactpro.th2.rptdataprovider.entities.internal

import com.exactpro.cradle.BookId
import com.exactpro.cradle.Direction
import com.fasterxml.jackson.annotation.JsonIgnore
import java.time.Instant

class CommonStreamName(
val bookId: BookId,
val name: String
) {
@JsonIgnore
internal val fullName = "${bookId.name}:$name"

override fun toString(): String {
Expand Down Expand Up @@ -55,6 +57,7 @@ class StreamName(
val name: String
get() = common.name

@JsonIgnore
internal val fullName = "${common.fullName}:$direction"

override fun toString(): String {
Expand All @@ -78,7 +81,6 @@ class StreamName(
}
}


data class StreamPointer(
val stream: StreamName,
val sequence: Long,
Expand All @@ -98,5 +100,4 @@ data class StreamPointer(
timestamp = timestamp,
hasStarted = hasStarted
)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.exactpro.th2.rptdataprovider.services.cradle

import com.exactpro.cradle.CradleManager
import com.exactpro.cradle.CradleStorage
import com.exactpro.cradle.messages.GroupedMessageFilter
import com.exactpro.cradle.messages.StoredGroupedMessageBatch
import com.exactpro.cradle.resultset.CradleResultSet
import com.exactpro.th2.rptdataprovider.entities.configuration.Configuration
import com.exactpro.th2.rptdataprovider.entities.configuration.CustomConfigurationClass
import io.mockk.every
import io.mockk.mockk
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import org.junit.jupiter.api.Test

import org.junit.jupiter.api.Assertions.*

class CradleServiceTest {
@Test
fun `retrieve empty stream`() {
val configuration = Configuration(CustomConfigurationClass())
val cradleManager = mockk<CradleManager>()
val cradleStorage = mockk<CradleStorage>()
val resultSet = mockk<CradleResultSet<StoredGroupedMessageBatch>>()

every { resultSet.hasNext() } answers { false }
every { cradleManager.storage } answers { cradleStorage }
every { cradleStorage.getGroupedMessageBatches(any()) } answers { resultSet }

val cradleService = CradleService(configuration, cradleManager)

val coroutineScope = CoroutineScope(Dispatchers.Default)

runBlocking {
val channel = cradleService.getGroupedMessages(coroutineScope, GroupedMessageFilter(null, null))
var itemCounter = 0

withTimeout(50) {
for (item in channel) {
itemCounter++
}
}

assertEquals(0, itemCounter)
}
}
}
62 changes: 53 additions & 9 deletions src/test/kotlin/handlers/messages/ExtractorTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package handlers.messages
import com.exactpro.cradle.BookId
import com.exactpro.cradle.Direction
import com.exactpro.cradle.PageId
import com.exactpro.cradle.TimeRelation
import com.exactpro.cradle.messages.StoredGroupedMessageBatch
import com.exactpro.cradle.messages.StoredMessage
import com.exactpro.cradle.messages.StoredMessageId
Expand Down Expand Up @@ -54,6 +55,7 @@ import java.time.temporal.ChronoUnit
import java.util.concurrent.atomic.AtomicLong
import java.util.stream.Stream
import kotlin.math.max
import kotlin.math.min
import kotlin.random.Random

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
Expand All @@ -73,7 +75,8 @@ class ExtractorTest {
private fun getSearchRequest(
startTimestamp: Instant?,
endTimestamp: Instant,
resumeId: StoredMessageId? = null
resumeId: StoredMessageId? = null,
searchDirection: TimeRelation? = null
): SseMessageSearchRequest<ProtoRawMessage, Message> {
val parameters = mutableMapOf(
"stream" to listOf(STREAM_NAME),
Expand All @@ -88,6 +91,16 @@ class ExtractorTest {
if (resumeId != null) {
parameters["messageId"] = listOf(resumeId.toString())
}

if (searchDirection != null) {
parameters["searchDirection"] = listOf(
when (searchDirection) {
TimeRelation.AFTER -> "next"
TimeRelation.BEFORE -> "previous"
}
)
}

return SseMessageSearchRequest(parameters, FilterPredicate(emptyList()))
}

Expand Down Expand Up @@ -148,7 +161,16 @@ class ExtractorTest {
val cradle = mockk<CradleService>()
coEvery {
cradle.getSessionGroup(any(), any(), any(), any())
} answers { SESSION_GROUP }
} answers {
val from = invocation.args[2] as Instant?
val to = invocation.args[3] as Instant?

if (from != null && to != null && from.isAfter(to)) {
throw IllegalArgumentException("`from` timestamp is after `to` timestamp")
}

SESSION_GROUP
}

coEvery { cradle.getGroupedMessages(any(), any()) } answers {
runBlocking {
Expand Down Expand Up @@ -251,7 +273,8 @@ class ExtractorTest {
val timestampIncrementMillis: Long = 1_000,
val isUnorderedBatch: Boolean = false,
val byResumeIdIndex: Int? = 4,
val byStartTimestampIndex: Int? = null
val byStartTimestampIndex: Int? = null,
val searchDirection: TimeRelation? = null
)

private fun resumeExtractParamsProvider() = Stream.of(
Expand All @@ -261,7 +284,9 @@ class ExtractorTest {
ResumeExtractArgs(byResumeIdIndex = null, byStartTimestampIndex = 5),
ResumeExtractArgs(byResumeIdIndex = 3, byStartTimestampIndex = null),
ResumeExtractArgs(byResumeIdIndex = 4, byStartTimestampIndex = 5),
ResumeExtractArgs(byResumeIdIndex = 5, byStartTimestampIndex = 4)
ResumeExtractArgs(byResumeIdIndex = 5, byStartTimestampIndex = 4),
ResumeExtractArgs(byResumeIdIndex = 5, byStartTimestampIndex = 4),
ResumeExtractArgs(byResumeIdIndex = null, byStartTimestampIndex = 1, searchDirection = TimeRelation.BEFORE)
)

private fun MutableList<StoredMessage>.swap(idx1: Int, idx2: Int) {
Expand All @@ -281,6 +306,7 @@ class ExtractorTest {
startTimestamp,
args.timestampIncrementMillis
)
val searchDirection = args.searchDirection ?: TimeRelation.AFTER

val batchMessages = arrayListOf<StoredMessage>().apply { addAll(messages) }

Expand All @@ -298,10 +324,16 @@ class ExtractorTest {

val resumeId = if (resumeIndex != null) messages[resumeIndex].id else null

val intervalMillis = messages.size * args.timestampIncrementMillis + 1

val request = getSearchRequest(
requestTimestamp,
startTimestamp.plus(1, ChronoUnit.MINUTES),
resumeId
when (searchDirection){
TimeRelation.AFTER -> startTimestamp.plusMillis(intervalMillis)
TimeRelation.BEFORE -> startTimestamp
},
resumeId,
searchDirection
)

val extractedMessages: List<StoredMessage>
Expand All @@ -317,13 +349,25 @@ class ExtractorTest {
coroutineContext.cancelChildren()
}

val expectedResumeIndex: Int = max(resumeIndex ?: 0, startTimestampIndex ?: 0)
val expectedSize = messages.size - expectedResumeIndex
val expectedResumeIndex: Int = when(searchDirection){
TimeRelation.AFTER -> max(resumeIndex ?: 0, startTimestampIndex ?: 0)
TimeRelation.BEFORE -> min(resumeIndex ?: Int.MAX_VALUE, startTimestampIndex ?: Int.MAX_VALUE)
}

val expectedSize: Int = when(searchDirection){
TimeRelation.AFTER -> messages.size - expectedResumeIndex
TimeRelation.BEFORE -> expectedResumeIndex + 1
}

assertEquals(expectedSize, extractedMessages.size)

for (i in extractedMessages.indices) {
assertEquals(messages[expectedResumeIndex + i].id, extractedMessages[i].id)
val msgIdx: Int = when(searchDirection){
TimeRelation.AFTER -> expectedResumeIndex + i
TimeRelation.BEFORE -> expectedResumeIndex - i
}

assertEquals(messages[msgIdx].id, extractedMessages[i].id)
}
}

Expand Down

0 comments on commit d89f937

Please sign in to comment.