Skip to content

Commit

Permalink
[TS-2459] implemented test for reproducing problem
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita-Smirnov-Exactpro committed Jul 4, 2024
1 parent 7228051 commit c86a82d
Showing 1 changed file with 130 additions and 47 deletions.
177 changes: 130 additions & 47 deletions src/test/kotlin/com/exactpro/th2/conn/dirty/fix/TestStrategies.kt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import org.junit.jupiter.api.assertAll
import org.mockito.kotlin.any
import org.mockito.kotlin.anyOrNull
import org.mockito.kotlin.anyVararg
import org.mockito.kotlin.argThat
import org.mockito.kotlin.argumentCaptor
import org.mockito.kotlin.clearInvocations
import org.mockito.kotlin.description
Expand All @@ -75,6 +76,9 @@ import kotlin.test.assertNotNull
import kotlin.text.Charsets.US_ASCII
import com.exactpro.th2.common.grpc.Event as ProtoEvent

private const val TEST_BOOK = "test-book"
private const val TEST_SCOPE = "test-scope"

class TestStrategies {

private class TestContext(
Expand Down Expand Up @@ -374,18 +378,17 @@ class TestStrategies {

@Test
fun `outgoing gap strategy test`() {
val defaultRuleDuration = Duration.of(1, SECONDS)
val businessRuleDuration = Duration.of(1, SECONDS)
val businessRuleCleanupDuration = Duration.of(1, SECONDS)
val defaultRuleDuration = Duration.of(500, MILLIS)
val businessRuleDuration = Duration.of(500, MILLIS)
val messages = mutableListOf<Triple<ByteBuf, Map<String, String>, IChannel.SendMode>>()
val testContext = createTestContext(BrokenConnConfiguration(
SchedulerType.CONSECUTIVE,
listOf(
RuleConfiguration(DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(100, MILLIS)),
RuleConfiguration(DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.ZERO),
RuleConfiguration(
CREATE_OUTGOING_GAP,
duration = businessRuleDuration,
cleanUpDuration = businessRuleCleanupDuration,
cleanUpDuration = Duration.ZERO,
missOutgoingMessagesConfiguration = MissMessageConfiguration(3)
),
)
Expand All @@ -395,54 +398,134 @@ class TestStrategies {

val context = testContext.context
val channel = testContext.channel
try {
testContext.fixHandler.use { handler ->
handler.onStart()
context.verifyInvocationsAndClean {
verify(this).createChannel(any(), any(), any(), any(), any(), any(), anyVararg())
channel.verifyInvocationsAndClean {
verify(
this,
times(2).description("Check channel state in the onStart and sendLogon methods"),
).isOpen
verify(this, description("open channel with default strategy first")).open()
verifySend(mapOf(35 to "A", 34 to "1"), 100)
}
verifySendEvent("successful login", "Info", 100)
}

verifyChangeStrategy(channel, defaultRuleDuration, DEFAULT, CREATE_OUTGOING_GAP, 2)
context.verifyInvocationsAndClean {
val events = captureSendEvents(2, 100)
assertAll(
{ assertEquals("successful login", events[0].name) },
{ assertContains(events[1].name, "$CREATE_OUTGOING_GAP strategy started") },
)
testContext.fixHandler.use { handler ->
handler.onStart()
context.verifyInvocationsAndClean {
verify(this).createChannel(any(), any(), any(), any(), any(), any(), anyVararg())
channel.verifyInvocationsAndClean {
verify(
this,
times(2).description("Check channel state in the onStart and sendLogon methods"),
).isOpen
verify(this, description("open channel with default strategy first")).open()
verifySend(mapOf(35 to "A", 34 to "1"), 100)
}
verifySendEvent("successful login", "Info", 100)
}

handler.onOutgoing(channel, businessMessage(3).asExpandable(), mutableMapOf())
handler.onOutgoing(channel, businessMessage(4).asExpandable(), mutableMapOf())
handler.onOutgoing(channel, businessMessage(5).asExpandable(), mutableMapOf())
handler.onOutgoing(channel, businessMessage(6).asExpandable(), mutableMapOf())
verifyChangeStrategy(channel, defaultRuleDuration, DEFAULT, CREATE_OUTGOING_GAP, 2)
context.verifyInvocationsAndClean {
val events = captureSendEvents(2, 100)
assertAll(
{ assertEquals("successful login", events[0].name) },
{ assertContains(events[1].name, "$CREATE_OUTGOING_GAP strategy started") },
)
}

handler.onIncoming(channel, resendRequest(3, 3, 5), getMessageId())
channel.verifyInvocationsAndClean {
verify(this, timeout(100).description("check status during handleResendRequest method")).isOpen
val byteBufs = captureSend(3, 100)
assertAll(
{ byteBufs[0].assertContains(mapOf(35 to "AE", 43 to "Y", 34 to "3")) },
{ byteBufs[1].assertContains(mapOf(35 to "AE", 43 to "Y", 34 to "4")) },
{ byteBufs[2].assertContains(mapOf(35 to "AE", 43 to "Y", 34 to "5")) },
)
handler.onOutgoing(channel, businessMessage(3).asExpandable(), mutableMapOf())
handler.onOutgoing(channel, businessMessage(4).asExpandable(), mutableMapOf())
handler.onOutgoing(channel, businessMessage(5).asExpandable(), mutableMapOf())
handler.onOutgoing(channel, businessMessage(6).asExpandable(), mutableMapOf())

handler.onIncoming(channel, resendRequest(3, 3, 5), getMessageId())
channel.verifyInvocationsAndClean {
verify(this, timeout(100).description("check status during handleResendRequest method")).isOpen
val byteBufs = captureSend(3, 100)
assertAll(
{ byteBufs[0].assertContains(mapOf(35 to "AE", 43 to "Y", 34 to "3")) },
{ byteBufs[1].assertContains(mapOf(35 to "AE", 43 to "Y", 34 to "4")) },
{ byteBufs[2].assertContains(mapOf(35 to "AE", 43 to "Y", 34 to "5")) },
)
}

verifyChangeStrategy(channel, businessRuleDuration, CREATE_OUTGOING_GAP, DEFAULT, 7)
context.verifyInvocationsAndClean {
val events = captureSendEvents(2, 100)
assertAll(
{ assertEquals("successful login", events[0].name) },
{ assertContains(events[1].name, "$CREATE_OUTGOING_GAP strategy finished") },
)

}
}
}

@Test
fun `outgoing gap strategy - long recovery in case of mixing recovery message with non-recovery messages test`() {
val defaultRuleDuration = Duration.of(500, MILLIS)
val businessRuleDuration = Duration.of(500, MILLIS)
val messages = mutableListOf<Triple<ByteBuf, Map<String, String>, IChannel.SendMode>>()
val testContext = createTestContext(BrokenConnConfiguration(
SchedulerType.CONSECUTIVE,
listOf(
RuleConfiguration(DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.ZERO),
RuleConfiguration(
CREATE_OUTGOING_GAP,
duration = businessRuleDuration,
cleanUpDuration = Duration.ZERO,
allowMessagesBeforeRetransmissionFinishes = true,
missOutgoingMessagesConfiguration = MissMessageConfiguration(3)
),
)
)) { msg, mode, mtd ->
messages.add(Triple(msg, mode, mtd))
}

val context = testContext.context
val channel = testContext.channel
val handler = testContext.fixHandler

var outgoingSequence = 0
var incomingSequence = 0
whenever(channel.send(any(), any(), anyOrNull(), any())).doAnswer {
val byteBuf = it.arguments[0] as ByteBuf
LOGGER.info { "channel.send ${byteBuf.toString(US_ASCII)}" }

val seq = requireNotNull(byteBuf.findField(34)?.value?.toInt())
val posDup = byteBuf.findField(43)?.value == "Y"

if (posDup) {
if (outgoingSequence + 1 == seq) {
outgoingSequence = seq
}
} else {
if (byteBuf.findField(35)?.value == "A") {
incomingSequence += 1
handler.onIncoming(channel, logonResponse(incomingSequence), getMessageId())
}
if (outgoingSequence + 1 == seq) {
outgoingSequence = seq
} else {
incomingSequence += 1
handler.onIncoming(channel, resendRequest(incomingSequence, outgoingSequence, 0), getMessageId())
}
}

verifyChangeStrategy(channel, businessRuleDuration, CREATE_OUTGOING_GAP, DEFAULT, 7)
context.verifyInvocationsAndClean {
verifySendEvent("successful login", "Info", 100)
CompletableFuture.completedFuture(getMessageId())
}

handler.use {
handler.onStart()
verify(context, timeout(defaultRuleDuration.toMillis() + 100)).send(argThat {
toProto(TEST_BOOK, TEST_SCOPE).name.contains("$CREATE_OUTGOING_GAP strategy started")
}, anyOrNull())

val future = CompletableFuture.runAsync {
while (!Thread.currentThread().isInterrupted) {
val byteBuf = businessMessage(0).asExpandable()
handler.onOutgoing(channel, byteBuf, mutableMapOf())
if (!byteBuf.isEmpty()) {
channel.send(byteBuf, mutableMapOf())
}
}
}
} finally {
channel.close()

verify(context, timeout(businessRuleDuration.toMillis() * 10)).send(argThat {
toProto(TEST_BOOK, TEST_SCOPE).name.contains("$CREATE_OUTGOING_GAP strategy finished")
}, anyOrNull())

future.cancel(true)
}
}

Expand Down Expand Up @@ -870,7 +953,7 @@ class TestStrategies {
this.also {
val captor = argumentCaptor<Event> { }
verify(this, timeout(timeout)).send(captor.capture(), anyOrNull())
val event = captor.allValues.single().toProto("test-book", "test-scope")
val event = captor.allValues.single().toProto(TEST_BOOK, TEST_SCOPE)
assertAll(
{ name?.let { assertContains(event.name, name) } },
{ type?.let { assertContains(event.type, type) } },
Expand All @@ -883,7 +966,7 @@ class TestStrategies {
): List<ProtoEvent> {
val captor = argumentCaptor<Event> { }
verify(this, timeout(timeout).times(times)).send(captor.capture(), anyOrNull())
return captor.allValues.map { it.toProto("test-book", "test-scope") }
return captor.allValues.map { it.toProto(TEST_BOOK, TEST_SCOPE) }
}

private fun ByteBuf.assertContains(values: Map<Int, String>) {
Expand Down

0 comments on commit c86a82d

Please sign in to comment.